View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNull;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.lang.reflect.Field;
30  import java.lang.reflect.Modifier;
31  import java.net.SocketTimeoutException;
32  import java.util.ArrayList;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Random;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.SynchronousQueue;
39  import java.util.concurrent.ThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.atomic.AtomicLong;
44  import java.util.concurrent.atomic.AtomicReference;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.hbase.Cell;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HBaseTestingUtility;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionLocation;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.RegionLocations;
56  import org.apache.hadoop.hbase.ServerName;
57  import org.apache.hadoop.hbase.TableName;
58  import org.apache.hadoop.hbase.Waiter;
59  import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
60  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
61  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
62  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
63  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
64  import org.apache.hadoop.hbase.exceptions.DeserializationException;
65  import org.apache.hadoop.hbase.exceptions.RegionMovedException;
66  import org.apache.hadoop.hbase.filter.Filter;
67  import org.apache.hadoop.hbase.filter.FilterBase;
68  import org.apache.hadoop.hbase.ipc.RpcClient;
69  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
70  import org.apache.hadoop.hbase.master.HMaster;
71  import org.apache.hadoop.hbase.regionserver.HRegionServer;
72  import org.apache.hadoop.hbase.regionserver.Region;
73  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
74  import org.apache.hadoop.hbase.testclassification.MediumTests;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
77  import org.apache.hadoop.hbase.util.JVMClusterUtil;
78  import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
79  import org.apache.hadoop.hbase.util.Threads;
80  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
81  import org.junit.AfterClass;
82  import org.junit.Assert;
83  import org.junit.BeforeClass;
84  import org.junit.Ignore;
85  import org.junit.Test;
86  import org.junit.experimental.categories.Category;
87  
88  import com.google.common.collect.Lists;
89  
90  /**
91   * This class is for testing HBaseConnectionManager features
92   */
93  @Category(MediumTests.class)
94  public class TestHCM {
95    private static final Log LOG = LogFactory.getLog(TestHCM.class);
96    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
97    private static final TableName TABLE_NAME =
98        TableName.valueOf("test");
99    private static final TableName TABLE_NAME1 =
100       TableName.valueOf("test1");
101   private static final TableName TABLE_NAME2 =
102       TableName.valueOf("test2");
103   private static final TableName TABLE_NAME3 =
104       TableName.valueOf("test3");
105   private static final TableName TABLE_NAME4 =
106       TableName.valueOf("test4");
107   private static final byte[] FAM_NAM = Bytes.toBytes("f");
108   private static final byte[] ROW = Bytes.toBytes("bbb");
109   private static final byte[] ROW_X = Bytes.toBytes("xxx");
110   private static Random _randy = new Random();
111 
112 /**
113 * This copro sleeps 20 second. The first call it fails. The second time, it works.
114 */
115   public static class SleepAndFailFirstTime extends BaseRegionObserver {
116     static final AtomicLong ct = new AtomicLong(0);
117     static final String SLEEP_TIME_CONF_KEY =
118         "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
119     static final long DEFAULT_SLEEP_TIME = 20000;
120     static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
121 
122     public SleepAndFailFirstTime() {
123     }
124 
125     @Override
126     public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
127       RegionCoprocessorEnvironment env = c.getEnvironment();
128       Configuration conf = env.getConfiguration();
129       sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
130     }
131 
132     @Override
133     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
134               final Get get, final List<Cell> results) throws IOException {
135       Threads.sleep(sleepTime.get());
136       if (ct.incrementAndGet() == 1){
137         throw new IOException("first call I fail");
138       }
139     }
140   }
141 
142   public static class SleepCoprocessor extends BaseRegionObserver {
143     public static final int SLEEP_TIME = 5000;
144 
145     @Override public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
146         final Get get, final List<Cell> results) throws IOException {
147       Threads.sleep(SLEEP_TIME);
148     }
149   }
150 
151   @BeforeClass
152   public static void setUpBeforeClass() throws Exception {
153     TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
154     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
155     TEST_UTIL.startMiniCluster(2);
156   }
157 
158   @AfterClass public static void tearDownAfterClass() throws Exception {
159     TEST_UTIL.shutdownMiniCluster();
160   }
161 
162 
163   private static int getHConnectionManagerCacheSize(){
164     return HConnectionTestingUtility.getConnectionCount();
165   }
166 
167   @Test
168   public void testClusterConnection() throws IOException {
169     ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
170         5, TimeUnit.SECONDS,
171         new SynchronousQueue<Runnable>(),
172         Threads.newDaemonThreadFactory("test-hcm"));
173 
174     HConnection con1 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
175     HConnection con2 = HConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
176     // make sure the internally created ExecutorService is the one passed
177     assertTrue(otherPool == ((HConnectionImplementation)con2).getCurrentBatchPool());
178 
179     String tableName = "testClusterConnection";
180     TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
181     HTable t = (HTable)con1.getTable(tableName, otherPool);
182     // make sure passing a pool to the getTable does not trigger creation of an internal pool
183     assertNull("Internal Thread pool should be null", ((HConnectionImplementation)con1).getCurrentBatchPool());
184     // table should use the pool passed
185     assertTrue(otherPool == t.getPool());
186     t.close();
187 
188     t = (HTable)con2.getTable(tableName);
189     // table should use the connectin's internal pool
190     assertTrue(otherPool == t.getPool());
191     t.close();
192 
193     t = (HTable)con2.getTable(Bytes.toBytes(tableName));
194     // try other API too
195     assertTrue(otherPool == t.getPool());
196     t.close();
197 
198     t = (HTable)con2.getTable(TableName.valueOf(tableName));
199     // try other API too
200     assertTrue(otherPool == t.getPool());
201     t.close();
202 
203     t = (HTable)con1.getTable(tableName);
204     ExecutorService pool = ((HConnectionImplementation)con1).getCurrentBatchPool();
205     // make sure an internal pool was created
206     assertNotNull("An internal Thread pool should have been created", pool);
207     // and that the table is using it
208     assertTrue(t.getPool() == pool);
209     t.close();
210 
211     t = (HTable)con1.getTable(tableName);
212     // still using the *same* internal pool
213     assertTrue(t.getPool() == pool);
214     t.close();
215 
216     con1.close();
217     // if the pool was created on demand it should be closed upon connection close
218     assertTrue(pool.isShutdown());
219 
220     con2.close();
221     // if the pool is passed, it is not closed
222     assertFalse(otherPool.isShutdown());
223     otherPool.shutdownNow();
224   }
225 
226   /**
227    * Naive test to check that HConnection#getAdmin returns a properly constructed HBaseAdmin object
228    * @throws IOException Unable to construct admin
229    */
230   @Test
231   public void testAdminFactory() throws IOException {
232     Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
233     Admin admin = con1.getAdmin();
234     assertTrue(admin.getConnection() == con1);
235     assertTrue(admin.getConfiguration() == TEST_UTIL.getConfiguration());
236     con1.close();
237   }
238 
239   // Fails too often!  Needs work.  HBASE-12558
240   @Ignore @Test(expected = RegionServerStoppedException.class)
241   public void testClusterStatus() throws Exception {
242 
243     TableName tn =
244         TableName.valueOf("testClusterStatus");
245     byte[] cf = "cf".getBytes();
246     byte[] rk = "rk1".getBytes();
247 
248     JVMClusterUtil.RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
249     rs.waitForServerOnline();
250     final ServerName sn = rs.getRegionServer().getServerName();
251 
252     HTable t = TEST_UTIL.createTable(tn, cf);
253     TEST_UTIL.waitTableAvailable(tn);
254 
255     while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
256         getRegionStates().isRegionsInTransition()){
257       Thread.sleep(1);
258     }
259     final HConnectionImplementation hci =  (HConnectionImplementation)t.getConnection();
260     while (t.getRegionLocation(rk).getPort() != sn.getPort()){
261       TEST_UTIL.getHBaseAdmin().move(t.getRegionLocation(rk).getRegionInfo().
262           getEncodedNameAsBytes(), Bytes.toBytes(sn.toString()));
263       while(TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().
264           getRegionStates().isRegionsInTransition()){
265         Thread.sleep(1);
266       }
267       hci.clearRegionCache(tn);
268     }
269     Assert.assertNotNull(hci.clusterStatusListener);
270     TEST_UTIL.assertRegionOnServer(t.getRegionLocation(rk).getRegionInfo(), sn, 20000);
271 
272     Put p1 = new Put(rk);
273     p1.add(cf, "qual".getBytes(), "val".getBytes());
274     t.put(p1);
275 
276     rs.getRegionServer().abort("I'm dead");
277 
278     // We want the status to be updated. That's a least 10 second
279     TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
280       @Override
281       public boolean evaluate() throws Exception {
282         return TEST_UTIL.getHBaseCluster().getMaster().getServerManager().
283             getDeadServers().isDeadServer(sn);
284       }
285     });
286 
287     TEST_UTIL.waitFor(40000, 1000, true, new Waiter.Predicate<Exception>() {
288       @Override
289       public boolean evaluate() throws Exception {
290         return hci.clusterStatusListener.isDeadServer(sn);
291       }
292     });
293 
294     t.close();
295     hci.getClient(sn);  // will throw an exception: RegionServerStoppedException
296   }
297 
298   /**
299    * Test that we can handle connection close: it will trigger a retry, but the calls will
300    *  finish.
301    */
302   @Test
303   public void testConnectionCloseAllowsInterrupt() throws Exception {
304     testConnectionClose(true);
305   }
306 
307   @Test
308   public void testConnectionNotAllowsInterrupt() throws Exception {
309     testConnectionClose(false);
310   }
311 
312   /**
313    * Test that an operation can fail if we read the global operation timeout, even if the
314    * individual timeout is fine. We do that with:
315    * - client side: an operation timeout of 30 seconds
316    * - server side: we sleep 20 second at each attempt. The first work fails, the second one
317    * succeeds. But the client won't wait that much, because 20 + 20 > 30, so the client
318    * timeouted when the server answers.
319    */
320   @Test
321   public void testOperationTimeout() throws Exception {
322     HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
323     hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
324     HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
325     table.setRpcTimeout(Integer.MAX_VALUE);
326     // Check that it works if the timeout is big enough
327     table.setOperationTimeout(120 * 1000);
328     table.get(new Get(FAM_NAM));
329 
330     // Resetting and retrying. Will fail this time, not enough time for the second try
331     SleepAndFailFirstTime.ct.set(0);
332     try {
333       table.setOperationTimeout(30 * 1000);
334       table.get(new Get(FAM_NAM));
335       Assert.fail("We expect an exception here");
336     } catch (SocketTimeoutException e) {
337       // The client has a CallTimeout class, but it's not shared.We're not very clean today,
338       //  in the general case you can expect the call to stop, but the exception may vary.
339       // In this test however, we're sure that it will be a socket timeout.
340       LOG.info("We received an exception, as expected ", e);
341     } catch (IOException e) {
342       Assert.fail("Wrong exception:" + e.getMessage());
343     } finally {
344       table.close();
345     }
346   }
347 
348   @Test(expected = RetriesExhaustedException.class)
349   public void testRpcTimeout() throws Exception {
350     HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
351     hdt.addCoprocessor(SleepCoprocessor.class.getName());
352     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
353 
354     try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
355       assert t instanceof HTable;
356       HTable table = (HTable) t;
357       table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
358       table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
359       table.get(new Get(FAM_NAM));
360     }
361   }
362 
363   /**
364    * Test starting from 0 index when RpcRetryingCaller calculate the backoff time.
365    */
366   @Test
367   public void testRpcRetryingCallerSleep() throws Exception {
368     HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcRetryingCallerSleep");
369     hdt.addCoprocessorWithSpec("|" + SleepAndFailFirstTime.class.getName() + "||"
370         + SleepAndFailFirstTime.SLEEP_TIME_CONF_KEY + "=2000");
371     TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
372 
373     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
374     c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
375     c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
376 
377     Connection connection = ConnectionFactory.createConnection(c);
378     Table t = connection.getTable(TableName.valueOf("HCM-testRpcRetryingCallerSleep"));
379     if (t instanceof HTable) {
380       HTable table = (HTable) t;
381       table.setOperationTimeout(8000);
382       // Check that it works. Because 2s + 3s * RETRY_BACKOFF[0] + 2s < 8s
383       table.get(new Get(FAM_NAM));
384 
385       // Resetting and retrying.
386       SleepAndFailFirstTime.ct.set(0);
387       try {
388         table.setOperationTimeout(6000);
389         // Will fail this time. After sleep, there are not enough time for second retry
390         // Beacuse 2s + 3s + 2s > 6s
391         table.get(new Get(FAM_NAM));
392         Assert.fail("We expect an exception here");
393       } catch (SocketTimeoutException e) {
394         LOG.info("We received an exception, as expected ", e);
395       } catch (IOException e) {
396         Assert.fail("Wrong exception:" + e.getMessage());
397       } finally {
398         table.close();
399         connection.close();
400       }
401     }
402   }
403 
404   @Test
405   public void testCallableSleep() throws Exception {
406     long pauseTime;
407     long baseTime = 100;
408     TableName tableName = TableName.valueOf("HCM-testCallableSleep");
409     HTable table = TEST_UTIL.createTable(tableName, FAM_NAM);
410     RegionServerCallable<Object> regionServerCallable = new RegionServerCallable<Object>(
411         TEST_UTIL.getConnection(), tableName, ROW) {
412       public Object call(int timeout) throws IOException {
413         return null;
414       }
415     };
416 
417     regionServerCallable.prepare(false);
418     for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
419       pauseTime = regionServerCallable.sleep(baseTime, i);
420       assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
421       assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
422     }
423 
424     RegionAdminServiceCallable<Object> regionAdminServiceCallable =
425         new RegionAdminServiceCallable<Object>(
426         (ClusterConnection) TEST_UTIL.getConnection(), new RpcControllerFactory(
427             TEST_UTIL.getConfiguration()), tableName, ROW) {
428       public Object call(int timeout) throws IOException {
429         return null;
430       }
431     };
432 
433     regionAdminServiceCallable.prepare(false);
434     for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
435       pauseTime = regionAdminServiceCallable.sleep(baseTime, i);
436       assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
437       assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
438     }
439 
440     MasterCallable masterCallable = new MasterCallable((HConnection) TEST_UTIL.getConnection()) {
441       public Object call(int timeout) throws IOException {
442         return null;
443       }
444     };
445 
446     for (int i = 0; i < HConstants.RETRY_BACKOFF.length; i++) {
447       pauseTime = masterCallable.sleep(baseTime, i);
448       assertTrue(pauseTime >= (baseTime * HConstants.RETRY_BACKOFF[i]));
449       assertTrue(pauseTime <= (baseTime * HConstants.RETRY_BACKOFF[i] * 1.01f));
450     }
451   }
452 
453   private void testConnectionClose(boolean allowsInterrupt) throws Exception {
454     TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
455     TEST_UTIL.createTable(tableName, FAM_NAM).close();
456 
457     boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
458 
459     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
460     // We want to work on a separate connection.
461     c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
462     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
463     c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0); // don't wait between retries.
464     c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
465     c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
466 
467     final HTable table = new HTable(c2, tableName);
468 
469     Put put = new Put(ROW);
470     put.add(FAM_NAM, ROW, ROW);
471     table.put(put);
472 
473     // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
474     final AtomicInteger step = new AtomicInteger(0);
475 
476     final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(null);
477     Thread t = new Thread("testConnectionCloseThread") {
478       @Override
479       public void run() {
480         int done = 0;
481         try {
482           step.set(1);
483           while (step.get() == 1) {
484             Get get = new Get(ROW);
485             table.get(get);
486             done++;
487             if (done % 100 == 0)
488               LOG.info("done=" + done);
489           }
490         } catch (Throwable t) {
491           failed.set(t);
492           LOG.error(t);
493         }
494         step.set(3);
495       }
496     };
497     t.start();
498     TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
499       @Override
500       public boolean evaluate() throws Exception {
501         return step.get() == 1;
502       }
503     });
504 
505     ServerName sn = table.getRegionLocation(ROW).getServerName();
506     ConnectionManager.HConnectionImplementation conn =
507         (ConnectionManager.HConnectionImplementation) table.getConnection();
508     RpcClient rpcClient = conn.getRpcClient();
509 
510     LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
511     for (int i = 0; i < 5000; i++) {
512       rpcClient.cancelConnections(sn);
513       Thread.sleep(5);
514     }
515 
516     step.compareAndSet(1, 2);
517     // The test may fail here if the thread doing the gets is stuck. The way to find
518     //  out what's happening is to look for the thread named 'testConnectionCloseThread'
519     TEST_UTIL.waitFor(40000, new Waiter.Predicate<Exception>() {
520       @Override
521       public boolean evaluate() throws Exception {
522         return step.get() == 3;
523       }
524     });
525     table.close();
526     Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
527     TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
528   }
529 
530   /**
531    * Test that connection can become idle without breaking everything.
532    */
533   @Test
534   public void testConnectionIdle() throws Exception {
535     TableName tableName = TableName.valueOf("HCM-testConnectionIdle");
536     TEST_UTIL.createTable(tableName, FAM_NAM).close();
537     int idleTime =  20000;
538     boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
539 
540     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
541     // We want to work on a separate connection.
542     c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
543     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
544     c2.setInt(RpcClient.IDLE_TIME, idleTime);
545 
546     final Table table = new HTable(c2, tableName);
547 
548     Put put = new Put(ROW);
549     put.add(FAM_NAM, ROW, ROW);
550     table.put(put);
551 
552     ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
553     mee.setValue(System.currentTimeMillis());
554     EnvironmentEdgeManager.injectEdge(mee);
555     LOG.info("first get");
556     table.get(new Get(ROW));
557 
558     LOG.info("first get - changing the time & sleeping");
559     mee.incValue(idleTime + 1000);
560     Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
561                         // 1500 = sleep time in RpcClient#waitForWork + a margin
562 
563     LOG.info("second get - connection has been marked idle in the middle");
564     // To check that the connection actually became idle would need to read some private
565     //  fields of RpcClient.
566     table.get(new Get(ROW));
567     mee.incValue(idleTime + 1000);
568 
569     LOG.info("third get - connection is idle, but the reader doesn't know yet");
570     // We're testing here a special case:
571     //  time limit reached BUT connection not yet reclaimed AND a new call.
572     //  in this situation, we don't close the connection, instead we use it immediately.
573     // If we're very unlucky we can have a race condition in the test: the connection is already
574     //  under closing when we do the get, so we have an exception, and we don't retry as the
575     //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
576     //  a test issue only.
577     table.get(new Get(ROW));
578 
579     LOG.info("we're done - time will change back");
580 
581     table.close();
582     EnvironmentEdgeManager.reset();
583     TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
584   }
585 
586     /**
587      * Test that the connection to the dead server is cut immediately when we receive the
588      *  notification.
589      * @throws Exception
590      */
591   @Test
592   public void testConnectionCut() throws Exception {
593 
594     TableName tableName = TableName.valueOf("HCM-testConnectionCut");
595 
596     TEST_UTIL.createTable(tableName, FAM_NAM).close();
597     boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
598 
599     Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
600     // We want to work on a separate connection.
601     c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
602     c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
603     c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 1000);
604 
605     HTable table = new HTable(c2, tableName);
606 
607     Put p = new Put(FAM_NAM);
608     p.add(FAM_NAM, FAM_NAM, FAM_NAM);
609     table.put(p);
610 
611     final HConnectionImplementation hci =  (HConnectionImplementation)table.getConnection();
612     final HRegionLocation loc = table.getRegionLocation(FAM_NAM);
613 
614     Get get = new Get(FAM_NAM);
615     Assert.assertNotNull(table.get(get));
616 
617     get = new Get(FAM_NAM);
618     get.setFilter(new BlockingFilter());
619 
620     // This thread will mark the server as dead while we're waiting during a get.
621     Thread t = new Thread() {
622       @Override
623       public void run() {
624         synchronized (syncBlockingFilter) {
625           try {
626             syncBlockingFilter.wait();
627           } catch (InterruptedException e) {
628             throw new RuntimeException(e);
629           }
630         }
631         hci.clusterStatusListener.deadServerHandler.newDead(loc.getServerName());
632       }
633     };
634 
635     t.start();
636     try {
637       table.get(get);
638       Assert.fail();
639     } catch (IOException expected) {
640       LOG.debug("Received: " + expected);
641       Assert.assertFalse(expected instanceof SocketTimeoutException);
642       Assert.assertFalse(syncBlockingFilter.get());
643     } finally {
644       syncBlockingFilter.set(true);
645       t.join();
646       HConnectionManager.getConnection(c2).close();
647       TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
648     }
649 
650     table.close();
651   }
652 
653   protected static final AtomicBoolean syncBlockingFilter = new AtomicBoolean(false);
654 
655   public static class BlockingFilter extends FilterBase {
656     @Override
657     public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
658       int i = 0;
659       while (i++ < 1000 && !syncBlockingFilter.get()) {
660         synchronized (syncBlockingFilter) {
661           syncBlockingFilter.notifyAll();
662         }
663         Threads.sleep(100);
664       }
665       syncBlockingFilter.set(true);
666       return false;
667     }
668     @Override
669     public ReturnCode filterKeyValue(Cell ignored) throws IOException {
670       return ReturnCode.INCLUDE;
671     }
672 
673     public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException{
674       return new BlockingFilter();
675     }
676   }
677 
678   @Test
679   public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
680     // Save off current HConnections
681     Map<HConnectionKey, HConnectionImplementation> oldHBaseInstances =
682         new HashMap<HConnectionKey, HConnectionImplementation>();
683     oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES);
684 
685     ConnectionManager.CONNECTION_INSTANCES.clear();
686 
687     try {
688       HConnection connection = HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
689       connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
690           "test abortingHConnectionRemovesItselfFromHCM"));
691       Assert.assertNotSame(connection,
692         HConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
693     } finally {
694       // Put original HConnections back
695       ConnectionManager.CONNECTION_INSTANCES.clear();
696       ConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
697     }
698   }
699 
700   /**
701    * Test that when we delete a location using the first row of a region
702    * that we really delete it.
703    * @throws Exception
704    */
705   @Test
706   public void testRegionCaching() throws Exception{
707     TEST_UTIL.createMultiRegionTable(TABLE_NAME, FAM_NAM).close();
708     Configuration conf =  new Configuration(TEST_UTIL.getConfiguration());
709     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
710     HTable table = new HTable(conf, TABLE_NAME);
711 
712     TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
713     Put put = new Put(ROW);
714     put.add(FAM_NAM, ROW, ROW);
715     table.put(put);
716     ConnectionManager.HConnectionImplementation conn =
717       (ConnectionManager.HConnectionImplementation)table.getConnection();
718 
719     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
720 
721     final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() + 1;
722     HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
723     conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(),
724         ServerName.valueOf("127.0.0.1", nextPort,
725         HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP);
726     Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW)
727       .getRegionLocation().getPort(), nextPort);
728 
729     conn.clearRegionCache(TABLE_NAME, ROW.clone());
730     RegionLocations rl = conn.getCachedLocation(TABLE_NAME, ROW);
731     assertNull("What is this location?? " + rl, rl);
732 
733     // We're now going to move the region and check that it works for the client
734     // First a new put to add the location in the cache
735     conn.clearRegionCache(TABLE_NAME);
736     Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME));
737     Put put2 = new Put(ROW);
738     put2.add(FAM_NAM, ROW, ROW);
739     table.put(put2);
740     assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
741     assertNotNull(conn.getCachedLocation(TableName.valueOf(TABLE_NAME.getName()), ROW.clone()));
742 
743     TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
744     HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
745 
746     // We can wait for all regions to be online, that makes log reading easier when debugging
747     while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
748       Thread.sleep(1);
749     }
750 
751     // Now moving the region to the second server
752     HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation();
753     byte[] regionName = toMove.getRegionInfo().getRegionName();
754     byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
755 
756     // Choose the other server.
757     int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
758     int destServerId = (curServerId == 0 ? 1 : 0);
759 
760     HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
761     HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
762 
763     ServerName destServerName = destServer.getServerName();
764 
765     // Check that we are in the expected state
766     Assert.assertTrue(curServer != destServer);
767     Assert.assertFalse(curServer.getServerName().equals(destServer.getServerName()));
768     Assert.assertFalse( toMove.getPort() == destServerName.getPort());
769     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
770     Assert.assertNull(destServer.getOnlineRegion(regionName));
771     Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
772         getAssignmentManager().getRegionStates().isRegionsInTransition());
773 
774     // Moving. It's possible that we don't have all the regions online at this point, so
775     //  the test must depends only on the region we're looking at.
776     LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
777     TEST_UTIL.getHBaseAdmin().move(
778       toMove.getRegionInfo().getEncodedNameAsBytes(),
779       destServerName.getServerName().getBytes()
780     );
781 
782     while (destServer.getOnlineRegion(regionName) == null ||
783         destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
784         curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
785         master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
786       // wait for the move to be finished
787       Thread.sleep(1);
788     }
789 
790     LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
791 
792     // Check our new state.
793     Assert.assertNull(curServer.getOnlineRegion(regionName));
794     Assert.assertNotNull(destServer.getOnlineRegion(regionName));
795     Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
796     Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
797 
798 
799     // Cache was NOT updated and points to the wrong server
800     Assert.assertFalse(
801         conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation()
802           .getPort() == destServerName.getPort());
803 
804     // This part relies on a number of tries equals to 1.
805     // We do a put and expect the cache to be updated, even if we don't retry
806     LOG.info("Put starting");
807     Put put3 = new Put(ROW);
808     put3.add(FAM_NAM, ROW, ROW);
809     try {
810       table.put(put3);
811       Assert.fail("Unreachable point");
812     } catch (RetriesExhaustedWithDetailsException e){
813       LOG.info("Put done, exception caught: " + e.getClass());
814       Assert.assertEquals(1, e.getNumExceptions());
815       Assert.assertEquals(1, e.getCauses().size());
816       Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
817 
818       // Check that we unserialized the exception as expected
819       Throwable cause = ClientExceptionsUtil.findException(e.getCause(0));
820       Assert.assertNotNull(cause);
821       Assert.assertTrue(cause instanceof RegionMovedException);
822     }
823     Assert.assertNotNull("Cached connection is null", conn.getCachedLocation(TABLE_NAME, ROW));
824     Assert.assertEquals(
825         "Previous server was " + curServer.getServerName().getHostAndPort(),
826         destServerName.getPort(),
827         conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
828 
829     Assert.assertFalse(destServer.getRegionsInTransitionInRS()
830       .containsKey(encodedRegionNameBytes));
831     Assert.assertFalse(curServer.getRegionsInTransitionInRS()
832       .containsKey(encodedRegionNameBytes));
833 
834     // We move it back to do another test with a scan
835     LOG.info("Move starting region=" + toMove.getRegionInfo().getRegionNameAsString());
836     TEST_UTIL.getHBaseAdmin().move(
837       toMove.getRegionInfo().getEncodedNameAsBytes(),
838       curServer.getServerName().getServerName().getBytes()
839     );
840 
841     while (curServer.getOnlineRegion(regionName) == null ||
842         destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
843         curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
844         master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
845       // wait for the move to be finished
846       Thread.sleep(1);
847     }
848 
849     // Check our new state.
850     Assert.assertNotNull(curServer.getOnlineRegion(regionName));
851     Assert.assertNull(destServer.getOnlineRegion(regionName));
852     LOG.info("Move finished for region=" + toMove.getRegionInfo().getRegionNameAsString());
853 
854     // Cache was NOT updated and points to the wrong server
855     Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort() ==
856       curServer.getServerName().getPort());
857 
858     Scan sc = new Scan();
859     sc.setStopRow(ROW);
860     sc.setStartRow(ROW);
861 
862     // The scanner takes the max retries from the connection configuration, not the table as
863     // the put.
864     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
865 
866     try {
867       ResultScanner rs = table.getScanner(sc);
868       while (rs.next() != null) {
869       }
870       Assert.fail("Unreachable point");
871     } catch (RetriesExhaustedException e) {
872       LOG.info("Scan done, expected exception caught: " + e.getClass());
873     }
874 
875     // Cache is updated with the right value.
876     Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
877     Assert.assertEquals(
878       "Previous server was "+destServer.getServerName().getHostAndPort(),
879       curServer.getServerName().getPort(),
880       conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
881 
882     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
883         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
884     table.close();
885   }
886 
887   /**
888    * Test that Connection or Pool are not closed when managed externally
889    * @throws Exception
890    */
891   @Test
892   public void testConnectionManagement() throws Exception{
893     Table table0 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAM);
894     Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
895     HTable table = (HTable) conn.getTable(TABLE_NAME1);
896     table.close();
897     assertFalse(conn.isClosed());
898     assertFalse(table.getPool().isShutdown());
899     table = (HTable) conn.getTable(TABLE_NAME1);
900     table.close();
901     assertFalse(table.getPool().isShutdown());
902     conn.close();
903     assertTrue(table.getPool().isShutdown());
904     table0.close();
905   }
906 
907   /**
908    * Test that stale cache updates don't override newer cached values.
909    */
910   @Test(timeout = 60000)
911   public void testCacheSeqNums() throws Exception{
912     HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME2, FAM_NAM);
913     Put put = new Put(ROW);
914     put.add(FAM_NAM, ROW, ROW);
915     table.put(put);
916     ConnectionManager.HConnectionImplementation conn =
917       (ConnectionManager.HConnectionImplementation)table.getConnection();
918 
919     HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
920     assertNotNull(location);
921 
922     ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L);
923 
924     // Same server as already in cache reporting - overwrites any value despite seqNum.
925     int nextPort = location.getPort() + 1;
926     conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
927         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
928     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
929     Assert.assertEquals(nextPort, location.getPort());
930 
931     // No source specified - same.
932     nextPort = location.getPort() + 1;
933     conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(),
934         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
935     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
936     Assert.assertEquals(nextPort, location.getPort());
937 
938     // Higher seqNum - overwrites lower seqNum.
939     nextPort = location.getPort() + 1;
940     conn.updateCachedLocation(location.getRegionInfo(), anySource,
941         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() + 1);
942     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
943     Assert.assertEquals(nextPort, location.getPort());
944 
945     // Lower seqNum - does not overwrite higher seqNum.
946     nextPort = location.getPort() + 1;
947     conn.updateCachedLocation(location.getRegionInfo(), anySource,
948         ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1);
949     location = conn.getCachedLocation(TABLE_NAME2, ROW).getRegionLocation();
950     Assert.assertEquals(nextPort - 1, location.getPort());
951     table.close();
952   }
953 
954   /**
955    * Make sure that {@link Configuration} instances that are essentially the
956    * same map to the same {@link HConnection} instance.
957    */
958   @Test
959   public void testConnectionSameness() throws Exception {
960     Connection previousConnection = null;
961     for (int i = 0; i < 2; i++) {
962       // set random key to differentiate the connection from previous ones
963       Configuration configuration = TEST_UTIL.getConfiguration();
964       configuration.set("some_key", String.valueOf(_randy.nextInt()));
965       LOG.info("The hash code of the current configuration is: "
966           + configuration.hashCode());
967       Connection currentConnection = HConnectionManager
968           .getConnection(configuration);
969       if (previousConnection != null) {
970         assertTrue(
971             "Did not get the same connection even though its key didn't change",
972             previousConnection == currentConnection);
973       }
974       previousConnection = currentConnection;
975       // change the configuration, so that it is no longer reachable from the
976       // client's perspective. However, since its part of the LRU doubly linked
977       // list, it will eventually get thrown out, at which time it should also
978       // close the corresponding {@link HConnection}.
979       configuration.set("other_key", String.valueOf(_randy.nextInt()));
980     }
981   }
982 
983   /**
984    * Makes sure that there is no leaking of
985    * {@link ConnectionManager.HConnectionImplementation} in the {@link HConnectionManager}
986    * class.
987    * @deprecated Tests deprecated functionality.  Remove in 1.0.
988    */
989   @Deprecated
990   @Test
991   public void testConnectionUniqueness() throws Exception {
992     int zkmaxconnections = TEST_UTIL.getConfiguration().
993       getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
994           HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS);
995     // Test up to a max that is < the maximum number of zk connections.  If we
996     // go above zk connections, we just fall into cycle where we are failing
997     // to set up a session and test runs for a long time.
998     int maxConnections = Math.min(zkmaxconnections - 1, 20);
999     List<HConnection> connections = new ArrayList<HConnection>(maxConnections);
1000     Connection previousConnection = null;
1001     try {
1002       for (int i = 0; i < maxConnections; i++) {
1003         // set random key to differentiate the connection from previous ones
1004         Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
1005         configuration.set("some_key", String.valueOf(_randy.nextInt()));
1006         configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
1007             String.valueOf(_randy.nextInt()));
1008         LOG.info("The hash code of the current configuration is: "
1009             + configuration.hashCode());
1010         HConnection currentConnection =
1011           HConnectionManager.getConnection(configuration);
1012         if (previousConnection != null) {
1013           assertTrue("Got the same connection even though its key changed!",
1014               previousConnection != currentConnection);
1015         }
1016         // change the configuration, so that it is no longer reachable from the
1017         // client's perspective. However, since its part of the LRU doubly linked
1018         // list, it will eventually get thrown out, at which time it should also
1019         // close the corresponding {@link HConnection}.
1020         configuration.set("other_key", String.valueOf(_randy.nextInt()));
1021 
1022         previousConnection = currentConnection;
1023         LOG.info("The current HConnectionManager#HBASE_INSTANCES cache size is: "
1024             + getHConnectionManagerCacheSize());
1025         Thread.sleep(50);
1026         connections.add(currentConnection);
1027       }
1028     } finally {
1029       for (Connection c: connections) {
1030         // Clean up connections made so we don't interfere w/ subsequent tests.
1031         HConnectionManager.deleteConnection(c.getConfiguration());
1032       }
1033     }
1034   }
1035 
1036   @Test
1037   public void testClosing() throws Exception {
1038     Configuration configuration =
1039       new Configuration(TEST_UTIL.getConfiguration());
1040     configuration.set(HConstants.HBASE_CLIENT_INSTANCE_ID,
1041         String.valueOf(_randy.nextInt()));
1042 
1043     Connection c1 = ConnectionFactory.createConnection(configuration);
1044     // We create two connections with the same key.
1045     Connection c2 = ConnectionFactory.createConnection(configuration);
1046 
1047     Connection c3 = HConnectionManager.getConnection(configuration);
1048     Connection c4 = HConnectionManager.getConnection(configuration);
1049     assertTrue(c3 == c4);
1050 
1051     c1.close();
1052     assertTrue(c1.isClosed());
1053     assertFalse(c2.isClosed());
1054     assertFalse(c3.isClosed());
1055 
1056     c3.close();
1057     // still a reference left
1058     assertFalse(c3.isClosed());
1059     c3.close();
1060     assertTrue(c3.isClosed());
1061     // c3 was removed from the cache
1062     Connection c5 = HConnectionManager.getConnection(configuration);
1063     assertTrue(c5 != c3);
1064 
1065     assertFalse(c2.isClosed());
1066     c2.close();
1067     assertTrue(c2.isClosed());
1068     c5.close();
1069     assertTrue(c5.isClosed());
1070   }
1071 
1072   /**
1073    * Trivial test to verify that nobody messes with
1074    * {@link HConnectionManager#createConnection(Configuration)}
1075    */
1076   @Test
1077   public void testCreateConnection() throws Exception {
1078     Configuration configuration = TEST_UTIL.getConfiguration();
1079     Connection c1 = ConnectionFactory.createConnection(configuration);
1080     Connection c2 = ConnectionFactory.createConnection(configuration);
1081     // created from the same configuration, yet they are different
1082     assertTrue(c1 != c2);
1083     assertTrue(c1.getConfiguration() == c2.getConfiguration());
1084     // make sure these were not cached
1085     Connection c3 = HConnectionManager.getConnection(configuration);
1086     assertTrue(c1 != c3);
1087     assertTrue(c2 != c3);
1088   }
1089 
1090 
1091   /**
1092    * This test checks that one can connect to the cluster with only the
1093    *  ZooKeeper quorum set. Other stuff like master address will be read
1094    *  from ZK by the client.
1095    */
1096   @Test(timeout = 60000)
1097   public void testConnection() throws Exception{
1098     // We create an empty config and add the ZK address.
1099     Configuration c = new Configuration();
1100     c.set(HConstants.ZOOKEEPER_QUORUM,
1101       TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
1102     c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
1103       TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
1104 
1105     // This should be enough to connect
1106     HConnection conn = HConnectionManager.getConnection(c);
1107     assertTrue( conn.isMasterRunning() );
1108     conn.close();
1109   }
1110 
1111   private int setNumTries(HConnectionImplementation hci, int newVal) throws Exception {
1112     Field numTries = hci.getClass().getDeclaredField("numTries");
1113     numTries.setAccessible(true);
1114     Field modifiersField = Field.class.getDeclaredField("modifiers");
1115     modifiersField.setAccessible(true);
1116     modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
1117     final int prevNumRetriesVal = (Integer)numTries.get(hci);
1118     numTries.set(hci, newVal);
1119 
1120     return prevNumRetriesVal;
1121   }
1122 
1123   @Test (timeout=30000)
1124   public void testMulti() throws Exception {
1125     HTable table = TEST_UTIL.createMultiRegionTable(TABLE_NAME3, FAM_NAM);
1126      try {
1127        ConnectionManager.HConnectionImplementation conn =
1128            ( ConnectionManager.HConnectionImplementation)table.getConnection();
1129 
1130        // We're now going to move the region and check that it works for the client
1131        // First a new put to add the location in the cache
1132        conn.clearRegionCache(TABLE_NAME3);
1133        Assert.assertEquals(0, conn.getNumberOfCachedRegionLocations(TABLE_NAME3));
1134 
1135        TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
1136        HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
1137 
1138        // We can wait for all regions to be online, that makes log reading easier when debugging
1139        while (master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
1140          Thread.sleep(1);
1141        }
1142 
1143        Put put = new Put(ROW_X);
1144        put.add(FAM_NAM, ROW_X, ROW_X);
1145        table.put(put);
1146 
1147        // Now moving the region to the second server
1148        HRegionLocation toMove = conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation();
1149        byte[] regionName = toMove.getRegionInfo().getRegionName();
1150        byte[] encodedRegionNameBytes = toMove.getRegionInfo().getEncodedNameAsBytes();
1151 
1152        // Choose the other server.
1153        int curServerId = TEST_UTIL.getHBaseCluster().getServerWith(regionName);
1154        int destServerId = (curServerId == 0 ? 1 : 0);
1155 
1156        HRegionServer curServer = TEST_UTIL.getHBaseCluster().getRegionServer(curServerId);
1157        HRegionServer destServer = TEST_UTIL.getHBaseCluster().getRegionServer(destServerId);
1158 
1159        ServerName destServerName = destServer.getServerName();
1160 
1161        //find another row in the cur server that is less than ROW_X
1162        List<Region> regions = curServer.getOnlineRegions(TABLE_NAME3);
1163        byte[] otherRow = null;
1164        for (Region region : regions) {
1165          if (!region.getRegionInfo().getEncodedName().equals(toMove.getRegionInfo().getEncodedName())
1166              && Bytes.BYTES_COMPARATOR.compare(region.getRegionInfo().getStartKey(), ROW_X) < 0) {
1167            otherRow = region.getRegionInfo().getStartKey();
1168            break;
1169          }
1170        }
1171        assertNotNull(otherRow);
1172        // If empty row, set it to first row.-f
1173        if (otherRow.length <= 0) otherRow = Bytes.toBytes("aaa");
1174        Put put2 = new Put(otherRow);
1175        put2.add(FAM_NAM, otherRow, otherRow);
1176        table.put(put2); //cache put2's location
1177 
1178        // Check that we are in the expected state
1179        Assert.assertTrue(curServer != destServer);
1180        Assert.assertNotEquals(curServer.getServerName(), destServer.getServerName());
1181        Assert.assertNotEquals(toMove.getPort(), destServerName.getPort());
1182        Assert.assertNotNull(curServer.getOnlineRegion(regionName));
1183        Assert.assertNull(destServer.getOnlineRegion(regionName));
1184        Assert.assertFalse(TEST_UTIL.getMiniHBaseCluster().getMaster().
1185            getAssignmentManager().getRegionStates().isRegionsInTransition());
1186 
1187        // Moving. It's possible that we don't have all the regions online at this point, so
1188        //  the test must depends only on the region we're looking at.
1189        LOG.info("Move starting region="+toMove.getRegionInfo().getRegionNameAsString());
1190        TEST_UTIL.getHBaseAdmin().move(
1191            toMove.getRegionInfo().getEncodedNameAsBytes(),
1192            destServerName.getServerName().getBytes()
1193            );
1194 
1195        while (destServer.getOnlineRegion(regionName) == null ||
1196            destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
1197            curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes) ||
1198            master.getAssignmentManager().getRegionStates().isRegionsInTransition()) {
1199          // wait for the move to be finished
1200          Thread.sleep(1);
1201         }
1202 
1203        LOG.info("Move finished for region="+toMove.getRegionInfo().getRegionNameAsString());
1204 
1205        // Check our new state.
1206        Assert.assertNull(curServer.getOnlineRegion(regionName));
1207        Assert.assertNotNull(destServer.getOnlineRegion(regionName));
1208        Assert.assertFalse(destServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
1209        Assert.assertFalse(curServer.getRegionsInTransitionInRS().containsKey(encodedRegionNameBytes));
1210 
1211 
1212        // Cache was NOT updated and points to the wrong server
1213        Assert.assertFalse(
1214            conn.getCachedLocation(TABLE_NAME3, ROW_X).getRegionLocation()
1215             .getPort() == destServerName.getPort());
1216 
1217        // Hijack the number of retry to fail after 2 tries
1218        final int prevNumRetriesVal = setNumTries(conn, 2);
1219 
1220        Put put3 = new Put(ROW_X);
1221        put3.add(FAM_NAM, ROW_X, ROW_X);
1222        Put put4 = new Put(otherRow);
1223        put4.add(FAM_NAM, otherRow, otherRow);
1224 
1225        // do multi
1226        table.batch(Lists.newArrayList(put4, put3)); // first should be a valid row,
1227        // second we get RegionMovedException.
1228 
1229        setNumTries(conn, prevNumRetriesVal);
1230      } finally {
1231        table.close();
1232      }
1233   }
1234 
1235   @Test
1236   public void testErrorBackoffTimeCalculation() throws Exception {
1237     // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not.
1238     final long ANY_PAUSE = 100;
1239     ServerName location = ServerName.valueOf("127.0.0.1", 1, 0);
1240     ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0);
1241 
1242     ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge();
1243     EnvironmentEdgeManager.injectEdge(timeMachine);
1244     try {
1245       long timeBase = timeMachine.currentTime();
1246       long largeAmountOfTime = ANY_PAUSE * 1000;
1247       ConnectionManager.ServerErrorTracker tracker =
1248           new ConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
1249 
1250       // The default backoff is 0.
1251       assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
1252 
1253       // Check some backoff values from HConstants sequence.
1254       tracker.reportServerError(location);
1255       assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1256         tracker.calculateBackoffTime(location, ANY_PAUSE));
1257       tracker.reportServerError(location);
1258       tracker.reportServerError(location);
1259       tracker.reportServerError(location);
1260       assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[3],
1261         tracker.calculateBackoffTime(location, ANY_PAUSE));
1262 
1263       // All of this shouldn't affect backoff for different location.
1264       assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1265       tracker.reportServerError(diffLocation);
1266       assertEqualsWithJitter(ANY_PAUSE * HConstants.RETRY_BACKOFF[0],
1267         tracker.calculateBackoffTime(diffLocation, ANY_PAUSE));
1268 
1269       // Check with different base.
1270       assertEqualsWithJitter(ANY_PAUSE * 2 * HConstants.RETRY_BACKOFF[3],
1271           tracker.calculateBackoffTime(location, ANY_PAUSE * 2));
1272     } finally {
1273       EnvironmentEdgeManager.reset();
1274     }
1275   }
1276 
1277   private static void assertEqualsWithJitter(long expected, long actual) {
1278     assertEqualsWithJitter(expected, actual, expected);
1279   }
1280 
1281   private static void assertEqualsWithJitter(long expected, long actual, long jitterBase) {
1282     assertTrue("Value not within jitter: " + expected + " vs " + actual,
1283         Math.abs(actual - expected) <= (0.01f * jitterBase));
1284   }
1285 
1286   /**
1287    * Tests that a destroyed connection does not have a live zookeeper.
1288    * Below is timing based.  We put up a connection to a table and then close the connection while
1289    * having a background thread running that is forcing close of the connection to try and
1290    * provoke a close catastrophe; we are hoping for a car crash so we can see if we are leaking
1291    * zk connections.
1292    * @throws Exception
1293    */
1294   @Ignore ("Flakey test: See HBASE-8996")@Test
1295   public void testDeleteForZKConnLeak() throws Exception {
1296     TEST_UTIL.createTable(TABLE_NAME4, FAM_NAM);
1297     final Configuration config = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
1298     config.setInt("zookeeper.recovery.retry", 1);
1299     config.setInt("zookeeper.recovery.retry.intervalmill", 1000);
1300     config.setInt("hbase.rpc.timeout", 2000);
1301     config.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
1302 
1303     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
1304       5, TimeUnit.SECONDS,
1305       new SynchronousQueue<Runnable>(),
1306       Threads.newDaemonThreadFactory("test-hcm-delete"));
1307 
1308     pool.submit(new Runnable() {
1309       @Override
1310       public void run() {
1311         while (!Thread.interrupted()) {
1312           try {
1313             HConnection conn = HConnectionManager.getConnection(config);
1314             LOG.info("Connection " + conn);
1315             HConnectionManager.deleteStaleConnection(conn);
1316             LOG.info("Connection closed " + conn);
1317             // TODO: This sleep time should be less than the time that it takes to open and close
1318             // a table.  Ideally we would do a few runs first to measure.  For now this is
1319             // timing based; hopefully we hit the bad condition.
1320             Threads.sleep(10);
1321           } catch (Exception e) {
1322           }
1323         }
1324       }
1325     });
1326 
1327     // Use connection multiple times.
1328     for (int i = 0; i < 30; i++) {
1329       Connection c1 = null;
1330       try {
1331         c1 = ConnectionManager.getConnectionInternal(config);
1332         LOG.info("HTable connection " + i + " " + c1);
1333         Table table = new HTable(config, TABLE_NAME4, pool);
1334         table.close();
1335         LOG.info("HTable connection " + i + " closed " + c1);
1336       } catch (Exception e) {
1337         LOG.info("We actually want this to happen!!!!  So we can see if we are leaking zk", e);
1338       } finally {
1339         if (c1 != null) {
1340           if (c1.isClosed()) {
1341             // cannot use getZooKeeper as method instantiates watcher if null
1342             Field zkwField = c1.getClass().getDeclaredField("keepAliveZookeeper");
1343             zkwField.setAccessible(true);
1344             Object watcher = zkwField.get(c1);
1345 
1346             if (watcher != null) {
1347               if (((ZooKeeperWatcher)watcher).getRecoverableZooKeeper().getState().isAlive()) {
1348                 // non-synchronized access to watcher; sleep and check again in case zk connection
1349                 // hasn't been cleaned up yet.
1350                 Thread.sleep(1000);
1351                 if (((ZooKeeperWatcher) watcher).getRecoverableZooKeeper().getState().isAlive()) {
1352                   pool.shutdownNow();
1353                   fail("Live zookeeper in closed connection");
1354                 }
1355               }
1356             }
1357           }
1358           c1.close();
1359         }
1360       }
1361     }
1362     pool.shutdownNow();
1363   }
1364 
1365   @Test(timeout = 60000)
1366   public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
1367     Configuration config = new Configuration(TEST_UTIL.getConfiguration());
1368 
1369     TableName tableName = TableName.valueOf("testConnectionRideOverClusterRestart");
1370     TEST_UTIL.createTable(tableName.getName(), new byte[][] {FAM_NAM}, config).close();
1371 
1372     Connection connection = ConnectionFactory.createConnection(config);
1373     Table table = connection.getTable(tableName);
1374 
1375     // this will cache the meta location and table's region location
1376     table.get(new Get(Bytes.toBytes("foo")));
1377 
1378     // restart HBase
1379     TEST_UTIL.shutdownMiniHBaseCluster();
1380     TEST_UTIL.restartHBaseCluster(2);
1381     // this should be able to discover new locations for meta and table's region
1382     table.get(new Get(Bytes.toBytes("foo")));
1383     TEST_UTIL.deleteTable(tableName);
1384     table.close();
1385     connection.close();
1386   }
1387 }
1388