1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.test;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.io.InterruptedIOException;
26  import java.security.SecureRandom;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.SortedSet;
34  import java.util.TreeSet;
35  import java.util.UUID;
36  import java.util.concurrent.ThreadLocalRandom;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import org.apache.commons.cli.CommandLine;
40  import org.apache.commons.cli.GnuParser;
41  import org.apache.commons.cli.HelpFormatter;
42  import org.apache.commons.cli.Options;
43  import org.apache.commons.cli.ParseException;
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.conf.Configured;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.LocatedFileStatus;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.fs.RemoteIterator;
52  import org.apache.hadoop.hbase.Cell;
53  import org.apache.hadoop.hbase.HBaseConfiguration;
54  import org.apache.hadoop.hbase.HBaseTestingUtility;
55  import org.apache.hadoop.hbase.HColumnDescriptor;
56  import org.apache.hadoop.hbase.HConstants;
57  import org.apache.hadoop.hbase.HRegionLocation;
58  import org.apache.hadoop.hbase.HTableDescriptor;
59  import org.apache.hadoop.hbase.client.HTable;
60  import org.apache.hadoop.hbase.client.Table;
61  import org.apache.hadoop.hbase.IntegrationTestBase;
62  import org.apache.hadoop.hbase.IntegrationTestingUtility;
63  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
64  import org.apache.hadoop.hbase.fs.HFileSystem;
65  import org.apache.hadoop.hbase.MasterNotRunningException;
66  import org.apache.hadoop.hbase.TableName;
67  import org.apache.hadoop.hbase.client.Admin;
68  import org.apache.hadoop.hbase.client.BufferedMutator;
69  import org.apache.hadoop.hbase.client.BufferedMutatorParams;
70  import org.apache.hadoop.hbase.client.Connection;
71  import org.apache.hadoop.hbase.client.ConnectionFactory;
72  import org.apache.hadoop.hbase.client.Get;
73  import org.apache.hadoop.hbase.client.Mutation;
74  import org.apache.hadoop.hbase.client.Put;
75  import org.apache.hadoop.hbase.client.RegionLocator;
76  import org.apache.hadoop.hbase.client.Result;
77  import org.apache.hadoop.hbase.client.ResultScanner;
78  import org.apache.hadoop.hbase.client.Scan;
79  import org.apache.hadoop.hbase.client.ScannerCallable;
80  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
81  import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
82  import org.apache.hadoop.hbase.mapreduce.TableMapper;
83  import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
84  import org.apache.hadoop.hbase.mapreduce.WALPlayer;
85  import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
86  import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
87  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
88  import org.apache.hadoop.hbase.util.AbstractHBaseTool;
89  import org.apache.hadoop.hbase.util.Bytes;
90  import org.apache.hadoop.hbase.util.RegionSplitter;
91  import org.apache.hadoop.hbase.wal.WALKey;
92  import org.apache.hadoop.io.BytesWritable;
93  import org.apache.hadoop.io.NullWritable;
94  import org.apache.hadoop.io.Writable;
95  import org.apache.hadoop.mapreduce.Counter;
96  import org.apache.hadoop.mapreduce.CounterGroup;
97  import org.apache.hadoop.mapreduce.Counters;
98  import org.apache.hadoop.mapreduce.InputFormat;
99  import org.apache.hadoop.mapreduce.InputSplit;
100 import org.apache.hadoop.mapreduce.Job;
101 import org.apache.hadoop.mapreduce.JobContext;
102 import org.apache.hadoop.mapreduce.Mapper;
103 import org.apache.hadoop.mapreduce.RecordReader;
104 import org.apache.hadoop.mapreduce.Reducer;
105 import org.apache.hadoop.mapreduce.TaskAttemptContext;
106 import org.apache.hadoop.mapreduce.TaskAttemptID;
107 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
108 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
109 import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
110 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
111 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
112 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
113 import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
114 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
115 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
116 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
117 import org.apache.hadoop.util.Tool;
118 import org.apache.hadoop.util.ToolRunner;
119 import org.junit.Test;
120 import org.junit.experimental.categories.Category;
121 
122 import com.google.common.collect.Sets;
123 
124 /**
125  * This is an integration test borrowed from goraci, written by Keith Turner,
126  * which is in turn inspired by the Accumulo test called continuous ingest (ci).
127  * The original source code can be found here:
128  * https://github.com/keith-turner/goraci
129  * https://github.com/enis/goraci/
130  *
131  * Apache Accumulo [0] has a simple test suite that verifies that data is not
132  * lost at scale. This test suite is called continuous ingest. This test runs
133  * many ingest clients that continually create linked lists containing 25
134  * million nodes. At some point the clients are stopped and a map reduce job is
135  * run to ensure no linked list has a hole. A hole indicates data was lost.
136  *
137  * The nodes in the linked list are random. This causes each linked list to
138  * spread across the table. Therefore if one part of a table loses data, then it
139  * will be detected by references in another part of the table.
140  *
141  * THE ANATOMY OF THE TEST
142  *
143  * Below is a rough sketch of how data is written. For specific details look at
144  * the Generator code.
145  *
146  * 1. Write out 1 million nodes.  2. Flush the client.  3. Write out 1 million that
147  * reference the previous million.  4. If this is the 25th set of 1 million nodes,
148  * then update the 1st set of 1 million to point to the last.  5. Go to step 1.
149  *
150  * The key is that nodes only reference flushed nodes. Therefore a node should
151  * never reference a missing node, even if the ingest client is killed at any
152  * point in time.
153  *
154  * When running this test suite with Accumulo, there is a script running in
155  * parallel called the Agitator that randomly and continuously kills server
156  * processes. The outcome was that many data loss bugs were found in Accumulo
157  * by doing this. This test suite can also help find bugs that impact uptime
158  * and stability when run for days or weeks.
159  *
160  * This test suite consists of the following: a few Java programs, a little
161  * helper script to run the Java programs, and a Maven script to build it.
162  *
163  * When generating data, it is best to have each map task generate a multiple of
164  * 25 million. The reason for this is that circular linked lists are generated
165  * every 25M. Not generating a multiple of 25M will result in some nodes in the
166  * linked list not having references. The loss of an unreferenced node cannot
167  * be detected.
168  *
169  *
170  * Below is a description of the Java programs
171  *
172  * Generator - A map only job that generates data. As stated previously,
173  * it is best to generate data in multiples of 25M.
174  *
175  * Verify - A map reduce job that looks for holes. Look at the counts after running. REFERENCED and
176  * UNREFERENCED are ok; any UNDEFINED counts are bad. Do not run at the same
177  * time as the Generator.
178  *
179  * Walker - A standalone program that starts following a linked list and emits timing info.
180  *
181  * Print - A standalone program that prints nodes in the linked list
182  *
183  * Delete - A standalone program that deletes a single node
184  *
185  * This class can be run as a unit test, as an integration test, or from the command line
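 *
 * For example, a command-line run of the Generator might look like the following (an
 * illustrative sketch only: the 'generator' sub-command and its arguments follow the usage
 * string printed by Generator below, while the mapper count and output dir are made up):
 * <p>
 * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList generator 4 25000000 itbll-tmp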
186  */
187 @Category(IntegrationTests.class)
188 public class IntegrationTestBigLinkedList extends IntegrationTestBase {
189   protected static final byte[] NO_KEY = new byte[1];
190 
191   protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";
192 
193   protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";
194 
195   protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
196   private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
197   private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");
198 
199   //link to the id of the prev node in the linked list
200   protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");
201 
202   //identifier of the mapred task that generated this row
203   protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");
204 
205   //the id of the row within the same client.
206   protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");
207 
208   /** How many rows to write per map task. This has to be a multiple of 25M */
209   private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
210     = "IntegrationTestBigLinkedList.generator.num_rows";
211 
212   private static final String GENERATOR_NUM_MAPPERS_KEY
213     = "IntegrationTestBigLinkedList.generator.map.tasks";
214 
215   private static final String GENERATOR_WIDTH_KEY
216     = "IntegrationTestBigLinkedList.generator.width";
217 
218   private static final String GENERATOR_WRAP_KEY
219     = "IntegrationTestBigLinkedList.generator.wrap";
220 
221   protected int NUM_SLAVES_BASE = 3; // number of slaves for the cluster
222 
223   private static final int MISSING_ROWS_TO_LOG = 10; // YARN complains when too many counters
224 
225   private static final int WIDTH_DEFAULT = 1000000;
226   private static final int WRAP_DEFAULT = 25;
227   private static final int ROWKEY_LENGTH = 16;
228 
229   protected String toRun;
230   protected String[] otherArgs;
231 
232   static class CINode {
233     byte[] key;
234     byte[] prev;
235     String client;
236     long count;
237   }
238 
239   /**
240    * A Map only job that generates random linked lists and stores them.
241    */
242   static class Generator extends Configured implements Tool {
243 
244     private static final Log LOG = LogFactory.getLog(Generator.class);
245 
246     /**
247      * Set this configuration if you want to test that single-column-family flush works. If set, we will
248      * add a big column family and a small column family on either side of the usual ITBLL 'meta'
249      * column family. When we write out the ITBLL, we will also add to the big column family a value
250      * bigger than that for ITBLL and for small, something way smaller. The idea is that when
251      * flush-by-column-family rather than by-region is enabled, we can see if ITBLL is broken in any
252      * way. Here is how you would pass it:
253      * <p>
254      * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList
255      * -Dgenerator.multiple.columnfamilies=true generator 1 10 g
256      */
257     public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
258         "generator.multiple.columnfamilies";
259 
260     static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
261       static class GeneratorInputSplit extends InputSplit implements Writable {
262         @Override
263         public long getLength() throws IOException, InterruptedException {
264           return 1;
265         }
266         @Override
267         public String[] getLocations() throws IOException, InterruptedException {
268           return new String[0];
269         }
270         @Override
271         public void readFields(DataInput arg0) throws IOException {
272         }
273         @Override
274         public void write(DataOutput arg0) throws IOException {
275         }
276       }
277 
278       static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
279         private long count;
280         private long numNodes;
281         private Random rand;
282 
283         @Override
284         public void close() throws IOException {
285         }
286 
287         @Override
288         public BytesWritable getCurrentKey() throws IOException, InterruptedException {
289           byte[] bytes = new byte[ROWKEY_LENGTH];
290           rand.nextBytes(bytes);
291           return new BytesWritable(bytes);
292         }
293 
294         @Override
295         public NullWritable getCurrentValue() throws IOException, InterruptedException {
296           return NullWritable.get();
297         }
298 
299         @Override
300         public float getProgress() throws IOException, InterruptedException {
301           return (float)(count / (double)numNodes);
302         }
303 
304         @Override
305         public void initialize(InputSplit arg0, TaskAttemptContext context)
306             throws IOException, InterruptedException {
307           numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
308           // Use SecureRandom to avoid issue described in HBASE-13382.
309           rand = new SecureRandom();
310         }
311 
312         @Override
313         public boolean nextKeyValue() throws IOException, InterruptedException {
314           return count++ < numNodes;
315         }
316 
317       }
318 
319       @Override
320       public RecordReader<BytesWritable,NullWritable> createRecordReader(
321           InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
322         GeneratorRecordReader rr = new GeneratorRecordReader();
323         rr.initialize(split, context);
324         return rr;
325       }
326 
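      /**
       * One synthetic, empty split per requested mapper; each mapper's record reader then
       * fabricates random row keys itself rather than reading anything from the split.
       */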
327       @Override
328       public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
329         int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);
330 
331         ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);
332 
333         for (int i = 0; i < numMappers; i++) {
334           splits.add(new GeneratorInputSplit());
335         }
336 
337         return splits;
338       }
339     }
340 
341     /** Ensure each output file from the prev-job feeds exactly one mapper in the current job (files are not split). */
342     static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
343       @Override
344       protected boolean isSplitable(JobContext context, Path filename) {
345         return false;
346       }
347     }
348 
349     /**
350      * Some ASCII art time:
351      * <p>
352      * [ . . . ] represents one batch of random longs of length WIDTH
353      * <pre>
354      *                _________________________
355      *               |                  ______ |
356      *               |                 |      ||
357      *             .-+-----------------+-----.||
358      *             | |                 |     |||
359      * first   = [ . . . . . . . . . . . ]   |||
360      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
361      *             | | | | | | | | | | |     |||
362      * prev    = [ . . . . . . . . . . . ]   |||
363      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^     |||
364      *             | | | | | | | | | | |     |||
365      * current = [ . . . . . . . . . . . ]   |||
366      *                                       |||
367      * ...                                   |||
368      *                                       |||
369      * last    = [ . . . . . . . . . . . ]   |||
370      *             ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^_____|||
371      *             |                 |________||
372      *             |___________________________|
373      * </pre>
374      */
375     static class GeneratorMapper
376       extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {
377 
378       byte[][] first = null;
379       byte[][] prev = null;
380       byte[][] current = null;
381       byte[] id;
382       long count = 0;
383       int i;
384       BufferedMutator mutator;
385       Connection connection;
386       long numNodes;
387       long wrap;
388       int width;
389       boolean multipleUnevenColumnFamilies;
390       byte[] tinyValue = new byte[] { 't' };
391       byte[] bigValue = null;
392 
393       @Override
394       protected void setup(Context context) throws IOException, InterruptedException {
395         id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID());
396         Configuration conf = context.getConfiguration();
397         connection = ConnectionFactory.createConnection(conf);
398         instantiateHTable();
399         this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
400         current = new byte[this.width][];
401         int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
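        // wrap = width * wrap multiplier = the number of nodes in each circular linked list
        // (defaults: 1,000,000 * 25, i.e. the 25M-node lists described in the class javadoc).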
402         this.wrap = (long)wrapMultiplier * width;
403         this.numNodes = context.getConfiguration().getLong(
404             GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
405         if (this.numNodes < this.wrap) {
406           this.wrap = this.numNodes;
407         }
408         this.multipleUnevenColumnFamilies =
409             context.getConfiguration().getBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, false);
410       }
411 
412       protected void instantiateHTable() throws IOException {
413         mutator = connection.getBufferedMutator(
414             new BufferedMutatorParams(getTableName(connection.getConfiguration()))
415                 .writeBufferSize(4 * 1024 * 1024));
416       }
417 
418       @Override
419       protected void cleanup(Context context) throws IOException ,InterruptedException {
420         mutator.close();
421         connection.close();
422       }
423 
424       @Override
425       protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
426         current[i] = new byte[key.getLength()];
427         System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
428         if (++i == current.length) {
429           LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
430             Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
431             ", i=" + i);
432           persist(output, count, prev, current, id);
433           i = 0;
434 
435           if (first == null) {
436             first = current;
437           }
438           prev = current;
439           current = new byte[this.width][];
440 
441           count += current.length;
442           output.setStatus("Count " + count);
443 
444           if (count % wrap == 0) {
445             // this block of code turns the 1 million linked lists of length 25 into one giant
446             // circular linked list of 25 million nodes
447             circularLeftShift(first);
448 
449             persist(output, -1, prev, first, null);
450 
451             first = null;
452             prev = null;
453           }
454         }
455       }
456 
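      /**
       * Rotate the batch left by one slot. Re-persisting the shifted first batch with the last
       * batch as its 'prev' is what stitches the width parallel chains together into one giant
       * circular list instead of width separate cycles (see the ASCII diagram above).
       */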
457       private static <T> void circularLeftShift(T[] first) {
458         T ez = first[0];
459         System.arraycopy(first, 1, first, 0, first.length - 1);
460         first[first.length - 1] = ez;
461       }
462 
463       protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
464           throws IOException {
465         for (int i = 0; i < current.length; i++) {
466           Put put = new Put(current[i]);
467           put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
468 
469           if (count >= 0) {
470             put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
471           }
472           if (id != null) {
473             put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
474           }
475           // See if we are to write multiple columns.
476           if (this.multipleUnevenColumnFamilies) {
477             // Use any column name.
478             put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);
479             // If we've not allocated bigValue, do it now. Reuse same value each time.
480             if (this.bigValue == null) {
481               this.bigValue = new byte[current[i].length * 10];
482               ThreadLocalRandom.current().nextBytes(this.bigValue);
483             }
484             // Use any column name.
485             put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
486           }
487           mutator.mutate(put);
488 
489           if (i % 1000 == 0) {
490             // Tickle progress every so often else maprunner will think us hung
491             output.progress();
492           }
493         }
494 
495         mutator.flush();
496       }
497     }
498 
499     @Override
500     public int run(String[] args) throws Exception {
501       if (args.length < 3) {
502         System.out.println("Usage : " + Generator.class.getSimpleName() +
503             " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
504         System.out.println("   where <num nodes per map> should be a multiple of " +
505             "width*wrap multiplier, 25M by default");
506         return 0;
507       }
508 
509       int numMappers = Integer.parseInt(args[0]);
510       long numNodes = Long.parseLong(args[1]);
511       Path tmpOutput = new Path(args[2]);
512       Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
513       Integer wrapMuplitplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
514       return run(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
515     }
516 
517     protected void createSchema() throws IOException {
518       Configuration conf = getConf();
519       TableName tableName = getTableName(conf);
520       try (Connection conn = ConnectionFactory.createConnection(conf);
521           Admin admin = conn.getAdmin()) {
522         if (!admin.tableExists(tableName)) {
523           HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
524           htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
525           // Always add these families. Just skip writing to them when we do not test per CF flush.
526           htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
527           htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));
528           // if -DuseMob=true force all data through mob path.
529           if (conf.getBoolean("useMob", false)) {
530             for (HColumnDescriptor hcd : htd.getColumnFamilies() ) {
531               hcd.setMobEnabled(true);
532               hcd.setMobThreshold(4);
533             }
534           }
535           int numberOfServers = admin.getClusterStatus().getServers().size();
536           if (numberOfServers == 0) {
537             throw new IllegalStateException("No live regionservers");
538           }
539           int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
540                                 HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
541           int totalNumberOfRegions = numberOfServers * regionsPerServer;
542           LOG.info("Number of live regionservers: " + numberOfServers + ", " +
543               "pre-splitting table into " + totalNumberOfRegions + " regions " +
544               "(default regions per server: " + regionsPerServer + ")");
545 
546           byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);
547 
548           admin.createTable(htd, splits);
549         }
550       } catch (MasterNotRunningException e) {
551         LOG.error("Master not running", e);
552         throw new IOException(e);
553       }
554     }
555 
556     public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
557         Integer width, Integer wrapMuplitplier) throws Exception {
558       LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
559           + ", numNodes=" + numNodes);
560       Job job = Job.getInstance(getConf());
561 
562       job.setJobName("Random Input Generator");
563       job.setNumReduceTasks(0);
564       job.setJarByClass(getClass());
565 
566       job.setInputFormatClass(GeneratorInputFormat.class);
567       job.setOutputKeyClass(BytesWritable.class);
568       job.setOutputValueClass(NullWritable.class);
569 
570       setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
571 
572       job.setMapperClass(Mapper.class); //identity mapper
573 
574       FileOutputFormat.setOutputPath(job, tmpOutput);
575       job.setOutputFormatClass(SequenceFileOutputFormat.class);
576 
577       boolean success = jobCompletion(job);
578 
579       return success ? 0 : 1;
580     }
581 
582     public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
583         Integer width, Integer wrapMuplitplier) throws Exception {
584       LOG.info("Running Generator with numMappers=" + numMappers +", numNodes=" + numNodes);
585       createSchema();
586       Job job = Job.getInstance(getConf());
587 
588       job.setJobName("Link Generator");
589       job.setNumReduceTasks(0);
590       job.setJarByClass(getClass());
591 
592       FileInputFormat.setInputPaths(job, tmpOutput);
593       job.setInputFormatClass(OneFilePerMapperSFIF.class);
594       job.setOutputKeyClass(NullWritable.class);
595       job.setOutputValueClass(NullWritable.class);
596 
597       setJobConf(job, numMappers, numNodes, width, wrapMuplitplier);
598 
599       setMapperForGenerator(job);
600 
601       job.setOutputFormatClass(NullOutputFormat.class);
602 
603       job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
604       String multipleUnevenColumnFamiliesStr = System.getProperty(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY);
605       if (multipleUnevenColumnFamiliesStr != null) {
606         job.getConfiguration().setBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
607           Boolean.parseBoolean(multipleUnevenColumnFamiliesStr));
608       }
609       TableMapReduceUtil.addDependencyJars(job);
610       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
611       TableMapReduceUtil.initCredentials(job);
612 
613       boolean success = jobCompletion(job);
614 
615       return success ? 0 : 1;
616     }
617 
618     protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
619         ClassNotFoundException {
620       boolean success = job.waitForCompletion(true);
621       return success;
622     }
623 
624     protected void setMapperForGenerator(Job job) {
625       job.setMapperClass(GeneratorMapper.class);
626     }
627 
628     public int run(int numMappers, long numNodes, Path tmpOutput,
629         Integer width, Integer wrapMuplitplier) throws Exception {
630       int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
631       if (ret > 0) {
632         return ret;
633       }
634       return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMuplitplier);
635     }
636   }
637 
638   /**
639    * Tool to search for missing rows in WALs and hfiles.
640    * Pass in a file or dir of keys to search for. The key file must have been written by the
641    * Verify step (we depend on the format it writes out). We'll read the keys in and then
642    * search for them in the hbase WALs and oldWALs dirs (some of this is TODO).
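   * <p>
   * An illustrative invocation (a sketch only; it assumes the 'search' sub-command is wired up
   * by the enclosing class, and that the keys dir is the output dir of a previous Verify run):
   * <p>
   * $ ./bin/hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList search verify-output-dir 4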
643    */
644   static class Search extends Configured implements Tool {
645     private static final Log LOG = LogFactory.getLog(Search.class);
646     protected Job job;
647 
648     private static void printUsage(final String error) {
649       if (error != null && error.length() > 0) System.out.println("ERROR: " + error);
650       System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
651     }
652 
653     @Override
654     public int run(String[] args) throws Exception {
655       if (args.length < 1 || args.length > 2) {
656         printUsage(null);
657         return 1;
658       }
659       Path inputDir = new Path(args[0]);
660       int numMappers = 1;
661       if (args.length > 1) {
662         numMappers = Integer.parseInt(args[1]);
663       }
664       return run(inputDir, numMappers);
665     }
666 
667     /**
668      * WALPlayer override that searches for keys loaded in the setup.
669      */
670     public static class WALSearcher extends WALPlayer {
671       public WALSearcher(Configuration conf) {
672         super(conf);
673       }
674 
675       /**
676        * The actual searcher mapper.
677        */
678       public static class WALMapperSearcher extends WALMapper {
679         private SortedSet<byte []> keysToFind;
680         private AtomicInteger rows = new AtomicInteger(0);
681 
682         @Override
683         public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
684             throws IOException {
685           super.setup(context);
686           try {
687             this.keysToFind = readKeysToSearch(context.getConfiguration());
688             LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
689           } catch (InterruptedException e) {
690             throw new InterruptedIOException(e.toString());
691           }
692         }
693 
694         @Override
695         protected boolean filter(Context context, Cell cell) {
696           // TODO: Can I do a better compare than this copying out key?
697           byte [] row = new byte [cell.getRowLength()];
698           System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
699           boolean b = this.keysToFind.contains(row);
700           if (b) {
701             String keyStr = Bytes.toStringBinary(row);
702             try {
703               LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
704             } catch (IOException|InterruptedException e) {
705               LOG.warn(e);
706             }
707             if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
708               context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
709             }
710             context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
711           }
712           return b;
713         }
714       }
715 
716       // Put in place the above WALMapperSearcher.
717       @Override
718       public Job createSubmittableJob(String[] args) throws IOException {
719         Job job = super.createSubmittableJob(args);
720         // Call my class instead.
721         job.setJarByClass(WALMapperSearcher.class);
722         job.setMapperClass(WALMapperSearcher.class);
723         job.setOutputFormatClass(NullOutputFormat.class);
724         return job;
725       }
726     }
727 
728     static final String FOUND_GROUP_KEY = "Found";
729     static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
730 
731     public int run(Path inputDir, int numMappers) throws Exception {
732       getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
733       SortedSet<byte []> keys = readKeysToSearch(getConf());
734       if (keys.isEmpty()) throw new RuntimeException("No keys to find");
735       LOG.info("Count of keys to find: " + keys.size());
736       for(byte [] key: keys)  LOG.info("Key: " + Bytes.toStringBinary(key));
737       Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
738       // Now read all WALs. In two dirs. Presumes certain layout.
739       Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
740       Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
741       LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers +
742         " against " + getConf().get(HConstants.HBASE_DIR));
743       int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""});
744       if (ret != 0) return ret;
745       return ToolRunner.run(new WALSearcher(getConf()), new String [] {oldWalsDir.toString(), ""});
746     }
747 
748     static SortedSet<byte []> readKeysToSearch(final Configuration conf)
749     throws IOException, InterruptedException {
750       Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
751       FileSystem fs = FileSystem.get(conf);
752       SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
753       if (!fs.exists(keysInputDir)) {
754         throw new FileNotFoundException(keysInputDir.toString());
755       }
756       if (!fs.isDirectory(keysInputDir)) {
757         throw new UnsupportedOperationException("TODO");
758       } else {
759         RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
760         while(iterator.hasNext()) {
761           LocatedFileStatus keyFileStatus = iterator.next();
762           // Skip "_SUCCESS" file.
763           if (keyFileStatus.getPath().getName().startsWith("_")) continue;
764           result.addAll(readFileToSearch(conf, fs, keyFileStatus));
765         }
766       }
767       return result;
768     }
769 
770     private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
771         final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
772         InterruptedException {
773       SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
774       // Return entries that are flagged Counts.UNDEFINED in the value. Return the row. This is
775       // what is missing.
776       TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
777       try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
778           new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
779         InputSplit is =
780           new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String [] {});
781         rr.initialize(is, context);
782         while (rr.nextKeyValue()) {
783           rr.getCurrentKey();
784           BytesWritable bw = rr.getCurrentValue();
785           if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
786             byte[] key = new byte[rr.getCurrentKey().getLength()];
787             System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0, rr.getCurrentKey()
788                 .getLength());
789             result.add(key);
790           }
791         }
792       }
793       return result;
794     }
795   }
796 
797   /**
798    * A Map Reduce job that verifies that the linked lists generated by
799    * {@link Generator} do not have any holes.
800    */
801   static class Verify extends Configured implements Tool {
802 
803     private static final Log LOG = LogFactory.getLog(Verify.class);
804     protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
805     protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });
806 
807     protected Job job;
808 
809     public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
810       private BytesWritable row = new BytesWritable();
811       private BytesWritable ref = new BytesWritable();
812 
813       private boolean multipleUnevenColumnFamilies;
814 
815       @Override
816       protected void setup(
817           Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
818           throws IOException, InterruptedException {
819         this.multipleUnevenColumnFamilies =
820             context.getConfiguration().getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
821               false);
822       }
823 
824       @Override
825       protected void map(ImmutableBytesWritable key, Result value, Context context)
826           throws IOException ,InterruptedException {
827         byte[] rowKey = key.get();
828         row.set(rowKey, 0, rowKey.length);
829         if (multipleUnevenColumnFamilies
830             && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
831               TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
832           context.write(row, DEF_LOST_FAMILIES);
833         } else {
834           context.write(row, DEF);
835         }
836         byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
837         if (prev != null && prev.length > 0) {
838           ref.set(prev, 0, prev.length);
839           context.write(ref, row);
840         } else {
841           LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
842         }
843       }
844     }
845 
846     /**
847      * Don't change the order of these enums. Their ordinals are used as type flag when we emit
848      * problems found from the reducer.
849      */
850     public static enum Counts {
851       UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
852       LOST_FAMILIES
853     }
854 
855     /**
856      * Per reducer, we output problem rows as byte arrays so they can be used as input for
857      * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a flag
858      * saying what sort of emission it is. The flag is the Counts enum ordinal written as a short.
859      */
860     public static class VerifyReducer extends
861         Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
862       private ArrayList<byte[]> refs = new ArrayList<byte[]>();
863       private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
864         Counts.UNREFERENCED.ordinal(), new byte[] {}));
865       private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
866         Counts.LOST_FAMILIES.ordinal(), new byte[] {}));
867 
868       private AtomicInteger rows = new AtomicInteger(0);
869       private Connection connection;
870 
871       @Override
872       protected void setup(Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
873       throws IOException, InterruptedException {
874         super.setup(context);
875         this.connection = ConnectionFactory.createConnection(context.getConfiguration());
876       }
877 
878       @Override
879       protected void cleanup(
880           Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
881           throws IOException, InterruptedException {
882         if (this.connection != null) {
883           this.connection.close();
884         }
885         super.cleanup(context);
886       }
887 
888       /**
889        * @param ordinal the Counts ordinal to encode as the type flag
890        * @param r the row bytes to prefix
891        * @return A new byte array that has <code>ordinal</code> as a prefix taking up
892        * Bytes.SIZEOF_SHORT bytes, followed by <code>r</code>
893        */
894       public static byte[] addPrefixFlag(final int ordinal, final byte [] r) {
895         byte[] prefix = Bytes.toBytes((short)ordinal);
896         if (prefix.length != Bytes.SIZEOF_SHORT) {
897           throw new RuntimeException("Unexpected size: " + prefix.length);
898         }
899         byte[] result = new byte[prefix.length + r.length];
900         System.arraycopy(prefix, 0, result, 0, prefix.length);
901         System.arraycopy(r, 0, result, prefix.length, r.length);
902         return result;
903       }
904 
905       /**
906        * @param bs flagged row bytes whose prefix encodes the type
907        * @return Type from the Counts enum of this row. Reads prefix added by
908        * {@link #addPrefixFlag(int, byte[])}
909        */
910       public static Counts whichType(final byte [] bs) {
911         int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
912         return Counts.values()[ordinal];
913       }
914 
915       /**
916        * @param bw value with the type-flag prefix still attached
917        * @return Row bytes minus the type flag.
918        */
919       public static byte[] getRowOnly(BytesWritable bw) {
920         byte[] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
921         System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
922         return bytes;
923       }
924 
925       @Override
926       public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
927           throws IOException, InterruptedException {
928         int defCount = 0;
929         boolean lostFamilies = false;
930         refs.clear();
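        // Each value is either a one-byte "defined" marker for this row (DEF, or
        // DEF_LOST_FAMILIES when the big/tiny families were missing) or the full row key of
        // another node that references this row.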
931         for (BytesWritable type : values) {
932           if (type.getLength() == DEF.getLength()) {
933             defCount++;
934             if (type.getBytes()[0] == 1) {
935               lostFamilies = true;
936             }
937           } else {
938             byte[] bytes = new byte[type.getLength()];
939             System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
940             refs.add(bytes);
941           }
942         }
943 
944         // TODO check for more than one def, should not happen
945         StringBuilder refsSb = null;
946         String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
947         if (defCount == 0 || refs.size() != 1) {
948           refsSb = dumpExtraInfoOnRefs(key, context, refs);
949           LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
950             (refsSb != null? refsSb.toString(): ""));
951         }
952         if (lostFamilies) {
953           LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
954           context.getCounter(Counts.LOST_FAMILIES).increment(1);
955           context.write(key, LOSTFAM);
956         }
957 
958         if (defCount == 0 && refs.size() > 0) {
959           // This is bad, found a node that is referenced but not defined. It must have been
960           // lost, emit some info about this node for debugging purposes.
961           // Write out a line per reference. If more than one, flag it.
962           for (int i = 0; i < refs.size(); i++) {
963             byte[] bs = refs.get(i);
964             int ordinal;
965             if (i <= 0) {
966               ordinal = Counts.UNDEFINED.ordinal();
967               context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
968               context.getCounter(Counts.UNDEFINED).increment(1);
969             } else {
970               ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
971               context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
972             }
973           }
974           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
975             // Print out missing row; doing get on reference gives info on when the referencer
976             // was added which can help a little debugging. This info is only available in mapper
977             // output -- the 'Linked List error Key...' log message above. What we emit here is
978             // useless for debugging.
979             context.getCounter("undef", keyString).increment(1);
980           }
981         } else if (defCount > 0 && refs.size() == 0) {
982           // node is defined but not referenced
983           context.write(key, UNREF);
984           context.getCounter(Counts.UNREFERENCED).increment(1);
985           if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
986             context.getCounter("unref", keyString).increment(1);
987           }
988         } else {
989           if (refs.size() > 1) {
990             // Skip first reference.
991             for (int i = 1; i < refs.size(); i++) {
992               context.write(key,
993                 new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
994             }
995             context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
996           }
997           // node is defined and referenced
998           context.getCounter(Counts.REFERENCED).increment(1);
999         }
1000       }
1001 
1002       /**
1003        * Dump out extra info around references if there are any. Helps debugging.
1004        * @return StringBuilder filled with references if any.
1005        * @throws IOException
1006        */
1007       private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
1008           final List<byte []> refs)
1009       throws IOException {
1010         StringBuilder refsSb = null;
1011         if (refs.isEmpty()) return refsSb;
1012         refsSb = new StringBuilder();
1013         String comma = "";
1014         // If a row is a reference but has no define, print the content of the row that has
1015         // this row as a 'prev'; it will help debug.  The missing row was written just before
1016         // the row we are dumping out here.
1017         TableName tn = getTableName(context.getConfiguration());
1018         try (Table t = this.connection.getTable(tn)) {
1019           for (byte [] ref : refs) {
1020             Result r = t.get(new Get(ref));
1021             List<Cell> cells = r.listCells();
1022             String ts = (cells != null && !cells.isEmpty())?
1023                 new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
1024             byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
1025             String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
1026             b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
1027             long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
1028             b = r.getValue(FAMILY_NAME, COLUMN_PREV);
1029             String refRegionLocation = "";
1030             String keyRegionLocation = "";
1031             if (b != null && b.length > 0) {
1032               try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
1033                 HRegionLocation hrl = rl.getRegionLocation(b);
1034                 if (hrl != null) refRegionLocation = hrl.toString();
1035                 // Key here probably has trailing zeros on it.
1036                 hrl = rl.getRegionLocation(key.getBytes());
1037                 if (hrl != null) keyRegionLocation = hrl.toString();
1038               }
1039             }
1040             LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
1041               ", refPrevEqualsKey=" +
1042                 (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
1043                 ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
1044                 ", ref row date=" + ts + ", jobStr=" + jobStr +
1045                 ", ref row count=" + count +
1046                 ", ref row regionLocation=" + refRegionLocation +
1047                 ", key row regionLocation=" + keyRegionLocation);
1048             refsSb.append(comma);
1049             comma = ",";
1050             refsSb.append(Bytes.toStringBinary(ref));
1051           }
1052         }
1053         return refsSb;
1054       }
1055     }
1056 
1057     @Override
1058     public int run(String[] args) throws Exception {
1059       if (args.length != 2) {
1060         System.out.println("Usage : " + Verify.class.getSimpleName()
1061             + " <output dir> <num reducers>");
1062         return 0;
1063       }
1064 
1065       String outputDir = args[0];
1066       int numReducers = Integer.parseInt(args[1]);
1067 
1068       return run(outputDir, numReducers);
1069     }
1070 
1071     public int run(String outputDir, int numReducers) throws Exception {
1072       return run(new Path(outputDir), numReducers);
1073     }
1074 
1075     public int run(Path outputDir, int numReducers) throws Exception {
1076       LOG.info("Running Verify with outputDir=" + outputDir +", numReducers=" + numReducers);
1077 
1078       job = Job.getInstance(getConf());
1079 
1080       job.setJobName("Link Verifier");
1081       job.setNumReduceTasks(numReducers);
1082       job.setJarByClass(getClass());
1083 
1084       setJobScannerConf(job);
1085 
1086       Scan scan = new Scan();
1087       scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1088       scan.setCaching(10000);
1089       scan.setCacheBlocks(false);
1090       if (isMultiUnevenColumnFamilies()) {
1091         scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
1092         scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
1093         job.getConfiguration().setBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
1094       }
1095 
1096       TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
1097           VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
1098       TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
1099 
1100       job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
1101 
1102       job.setReducerClass(VerifyReducer.class);
1103       job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
1104       job.setOutputKeyClass(BytesWritable.class);
1105       job.setOutputValueClass(BytesWritable.class);
1106       TextOutputFormat.setOutputPath(job, outputDir);
1107 
1108       boolean success = job.waitForCompletion(true);
1109 
1110       if (success) {
1111         Counters counters = job.getCounters();
1112         if (null == counters) {
1113           LOG.warn("Counters were null, cannot verify Job completion."
1114               + " This is commonly a result of insufficient YARN configuration.");
1115           // We don't have access to the counters to know if we have "bad" counts
1116           return 0;
1117         }
1118 
1119         // If we find no unexpected values, the job didn't outright fail
1120         if (verifyUnexpectedValues(counters)) {
1121           // We didn't check referenced+unreferenced counts, leave that to visual inspection
1122           return 0;
1123         }
1124       }
1125 
1126       // We failed
1127       return 1;
1128     }
1129 
1130     public boolean verify(long expectedReferenced) throws Exception {
1131       if (job == null) {
1132         throw new IllegalStateException("You should call run() first");
1133       }
1134 
1135       Counters counters = job.getCounters();
1136       if (counters == null) {
1137         LOG.info("Counters object was null, write verification cannot be performed."
1138               + " This is commonly a result of insufficient YARN configuration.");
1139         return false;
1140       }
1141 
1142       // Run through each check, even if we fail one early
1143       boolean success = verifyExpectedValues(expectedReferenced, counters);
1144 
1145       if (!verifyUnexpectedValues(counters)) {
1146         // We found counter objects which imply failure
1147         success = false;
1148       }
1149 
1150       if (!success) {
1151         handleFailure(counters);
1152       }
1153       return success;
1154     }
1155 
1156     /**
1157      * Verify the values in the Counters against the expected number of entries written.
1158      *
1159      * @param expectedReferenced
1160      *          Expected number of referenced entries
1161      * @param counters
1162      *          The Job's Counters object
1163      * @return True if the values match what's expected, false otherwise
1164      */
1165     protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
1166       final Counter referenced = counters.findCounter(Counts.REFERENCED);
1167       final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
1168       boolean success = true;
1169 
1170       if (expectedReferenced != referenced.getValue()) {
1171         LOG.error("Expected referenced count does not match the actual referenced count. " +
1172             "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
1173         success = false;
1174       }
1175 
1176       if (unreferenced.getValue() > 0) {
1177         final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
1178         boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
1179         LOG.error("Unreferenced nodes were not expected. Unreferenced count=" + unreferenced.getValue()
1180             + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
1181         success = false;
1182       }
1183 
1184       return success;
1185     }
1186 
1187     /**
1188      * Verify that the Counters don't contain values which indicate an outright failure from the Reducers.
1189      *
1190      * @param counters
1191      *          The Job's counters
1192      * @return True if the "bad" counter objects are 0, false otherwise
1193      */
1194     protected boolean verifyUnexpectedValues(Counters counters) {
1195       final Counter undefined = counters.findCounter(Counts.UNDEFINED);
1196       final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
1197       boolean success = true;
1198 
1199       if (undefined.getValue() > 0) {
1200         LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
1201         success = false;
1202       }
1203 
1204       if (lostfamilies.getValue() > 0) {
1205         LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
1206         success = false;
1207       }
1208 
1209       return success;
1210     }
1211 
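    /**
     * On verification failure, look up and log the current region location of each row key
     * recorded in the "undef" and "unref" counter groups to help narrow down where the
     * problem rows live.
     */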
1212     protected void handleFailure(Counters counters) throws IOException {
1213       Configuration conf = job.getConfiguration();
1214       TableName tableName = getTableName(conf);
1215       try (Connection conn = ConnectionFactory.createConnection(conf)) {
1216         try (RegionLocator rl = conn.getRegionLocator(tableName)) {
1217           CounterGroup g = counters.getGroup("undef");
1218           Iterator<Counter> it = g.iterator();
1219           while (it.hasNext()) {
1220             String keyString = it.next().getName();
1221             byte[] key = Bytes.toBytes(keyString);
1222             HRegionLocation loc = rl.getRegionLocation(key, true);
1223             LOG.error("undefined row " + keyString + ", " + loc);
1224           }
1225           g = counters.getGroup("unref");
1226           it = g.iterator();
1227           while (it.hasNext()) {
1228             String keyString = it.next().getName();
1229             byte[] key = Bytes.toBytes(keyString);
1230             HRegionLocation loc = rl.getRegionLocation(key, true);
1231             LOG.error("unreferenced row " + keyString + ", " + loc);
1232           }
1233         }
1234       }
1235     }
1236   }
1237 
1238   /**
1239    * Executes Generator and Verify in a loop. Data is not cleaned between runs, so each iteration
1240    * adds more data.
1241    */
1242   static class Loop extends Configured implements Tool {
1243 
1244     private static final Log LOG = LogFactory.getLog(Loop.class);
1245 
1246     IntegrationTestBigLinkedList it;
1247 
1248     protected void runGenerator(int numMappers, long numNodes,
1249         String outputDir, Integer width, Integer wrapMuplitplier) throws Exception {
1250       Path outputPath = new Path(outputDir);
1251       UUID uuid = UUID.randomUUID(); //create a random UUID.
1252       Path generatorOutput = new Path(outputPath, uuid.toString());
1253 
1254       Generator generator = new Generator();
1255       generator.setConf(getConf());
1256       int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
1257       if (retCode > 0) {
1258         throw new RuntimeException("Generator failed with return code: " + retCode);
1259       }
1260     }
1261 
1262     protected void runVerify(String outputDir,
1263         int numReducers, long expectedNumNodes) throws Exception {
1264       Path outputPath = new Path(outputDir);
1265       UUID uuid = UUID.randomUUID(); //create a random UUID.
1266       Path iterationOutput = new Path(outputPath, uuid.toString());
1267 
1268       Verify verify = new Verify();
1269       verify.setConf(getConf());
1270       int retCode = verify.run(iterationOutput, numReducers);
1271       if (retCode > 0) {
1272         throw new RuntimeException("Verify.run failed with return code: " + retCode);
1273       }
1274 
1275       if (!verify.verify(expectedNumNodes)) {
1276         throw new RuntimeException("Verify.verify failed");
1277       }
1278 
1279       LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
1280     }
1281 
1282     @Override
1283     public int run(String[] args) throws Exception {
1284       if (args.length < 5) {
1285         System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers> [<width> <wrap multiplier>]");
1286         return 1;
1287       }
1288       LOG.info("Running Loop with args:" + Arrays.deepToString(args));
1289 
1290       int numIterations = Integer.parseInt(args[0]);
1291       int numMappers = Integer.parseInt(args[1]);
1292       long numNodes = Long.parseLong(args[2]);
1293       String outputDir = args[3];
1294       int numReducers = Integer.parseInt(args[4]);
1295       Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
1296       Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);
1297 
1298       long expectedNumNodes = 0;
1299 
1300       if (numIterations < 0) {
1301         numIterations = Integer.MAX_VALUE; //run indefinitely (kind of)
1302       }
1303 
1304       for (int i = 0; i < numIterations; i++) {
1305         LOG.info("Starting iteration = " + i);
1306         runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier);
1307         expectedNumNodes += numMappers * numNodes;
1308 
1309         runVerify(outputDir, numReducers, expectedNumNodes);
1310       }
1311 
1312       return 0;
1313     }
1314   }
1315 
1316   /**
1317    * A standalone program that prints out portions of a list created by {@link Generator}.
1318    */
1319   private static class Print extends Configured implements Tool {
1320     @Override
1321     public int run(String[] args) throws Exception {
1322       Options options = new Options();
1323       options.addOption("s", "start", true, "start key");
1324       options.addOption("e", "end", true, "end key");
1325       options.addOption("l", "limit", true, "number to print");
1326 
1327       GnuParser parser = new GnuParser();
1328       CommandLine cmd = null;
1329       try {
1330         cmd = parser.parse(options, args);
1331         if (cmd.getArgs().length != 0) {
1332           throw new ParseException("Command takes no arguments");
1333         }
1334       } catch (ParseException e) {
1335         System.err.println("Failed to parse command line " + e.getMessage());
1336         System.err.println();
1337         HelpFormatter formatter = new HelpFormatter();
1338         formatter.printHelp(getClass().getSimpleName(), options);
1339         System.exit(-1);
1340       }
1341 
1342       Table table = new HTable(getConf(), getTableName(getConf()));
1343 
1344       Scan scan = new Scan();
1345       scan.setBatch(10000);
1346 
1347       if (cmd.hasOption("s"))
1348         scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));
1349 
1350       if (cmd.hasOption("e"))
1351         scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));
1352 
1353       int limit = 0;
1354       if (cmd.hasOption("l"))
1355         limit = Integer.parseInt(cmd.getOptionValue("l"));
1356       else
1357         limit = 100;
1358 
1359       ResultScanner scanner = table.getScanner(scan);
1360 
1361       CINode node = new CINode();
1362       Result result = scanner.next();
1363       int count = 0;
1364       while (result != null && count++ < limit) {
1365         node = getCINode(result, node);
1366         System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
1367             Bytes.toStringBinary(node.prev), node.count, node.client);
1368         result = scanner.next();
1369       }
1370       scanner.close();
1371       table.close();
1372 
1373       return 0;
1374     }
1375   }
1376 
1377   /**
1378    * A standalone program that deletes a single node.
1379    */
1380   private static class Delete extends Configured implements Tool {
1381     @Override
1382     public int run(String[] args) throws Exception {
1383       if (args.length != 1) {
1384         System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
1385         return 0;
1386       }
1387       byte[] val = Bytes.toBytesBinary(args[0]);
1388 
1389       org.apache.hadoop.hbase.client.Delete delete
1390         = new org.apache.hadoop.hbase.client.Delete(val);
1391 
1392       Table table = new HTable(getConf(), getTableName(getConf()));
1393       table.delete(delete);
1394       table.close();
1395 
1396       System.out.println("Delete successful");
1397       return 0;
1398     }
1399   }
1400 
1401   /**
1402    * A standalone program that follows a linked list created by {@link Generator} and prints
1403    * timing info.
1404    */
1405   private static class Walker extends Configured implements Tool {
1406     @Override
1407     public int run(String[] args) throws IOException {
1408       Options options = new Options();
1409       options.addOption("n", "num", true, "number of queries");
1410       options.addOption("s", "start", true, "key to start at, binary string");
1411       options.addOption("l", "logevery", true, "log every N queries");
1412 
1413       GnuParser parser = new GnuParser();
1414       CommandLine cmd = null;
1415       try {
1416         cmd = parser.parse(options, args);
1417         if (cmd.getArgs().length != 0) {
1418           throw new ParseException("Command takes no arguments");
1419         }
1420       } catch (ParseException e) {
1421         System.err.println("Failed to parse command line " + e.getMessage());
1422         System.err.println();
1423         HelpFormatter formatter = new HelpFormatter();
1424         formatter.printHelp(getClass().getSimpleName(), options);
1425         System.exit(-1);
1426       }
1427 
1428       long maxQueries = Long.MAX_VALUE;
1429       if (cmd.hasOption('n')) {
1430         maxQueries = Long.parseLong(cmd.getOptionValue("n"));
1431       }
1432       Random rand = new SecureRandom();
1433       boolean isSpecificStart = cmd.hasOption('s');
1434       byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
1435       int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;
1436 
1437       Table table = new HTable(getConf(), getTableName(getConf()));
1438       long numQueries = 0;
1439       // If isSpecificStart is set, only walk one list from that particular node.
1440       // Note that in the case of a circular (or P-shaped) list it will walk forever, as is
1441       // also the case in a normal run without a startKey.
1442       while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
1443         if (!isSpecificStart) {
1444           startKey = new byte[ROWKEY_LENGTH];
1445           rand.nextBytes(startKey);
1446         }
1447         CINode node = findStartNode(table, startKey);
1448         if (node == null && isSpecificStart) {
1449           System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
1450         }
1451         numQueries++;
1452         while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
1453           byte[] prev = node.prev;
1454           long t1 = System.currentTimeMillis();
1455           node = getNode(prev, table, node);
1456           long t2 = System.currentTimeMillis();
1457           if (numQueries % logEvery == 0) {
1458             System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
1459           }
1460           numQueries++;
1461           if (node == null) {
1462             System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
1463           } else if (node.prev.length == NO_KEY.length) {
1464             System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
1465           }
1466         }
1467       }
1468 
1469       table.close();
1470       return 0;
1471     }
1472 
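         // Scan forward from startKey and return the first node found; the "FSR" line logs the scan time.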
1473     private static CINode findStartNode(Table table, byte[] startKey) throws IOException {
1474       Scan scan = new Scan();
1475       scan.setStartRow(startKey);
1476       scan.setBatch(1);
1477       scan.addColumn(FAMILY_NAME, COLUMN_PREV);
1478 
1479       long t1 = System.currentTimeMillis();
1480       ResultScanner scanner = table.getScanner(scan);
1481       Result result = scanner.next();
1482       long t2 = System.currentTimeMillis();
1483       scanner.close();
1484 
1485       if (result != null) {
1486         CINode node = getCINode(result, new CINode());
1487         System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
1488         return node;
1489       }
1490 
1491       System.out.println("FSR " + (t2 - t1));
1492 
1493       return null;
1494     }
1495 
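         // Point lookup of the prev column for the given row, reusing the passed-in CINode instance.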
1496     private CINode getNode(byte[] row, Table table, CINode node) throws IOException {
1497       Get get = new Get(row);
1498       get.addColumn(FAMILY_NAME, COLUMN_PREV);
1499       Result result = table.get(get);
1500       return getCINode(result, node);
1501     }
1502   }
1503 
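       /**
        * A standalone program that drops the test table (if it exists) and recursively deletes the
        * given output directory.
        */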
1504   private static class Clean extends Configured implements Tool {
1505     @Override public int run(String[] args) throws Exception {
1506       if (args.length < 1) {
1507         System.err.println("Usage: Clean <output dir>");
1508         return -1;
1509       }
1510 
1511       Path p = new Path(args[0]);
1512       Configuration conf = getConf();
1513       TableName tableName = getTableName(conf);
1514       try (FileSystem fs = HFileSystem.get(conf);
1515           Connection conn = ConnectionFactory.createConnection(conf);
1516           Admin admin = conn.getAdmin()) {
1517         if (admin.tableExists(tableName)) {
1518           admin.disableTable(tableName);
1519           admin.deleteTable(tableName);
1520         }
1521 
1522         if (fs.exists(p)) {
1523           fs.delete(p, true);
1524         }
1525       }
1526 
1527       return 0;
1528     }
1529   }
1530 
1531   static TableName getTableName(Configuration conf) {
1532     return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1533   }
1534 
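       // Populate the reusable CINode from a Result, substituting defaults for any missing columns.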
1535   private static CINode getCINode(Result result, CINode node) {
1536     node.key = Bytes.copy(result.getRow());
1537     if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
1538       node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
1539     } else {
1540       node.prev = NO_KEY;
1541     }
1542     if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
1543       node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
1544     } else {
1545       node.count = -1;
1546     }
1547     if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
1548       node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
1549     } else {
1550       node.client = "";
1551     }
1552     return node;
1553   }
1554 
1555   protected IntegrationTestingUtility util;
1556 
1557   @Override
1558   public void setUpCluster() throws Exception {
1559     util = getTestingUtil(getConf());
1560     boolean isDistributed = util.isDistributedCluster();
1561     util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
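         // A mini MapReduce cluster is only needed when not running against a distributed cluster.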
1562     if (!isDistributed) {
1563       util.startMiniMapReduceCluster();
1564     }
1565     this.setConf(util.getConfiguration());
1566   }
1567 
1568   @Override
1569   public void cleanUpCluster() throws Exception {
1570     super.cleanUpCluster();
1571     if (util.isDistributedCluster()) {
1572       util.shutdownMiniMapReduceCluster();
1573     }
1574   }
1575 
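       // True when the multiple/uneven column families mode is requested via the Generator system property.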
1576   private static boolean isMultiUnevenColumnFamilies() {
1577     return Boolean.TRUE.toString().equalsIgnoreCase(
1578       System.getProperty(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY));
1579   }
1580 
1581   @Test
1582   public void testContinuousIngest() throws Exception {
1583     //Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
1584     Configuration conf = getTestingUtil(getConf()).getConfiguration();
1585     if (isMultiUnevenColumnFamilies()) {
1586       // make sure per CF flush is on
1587       conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
1588     }
1589     int ret =
1590         ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
1591             util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
1592     org.junit.Assert.assertEquals(0, ret);
1593   }
1594 
1595   private void usage() {
1596     System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
1597     printCommands();
1598   }
1599 
1600   private void printCommands() {
1601     System.err.println("Commands:");
1602     System.err.println(" generator  Map only job that generates data.");
1603     System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
1604     System.err.println("            look at the counts after running. See REFERENCED and");
1605     System.err.println("            UNREFERENCED are ok. Any UNDEFINED counts are bad. Do not run");
1606     System.err.println("            with the Generator.");
1607     System.err.println(" walker     " +
1608       "Standalone program that starts following a linked list & emits timing info.");
1609     System.err.println(" print      Standalone program that prints nodes in the linked list.");
1610     System.err.println(" delete     Standalone program that deletes a·single node.");
1611     System.err.println(" loop       Program to Loop through Generator and Verify steps");
1612     System.err.println(" clean      Program to clean all left over detritus.");
1613     System.err.println(" search     Search for missing keys.");
1614     System.err.println("");
1615     System.err.println("General options:");
1616     System.err.println(" -D"+ TABLE_NAME_KEY+ "=<tableName>");
1617     System.err.println("    Run using the <tableName> as the tablename.  Defaults to "
1618         + DEFAULT_TABLE_NAME);
1619     System.err.println(" -D"+ HBaseTestingUtility.REGIONS_PER_SERVER_KEY+ "=<# regions>");
1620     System.err.println("    Create table with presplit regions per server.  Defaults to "
1621         + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
1622 
1623     System.err.println(" -DuseMob=<true|false>");
1624     System.err.println("    Create table so that the mob read/write path is forced.  " +
1625         "Defaults to false");
1626 
1627     System.err.flush();
1628   }
1629 
1630   @Override
1631   protected void processOptions(CommandLine cmd) {
1632     super.processOptions(cmd);
1633     String[] args = cmd.getArgs();
1634     //get the class, run with the conf
1635     if (args.length < 1) {
1636       printUsage(this.getClass().getSimpleName() +
1637         " <general options> COMMAND [<COMMAND options>]", "General options:", "");
1638       printCommands();
1639       // Have to throw an exception here to stop the processing. Looks ugly but gets the message across.
1640       throw new RuntimeException("Incorrect Number of args.");
1641     }
1642     toRun = args[0];
1643     otherArgs = Arrays.copyOfRange(args, 1, args.length);
1644   }
1645 
1646   @Override
1647   public int runTestFromCommandLine() throws Exception {
1648     Tool tool = null;
1649     if (toRun.equalsIgnoreCase("Generator")) {
1650       tool = new Generator();
1651     } else if (toRun.equalsIgnoreCase("Verify")) {
1652       tool = new Verify();
1653     } else if (toRun.equalsIgnoreCase("Loop")) {
1654       Loop loop = new Loop();
1655       loop.it = this;
1656       tool = loop;
1657     } else if (toRun.equalsIgnoreCase("Walker")) {
1658       tool = new Walker();
1659     } else if (toRun.equalsIgnoreCase("Print")) {
1660       tool = new Print();
1661     } else if (toRun.equalsIgnoreCase("Delete")) {
1662       tool = new Delete();
1663     } else if (toRun.equalsIgnoreCase("Clean")) {
1664       tool = new Clean();
1665     } else if (toRun.equalsIgnoreCase("Search")) {
1666       tool = new Search();
1667     } else {
1668       usage();
1669       throw new RuntimeException("Unknown arg");
1670     }
1671 
1672     return ToolRunner.run(getConf(), tool, otherArgs);
1673   }
1674 
1675   @Override
1676   public TableName getTablename() {
1677     Configuration c = getConf();
1678     return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
1679   }
1680 
1681   @Override
1682   protected Set<String> getColumnFamilies() {
1683     if (isMultiUnevenColumnFamilies()) {
1684       return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
1685         Bytes.toString(TINY_FAMILY_NAME));
1686     } else {
1687       return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
1688     }
1689   }
1690 
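       // Push the generator parameters into the job configuration; width and wrap are only set when supplied.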
1691   private static void setJobConf(Job job, int numMappers, long numNodes,
1692       Integer width, Integer wrapMultiplier) {
1693     job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
1694     job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
1695     if (width != null) {
1696       job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
1697     }
1698     if (wrapMultiplier != null) {
1699       job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
1700     }
1701   }
1702 
1703   public static void setJobScannerConf(Job job) {
1704     // Make sure scanners log something useful to make debugging possible.
1705     job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
1706     job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
1707   }
1708 
1709   public static void main(String[] args) throws Exception {
1710     Configuration conf = HBaseConfiguration.create();
1711     IntegrationTestingUtility.setUseDistributedCluster(conf);
1712     int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
1713     System.exit(ret);
1714   }
1715 }