1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  
23  import com.google.common.base.Joiner;
24  
25  import org.apache.commons.cli.CommandLine;
26  import org.apache.commons.lang.RandomStringUtils;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HTableDescriptor;
36  import org.apache.hadoop.hbase.IntegrationTestBase;
37  import org.apache.hadoop.hbase.IntegrationTestingUtility;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.TableName;
40  import org.apache.hadoop.hbase.client.Admin;
41  import org.apache.hadoop.hbase.client.Connection;
42  import org.apache.hadoop.hbase.client.ConnectionFactory;
43  import org.apache.hadoop.hbase.client.Consistency;
44  import org.apache.hadoop.hbase.client.RegionLocator;
45  import org.apache.hadoop.hbase.client.Result;
46  import org.apache.hadoop.hbase.client.Scan;
47  import org.apache.hadoop.hbase.client.Table;
48  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
49  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
50  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
51  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
52  import org.apache.hadoop.hbase.regionserver.InternalScanner;
53  import org.apache.hadoop.hbase.regionserver.RegionScanner;
54  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
57  import org.apache.hadoop.hbase.util.RegionSplitter;
58  import org.apache.hadoop.io.LongWritable;
59  import org.apache.hadoop.io.NullWritable;
60  import org.apache.hadoop.io.Writable;
61  import org.apache.hadoop.io.WritableComparable;
62  import org.apache.hadoop.io.WritableComparator;
63  import org.apache.hadoop.io.WritableUtils;
64  import org.apache.hadoop.mapreduce.InputFormat;
65  import org.apache.hadoop.mapreduce.InputSplit;
66  import org.apache.hadoop.mapreduce.Job;
67  import org.apache.hadoop.mapreduce.JobContext;
68  import org.apache.hadoop.mapreduce.Mapper;
69  import org.apache.hadoop.mapreduce.Partitioner;
70  import org.apache.hadoop.mapreduce.RecordReader;
71  import org.apache.hadoop.mapreduce.Reducer;
72  import org.apache.hadoop.mapreduce.TaskAttemptContext;
73  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
74  import org.apache.hadoop.util.StringUtils;
75  import org.apache.hadoop.util.ToolRunner;
76  import org.junit.Test;
77  import org.junit.experimental.categories.Category;
78  
79  import java.io.DataInput;
80  import java.io.DataOutput;
81  import java.io.IOException;
82  import java.util.ArrayList;
83  import java.util.List;
84  import java.util.Map;
85  import java.util.Random;
86  import java.util.Set;
87  import java.util.concurrent.atomic.AtomicLong;
88  
89  /**
90   * Test Bulk Load and MR on a distributed cluster.
91   * It starts an MR job that creates linked chains.
92   *
93   * The format of rows is like this:
94   * Row Key -> Long
95   *
96   * L:<< Chain Id >> -> Row Key of the next link in the chain
97   * S:<< Chain Id >> -> The step in the chain that this link is.
98   * D:<< Chain Id >> -> Random Data.
99   *
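     * For example (illustrative values, not taken from an actual run), the row that is link
     * number 3 of chain 7 carries three cells, all with qualifier 7:
     *   L:7 -> the row key of link number 4 of chain 7
     *   S:7 -> 3
     *   D:7 -> 50 random alphabetic characters
     *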
100  * All chains start on row 0.
101  * All row keys are > 0.
102  *
103  * After creating the linked lists, they are walked over using a TableMapper-based MapReduce job.
104  *
105  * There are a few options exposed:
106  *
107  * hbase.IntegrationTestBulkLoad.chainLength
108  * The number of rows that will be part of each and every chain.
109  *
110  * hbase.IntegrationTestBulkLoad.numMaps
111  * The number of mappers that will be run.  Each mapper creates one linked list chain.
112  *
113  * hbase.IntegrationTestBulkLoad.numImportRounds
114  * How many jobs will be run to create linked lists.
115  *
116  * hbase.IntegrationTestBulkLoad.tableName
117  * The name of the table.
118  *
119  * hbase.IntegrationTestBulkLoad.replicaCount
120  * How many region replicas to configure for the table under test.
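     *
     * An illustrative way to set these options (assuming the standard ToolRunner/GenericOptionsParser
     * -D handling used by this tool's main(); the exact launcher depends on your deployment):
     *
     *   hbase org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad \
     *       -Dhbase.IntegrationTestBulkLoad.chainLength=500000 \
     *       -Dhbase.IntegrationTestBulkLoad.numMaps=10 \
     *       -Dhbase.IntegrationTestBulkLoad.replicaCount=3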
121  */
122 @Category(IntegrationTests.class)
123 public class IntegrationTestBulkLoad extends IntegrationTestBase {
124 
125   private static final Log LOG = LogFactory.getLog(IntegrationTestBulkLoad.class);
126 
127   private static final byte[] CHAIN_FAM = Bytes.toBytes("L");
128   private static final byte[] SORT_FAM  = Bytes.toBytes("S");
129   private static final byte[] DATA_FAM  = Bytes.toBytes("D");
130 
131   private static String CHAIN_LENGTH_KEY = "hbase.IntegrationTestBulkLoad.chainLength";
132   private static int CHAIN_LENGTH = 500000;
133 
134   private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
135   private static int NUM_MAPS = 1;
136 
137   private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
138   private static int NUM_IMPORT_ROUNDS = 1;
139 
140   private static String ROUND_NUM_KEY = "hbase.IntegrationTestBulkLoad.roundNum";
141 
142   private static String TABLE_NAME_KEY = "hbase.IntegrationTestBulkLoad.tableName";
143   private static String TABLE_NAME = "IntegrationTestBulkLoad";
144 
145   private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
146   private static int NUM_REPLICA_COUNT_DEFAULT = 1;
147 
148   private static final String OPT_LOAD = "load";
149   private static final String OPT_CHECK = "check";
150 
151   private boolean load = false;
152   private boolean check = false;
153 
154   public static class SlowMeCoproScanOperations extends BaseRegionObserver {
155     static final AtomicLong sleepTime = new AtomicLong(2000);
156     Random r = new Random();
157     AtomicLong countOfNext = new AtomicLong(0);
158     AtomicLong countOfOpen = new AtomicLong(0);
159     public SlowMeCoproScanOperations() {}
160     @Override
161     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
162         final Scan scan, final RegionScanner s) throws IOException {
163       if (countOfOpen.incrementAndGet() == 2) { // slow down the second openScanner call
164         slowdownCode(e);
165       }
166       return s;
167     }
168 
169     @Override
170     public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
171         final InternalScanner s, final List<Result> results,
172         final int limit, final boolean hasMore) throws IOException {
173       //this will slow down a certain next operation if the conditions are met. The slowness
174       //will allow the call to go to a replica
175       countOfNext.incrementAndGet();
176       if (countOfNext.get() == 0 || countOfNext.get() == 4) {
177         slowdownCode(e);
178       }
179       return true;
180     }
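        // Only the primary replica (replica id 0) is ever delayed, so a TIMELINE-consistency
        // read in the check job can be served by a secondary replica instead of waiting.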
181     protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
182       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
183         try {
184           if (sleepTime.get() > 0) {
185             LOG.info("Sleeping for " + sleepTime.get() + " ms");
186             Thread.sleep(sleepTime.get());
187           }
188         } catch (InterruptedException e1) {
189           LOG.error(e1);
190         }
191       }
192     }
193   }
194 
195   /**
196    * Modify table {@code getTableName()} to carry {@link SlowMeCoproScanOperations}.
197    */
198   private void installSlowingCoproc() throws IOException, InterruptedException {
199     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
200     if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
201 
202     TableName t = getTablename();
203     Admin admin = util.getHBaseAdmin();
204     HTableDescriptor desc = admin.getTableDescriptor(t);
205     desc.addCoprocessor(SlowMeCoproScanOperations.class.getName());
206     HBaseTestingUtility.modifyTableSync(admin, desc);
207     // Sleep for some time. The hope is that the regions are closed/opened before
208     // the sleep returns. TODO: do this better
209     Thread.sleep(30000);
210   }
211 
212   @Test
213   public void testBulkLoad() throws Exception {
214     runLoad();
215     installSlowingCoproc();
216     runCheckWithRetry();
217   }
218 
219   public void runLoad() throws Exception {
220     setupTable();
221     int numImportRounds = getConf().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
222     LOG.info("Running load with numIterations:" + numImportRounds);
223     for (int i = 0; i < numImportRounds; i++) {
224       runLinkedListMRJob(i);
225     }
226   }
227 
228   private byte[][] getSplits(int numRegions) {
229     RegionSplitter.UniformSplit split = new RegionSplitter.UniformSplit();
230     split.setFirstRow(Bytes.toBytes(0L));
231     split.setLastRow(Bytes.toBytes(Long.MAX_VALUE));
232     return split.split(numRegions);
233   }
234 
235   private void setupTable() throws IOException, InterruptedException {
236     if (util.getHBaseAdmin().tableExists(getTablename())) {
237       util.deleteTable(getTablename());
238     }
239 
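        // Pre-split into 16 regions spread uniformly over the positive long keyspace so the
        // bulk-loaded HFiles are distributed across region servers rather than a single region.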
240     util.createTable(
241         getTablename().getName(),
242         new byte[][]{CHAIN_FAM, SORT_FAM, DATA_FAM},
243         getSplits(16)
244     );
245 
246     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
247     if (replicaCount == NUM_REPLICA_COUNT_DEFAULT) return;
248 
249     TableName t = getTablename();
250     HBaseTestingUtility.setReplicas(util.getHBaseAdmin(), t, replicaCount);
251   }
252 
253   private void runLinkedListMRJob(int iteration) throws Exception {
254     String jobName =  IntegrationTestBulkLoad.class.getSimpleName() + " - " +
255         EnvironmentEdgeManager.currentTime();
256     Configuration conf = new Configuration(util.getConfiguration());
257     Path p = null;
258     if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
259       p = util.getDataTestDirOnTestFS(getTablename() + "-" + iteration);
260     } else {
261       p = new Path(conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY));
262     }
263 
264     conf.setBoolean("mapreduce.map.speculative", false);
265     conf.setBoolean("mapreduce.reduce.speculative", false);
266     conf.setInt(ROUND_NUM_KEY, iteration);
267 
268     Job job = new Job(conf);
269 
270     job.setJobName(jobName);
271 
272     // Set the input format so that we can create map tasks with no data input.
273     job.setInputFormatClass(ITBulkLoadInputFormat.class);
274 
275     // Set the mapper classes.
276     job.setMapperClass(LinkedListCreationMapper.class);
277     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
278     job.setMapOutputValueClass(KeyValue.class);
279 
280     // Use the identity reducer
281     // So nothing to do here.
282 
283     // Set this jar.
284     job.setJarByClass(getClass());
285 
286     // Set where to place the hfiles.
287     FileOutputFormat.setOutputPath(job, p);
288     try (Connection conn = ConnectionFactory.createConnection(conf);
289         Admin admin = conn.getAdmin();
290         Table table = conn.getTable(getTablename());
291         RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
292 
293       // Configure the partitioner and other things needed for HFileOutputFormat.
294       HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
295 
296       // Run the job making sure it works.
297       assertEquals(true, job.waitForCompletion(true));
298 
299       // Create a new loader.
300       LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
301 
302       // Load the HFiles in.
303       loader.doBulkLoad(p, admin, table, regionLocator);
304     }
305 
306     // Delete the files.
307     util.getTestFileSystem().delete(p, true);
308   }
309 
310   public static class EmptySplit extends InputSplit implements Writable {
311     @Override
312     public void write(DataOutput out) throws IOException { }
313     @Override
314     public void readFields(DataInput in) throws IOException { }
315     @Override
316     public long getLength() { return 0L; }
317     @Override
318     public String[] getLocations() { return new String[0]; }
319   }
320 
321   public static class FixedRecordReader<K, V> extends RecordReader<K, V> {
322     private int index = -1;
323     private K[] keys;
324     private V[] values;
325 
326     public FixedRecordReader(K[] keys, V[] values) {
327       this.keys = keys;
328       this.values = values;
329     }
330     @Override
331     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
332     InterruptedException { }
333     @Override
334     public boolean nextKeyValue() throws IOException, InterruptedException {
335       return ++index < keys.length;
336     }
337     @Override
338     public K getCurrentKey() throws IOException, InterruptedException {
339       return keys[index];
340     }
341     @Override
342     public V getCurrentValue() throws IOException, InterruptedException {
343       return values[index];
344     }
345     @Override
346     public float getProgress() throws IOException, InterruptedException {
347       return (float)index / keys.length;
348     }
349     @Override
350     public void close() throws IOException {
351     }
352   }
353 
354   public static class ITBulkLoadInputFormat extends InputFormat<LongWritable, LongWritable> {
355     @Override
356     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
357       int numSplits = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
358       ArrayList<InputSplit> ret = new ArrayList<InputSplit>(numSplits);
359       for (int i = 0; i < numSplits; ++i) {
360         ret.add(new EmptySplit());
361       }
362       return ret;
363     }
364 
365     @Override
366     public RecordReader<LongWritable, LongWritable> createRecordReader(InputSplit split,
367       TaskAttemptContext context)
368           throws IOException, InterruptedException {
369       int taskId = context.getTaskAttemptID().getTaskID().getId();
370       int numMapTasks = context.getConfiguration().getInt(NUM_MAPS_KEY, NUM_MAPS);
371       int numIterations = context.getConfiguration().getInt(NUM_IMPORT_ROUNDS_KEY, NUM_IMPORT_ROUNDS);
372       int iteration = context.getConfiguration().getInt(ROUND_NUM_KEY, 0);
373 
374       taskId = taskId + iteration * numMapTasks;
375       numMapTasks = numMapTasks * numIterations;
376 
377       long chainId = Math.abs(new Random().nextLong());
378       chainId = chainId - (chainId % numMapTasks) + taskId; // ensure that chainId is unique per task and across iterations
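          // Illustration (hypothetical numbers): with numMapTasks = 4 after the scaling above and
          // taskId = 2, a random 1000003 becomes 1000003 - 3 + 2 = 1000002, which is congruent to
          // 2 mod 4, so no other task in any round can produce the same chainId.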
379       LongWritable[] keys = new LongWritable[] {new LongWritable(chainId)};
380 
381       return new FixedRecordReader<LongWritable, LongWritable>(keys, keys);
382     }
383   }
384 
385   /**
386    * Mapper that creates a linked list of KeyValues.
387    *
388    * Each map task generates one linked list.
389    * All lists start on row key 0L.
390    * All lists should be CHAIN_LENGTH long.
391    */
392   public static class LinkedListCreationMapper
393       extends Mapper<LongWritable, LongWritable, ImmutableBytesWritable, KeyValue> {
394 
395     private Random rand = new Random();
396 
397     @Override
398     protected void map(LongWritable key, LongWritable value, Context context)
399         throws IOException, InterruptedException {
400       long chainId = value.get();
401       LOG.info("Starting mapper with chainId:" + chainId);
402 
403       byte[] chainIdArray = Bytes.toBytes(chainId);
404       long currentRow = 0;
405 
406       long chainLength = context.getConfiguration().getLong(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
407       long nextRow = getNextRow(0, chainLength);
408 
409       for (long i = 0; i < chainLength; i++) {
410         byte[] rk = Bytes.toBytes(currentRow);
411 
412         // Next link in the chain.
413         KeyValue linkKv = new KeyValue(rk, CHAIN_FAM, chainIdArray, Bytes.toBytes(nextRow));
414         // What link in the chain this is.
415         KeyValue sortKv = new KeyValue(rk, SORT_FAM, chainIdArray, Bytes.toBytes(i));
416         // Added data so that large stores are created.
417         KeyValue dataKv = new KeyValue(rk, DATA_FAM, chainIdArray,
418           Bytes.toBytes(RandomStringUtils.randomAlphabetic(50))
419         );
420 
421         // Emit the key values.
422         context.write(new ImmutableBytesWritable(rk), linkKv);
423         context.write(new ImmutableBytesWritable(rk), sortKv);
424         context.write(new ImmutableBytesWritable(rk), dataKv);
425         // Move to the next row.
426         currentRow = nextRow;
427         nextRow = getNextRow(i+1, chainLength);
428       }
429     }
430 
431     /** Returns a unique row id within this chain for this index */
432     private long getNextRow(long index, long chainLength) {
433       long nextRow = Math.abs(rand.nextLong());
434       // use significant bits from the random number, but pad with index to ensure it is unique
435       // this also ensures that we do not reuse row = 0
436       // row collisions from multiple mappers are fine, since we guarantee unique chainIds
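          // Illustration (hypothetical numbers): with chainLength = 500000 and index = 7, a random
          // 1234567 becomes 1234567 - 234567 + 7 = 1000007, i.e. a multiple of the chain length
          // plus the index, so each index within a chain maps to a distinct row.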
437       nextRow = nextRow - (nextRow % chainLength) + index;
438       return nextRow;
439     }
440   }
441 
442   /**
443    * Writable class used as the key to group links in the linked list.
444    *
445    * Used as the key emitted from a pass over the table.
446    */
447   public static class LinkKey implements WritableComparable<LinkKey> {
448 
449     private Long chainId;
450 
451     public Long getOrder() {
452       return order;
453     }
454 
455     public Long getChainId() {
456       return chainId;
457     }
458 
459     private Long order;
460 
461     public LinkKey() {}
462 
463     public LinkKey(long chainId, long order) {
464       this.chainId = chainId;
465       this.order = order;
466     }
467 
468     @Override
469     public int compareTo(LinkKey linkKey) {
470       int res = getChainId().compareTo(linkKey.getChainId());
471       if (res == 0) {
472         res = getOrder().compareTo(linkKey.getOrder());
473       }
474       return res;
475     }
476 
477     @Override
478     public void write(DataOutput dataOutput) throws IOException {
479       WritableUtils.writeVLong(dataOutput, chainId);
480       WritableUtils.writeVLong(dataOutput, order);
481     }
482 
483     @Override
484     public void readFields(DataInput dataInput) throws IOException {
485       chainId = WritableUtils.readVLong(dataInput);
486       order = WritableUtils.readVLong(dataInput);
487     }
488   }
489 
490   /**
491    * Writable used as the value emitted from a pass over the hbase table.
492    */
493   public static class LinkChain implements WritableComparable<LinkChain> {
494 
495     public Long getNext() {
496       return next;
497     }
498 
499     public Long getRk() {
500       return rk;
501     }
502 
503     public LinkChain() {}
504 
505     public LinkChain(Long rk, Long next) {
506       this.rk = rk;
507       this.next = next;
508     }
509 
510     private Long rk;
511     private Long next;
512 
513     @Override
514     public int compareTo(LinkChain linkChain) {
515       int res = getRk().compareTo(linkChain.getRk());
516       if (res == 0) {
517         res = getNext().compareTo(linkChain.getNext());
518       }
519       return res;
520     }
521 
522     @Override
523     public void write(DataOutput dataOutput) throws IOException {
524       WritableUtils.writeVLong(dataOutput, rk);
525       WritableUtils.writeVLong(dataOutput, next);
526     }
527 
528     @Override
529     public void readFields(DataInput dataInput) throws IOException {
530       rk = WritableUtils.readVLong(dataInput);
531       next = WritableUtils.readVLong(dataInput);
532     }
533   }
534 
535   /**
536    * Class to figure out what partition to send a link in the chain to.  This is based upon
537    * the linkKey's ChainId.
538    */
539   public static class NaturalKeyPartitioner extends Partitioner<LinkKey, LinkChain> {
540     @Override
541     public int getPartition(LinkKey linkKey,
542                             LinkChain linkChain,
543                             int numPartitions) {
544       int hash = linkKey.getChainId().hashCode();
545       return Math.abs(hash % numPartitions);
546     }
547   }
548 
549   /**
550    * Comparator used to figure out if a linkKey should be grouped together.  This is based upon the
551    * linkKey's ChainId.
552    */
553   public static class NaturalKeyGroupingComparator extends WritableComparator {
554 
555     protected NaturalKeyGroupingComparator() {
556       super(LinkKey.class, true);
557     }
558 
559     @Override
560     public int compare(WritableComparable w1, WritableComparable w2) {
561       LinkKey k1 = (LinkKey) w1;
562       LinkKey k2 = (LinkKey) w2;
563 
564       return k1.getChainId().compareTo(k2.getChainId());
565     }
566   }
567 
568   /**
569    * Comparator used to order linkKeys so that they are passed to a reducer in order.  This is based
570    * upon linkKey ChainId and Order.
571    */
572   public static class CompositeKeyComparator extends WritableComparator {
573 
574     protected CompositeKeyComparator() {
575       super(LinkKey.class, true);
576     }
577 
578     @Override
579     public int compare(WritableComparable w1, WritableComparable w2) {
580       LinkKey k1 = (LinkKey) w1;
581       LinkKey k2 = (LinkKey) w2;
582 
583       return k1.compareTo(k2);
584     }
585   }
586 
587   /**
588    * Mapper to pass over the table.
589    *
590    * For every row there could be multiple chains that landed on this row. So emit a linkKey
591    * and value for each.
592    */
593   public static class LinkedListCheckingMapper extends TableMapper<LinkKey, LinkChain> {
594     @Override
595     protected void map(ImmutableBytesWritable key, Result value, Context context)
596         throws IOException, InterruptedException {
597       long longRk = Bytes.toLong(value.getRow());
598 
599       for (Map.Entry<byte[], byte[]> entry : value.getFamilyMap(CHAIN_FAM).entrySet()) {
600         long chainId = Bytes.toLong(entry.getKey());
601         long next = Bytes.toLong(entry.getValue());
602         Cell c = value.getColumnCells(SORT_FAM, entry.getKey()).get(0);
603         long order = Bytes.toLong(CellUtil.cloneValue(c));
604         context.write(new LinkKey(chainId, order), new LinkChain(longRk, next));
605       }
606     }
607   }
608 
609   /**
610    * Class that does the actual checking of the links.
611    *
612    * All links in the chain should be grouped and sorted when sent to this class.  Then the chain
613    * will be traversed making sure that no link is missing and that the chain is the correct length.
614    *
615    * This will throw an exception if anything is not correct.  That causes the job to fail if any
616    * data is corrupt.
617    */
618   public static class LinkedListCheckingReducer
619       extends Reducer<LinkKey, LinkChain, NullWritable, NullWritable> {
620     @Override
621     protected void reduce(LinkKey key, Iterable<LinkChain> values, Context context)
622         throws java.io.IOException, java.lang.InterruptedException {
623       long next = -1L;
624       long prev = -1L;
625       long count = 0L;
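          // next == -1 marks the first link of the chain; after that, each row key read must equal
          // the previous link's "next" pointer or the chain is broken.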
626 
627       for (LinkChain lc : values) {
628 
629         if (next == -1) {
630           if (lc.getRk() != 0L) {
631             String msg = "Chains should all start at rk 0, but read rk " + lc.getRk()
632                 + ". Chain:" + key.chainId + ", order:" + key.order;
633             logError(msg, context);
634             throw new RuntimeException(msg);
635           }
636           next = lc.getNext();
637         } else {
638           if (next != lc.getRk()) {
639             String msg = "Missing a link in the chain. Prev rk was " + prev + ", expecting "
640                 + next + " but got " + lc.getRk() + ". Chain:" + key.chainId
641                 + ", order:" + key.order;
642             logError(msg, context);
643             throw new RuntimeException(msg);
644           }
645           prev = lc.getRk();
646           next = lc.getNext();
647         }
648         count++;
649       }
650 
651       int expectedChainLen = context.getConfiguration().getInt(CHAIN_LENGTH_KEY, CHAIN_LENGTH);
652       if (count != expectedChainLen) {
653         String msg = "Chain wasn't the correct length.  Expected " + expectedChainLen + " got "
654             + count + ". Chain:" + key.chainId + ", order:" + key.order;
655         logError(msg, context);
656         throw new RuntimeException(msg);
657       }
658     }
659 
660     private static void logError(String msg, Context context) throws IOException {
661       TableName table = getTableName(context.getConfiguration());
662 
663       LOG.error("Failure in chain verification: " + msg);
664       try (Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
665           Admin admin = connection.getAdmin()) {
666         LOG.error("cluster status:\n" + admin.getClusterStatus());
667         LOG.error("table regions:\n"
668             + Joiner.on("\n").join(admin.getTableRegions(table)));
669       }
670     }
671   }
672 
673   private void runCheckWithRetry() throws IOException, ClassNotFoundException, InterruptedException {
674     try {
675       runCheck();
676     } catch (Throwable t) {
677       LOG.warn("Received " + StringUtils.stringifyException(t));
678       LOG.warn("Running the check MR Job again to see whether an ephemeral problem or not");
679       runCheck();
680       throw t; // we should still fail the test even if the retry succeeds
681     }
682     // everything green
683   }
684 
685 
686   /**
687    * After adding data to the table, start an MR job that verifies every chain is complete and has the expected length.
688    * @throws IOException
689    * @throws ClassNotFoundException
690    * @throws InterruptedException
691    */
692   private void runCheck() throws IOException, ClassNotFoundException, InterruptedException {
693     LOG.info("Running check");
694     Configuration conf = getConf();
695     String jobName = getTablename() + "_check" + EnvironmentEdgeManager.currentTime();
696     Path p = util.getDataTestDirOnTestFS(jobName);
697 
698     Job job = new Job(conf);
699     job.setJarByClass(getClass());
700     job.setJobName(jobName);
701 
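        // Classic secondary sort: partition and group on chainId only, but sort on
        // (chainId, order), so each reduce() call sees one whole chain with its links in order.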
702     job.setPartitionerClass(NaturalKeyPartitioner.class);
703     job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
704     job.setSortComparatorClass(CompositeKeyComparator.class);
705 
706     Scan scan = new Scan();
707     scan.addFamily(CHAIN_FAM);
708     scan.addFamily(SORT_FAM);
709     scan.setMaxVersions(1);
710     scan.setCacheBlocks(false);
711     scan.setBatch(1000);
712 
713     int replicaCount = conf.getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
714     if (replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
715       scan.setConsistency(Consistency.TIMELINE);
716     }
717 
718     TableMapReduceUtil.initTableMapperJob(
719         getTablename().getName(),
720         scan,
721         LinkedListCheckingMapper.class,
722         LinkKey.class,
723         LinkChain.class,
724         job
725     );
726 
727     job.setReducerClass(LinkedListCheckingReducer.class);
728     job.setOutputKeyClass(NullWritable.class);
729     job.setOutputValueClass(NullWritable.class);
730 
731     FileOutputFormat.setOutputPath(job, p);
732 
733     assertEquals(true, job.waitForCompletion(true));
734 
735     // Delete the files.
736     util.getTestFileSystem().delete(p, true);
737   }
738 
739   @Override
740   public void setUpCluster() throws Exception {
741     util = getTestingUtil(getConf());
742     util.initializeCluster(1);
743     int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
744     if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
745       LOG.debug("Region Replicas enabled: " + replicaCount);
746     }
747 
748     // Scale this up on a real cluster
749     if (util.isDistributedCluster()) {
750       util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
751           Integer.toString(util.getHBaseAdmin().getClusterStatus().getServersSize() * 10)
752       );
753       util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
754     } else {
755       util.startMiniMapReduceCluster();
756     }
757   }
758 
759   @Override
760   protected void addOptions() {
761     super.addOptions();
762     super.addOptNoArg(OPT_CHECK, "Run check only");
763     super.addOptNoArg(OPT_LOAD, "Run load only");
764   }
765 
766   @Override
767   protected void processOptions(CommandLine cmd) {
768     super.processOptions(cmd);
769     check = cmd.hasOption(OPT_CHECK);
770     load = cmd.hasOption(OPT_LOAD);
771   }
772 
773   @Override
774   public int runTestFromCommandLine() throws Exception {
775     if (load) {
776       runLoad();
777     } else if (check) {
778       installSlowingCoproc();
779       runCheckWithRetry();
780     } else {
781       testBulkLoad();
782     }
783     return 0;
784   }
785 
786   @Override
787   public TableName getTablename() {
788     return getTableName(getConf());
789   }
790 
791   public static TableName getTableName(Configuration conf) {
792     return TableName.valueOf(conf.get(TABLE_NAME_KEY, TABLE_NAME));
793   }
794 
795   @Override
796   protected Set<String> getColumnFamilies() {
797     return null;
798   }
799 
800   public static void main(String[] args) throws Exception {
801     Configuration conf = HBaseConfiguration.create();
802     IntegrationTestingUtility.setUseDistributedCluster(conf);
803     int status =  ToolRunner.run(conf, new IntegrationTestBulkLoad(), args);
804     System.exit(status);
805   }
806 }