View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.util.concurrent.atomic.AtomicReference;
23  import java.util.concurrent.CountDownLatch;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
30  import org.apache.hadoop.hbase.ProcedureInfo;
31  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
32  import org.apache.hadoop.hbase.testclassification.SmallTests;
33  import org.apache.hadoop.hbase.security.User;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.util.NonceKey;
36  import org.apache.hadoop.hbase.util.Threads;
37  
38  import org.junit.After;
39  import org.junit.Before;
40  import org.junit.Test;
41  import org.junit.experimental.categories.Category;
42  
43  import static org.junit.Assert.assertEquals;
44  import static org.junit.Assert.assertFalse;
45  import static org.junit.Assert.assertTrue;
46  
47  @Category({SmallTests.class})
48  public class TestProcedureNonce {
49    private static final Log LOG = LogFactory.getLog(TestProcedureNonce.class);
50  
51    private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
52  
53    private static TestProcEnv procEnv;
54    private static ProcedureExecutor<TestProcEnv> procExecutor;
55    private static ProcedureStore procStore;
56  
57    private HBaseCommonTestingUtility htu;
58    private FileSystem fs;
59    private Path logDir;
60  
61    @Before
62    public void setUp() throws IOException {
63      htu = new HBaseCommonTestingUtility();
64      Path testDir = htu.getDataTestDir();
65      fs = testDir.getFileSystem(htu.getConfiguration());
66      assertTrue(testDir.depth() > 1);
67  
68      logDir = new Path(testDir, "proc-logs");
69      procEnv = new TestProcEnv();
70      procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
71      procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
72      procExecutor.testing = new ProcedureExecutor.Testing();
73      procStore.start(PROCEDURE_EXECUTOR_SLOTS);
74      procExecutor.start(PROCEDURE_EXECUTOR_SLOTS);
75    }
76  
77    @After
78    public void tearDown() throws IOException {
79      procExecutor.stop();
80      procStore.stop(false);
81      fs.delete(logDir, true);
82    }
83  
84    @Test(timeout=30000)
85    public void testCompletedProcWithSameNonce() throws Exception {
86      final long nonceGroup = 123;
87      final long nonce = 2222;
88  
89      // register the nonce
90      final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
91      assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
92  
93      // Submit a proc and wait for its completion
94      Procedure proc = new TestSingleStepProcedure();
95      long procId = procExecutor.submitProcedure(proc, nonceKey);
96      ProcedureTestingUtility.waitProcedure(procExecutor, procId);
97  
98      // Restart
99      ProcedureTestingUtility.restart(procExecutor);
100     ProcedureTestingUtility.waitProcedure(procExecutor, procId);
101 
102     // try to register a procedure with the same nonce
103     // we should get back the old procId
104     assertEquals(procId, procExecutor.registerNonce(nonceKey));
105 
106     ProcedureInfo result = procExecutor.getResult(procId);
107     ProcedureTestingUtility.assertProcNotFailed(result);
108   }
109 
110   @Test(timeout=30000)
111   public void testRunningProcWithSameNonce() throws Exception {
112     final long nonceGroup = 456;
113     final long nonce = 33333;
114 
115     // register the nonce
116     final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
117     assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
118 
119     // Submit a proc and use a latch to prevent the step execution until we submitted proc2
120     CountDownLatch latch = new CountDownLatch(1);
121     TestSingleStepProcedure proc = new TestSingleStepProcedure();
122     procEnv.setWaitLatch(latch);
123     long procId = procExecutor.submitProcedure(proc, nonceKey);
124     while (proc.step != 1) Threads.sleep(25);
125 
126     // try to register a procedure with the same nonce
127     // we should get back the old procId
128     assertEquals(procId, procExecutor.registerNonce(nonceKey));
129 
130     // complete the procedure
131     latch.countDown();
132 
133     // Restart, the procedure is not completed yet
134     ProcedureTestingUtility.restart(procExecutor);
135     ProcedureTestingUtility.waitProcedure(procExecutor, procId);
136 
137     // try to register a procedure with the same nonce
138     // we should get back the old procId
139     assertEquals(procId, procExecutor.registerNonce(nonceKey));
140 
141     ProcedureInfo result = procExecutor.getResult(procId);
142     ProcedureTestingUtility.assertProcNotFailed(result);
143   }
144 
145   @Test
146   public void testSetFailureResultForNonce() throws IOException {
147     final long nonceGroup = 234;
148     final long nonce = 55555;
149 
150     // check and register the request nonce
151     final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
152     assertFalse(procExecutor.registerNonce(nonceKey) >= 0);
153 
154     procExecutor.setFailureResultForNonce(nonceKey, "testProc", User.getCurrent(),
155       new IOException("test failure"));
156 
157     final long procId = procExecutor.registerNonce(nonceKey);
158     ProcedureInfo result = procExecutor.getResult(procId);
159     ProcedureTestingUtility.assertProcFailed(result);
160   }
161 
162   @Test(timeout=30000)
163   public void testConcurrentNonceRegistration() throws IOException {
164     testConcurrentNonceRegistration(true, 567, 44444);
165   }
166 
167   @Test(timeout=30000)
168   public void testConcurrentNonceRegistrationWithRollback() throws IOException {
169     testConcurrentNonceRegistration(false, 890, 55555);
170   }
171 
172   private void testConcurrentNonceRegistration(final boolean submitProcedure,
173       final long nonceGroup, final long nonce) throws IOException {
174     // register the nonce
175     final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
176 
177     final AtomicReference<Throwable> t1Exception = new AtomicReference();
178     final AtomicReference<Throwable> t2Exception = new AtomicReference();
179 
180     final CountDownLatch t1NonceRegisteredLatch = new CountDownLatch(1);
181     final CountDownLatch t2BeforeNonceRegisteredLatch = new CountDownLatch(1);
182     final Thread[] threads = new Thread[2];
183     threads[0] = new Thread() {
184       @Override
185       public void run() {
186         try {
187           // release the nonce and wake t2
188           assertFalse("unexpected already registered nonce",
189             procExecutor.registerNonce(nonceKey) >= 0);
190           t1NonceRegisteredLatch.countDown();
191 
192           // hold the submission until t2 is registering the nonce
193           t2BeforeNonceRegisteredLatch.await();
194           Threads.sleep(1000);
195 
196           if (submitProcedure) {
197             CountDownLatch latch = new CountDownLatch(1);
198             TestSingleStepProcedure proc = new TestSingleStepProcedure();
199             procEnv.setWaitLatch(latch);
200 
201             procExecutor.submitProcedure(proc, nonceKey);
202             Threads.sleep(100);
203 
204             // complete the procedure
205             latch.countDown();
206           } else {
207             procExecutor.unregisterNonceIfProcedureWasNotSubmitted(nonceKey);
208           }
209         } catch (Throwable e) {
210           t1Exception.set(e);
211         } finally {
212           t1NonceRegisteredLatch.countDown();
213           t2BeforeNonceRegisteredLatch.countDown();
214         }
215       }
216     };
217 
218     threads[1] = new Thread() {
219       @Override
220       public void run() {
221         try {
222           // wait until t1 has registered the nonce
223           t1NonceRegisteredLatch.await();
224 
225           // register the nonce
226           t2BeforeNonceRegisteredLatch.countDown();
227           assertFalse("unexpected non registered nonce",
228             procExecutor.registerNonce(nonceKey) < 0);
229         } catch (Throwable e) {
230           t2Exception.set(e);
231         } finally {
232           t1NonceRegisteredLatch.countDown();
233           t2BeforeNonceRegisteredLatch.countDown();
234         }
235       }
236     };
237 
238     for (int i = 0; i < threads.length; ++i) threads[i].start();
239     for (int i = 0; i < threads.length; ++i) Threads.shutdown(threads[i]);
240     ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
241     assertEquals(null, t1Exception.get());
242     assertEquals(null, t2Exception.get());
243   }
244 
245   public static class TestSingleStepProcedure extends SequentialProcedure<TestProcEnv> {
246     private int step = 0;
247 
248     public TestSingleStepProcedure() { }
249 
250     @Override
251     protected Procedure[] execute(TestProcEnv env) throws InterruptedException {
252       step++;
253       env.waitOnLatch();
254       LOG.debug("execute procedure " + this + " step=" + step);
255       step++;
256       setResult(Bytes.toBytes(step));
257       return null;
258     }
259 
260     @Override
261     protected void rollback(TestProcEnv env) { }
262 
263     @Override
264     protected boolean abort(TestProcEnv env) { return true; }
265   }
266 
267   private static class TestProcEnv {
268     private CountDownLatch latch = null;
269 
270     /**
271      * set/unset a latch. every procedure execute() step will wait on the latch if any.
272      */
273     public void setWaitLatch(CountDownLatch latch) {
274       this.latch = latch;
275     }
276 
277     public void waitOnLatch() throws InterruptedException {
278       if (latch != null) {
279         latch.await();
280       }
281     }
282   }
283 }