/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Sets;

/**
 * An integration test that writes many linked lists into an HBase table and then verifies that
 * none of the links were lost.
 *
 * Each Generator mapper emits random row keys and strings them into linked lists of configurable
 * width (default 1000000). Every width * wrap-multiplier nodes (wrap multiplier default 25) the
 * lists are closed into one giant circular list. Every node is a row whose 'prev' column holds
 * the row key of the previous node, so a map reduce Verify job can detect holes: a row that is
 * referenced by some 'prev' column but was never written means data was lost.
 *
 * Available commands:
 * <ul>
 * <li>generator - map only job that writes the linked lists</li>
 * <li>verify - map reduce job that checks every node is both defined and referenced</li>
 * <li>loop - alternates generator and verify steps in a loop</li>
 * <li>walker - standalone client that follows a list and reports timings</li>
 * <li>print - prints a range of nodes</li>
 * <li>delete - deletes a single node</li>
 * <li>clean - drops the test table and deletes the output directory</li>
 * <li>search - greps the WALs for a set of missing keys</li>
 * </ul>
 *
 * Run via main() below; see printCommands() for the general options.
 */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedList extends IntegrationTestBase {
  protected static final byte[] NO_KEY = new byte[1];

  protected static String TABLE_NAME_KEY = "IntegrationTestBigLinkedList.table";

  protected static String DEFAULT_TABLE_NAME = "IntegrationTestBigLinkedList";

  protected static byte[] FAMILY_NAME = Bytes.toBytes("meta");
  private static byte[] BIG_FAMILY_NAME = Bytes.toBytes("big");
  private static byte[] TINY_FAMILY_NAME = Bytes.toBytes("tiny");

  /** Row key of the previous node in the list. */
  protected static final byte[] COLUMN_PREV = Bytes.toBytes("prev");

  /** Identifier (job and task) of the client that wrote the row. */
  protected static final byte[] COLUMN_CLIENT = Bytes.toBytes("client");

  /** Insertion count of the row within its generating mapper. */
  protected static final byte[] COLUMN_COUNT = Bytes.toBytes("count");

  private static final String GENERATOR_NUM_ROWS_PER_MAP_KEY
      = "IntegrationTestBigLinkedList.generator.num_rows";

  private static final String GENERATOR_NUM_MAPPERS_KEY
      = "IntegrationTestBigLinkedList.generator.map.tasks";

  private static final String GENERATOR_WIDTH_KEY
      = "IntegrationTestBigLinkedList.generator.width";

  private static final String GENERATOR_WRAP_KEY
      = "IntegrationTestBigLinkedList.generator.wrap";

  protected int NUM_SLAVES_BASE = 3;

  private static final int MISSING_ROWS_TO_LOG = 10;

  private static final int WIDTH_DEFAULT = 1000000;
  private static final int WRAP_DEFAULT = 25;
  private static final int ROWKEY_LENGTH = 16;

  protected String toRun;
  protected String[] otherArgs;

  /** A node in the linked list: a row key plus the columns read from its row. */
  static class CINode {
    byte[] key;
    byte[] prev;
    String client;
    long count;
  }

  /**
   * A Map only job that generates random linked lists and stores them.
   */
  static class Generator extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Generator.class);

    /**
     * Set this configuration to true to exercise per column family flush. When set, besides the
     * usual 'meta' family each generated row also gets a value ten times the row key length in
     * a 'big' column family and a one byte value in a 'tiny' column family. The Verify job then
     * also checks that neither of those families lost any cells.
     */
    public static final String MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY =
        "generator.multiple.columnfamilies";

    static class GeneratorInputFormat extends InputFormat<BytesWritable,NullWritable> {
      static class GeneratorInputSplit extends InputSplit implements Writable {
        @Override
        public long getLength() throws IOException, InterruptedException {
          return 1;
        }
        @Override
        public String[] getLocations() throws IOException, InterruptedException {
          return new String[0];
        }
        @Override
        public void readFields(DataInput arg0) throws IOException {
        }
        @Override
        public void write(DataOutput arg0) throws IOException {
        }
      }

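      /**
       * RecordReader that hands out random row keys until the configured number of rows
       * (GENERATOR_NUM_ROWS_PER_MAP_KEY, default 25M) has been produced for this split.
       */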
      static class GeneratorRecordReader extends RecordReader<BytesWritable,NullWritable> {
        private long count;
        private long numNodes;
        private Random rand;

        @Override
        public void close() throws IOException {
        }

        @Override
        public BytesWritable getCurrentKey() throws IOException, InterruptedException {
          byte[] bytes = new byte[ROWKEY_LENGTH];
          rand.nextBytes(bytes);
          return new BytesWritable(bytes);
        }

        @Override
        public NullWritable getCurrentValue() throws IOException, InterruptedException {
          return NullWritable.get();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
          return (float)(count / (double)numNodes);
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext context)
            throws IOException, InterruptedException {
          numNodes = context.getConfiguration().getLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, 25000000);
          // Use SecureRandom so that differently-seeded tasks do not emit duplicate keys.
          rand = new SecureRandom();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
          return count++ < numNodes;
        }

      }

      @Override
      public RecordReader<BytesWritable,NullWritable> createRecordReader(
          InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        GeneratorRecordReader rr = new GeneratorRecordReader();
        rr.initialize(split, context);
        return rr;
      }

      @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        int numMappers = job.getConfiguration().getInt(GENERATOR_NUM_MAPPERS_KEY, 1);

        ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numMappers);

        for (int i = 0; i < numMappers; i++) {
          splits.add(new GeneratorInputSplit());
        }

        return splits;
      }
    }

    /** Ensure output files from the random-input job are not split; one input file per mapper. */
    static class OneFilePerMapperSFIF<K, V> extends SequenceFileInputFormat<K, V> {
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        return false;
      }
    }

    /**
     * Mapper that takes the random row keys emitted by GeneratorInputFormat and strings them
     * into linked lists. Keys are buffered 'width' at a time; each full batch is persisted with
     * every row's 'prev' column pointing at the matching row of the previous batch. After
     * width * wrapMultiplier rows, the first batch is re-persisted with its prev pointers aimed
     * at the last batch, closing everything written so far into one giant circular list. Verify
     * can then check that every row is both defined and referenced exactly once.
     */
    static class GeneratorMapper
        extends Mapper<BytesWritable, NullWritable, NullWritable, NullWritable> {

      byte[][] first = null;
      byte[][] prev = null;
      byte[][] current = null;
      byte[] id;
      long count = 0;
      int i;
      BufferedMutator mutator;
      Connection connection;
      long numNodes;
      long wrap;
      int width;
      boolean multipleUnevenColumnFamilies;
      byte[] tinyValue = new byte[] { 't' };
      byte[] bigValue = null;

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        id = Bytes.toBytes("Job: " + context.getJobID() + " Task: " + context.getTaskAttemptID());
        Configuration conf = context.getConfiguration();
        connection = ConnectionFactory.createConnection(conf);
        instantiateHTable();
        this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT);
        current = new byte[this.width][];
        int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT);
        this.wrap = (long)wrapMultiplier * width;
        this.numNodes = context.getConfiguration().getLong(
            GENERATOR_NUM_ROWS_PER_MAP_KEY, (long)WIDTH_DEFAULT * WRAP_DEFAULT);
        if (this.numNodes < this.wrap) {
          this.wrap = this.numNodes;
        }
        this.multipleUnevenColumnFamilies =
            context.getConfiguration().getBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, false);
      }

      protected void instantiateHTable() throws IOException {
        mutator = connection.getBufferedMutator(
            new BufferedMutatorParams(getTableName(connection.getConfiguration()))
                .writeBufferSize(4 * 1024 * 1024));
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        mutator.close();
        connection.close();
      }

      @Override
      protected void map(BytesWritable key, NullWritable value, Context output) throws IOException {
        current[i] = new byte[key.getLength()];
        System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
        if (++i == current.length) {
          LOG.info("Persisting current.length=" + current.length + ", count=" + count + ", id=" +
              Bytes.toStringBinary(id) + ", current=" + Bytes.toStringBinary(current[0]) +
              ", i=" + i);
          persist(output, count, prev, current, id);
          i = 0;

          if (first == null) {
            first = current;
          }
          prev = current;
          current = new byte[this.width][];

          count += current.length;
          output.setStatus("Count " + count);

          if (count % wrap == 0) {
            // Close the circle: re-point the first batch at the last one, turning the 'width'
            // parallel lists written so far into one giant circular linked list.
            circularLeftShift(first);

            persist(output, -1, prev, first, null);

            first = null;
            prev = null;
          }
        }
      }

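      /**
       * Rotates the array of head keys one position to the left. Re-persisting the shifted
       * heads with the tail batch as prev links each of the 'width' parallel lists to its
       * neighbour's tail, producing a single circular list instead of 'width' disjoint ones.
       */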
      private static <T> void circularLeftShift(T[] first) {
        T ez = first[0];
        System.arraycopy(first, 1, first, 0, first.length - 1);
        first[first.length - 1] = ez;
      }

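      /**
       * Writes one batch of rows: each row's 'prev' column points at the matching entry of the
       * previous batch (or NO_KEY for the very first batch). A count and client id are only
       * written for normal batches (count >= 0, id != null), not when re-persisting the heads
       * to close the circle.
       */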
      protected void persist(Context output, long count, byte[][] prev, byte[][] current, byte[] id)
          throws IOException {
        for (int i = 0; i < current.length; i++) {
          Put put = new Put(current[i]);
          put.addColumn(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);

          if (count >= 0) {
            put.addColumn(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
          }
          if (id != null) {
            put.addColumn(FAMILY_NAME, COLUMN_CLIENT, id);
          }

          if (this.multipleUnevenColumnFamilies) {
            // Deliberately uneven: a one byte cell in the tiny family...
            put.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME, this.tinyValue);

            if (this.bigValue == null) {
              this.bigValue = new byte[current[i].length * 10];
              ThreadLocalRandom.current().nextBytes(this.bigValue);
            }
            // ...and a cell ten times the row key length in the big family.
            put.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME, this.bigValue);
          }
          mutator.mutate(put);

          if (i % 1000 == 0) {
            // Tickle progress every so often else the framework will think the task is hung.
            output.progress();
          }
        }

        mutator.flush();
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 3) {
        System.out.println("Usage : " + Generator.class.getSimpleName() +
            " <num mappers> <num nodes per map> <tmp output dir> [<width> <wrap multiplier>]");
        System.out.println("   where <num nodes per map> should be a multiple of" +
            " width*wrap multiplier, 25M by default");
        return 0;
      }

      int numMappers = Integer.parseInt(args[0]);
      long numNodes = Long.parseLong(args[1]);
      Path tmpOutput = new Path(args[2]);
      Integer width = (args.length < 4) ? null : Integer.parseInt(args[3]);
      Integer wrapMultiplier = (args.length < 5) ? null : Integer.parseInt(args[4]);
      return run(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }

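    /**
     * Creates the test table if it does not exist, pre-split so that every live region server
     * carries the configured number of regions. Honors -DuseMob to force the mob read/write
     * path.
     */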
    protected void createSchema() throws IOException {
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(tableName)) {
          HTableDescriptor htd = new HTableDescriptor(getTableName(getConf()));
          htd.addFamily(new HColumnDescriptor(FAMILY_NAME));
          // Always add these families. Just skip writing to them when we do not test per CF flush.
          htd.addFamily(new HColumnDescriptor(BIG_FAMILY_NAME));
          htd.addFamily(new HColumnDescriptor(TINY_FAMILY_NAME));

          if (conf.getBoolean("useMob", false)) {
            for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
              hcd.setMobEnabled(true);
              hcd.setMobThreshold(4);
            }
          }
          int numberOfServers = admin.getClusterStatus().getServers().size();
          if (numberOfServers == 0) {
            throw new IllegalStateException("No live regionservers");
          }
          int regionsPerServer = conf.getInt(HBaseTestingUtility.REGIONS_PER_SERVER_KEY,
              HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);
          int totalNumberOfRegions = numberOfServers * regionsPerServer;
          LOG.info("Number of live regionservers: " + numberOfServers + ", " +
              "pre-splitting table into " + totalNumberOfRegions + " regions " +
              "(default regions per server: " + regionsPerServer + ")");

          byte[][] splits = new RegionSplitter.UniformSplit().split(totalNumberOfRegions);

          admin.createTable(htd, splits);
        }
      } catch (MasterNotRunningException e) {
        LOG.error("Master not running", e);
        throw new IOException(e);
      }
    }

    public int runRandomInputGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running RandomInputGenerator with numMappers=" + numMappers
          + ", numNodes=" + numNodes);
      Job job = Job.getInstance(getConf());

      job.setJobName("Random Input Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      job.setInputFormatClass(GeneratorInputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      job.setMapperClass(Mapper.class); // identity mapper

      FileOutputFormat.setOutputPath(job, tmpOutput);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    public int runGenerator(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      LOG.info("Running Generator with numMappers=" + numMappers + ", numNodes=" + numNodes);
      createSchema();
      Job job = Job.getInstance(getConf());

      job.setJobName("Link Generator");
      job.setNumReduceTasks(0);
      job.setJarByClass(getClass());

      FileInputFormat.setInputPaths(job, tmpOutput);
      job.setInputFormatClass(OneFilePerMapperSFIF.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);

      setJobConf(job, numMappers, numNodes, width, wrapMultiplier);

      setMapperForGenerator(job);

      job.setOutputFormatClass(NullOutputFormat.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      String multipleUnevenColumnFamiliesStr =
          System.getProperty(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY);
      if (multipleUnevenColumnFamiliesStr != null) {
        job.getConfiguration().setBoolean(MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
            Boolean.parseBoolean(multipleUnevenColumnFamiliesStr));
      }
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);

      boolean success = jobCompletion(job);

      return success ? 0 : 1;
    }

    protected boolean jobCompletion(Job job) throws IOException, InterruptedException,
        ClassNotFoundException {
      boolean success = job.waitForCompletion(true);
      return success;
    }

    protected void setMapperForGenerator(Job job) {
      job.setMapperClass(GeneratorMapper.class);
    }

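    /**
     * Runs the two phases back to back: first generate random keys into HDFS, then string them
     * into linked lists in the table.
     */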
    public int run(int numMappers, long numNodes, Path tmpOutput,
        Integer width, Integer wrapMultiplier) throws Exception {
      int ret = runRandomInputGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
      if (ret > 0) {
        return ret;
      }
      return runGenerator(numMappers, numNodes, tmpOutput, width, wrapMultiplier);
    }
  }

  /**
   * Tool to search the WALs for a set of keys; used to figure out what happened to rows that
   * Verify reported as undefined or unreferenced.
   */
  static class Search extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(Search.class);
    protected Job job;

    private static void printUsage(final String error) {
      if (error != null && error.length() > 0) System.err.println("ERROR: " + error);
      System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 1 || args.length > 2) {
        printUsage(null);
        return 1;
      }
      Path inputDir = new Path(args[0]);
      int numMappers = 1;
      if (args.length > 1) {
        numMappers = Integer.parseInt(args[1]);
      }
      return run(inputDir, numMappers);
    }

    /**
     * WALPlayer override that searches the WALs for our keys instead of replaying them into a
     * table.
     */
    public static class WALSearcher extends WALPlayer {
      public WALSearcher(Configuration conf) {
        super(conf);
      }

      /**
       * The actual searcher mapper. Logs and counts every cell whose row is in the set of keys
       * we are looking for.
       */
      public static class WALMapperSearcher extends WALMapper {
        private SortedSet<byte[]> keysToFind;
        private AtomicInteger rows = new AtomicInteger(0);

        @Override
        public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException {
          super.setup(context);
          try {
            this.keysToFind = readKeysToSearch(context.getConfiguration());
            LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
          } catch (InterruptedException e) {
            throw new InterruptedIOException(e.toString());
          }
        }

        @Override
        protected boolean filter(Context context, Cell cell) {
          byte[] row = new byte[cell.getRowLength()];
          System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0, cell.getRowLength());
          boolean b = this.keysToFind.contains(row);
          if (b) {
            String keyStr = Bytes.toStringBinary(row);
            try {
              LOG.info("Found cell=" + cell + " , walKey=" + context.getCurrentKey());
            } catch (IOException | InterruptedException e) {
              LOG.warn(e);
            }
            if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
              context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
            }
            context.getCounter(FOUND_GROUP_KEY, "CELL_WITH_MISSING_ROW").increment(1);
          }
          return b;
        }
      }

      @Override
      public Job createSubmittableJob(String[] args) throws IOException {
        Job job = super.createSubmittableJob(args);
        // Call my class instead.
        job.setJarByClass(WALMapperSearcher.class);
        job.setMapperClass(WALMapperSearcher.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        return job;
      }
    }

    static final String FOUND_GROUP_KEY = "Found";
    static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";

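    /**
     * Loads the keys from the given directory and then runs the searcher over both the live WAL
     * directory and the archived (old) WAL directory.
     */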
    public int run(Path inputDir, int numMappers) throws Exception {
      getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
      SortedSet<byte[]> keys = readKeysToSearch(getConf());
      if (keys.isEmpty()) throw new RuntimeException("No keys to find");
      LOG.info("Count of keys to find: " + keys.size());
      for (byte[] key : keys) LOG.info("Key: " + Bytes.toStringBinary(key));
      Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));

      Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
      Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
      LOG.info("Running Search with keys inputDir=" + inputDir + ", numMappers=" + numMappers +
          " against " + getConf().get(HConstants.HBASE_DIR));
      int ret = ToolRunner.run(new WALSearcher(getConf()), new String[] {walsDir.toString(), ""});
      if (ret != 0) return ret;
      return ToolRunner.run(new WALSearcher(getConf()), new String[] {oldWalsDir.toString(), ""});
    }

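    /**
     * Reads all of the keys to look for from the SequenceFiles under the configured input
     * directory (the output of a previous Verify run), sorted with HBase's byte comparator.
     */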
    static SortedSet<byte[]> readKeysToSearch(final Configuration conf)
        throws IOException, InterruptedException {
      Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
      FileSystem fs = FileSystem.get(conf);
      SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
      if (!fs.exists(keysInputDir)) {
        throw new FileNotFoundException(keysInputDir.toString());
      }
      if (!fs.isDirectory(keysInputDir)) {
        throw new UnsupportedOperationException("TODO");
      } else {
        RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(keysInputDir, false);
        while (iterator.hasNext()) {
          LocatedFileStatus keyFileStatus = iterator.next();
          // Skip "_SUCCESS" and other non-data files.
          if (keyFileStatus.getPath().getName().startsWith("_")) continue;
          result.addAll(readFileToSearch(conf, fs, keyFileStatus));
        }
      }
      return result;
    }

    private static SortedSet<byte[]> readFileToSearch(final Configuration conf,
        final FileSystem fs, final LocatedFileStatus keyFileStatus) throws IOException,
        InterruptedException {
      SortedSet<byte[]> result = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
      // Return the rows of entries flagged Counts.UNDEFINED in the value: rows that were
      // referenced but never written.
      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
      try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr =
          new SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
        InputSplit is =
            new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(), new String[] {});
        rr.initialize(is, context);
        while (rr.nextKeyValue()) {
          rr.getCurrentKey();
          BytesWritable bw = rr.getCurrentValue();
          if (Verify.VerifyReducer.whichType(bw.getBytes()) == Verify.Counts.UNDEFINED) {
            byte[] key = new byte[rr.getCurrentKey().getLength()];
            System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
                rr.getCurrentKey().getLength());
            result.add(key);
          }
        }
      }
      return result;
    }
  }

  /**
   * A Map Reduce job that verifies that the linked lists generated by {@link Generator} do not
   * have any holes.
   */
  static class Verify extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Verify.class);
    protected static final BytesWritable DEF = new BytesWritable(new byte[] { 0 });
    protected static final BytesWritable DEF_LOST_FAMILIES = new BytesWritable(new byte[] { 1 });

    protected Job job;

    public static class VerifyMapper extends TableMapper<BytesWritable, BytesWritable> {
      private BytesWritable row = new BytesWritable();
      private BytesWritable ref = new BytesWritable();

      private boolean multipleUnevenColumnFamilies;

      @Override
      protected void setup(
          Mapper<ImmutableBytesWritable, Result, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        this.multipleUnevenColumnFamilies =
            context.getConfiguration().getBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY,
                false);
      }

      @Override
      protected void map(ImmutableBytesWritable key, Result value, Context context)
          throws IOException, InterruptedException {
        byte[] rowKey = key.get();
        row.set(rowKey, 0, rowKey.length);
        if (multipleUnevenColumnFamilies
            && (!value.containsColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME) || !value.containsColumn(
                TINY_FAMILY_NAME, TINY_FAMILY_NAME))) {
          context.write(row, DEF_LOST_FAMILIES);
        } else {
          context.write(row, DEF);
        }
        byte[] prev = value.getValue(FAMILY_NAME, COLUMN_PREV);
        if (prev != null && prev.length > 0) {
          ref.set(prev, 0, prev.length);
          context.write(ref, row);
        } else {
          LOG.warn(String.format("Prev is not set for: %s", Bytes.toStringBinary(rowKey)));
        }
      }
    }

    /**
     * Don't change the order of these enums: their ordinals are written out as flags in the
     * reducer output and read back by {@link VerifyReducer#whichType(byte[])}.
     */
    public static enum Counts {
      UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES, EXTRA_UNDEF_REFERENCES,
      LOST_FAMILIES
    }

    /**
     * Reducer that tallies, for every row key, how many times it was defined (written) and how
     * many times it was referenced by another row's prev column, emitting a flagged record for
     * every anomaly it finds.
     */
    public static class VerifyReducer extends
        Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
      private ArrayList<byte[]> refs = new ArrayList<byte[]>();
      private final BytesWritable UNREF = new BytesWritable(addPrefixFlag(
          Counts.UNREFERENCED.ordinal(), new byte[] {}));
      private final BytesWritable LOSTFAM = new BytesWritable(addPrefixFlag(
          Counts.LOST_FAMILIES.ordinal(), new byte[] {}));

      private AtomicInteger rows = new AtomicInteger(0);
      private Connection connection;

      @Override
      protected void setup(
          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        super.setup(context);
        this.connection = ConnectionFactory.createConnection(context.getConfiguration());
      }

      @Override
      protected void cleanup(
          Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable>.Context context)
          throws IOException, InterruptedException {
        if (this.connection != null) {
          this.connection.close();
        }
        super.cleanup(context);
      }

      /**
       * Returns a new byte array prefixed with the passed Counts ordinal as a short (taking up
       * Bytes.SIZEOF_SHORT bytes), followed by the passed row bytes.
       */
      public static byte[] addPrefixFlag(final int ordinal, final byte[] r) {
        byte[] prefix = Bytes.toBytes((short)ordinal);
        if (prefix.length != Bytes.SIZEOF_SHORT) {
          throw new RuntimeException("Unexpected size: " + prefix.length);
        }
        byte[] result = new byte[prefix.length + r.length];
        System.arraycopy(prefix, 0, result, 0, prefix.length);
        System.arraycopy(r, 0, result, prefix.length, r.length);
        return result;
      }

      /**
       * Returns the Counts type encoded in the short prefix that addPrefixFlag() put on the
       * front of the passed bytes.
       */
      public static Counts whichType(final byte[] bs) {
        int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
        return Counts.values()[ordinal];
      }

      /**
       * Returns the row bytes with the type flag prefix stripped off.
       */
      public static byte[] getRowOnly(BytesWritable bw) {
        byte[] bytes = new byte[bw.getLength() - Bytes.SIZEOF_SHORT];
        System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0, bytes.length);
        return bytes;
      }

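      /**
       * For each row key, counts definitions (DEF/DEF_LOST_FAMILIES markers) versus references
       * (row keys of referrers). A healthy node has at least one definition and exactly one
       * reference; anything else is written out flagged with the matching Counts ordinal.
       */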
      @Override
      public void reduce(BytesWritable key, Iterable<BytesWritable> values, Context context)
          throws IOException, InterruptedException {
        int defCount = 0;
        boolean lostFamilies = false;
        refs.clear();
        for (BytesWritable type : values) {
          if (type.getLength() == DEF.getLength()) {
            defCount++;
            if (type.getBytes()[0] == 1) {
              lostFamilies = true;
            }
          } else {
            byte[] bytes = new byte[type.getLength()];
            System.arraycopy(type.getBytes(), 0, bytes, 0, type.getLength());
            refs.add(bytes);
          }
        }

        // TODO check for more than one def, should not happen
        StringBuilder refsSb = null;
        String keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
        if (defCount == 0 || refs.size() != 1) {
          refsSb = dumpExtraInfoOnRefs(key, context, refs);
          LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
              (refsSb != null ? refsSb.toString() : ""));
        }
        if (lostFamilies) {
          LOG.error("LinkedListError: key=" + keyString + ", lost big or tiny families");
          context.getCounter(Counts.LOST_FAMILIES).increment(1);
          context.write(key, LOSTFAM);
        }

        if (defCount == 0 && refs.size() > 0) {
          // This node is referenced but was never defined: the row it names was lost. Emit a
          // flagged record per reference; the first is UNDEFINED, any further ones are
          // EXTRA_UNDEF_REFERENCES.
          for (int i = 0; i < refs.size(); i++) {
            byte[] bs = refs.get(i);
            int ordinal;
            if (i == 0) {
              ordinal = Counts.UNDEFINED.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
              context.getCounter(Counts.UNDEFINED).increment(1);
            } else {
              ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
              context.write(key, new BytesWritable(addPrefixFlag(ordinal, bs)));
            }
          }
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            // Record the missing row as a counter so it shows up in the job counters; only the
            // first MISSING_ROWS_TO_LOG rows are recorded to keep the counter count bounded.
            context.getCounter("undef", keyString).increment(1);
          }
        } else if (defCount > 0 && refs.size() == 0) {
          // Node is defined but no other node points at it.
          context.write(key, UNREF);
          context.getCounter(Counts.UNREFERENCED).increment(1);
          if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
            context.getCounter("unref", keyString).increment(1);
          }
        } else {
          if (refs.size() > 1) {
            // Skip first reference.
            for (int i = 1; i < refs.size(); i++) {
              context.write(key,
                  new BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
            }
            context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() - 1);
          }
          // Node is defined and referenced.
          context.getCounter(Counts.REFERENCED).increment(1);
        }
      }

      /**
       * Dumps extra info about the references of the passed key, if any were found; a debugging
       * aid. Returns a StringBuilder filled with the references, or null if there were none.
       */
      private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final Context context,
          final List<byte[]> refs)
          throws IOException {
        StringBuilder refsSb = null;
        if (refs.isEmpty()) return refsSb;
        refsSb = new StringBuilder();
        String comma = "";
        // Look up the referencing rows to log when they were written, by which job/task, and
        // where the regions involved live.
        TableName tn = getTableName(context.getConfiguration());
        try (Table t = this.connection.getTable(tn)) {
          for (byte[] ref : refs) {
            Result r = t.get(new Get(ref));
            List<Cell> cells = r.listCells();
            String ts = (cells != null && !cells.isEmpty()) ?
                new java.util.Date(cells.get(0).getTimestamp()).toString() : "";
            byte[] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
            String jobStr = (b != null && b.length > 0) ? Bytes.toString(b) : "";
            b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
            long count = (b != null && b.length > 0) ? Bytes.toLong(b) : -1;
            b = r.getValue(FAMILY_NAME, COLUMN_PREV);
            String refRegionLocation = "";
            String keyRegionLocation = "";
            if (b != null && b.length > 0) {
              try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
                HRegionLocation hrl = rl.getRegionLocation(b);
                if (hrl != null) refRegionLocation = hrl.toString();
                // Note: key.getBytes() may include trailing bytes beyond getLength().
                hrl = rl.getRegionLocation(key.getBytes());
                if (hrl != null) keyRegionLocation = hrl.toString();
              }
            }
            LOG.error("Extras on ref without a def, ref=" + Bytes.toStringBinary(ref) +
                ", refPrevEqualsKey=" +
                (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0, b.length) == 0) +
                ", key=" + Bytes.toStringBinary(key.getBytes(), 0, key.getLength()) +
                ", ref row date=" + ts + ", jobStr=" + jobStr +
                ", ref row count=" + count +
                ", ref row regionLocation=" + refRegionLocation +
                ", key row regionLocation=" + keyRegionLocation);
            refsSb.append(comma);
            comma = ",";
            refsSb.append(Bytes.toStringBinary(ref));
          }
        }
        return refsSb;
      }
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 2) {
        System.out.println("Usage : " + Verify.class.getSimpleName()
            + " <output dir> <num reducers>");
        return 0;
      }

      String outputDir = args[0];
      int numReducers = Integer.parseInt(args[1]);

      return run(outputDir, numReducers);
    }

    public int run(String outputDir, int numReducers) throws Exception {
      return run(new Path(outputDir), numReducers);
    }

    public int run(Path outputDir, int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);

      job = Job.getInstance(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      if (isMultiUnevenColumnFamilies()) {
        scan.addColumn(BIG_FAMILY_NAME, BIG_FAMILY_NAME);
        scan.addColumn(TINY_FAMILY_NAME, TINY_FAMILY_NAME);
        job.getConfiguration().setBoolean(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY, true);
      }

      TableMapReduceUtil.initTableMapperJob(getTableName(getConf()).getName(), scan,
          VerifyMapper.class, BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
      job.setOutputKeyClass(BytesWritable.class);
      job.setOutputValueClass(BytesWritable.class);
      TextOutputFormat.setOutputPath(job, outputDir);

      boolean success = job.waitForCompletion(true);

      if (success) {
        Counters counters = job.getCounters();
        if (null == counters) {
          LOG.warn("Counters were null, cannot verify Job completion."
              + " This is commonly a result of insufficient YARN configuration.");
          // We don't have access to the counters to know if we have "bad" counts.
          return 0;
        }

        // If we find no unexpected values, the job didn't outright fail.
        if (verifyUnexpectedValues(counters)) {
          // We didn't check referenced+unreferenced counts, leave that to visual inspection.
          return 0;
        }
      }

      // We failed.
      return 1;
    }

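    /**
     * Checks the job counters against the expected number of referenced nodes after run() has
     * completed, logging region locations for any problem rows on failure.
     */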
    public boolean verify(long expectedReferenced) throws Exception {
      if (job == null) {
        throw new IllegalStateException("You should call run() first");
      }

      Counters counters = job.getCounters();
      if (counters == null) {
        LOG.info("Counters object was null, write verification cannot be performed."
            + " This is commonly a result of insufficient YARN configuration.");
        return false;
      }

      // Expected and actual referenced counts should match.
      boolean success = verifyExpectedValues(expectedReferenced, counters);

      if (!verifyUnexpectedValues(counters)) {
        // We found counter objects which imply failure.
        success = false;
      }

      if (!success) {
        handleFailure(counters);
      }
      return success;
    }

    /**
     * Verify the values in the Counters against the expected number of entries written.
     *
     * @param expectedReferenced Expected number of referenced entries
     * @param counters The Job's Counters object
     * @return True if the values match what's expected, false otherwise
     */
    protected boolean verifyExpectedValues(long expectedReferenced, Counters counters) {
      final Counter referenced = counters.findCounter(Counts.REFERENCED);
      final Counter unreferenced = counters.findCounter(Counts.UNREFERENCED);
      boolean success = true;

      if (expectedReferenced != referenced.getValue()) {
        LOG.error("Expected referenced count does not match with actual referenced count. " +
            "expected referenced=" + expectedReferenced + ", actual=" + referenced.getValue());
        success = false;
      }

      if (unreferenced.getValue() > 0) {
        final Counter multiref = counters.findCounter(Counts.EXTRAREFERENCES);
        boolean couldBeMultiRef = (multiref.getValue() == unreferenced.getValue());
        LOG.error("Unreferenced nodes were not expected. Unreferenced count="
            + unreferenced.getValue()
            + (couldBeMultiRef ? "; could be due to duplicate random numbers" : ""));
        success = false;
      }

      return success;
    }

    /**
     * Verify that the Counters don't contain values which indicate an outright failure from the
     * Reducers.
     *
     * @param counters The Job's counters
     * @return True if the "bad" counter objects are 0, false otherwise
     */
    protected boolean verifyUnexpectedValues(Counters counters) {
      final Counter undefined = counters.findCounter(Counts.UNDEFINED);
      final Counter lostfamilies = counters.findCounter(Counts.LOST_FAMILIES);
      boolean success = true;

      if (undefined.getValue() > 0) {
        LOG.error("Found an undefined node. Undefined count=" + undefined.getValue());
        success = false;
      }

      if (lostfamilies.getValue() > 0) {
        LOG.error("Found nodes which lost big or tiny families, count=" + lostfamilies.getValue());
        success = false;
      }

      return success;
    }

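    /**
     * On verification failure, logs the region location of every row recorded under the "undef"
     * and "unref" counter groups, to help pinpoint which servers may have lost data.
     */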
    protected void handleFailure(Counters counters) throws IOException {
      Configuration conf = job.getConfiguration();
      TableName tableName = getTableName(conf);
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        try (RegionLocator rl = conn.getRegionLocator(tableName)) {
          CounterGroup g = counters.getGroup("undef");
          Iterator<Counter> it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("undefined row " + keyString + ", " + loc);
          }
          g = counters.getGroup("unref");
          it = g.iterator();
          while (it.hasNext()) {
            String keyString = it.next().getName();
            byte[] key = Bytes.toBytes(keyString);
            HRegionLocation loc = rl.getRegionLocation(key, true);
            LOG.error("unreferred row " + keyString + ", " + loc);
          }
        }
      }
    }
  }

  /**
   * Executes Generator and Verify in a loop. Data is not cleaned between runs, so each iteration
   * adds more data and verifies the cumulative total.
   */
  static class Loop extends Configured implements Tool {

    private static final Log LOG = LogFactory.getLog(Loop.class);

    IntegrationTestBigLinkedList it;

    protected void runGenerator(int numMappers, long numNodes,
        String outputDir, Integer width, Integer wrapMultiplier) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // unique output dir per iteration
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(getConf());
      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }

    protected void runVerify(String outputDir,
        int numReducers, long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); // unique output dir per iteration
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(getConf());
      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err.println("Usage: Loop <num iterations> <num mappers> <num nodes per mapper> " +
            "<output dir> <num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args:" + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMultiplier = (args.length < 7) ? null : Integer.parseInt(args[6]);

      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run effectively forever
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        runGenerator(numMappers, numNodes, outputDir, width, wrapMultiplier);
        expectedNumNodes += numMappers * numNodes;

        runVerify(outputDir, numReducers, expectedNumNodes);
      }

      return 0;
    }
  }

  /**
   * A stand alone program that prints out portions of a list created by {@link Generator}.
   */
  private static class Print extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      Options options = new Options();
      options.addOption("s", "start", true, "start key");
      options.addOption("e", "end", true, "end key");
      options.addOption("l", "limit", true, "number to print");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      Table table = new HTable(getConf(), getTableName(getConf()));

      Scan scan = new Scan();
      scan.setBatch(10000);

      if (cmd.hasOption("s"))
        scan.setStartRow(Bytes.toBytesBinary(cmd.getOptionValue("s")));

      if (cmd.hasOption("e"))
        scan.setStopRow(Bytes.toBytesBinary(cmd.getOptionValue("e")));

      int limit = 0;
      if (cmd.hasOption("l"))
        limit = Integer.parseInt(cmd.getOptionValue("l"));
      else
        limit = 100;

      ResultScanner scanner = table.getScanner(scan);

      CINode node = new CINode();
      Result result = scanner.next();
      int count = 0;
      while (result != null && count++ < limit) {
        node = getCINode(result, node);
        System.out.printf("%s:%s:%012d:%s\n", Bytes.toStringBinary(node.key),
            Bytes.toStringBinary(node.prev), node.count, node.client);
        result = scanner.next();
      }
      scanner.close();
      table.close();

      return 0;
    }
  }

  /**
   * A stand alone program that deletes a single node.
   */
  private static class Delete extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
      if (args.length != 1) {
        System.out.println("Usage : " + Delete.class.getSimpleName() + " <node to delete>");
        return 0;
      }
      byte[] val = Bytes.toBytesBinary(args[0]);

      org.apache.hadoop.hbase.client.Delete delete
          = new org.apache.hadoop.hbase.client.Delete(val);

      Table table = new HTable(getConf(), getTableName(getConf()));
      table.delete(delete);
      table.close();

      System.out.println("Delete successful");
      return 0;
    }
  }

  /**
   * A stand alone program that follows a linked list created by {@link Generator} and prints
   * timing info.
   */
  private static class Walker extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException {
      Options options = new Options();
      options.addOption("n", "num", true, "number of queries");
      options.addOption("s", "start", true, "key to start at, binary string");
      options.addOption("l", "logevery", true, "log every N queries");

      GnuParser parser = new GnuParser();
      CommandLine cmd = null;
      try {
        cmd = parser.parse(options, args);
        if (cmd.getArgs().length != 0) {
          throw new ParseException("Command takes no arguments");
        }
      } catch (ParseException e) {
        System.err.println("Failed to parse command line " + e.getMessage());
        System.err.println();
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(getClass().getSimpleName(), options);
        System.exit(-1);
      }

      long maxQueries = Long.MAX_VALUE;
      if (cmd.hasOption('n')) {
        maxQueries = Long.parseLong(cmd.getOptionValue("n"));
      }
      Random rand = new SecureRandom();
      boolean isSpecificStart = cmd.hasOption('s');
      byte[] startKey = isSpecificStart ? Bytes.toBytesBinary(cmd.getOptionValue('s')) : null;
      int logEvery = cmd.hasOption('l') ? Integer.parseInt(cmd.getOptionValue('l')) : 1;

      Table table = new HTable(getConf(), getTableName(getConf()));
      long numQueries = 0;
      // If isSpecificStart is set, only walk one list from that particular node.
      // Note that in case of circular (or P-shaped) list it will walk forever, as is
      // the case in a normal run without a startKey.
      while (numQueries < maxQueries && (numQueries == 0 || !isSpecificStart)) {
        if (!isSpecificStart) {
          startKey = new byte[ROWKEY_LENGTH];
          rand.nextBytes(startKey);
        }
        CINode node = findStartNode(table, startKey);
        if (node == null && isSpecificStart) {
          System.err.printf("Start node not found: %s \n", Bytes.toStringBinary(startKey));
        }
        numQueries++;
        while (node != null && node.prev.length != NO_KEY.length && numQueries < maxQueries) {
          byte[] prev = node.prev;
          long t1 = System.currentTimeMillis();
          node = getNode(prev, table, node);
          long t2 = System.currentTimeMillis();
          if (numQueries % logEvery == 0) {
            System.out.printf("CQ %d: %d %s \n", numQueries, t2 - t1, Bytes.toStringBinary(prev));
          }
          numQueries++;
          if (node == null) {
            System.err.printf("UNDEFINED NODE %s \n", Bytes.toStringBinary(prev));
          } else if (node.prev.length == NO_KEY.length) {
            System.err.printf("TERMINATING NODE %s \n", Bytes.toStringBinary(node.key));
          }
        }
      }

      table.close();
      return 0;
    }

    private static CINode findStartNode(Table table, byte[] startKey) throws IOException {
      Scan scan = new Scan();
      scan.setStartRow(startKey);
      scan.setBatch(1);
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);

      long t1 = System.currentTimeMillis();
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      long t2 = System.currentTimeMillis();
      scanner.close();

      if (result != null) {
        CINode node = getCINode(result, new CINode());
        System.out.printf("FSR %d %s\n", t2 - t1, Bytes.toStringBinary(node.key));
        return node;
      }

      System.out.println("FSR " + (t2 - t1));

      return null;
    }

    private CINode getNode(byte[] row, Table table, CINode node) throws IOException {
      Get get = new Get(row);
      get.addColumn(FAMILY_NAME, COLUMN_PREV);
      Result result = table.get(get);
      return getCINode(result, node);
    }
  }

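  /**
   * Tool that removes everything a previous run left behind: disables and drops the test table
   * and deletes the given output directory.
   */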
  private static class Clean extends Configured implements Tool {
    @Override public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: Clean <output dir>");
        return -1;
      }

      Path p = new Path(args[0]);
      Configuration conf = getConf();
      TableName tableName = getTableName(conf);
      try (FileSystem fs = HFileSystem.get(conf);
          Connection conn = ConnectionFactory.createConnection(conf);
          Admin admin = conn.getAdmin()) {
        if (admin.tableExists(tableName)) {
          admin.disableTable(tableName);
          admin.deleteTable(tableName);
        }

        if (fs.exists(p)) {
          fs.delete(p, true);
        }
      }

      return 0;
    }
  }

  static TableName getTableName(Configuration conf) {
    return TableName.valueOf(conf.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  private static CINode getCINode(Result result, CINode node) {
    node.key = Bytes.copy(result.getRow());
    if (result.containsColumn(FAMILY_NAME, COLUMN_PREV)) {
      node.prev = Bytes.copy(result.getValue(FAMILY_NAME, COLUMN_PREV));
    } else {
      node.prev = NO_KEY;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_COUNT)) {
      node.count = Bytes.toLong(result.getValue(FAMILY_NAME, COLUMN_COUNT));
    } else {
      node.count = -1;
    }
    if (result.containsColumn(FAMILY_NAME, COLUMN_CLIENT)) {
      node.client = Bytes.toString(result.getValue(FAMILY_NAME, COLUMN_CLIENT));
    } else {
      node.client = "";
    }
    return node;
  }

  protected IntegrationTestingUtility util;

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    boolean isDistributed = util.isDistributedCluster();
    util.initializeCluster(isDistributed ? 1 : this.NUM_SLAVES_BASE);
    if (!isDistributed) {
      util.startMiniMapReduceCluster();
    }
    this.setConf(util.getConfiguration());
  }

  @Override
  public void cleanUpCluster() throws Exception {
    super.cleanUpCluster();
    if (util.isDistributedCluster()) {
      util.shutdownMiniMapReduceCluster();
    }
  }

  private static boolean isMultiUnevenColumnFamilies() {
    return Boolean.TRUE.toString().equalsIgnoreCase(
        System.getProperty(Generator.MULTIPLE_UNEVEN_COLUMNFAMILIES_KEY));
  }

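  /**
   * Runs a single Loop iteration (1 mapper, 2M nodes, 1 reducer) as a JUnit test. When the
   * multiple-uneven-columnfamilies property is set, per column family flush is enabled first so
   * the big and tiny families flush independently of the meta family.
   */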
  @Test
  public void testContinuousIngest() throws Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir> <num reducers>
    Configuration conf = getTestingUtil(getConf()).getConfiguration();
    if (isMultiUnevenColumnFamilies()) {
      // Make sure per column family flush is on.
      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
    }
    int ret =
        ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",
            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedList").toString(), "1" });
    org.junit.Assert.assertEquals(0, ret);
  }

  private void usage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() + " COMMAND [COMMAND options]");
    printCommands();
  }

  private void printCommands() {
    System.err.println("Commands:");
    System.err.println(" generator  Map only job that generates data.");
    System.err.println(" verify     A map reduce job that looks for holes. Check return code and");
    System.err.println("            look at the counts after running. REFERENCED and");
    System.err.println("            UNREFERENCED are ok. Any UNDEFINED counts are bad. Do not run");
    System.err.println("            at the same time as the Generator.");
    System.err.println(" walker     " +
        "Standalone program that starts following a linked list & emits timing info.");
    System.err.println(" print      Standalone program that prints nodes in the linked list.");
    System.err.println(" delete     Standalone program that deletes a single node.");
    System.err.println(" loop       Program to Loop through Generator and Verify steps");
    System.err.println(" clean      Program to clean all left over detritus.");
    System.err.println(" search     Search for missing keys.");
    System.err.println("");
    System.err.println("General options:");
    System.err.println(" -D" + TABLE_NAME_KEY + "=<tableName>");
    System.err.println("    Run using the <tableName> as the tablename.  Defaults to "
        + DEFAULT_TABLE_NAME);
    System.err.println(" -D" + HBaseTestingUtility.REGIONS_PER_SERVER_KEY + "=<# regions>");
    System.err.println("    Create table with presplit regions per server.  Defaults to "
        + HBaseTestingUtility.DEFAULT_REGIONS_PER_SERVER);

    System.err.println(" -DuseMob=<true|false>");
    System.err.println("    Create table so that the mob read/write path is forced.  " +
        "Defaults to false");

    System.err.flush();
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    super.processOptions(cmd);
    String[] args = cmd.getArgs();

    if (args.length < 1) {
      printUsage(this.getClass().getSimpleName() +
          " <general options> COMMAND [<COMMAND options>]", "General options:", "");
      printCommands();
      // Have to throw an exception here to stop the processing; it gets the message across.
      throw new RuntimeException("Incorrect Number of args.");
    }
    toRun = args[0];
    otherArgs = Arrays.copyOfRange(args, 1, args.length);
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    Tool tool = null;
    if (toRun.equalsIgnoreCase("Generator")) {
      tool = new Generator();
    } else if (toRun.equalsIgnoreCase("Verify")) {
      tool = new Verify();
    } else if (toRun.equalsIgnoreCase("Loop")) {
      Loop loop = new Loop();
      loop.it = this;
      tool = loop;
    } else if (toRun.equalsIgnoreCase("Walker")) {
      tool = new Walker();
    } else if (toRun.equalsIgnoreCase("Print")) {
      tool = new Print();
    } else if (toRun.equalsIgnoreCase("Delete")) {
      tool = new Delete();
    } else if (toRun.equalsIgnoreCase("Clean")) {
      tool = new Clean();
    } else if (toRun.equalsIgnoreCase("Search")) {
      tool = new Search();
    } else {
      usage();
      throw new RuntimeException("Unknown arg");
    }

    return ToolRunner.run(getConf(), tool, otherArgs);
  }

  @Override
  public TableName getTablename() {
    Configuration c = getConf();
    return TableName.valueOf(c.get(TABLE_NAME_KEY, DEFAULT_TABLE_NAME));
  }

  @Override
  protected Set<String> getColumnFamilies() {
    if (isMultiUnevenColumnFamilies()) {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME), Bytes.toString(BIG_FAMILY_NAME),
          Bytes.toString(TINY_FAMILY_NAME));
    } else {
      return Sets.newHashSet(Bytes.toString(FAMILY_NAME));
    }
  }

  private static void setJobConf(Job job, int numMappers, long numNodes,
      Integer width, Integer wrapMultiplier) {
    job.getConfiguration().setInt(GENERATOR_NUM_MAPPERS_KEY, numMappers);
    job.getConfiguration().setLong(GENERATOR_NUM_ROWS_PER_MAP_KEY, numNodes);
    if (width != null) {
      job.getConfiguration().setInt(GENERATOR_WIDTH_KEY, width);
    }
    if (wrapMultiplier != null) {
      job.getConfiguration().setInt(GENERATOR_WRAP_KEY, wrapMultiplier);
    }
  }

  public static void setJobScannerConf(Job job) {
    // Make sure scanners log something useful to make debugging possible.
    job.getConfiguration().setBoolean(ScannerCallable.LOG_SCANNER_ACTIVITY, true);
    job.getConfiguration().setInt(TableRecordReaderImpl.LOG_PER_ROW_COUNT, 100000);
  }

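  // Example invocation (arguments follow the Loop usage above; the paths and argument values
  // here are illustrative only):
  //   hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList loop \
  //       1 4 1000000 /tmp/ITBLL 1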
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedList(), args);
    System.exit(ret);
  }
}