View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertTrue;
21  import static org.junit.Assert.fail;
22  
23  import java.io.IOException;
24  import java.net.SocketTimeoutException;
25  import java.util.Comparator;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.SortedMap;
30  import java.util.concurrent.ConcurrentSkipListMap;
31  import java.util.concurrent.ExecutorService;
32  import java.util.concurrent.Executors;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import org.apache.hadoop.hbase.util.ByteStringer;
37  import org.apache.commons.lang.NotImplementedException;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.conf.Configured;
42  import org.apache.hadoop.hbase.DoNotRetryIOException;
43  import org.apache.hadoop.hbase.HBaseConfiguration;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HRegionLocation;
47  import org.apache.hadoop.hbase.RegionLocations;
48  import org.apache.hadoop.hbase.KeyValue;
49  import org.apache.hadoop.hbase.RegionTooBusyException;
50  import org.apache.hadoop.hbase.ServerName;
51  import org.apache.hadoop.hbase.testclassification.SmallTests;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
54  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
55  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
56  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
57  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
58  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
59  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
60  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
61  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
62  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
63  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
64  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
65  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
69  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
70  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
71  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
73  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
74  import org.apache.hadoop.hbase.security.User;
75  import org.apache.hadoop.hbase.util.Bytes;
76  import org.apache.hadoop.hbase.util.Pair;
77  import org.apache.hadoop.hbase.util.Threads;
78  import org.apache.hadoop.util.Tool;
79  import org.apache.hadoop.util.ToolRunner;
80  import org.junit.Before;
81  import org.junit.Ignore;
82  import org.junit.Test;
83  import org.junit.experimental.categories.Category;
84  import org.mockito.Mockito;
85  
86  import com.google.common.base.Stopwatch;
87  import com.google.protobuf.ByteString;
88  import com.google.protobuf.RpcController;
89  import com.google.protobuf.ServiceException;
90  
91  /**
92   * Test client behavior w/o setting up a cluster.
93   * Mock up cluster emissions.
94   */
95  @Category(SmallTests.class)
96  public class TestClientNoCluster extends Configured implements Tool {
  /** Class-wide logger. */
  private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
  /** Configuration for the running test; assigned in {@link #setUp()}. */
  private Configuration conf;
  /**
   * Server name that {@link SimpleRegistry} reports as hosting hbase:meta; its startcode is also
   * what {@link #getStartCode(ByteString)} writes into faked cells.
   */
  public static final ServerName META_SERVERNAME =
      ServerName.valueOf("meta.example.org", 16010, 12345);
101 
102   @Before
103   public void setUp() throws Exception {
104     this.conf = HBaseConfiguration.create();
105     // Run my HConnection overrides.  Use my little HConnectionImplementation below which
106     // allows me insert mocks and also use my Registry below rather than the default zk based
107     // one so tests run faster and don't have zk dependency.
108     this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
109   }
110 
111   /**
112    * Simple cluster registry inserted in place of our usual zookeeper based one.
113    */
114   static class SimpleRegistry implements Registry {
115     final ServerName META_HOST = META_SERVERNAME;
116 
117     @Override
118     public void init(Connection connection) {
119     }
120 
121     @Override
122     public RegionLocations getMetaRegionLocation() throws IOException {
123       return new RegionLocations(
124         new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, META_HOST));
125     }
126 
127     @Override
128     public String getClusterId() {
129       return HConstants.CLUSTER_ID_DEFAULT;
130     }
131 
132     @Override
133     public boolean isTableOnlineState(TableName tableName, boolean enabled)
134     throws IOException {
135       return enabled;
136     }
137 
138     @Override
139     public int getCurrentNrHRS() throws IOException {
140       return 1;
141     }
142   }
143 
144   /**
145    * Remove the @Ignore to try out timeout and retry asettings
146    * @throws IOException
147    */
148   @Ignore
149   @Test
150   public void testTimeoutAndRetries() throws IOException {
151     Configuration localConfig = HBaseConfiguration.create(this.conf);
152     // This override mocks up our exists/get call to throw a RegionServerStoppedException.
153     localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
154     Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
155     Throwable t = null;
156     LOG.info("Start");
157     try {
158       // An exists call turns into a get w/ a flag.
159       table.exists(new Get(Bytes.toBytes("abc")));
160     } catch (SocketTimeoutException e) {
161       // I expect this exception.
162       LOG.info("Got expected exception", e);
163       t = e;
164     } catch (RetriesExhaustedException e) {
165       // This is the old, unwanted behavior.  If we get here FAIL!!!
166       fail();
167     } finally {
168       table.close();
169     }
170     LOG.info("Stop");
171     assertTrue(t != null);
172   }
173 
174   /**
175    * Test that operation timeout prevails over rpc default timeout and retries, etc.
176    * @throws IOException
177    */
178   @Test
179   public void testRpcTimeout() throws IOException {
180     Configuration localConfig = HBaseConfiguration.create(this.conf);
181     // This override mocks up our exists/get call to throw a RegionServerStoppedException.
182     localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
183     int pause = 10;
184     localConfig.setInt("hbase.client.pause", pause);
185     localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
186     // Set the operation timeout to be < the pause.  Expectation is that after first pause, we will
187     // fail out of the rpc because the rpc timeout will have been set to the operation tiemout
188     // and it has expired.  Otherwise, if this functionality is broke, all retries will be run --
189     // all ten of them -- and we'll get the RetriesExhaustedException exception.
190     localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
191     Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
192     Throwable t = null;
193     try {
194       // An exists call turns into a get w/ a flag.
195       table.exists(new Get(Bytes.toBytes("abc")));
196     } catch (SocketTimeoutException e) {
197       // I expect this exception.
198       LOG.info("Got expected exception", e);
199       t = e;
200     } catch (RetriesExhaustedException e) {
201       // This is the old, unwanted behavior.  If we get here FAIL!!!
202       fail();
203     } finally {
204       table.close();
205     }
206     assertTrue(t != null);
207   }
208 
209   @Test
210   public void testDoNotRetryMetaScanner() throws IOException {
211     this.conf.set("hbase.client.connection.impl",
212       RegionServerStoppedOnScannerOpenConnection.class.getName());
213     try (Connection connection = ConnectionFactory.createConnection(conf)) {
214       MetaScanner.metaScan(connection, null);
215     }
216   }
217 
218   @Test
219   public void testDoNotRetryOnScanNext() throws IOException {
220     this.conf.set("hbase.client.connection.impl",
221       RegionServerStoppedOnScannerOpenConnection.class.getName());
222     // Go against meta else we will try to find first region for the table on construction which
223     // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
224     // good for a bit of testing.
225     Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
226     ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
227     try {
228       Result result = null;
229       while ((result = scanner.next()) != null) {
230         LOG.info(result);
231       }
232     } finally {
233       scanner.close();
234       table.close();
235     }
236   }
237 
238   @Test
239   public void testRegionServerStoppedOnScannerOpen() throws IOException {
240     this.conf.set("hbase.client.connection.impl",
241       RegionServerStoppedOnScannerOpenConnection.class.getName());
242     // Go against meta else we will try to find first region for the table on construction which
243     // means we'll have to do a bunch more mocking.  Tests that go against meta only should be
244     // good for a bit of testing.
245     Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
246     ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
247     try {
248       Result result = null;
249       while ((result = scanner.next()) != null) {
250         LOG.info(result);
251       }
252     } finally {
253       scanner.close();
254       table.close();
255     }
256   }
257 
258   /**
259    * Override to shutdown going to zookeeper for cluster id and meta location.
260    */
261   static class ScanOpenNextThenExceptionThenRecoverConnection
262   extends ConnectionManager.HConnectionImplementation {
263     final ClientService.BlockingInterface stub;
264 
265     ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
266         boolean managed, ExecutorService pool) throws IOException {
267       super(conf, managed);
268       // Mock up my stub so open scanner returns a scanner id and then on next, we throw
269       // exceptions for three times and then after that, we return no more to scan.
270       this.stub = Mockito.mock(ClientService.BlockingInterface.class);
271       long sid = 12345L;
272       try {
273         Mockito.when(stub.scan((RpcController)Mockito.any(),
274             (ClientProtos.ScanRequest)Mockito.any())).
275           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
276           thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
277           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
278               setMoreResults(false).build());
279       } catch (ServiceException e) {
280         throw new IOException(e);
281       }
282     }
283 
284     @Override
285     public BlockingInterface getClient(ServerName sn) throws IOException {
286       return this.stub;
287     }
288   }
289 
290   @Test
291   public void testConnectionClosedOnRegionLocate() throws IOException {
292     Configuration testConf = new Configuration(this.conf);
293     testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
294     // Go against meta else we will try to find first region for the table on construction which
295     // means we'll have to do a bunch more mocking. Tests that go against meta only should be
296     // good for a bit of testing.
297     Connection connection = ConnectionFactory.createConnection(testConf);
298     Table table = connection.getTable(TableName.META_TABLE_NAME);
299     connection.close();
300     try {
301       Get get = new Get(Bytes.toBytes("dummyRow"));
302       table.get(get);
303       fail("Should have thrown DoNotRetryException but no exception thrown");
304     } catch (Exception e) {
305       if (!(e instanceof DoNotRetryIOException)) {
306         String errMsg =
307             "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName();
308         LOG.error(errMsg, e);
309         fail(errMsg);
310       }
311     } finally {
312       table.close();
313     }
314   }
315 
316   /**
317    * Override to shutdown going to zookeeper for cluster id and meta location.
318    */
319   static class RegionServerStoppedOnScannerOpenConnection
320   extends ConnectionManager.HConnectionImplementation {
321     final ClientService.BlockingInterface stub;
322 
323     RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
324         ExecutorService pool, User user) throws IOException {
325       super(conf, managed);
326       // Mock up my stub so open scanner returns a scanner id and then on next, we throw
327       // exceptions for three times and then after that, we return no more to scan.
328       this.stub = Mockito.mock(ClientService.BlockingInterface.class);
329       long sid = 12345L;
330       try {
331         Mockito.when(stub.scan((RpcController)Mockito.any(),
332             (ClientProtos.ScanRequest)Mockito.any())).
333           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
334           thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
335           thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
336               setMoreResults(false).build());
337       } catch (ServiceException e) {
338         throw new IOException(e);
339       }
340     }
341 
342     @Override
343     public BlockingInterface getClient(ServerName sn) throws IOException {
344       return this.stub;
345     }
346   }
347 
348   /**
349    * Override to check we are setting rpc timeout right.
350    */
351   static class RpcTimeoutConnection
352   extends ConnectionManager.HConnectionImplementation {
353     final ClientService.BlockingInterface stub;
354 
355     RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
356     throws IOException {
357       super(conf, managed);
358       // Mock up my stub so an exists call -- which turns into a get -- throws an exception
359       this.stub = Mockito.mock(ClientService.BlockingInterface.class);
360       try {
361         Mockito.when(stub.get((RpcController)Mockito.any(),
362             (ClientProtos.GetRequest)Mockito.any())).
363           thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
364       } catch (ServiceException e) {
365         throw new IOException(e);
366       }
367     }
368 
369     @Override
370     public BlockingInterface getClient(ServerName sn) throws IOException {
371       return this.stub;
372     }
373   }
374 
375   /**
376    * Fake many regionservers and many regions on a connection implementation.
377    */
378   static class ManyServersManyRegionsConnection
379   extends ConnectionManager.HConnectionImplementation {
380     // All access should be synchronized
381     final Map<ServerName, ClientService.BlockingInterface> serversByClient;
382 
383     /**
384      * Map of faked-up rows of a 'meta table'.
385      */
386     final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
387     final AtomicLong sequenceids = new AtomicLong(0);
388     private final Configuration conf;
389 
390     ManyServersManyRegionsConnection(Configuration conf, boolean managed,
391         ExecutorService pool, User user)
392     throws IOException {
393       super(conf, managed, pool, user);
394       int serverCount = conf.getInt("hbase.test.servers", 10);
395       this.serversByClient =
396         new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
397       this.meta = makeMeta(Bytes.toBytes(
398         conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
399         conf.getInt("hbase.test.regions", 100),
400         conf.getLong("hbase.test.namespace.span", 1000),
401         serverCount);
402       this.conf = conf;
403     }
404 
405     @Override
406     public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
407       // if (!sn.toString().startsWith("meta")) LOG.info(sn);
408       ClientService.BlockingInterface stub = null;
409       synchronized (this.serversByClient) {
410         stub = this.serversByClient.get(sn);
411         if (stub == null) {
412           stub = new FakeServer(this.conf, meta, sequenceids);
413           this.serversByClient.put(sn, stub);
414         }
415       }
416       return stub;
417     }
418   }
419 
420   static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
421       final AtomicLong sequenceids, final MultiRequest request) {
422     // Make a response to match the request.  Act like there were no failures.
423     ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
424     // Per Region.
425     RegionActionResult.Builder regionActionResultBuilder =
426         RegionActionResult.newBuilder();
427     ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
428     for (RegionAction regionAction: request.getRegionActionList()) {
429       regionActionResultBuilder.clear();
430       // Per Action in a Region.
431       for (ClientProtos.Action action: regionAction.getActionList()) {
432         roeBuilder.clear();
433         // Return empty Result and proper index as result.
434         roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
435         roeBuilder.setIndex(action.getIndex());
436         regionActionResultBuilder.addResultOrException(roeBuilder.build());
437       }
438       builder.addRegionActionResult(regionActionResultBuilder.build());
439     }
440     return builder.build();
441   }
442 
443   /**
444    * Fake 'server'.
445    * Implements the ClientService responding as though it were a 'server' (presumes a new
446    * ClientService.BlockingInterface made per server).
447    */
448   static class FakeServer implements ClientService.BlockingInterface {
449     private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
450     private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
451     private final AtomicLong sequenceids;
452     private final long multiPause;
453     private final int tooManyMultiRequests;
454 
455     FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
456         final AtomicLong sequenceids) {
457       this.meta = meta;
458       this.sequenceids = sequenceids;
459 
460       // Pause to simulate the server taking time applying the edits.  This will drive up the
461       // number of threads used over in client.
462       this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
463       this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
464     }
465 
466     @Override
467     public GetResponse get(RpcController controller, GetRequest request)
468     throws ServiceException {
469       boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
470         request.getRegion().getType());
471       if (!metaRegion) {
472         return doGetResponse(request);
473       }
474       return doMetaGetResponse(meta, request);
475     }
476 
477     private GetResponse doGetResponse(GetRequest request) {
478       ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
479       ByteString row = request.getGet().getRow();
480       resultBuilder.addCell(getStartCode(row));
481       GetResponse.Builder builder = GetResponse.newBuilder();
482       builder.setResult(resultBuilder.build());
483       return builder.build();
484     }
485 
486     @Override
487     public MutateResponse mutate(RpcController controller,
488         MutateRequest request) throws ServiceException {
489       throw new NotImplementedException();
490     }
491 
492     @Override
493     public ScanResponse scan(RpcController controller,
494         ScanRequest request) throws ServiceException {
495       // Presume it is a scan of meta for now. Not all scans provide a region spec expecting
496       // the server to keep reference by scannerid.  TODO.
497       return doMetaScanResponse(meta, sequenceids, request);
498     }
499 
500     @Override
501     public BulkLoadHFileResponse bulkLoadHFile(
502         RpcController controller, BulkLoadHFileRequest request)
503         throws ServiceException {
504       throw new NotImplementedException();
505     }
506 
507     @Override
508     public CoprocessorServiceResponse execService(
509         RpcController controller, CoprocessorServiceRequest request)
510         throws ServiceException {
511       throw new NotImplementedException();
512     }
513 
514     @Override
515     public MultiResponse multi(RpcController controller, MultiRequest request)
516     throws ServiceException {
517       int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
518       try {
519         if (concurrentInvocations >= tooManyMultiRequests) {
520           throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
521            concurrentInvocations));
522         }
523         Threads.sleep(multiPause);
524         return doMultiResponse(meta, sequenceids, request);
525       } finally {
526         this.multiInvocationsCount.decrementAndGet();
527       }
528     }
529 
530     @Override
531     public CoprocessorServiceResponse execRegionServerService(RpcController controller,
532         CoprocessorServiceRequest request) throws ServiceException {
533       throw new NotImplementedException();
534     }
535   }
536 
537   static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
538       final AtomicLong sequenceids, final ScanRequest request) {
539     ScanResponse.Builder builder = ScanResponse.newBuilder();
540     int max = request.getNumberOfRows();
541     int count = 0;
542     Map<byte [], Pair<HRegionInfo, ServerName>> tail =
543       request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
544       ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
545     for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
546       // Can be 0 on open of a scanner -- i.e. rpc to setup scannerid only.
547       if (max <= 0) break;
548       if (++count > max) break;
549       HRegionInfo hri = e.getValue().getFirst();
550       ByteString row = ByteStringer.wrap(hri.getRegionName());
551       resultBuilder.clear();
552       resultBuilder.addCell(getRegionInfo(row, hri));
553       resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
554       resultBuilder.addCell(getStartCode(row));
555       builder.addResults(resultBuilder.build());
556       // Set more to false if we are on the last region in table.
557       if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
558       else builder.setMoreResults(true);
559     }
560     // If no scannerid, set one.
561     builder.setScannerId(request.hasScannerId()?
562       request.getScannerId(): sequenceids.incrementAndGet());
563     return builder.build();
564   }
565 
566   static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
567       final GetRequest request) {
568     ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
569     ByteString row = request.getGet().getRow();
570     Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
571     if (p == null) {
572       if (request.getGet().getClosestRowBefore()) {
573         byte [] bytes = row.toByteArray();
574         SortedMap<byte [], Pair<HRegionInfo, ServerName>> head =
575           bytes != null? meta.headMap(bytes): meta;
576         p = head == null? null: head.get(head.lastKey());
577       }
578     }
579     if (p != null) {
580       resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
581       resultBuilder.addCell(getServer(row, p.getSecond()));
582     }
583     resultBuilder.addCell(getStartCode(row));
584     GetResponse.Builder builder = GetResponse.newBuilder();
585     builder.setResult(resultBuilder.build());
586     return builder.build();
587   }
588 
589   /**
590    * @param name region name or encoded region name.
591    * @param type
592    * @return True if we are dealing with a hbase:meta region.
593    */
594   static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
595     switch (type) {
596     case REGION_NAME:
597       return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
598     case ENCODED_REGION_NAME:
599       return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
600     default: throw new UnsupportedOperationException();
601     }
602   }
603 
604   private final static ByteString CATALOG_FAMILY_BYTESTRING =
605       ByteStringer.wrap(HConstants.CATALOG_FAMILY);
606   private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
607       ByteStringer.wrap(HConstants.REGIONINFO_QUALIFIER);
608   private final static ByteString SERVER_QUALIFIER_BYTESTRING =
609       ByteStringer.wrap(HConstants.SERVER_QUALIFIER);
610 
611   static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
612     CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
613     cellBuilder.setRow(row);
614     cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
615     cellBuilder.setTimestamp(System.currentTimeMillis());
616     return cellBuilder;
617   }
618 
619   static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
620     CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
621     cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
622     cellBuilder.setValue(ByteStringer.wrap(hri.toByteArray()));
623     return cellBuilder.build();
624   }
625 
626   static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
627     CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
628     cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
629     cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
630     return cellBuilder.build();
631   }
632 
633   static CellProtos.Cell getStartCode(final ByteString row) {
634     CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
635     cellBuilder.setQualifier(ByteStringer.wrap(HConstants.STARTCODE_QUALIFIER));
636     // TODO:
637     cellBuilder.setValue(ByteStringer.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
638     return cellBuilder.build();
639   }
640 
  // Name of the faked user table: the default for "hbase.test.tablename" and the table that
  // cycle() reads from and writes to.
  private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
642 
643   /**
644    * Format passed integer.  Zero-pad.
645    * Copied from hbase-server PE class and small amendment.  Make them share.
646    * @param number
647    * @return Returns zero-prefixed 10-byte wide decimal version of passed
648    * number (Does absolute in case number is negative).
649    */
650   private static byte [] format(final long number) {
651     byte [] b = new byte[10];
652     long d = number;
653     for (int i = b.length - 1; i >= 0; i--) {
654       b[i] = (byte)((d % 10) + '0');
655       d /= 10;
656     }
657     return b;
658   }
659 
660   /**
661    * @param count
662    * @param namespaceSpan
663    * @return <code>count</code> regions
664    */
665   private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
666       final long namespaceSpan) {
667     byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
668     byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
669     long interval = namespaceSpan / count;
670     HRegionInfo [] hris = new HRegionInfo[count];
671     for (int i = 0; i < count; i++) {
672       if (i == 0) {
673         endKey = format(interval);
674       } else {
675         startKey = endKey;
676         if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
677         else endKey = format((i + 1) * interval);
678       }
679       hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
680     }
681     return hris;
682   }
683 
684   /**
685    * @param count
686    * @return Return <code>count</code> servernames.
687    */
688   private static ServerName [] makeServerNames(final int count) {
689     ServerName [] sns = new ServerName[count];
690     for (int i = 0; i < count; i++) {
691       sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
692     }
693     return sns;
694   }
695 
696   /**
697    * Comparator for meta row keys.
698    */
699   private static class MetaRowsComparator implements Comparator<byte []> {
700     private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator();
701     @Override
702     public int compare(byte[] left, byte[] right) {
703       return delegate.compareRows(left, 0, left.length, right, 0, right.length);
704     }
705   }
706 
707   /**
708    * Create up a map that is keyed by meta row name and whose value is the HRegionInfo and
709    * ServerName to return for this row.
710    * @return Map with faked hbase:meta content in it.
711    */
712   static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
713       final int regionCount, final long namespaceSpan, final int serverCount) {
714     // I need a comparator for meta rows so we sort properly.
715     SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
716       new ConcurrentSkipListMap<byte[], Pair<HRegionInfo,ServerName>>(new MetaRowsComparator());
717     HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
718     ServerName [] serverNames = makeServerNames(serverCount);
719     int per = regionCount / serverCount;
720     int count = 0;
721     for (HRegionInfo hri: hris) {
722       Pair<HRegionInfo, ServerName> p =
723         new Pair<HRegionInfo, ServerName>(hri, serverNames[count++ / per]);
724       meta.put(hri.getRegionName(), p);
725     }
726     return meta;
727   }
728 
729   /**
730    * Code for each 'client' to run.
731    *
732    * @param id
733    * @param c
734    * @param sharedConnection
735    * @throws IOException
736    */
737   static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
738     long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
739     long startTime = System.currentTimeMillis();
740     final int printInterval = 100000;
741     Random rd = new Random(id);
742     boolean get = c.getBoolean("hbase.test.do.gets", false);
743     TableName tableName = TableName.valueOf(BIG_USER_TABLE);
744     if (get) {
745       try (Table table = sharedConnection.getTable(tableName)){
746         Stopwatch stopWatch = new Stopwatch();
747         stopWatch.start();
748         for (int i = 0; i < namespaceSpan; i++) {
749           byte [] b = format(rd.nextLong());
750           Get g = new Get(b);
751           table.get(g);
752           if (i % printInterval == 0) {
753             LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis());
754             stopWatch.reset();
755             stopWatch.start();
756           }
757         }
758         LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
759             (System.currentTimeMillis() - startTime) + "ms");
760       }
761     } else {
762       try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
763         Stopwatch stopWatch = new Stopwatch();
764         stopWatch.start();
765         for (int i = 0; i < namespaceSpan; i++) {
766           byte [] b = format(rd.nextLong());
767           Put p = new Put(b);
768           p.add(HConstants.CATALOG_FAMILY, b, b);
769           mutator.mutate(p);
770           if (i % printInterval == 0) {
771             LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
772             stopWatch.reset();
773             stopWatch.start();
774           }
775         }
776         LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
777             (System.currentTimeMillis() - startTime) + "ms");
778         }
779     }
780   }
781 
782   @Override
783   public int run(String[] arg0) throws Exception {
784     int errCode = 0;
785     // TODO: Make command options.
786     // How many servers to fake.
787     final int servers = 1;
788     // How many regions to put on the faked servers.
789     final int regions = 100000;
790     // How many 'keys' in the faked regions.
791     final long namespaceSpan = 50000000;
792     // How long to take to pause after doing a put; make this long if you want to fake a struggling
793     // server.
794     final long multiPause = 0;
795     // Check args make basic sense.
796     if ((namespaceSpan < regions) || (regions < servers)) {
797       throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
798         regions + " which must be > servers=" + servers);
799     }
800 
801     // Set my many servers and many regions faking connection in place.
802     getConf().set("hbase.client.connection.impl",
803       ManyServersManyRegionsConnection.class.getName());
804     // Use simple kv registry rather than zk
805     getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
806     // When to report fails.  Default is we report the 10th.  This means we'll see log everytime
807     // an exception is thrown -- usually RegionTooBusyException when we have more than
808     // hbase.test.multi.too.many requests outstanding at any time.
809     getConf().setInt("hbase.client.start.log.errors.counter", 0);
810 
811     // Ugly but this is only way to pass in configs.into ManyServersManyRegionsConnection class.
812     getConf().setInt("hbase.test.regions", regions);
813     getConf().setLong("hbase.test.namespace.span", namespaceSpan);
814     getConf().setLong("hbase.test.servers", servers);
815     getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
816     getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
817     // Let there be ten outstanding requests at a time before we throw RegionBusyException.
818     getConf().setInt("hbase.test.multi.too.many", 10);
819     final int clients = 2;
820 
821     // Have them all share the same connection so they all share the same instance of
822     // ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
823     final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
824       // Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
825     // Share a connection so I can keep counts in the 'server' on concurrency.
826     final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
827     try {
828       Thread [] ts = new Thread[clients];
829       for (int j = 0; j < ts.length; j++) {
830         final int id = j;
831         ts[j] = new Thread("" + j) {
832           final Configuration c = getConf();
833 
834           @Override
835           public void run() {
836             try {
837               cycle(id, c, sharedConnection);
838             } catch (IOException e) {
839               e.printStackTrace();
840             }
841           }
842         };
843         ts[j].start();
844       }
845       for (int j = 0; j < ts.length; j++) {
846         ts[j].join();
847       }
848     } finally {
849       sharedConnection.close();
850     }
851     return errCode;
852   }
853 
854   /**
855    * Run a client instance against a faked up server.
856    * @param args TODO
857    * @throws Exception
858    */
859   public static void main(String[] args) throws Exception {
860     System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
861   }
862 }