1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertTrue;
21 import static org.junit.Assert.fail;
22
23 import java.io.IOException;
24 import java.net.SocketTimeoutException;
25 import java.util.Comparator;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.Random;
29 import java.util.SortedMap;
30 import java.util.concurrent.ConcurrentSkipListMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35
36 import org.apache.hadoop.hbase.util.ByteStringer;
37 import org.apache.commons.lang.NotImplementedException;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.conf.Configured;
42 import org.apache.hadoop.hbase.DoNotRetryIOException;
43 import org.apache.hadoop.hbase.HBaseConfiguration;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HRegionLocation;
47 import org.apache.hadoop.hbase.RegionLocations;
48 import org.apache.hadoop.hbase.KeyValue;
49 import org.apache.hadoop.hbase.RegionTooBusyException;
50 import org.apache.hadoop.hbase.ServerName;
51 import org.apache.hadoop.hbase.testclassification.SmallTests;
52 import org.apache.hadoop.hbase.TableName;
53 import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
54 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
55 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
56 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
57 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
58 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
59 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
60 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
61 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
62 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
63 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
64 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
65 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
66 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
67 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
68 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
69 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
70 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
71 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
72 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
73 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
74 import org.apache.hadoop.hbase.security.User;
75 import org.apache.hadoop.hbase.util.Bytes;
76 import org.apache.hadoop.hbase.util.Pair;
77 import org.apache.hadoop.hbase.util.Threads;
78 import org.apache.hadoop.util.Tool;
79 import org.apache.hadoop.util.ToolRunner;
80 import org.junit.Before;
81 import org.junit.Ignore;
82 import org.junit.Test;
83 import org.junit.experimental.categories.Category;
84 import org.mockito.Mockito;
85
86 import com.google.common.base.Stopwatch;
87 import com.google.protobuf.ByteString;
88 import com.google.protobuf.RpcController;
89 import com.google.protobuf.ServiceException;
90
91
92
93
94
95 @Category(SmallTests.class)
96 public class TestClientNoCluster extends Configured implements Tool {
  /** Logger used by the tests and by the standalone load driver in this class. */
  private static final Log LOG = LogFactory.getLog(TestClientNoCluster.class);
  /** Per-test configuration; (re)assigned by {@code setUp()} before each test. */
  private Configuration conf;
  /** Fixed server name handed out as the location of the meta region and as the start-code source. */
  public static final ServerName META_SERVERNAME =
      ServerName.valueOf("meta.example.org", 16010, 12345);
101
102 @Before
103 public void setUp() throws Exception {
104 this.conf = HBaseConfiguration.create();
105
106
107
108 this.conf.set("hbase.client.registry.impl", SimpleRegistry.class.getName());
109 }
110
111
112
113
114 static class SimpleRegistry implements Registry {
115 final ServerName META_HOST = META_SERVERNAME;
116
117 @Override
118 public void init(Connection connection) {
119 }
120
121 @Override
122 public RegionLocations getMetaRegionLocation() throws IOException {
123 return new RegionLocations(
124 new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, META_HOST));
125 }
126
127 @Override
128 public String getClusterId() {
129 return HConstants.CLUSTER_ID_DEFAULT;
130 }
131
132 @Override
133 public boolean isTableOnlineState(TableName tableName, boolean enabled)
134 throws IOException {
135 return enabled;
136 }
137
138 @Override
139 public int getCurrentNrHRS() throws IOException {
140 return 1;
141 }
142 }
143
144
145
146
147
148 @Ignore
149 @Test
150 public void testTimeoutAndRetries() throws IOException {
151 Configuration localConfig = HBaseConfiguration.create(this.conf);
152
153 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
154 Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
155 Throwable t = null;
156 LOG.info("Start");
157 try {
158
159 table.exists(new Get(Bytes.toBytes("abc")));
160 } catch (SocketTimeoutException e) {
161
162 LOG.info("Got expected exception", e);
163 t = e;
164 } catch (RetriesExhaustedException e) {
165
166 fail();
167 } finally {
168 table.close();
169 }
170 LOG.info("Stop");
171 assertTrue(t != null);
172 }
173
174
175
176
177
178 @Test
179 public void testRpcTimeout() throws IOException {
180 Configuration localConfig = HBaseConfiguration.create(this.conf);
181
182 localConfig.set("hbase.client.connection.impl", RpcTimeoutConnection.class.getName());
183 int pause = 10;
184 localConfig.setInt("hbase.client.pause", pause);
185 localConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10);
186
187
188
189
190 localConfig.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, pause - 1);
191 Table table = new HTable(localConfig, TableName.META_TABLE_NAME);
192 Throwable t = null;
193 try {
194
195 table.exists(new Get(Bytes.toBytes("abc")));
196 } catch (SocketTimeoutException e) {
197
198 LOG.info("Got expected exception", e);
199 t = e;
200 } catch (RetriesExhaustedException e) {
201
202 fail();
203 } finally {
204 table.close();
205 }
206 assertTrue(t != null);
207 }
208
209 @Test
210 public void testDoNotRetryMetaScanner() throws IOException {
211 this.conf.set("hbase.client.connection.impl",
212 RegionServerStoppedOnScannerOpenConnection.class.getName());
213 try (Connection connection = ConnectionFactory.createConnection(conf)) {
214 MetaScanner.metaScan(connection, null);
215 }
216 }
217
218 @Test
219 public void testDoNotRetryOnScanNext() throws IOException {
220 this.conf.set("hbase.client.connection.impl",
221 RegionServerStoppedOnScannerOpenConnection.class.getName());
222
223
224
225 Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
226 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
227 try {
228 Result result = null;
229 while ((result = scanner.next()) != null) {
230 LOG.info(result);
231 }
232 } finally {
233 scanner.close();
234 table.close();
235 }
236 }
237
238 @Test
239 public void testRegionServerStoppedOnScannerOpen() throws IOException {
240 this.conf.set("hbase.client.connection.impl",
241 RegionServerStoppedOnScannerOpenConnection.class.getName());
242
243
244
245 Table table = new HTable(this.conf, TableName.META_TABLE_NAME);
246 ResultScanner scanner = table.getScanner(HConstants.CATALOG_FAMILY);
247 try {
248 Result result = null;
249 while ((result = scanner.next()) != null) {
250 LOG.info(result);
251 }
252 } finally {
253 scanner.close();
254 table.close();
255 }
256 }
257
258
259
260
261 static class ScanOpenNextThenExceptionThenRecoverConnection
262 extends ConnectionManager.HConnectionImplementation {
263 final ClientService.BlockingInterface stub;
264
265 ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
266 boolean managed, ExecutorService pool) throws IOException {
267 super(conf, managed);
268
269
270 this.stub = Mockito.mock(ClientService.BlockingInterface.class);
271 long sid = 12345L;
272 try {
273 Mockito.when(stub.scan((RpcController)Mockito.any(),
274 (ClientProtos.ScanRequest)Mockito.any())).
275 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
276 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
277 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
278 setMoreResults(false).build());
279 } catch (ServiceException e) {
280 throw new IOException(e);
281 }
282 }
283
284 @Override
285 public BlockingInterface getClient(ServerName sn) throws IOException {
286 return this.stub;
287 }
288 }
289
290 @Test
291 public void testConnectionClosedOnRegionLocate() throws IOException {
292 Configuration testConf = new Configuration(this.conf);
293 testConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
294
295
296
297 Connection connection = ConnectionFactory.createConnection(testConf);
298 Table table = connection.getTable(TableName.META_TABLE_NAME);
299 connection.close();
300 try {
301 Get get = new Get(Bytes.toBytes("dummyRow"));
302 table.get(get);
303 fail("Should have thrown DoNotRetryException but no exception thrown");
304 } catch (Exception e) {
305 if (!(e instanceof DoNotRetryIOException)) {
306 String errMsg =
307 "Should have thrown DoNotRetryException but actually " + e.getClass().getSimpleName();
308 LOG.error(errMsg, e);
309 fail(errMsg);
310 }
311 } finally {
312 table.close();
313 }
314 }
315
316
317
318
319 static class RegionServerStoppedOnScannerOpenConnection
320 extends ConnectionManager.HConnectionImplementation {
321 final ClientService.BlockingInterface stub;
322
323 RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
324 ExecutorService pool, User user) throws IOException {
325 super(conf, managed);
326
327
328 this.stub = Mockito.mock(ClientService.BlockingInterface.class);
329 long sid = 12345L;
330 try {
331 Mockito.when(stub.scan((RpcController)Mockito.any(),
332 (ClientProtos.ScanRequest)Mockito.any())).
333 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
334 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
335 thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
336 setMoreResults(false).build());
337 } catch (ServiceException e) {
338 throw new IOException(e);
339 }
340 }
341
342 @Override
343 public BlockingInterface getClient(ServerName sn) throws IOException {
344 return this.stub;
345 }
346 }
347
348
349
350
351 static class RpcTimeoutConnection
352 extends ConnectionManager.HConnectionImplementation {
353 final ClientService.BlockingInterface stub;
354
355 RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
356 throws IOException {
357 super(conf, managed);
358
359 this.stub = Mockito.mock(ClientService.BlockingInterface.class);
360 try {
361 Mockito.when(stub.get((RpcController)Mockito.any(),
362 (ClientProtos.GetRequest)Mockito.any())).
363 thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito")));
364 } catch (ServiceException e) {
365 throw new IOException(e);
366 }
367 }
368
369 @Override
370 public BlockingInterface getClient(ServerName sn) throws IOException {
371 return this.stub;
372 }
373 }
374
375
376
377
378 static class ManyServersManyRegionsConnection
379 extends ConnectionManager.HConnectionImplementation {
380
381 final Map<ServerName, ClientService.BlockingInterface> serversByClient;
382
383
384
385
386 final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
387 final AtomicLong sequenceids = new AtomicLong(0);
388 private final Configuration conf;
389
390 ManyServersManyRegionsConnection(Configuration conf, boolean managed,
391 ExecutorService pool, User user)
392 throws IOException {
393 super(conf, managed, pool, user);
394 int serverCount = conf.getInt("hbase.test.servers", 10);
395 this.serversByClient =
396 new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
397 this.meta = makeMeta(Bytes.toBytes(
398 conf.get("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE))),
399 conf.getInt("hbase.test.regions", 100),
400 conf.getLong("hbase.test.namespace.span", 1000),
401 serverCount);
402 this.conf = conf;
403 }
404
405 @Override
406 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
407
408 ClientService.BlockingInterface stub = null;
409 synchronized (this.serversByClient) {
410 stub = this.serversByClient.get(sn);
411 if (stub == null) {
412 stub = new FakeServer(this.conf, meta, sequenceids);
413 this.serversByClient.put(sn, stub);
414 }
415 }
416 return stub;
417 }
418 }
419
420 static MultiResponse doMultiResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
421 final AtomicLong sequenceids, final MultiRequest request) {
422
423 ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
424
425 RegionActionResult.Builder regionActionResultBuilder =
426 RegionActionResult.newBuilder();
427 ResultOrException.Builder roeBuilder = ResultOrException.newBuilder();
428 for (RegionAction regionAction: request.getRegionActionList()) {
429 regionActionResultBuilder.clear();
430
431 for (ClientProtos.Action action: regionAction.getActionList()) {
432 roeBuilder.clear();
433
434 roeBuilder.setResult(ClientProtos.Result.getDefaultInstance());
435 roeBuilder.setIndex(action.getIndex());
436 regionActionResultBuilder.addResultOrException(roeBuilder.build());
437 }
438 builder.addRegionActionResult(regionActionResultBuilder.build());
439 }
440 return builder.build();
441 }
442
443
444
445
446
447
448 static class FakeServer implements ClientService.BlockingInterface {
449 private AtomicInteger multiInvocationsCount = new AtomicInteger(0);
450 private final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta;
451 private final AtomicLong sequenceids;
452 private final long multiPause;
453 private final int tooManyMultiRequests;
454
455 FakeServer(final Configuration c, final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
456 final AtomicLong sequenceids) {
457 this.meta = meta;
458 this.sequenceids = sequenceids;
459
460
461
462 this.multiPause = c.getLong("hbase.test.multi.pause.when.done", 0);
463 this.tooManyMultiRequests = c.getInt("hbase.test.multi.too.many", 3);
464 }
465
466 @Override
467 public GetResponse get(RpcController controller, GetRequest request)
468 throws ServiceException {
469 boolean metaRegion = isMetaRegion(request.getRegion().getValue().toByteArray(),
470 request.getRegion().getType());
471 if (!metaRegion) {
472 return doGetResponse(request);
473 }
474 return doMetaGetResponse(meta, request);
475 }
476
477 private GetResponse doGetResponse(GetRequest request) {
478 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
479 ByteString row = request.getGet().getRow();
480 resultBuilder.addCell(getStartCode(row));
481 GetResponse.Builder builder = GetResponse.newBuilder();
482 builder.setResult(resultBuilder.build());
483 return builder.build();
484 }
485
486 @Override
487 public MutateResponse mutate(RpcController controller,
488 MutateRequest request) throws ServiceException {
489 throw new NotImplementedException();
490 }
491
492 @Override
493 public ScanResponse scan(RpcController controller,
494 ScanRequest request) throws ServiceException {
495
496
497 return doMetaScanResponse(meta, sequenceids, request);
498 }
499
500 @Override
501 public BulkLoadHFileResponse bulkLoadHFile(
502 RpcController controller, BulkLoadHFileRequest request)
503 throws ServiceException {
504 throw new NotImplementedException();
505 }
506
507 @Override
508 public CoprocessorServiceResponse execService(
509 RpcController controller, CoprocessorServiceRequest request)
510 throws ServiceException {
511 throw new NotImplementedException();
512 }
513
514 @Override
515 public MultiResponse multi(RpcController controller, MultiRequest request)
516 throws ServiceException {
517 int concurrentInvocations = this.multiInvocationsCount.incrementAndGet();
518 try {
519 if (concurrentInvocations >= tooManyMultiRequests) {
520 throw new ServiceException(new RegionTooBusyException("concurrentInvocations=" +
521 concurrentInvocations));
522 }
523 Threads.sleep(multiPause);
524 return doMultiResponse(meta, sequenceids, request);
525 } finally {
526 this.multiInvocationsCount.decrementAndGet();
527 }
528 }
529
530 @Override
531 public CoprocessorServiceResponse execRegionServerService(RpcController controller,
532 CoprocessorServiceRequest request) throws ServiceException {
533 throw new NotImplementedException();
534 }
535 }
536
537 static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
538 final AtomicLong sequenceids, final ScanRequest request) {
539 ScanResponse.Builder builder = ScanResponse.newBuilder();
540 int max = request.getNumberOfRows();
541 int count = 0;
542 Map<byte [], Pair<HRegionInfo, ServerName>> tail =
543 request.hasScan()? meta.tailMap(request.getScan().getStartRow().toByteArray()): meta;
544 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
545 for (Map.Entry<byte [], Pair<HRegionInfo, ServerName>> e: tail.entrySet()) {
546
547 if (max <= 0) break;
548 if (++count > max) break;
549 HRegionInfo hri = e.getValue().getFirst();
550 ByteString row = ByteStringer.wrap(hri.getRegionName());
551 resultBuilder.clear();
552 resultBuilder.addCell(getRegionInfo(row, hri));
553 resultBuilder.addCell(getServer(row, e.getValue().getSecond()));
554 resultBuilder.addCell(getStartCode(row));
555 builder.addResults(resultBuilder.build());
556
557 if (hri.getEndKey().length <= 0) builder.setMoreResults(false);
558 else builder.setMoreResults(true);
559 }
560
561 builder.setScannerId(request.hasScannerId()?
562 request.getScannerId(): sequenceids.incrementAndGet());
563 return builder.build();
564 }
565
566 static GetResponse doMetaGetResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,
567 final GetRequest request) {
568 ClientProtos.Result.Builder resultBuilder = ClientProtos.Result.newBuilder();
569 ByteString row = request.getGet().getRow();
570 Pair<HRegionInfo, ServerName> p = meta.get(row.toByteArray());
571 if (p == null) {
572 if (request.getGet().getClosestRowBefore()) {
573 byte [] bytes = row.toByteArray();
574 SortedMap<byte [], Pair<HRegionInfo, ServerName>> head =
575 bytes != null? meta.headMap(bytes): meta;
576 p = head == null? null: head.get(head.lastKey());
577 }
578 }
579 if (p != null) {
580 resultBuilder.addCell(getRegionInfo(row, p.getFirst()));
581 resultBuilder.addCell(getServer(row, p.getSecond()));
582 }
583 resultBuilder.addCell(getStartCode(row));
584 GetResponse.Builder builder = GetResponse.newBuilder();
585 builder.setResult(resultBuilder.build());
586 return builder.build();
587 }
588
589
590
591
592
593
594 static boolean isMetaRegion(final byte [] name, final RegionSpecifierType type) {
595 switch (type) {
596 case REGION_NAME:
597 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), name);
598 case ENCODED_REGION_NAME:
599 return Bytes.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), name);
600 default: throw new UnsupportedOperationException();
601 }
602 }
603
  // ByteString forms of the catalog family and qualifiers, wrapped once here so the cell
  // builders below can reuse them on every call.
  private final static ByteString CATALOG_FAMILY_BYTESTRING =
      ByteStringer.wrap(HConstants.CATALOG_FAMILY);
  private final static ByteString REGIONINFO_QUALIFIER_BYTESTRING =
      ByteStringer.wrap(HConstants.REGIONINFO_QUALIFIER);
  private final static ByteString SERVER_QUALIFIER_BYTESTRING =
      ByteStringer.wrap(HConstants.SERVER_QUALIFIER);
610
611 static CellProtos.Cell.Builder getBaseCellBuilder(final ByteString row) {
612 CellProtos.Cell.Builder cellBuilder = CellProtos.Cell.newBuilder();
613 cellBuilder.setRow(row);
614 cellBuilder.setFamily(CATALOG_FAMILY_BYTESTRING);
615 cellBuilder.setTimestamp(System.currentTimeMillis());
616 return cellBuilder;
617 }
618
619 static CellProtos.Cell getRegionInfo(final ByteString row, final HRegionInfo hri) {
620 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
621 cellBuilder.setQualifier(REGIONINFO_QUALIFIER_BYTESTRING);
622 cellBuilder.setValue(ByteStringer.wrap(hri.toByteArray()));
623 return cellBuilder.build();
624 }
625
626 static CellProtos.Cell getServer(final ByteString row, final ServerName sn) {
627 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
628 cellBuilder.setQualifier(SERVER_QUALIFIER_BYTESTRING);
629 cellBuilder.setValue(ByteString.copyFromUtf8(sn.getHostAndPort()));
630 return cellBuilder.build();
631 }
632
633 static CellProtos.Cell getStartCode(final ByteString row) {
634 CellProtos.Cell.Builder cellBuilder = getBaseCellBuilder(row);
635 cellBuilder.setQualifier(ByteStringer.wrap(HConstants.STARTCODE_QUALIFIER));
636
637 cellBuilder.setValue(ByteStringer.wrap(Bytes.toBytes(META_SERVERNAME.getStartcode())));
638 return cellBuilder.build();
639 }
640
  /** Name of the fake user table: the default for "hbase.test.tablename" and the table the load driver reads and writes. */
  private static final byte [] BIG_USER_TABLE = Bytes.toBytes("t");
642
643
644
645
646
647
648
649
650 private static byte [] format(final long number) {
651 byte [] b = new byte[10];
652 long d = number;
653 for (int i = b.length - 1; i >= 0; i--) {
654 b[i] = (byte)((d % 10) + '0');
655 d /= 10;
656 }
657 return b;
658 }
659
660
661
662
663
664
665 private static HRegionInfo [] makeHRegionInfos(final byte [] tableName, final int count,
666 final long namespaceSpan) {
667 byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
668 byte [] endKey = HConstants.EMPTY_BYTE_ARRAY;
669 long interval = namespaceSpan / count;
670 HRegionInfo [] hris = new HRegionInfo[count];
671 for (int i = 0; i < count; i++) {
672 if (i == 0) {
673 endKey = format(interval);
674 } else {
675 startKey = endKey;
676 if (i == count - 1) endKey = HConstants.EMPTY_BYTE_ARRAY;
677 else endKey = format((i + 1) * interval);
678 }
679 hris[i] = new HRegionInfo(TableName.valueOf(tableName), startKey, endKey);
680 }
681 return hris;
682 }
683
684
685
686
687
688 private static ServerName [] makeServerNames(final int count) {
689 ServerName [] sns = new ServerName[count];
690 for (int i = 0; i < count; i++) {
691 sns[i] = ServerName.valueOf("" + i + ".example.org", 16010, i);
692 }
693 return sns;
694 }
695
696
697
698
699 private static class MetaRowsComparator implements Comparator<byte []> {
700 private final KeyValue.KVComparator delegate = new KeyValue.MetaComparator();
701 @Override
702 public int compare(byte[] left, byte[] right) {
703 return delegate.compareRows(left, 0, left.length, right, 0, right.length);
704 }
705 }
706
707
708
709
710
711
712 static SortedMap<byte [], Pair<HRegionInfo, ServerName>> makeMeta(final byte [] tableName,
713 final int regionCount, final long namespaceSpan, final int serverCount) {
714
715 SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta =
716 new ConcurrentSkipListMap<byte[], Pair<HRegionInfo,ServerName>>(new MetaRowsComparator());
717 HRegionInfo [] hris = makeHRegionInfos(tableName, regionCount, namespaceSpan);
718 ServerName [] serverNames = makeServerNames(serverCount);
719 int per = regionCount / serverCount;
720 int count = 0;
721 for (HRegionInfo hri: hris) {
722 Pair<HRegionInfo, ServerName> p =
723 new Pair<HRegionInfo, ServerName>(hri, serverNames[count++ / per]);
724 meta.put(hri.getRegionName(), p);
725 }
726 return meta;
727 }
728
729
730
731
732
733
734
735
736
737 static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException {
738 long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000);
739 long startTime = System.currentTimeMillis();
740 final int printInterval = 100000;
741 Random rd = new Random(id);
742 boolean get = c.getBoolean("hbase.test.do.gets", false);
743 TableName tableName = TableName.valueOf(BIG_USER_TABLE);
744 if (get) {
745 try (Table table = sharedConnection.getTable(tableName)){
746 Stopwatch stopWatch = new Stopwatch();
747 stopWatch.start();
748 for (int i = 0; i < namespaceSpan; i++) {
749 byte [] b = format(rd.nextLong());
750 Get g = new Get(b);
751 table.get(g);
752 if (i % printInterval == 0) {
753 LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis());
754 stopWatch.reset();
755 stopWatch.start();
756 }
757 }
758 LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
759 (System.currentTimeMillis() - startTime) + "ms");
760 }
761 } else {
762 try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) {
763 Stopwatch stopWatch = new Stopwatch();
764 stopWatch.start();
765 for (int i = 0; i < namespaceSpan; i++) {
766 byte [] b = format(rd.nextLong());
767 Put p = new Put(b);
768 p.add(HConstants.CATALOG_FAMILY, b, b);
769 mutator.mutate(p);
770 if (i % printInterval == 0) {
771 LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis());
772 stopWatch.reset();
773 stopWatch.start();
774 }
775 }
776 LOG.info("Finished a cycle putting " + namespaceSpan + " in " +
777 (System.currentTimeMillis() - startTime) + "ms");
778 }
779 }
780 }
781
782 @Override
783 public int run(String[] arg0) throws Exception {
784 int errCode = 0;
785
786
787 final int servers = 1;
788
789 final int regions = 100000;
790
791 final long namespaceSpan = 50000000;
792
793
794 final long multiPause = 0;
795
796 if ((namespaceSpan < regions) || (regions < servers)) {
797 throw new IllegalArgumentException("namespaceSpan=" + namespaceSpan + " must be > regions=" +
798 regions + " which must be > servers=" + servers);
799 }
800
801
802 getConf().set("hbase.client.connection.impl",
803 ManyServersManyRegionsConnection.class.getName());
804
805 getConf().set("hbase.client.registry.impl", SimpleRegistry.class.getName());
806
807
808
809 getConf().setInt("hbase.client.start.log.errors.counter", 0);
810
811
812 getConf().setInt("hbase.test.regions", regions);
813 getConf().setLong("hbase.test.namespace.span", namespaceSpan);
814 getConf().setLong("hbase.test.servers", servers);
815 getConf().set("hbase.test.tablename", Bytes.toString(BIG_USER_TABLE));
816 getConf().setLong("hbase.test.multi.pause.when.done", multiPause);
817
818 getConf().setInt("hbase.test.multi.too.many", 10);
819 final int clients = 2;
820
821
822
823 final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
824
825
826 final Connection sharedConnection = ConnectionFactory.createConnection(getConf()
827 try {
828 Thread [] ts = new Thread[clients];
829 for (int j = 0; j < ts.length; j++) {
830 final int id = j;
831 ts[j] = new Thread("" + j) {
832 final Configuration c = getConf();
833
834 @Override
835 public void run() {
836 try {
837 cycle(id, c, sharedConnection);
838 } catch (IOException e) {
839 e.printStackTrace();
840 }
841 }
842 };
843 ts[j].start();
844 }
845 for (int j = 0; j < ts.length; j++) {
846 ts[j].join();
847 }
848 } finally {
849 sharedConnection.close();
850 }
851 return errCode;
852 }
853
854
855
856
857
858
859 public static void main(String[] args) throws Exception {
860 System.exit(ToolRunner.run(HBaseConfiguration.create(), new TestClientNoCluster(), args));
861 }
862 }