/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;

import com.google.common.base.Joiner;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

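/**
 * Validates bulk load over MapReduce.
 *
 * A load job writes a set of linked lists: every chain starts at row 0, and each
 * link stores, under a column named for its chain id, the row key of the next
 * link (family L), its position in the chain (family S), and random payload data
 * (family D). The HFiles produced by the job are bulk loaded into the table. A
 * check job then scans the table, regroups the links per chain, and verifies
 * that every chain is complete and has the expected length. When region replicas
 * are enabled, a coprocessor slows down the primary so reads fall over to replicas.
 */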
@Category(IntegrationTests.class)
public class IntegrationTestBulkLoad extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);

  private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
  private static final byte[] SORT_FAM = Bytes.toBytes("S");
  private static final byte[] DATA_FAM = Bytes.toBytes("D");

  private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
  private static int CHAIN_LENGTH = 500000;

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static int NUM_MAPS = 1;

  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static int NUM_IMPORT_ROUNDS = 1;

  private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";

  private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
  private static String TABLE_NAME = "IntegrationTestBulkLoad";

  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  private static final String OPT_LOAD = "load";
  private static final String OPT_CHECK = "check";

  private boolean load = false;
  private boolean check = false;
  public static class SlowMeCoproScanOperations extends BaseRegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(2000);
    Random r = new Random();
    AtomicLong countOfNext = new AtomicLong(0);
    AtomicLong countOfOpen = new AtomicLong(0);

    public SlowMeCoproScanOperations() {}

    @Override
    public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan, final RegionScanner s) throws IOException {
      // Slow down the second scanner open seen by this region.
      if (countOfOpen.incrementAndGet() == 2) {
        slowdownCode(e);
      }
      return s;
    }

    @Override
    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results,
        final int limit, final boolean hasMore) throws IOException {
      // Slow down the fourth next() call seen by this region; the delay gives
      // the read a chance to go to a replica.
      if (countOfNext.incrementAndGet() == 4) {
        slowdownCode(e);
      }
      return true;
    }

    protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
      // Only the primary replica is slowed down.
      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
        try {
          if (sleepTime.get() > 0) {
            LOG.info("Sleeping for " + sleepTime.get() + " ms");
            Thread.sleep(sleepTime.get());
          }
        } catch (InterruptedException e1) {
          LOG.error(e1);
        }
      }
    }
  }
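
  /**
   * Adds the {@link SlowMeCoproScanOperations} coprocessor to the test table
   * when region replicas are enabled; a no-op otherwise.
   */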
  private void installSlowingCoproc() throws IOException, InterruptedException {
    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    Admin admin = util.getHBaseAdmin();
    HTableDescriptor desc = admin.getTableDescriptor(t);
    desc.addCoprocessor(SlowMeCoproScanOperations.class.getName());
    HBaseTestingUtility.modifyTableSync(admin, desc);

    // Sleep for a while so the change makes it out to all the region servers.
    Thread.sleep(30000);
  }

  @Test
  public void testBulkLoad() throws Exception {
    runLoad();
    installSlowingCoproc();
    runCheckWithRetry();
  }

  public void runLoad() throws Exception {
    setupTable();
    int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
    LOG.info("Running load with numIterations:" + numImportRounds);
    for (int i = 0; i < numImportRounds; i++) {
      runLinkedListMRJob(i);
    }
  }

  private byte[][] getSplits(int numRegions) {
    RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
    split.setFirstRow(Bytes.toBytes(0L));
    split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
    return split.split(numRegions);
  }

  private void setupTable() throws IOException, InterruptedException {
    if (util.getHBaseAdmin().tableExists(getTablename())) {
      util.deleteTable(getTablename());
    }

    util.createTable(
        getTablename().getName(),
        new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
        getSplits(16)
    );

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;

    TableName t = getTablename();
    HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount);
  }

  private void runLinkedListMRJob(int iteration) throws Exception {
    String jobName = IntegrationTestBulkLoad.class.getSimpleName() + " - " +
        EnvironmentEdgeManager.currentTime();
    Configuration conf = new Configuration(util.getConfiguration());
    Path p = null;
    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
      p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
    } else {
      p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
    }

    conf.setBoolean("mapreduce.map.speculative", false);
    conf.setBoolean("mapreduce.reduce.speculative", false);
    conf.setInt(ROUND_NUM_KEY, iteration);

    Job job = Job.getInstance(conf);

    job.setJobName(jobName);

    // Set the input format so that each map task gets one synthetic split.
    job.setInputFormatClass(ITBulkLoadInputFormat.class);

    // Set the mapper that creates the linked list key values.
    job.setMapperClass(LinkedListCreationMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    // Ship this class's jar to the cluster.
    job.setJarByClass(getClass());

    // Write the HFiles below the output path.
    FileOutputFormat.setOutputPath(job, p);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(getTablename());
        RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {

      // Configure the partitioner and everything else needed for HFileOutputFormat.
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);

      // Run the job making sure it works.
      assertEquals(true, job.waitForCompletion(true));

      // Create a new loader.
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);

      // Load the HFiles into the table.
      loader.doBulkLoad(p, admin, table, regionLocator);
    }

    // Delete the bulk load output.
    util.getTestFileSystem().delete(p, true);
  }
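
  /**
   * An {@link InputSplit} that carries no data; every map task gets exactly one.
   */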
  public static class EmptySplit extends InputSplit implements Writable {
    @Override
    public void write(DataOutput out) throws IOException { }
    @Override
    public void readFields(DataInput in) throws IOException { }
    @Override
    public long getLength() { return 0L; }
    @Override
    public String[] getLocations() { return new String[0]; }
  }
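
  /**
   * A {@link RecordReader} over a fixed, in-memory array of key/value pairs.
   */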
  public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
    private int index = -1;
    private K[] keys;
    private V[] values;

    public FixedRecordReader(K[] keys, V[] values) {
      this.keys = keys;
      this.values = values;
    }
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException { }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return ++index < keys.length;
    }
    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
      return keys[index];
    }
    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
      return values[index];
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
      return (float)index / keys.length;
    }
    @Override
    public void close() throws IOException {
    }
  }
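
  /**
   * Input format that produces one {@link EmptySplit} per requested map task and
   * hands each mapper a single record: a chain id that is unique across all map
   * tasks and import rounds.
   */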
  public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
      int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
      for (int i = 0; i < numSplits; ++i) {
        ret.add(new EmptySplit());
      }
      return ret;
    }

    @Override
    public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
        TaskAttemptContext context)
        throws IOException, InterruptedException {
      int taskId = context.getTaskAttemptID().getTaskID().getId();
      int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
      int numIterations =
          context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
      int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);

      taskId = taskId + iteration * numMapTasks;
      numMapTasks = numMapTasks * numIterations;

      // Pick a random chain id, then force (chainId % numMapTasks) == taskId so
      // that no two tasks in any round ever share a chain id.
      long chainId = Math.abs(new Random().nextLong());
      chainId = chainId - (chainId % numMapTasks) + taskId;
      LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};

      return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
    }
  }
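
  /**
   * Mapper that creates a linked list. Each row of the chain stores, under a
   * column named for the chain id, the row key of the next link (family L), the
   * link's position in the chain (family S), and random payload data (family D).
   * Every chain starts at row 0.
   */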
  public static class LinkedListCreationMapper
      extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

    private Random rand = new Random();

    @Override
    protected void map(LongWritable key, LongWritable value, Context context)
        throws IOException, InterruptedException {
      long chainId = value.get();
      LOG.info("Starting mapper with chainId:" + chainId);

      byte[] chainIdArray = Bytes.toBytes(chainId);
      long currentRow = 0;

      long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      long nextRow = getNextRow(0, chainLength);

      for (long i = 0; i < chainLength; i++) {
        byte[] rk = Bytes.toBytes(currentRow);

        // The link that points to the next row in the chain.
        KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
        // Which link in the chain this is.
        KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
        // Added data so that large stores are created.
        KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
            Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
        );

        // Emit the key values.
        context.write(new ImmutableBytesWritable(rk), linkKv);
        context.write(new ImmutableBytesWritable(rk), sortKv);
        context.write(new ImmutableBytesWritable(rk), dataKv);

        currentRow = nextRow;
        nextRow = getNextRow(i + 1, chainLength);
      }
    }

    /** Returns a random row key for the link at the given index. */
    private long getNextRow(long index, long chainLength) {
      long nextRow = Math.abs(rand.nextLong());
      // Spread the rows by using the index as the residue mod chainLength, so
      // links of the same chain land on distinct rows.
      nextRow = nextRow - (nextRow % chainLength) + index;
      return nextRow;
    }
  }
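
  /**
   * Writable key used to group and sort links during the check: links are grouped
   * by chain id and sorted by their position (order) in the chain.
   */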
  public static class LinkKey implements WritableComparable<LinkKey> {

    private Long chainId;
    private Long order;

    public LinkKey() {}

    public LinkKey(long chainId, long order) {
      this.chainId = chainId;
      this.order = order;
    }

    public Long getChainId() {
      return chainId;
    }

    public Long getOrder() {
      return order;
    }

    @Override
    public int compareTo(LinkKey linkKey) {
      int res = getChainId().compareTo(linkKey.getChainId());
      if (res == 0) {
        res = getOrder().compareTo(linkKey.getOrder());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, chainId);
      WritableUtils.writeVLong(dataOutput, order);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      chainId = WritableUtils.readVLong(dataInput);
      order = WritableUtils.readVLong(dataInput);
    }
  }
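
  /**
   * Writable value used during the check: one link of a chain, holding the row
   * key it was read from and the row key of the next link.
   */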
  public static class LinkChain implements WritableComparable<LinkChain> {

    private Long rk;
    private Long next;

    public LinkChain() {}

    public LinkChain(Long rk, Long next) {
      this.rk = rk;
      this.next = next;
    }

    public Long getRk() {
      return rk;
    }

    public Long getNext() {
      return next;
    }

    @Override
    public int compareTo(LinkChain linkChain) {
      int res = getRk().compareTo(linkChain.getRk());
      if (res == 0) {
        res = getNext().compareTo(linkChain.getNext());
      }
      return res;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
      WritableUtils.writeVLong(dataOutput, rk);
      WritableUtils.writeVLong(dataOutput, next);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
      rk = WritableUtils.readVLong(dataInput);
      next = WritableUtils.readVLong(dataInput);
    }
  }
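
  /**
   * Partitioner that routes a link by its chain id, so all links of one chain
   * arrive at the same reducer.
   */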
  public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
    @Override
    public int getPartition(LinkKey linkKey,
        LinkChain linkChain,
        int numPartitions) {
      int hash = linkKey.getChainId().hashCode();
      return Math.abs(hash % numPartitions);
    }
  }
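
  /**
   * Grouping comparator that compares only the chain id, so a single reduce call
   * sees every link of a chain.
   */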
  public static class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.getChainId().compareTo(k2.getChainId());
    }
  }
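
  /**
   * Sort comparator that orders keys by chain id and then by position, so links
   * reach the reducer in chain order.
   */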
  public static class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
      super(LinkKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      LinkKey k1 = (LinkKey) w1;
      LinkKey k2 = (LinkKey) w2;

      return k1.compareTo(k2);
    }
  }
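
  /**
   * Mapper for the check phase. Every row can carry links of several chains, so
   * for each chain column found on the row it emits a (chain id, order) key and
   * a (row, next row) value.
   */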
  public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      long longRk = Bytes.toLong(value.getRow());

      for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
        long chainId = Bytes.toLong(entry.getKey());
        long next = Bytes.toLong(entry.getValue());
        Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
        long order = Bytes.toLong(CellUtil.cloneValue(c));
        context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
      }
    }
  }
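
  /**
   * Reducer that does the actual checking. All links of one chain arrive grouped
   * together and sorted by order; the reducer walks the chain, verifying that it
   * starts at row 0, that no link is missing, and that the chain has the expected
   * length.
   */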
  public static class LinkedListCheckingReducer
      extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
    @Override
    protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
        throws IOException, InterruptedException {
      long next = -1L;
      long prev = -1L;
      long count = 0L;

      for (LinkChain lc : values) {
        // The first link must sit on row 0; after that, every link must sit on
        // the row the previous link pointed to.
        if (next == -1) {
          if (lc.getRk() != 0L) {
            String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
                + ". Chain:" + key.getChainId() + ", order:" + key.getOrder();
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          next = lc.getNext();
        } else {
          if (next != lc.getRk()) {
            String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
                + next + " but got " + lc.getRk() + ". Chain:" + key.getChainId()
                + ", order:" + key.getOrder();
            logError(msg, context);
            throw new RuntimeException(msg);
          }
          prev = lc.getRk();
          next = lc.getNext();
        }
        count++;
      }

      int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
      if (count != expectedChainLen) {
        String msg = "Chain wasn't the correct length. Expected " + expectedChainLen + " got "
            + count + ". Chain:" + key.getChainId() + ", order:" + key.getOrder();
        logError(msg, context);
        throw new RuntimeException(msg);
      }
    }

    private static void logError(String msg, Context context) throws IOException {
      TableName table = getTableName(context.getConfiguration());

      LOG.error("Failure in chain verification: " + msg);
      try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
          Admin admin = connection.getAdmin()) {
        LOG.error("cluster status:\n" + admin.getClusterStatus());
        LOG.error("table regions:\n"
            + Joiner.on("\n").join(admin.getTableRegions(table)));
      }
    }
  }

  private void runCheckWithRetry()
      throws IOException, ClassNotFoundException, InterruptedException {
    try {
      runCheck();
    } catch (Throwable t) {
      LOG.warn("Received " + StringUtils.stringifyException(t));
      LOG.warn("Running the check MR job again to see whether this is an ephemeral problem");
      runCheck();
      throw t;
    }
  }
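
  /**
   * After the data has been bulk loaded, runs the MapReduce job that verifies
   * every chain in the table.
   */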
  private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
    LOG.info("Running check");
    Configuration conf = getConf();
    String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
    Path p = util.getDataTestDirOnTestFS(jobName);

    Job job = Job.getInstance(conf);
    job.setJarByClass(getClass());
    job.setJobName(jobName);

    job.setPartitionerClass(NaturalKeyPartitioner.class);
    job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
    job.setSortComparatorClass(CompositeKeyComparator.class);

    Scan scan = new Scan();
    scan.addFamily(CHAIN_FAM);
    scan.addFamily(SORT_FAM);
    scan.setMaxVersions(1);
    scan.setCacheBlocks(false);
    scan.setBatch(1000);

    int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      scan.setConsistency(Consistency.TIMELINE);
    }

    TableMapReduceUtil.initTableMapperJob(
        getTablename().getName(),
        scan,
        LinkedListCheckingMapper.class,
        LinkKey.class,
        LinkChain.class,
        job
    );

    job.setReducerClass(LinkedListCheckingReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);

    FileOutputFormat.setOutputPath(job, p);

    assertEquals(true, job.waitForCompletion(true));

    // Delete the check job's output.
    util.getTestFileSystem().delete(p, true);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale the number of map tasks and import rounds up on a distributed cluster.
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
          Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 10)
      );
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    super.addOptNoArg(OPT_CHECK, "Run check only");
    super.addOptNoArg(OPT_LOAD, "Run load only");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    check = cmd.hasOption(OPT_CHECK);
    load = cmd.hasOption(OPT_LOAD);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    if (load) {
      runLoad();
    } else if (check) {
      installSlowingCoproc();
      runCheckWithRetry();
    } else {
      testBulkLoad();
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return getTableName(getConf());
  }

  public static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return null;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
    System.exit(status);
  }
}