View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
20  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertNull;
23  import static org.junit.Assert.fail;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Arrays;
28  import java.util.List;
29  import java.util.Random;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.MultithreadedTestUtil;
47  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
48  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
49  import org.apache.hadoop.hbase.TableName;
50  import org.apache.hadoop.hbase.client.Append;
51  import org.apache.hadoop.hbase.client.Delete;
52  import org.apache.hadoop.hbase.client.Durability;
53  import org.apache.hadoop.hbase.client.Get;
54  import org.apache.hadoop.hbase.client.Increment;
55  import org.apache.hadoop.hbase.client.Mutation;
56  import org.apache.hadoop.hbase.client.Put;
57  import org.apache.hadoop.hbase.client.Result;
58  import org.apache.hadoop.hbase.client.RowMutations;
59  import org.apache.hadoop.hbase.client.Scan;
60  import org.apache.hadoop.hbase.filter.BinaryComparator;
61  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
62  import org.apache.hadoop.hbase.io.HeapSize;
63  import org.apache.hadoop.hbase.testclassification.MediumTests;
64  import org.apache.hadoop.hbase.util.Bytes;
65  import org.apache.hadoop.hbase.wal.WAL;
66  import org.junit.After;
67  import org.junit.Before;
68  import org.junit.Rule;
69  import org.junit.Test;
70  import org.junit.experimental.categories.Category;
71  import org.junit.rules.TestName;
72  
73  /**
74   * Testing of HRegion.incrementColumnValue, HRegion.increment,
75   * and HRegion.append
76   */
77  @Category(MediumTests.class) // Starts 100 threads
78  public class TestAtomicOperation {
79    static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
80    @Rule public TestName name = new TestName();
81  
82    HRegion region = null;
83    private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
84  
85    // Test names
86    static  byte[] tableName;
87    static final byte[] qual1 = Bytes.toBytes("qual1");
88    static final byte[] qual2 = Bytes.toBytes("qual2");
89    static final byte[] qual3 = Bytes.toBytes("qual3");
90    static final byte[] value1 = Bytes.toBytes("value1");
91    static final byte[] value2 = Bytes.toBytes("value2");
92    static final byte [] row = Bytes.toBytes("rowA");
93    static final byte [] row2 = Bytes.toBytes("rowB");
94  
95    @Before
96    public void setup() {
97      tableName = Bytes.toBytes(name.getMethodName());
98    }
99  
100   @After
101   public void teardown() throws IOException {
102     if (region != null) {
103       ((HRegion)region).close();
104       region = null;
105     }
106   }
107   //////////////////////////////////////////////////////////////////////////////
108   // New tests that doesn't spin up a mini cluster but rather just test the
109   // individual code pieces in the HRegion.
110   //////////////////////////////////////////////////////////////////////////////
111 
112   /**
113    * Test basic append operation.
114    * More tests in
115    * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
116    */
117   @Test
118   public void testAppend() throws IOException {
119     initHRegion(tableName, name.getMethodName(), fam1);
120     String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
121     " The Universe, and Everything";
122     String v2 = " is... 42.";
123     Append a = new Append(row);
124     a.setReturnResults(false);
125     a.add(fam1, qual1, Bytes.toBytes(v1));
126     a.add(fam1, qual2, Bytes.toBytes(v2));
127     assertNull(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE));
128     a = new Append(row);
129     a.add(fam1, qual1, Bytes.toBytes(v2));
130     a.add(fam1, qual2, Bytes.toBytes(v1));
131     Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
132     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
133     assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
134   }
135 
136   /**
137    * Test multi-threaded increments.
138    */
139   @Test
140   public void testIncrementMultiThreads() throws IOException {
141     LOG.info("Starting test testIncrementMultiThreads");
142     // run a with mixed column families (1 and 3 versions)
143     initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
144 
145     // Create 100 threads, each will increment by its own quantity. All 100 threads update the
146     // same row over two column families.
147     int numThreads = 100;
148     int incrementsPerThread = 1000;
149     Incrementer[] all = new Incrementer[numThreads];
150     int expectedTotal = 0;
151 
152     // create all threads
153     for (int i = 0; i < numThreads; i++) {
154       all[i] = new Incrementer(region, i, i, incrementsPerThread);
155       expectedTotal += (i * incrementsPerThread);
156     }
157 
158     // run all threads
159     for (int i = 0; i < numThreads; i++) {
160       all[i].start();
161     }
162 
163     // wait for all threads to finish
164     for (int i = 0; i < numThreads; i++) {
165       try {
166         all[i].join();
167       } catch (InterruptedException e) {
168       }
169     }
170     assertICV(row, fam1, qual1, expectedTotal);
171     assertICV(row, fam1, qual2, expectedTotal*2);
172     assertICV(row, fam2, qual3, expectedTotal*3);
173     LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
174   }
175 
176 
177   private void assertICV(byte [] row,
178                          byte [] familiy,
179                          byte[] qualifier,
180                          long amount) throws IOException {
181     // run a get and see?
182     Get get = new Get(row);
183     get.addColumn(familiy, qualifier);
184     Result result = region.get(get);
185     assertEquals(1, result.size());
186 
187     Cell kv = result.rawCells()[0];
188     long r = Bytes.toLong(CellUtil.cloneValue(kv));
189     assertEquals(amount, r);
190   }
191 
192   private void initHRegion (byte [] tableName, String callingMethod,
193       byte[] ... families)
194     throws IOException {
195     initHRegion(tableName, callingMethod, null, families);
196   }
197 
198   private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
199     byte[] ... families)
200   throws IOException {
201     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
202     int i=0;
203     for(byte [] family : families) {
204       HColumnDescriptor hcd = new HColumnDescriptor(family);
205       hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
206       htd.addFamily(hcd);
207     }
208     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
209     region = TEST_UTIL.createLocalHRegion(info, htd);
210   }
211 
212   /**
213    * A thread that makes increment calls always on the same row, this.row against two column
214    * families on this row.
215    */
216   public static class Incrementer extends Thread {
217 
218     private final HRegion region;
219     private final int numIncrements;
220     private final int amount;
221 
222 
223     public Incrementer(HRegion region, int threadNumber, int amount, int numIncrements) {
224       super("Incrementer." + threadNumber);
225       this.region = region;
226       this.numIncrements = numIncrements;
227       this.amount = amount;
228       setDaemon(true);
229     }
230 
231     @Override
232     public void run() {
233       for (int i=0; i<numIncrements; i++) {
234         try {
235           Increment inc = new Increment(row);
236           inc.addColumn(fam1, qual1, amount);
237           inc.addColumn(fam1, qual2, amount*2);
238           inc.addColumn(fam2, qual3, amount*3);
239           inc.setDurability(Durability.ASYNC_WAL);
240           Result result = region.increment(inc);
241           assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
242              Bytes.toLong(result.getValue(fam1, qual2)));
243           long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
244           long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
245           assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
246             fam1Increment, fam2Increment);
247         } catch (IOException e) {
248           e.printStackTrace();
249         }
250       }
251     }
252   }
253 
254   @Test
255   public void testAppendMultiThreads() throws IOException {
256     LOG.info("Starting test testAppendMultiThreads");
257     // run a with mixed column families (1 and 3 versions)
258     initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
259 
260     int numThreads = 100;
261     int opsPerThread = 100;
262     AtomicOperation[] all = new AtomicOperation[numThreads];
263     final byte[] val = new byte[]{1};
264 
265     AtomicInteger failures = new AtomicInteger(0);
266     // create all threads
267     for (int i = 0; i < numThreads; i++) {
268       all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
269         @Override
270         public void run() {
271           for (int i=0; i<numOps; i++) {
272             try {
273               Append a = new Append(row);
274               a.add(fam1, qual1, val);
275               a.add(fam1, qual2, val);
276               a.add(fam2, qual3, val);
277               a.setDurability(Durability.ASYNC_WAL);
278               region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
279 
280               Get g = new Get(row);
281               Result result = region.get(g);
282               assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
283               assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
284             } catch (IOException e) {
285               e.printStackTrace();
286               failures.incrementAndGet();
287               fail();
288             }
289           }
290         }
291       };
292     }
293 
294     // run all threads
295     for (int i = 0; i < numThreads; i++) {
296       all[i].start();
297     }
298 
299     // wait for all threads to finish
300     for (int i = 0; i < numThreads; i++) {
301       try {
302         all[i].join();
303       } catch (InterruptedException e) {
304       }
305     }
306     assertEquals(0, failures.get());
307     Get g = new Get(row);
308     Result result = region.get(g);
309     assertEquals(result.getValue(fam1, qual1).length, 10000);
310     assertEquals(result.getValue(fam1, qual2).length, 10000);
311     assertEquals(result.getValue(fam2, qual3).length, 10000);
312   }
313   /**
314    * Test multi-threaded row mutations.
315    */
316   @Test
317   public void testRowMutationMultiThreads() throws IOException {
318 
319     LOG.info("Starting test testRowMutationMultiThreads");
320     initHRegion(tableName, name.getMethodName(), fam1);
321 
322     // create 10 threads, each will alternate between adding and
323     // removing a column
324     int numThreads = 10;
325     int opsPerThread = 500;
326     AtomicOperation[] all = new AtomicOperation[numThreads];
327 
328     AtomicLong timeStamps = new AtomicLong(0);
329     AtomicInteger failures = new AtomicInteger(0);
330     // create all threads
331     for (int i = 0; i < numThreads; i++) {
332       all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
333         @Override
334         public void run() {
335           boolean op = true;
336           for (int i=0; i<numOps; i++) {
337             try {
338               // throw in some flushes
339               if (i%10==0) {
340                 synchronized(region) {
341                   LOG.debug("flushing");
342                   region.flush(true);
343                   if (i%100==0) {
344                     region.compact(false);
345                   }
346                 }
347               }
348               long ts = timeStamps.incrementAndGet();
349               RowMutations rm = new RowMutations(row);
350               if (op) {
351                 Put p = new Put(row, ts);
352                 p.add(fam1, qual1, value1);
353                 p.setDurability(Durability.ASYNC_WAL);
354                 rm.add(p);
355                 Delete d = new Delete(row);
356                 d.deleteColumns(fam1, qual2, ts);
357                 d.setDurability(Durability.ASYNC_WAL);
358                 rm.add(d);
359               } else {
360                 Delete d = new Delete(row);
361                 d.deleteColumns(fam1, qual1, ts);
362                 d.setDurability(Durability.ASYNC_WAL);
363                 rm.add(d);
364                 Put p = new Put(row, ts);
365                 p.add(fam1, qual2, value2);
366                 p.setDurability(Durability.ASYNC_WAL);
367                 rm.add(p);
368               }
369               region.mutateRow(rm);
370               op ^= true;
371               // check: should always see exactly one column
372               Get g = new Get(row);
373               Result r = region.get(g);
374               if (r.size() != 1) {
375                 LOG.debug(r);
376                 failures.incrementAndGet();
377                 fail();
378               }
379             } catch (IOException e) {
380               e.printStackTrace();
381               failures.incrementAndGet();
382               fail();
383             }
384           }
385         }
386       };
387     }
388 
389     // run all threads
390     for (int i = 0; i < numThreads; i++) {
391       all[i].start();
392     }
393 
394     // wait for all threads to finish
395     for (int i = 0; i < numThreads; i++) {
396       try {
397         all[i].join();
398       } catch (InterruptedException e) {
399       }
400     }
401     assertEquals(0, failures.get());
402   }
403 
404 
405   /**
406    * Test multi-threaded region mutations.
407    */
408   @Test
409   public void testMultiRowMutationMultiThreads() throws IOException {
410 
411     LOG.info("Starting test testMultiRowMutationMultiThreads");
412     initHRegion(tableName, name.getMethodName(), fam1);
413 
414     // create 10 threads, each will alternate between adding and
415     // removing a column
416     int numThreads = 10;
417     int opsPerThread = 500;
418     AtomicOperation[] all = new AtomicOperation[numThreads];
419 
420     AtomicLong timeStamps = new AtomicLong(0);
421     AtomicInteger failures = new AtomicInteger(0);
422     final List<byte[]> rowsToLock = Arrays.asList(row, row2);
423     // create all threads
424     for (int i = 0; i < numThreads; i++) {
425       all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
426         @Override
427         public void run() {
428           boolean op = true;
429           for (int i=0; i<numOps; i++) {
430             try {
431               // throw in some flushes
432               if (i%10==0) {
433                 synchronized(region) {
434                   LOG.debug("flushing");
435                   region.flush(true);
436                   if (i%100==0) {
437                     region.compact(false);
438                   }
439                 }
440               }
441               long ts = timeStamps.incrementAndGet();
442               List<Mutation> mrm = new ArrayList<Mutation>();
443               if (op) {
444                 Put p = new Put(row2, ts);
445                 p.add(fam1, qual1, value1);
446                 p.setDurability(Durability.ASYNC_WAL);
447                 mrm.add(p);
448                 Delete d = new Delete(row);
449                 d.deleteColumns(fam1, qual1, ts);
450                 d.setDurability(Durability.ASYNC_WAL);
451                 mrm.add(d);
452               } else {
453                 Delete d = new Delete(row2);
454                 d.deleteColumns(fam1, qual1, ts);
455                 d.setDurability(Durability.ASYNC_WAL);
456                 mrm.add(d);
457                 Put p = new Put(row, ts);
458                 p.setDurability(Durability.ASYNC_WAL);
459                 p.add(fam1, qual1, value2);
460                 mrm.add(p);
461               }
462               region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
463               op ^= true;
464               // check: should always see exactly one column
465               Scan s = new Scan(row);
466               RegionScanner rs = region.getScanner(s);
467               List<Cell> r = new ArrayList<Cell>();
468               while (rs.next(r))
469                 ;
470               rs.close();
471               if (r.size() != 1) {
472                 LOG.debug(r);
473                 failures.incrementAndGet();
474                 fail();
475               }
476             } catch (IOException e) {
477               e.printStackTrace();
478               failures.incrementAndGet();
479               fail();
480             }
481           }
482         }
483       };
484     }
485 
486     // run all threads
487     for (int i = 0; i < numThreads; i++) {
488       all[i].start();
489     }
490 
491     // wait for all threads to finish
492     for (int i = 0; i < numThreads; i++) {
493       try {
494         all[i].join();
495       } catch (InterruptedException e) {
496       }
497     }
498     assertEquals(0, failures.get());
499   }
500 
501   public static class AtomicOperation extends Thread {
502     protected final HRegion region;
503     protected final int numOps;
504     protected final AtomicLong timeStamps;
505     protected final AtomicInteger failures;
506     protected final Random r = new Random();
507 
508     public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
509         AtomicInteger failures) {
510       this.region = region;
511       this.numOps = numOps;
512       this.timeStamps = timeStamps;
513       this.failures = failures;
514     }
515   }
516 
517   private static CountDownLatch latch = new CountDownLatch(1);
518   private enum TestStep {
519     INIT,                  // initial put of 10 to set value of the cell
520     PUT_STARTED,           // began doing a put of 50 to cell
521     PUT_COMPLETED,         // put complete (released RowLock, but may not have advanced MVCC).
522     CHECKANDPUT_STARTED,   // began checkAndPut: if 10 -> 11
523     CHECKANDPUT_COMPLETED  // completed checkAndPut
524     // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
525   }
526   private static volatile TestStep testStep = TestStep.INIT;
527   private final String family = "f1";
528 
529   /**
530    * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
531    * MVCC.
532    *
533    * Moved into TestAtomicOperation from its original location, TestHBase7051
534    */
535   @Test
536   public void testPutAndCheckAndPutInParallel() throws Exception {
537 
538     final String tableName = "testPutAndCheckAndPut";
539     Configuration conf = TEST_UTIL.getConfiguration();
540     conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
541     final MockHRegion region = (MockHRegion) TEST_UTIL.createLocalHRegion(Bytes.toBytes(tableName),
542         null, null, tableName, conf, false, Durability.SYNC_WAL, null, Bytes.toBytes(family));
543 
544     Put[] puts = new Put[1];
545     Put put = new Put(Bytes.toBytes("r1"));
546     put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
547     puts[0] = put;
548 
549     region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
550     MultithreadedTestUtil.TestContext ctx =
551       new MultithreadedTestUtil.TestContext(conf);
552     ctx.addThread(new PutThread(ctx, region));
553     ctx.addThread(new CheckAndPutThread(ctx, region));
554     ctx.startThreads();
555     while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
556       Thread.sleep(100);
557     }
558     ctx.stop();
559     Scan s = new Scan();
560     RegionScanner scanner = region.getScanner(s);
561     List<Cell> results = new ArrayList<Cell>();
562     ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
563     scanner.next(results, scannerContext);
564     for (Cell keyValue : results) {
565       assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
566     }
567 
568   }
569 
570   private class PutThread extends TestThread {
571     private HRegion region;
572     PutThread(TestContext ctx, HRegion region) {
573       super(ctx);
574       this.region = region;
575     }
576 
577     public void doWork() throws Exception {
578       Put[] puts = new Put[1];
579       Put put = new Put(Bytes.toBytes("r1"));
580       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
581       puts[0] = put;
582       testStep = TestStep.PUT_STARTED;
583       region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
584     }
585   }
586 
587   private class CheckAndPutThread extends TestThread {
588     private HRegion region;
589     CheckAndPutThread(TestContext ctx, HRegion region) {
590       super(ctx);
591       this.region = region;
592    }
593 
594     public void doWork() throws Exception {
595       Put[] puts = new Put[1];
596       Put put = new Put(Bytes.toBytes("r1"));
597       put.add(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
598       puts[0] = put;
599       while (testStep != TestStep.PUT_COMPLETED) {
600         Thread.sleep(100);
601       }
602       testStep = TestStep.CHECKANDPUT_STARTED;
603       region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
604         CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
605       testStep = TestStep.CHECKANDPUT_COMPLETED;
606     }
607   }
608 
609   public static class MockHRegion extends HRegion {
610 
611     public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
612         final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
613       super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
614     }
615 
616     @Override
617     public RowLock getRowLockInternal(final byte[] row, boolean waitForLock) throws IOException {
618       if (testStep == TestStep.CHECKANDPUT_STARTED) {
619         latch.countDown();
620       }
621       return new WrappedRowLock(super.getRowLockInternal(row, waitForLock));
622     }
623 
624     public class WrappedRowLock extends RowLockImpl {
625 
626       private WrappedRowLock(RowLock rowLock) {
627         setContext(((RowLockImpl)rowLock).getContext());
628       }
629 
630       @Override
631       public void release() {
632         if (testStep == TestStep.INIT) {
633           super.release();
634           return;
635         }
636 
637         if (testStep == TestStep.PUT_STARTED) {
638           try {
639             testStep = TestStep.PUT_COMPLETED;
640             super.release();
641             // put has been written to the memstore and the row lock has been released, but the
642             // MVCC has not been advanced.  Prior to fixing HBASE-7051, the following order of
643             // operations would cause the non-atomicity to show up:
644             // 1) Put releases row lock (where we are now)
645             // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
646             //    because the MVCC has not advanced
647             // 3) Put advances MVCC
648             // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
649             // (see below), and then wait some more to give the checkAndPut time to read the old
650             // value.
651             latch.await();
652             Thread.sleep(1000);
653           } catch (InterruptedException e) {
654             Thread.currentThread().interrupt();
655           }
656         }
657         else if (testStep == TestStep.CHECKANDPUT_STARTED) {
658           super.release();
659         }
660       }
661     }
662   }
663 }