/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
39  
40  @Category(LargeTests.class)
41  public class TestProcedureReplayOrder {
42    private static final Log LOG = LogFactory.getLog(TestProcedureReplayOrder.class);
43  
44    private static final Procedure NULL_PROC = null;
45  
46    private ProcedureExecutor<Void> procExecutor;
47    private TestProcedureEnv procEnv;
48    private ProcedureStore procStore;
49  
50    private HBaseCommonTestingUtility htu;
51    private FileSystem fs;
52    private Path testDir;
53    private Path logDir;
54  
55    @Before
56    public void setUp() throws IOException {
57      htu = new HBaseCommonTestingUtility();
58      htu.getConfiguration().setInt("hbase.procedure.store.wal.sync.wait.msec", 10);
59  
60      testDir = htu.getDataTestDir();
61      fs = testDir.getFileSystem(htu.getConfiguration());
62      assertTrue(testDir.depth() > 1);
63  
64      logDir = new Path(testDir, "proc-logs");
65      procEnv = new TestProcedureEnv();
66      procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
67      procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
68      procStore.start(24);
69      procExecutor.start(1);
70    }
71  
72    @After
73    public void tearDown() throws IOException {
74      procExecutor.stop();
75      procStore.stop(false);
76      fs.delete(logDir, true);
77    }
78  
79    @Test(timeout=90000)
80    public void testSingleStepReplyOrder() throws Exception {
81      // avoid the procedure to be runnable
82      procEnv.setAcquireLock(false);
83  
84      // submit the procedures
85      submitProcedures(16, 25, TestSingleStepProcedure.class);
86  
87      // restart the executor and allow the procedures to run
88      ProcedureTestingUtility.restart(procExecutor, new Runnable() {
89        @Override
90        public void run() {
91          procEnv.setAcquireLock(true);
92        }
93      });
94  
95      // wait the execution of all the procedures and
96      // assert that the execution order was sorted by procId
97      ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
98      procEnv.assertSortedExecList();
99  
100     // TODO: FIXME: This should be revisited
101   }
102 
103   @Ignore
104   @Test(timeout=90000)
105   public void testMultiStepReplyOrder() throws Exception {
106     // avoid the procedure to be runnable
107     procEnv.setAcquireLock(false);
108 
109     // submit the procedures
110     submitProcedures(16, 10, TestTwoStepProcedure.class);
111 
112     // restart the executor and allow the procedures to run
113     ProcedureTestingUtility.restart(procExecutor, new Runnable() {
114       @Override
115       public void run() {
116         procEnv.setAcquireLock(true);
117       }
118     });
119 
120     fail("TODO: FIXME: NOT IMPLEMENT REPLAY ORDER");
121   }
122 
123   private void submitProcedures(final int nthreads, final int nprocPerThread,
124       final Class<?> procClazz) throws Exception {
125     Thread[] submitThreads = new Thread[nthreads];
126     for (int i = 0; i < submitThreads.length; ++i) {
127       submitThreads[i] = new Thread() {
128         @Override
129         public void run() {
130           for (int i = 0; i < nprocPerThread; ++i) {
131             try {
132               procExecutor.submitProcedure((Procedure)procClazz.newInstance());
133             } catch (InstantiationException|IllegalAccessException e) {
134               LOG.error("unable to instantiate the procedure", e);
135               fail("failure during the proc.newInstance(): " + e.getMessage());
136             }
137           }
138         }
139       };
140     }
141 
142     for (int i = 0; i < submitThreads.length; ++i) {
143       submitThreads[i].start();
144     }
145 
146     for (int i = 0; i < submitThreads.length; ++i) {
147       submitThreads[i].join();
148     }
149   }
150 
151   private static class TestProcedureEnv {
152     private ArrayList<Long> execList = new ArrayList<Long>();
153     private boolean acquireLock = true;
154 
155     public void setAcquireLock(boolean acquireLock) {
156       this.acquireLock = acquireLock;
157     }
158 
159     public boolean canAcquireLock() {
160       return acquireLock;
161     }
162 
163     public void addToExecList(final Procedure proc) {
164       execList.add(proc.getProcId());
165     }
166 
167     public ArrayList<Long> getExecList() {
168       return execList;
169     }
170 
171     public void assertSortedExecList() {
172       LOG.debug("EXEC LIST: " + execList);
173       for (int i = 1; i < execList.size(); ++i) {
174         assertTrue("exec list not sorted: " + execList.get(i-1) + " >= " + execList.get(i),
175           execList.get(i-1) < execList.get(i));
176       }
177     }
178   }
179 
180   public static class TestSingleStepProcedure extends SequentialProcedure<TestProcedureEnv> {
181     public TestSingleStepProcedure() { }
182 
183     @Override
184     protected Procedure[] execute(TestProcedureEnv env) {
185       LOG.debug("execute procedure " + this);
186       env.addToExecList(this);
187       return null;
188     }
189 
190     protected boolean acquireLock(final TestProcedureEnv env) {
191       return env.canAcquireLock();
192     }
193 
194     @Override
195     protected void rollback(TestProcedureEnv env) { }
196 
197     @Override
198     protected boolean abort(TestProcedureEnv env) { return true; }
199   }
200 
201   public static class TestTwoStepProcedure extends SequentialProcedure<TestProcedureEnv> {
202     public TestTwoStepProcedure() { }
203 
204     @Override
205     protected Procedure[] execute(TestProcedureEnv env) {
206       LOG.debug("execute procedure " + this);
207       env.addToExecList(this);
208       return new Procedure[] { new TestSingleStepProcedure() };
209     }
210 
211     protected boolean acquireLock(final TestProcedureEnv env) {
212       return true;
213     }
214 
215     @Override
216     protected void rollback(TestProcedureEnv env) { }
217 
218     @Override
219     protected boolean abort(TestProcedureEnv env) { return true; }
220   }
221 }