1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
25
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.ThreadPoolExecutor;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.HBaseTestingUtility;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HRegionLocation;
40 import org.apache.hadoop.hbase.testclassification.MediumTests;
41 import org.apache.hadoop.hbase.ServerName;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.Waiter;
44 import org.apache.hadoop.hbase.codec.KeyValueCodec;
45 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.hbase.util.JVMClusterUtil;
48 import org.apache.hadoop.hbase.util.Threads;
49 import org.junit.AfterClass;
50 import org.junit.Assert;
51 import org.junit.Before;
52 import org.junit.BeforeClass;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55
56 @Category(MediumTests.class)
57 public class TestMultiParallel {
/** Logger used by every test in this class. */
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);

/** Testing utility that owns the mini cluster and its configuration. */
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
/** Cell value written by {@link #constructPutRequests()}. */
private static final byte[] VALUE = Bytes.toBytes("value");
/** Column qualifier written by {@link #constructPutRequests()}. */
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
/** Name of the single column family of the test table. */
private static final String FAMILY = "family";
/** Table that all tests read from and write to. */
private static final TableName TEST_TABLE = TableName.valueOf("multi_test_table");
/** {@link #FAMILY} encoded as bytes. */
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
/** Row key used by the tests that operate on a single row. */
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
/** Row keys produced once by {@link #makeKeys()} during class initialisation. */
private static final byte [][] KEYS = makeKeys();

/** Number of region servers passed to the mini cluster start-up and availability check. */
private static final int slaves = 5;
/** Connection opened in {@link #beforeClass()} and closed in {@link #afterClass()}. */
private static Connection CONNECTION;
71
72 @BeforeClass public static void beforeClass() throws Exception {
73
74
75
76
77
78 UTIL.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
79 KeyValueCodec.class.getCanonicalName());
80 UTIL.startMiniCluster(slaves);
81 HTable t = UTIL.createMultiRegionTable(TEST_TABLE, Bytes.toBytes(FAMILY));
82 UTIL.waitTableEnabled(TEST_TABLE);
83 t.close();
84 CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
85 }
86
87 @AfterClass public static void afterClass() throws Exception {
88 CONNECTION.close();
89 UTIL.shutdownMiniCluster();
90 }
91
92 @Before public void before() throws Exception {
93 LOG.info("before");
94 if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
95
96 UTIL.getMiniHBaseCluster().getMaster().balance();
97
98
99 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
100 }
101 LOG.info("before done");
102 }
103
104 private static byte[][] makeKeys() {
105 byte [][] starterKeys = HBaseTestingUtility.KEYS;
106
107
108
109
110
111 int numKeys = (int) ((float) starterKeys.length * 10.33F);
112
113 List<byte[]> keys = new ArrayList<byte[]>();
114 for (int i = 0; i < numKeys; i++) {
115 int kIdx = i % starterKeys.length;
116 byte[] k = starterKeys[kIdx];
117 byte[] cp = new byte[k.length + 1];
118 System.arraycopy(k, 0, cp, 0, k.length);
119 cp[k.length] = new Integer(i % 256).byteValue();
120 keys.add(cp);
121 }
122
123
124
125
126
127 for (int i = 0; i < 100; i++) {
128 int kIdx = i % starterKeys.length;
129 byte[] k = starterKeys[kIdx];
130 byte[] cp = new byte[k.length + 1];
131 System.arraycopy(k, 0, cp, 0, k.length);
132 cp[k.length] = new Integer(i % 256).byteValue();
133 keys.add(cp);
134 }
135 return keys.toArray(new byte [][] {new byte [] {}});
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149 @Test(timeout=300000)
150 public void testActiveThreadsCount() throws Exception {
151 try (Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration())) {
152 ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration());
153 try {
154 try (Table t = connection.getTable(TEST_TABLE, executor)) {
155 List<Put> puts = constructPutRequests();
156 t.batch(puts);
157 HashSet<ServerName> regionservers = new HashSet<ServerName>();
158 try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) {
159 for (Row r : puts) {
160 HRegionLocation location = locator.getRegionLocation(r.getRow());
161 regionservers.add(location.getServerName());
162 }
163 }
164 assertEquals(regionservers.size(), executor.getLargestPoolSize());
165 }
166 } finally {
167 executor.shutdownNow();
168 }
169 }
170 }
171
172 @Test(timeout=300000)
173 public void testBatchWithGet() throws Exception {
174 LOG.info("test=testBatchWithGet");
175 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
176
177
178 List<Put> puts = constructPutRequests();
179 table.batch(puts);
180
181
182 List<Row> gets = new ArrayList<Row>();
183 for (byte[] k : KEYS) {
184 Get get = new Get(k);
185 get.addColumn(BYTES_FAMILY, QUALIFIER);
186 gets.add(get);
187 }
188 Result[] multiRes = new Result[gets.size()];
189 table.batch(gets, multiRes);
190
191
192 List<Result> singleRes = new ArrayList<Result>();
193 for (Row get : gets) {
194 singleRes.add(table.get((Get) get));
195 }
196
197 Assert.assertEquals(singleRes.size(), multiRes.length);
198 for (int i = 0; i < singleRes.size(); i++) {
199 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
200 Cell[] singleKvs = singleRes.get(i).rawCells();
201 Cell[] multiKvs = multiRes[i].rawCells();
202 for (int j = 0; j < singleKvs.length; j++) {
203 Assert.assertEquals(singleKvs[j], multiKvs[j]);
204 Assert.assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(singleKvs[j]),
205 CellUtil.cloneValue(multiKvs[j])));
206 }
207 }
208 table.close();
209 }
210
211 @Test
212 public void testBadFam() throws Exception {
213 LOG.info("test=testBadFam");
214 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
215
216 List<Row> actions = new ArrayList<Row>();
217 Put p = new Put(Bytes.toBytes("row1"));
218 p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
219 actions.add(p);
220 p = new Put(Bytes.toBytes("row2"));
221 p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
222 actions.add(p);
223
224
225
226 Object [] r = new Object[actions.size()];
227 try {
228 table.batch(actions, r);
229 fail();
230 } catch (RetriesExhaustedWithDetailsException ex) {
231 LOG.debug(ex);
232
233 assertFalse(ex.mayHaveClusterIssues());
234 }
235 assertEquals(2, r.length);
236 assertTrue(r[0] instanceof Throwable);
237 assertTrue(r[1] instanceof Result);
238 table.close();
239 }
240
241 @Test (timeout=300000)
242 public void testFlushCommitsNoAbort() throws Exception {
243 LOG.info("test=testFlushCommitsNoAbort");
244 doTestFlushCommits(false);
245 }
246
247
248
249
250
251
252
253 @Test (timeout=360000)
254 public void testFlushCommitsWithAbort() throws Exception {
255 LOG.info("test=testFlushCommitsWithAbort");
256 doTestFlushCommits(true);
257 }
258
259
260
261
262
263
264 private void doTestFlushCommits(boolean doAbort) throws Exception {
265
266 LOG.info("get new table");
267 Table table = UTIL.getConnection().getTable(TEST_TABLE);
268 table.setWriteBufferSize(10 * 1024 * 1024);
269
270 LOG.info("constructPutRequests");
271 List<Put> puts = constructPutRequests();
272 table.put(puts);
273 LOG.info("puts");
274 final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()
275 .size();
276 assert liveRScount > 0;
277 JVMClusterUtil.RegionServerThread liveRS = UTIL.getMiniHBaseCluster()
278 .getLiveRegionServerThreads().get(0);
279 if (doAbort) {
280 liveRS.getRegionServer().abort("Aborting for tests",
281 new Exception("doTestFlushCommits"));
282
283
284
285
286 while (liveRS.getRegionServer().getNumberOfOnlineRegions() != 0) {
287 Thread.sleep(100);
288 }
289
290
291 puts = constructPutRequests();
292 table.put(puts);
293 }
294
295 LOG.info("validating loaded data");
296 validateLoadedData(table);
297
298
299 List<JVMClusterUtil.RegionServerThread> liveRSs = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
300 int count = 0;
301 for (JVMClusterUtil.RegionServerThread t: liveRSs) {
302 count++;
303 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
304 }
305 LOG.info("Count=" + count);
306 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
307 (doAbort ? (liveRScount - 1) : liveRScount), count);
308 if (doAbort) {
309 UTIL.getMiniHBaseCluster().waitOnRegionServer(0);
310 UTIL.waitFor(15 * 1000, new Waiter.Predicate<Exception>() {
311 @Override
312 public boolean evaluate() throws Exception {
313 return UTIL.getMiniHBaseCluster().getMaster()
314 .getClusterStatus().getServersSize() == (liveRScount - 1);
315 }
316 });
317 UTIL.waitFor(15 * 1000, UTIL.predicateNoRegionsInTransition());
318 }
319
320 table.close();
321 LOG.info("done");
322 }
323
324 @Test (timeout=300000)
325 public void testBatchWithPut() throws Exception {
326 LOG.info("test=testBatchWithPut");
327 Table table = CONNECTION.getTable(TEST_TABLE);
328
329 List<Put> puts = constructPutRequests();
330
331 Object[] results = table.batch(puts);
332 validateSizeAndEmpty(results, KEYS.length);
333
334 if (true) {
335 int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().size();
336 assert liveRScount > 0;
337 JVMClusterUtil.RegionServerThread liveRS =
338 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().get(0);
339 liveRS.getRegionServer().abort("Aborting for tests", new Exception("testBatchWithPut"));
340 puts = constructPutRequests();
341 try {
342 results = table.batch(puts);
343 } catch (RetriesExhaustedWithDetailsException ree) {
344 LOG.info(ree.getExhaustiveDescription());
345 table.close();
346 throw ree;
347 }
348 validateSizeAndEmpty(results, KEYS.length);
349 }
350
351 validateLoadedData(table);
352 table.close();
353 }
354
355 @Test(timeout=300000)
356 public void testBatchWithDelete() throws Exception {
357 LOG.info("test=testBatchWithDelete");
358 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
359
360
361 List<Put> puts = constructPutRequests();
362 Object[] results = table.batch(puts);
363 validateSizeAndEmpty(results, KEYS.length);
364
365
366 List<Row> deletes = new ArrayList<Row>();
367 for (int i = 0; i < KEYS.length; i++) {
368 Delete delete = new Delete(KEYS[i]);
369 delete.addFamily(BYTES_FAMILY);
370 deletes.add(delete);
371 }
372 results = table.batch(deletes);
373 validateSizeAndEmpty(results, KEYS.length);
374
375
376 for (byte[] k : KEYS) {
377 Get get = new Get(k);
378 get.addColumn(BYTES_FAMILY, QUALIFIER);
379 Assert.assertFalse(table.exists(get));
380 }
381 table.close();
382 }
383
384 @Test(timeout=300000)
385 public void testHTableDeleteWithList() throws Exception {
386 LOG.info("test=testHTableDeleteWithList");
387 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
388
389
390 List<Put> puts = constructPutRequests();
391 Object[] results = table.batch(puts);
392 validateSizeAndEmpty(results, KEYS.length);
393
394
395 ArrayList<Delete> deletes = new ArrayList<Delete>();
396 for (int i = 0; i < KEYS.length; i++) {
397 Delete delete = new Delete(KEYS[i]);
398 delete.deleteFamily(BYTES_FAMILY);
399 deletes.add(delete);
400 }
401 table.delete(deletes);
402 Assert.assertTrue(deletes.isEmpty());
403
404
405 for (byte[] k : KEYS) {
406 Get get = new Get(k);
407 get.addColumn(BYTES_FAMILY, QUALIFIER);
408 Assert.assertFalse(table.exists(get));
409 }
410 table.close();
411 }
412
413 @Test(timeout=300000)
414 public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
415 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
416 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
417
418 List<Row> puts = new ArrayList<Row>();
419 for (int i = 0; i < 100; i++) {
420 Put put = new Put(ONE_ROW);
421 byte[] qual = Bytes.toBytes("column" + i);
422 put.add(BYTES_FAMILY, qual, VALUE);
423 puts.add(put);
424 }
425 Object[] results = table.batch(puts);
426
427
428 validateSizeAndEmpty(results, 100);
429
430
431 List<Row> gets = new ArrayList<Row>();
432 for (int i = 0; i < 100; i++) {
433 Get get = new Get(ONE_ROW);
434 byte[] qual = Bytes.toBytes("column" + i);
435 get.addColumn(BYTES_FAMILY, qual);
436 gets.add(get);
437 }
438
439 Object[] multiRes = table.batch(gets);
440
441 int idx = 0;
442 for (Object r : multiRes) {
443 byte[] qual = Bytes.toBytes("column" + idx);
444 validateResult(r, qual, VALUE);
445 idx++;
446 }
447 table.close();
448 }
449
450 @Test(timeout=300000)
451 public void testBatchWithIncrementAndAppend() throws Exception {
452 LOG.info("test=testBatchWithIncrementAndAppend");
453 final byte[] QUAL1 = Bytes.toBytes("qual1");
454 final byte[] QUAL2 = Bytes.toBytes("qual2");
455 final byte[] QUAL3 = Bytes.toBytes("qual3");
456 final byte[] QUAL4 = Bytes.toBytes("qual4");
457 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
458 Delete d = new Delete(ONE_ROW);
459 table.delete(d);
460 Put put = new Put(ONE_ROW);
461 put.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("abc"));
462 put.add(BYTES_FAMILY, QUAL2, Bytes.toBytes(1L));
463 table.put(put);
464
465 Increment inc = new Increment(ONE_ROW);
466 inc.addColumn(BYTES_FAMILY, QUAL2, 1);
467 inc.addColumn(BYTES_FAMILY, QUAL3, 1);
468
469 Append a = new Append(ONE_ROW);
470 a.add(BYTES_FAMILY, QUAL1, Bytes.toBytes("def"));
471 a.add(BYTES_FAMILY, QUAL4, Bytes.toBytes("xyz"));
472 List<Row> actions = new ArrayList<Row>();
473 actions.add(inc);
474 actions.add(a);
475
476 Object[] multiRes = table.batch(actions);
477 validateResult(multiRes[1], QUAL1, Bytes.toBytes("abcdef"));
478 validateResult(multiRes[1], QUAL4, Bytes.toBytes("xyz"));
479 validateResult(multiRes[0], QUAL2, Bytes.toBytes(2L));
480 validateResult(multiRes[0], QUAL3, Bytes.toBytes(1L));
481 table.close();
482 }
483
484 @Test(timeout=300000)
485 public void testNonceCollision() throws Exception {
486 LOG.info("test=testNonceCollision");
487 final Connection connection = ConnectionFactory.createConnection(UTIL.getConfiguration());
488 Table table = connection.getTable(TEST_TABLE);
489 Put put = new Put(ONE_ROW);
490 put.add(BYTES_FAMILY, QUALIFIER, Bytes.toBytes(0L));
491
492
493 NonceGenerator cnm = new PerClientRandomNonceGenerator() {
494 long lastNonce = -1;
495 @Override
496 public synchronized long newNonce() {
497 long nonce = 0;
498 if (lastNonce == -1) {
499 lastNonce = nonce = super.newNonce();
500 } else {
501 nonce = lastNonce;
502 lastNonce = -1L;
503 }
504 return nonce;
505 }
506 };
507
508 NonceGenerator oldCnm =
509 ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)connection, cnm);
510
511
512 try {
513 Increment inc = new Increment(ONE_ROW);
514 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
515 table.increment(inc);
516 inc = new Increment(ONE_ROW);
517 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
518 try {
519 table.increment(inc);
520 fail("Should have thrown an exception");
521 } catch (OperationConflictException ex) {
522 }
523 Get get = new Get(ONE_ROW);
524 get.addColumn(BYTES_FAMILY, QUALIFIER);
525 Result result = table.get(get);
526 validateResult(result, QUALIFIER, Bytes.toBytes(1L));
527
528
529 int numRequests = 40;
530 final CountDownLatch startedLatch = new CountDownLatch(numRequests);
531 final CountDownLatch startLatch = new CountDownLatch(1);
532 final CountDownLatch doneLatch = new CountDownLatch(numRequests);
533 for (int i = 0; i < numRequests; ++i) {
534 Runnable r = new Runnable() {
535 @Override
536 public void run() {
537 Table table = null;
538 try {
539 table = connection.getTable(TEST_TABLE);
540 } catch (IOException e) {
541 fail("Not expected");
542 }
543 Increment inc = new Increment(ONE_ROW);
544 inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
545 startedLatch.countDown();
546 try {
547 startLatch.await();
548 } catch (InterruptedException e) {
549 fail("Not expected");
550 }
551 try {
552 table.increment(inc);
553 } catch (OperationConflictException ex) {
554 } catch (IOException ioEx) {
555 fail("Not expected");
556 }
557 doneLatch.countDown();
558 }
559 };
560 Threads.setDaemonThreadRunning(new Thread(r));
561 }
562 startedLatch.await();
563 startLatch.countDown();
564 doneLatch.await();
565
566 get = new Get(ONE_ROW);
567 get.addColumn(BYTES_FAMILY, QUALIFIER);
568 result = table.get(get);
569 validateResult(result, QUALIFIER, Bytes.toBytes((numRequests / 2) + 1L));
570 table.close();
571 } finally {
572 ConnectionManager.injectNonceGeneratorForTesting((ClusterConnection)connection, oldCnm);
573 }
574 }
575
576 @Test(timeout=300000)
577 public void testBatchWithMixedActions() throws Exception {
578 LOG.info("test=testBatchWithMixedActions");
579 Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
580
581
582 Object[] results = table.batch(constructPutRequests());
583 validateSizeAndEmpty(results, KEYS.length);
584
585
586
587 List<Row> actions = new ArrayList<Row>();
588
589 byte[] qual2 = Bytes.toBytes("qual2");
590 byte[] val2 = Bytes.toBytes("putvalue2");
591
592
593 Get get = new Get(KEYS[10]);
594 get.addColumn(BYTES_FAMILY, QUALIFIER);
595 actions.add(get);
596
597
598 get = new Get(KEYS[11]);
599 get.addColumn(BYTES_FAMILY, QUALIFIER);
600 actions.add(get);
601
602
603 Put put = new Put(KEYS[10]);
604 put.add(BYTES_FAMILY, qual2, val2);
605 actions.add(put);
606
607
608 Delete delete = new Delete(KEYS[20]);
609 delete.deleteFamily(BYTES_FAMILY);
610 actions.add(delete);
611
612
613 get = new Get(KEYS[30]);
614 get.addColumn(BYTES_FAMILY, QUALIFIER);
615 actions.add(get);
616
617
618
619
620
621
622 put = new Put(KEYS[40]);
623 put.add(BYTES_FAMILY, qual2, val2);
624 actions.add(put);
625
626 results = table.batch(actions);
627
628
629
630 validateResult(results[0]);
631 validateResult(results[1]);
632 validateEmpty(results[2]);
633 validateEmpty(results[3]);
634 validateResult(results[4]);
635 validateEmpty(results[5]);
636
637
638 get = new Get(KEYS[40]);
639 get.addColumn(BYTES_FAMILY, qual2);
640 Result r = table.get(get);
641 validateResult(r, qual2, val2);
642
643 table.close();
644 }
645
646
647
648 private void validateResult(Object r) {
649 validateResult(r, QUALIFIER, VALUE);
650 }
651
652 private void validateResult(Object r1, byte[] qual, byte[] val) {
653 Result r = (Result)r1;
654 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
655 byte[] value = r.getValue(BYTES_FAMILY, qual);
656 if (0 != Bytes.compareTo(val, value)) {
657 fail("Expected [" + Bytes.toStringBinary(val)
658 + "] but got [" + Bytes.toStringBinary(value) + "]");
659 }
660 }
661
662 private List<Put> constructPutRequests() {
663 List<Put> puts = new ArrayList<>();
664 for (byte[] k : KEYS) {
665 Put put = new Put(k);
666 put.add(BYTES_FAMILY, QUALIFIER, VALUE);
667 puts.add(put);
668 }
669 return puts;
670 }
671
672 private void validateLoadedData(Table table) throws IOException {
673
674 LOG.info("Validating data on " + table);
675 for (byte[] k : KEYS) {
676 Get get = new Get(k);
677 get.addColumn(BYTES_FAMILY, QUALIFIER);
678 Result r = table.get(get);
679 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
680 Assert.assertEquals(0, Bytes.compareTo(VALUE, r
681 .getValue(BYTES_FAMILY, QUALIFIER)));
682 }
683 }
684
685 private void validateEmpty(Object r1) {
686 Result result = (Result)r1;
687 Assert.assertTrue(result != null);
688 Assert.assertTrue(result.getRow() == null);
689 Assert.assertEquals(0, result.rawCells().length);
690 }
691
692 private void validateSizeAndEmpty(Object[] results, int expectedSize) {
693
694 Assert.assertEquals(expectedSize, results.length);
695 for (Object result : results) {
696 validateEmpty(result);
697 }
698 }
699 }