1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
27
28 import java.io.IOException;
29 import java.lang.reflect.Field;
30 import java.lang.reflect.Modifier;
31 import java.net.SocketTimeoutException;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Random;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.SynchronousQueue;
39 import java.util.concurrent.ThreadPoolExecutor;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.atomic.AtomicLong;
44 import java.util.concurrent.atomic.AtomicReference;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.hbase.Cell;
50 import org.apache.hadoop.hbase.HBaseConfiguration;
51 import org.apache.hadoop.hbase.HBaseTestingUtility;
52 import org.apache.hadoop.hbase.HConstants;
53 import org.apache.hadoop.hbase.HRegionLocation;
54 import org.apache.hadoop.hbase.HTableDescriptor;
55 import org.apache.hadoop.hbase.RegionLocations;
56 import org.apache.hadoop.hbase.ServerName;
57 import org.apache.hadoop.hbase.TableName;
58 import org.apache.hadoop.hbase.Waiter;
59 import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
60 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
61 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
62 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
63 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
64 import org.apache.hadoop.hbase.exceptions.DeserializationException;
65 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
66 import org.apache.hadoop.hbase.filter.Filter;
67 import org.apache.hadoop.hbase.filter.FilterBase;
68 import org.apache.hadoop.hbase.ipc.RpcClient;
69 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
70 import org.apache.hadoop.hbase.master.HMaster;
71 import org.apache.hadoop.hbase.regionserver.HRegionServer;
72 import org.apache.hadoop.hbase.regionserver.Region;
73 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
74 import org.apache.hadoop.hbase.testclassification.MediumTests;
75 import org.apache.hadoop.hbase.util.Bytes;
76 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77 import org.apache.hadoop.hbase.util.JVMClusterUtil;
78 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
79 import org.apache.hadoop.hbase.util.Threads;
80 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
81 import org.junit.AfterClass;
82 import org.junit.Assert;
83 import org.junit.BeforeClass;
84 import org.junit.Ignore;
85 import org.junit.Test;
86 import org.junit.experimental.categories.Category;
87
88 import com.google.common.collect.Lists;
89
90
91
92
93 @Category(MediumTests.class)
94 public class TestHCM {
95 private static final Log LOG = LogFactory.getLog(TestHCM.class);
96 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
97 private static final TableName TABLE_NAME =
98 TableName.valueOf("test");
99 private static final TableName TABLE_NAME1 =
100 TableName.valueOf("test1");
101 private static final TableName TABLE_NAME2 =
102 TableName.valueOf("test2");
103 private static final TableName TABLE_NAME3 =
104 TableName.valueOf("test3");
105 private static final TableName TABLE_NAME4 =
106 TableName.valueOf("test4");
107 private static final byte[] FAM_NAM = Bytes.toBytes("f");
108 private static final byte[] ROW = Bytes.toBytes("bbb");
109 private static final byte[] ROW_X = Bytes.toBytes("xxx");
110 private static Random _randy = new Random();
111
112
113
114
115 public static class SleepAndFailFirstTime extends BaseRegionObserver {
116 static final AtomicLong ct = new AtomicLong(0);
117 static final String SLEEP_TIME_CONF_KEY =
118 "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
119 static final long DEFAULT_SLEEP_TIME = 20000;
120 static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
121
122 public SleepAndFailFirstTime() {
123 }
124
125 @Override
126 public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
127 RegionCoprocessorEnvironment env = c.getEnvironment();
128 Configuration conf = env.getConfiguration();
129 sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
130 }
131
132 @Override
133 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
134 final Get get, final List<Cell> results) throws IOException {
135 Threads.sleep(sleepTime.get());
136 if (ct.incrementAndGet() == 1){
137 throw new IOException("first call I fail");
138 }
139 }
140 }
141
142 public static class SleepCoprocessor extends BaseRegionObserver {
143 public static final int SLEEP_TIME = 5000;
144
145 @Override public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
146 final Get get, final List<Cell> results) throws IOException {
147 Threads.sleep(SLEEP_TIME);
148 }
149 }
150
151 @BeforeClass
152 public static void setUpBeforeClass() throws Exception {
153 TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
154 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
155 TEST_UTIL.startMiniCluster(2);
156 }
157
158 @AfterClass public static void tearDownAfterClass() throws Exception {
159 TEST_UTIL.shutdownMiniCluster();
160 }
161
162
163 private static int getHConnectionManagerCacheSize(){
164 return HConnectionTestingUtility.getConnectionCount();
165 }
166
167 @Test
168 public void testClusterConnection() throws IOException {
169 ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
170 5, TimeUnit.SECONDS,
171 new SynchronousQueue<Runnable>(),
172 Threads.newDaemonThreadFactory("test-hcm"));
173
174 HConnection con1 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
175 HConnection con2 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
176
177 assertTrue(otherPool == ((HConnectionImplementation)con2).getCurrentBatchPool());
178
179 String tableName = "testClusterConnection";
180 TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
181 HTable t = (HTable)con1.getTable(tableName, otherPool);
182
183 assertNull("Internal Thread pool should be null", ((HConnectionImplementation)con1).getCurrentBatchPool());
184
185 assertTrue(otherPool == t.getPool());
186 t.close();
187
188 t = (HTable)con2.getTable(tableName);
189
190 assertTrue(otherPool == t.getPool());
191 t.close();
192
193 t = (HTable)con2.getTable(Bytes.toBytes(tableName));
194
195 assertTrue(otherPool == t.getPool());
196 t.close();
197
198 t = (HTable)con2.getTable(TableName.valueOf(tableName));
199
200 assertTrue(otherPool == t.getPool());
201 t.close();
202
203 t = (HTable)con1.getTable(tableName);
204 ExecutorService pool = ((HConnectionImplementation)con1).getCurrentBatchPool();
205
206 assertNotNull("An internal Thread pool should have been created", pool);
207
208 assertTrue(t.getPool() == pool);
209 t.close();
210
211 t = (HTable)con1.getTable(tableName);
212
213 assertTrue(t.getPool() == pool);
214 t.close();
215
216 con1.close();
217
218 assertTrue(pool.isShutdown());
219
220 con2.close();
221
222 assertFalse(otherPool.isShutdown());
223 otherPool.shutdownNow();
224 }
225
226
227
228
229
230 @Test
231 public void testAdminFactory() throws IOException {
232 Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
233 Admin admin = con1.getAdmin();
234 assertTrue(admin.getConnection() == con1);
235 assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
236 con1.close();
237 }
238
239
240 @Ignore @Test(expected = RegionServerStoppedException.class)
241 public void testClusterStatus() throws Exception {
242
243 TableName tn =
244 TableName.valueOf("testClusterStatus");
245 byte[] cf = "cf".getBytes();
246 byte[] rk = "rk1".getBytes();
247
248 JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
249 rs.waitForServerOnline();
250 final ServerName sn = rs.getRegionServer().getServerName();
251
252 HTable t = TEST_UTIL.createTable(tn, cf);
253 TEST_UTIL.waitTableAvailable(tn);
254
255 while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
256 getRegionStates().isRegionsInTransition()){
257 Thread.sleep(1);
258 }
259 final HConnectionImplementation hci = (HConnectionImplementation)t.getConnection();
260 while (t.getRegionLocation(rk).getPort() != sn.getPort()){
261 TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
262 getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
263 while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
264 getRegionStates().isRegionsInTransition()){
265 Thread.sleep(1);
266 }
267 hci.clearRegionCache(tn);
268 }
269 Assert.assertNotNull(hci.clusterStatusListener);
270 TEST_UTIL.assertRegionOnServer(t.getRegionLocation(rk).getRegionInfo(), sn, 20000);
271
272 Put p1 = new Put(rk);
273 p1.add(cf, "qual".getBytes(), "val".getBytes());
274 t.put(p1);
275
276 rs.getRegionServer().abort("I'm dead");
277
278
279 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
280 @Override
281 public boolean evaluate() throws Exception {
282 return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
283 getDeadServers().isDeadServer(sn);
284 }
285 });
286
287 TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
288 @Override
289 public boolean evaluate() throws Exception {
290 return hci.clusterStatusListener.isDeadServer(sn);
291 }
292 });
293
294 t.close();
295 hci.getClient(sn);
296 }
297
298
299
300
301
302 @Test
303 public void testConnectionCloseAllowsInterrupt() throws Exception {
304 testConnectionClose(true);
305 }
306
307 @Test
308 public void testConnectionNotAllowsInterrupt() throws Exception {
309 testConnectionClose(false);
310 }
311
312
313
314
315
316
317
318
319
320 @Test
321 public void testOperationTimeout() throws Exception {
322 HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
323 hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
324 HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
325 table.setRpcTimeout(Integer.MAX_VALUE);
326
327 table.setOperationTimeout(120 * 1000);
328 table.get(new Get(FAM_NAM));
329
330
331 SleepAndFailFirstTime.ct.set(0);
332 try {
333 table.setOperationTimeout(30 * 1000);
334 table.get(new Get(FAM_NAM));
335 Assert.fail("We expect an exception here");
336 } catch (SocketTimeoutException e) {
337
338
339
340 LOG.info("We received an exception, as expected ", e);
341 } catch (IOException e) {
342 Assert.fail("Wrong exception:" + e.getMessage());
343 } finally {
344 table.close();
345 }
346 }
347
348 @Test(expected = RetriesExhaustedException.class)
349 public void testRpcTimeout() throws Exception {
350 HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
351 hdt.addCoprocessor(SleepCoprocessor.class.getName());
352 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
353
354 try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
355 assert t instanceof HTable;
356 HTable table = (HTable) t;
357 table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
358 table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
359 table.get(new Get(FAM_NAM));
360 }
361 }
362
363
364
365
366 @Test
367 public void testRpcRetryingCallerSleep() throws Exception {
368 HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcRetryingCallerSleep");
369 hdt.addCoprocessorWithSpec("|" + SleepAndFailFirstTime.class.getName() + "||"
370 + SleepAndFailFirstTime.SLEEP_TIME_CONF_KEY + "=2000");
371 TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
372
373 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
374 c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
375 c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
376
377 Connection connection = ConnectionFactory.createConnection(c);
378 Table t = connection.getTable(TableName.valueOf("HCM-testRpcRetryingCallerSleep"));
379 if (t instanceof HTable) {
380 HTable table = (HTable) t;
381 table.setOperationTimeout(8000);
382
383 table.get(new Get(FAM_NAM));
384
385
386 SleepAndFailFirstTime.ct.set(0);
387 try {
388 table.setOperationTimeout(6000);
389
390
391 table.get(new Get(FAM_NAM));
392 Assert.fail("We expect an exception here");
393 } catch (SocketTimeoutException e) {
394 LOG.info("We received an exception, as expected ", e);
395 } catch (IOException e) {
396 Assert.fail("Wrong exception:" + e.getMessage());
397 } finally {
398 table.close();
399 connection.close();
400 }
401 }
402 }
403
404 @Test
405 public void testCallableSleep() throws Exception {
406 long pauseTime;
407 long baseTime = 100;
408 TableName tableName = TableName.valueOf("HCM-testCallableSleep");
409 HTable table = TEST_UTIL.createTable(tableName, FAM_NAM);
410 RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
411 TEST_UTIL.getConnection(), tableName, ROW) {
412 public Object call(int timeout) throws IOException {
413 return null;
414 }
415 };
416
417 regionServerCallable.prepare(false);
418 for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
419 pauseTime = regionServerCallable.sleep(baseTime, i);
420 assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
421 assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
422 }
423
424 RegionAdminServiceCallable<Object> regionAdminServiceCallable =
425 new RegionAdminServiceCallable<Object>(
426 (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
427 TEST_UTIL.getConfiguration()), tableName, ROW) {
428 public Object call(int timeout) throws IOException {
429 return null;
430 }
431 };
432
433 regionAdminServiceCallable.prepare(false);
434 for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
435 pauseTime = regionAdminServiceCallable.sleep(baseTime, i);
436 assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
437 assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
438 }
439
440 MasterCallable masterCallable = new MasterCallable((HConnection) TEST_UTIL.getConnection()) {
441 public Object call(int timeout) throws IOException {
442 return null;
443 }
444 };
445
446 for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
447 pauseTime = masterCallable.sleep(baseTime, i);
448 assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
449 assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
450 }
451 }
452
453 private void testConnectionClose(boolean allowsInterrupt) throws Exception {
454 TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
455 TEST_UTIL.createTable(tableName, FAM_NAM).close();
456
457 boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
458
459 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
460
461 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
462 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100);
463 c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0);
464 c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0);
465 c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
466
467 final HTable table = new HTable(c2, tableName);
468
469 Put put = new Put(ROW);
470 put.add(FAM_NAM, ROW, ROW);
471 table.put(put);
472
473
474 final AtomicInteger step = new AtomicInteger(0);
475
476 final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(null);
477 Thread t = new Thread("testConnectionCloseThread") {
478 @Override
479 public void run() {
480 int done = 0;
481 try {
482 step.set(1);
483 while (step.get() == 1) {
484 Get get = new Get(ROW);
485 table.get(get);
486 done++;
487 if (done % 100 == 0)
488 LOG.info("done=" + done);
489 }
490 } catch (Throwable t) {
491 failed.set(t);
492 LOG.error(t);
493 }
494 step.set(3);
495 }
496 };
497 t.start();
498 TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
499 @Override
500 public boolean evaluate() throws Exception {
501 return step.get() == 1;
502 }
503 });
504
505 ServerName sn = table.getRegionLocation(ROW).getServerName();
506 ConnectionManager.HConnectionImplementation conn =
507 (ConnectionManager.HConnectionImplementation) table.getConnection();
508 RpcClient rpcClient = conn.getRpcClient();
509
510 LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
511 for (int i = 0; i < 5000; i++) {
512 rpcClient.cancelConnections(sn);
513 Thread.sleep(5);
514 }
515
516 step.compareAndSet(1, 2);
517
518
519 TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
520 @Override
521 public boolean evaluate() throws Exception {
522 return step.get() == 3;
523 }
524 });
525 table.close();
526 Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
527 TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
528 }
529
530
531
532
533 @Test
534 public void testConnectionIdle() throws Exception {
535 TableName tableName = TableName.valueOf("HCM-testConnectionIdle");
536 TEST_UTIL.createTable(tableName, FAM_NAM).close();
537 int idleTime = 20000;
538 boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
539
540 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
541
542 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
543 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
544 c2.setInt(RpcClient.IDLE_TIME, idleTime);
545
546 final Table table = new HTable(c2, tableName);
547
548 Put put = new Put(ROW);
549 put.add(FAM_NAM, ROW, ROW);
550 table.put(put);
551
552 ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
553 mee.setValue(System.currentTimeMillis());
554 EnvironmentEdgeManager.injectEdge(mee);
555 LOG.info("first get");
556 table.get(new Get(ROW));
557
558 LOG.info("first get - changing the time & sleeping");
559 mee.incValue(idleTime + 1000);
560 Thread.sleep(1500);
561
562
563 LOG.info("second get - connection has been marked idle in the middle");
564
565
566 table.get(new Get(ROW));
567 mee.incValue(idleTime + 1000);
568
569 LOG.info("third get - connection is idle, but the reader doesn't know yet");
570
571
572
573
574
575
576
577 table.get(new Get(ROW));
578
579 LOG.info("we're done - time will change back");
580
581 table.close();
582 EnvironmentEdgeManager.reset();
583 TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
584 }
585
586
587
588
589
590
591 @Test
592 public void testConnectionCut() throws Exception {
593
594 TableName tableName = TableName.valueOf("HCM-testConnectionCut");
595
596 TEST_UTIL.createTable(tableName, FAM_NAM).close();
597 boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
598
599 Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
600
601 c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
602 c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
603 c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
604
605 HTable table = new HTable(c2, tableName);
606
607 Put p = new Put(FAM_NAM);
608 p.add(FAM_NAM, FAM_NAM, FAM_NAM);
609 table.put(p);
610
611 final HConnectionImplementation hci = (HConnectionImplementation)table.getConnection();
612 final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
613
614 Get get = new Get(FAM_NAM);
615 Assert.assertNotNull(table.get(get));
616
617 get = new Get(FAM_NAM);
618 get.setFilter(new BlockingFilter());
619
620
621 Thread t = new Thread() {
622 @Override
623 public void run() {
624 synchronized (syncBlockingFilter) {
625 try {
626 syncBlockingFilter.wait();
627 } catch (InterruptedException e) {
628 throw new RuntimeException(e);
629 }
630 }
631 hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
632 }
633 };
634
635 t.start();
636 try {
637 table.get(get);
638 Assert.fail();
639 } catch (IOException expected) {
640 LOG.debug("Received: " + expected);
641 Assert.assertFalse(expected instanceof SocketTimeoutException);
642 Assert.assertFalse(syncBlockingFilter.get());
643 } finally {
644 syncBlockingFilter.set(true);
645 t.join();
646 HConnectionManager.getConnection(c2).close();
647 TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
648 }
649
650 table.close();
651 }
652
653 protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
654
655 public static class BlockingFilter extends FilterBase {
656 @Override
657 public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
658 int i = 0;
659 while (i++ < 1000 && !syncBlockingFilter.get()) {
660 synchronized (syncBlockingFilter) {
661 syncBlockingFilter.notifyAll();
662 }
663 Threads.sleep(100);
664 }
665 syncBlockingFilter.set(true);
666 return false;
667 }
668 @Override
669 public ReturnCode filterKeyValue(Cell ignored) throws IOException {
670 return ReturnCode.INCLUDE;
671 }
672
673 public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
674 return new BlockingFilter();
675 }
676 }
677
678 @Test
679 public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
680
681 Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
682 new HashMap<HConnectionKey, HConnectionImplementation>();
683 oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES);
684
685 ConnectionManager.CONNECTION_INSTANCES.clear();
686
687 try {
688 HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
689 connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
690 "test abortingHConnectionRemovesItselfFromHCM"));
691 Assert.assertNotSame(connection,
692 HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
693 } finally {
694
695 ConnectionManager.CONNECTION_INSTANCES.clear();
696 ConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
697 }
698 }
699
700
701
702
703
704
705 @Test
706 public void testRegionCaching() throws Exception{
707 TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
708 Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
709 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
710 HTable table = new HTable(conf, TABLE_NAME);
711
712 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
713 Put put = new Put(ROW);
714 put.add(FAM_NAM, ROW, ROW);
715 table.put(put);
716 ConnectionManager.HConnectionImplementation conn =
717 (ConnectionManager.HConnectionImplementation)table.getConnection();
718
719 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
720
721 final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() + 1;
722 HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
723 conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
724 ServerName.valueOf("127.0.0.1", nextPort,
725 HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
726 Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
727 .getRegionLocation().getPort(), nextPort);
728
729 conn.clearRegionCache(TABLE_NAME, ROW.clone());
730 RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
731 assertNull("What is this location?? " + rl, rl);
732
733
734
735 conn.clearRegionCache(TABLE_NAME);
736 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
737 Put put2 = new Put(ROW);
738 put2.add(FAM_NAM, ROW, ROW);
739 table.put(put2);
740 assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
741 assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
742
743 TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
744 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
745
746
747 while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
748 Thread.sleep(1);
749 }
750
751
752 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
753 byte[] regionName = toMove.getRegionInfo().getRegionName();
754 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
755
756
757 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
758 int destServerId = (curServerId == 0 ? 1 : 0);
759
760 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
761 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
762
763 ServerName destServerName = destServer.getServerName();
764
765
766 Assert.assertTrue(curServer != destServer);
767 Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
768 Assert.assertFalse( toMove.getPort() == destServerName.getPort());
769 Assert.assertNotNull(curServer.getOnlineRegion(regionName));
770 Assert.assertNull(destServer.getOnlineRegion(regionName));
771 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
772 getAssignmentManager().getRegionStates().isRegionsInTransition());
773
774
775
776 LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
777 TEST_UTIL.getHBaseAdmin().move(
778 toMove.getRegionInfo().getEncodedNameAsBytes(),
779 destServerName.getServerName().getBytes()
780 );
781
782 while (destServer.getOnlineRegion(regionName) == null ||
783 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
784 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
785 master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
786
787 Thread.sleep(1);
788 }
789
790 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
791
792
793 Assert.assertNull(curServer.getOnlineRegion(regionName));
794 Assert.assertNotNull(destServer.getOnlineRegion(regionName));
795 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
796 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
797
798
799
800 Assert.assertFalse(
801 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
802 .getPort() == destServerName.getPort());
803
804
805
806 LOG.info("Put starting");
807 Put put3 = new Put(ROW);
808 put3.add(FAM_NAM, ROW, ROW);
809 try {
810 table.put(put3);
811 Assert.fail("Unreachable point");
812 } catch (RetriesExhaustedWithDetailsException e){
813 LOG.info("Put done, exception caught: " + e.getClass());
814 Assert.assertEquals(1, e.getNumExceptions());
815 Assert.assertEquals(1, e.getCauses().size());
816 Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
817
818
819 Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
820 Assert.assertNotNull(cause);
821 Assert.assertTrue(cause instanceof RegionMovedException);
822 }
823 Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
824 Assert.assertEquals(
825 "Previous server was " + curServer.getServerName().getHostAndPort(),
826 destServerName.getPort(),
827 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
828
829 Assert.assertFalse(destServer.getRegionsInTransitionInRS()
830 .containsKey(encodedRegionNameBytes));
831 Assert.assertFalse(curServer.getRegionsInTransitionInRS()
832 .containsKey(encodedRegionNameBytes));
833
834
835 LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
836 TEST_UTIL.getHBaseAdmin().move(
837 toMove.getRegionInfo().getEncodedNameAsBytes(),
838 curServer.getServerName().getServerName().getBytes()
839 );
840
841 while (curServer.getOnlineRegion(regionName) == null ||
842 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
843 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
844 master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
845
846 Thread.sleep(1);
847 }
848
849
850 Assert.assertNotNull(curServer.getOnlineRegion(regionName));
851 Assert.assertNull(destServer.getOnlineRegion(regionName));
852 LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
853
854
855 Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
856 curServer.getServerName().getPort());
857
858 Scan sc = new Scan();
859 sc.setStopRow(ROW);
860 sc.setStartRow(ROW);
861
862
863
864 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
865
866 try {
867 ResultScanner rs = table.getScanner(sc);
868 while (rs.next() != null) {
869 }
870 Assert.fail("Unreachable point");
871 } catch (RetriesExhaustedException e) {
872 LOG.info("Scan done, expected exception caught: " + e.getClass());
873 }
874
875
876 Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
877 Assert.assertEquals(
878 "Previous server was "+destServer.getServerName().getHostAndPort(),
879 curServer.getServerName().getPort(),
880 conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
881
882 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
883 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
884 table.close();
885 }
886
887
888
889
890
891 @Test
892 public void testConnectionManagement() throws Exception{
893 Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
894 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
895 HTable table = (HTable) conn.getTable(TABLE_NAME1);
896 table.close();
897 assertFalse(conn.isClosed());
898 assertFalse(table.getPool().isShutdown());
899 table = (HTable) conn.getTable(TABLE_NAME1);
900 table.close();
901 assertFalse(table.getPool().isShutdown());
902 conn.close();
903 assertTrue(table.getPool().isShutdown());
904 table0.close();
905 }
906
907
908
909
910 @Test(timeout = 60000)
911 public void testCacheSeqNums() throws Exception{
912 HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
913 Put put = new Put(ROW);
914 put.add(FAM_NAM, ROW, ROW);
915 table.put(put);
916 ConnectionManager.HConnectionImplementation conn =
917 (ConnectionManager.HConnectionImplementation)table.getConnection();
918
919 HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
920 assertNotNull(location);
921
922 ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
923
924
925 int nextPort = location.getPort() + 1;
926 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
927 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
928 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
929 Assert.assertEquals(nextPort, location.getPort());
930
931
932 nextPort = location.getPort() + 1;
933 conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
934 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
935 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
936 Assert.assertEquals(nextPort, location.getPort());
937
938
939 nextPort = location.getPort() + 1;
940 conn.updateCachedLocation(location.getRegionInfo(), anySource,
941 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
942 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
943 Assert.assertEquals(nextPort, location.getPort());
944
945
946 nextPort = location.getPort() + 1;
947 conn.updateCachedLocation(location.getRegionInfo(), anySource,
948 ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
949 location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
950 Assert.assertEquals(nextPort - 1, location.getPort());
951 table.close();
952 }
953
954
955
956
957
958 @Test
959 public void testConnectionSameness() throws Exception {
960 Connection previousConnection = null;
961 for (int i = 0; i < 2; i++) {
962
963 Configuration configuration = TEST_UTIL.getConfiguration();
964 configuration.set("some_key", String.valueOf(_randy.nextInt()));
965 LOG.info("The hash code of the current configuration is: "
966 + configuration.hashCode());
967 Connection currentConnection = HConnectionManager
968 .getConnection(configuration);
969 if (previousConnection != null) {
970 assertTrue(
971 "Did not get the same connection even though its key didn't change",
972 previousConnection == currentConnection);
973 }
974 previousConnection = currentConnection;
975
976
977
978
979 configuration.set("other_key", String.valueOf(_randy.nextInt()));
980 }
981 }
982
983
984
985
986
987
988
989 @Deprecated
990 @Test
991 public void testConnectionUniqueness() throws Exception {
992 int zkmaxconnections = TEST_UTIL.getConfiguration().
993 getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
994 HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
995
996
997
998 int maxConnections = Math.min(zkmaxconnections - 1, 20);
999 List<HConnection> connections = new ArrayList<HConnection>(maxConnections);
1000 Connection previousConnection = null;
1001 try {
1002 for (int i = 0; i < maxConnections; i++) {
1003
1004 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
1005 configuration.set("some_key", String.valueOf(_randy.nextInt()));
1006 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
1007 String.valueOf(_randy.nextInt()));
1008 LOG.info("The hash code of the current configuration is: "
1009 + configuration.hashCode());
1010 HConnection currentConnection =
1011 HConnectionManager.getConnection(configuration);
1012 if (previousConnection != null) {
1013 assertTrue("Got the same connection even though its key changed!",
1014 previousConnection != currentConnection);
1015 }
1016
1017
1018
1019
1020 configuration.set("other_key", String.valueOf(_randy.nextInt()));
1021
1022 previousConnection = currentConnection;
1023 LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: "
1024 + getHConnectionManagerCacheSize());
1025 Thread.sleep(50);
1026 connections.add(currentConnection);
1027 }
1028 } finally {
1029 for (Connection c: connections) {
1030
1031 HConnectionManager.deleteConnection(c.getConfiguration());
1032 }
1033 }
1034 }
1035
1036 @Test
1037 public void testClosing() throws Exception {
1038 Configuration configuration =
1039 new Configuration(TEST_UTIL.getConfiguration());
1040 configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
1041 String.valueOf(_randy.nextInt()));
1042
1043 Connection c1 = ConnectionFactory.createConnection(configuration);
1044
1045 Connection c2 = ConnectionFactory.createConnection(configuration);
1046
1047 Connection c3 = HConnectionManager.getConnection(configuration);
1048 Connection c4 = HConnectionManager.getConnection(configuration);
1049 assertTrue(c3 == c4);
1050
1051 c1.close();
1052 assertTrue(c1.isClosed());
1053 assertFalse(c2.isClosed());
1054 assertFalse(c3.isClosed());
1055
1056 c3.close();
1057
1058 assertFalse(c3.isClosed());
1059 c3.close();
1060 assertTrue(c3.isClosed());
1061
1062 Connection c5 = HConnectionManager.getConnection(configuration);
1063 assertTrue(c5 != c3);
1064
1065 assertFalse(c2.isClosed());
1066 c2.close();
1067 assertTrue(c2.isClosed());
1068 c5.close();
1069 assertTrue(c5.isClosed());
1070 }
1071
1072
1073
1074
1075
1076 @Test
1077 public void testCreateConnection() throws Exception {
1078 Configuration configuration = TEST_UTIL.getConfiguration();
1079 Connection c1 = ConnectionFactory.createConnection(configuration);
1080 Connection c2 = ConnectionFactory.createConnection(configuration);
1081
1082 assertTrue(c1 != c2);
1083 assertTrue(c1.getConfiguration() == c2.getConfiguration());
1084
1085 Connection c3 = HConnectionManager.getConnection(configuration);
1086 assertTrue(c1 != c3);
1087 assertTrue(c2 != c3);
1088 }
1089
1090
1091
1092
1093
1094
1095
1096 @Test(timeout = 60000)
1097 public void testConnection() throws Exception{
1098
1099 Configuration c = new Configuration();
1100 c.set(HConstants.ZOOKEEPER_QUORUM,
1101 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
1102 c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
1103 TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
1104
1105
1106 HConnection conn = HConnectionManager.getConnection(c);
1107 assertTrue( conn.isMasterRunning() );
1108 conn.close();
1109 }
1110
1111 private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
1112 Field numTries = hci.getClass().getDeclaredField("numTries");
1113 numTries.setAccessible(true);
1114 Field modifiersField = Field.class.getDeclaredField("modifiers");
1115 modifiersField.setAccessible(true);
1116 modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
1117 final int prevNumRetriesVal = (Integer)numTries.get(hci);
1118 numTries.set(hci, newVal);
1119
1120 return prevNumRetriesVal;
1121 }
1122
1123 @Test (timeout=30000)
1124 public void testMulti() throws Exception {
1125 HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
1126 try {
1127 ConnectionManager.HConnectionImplementation conn =
1128 ( ConnectionManager.HConnectionImplementation)table.getConnection();
1129
1130
1131
1132 conn.clearRegionCache(TABLE_NAME3);
1133 Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
1134
1135 TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
1136 HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
1137
1138
1139 while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
1140 Thread.sleep(1);
1141 }
1142
1143 Put put = new Put(ROW_X);
1144 put.add(FAM_NAM, ROW_X, ROW_X);
1145 table.put(put);
1146
1147
1148 HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
1149 byte[] regionName = toMove.getRegionInfo().getRegionName();
1150 byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
1151
1152
1153 int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
1154 int destServerId = (curServerId == 0 ? 1 : 0);
1155
1156 HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
1157 HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
1158
1159 ServerName destServerName = destServer.getServerName();
1160
1161
1162 List<Region> regions = curServer.getOnlineRegions(TABLE_NAME3);
1163 byte[] otherRow = null;
1164 for (Region region : regions) {
1165 if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
1166 && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
1167 otherRow = region.getRegionInfo().getStartKey();
1168 break;
1169 }
1170 }
1171 assertNotNull(otherRow);
1172
1173 if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
1174 Put put2 = new Put(otherRow);
1175 put2.add(FAM_NAM, otherRow, otherRow);
1176 table.put(put2);
1177
1178
1179 Assert.assertTrue(curServer != destServer);
1180 Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
1181 Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
1182 Assert.assertNotNull(curServer.getOnlineRegion(regionName));
1183 Assert.assertNull(destServer.getOnlineRegion(regionName));
1184 Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
1185 getAssignmentManager().getRegionStates().isRegionsInTransition());
1186
1187
1188
1189 LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
1190 TEST_UTIL.getHBaseAdmin().move(
1191 toMove.getRegionInfo().getEncodedNameAsBytes(),
1192 destServerName.getServerName().getBytes()
1193 );
1194
1195 while (destServer.getOnlineRegion(regionName) == null ||
1196 destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
1197 curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
1198 master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
1199
1200 Thread.sleep(1);
1201 }
1202
1203 LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
1204
1205
1206 Assert.assertNull(curServer.getOnlineRegion(regionName));
1207 Assert.assertNotNull(destServer.getOnlineRegion(regionName));
1208 Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
1209 Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
1210
1211
1212
1213 Assert.assertFalse(
1214 conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
1215 .getPort() == destServerName.getPort());
1216
1217
1218 final int prevNumRetriesVal = setNumTries(conn, 2);
1219
1220 Put put3 = new Put(ROW_X);
1221 put3.add(FAM_NAM, ROW_X, ROW_X);
1222 Put put4 = new Put(otherRow);
1223 put4.add(FAM_NAM, otherRow, otherRow);
1224
1225
1226 table.batch(Lists.newArrayList(put4, put3));
1227
1228
1229 setNumTries(conn, prevNumRetriesVal);
1230 } finally {
1231 table.close();
1232 }
1233 }
1234
1235 @Test
1236 public void testErrorBackoffTimeCalculation() throws Exception {
1237
1238 final long ANY_PAUSE = 100;
1239 ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
1240 ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
1241
1242 ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
1243 EnvironmentEdgeManager.injectEdge(timeMachine);
1244 try {
1245 long timeBase = timeMachine.currentTime();
1246 long largeAmountOfTime = ANY_PAUSE * 1000;
1247 ConnectionManager.ServerErrorTracker tracker =
1248 new ConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
1249
1250
1251 assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
1252
1253
1254 tracker.reportServerError(location);
1255 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1256 tracker.calculateBackoffTime(location, ANY_PAUSE));
1257 tracker.reportServerError(location);
1258 tracker.reportServerError(location);
1259 tracker.reportServerError(location);
1260 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
1261 tracker.calculateBackoffTime(location, ANY_PAUSE));
1262
1263
1264 assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1265 tracker.reportServerError(diffLocation);
1266 assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1267 tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1268
1269
1270 assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1271 tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1272 } finally {
1273 EnvironmentEdgeManager.reset();
1274 }
1275 }
1276
1277 private static void assertEqualsWithJitter(long expected, long actual) {
1278 assertEqualsWithJitter(expected, actual, expected);
1279 }
1280
1281 private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1282 assertTrue("Value not within jitter: " + expected + " vs " + actual,
1283 Math.abs(actual - expected) <= (0.01f * jitterBase));
1284 }
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294 @Ignore ("Flakey test: See HBASE-8996")@Test
1295 public void testDeleteForZKConnLeak() throws Exception {
1296 TEST_UTIL.createTable(TABLE_NAME4, FAM_NAM);
1297 final Configuration config = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
1298 config.setInt("zookeeper.recovery.retry", 1);
1299 config.setInt("zookeeper.recovery.retry.intervalmill", 1000);
1300 config.setInt("hbase.rpc.timeout", 2000);
1301 config.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
1302
1303 ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
1304 5, TimeUnit.SECONDS,
1305 new SynchronousQueue<Runnable>(),
1306 Threads.newDaemonThreadFactory("test-hcm-delete"));
1307
1308 pool.submit(new Runnable() {
1309 @Override
1310 public void run() {
1311 while (!Thread.interrupted()) {
1312 try {
1313 HConnection conn = HConnectionManager.getConnection(config);
1314 LOG.info("Connection " + conn);
1315 HConnectionManager.deleteStaleConnection(conn);
1316 LOG.info("Connection closed " + conn);
1317
1318
1319
1320 Threads.sleep(10);
1321 } catch (Exception e) {
1322 }
1323 }
1324 }
1325 });
1326
1327
1328 for (int i = 0; i < 30; i++) {
1329 Connection c1 = null;
1330 try {
1331 c1 = ConnectionManager.getConnectionInternal(config);
1332 LOG.info("HTable connection " + i + " " + c1);
1333 Table table = new HTable(config, TABLE_NAME4, pool);
1334 table.close();
1335 LOG.info("HTable connection " + i + " closed " + c1);
1336 } catch (Exception e) {
1337 LOG.info("We actually want this to happen!!!! So we can see if we are leaking zk", e);
1338 } finally {
1339 if (c1 != null) {
1340 if (c1.isClosed()) {
1341
1342 Field zkwField = c1.getClass().getDeclaredField("keepAliveZookeeper");
1343 zkwField.setAccessible(true);
1344 Object watcher = zkwField.get(c1);
1345
1346 if (watcher != null) {
1347 if (((ZooKeeperWatcher)watcher).getRecoverableZooKeeper().getState().isAlive()) {
1348
1349
1350 Thread.sleep(1000);
1351 if (((ZooKeeperWatcher) watcher).getRecoverableZooKeeper().getState().isAlive()) {
1352 pool.shutdownNow();
1353 fail("Live zookeeper in closed connection");
1354 }
1355 }
1356 }
1357 }
1358 c1.close();
1359 }
1360 }
1361 }
1362 pool.shutdownNow();
1363 }
1364
1365 @Test(timeout = 60000)
1366 public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1367 Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1368
1369 TableName tableName = TableName.valueOf("testConnectionRideOverClusterRestart");
1370 TEST_UTIL.createTable(tableName.getName(), new byte[][] {FAM_NAM}, config).close();
1371
1372 Connection connection = ConnectionFactory.createConnection(config);
1373 Table table = connection.getTable(tableName);
1374
1375
1376 table.get(new Get(Bytes.toBytes("foo")));
1377
1378
1379 TEST_UTIL.shutdownMiniHBaseCluster();
1380 TEST_UTIL.restartHBaseCluster(2);
1381
1382 table.get(new Get(Bytes.toBytes("foo")));
1383 TEST_UTIL.deleteTable(tableName);
1384 table.close();
1385 connection.close();
1386 }
1387 }
1388