View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.IOException;
27  import java.util.ArrayList;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.ThreadPoolExecutor;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellUtil;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionLocation;
40  import org.apache.hadoop.hbase.testclassification.MediumTests;
41  import org.apache.hadoop.hbase.ServerName;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.Waiter;
44  import org.apache.hadoop.hbase.codec.KeyValueCodec;
45  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
46  import org.apache.hadoop.hbase.util.Bytes;
47  import org.apache.hadoop.hbase.util.JVMClusterUtil;
48  import org.apache.hadoop.hbase.util.Threads;
49  import org.junit.AfterClass;
50  import org.junit.Assert;
51  import org.junit.Before;
52  import org.junit.BeforeClass;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  
56  @Category(MediumTests.class)
57  public class TestMultiParallel {
58    private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
59  
60    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
61    private static final byte[] VALUE = Bytes.toBytes("value");
62    private static final byte[] QUALIFIER = Bytes.toBytes("qual");
63    private static final String FAMILY = "family";
64    private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
65    private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
66    private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
67    private static final byte [][] KEYS = makeKeys();
68  
69    private static final int slaves = 5; // also used for testing HTable pool size
70    private static Connection CONNECTION;
71  
72    @BeforeClass public static void beforeClass() throws Exception {
73      // Uncomment the following lines if more verbosity is needed for
74      // debugging (see HBASE-12285 for details).
75      //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
76      //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
77      //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
78      UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
79          KeyValueCodec.class.getCanonicalName());
80      UTIL.startMiniCluster(slaves);
81      HTable t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
82      UTIL.waitTableEnabled(TEST_TABLE);
83      t.close();
84      CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
85    }
86  
87    @AfterClass public static void afterClass() throws Exception {
88      CONNECTION.close();
89      UTIL.shutdownMiniCluster();
90    }
91  
92    @Before public void before() throws Exception {
93      LOG.info("before");
94      if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
95        // Distribute regions
96        UTIL.getMiniHBaseCluster().getMaster().balance();
97  
98        // Wait until completing balance
99        UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
100     }
101     LOG.info("before done");
102   }
103 
104   private static byte[][] makeKeys() {
105     byte [][] starterKeys = HBaseTestingUtility.KEYS;
106     // Create a "non-uniform" test set with the following characteristics:
107     // a) Unequal number of keys per region
108 
109     // Don't use integer as a multiple, so that we have a number of keys that is
110     // not a multiple of the number of regions
111     int numKeys = (int) ((float) starterKeys.length * 10.33F);
112 
113     List<byte[]> keys = new ArrayList<byte[]>();
114     for (int i = 0; i < numKeys; i++) {
115       int kIdx = i % starterKeys.length;
116       byte[] k = starterKeys[kIdx];
117       byte[] cp = new byte[k.length + 1];
118       System.arraycopy(k, 0, cp, 0, k.length);
119       cp[k.length] = new Integer(i % 256).byteValue();
120       keys.add(cp);
121     }
122 
123     // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
124     // should work)
125     // c) keys are not in sorted order (within a region), to ensure that the
126     // sorting code and index mapping doesn't break the functionality
127     for (int i = 0; i < 100; i++) {
128       int kIdx = i % starterKeys.length;
129       byte[] k = starterKeys[kIdx];
130       byte[] cp = new byte[k.length + 1];
131       System.arraycopy(k, 0, cp, 0, k.length);
132       cp[k.length] = new Integer(i % 256).byteValue();
133       keys.add(cp);
134     }
135     return keys.toArray(new byte [][] {new byte [] {}});
136   }
137 
138 
139   /**
140    * This is for testing the active number of threads that were used while
141    * doing a batch operation. It inserts one row per region via the batch
142    * operation, and then checks the number of active threads.
143    * For HBASE-3553
144    * @throws IOException
145    * @throws InterruptedException
146    * @throws NoSuchFieldException
147    * @throws SecurityException
148    */
149   @Test(timeout=300000)
150   public void testActiveThreadsCount() throws Exception {
151     try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) {
152       ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
153       try {
154         try (Table t = connection.getTable(TEST_TABLE, executor)) {
155           List<Put> puts = constructPutRequests(); // creates a Put for every region
156           t.batch(puts);
157           HashSet<ServerName> regionservers = new HashSet<ServerName>();
158           try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
159             for (Row r : puts) {
160               HRegionLocation location = locator.getRegionLocation(r.getRow());
161               regionservers.add(location.getServerName());
162             }
163           }
164           assertEquals(regionservers.size(), executor.getLargestPoolSize());
165         }
166       } finally {
167         executor.shutdownNow();
168       }
169     }
170   }
171 
172   @Test(timeout=300000)
173   public void testBatchWithGet() throws Exception {
174     LOG.info("test=testBatchWithGet");
175     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
176 
177     // load test data
178     List<Put> puts = constructPutRequests();
179     table.batch(puts);
180 
181     // create a list of gets and run it
182     List<Row> gets = new ArrayList<Row>();
183     for (byte[] k : KEYS) {
184       Get get = new Get(k);
185       get.addColumn(BYTES_FAMILY, QUALIFIER);
186       gets.add(get);
187     }
188     Result[] multiRes = new Result[gets.size()];
189     table.batch(gets, multiRes);
190 
191     // Same gets using individual call API
192     List<Result> singleRes = new ArrayList<Result>();
193     for (Row get : gets) {
194       singleRes.add(table.get((Get) get));
195     }
196     // Compare results
197     Assert.assertEquals(singleRes.size(), multiRes.length);
198     for (int i = 0; i < singleRes.size(); i++) {
199       Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
200       Cell[] singleKvs = singleRes.get(i).rawCells();
201       Cell[] multiKvs = multiRes[i].rawCells();
202       for (int j = 0; j < singleKvs.length; j++) {
203         Assert.assertEquals(singleKvs[j], multiKvs[j]);
204         Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
205             CellUtil.cloneValue(multiKvs[j])));
206       }
207     }
208     table.close();
209   }
210 
211   @Test
212   public void testBadFam() throws Exception {
213     LOG.info("test=testBadFam");
214     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
215 
216     List<Row> actions = new ArrayList<Row>();
217     Put p = new Put(Bytes.toBytes("row1"));
218     p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
219     actions.add(p);
220     p = new Put(Bytes.toBytes("row2"));
221     p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
222     actions.add(p);
223 
224     // row1 and row2 should be in the same region.
225 
226     Object [] r = new Object[actions.size()];
227     try {
228       table.batch(actions, r);
229       fail();
230     } catch (RetriesExhaustedWithDetailsException ex) {
231       LOG.debug(ex);
232       // good!
233       assertFalse(ex.mayHaveClusterIssues());
234     }
235     assertEquals(2, r.length);
236     assertTrue(r[0] instanceof Throwable);
237     assertTrue(r[1] instanceof Result);
238     table.close();
239   }
240 
241   @Test (timeout=300000)
242   public void testFlushCommitsNoAbort() throws Exception {
243     LOG.info("test=testFlushCommitsNoAbort");
244     doTestFlushCommits(false);
245   }
246 
247   /**
248    * Only run one Multi test with a forced RegionServer abort. Otherwise, the
249    * unit tests will take an unnecessarily long time to run.
250    *
251    * @throws Exception
252    */
253   @Test (timeout=360000)
254   public void testFlushCommitsWithAbort() throws Exception {
255     LOG.info("test=testFlushCommitsWithAbort");
256     doTestFlushCommits(true);
257   }
258 
259   /**
260    * Set table auto flush to false and test flushing commits
261    * @param doAbort true if abort one regionserver in the testing
262    * @throws Exception
263    */
264   private void doTestFlushCommits(boolean doAbort) throws Exception {
265     // Load the data
266     LOG.info("get new table");
267     Table table = UTIL.getConnection().getTable(TEST_TABLE);
268     table.setWriteBufferSize(10 * 1024 * 1024);
269 
270     LOG.info("constructPutRequests");
271     List<Put> puts = constructPutRequests();
272     table.put(puts);
273     LOG.info("puts");
274     final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
275         .size();
276     assert liveRScount > 0;
277     JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
278         .getLiveRegionServerThreads().get(0);
279     if (doAbort) {
280       liveRS.getRegionServer().abort("Aborting for tests",
281           new Exception("doTestFlushCommits"));
282       // If we wait for no regions being online after we abort the server, we
283       // could ensure the master has re-assigned the regions on killed server
284       // after writing successfully. It means the server we aborted is dead
285       // and detected by matser
286       while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
287         Thread.sleep(100);
288       }
289       // try putting more keys after the abort. same key/qual... just validating
290       // no exceptions thrown
291       puts = constructPutRequests();
292       table.put(puts);
293     }
294 
295     LOG.info("validating loaded data");
296     validateLoadedData(table);
297 
298     // Validate server and region count
299     List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
300     int count = 0;
301     for (JVMClusterUtil.RegionServerThread t: liveRSs) {
302       count++;
303       LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
304     }
305     LOG.info("Count=" + count);
306     Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
307         (doAbort ? (liveRScount - 1) : liveRScount), count);
308     if (doAbort) {
309       UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
310       UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
311         @Override
312         public boolean evaluate() throws Exception {
313           return UTIL.getMiniHBaseCluster().getMaster()
314               .getClusterStatus().getServersSize() == (liveRScount - 1);
315         }
316       });
317       UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
318     }
319 
320     table.close();
321     LOG.info("done");
322   }
323 
324   @Test (timeout=300000)
325   public void testBatchWithPut() throws Exception {
326     LOG.info("test=testBatchWithPut");
327     Table table = CONNECTION.getTable(TEST_TABLE);
328     // put multiple rows using a batch
329     List<Put> puts = constructPutRequests();
330 
331     Object[] results = table.batch(puts);
332     validateSizeAndEmpty(results, KEYS.length);
333 
334     if (true) {
335       int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
336       assert liveRScount > 0;
337       JVMClusterUtil.RegionServerThread liveRS =
338         UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
339       liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
340       puts = constructPutRequests();
341       try {
342         results = table.batch(puts);
343       } catch (RetriesExhaustedWithDetailsException ree) {
344         LOG.info(ree.getExhaustiveDescription());
345         table.close();
346         throw ree;
347       }
348       validateSizeAndEmpty(results, KEYS.length);
349     }
350 
351     validateLoadedData(table);
352     table.close();
353   }
354 
355   @Test(timeout=300000)
356   public void testBatchWithDelete() throws Exception {
357     LOG.info("test=testBatchWithDelete");
358     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
359 
360     // Load some data
361     List<Put> puts = constructPutRequests();
362     Object[] results = table.batch(puts);
363     validateSizeAndEmpty(results, KEYS.length);
364 
365     // Deletes
366     List<Row> deletes = new ArrayList<Row>();
367     for (int i = 0; i < KEYS.length; i++) {
368       Delete delete = new Delete(KEYS[i]);
369       delete.addFamily(BYTES_FAMILY);
370       deletes.add(delete);
371     }
372     results = table.batch(deletes);
373     validateSizeAndEmpty(results, KEYS.length);
374 
375     // Get to make sure ...
376     for (byte[] k : KEYS) {
377       Get get = new Get(k);
378       get.addColumn(BYTES_FAMILY, QUALIFIER);
379       Assert.assertFalse(table.exists(get));
380     }
381     table.close();
382   }
383 
384   @Test(timeout=300000)
385   public void testHTableDeleteWithList() throws Exception {
386     LOG.info("test=testHTableDeleteWithList");
387     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
388 
389     // Load some data
390     List<Put> puts = constructPutRequests();
391     Object[] results = table.batch(puts);
392     validateSizeAndEmpty(results, KEYS.length);
393 
394     // Deletes
395     ArrayList<Delete> deletes = new ArrayList<Delete>();
396     for (int i = 0; i < KEYS.length; i++) {
397       Delete delete = new Delete(KEYS[i]);
398       delete.deleteFamily(BYTES_FAMILY);
399       deletes.add(delete);
400     }
401     table.delete(deletes);
402     Assert.assertTrue(deletes.isEmpty());
403 
404     // Get to make sure ...
405     for (byte[] k : KEYS) {
406       Get get = new Get(k);
407       get.addColumn(BYTES_FAMILY, QUALIFIER);
408       Assert.assertFalse(table.exists(get));
409     }
410     table.close();
411   }
412 
413   @Test(timeout=300000)
414   public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
415     LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
416     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
417 
418     List<Row> puts = new ArrayList<Row>();
419     for (int i = 0; i < 100; i++) {
420       Put put = new Put(ONE_ROW);
421       byte[] qual = Bytes.toBytes("column" + i);
422       put.add(BYTES_FAMILY, qual, VALUE);
423       puts.add(put);
424     }
425     Object[] results = table.batch(puts);
426 
427     // validate
428     validateSizeAndEmpty(results, 100);
429 
430     // get the data back and validate that it is correct
431     List<Row> gets = new ArrayList<Row>();
432     for (int i = 0; i < 100; i++) {
433       Get get = new Get(ONE_ROW);
434       byte[] qual = Bytes.toBytes("column" + i);
435       get.addColumn(BYTES_FAMILY, qual);
436       gets.add(get);
437     }
438 
439     Object[] multiRes = table.batch(gets);
440 
441     int idx = 0;
442     for (Object r : multiRes) {
443       byte[] qual = Bytes.toBytes("column" + idx);
444       validateResult(r, qual, VALUE);
445       idx++;
446     }
447     table.close();
448   }
449 
450   @Test(timeout=300000)
451   public void testBatchWithIncrementAndAppend() throws Exception {
452     LOG.info("test=testBatchWithIncrementAndAppend");
453     final byte[] QUAL1 = Bytes.toBytes("qual1");
454     final byte[] QUAL2 = Bytes.toBytes("qual2");
455     final byte[] QUAL3 = Bytes.toBytes("qual3");
456     final byte[] QUAL4 = Bytes.toBytes("qual4");
457     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
458     Delete d = new Delete(ONE_ROW);
459     table.delete(d);
460     Put put = new Put(ONE_ROW);
461     put.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
462     put.add(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
463     table.put(put);
464 
465     Increment inc = new Increment(ONE_ROW);
466     inc.addColumn(BYTES_FAMILY, QUAL2, 1);
467     inc.addColumn(BYTES_FAMILY, QUAL3, 1);
468 
469     Append a = new Append(ONE_ROW);
470     a.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
471     a.add(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
472     List<Row> actions = new ArrayList<Row>();
473     actions.add(inc);
474     actions.add(a);
475 
476     Object[] multiRes = table.batch(actions);
477     validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
478     validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
479     validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
480     validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
481     table.close();
482   }
483 
484   @Test(timeout=300000)
485   public void testNonceCollision() throws Exception {
486     LOG.info("test=testNonceCollision");
487     final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
488     Table table = connection.getTable(TEST_TABLE);
489     Put put = new Put(ONE_ROW);
490     put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
491 
492     // Replace nonce manager with the one that returns each nonce twice.
493     NonceGenerator cnm = new PerClientRandomNonceGenerator() {
494       long lastNonce = -1;
495       @Override
496       public synchronized long newNonce() {
497         long nonce = 0;
498         if (lastNonce == -1) {
499           lastNonce = nonce = super.newNonce();
500         } else {
501           nonce = lastNonce;
502           lastNonce = -1L;
503         }
504         return nonce;
505       }
506     };
507 
508     NonceGenerator oldCnm =
509       ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
510 
511     // First test sequential requests.
512     try {
513       Increment inc = new Increment(ONE_ROW);
514       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
515       table.increment(inc);
516       inc = new Increment(ONE_ROW);
517       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
518       try {
519         table.increment(inc);
520         fail("Should have thrown an exception");
521       } catch (OperationConflictException ex) {
522       }
523       Get get = new Get(ONE_ROW);
524       get.addColumn(BYTES_FAMILY, QUALIFIER);
525       Result result = table.get(get);
526       validateResult(result, QUALIFIER, Bytes.toBytes(1L));
527 
528       // Now run a bunch of requests in parallel, exactly half should succeed.
529       int numRequests = 40;
530       final CountDownLatch startedLatch = new CountDownLatch(numRequests);
531       final CountDownLatch startLatch = new CountDownLatch(1);
532       final CountDownLatch doneLatch = new CountDownLatch(numRequests);
533       for (int i = 0; i < numRequests; ++i) {
534         Runnable r = new Runnable() {
535           @Override
536           public void run() {
537             Table table = null;
538             try {
539               table = connection.getTable(TEST_TABLE);
540             } catch (IOException e) {
541               fail("Not expected");
542             }
543             Increment inc = new Increment(ONE_ROW);
544             inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
545             startedLatch.countDown();
546             try {
547               startLatch.await();
548             } catch (InterruptedException e) {
549               fail("Not expected");
550             }
551             try {
552               table.increment(inc);
553             } catch (OperationConflictException ex) { // Some threads are expected to fail.
554             } catch (IOException ioEx) {
555               fail("Not expected");
556             }
557             doneLatch.countDown();
558           }
559         };
560         Threads.setDaemonThreadRunning(new Thread(r));
561       }
562       startedLatch.await(); // Wait until all threads are ready...
563       startLatch.countDown(); // ...and unleash the herd!
564       doneLatch.await();
565       // Now verify
566       get = new Get(ONE_ROW);
567       get.addColumn(BYTES_FAMILY, QUALIFIER);
568       result = table.get(get);
569       validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
570       table.close();
571     } finally {
572       ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm);
573     }
574   }
575 
576   @Test(timeout=300000)
577   public void testBatchWithMixedActions() throws Exception {
578     LOG.info("test=testBatchWithMixedActions");
579     Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
580 
581     // Load some data to start
582     Object[] results = table.batch(constructPutRequests());
583     validateSizeAndEmpty(results, KEYS.length);
584 
585     // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
586     // put
587     List<Row> actions = new ArrayList<Row>();
588 
589     byte[] qual2 = Bytes.toBytes("qual2");
590     byte[] val2 = Bytes.toBytes("putvalue2");
591 
592     // 0 get
593     Get get = new Get(KEYS[10]);
594     get.addColumn(BYTES_FAMILY, QUALIFIER);
595     actions.add(get);
596 
597     // 1 get
598     get = new Get(KEYS[11]);
599     get.addColumn(BYTES_FAMILY, QUALIFIER);
600     actions.add(get);
601 
602     // 2 put of new column
603     Put put = new Put(KEYS[10]);
604     put.add(BYTES_FAMILY, qual2, val2);
605     actions.add(put);
606 
607     // 3 delete
608     Delete delete = new Delete(KEYS[20]);
609     delete.deleteFamily(BYTES_FAMILY);
610     actions.add(delete);
611 
612     // 4 get
613     get = new Get(KEYS[30]);
614     get.addColumn(BYTES_FAMILY, QUALIFIER);
615     actions.add(get);
616 
617     // There used to be a 'get' of a previous put here, but removed
618     // since this API really cannot guarantee order in terms of mixed
619     // get/puts.
620 
621     // 5 put of new column
622     put = new Put(KEYS[40]);
623     put.add(BYTES_FAMILY, qual2, val2);
624     actions.add(put);
625 
626     results = table.batch(actions);
627 
628     // Validation
629 
630     validateResult(results[0]);
631     validateResult(results[1]);
632     validateEmpty(results[2]);
633     validateEmpty(results[3]);
634     validateResult(results[4]);
635     validateEmpty(results[5]);
636 
637     // validate last put, externally from the batch
638     get = new Get(KEYS[40]);
639     get.addColumn(BYTES_FAMILY, qual2);
640     Result r = table.get(get);
641     validateResult(r, qual2, val2);
642 
643     table.close();
644   }
645 
646   // // Helper methods ////
647 
648   private void validateResult(Object r) {
649     validateResult(r, QUALIFIER, VALUE);
650   }
651 
652   private void validateResult(Object r1, byte[] qual, byte[] val) {
653     Result r = (Result)r1;
654     Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
655     byte[] value = r.getValue(BYTES_FAMILY, qual);
656     if (0 != Bytes.compareTo(val, value)) {
657       fail("Expected [" + Bytes.toStringBinary(val)
658           + "] but got [" + Bytes.toStringBinary(value) + "]");
659     }
660   }
661 
662   private List<Put> constructPutRequests() {
663     List<Put> puts = new ArrayList<>();
664     for (byte[] k : KEYS) {
665       Put put = new Put(k);
666       put.add(BYTES_FAMILY, QUALIFIER, VALUE);
667       puts.add(put);
668     }
669     return puts;
670   }
671 
672   private void validateLoadedData(Table table) throws IOException {
673     // get the data back and validate that it is correct
674     LOG.info("Validating data on " + table);
675     for (byte[] k : KEYS) {
676       Get get = new Get(k);
677       get.addColumn(BYTES_FAMILY, QUALIFIER);
678       Result r = table.get(get);
679       Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
680       Assert.assertEquals(0, Bytes.compareTo(VALUE, r
681           .getValue(BYTES_FAMILY, QUALIFIER)));
682     }
683   }
684 
685   private void validateEmpty(Object r1) {
686     Result result = (Result)r1;
687     Assert.assertTrue(result != null);
688     Assert.assertTrue(result.getRow() == null);
689     Assert.assertEquals(0, result.rawCells().length);
690   }
691 
692   private void validateSizeAndEmpty(Object[] results, int expectedSize) {
693     // Validate got back the same number of Result objects, all empty
694     Assert.assertEquals(expectedSize, results.length);
695     for (Object result : results) {
696       validateEmpty(result);
697     }
698   }
699 }