View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.test;
19  
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Iterator;
import java.util.UUID;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
import org.apache.hadoop.hbase.security.visibility.VisibilityController;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
75  
76  /**
77   * IT test used to verify the deletes with visibility labels.
78   * The test creates three tables tablename_0, tablename_1 and tablename_2 and each table 
79   * is associated with a unique pair of labels.
80   * Another common table with the name 'commontable' is created and it has the data combined 
81   * from all these 3 tables such that there are 3 versions of every row but the visibility label 
82   * in every row corresponds to the table from which the row originated.  
83   * Then deletes are issued to the common table by selecting the visibility label 
84   * associated with each of the smaller tables. 
85   * After the delete is issued with one set of visibility labels we try to scan the common table 
86   * with each of the visibility pairs defined for the 3 tables.  
87   * So after the first delete is issued, a scan with the first set of visibility labels would 
88   * return zero result whereas the scan issued with the other two sets of visibility labels 
89   * should return all the rows corresponding to that set of visibility labels.  The above 
90   * process of delete and scan is repeated until after the last set of visibility labels are 
91   * used for the deletes the common table should not return any row.
92   * 
93   * To use this 
94   * ./hbase org.apache.hadoop.hbase.test.IntegrationTestBigLinkedListWithVisibility Loop 1 1 20000 /tmp 1 10000
95   * or 
96   * ./hbase org.apache.hadoop.hbase.IntegrationTestsDriver -r .*IntegrationTestBigLinkedListWithVisibility.*
97   */
@Category(IntegrationTests.class)
public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestBigLinkedList {

  // Visibility label names. They are consumed pairwise: table i is associated
  // with labels[2*i] and labels[2*i+1] of the comma-separated list below.
  private static final String CONFIDENTIAL = "confidential";
  private static final String TOPSECRET = "topsecret";
  private static final String SECRET = "secret";
  private static final String PUBLIC = "public";
  private static final String PRIVATE = "private";
  private static final String EVERYONE = "everyone";
  private static final String RESTRICTED = "restricted";
  private static final String GROUP = "group";
  // NOTE(review): "previliged" is misspelled but used consistently as a label
  // name within this test, so the value is left unchanged.
  private static final String PREVILIGED = "previliged";
  private static final String OPEN = "open";
  // Full ordered label list; the label pair for table i is drawn from here.
  public static String labels = CONFIDENTIAL + "," + TOPSECRET + "," + SECRET + "," + RESTRICTED
      + "," + PRIVATE + "," + PREVILIGED + "," + GROUP + "," + OPEN + "," + PUBLIC + "," + EVERYONE;
  private static final String COMMA = ",";
  private static final String UNDER_SCORE = "_";
  public static int DEFAULT_TABLES_COUNT = 3; // number of per-label source tables
  public static String tableName = "tableName"; // base name for the source tables
  public static final String COMMON_TABLE_NAME = "commontable";
  public static final String LABELS_KEY = "LABELS"; // job-conf key carrying the label list
  public static final String INDEX_KEY = "INDEX"; // job-conf key carrying the table index
  private static User USER; // restricted test user; set up in setUpCluster()
  private static final String OR = "|";
  private static String USER_OPT = "user";
  private static String userName = "user1"; // default; overridable via -u/--user
124 
125   static class VisibilityGenerator extends Generator {
126     private static final Log LOG = LogFactory.getLog(VisibilityGenerator.class);
127 
128     @Override
129     protected void createSchema() throws IOException {
130       LOG.info("Creating tables");
131       // Create three tables
132       boolean acl = AccessControlClient.isAccessControllerRunning(ConnectionFactory
133           .createConnection(getConf()));
134       if(!acl) {
135         LOG.info("No ACL available.");
136       }
137       Admin admin = new HBaseAdmin(getConf());
138       for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
139         TableName tableName = IntegrationTestBigLinkedListWithVisibility.getTableName(i);
140         createTable(admin, tableName, false, acl);
141       }
142       TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
143       createTable(admin, tableName, true, acl);
144       admin.close();
145     }
146 
147     private void createTable(Admin admin, TableName tableName, boolean setVersion,
148         boolean acl) throws IOException {
149       if (!admin.tableExists(tableName)) {
150         HTableDescriptor htd = new HTableDescriptor(tableName);
151         HColumnDescriptor family = new HColumnDescriptor(FAMILY_NAME);
152         if (setVersion) {
153           family.setMaxVersions(DEFAULT_TABLES_COUNT);
154         }
155         htd.addFamily(family);
156         admin.createTable(htd);
157         if (acl) {
158           LOG.info("Granting permissions for user " + USER.getShortName());
159           Permission.Action[] actions = { Permission.Action.READ };
160           try {
161             AccessControlClient.grant(ConnectionFactory.createConnection(getConf()), tableName,
162                 USER.getShortName(), null, null, actions);
163           } catch (Throwable e) {
164             LOG.fatal("Error in granting permission for the user " + USER.getShortName(), e);
165             throw new IOException(e);
166           }
167         }
168       }
169     }
170 
    @Override
    protected void setMapperForGenerator(Job job) {
      // Use the visibility-aware mapper so generated cells carry labels.
      job.setMapperClass(VisibilityGeneratorMapper.class);
    }
175 
176     static class VisibilityGeneratorMapper extends GeneratorMapper {
177       BufferedMutator[] tables = new BufferedMutator[DEFAULT_TABLES_COUNT];
178 
179       @Override
180       protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException,
181           InterruptedException {
182         super.setup(context);
183       }
184 
185       @Override
186       protected void instantiateHTable() throws IOException {
187         for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
188           BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
189           params.writeBufferSize(4 * 1024 * 1024);
190           BufferedMutator table = connection.getBufferedMutator(params);
191           this.tables[i] = table;
192         }
193       }
194 
195       @Override
196       protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
197           throws IOException, InterruptedException {
198         for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
199           if (tables[i] != null) {
200             tables[i].close();
201           }
202         }
203       }
204 
205       @Override
206       protected void persist(org.apache.hadoop.mapreduce.Mapper.Context output, long count,
207           byte[][] prev, byte[][] current, byte[] id) throws IOException {
208         String visibilityExps = "";
209         String[] split = labels.split(COMMA);
210         for (int i = 0; i < current.length; i++) {
211           for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
212             Put put = new Put(current[i]);
213             put.add(FAMILY_NAME, COLUMN_PREV, prev == null ? NO_KEY : prev[i]);
214             
215             if (count >= 0) {
216               put.add(FAMILY_NAME, COLUMN_COUNT, Bytes.toBytes(count + i));
217             }
218             if (id != null) {
219               put.add(FAMILY_NAME, COLUMN_CLIENT, id);
220             }
221             visibilityExps = split[j * 2] + OR + split[(j * 2) + 1];
222             put.setCellVisibility(new CellVisibility(visibilityExps));
223             tables[j].mutate(put);
224             try {
225               Thread.sleep(1);
226             } catch (InterruptedException e) {
227               throw new IOException();
228             }
229           }
230           if (i % 1000 == 0) {
231             // Tickle progress every so often else maprunner will think us hung
232             output.progress();
233           }
234         }
235       }
236     }
237   }
238 
  /**
   * Tool that runs a map-only MR job reading one source table (with that
   * table's label-pair authorizations) and writing into the common table,
   * either copying the rows or issuing visibility-scoped deletes.
   */
  static class Copier extends Configured implements Tool {
    private static final Log LOG = LogFactory.getLog(Copier.class);
    private TableName tableName; // source table for this job
    private int labelIndex; // selects which label pair scopes the scan
    private boolean delete; // true => emit Deletes, false => copy Puts

    public Copier(TableName tableName, int index, boolean delete) {
      this.tableName = tableName;
      this.labelIndex = index;
      this.delete = delete;
    }

    /**
     * Configures and runs the copy/delete job.
     * NOTE(review): the outputDir parameter is currently unused here.
     *
     * @return 0 on success, 1 on job failure
     */
    public int runCopier(String outputDir) throws Exception {
      Job job = null;
      Scan scan = null;
      job = new Job(getConf());
      job.setJobName("Data copier");
      // Hand the label index and full label list to the mapper via the config.
      job.getConfiguration().setInt("INDEX", labelIndex);
      job.getConfiguration().set("LABELS", labels);
      job.setJarByClass(getClass());
      scan = new Scan();
      scan.setCacheBlocks(false);
      scan.setRaw(true);

      // Scan with the label pair belonging to this table.
      String[] split = labels.split(COMMA);
      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
          split[(this.labelIndex * 2) + 1]));
      if (delete) {
        LOG.info("Running deletes");
      } else {
        LOG.info("Running copiers");
      }
      if (delete) {
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
            VisibilityDeleteImport.class, null, null, job);
      } else {
        TableMapReduceUtil.initTableMapperJob(tableName.getNameAsString(), scan,
            VisibilityImport.class, null, null, job);
      }
      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
      job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
      // Mapper output goes straight into the common table; map-only job
      // (numReduceTasks is forced to 0 below).
      TableMapReduceUtil.initTableReducerJob(COMMON_TABLE_NAME, null, job, null, null, null, null);
      TableMapReduceUtil.addDependencyJars(job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);
      TableMapReduceUtil.initCredentials(job);
      job.setNumReduceTasks(0);
      boolean success = job.waitForCompletion(true);
      return success ? 0 : 1;
    }

    @Override
    public int run(String[] arg0) throws Exception {
      // Unused Tool entry point; callers invoke runCopier(String) directly.
      return 0;
    }
  }
295 
296   static class VisibilityImport extends Import.Importer {
297     private int index;
298     private String labels;
299     private String[] split;
300 
301     @Override
302     public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
303       index = context.getConfiguration().getInt(INDEX_KEY, -1);
304       labels = context.getConfiguration().get(LABELS_KEY);
305       split = labels.split(COMMA);
306       super.setup(context);
307     }
308 
309     @Override
310     protected void addPutToKv(Put put, Cell kv) throws IOException {
311       String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
312       put.setCellVisibility(new CellVisibility(visibilityExps));
313       super.addPutToKv(put, kv);
314     }
315   }
316 
317   static class VisibilityDeleteImport extends Import.Importer {
318     private int index;
319     private String labels;
320     private String[] split;
321 
322     @Override
323     public void setup(org.apache.hadoop.mapreduce.Mapper.Context context) {
324       index = context.getConfiguration().getInt(INDEX_KEY, -1);
325       labels = context.getConfiguration().get(LABELS_KEY);
326       split = labels.split(COMMA);
327       super.setup(context);
328     }
329 
330     // Creating delete here
331     @Override
332     protected void processKV(ImmutableBytesWritable key, Result result,
333         org.apache.hadoop.mapreduce.Mapper.Context context, Put put,
334         org.apache.hadoop.hbase.client.Delete delete) throws 
335         IOException, InterruptedException {
336       String visibilityExps = split[index * 2] + OR + split[(index * 2) + 1];
337       for (Cell kv : result.rawCells()) {
338         // skip if we filter it out
339         if (kv == null)
340           continue;
341         // Create deletes here
342         if (delete == null) {
343           delete = new Delete(key.get());
344         }
345         delete.setCellVisibility(new CellVisibility(visibilityExps));
346         delete.deleteFamily(kv.getFamily());
347       }
348       if (delete != null) {
349         context.write(key, delete);
350       }
351     }
352   }
353 
  @Override
  protected void addOptions() {
    super.addOptions();
    // -u/--user: name of the test user granted read access and label auths.
    addOptWithArg("u", USER_OPT, "User name");
  }
359   
360   @Override
361   protected void processOptions(CommandLine cmd) {
362     super.processOptions(cmd);
363     if (cmd.hasOption(USER_OPT)) {
364       userName = cmd.getOptionValue(USER_OPT);
365     }
366     
367   }
  /**
   * Configures the cluster for visibility labels before starting it, creates
   * the restricted test user, then registers the labels and auths.
   */
  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(null);
    Configuration conf = util.getConfiguration();
    // HFile v3 — required for cell tags, which carry visibility labels.
    conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
    conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
    conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
    conf.set("hbase.superuser", User.getCurrent().getName());
    conf.setBoolean("dfs.permissions", false);
    USER = User.createUserForTesting(conf, userName, new String[] {});
    super.setUpCluster();
    addLabels();
  }
381 
382   static TableName getTableName(int i) {
383     return TableName.valueOf(tableName + UNDER_SCORE + i);
384   }
385 
386   private void addLabels() throws Exception {
387     try {
388       VisibilityClient.addLabels(util.getConfiguration(), labels.split(COMMA));
389       VisibilityClient.setAuths(util.getConfiguration(), labels.split(COMMA), USER.getName());
390     } catch (Throwable t) {
391       throw new IOException(t);
392     }
393   }
394 
  /**
   * Verify variant that runs the verification job as the restricted test
   * user, scanning with a single table's label-pair authorizations.
   */
  static class VisibilityVerify extends Verify {
    private static final Log LOG = LogFactory.getLog(VisibilityVerify.class);
    private TableName tableName; // table whose linked list is verified
    private int labelIndex; // selects the label pair used for the scan

    public VisibilityVerify(String tableName, int index) {
      this.tableName = TableName.valueOf(tableName);
      this.labelIndex = index;
    }
404 
    @Override
    public int run(final Path outputDir, final int numReducers) throws Exception {
      LOG.info("Running Verify with outputDir=" + outputDir + ", numReducers=" + numReducers);
      // Run the verification as USER so the scan only returns cells its
      // label authorizations allow.
      PrivilegedExceptionAction<Integer> scanAction = new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          return doVerify(outputDir, numReducers);
        }
      };
      return USER.runAs(scanAction);
    }
416 
    /**
     * Configures and runs the Verify MR job, scanning with this instance's
     * label-pair authorizations so only matching cells are counted.
     *
     * @return 0 on success, 1 on job failure
     */
    private int doVerify(Path outputDir, int numReducers) throws IOException, InterruptedException,
        ClassNotFoundException {
      job = new Job(getConf());

      job.setJobName("Link Verifier");
      job.setNumReduceTasks(numReducers);
      job.setJarByClass(getClass());

      setJobScannerConf(job);

      Scan scan = new Scan();
      scan.addColumn(FAMILY_NAME, COLUMN_PREV);
      scan.setCaching(10000);
      scan.setCacheBlocks(false);
      String[] split = labels.split(COMMA);

      // Restrict the scan to the label pair for this table index.
      scan.setAuthorizations(new Authorizations(split[this.labelIndex * 2],
          split[(this.labelIndex * 2) + 1]));

      TableMapReduceUtil.initTableMapperJob(tableName.getName(), scan, VerifyMapper.class,
          BytesWritable.class, BytesWritable.class, job);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(), AbstractHBaseTool.class);

      job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

      job.setReducerClass(VerifyReducer.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      TextOutputFormat.setOutputPath(job, outputDir);
      boolean success = job.waitForCompletion(true);

      return success ? 0 : 1;
    }
449 
450     @Override
451     protected void handleFailure(Counters counters) throws IOException {
452       Configuration conf = job.getConfiguration();
453       HConnection conn = HConnectionManager.getConnection(conf);
454       TableName tableName = TableName.valueOf(COMMON_TABLE_NAME);
455       CounterGroup g = counters.getGroup("undef");
456       Iterator<Counter> it = g.iterator();
457       while (it.hasNext()) {
458         String keyString = it.next().getName();
459         byte[] key = Bytes.toBytes(keyString);
460         HRegionLocation loc = conn.relocateRegion(tableName, key);
461         LOG.error("undefined row " + keyString + ", " + loc);
462       }
463       g = counters.getGroup("unref");
464       it = g.iterator();
465       while (it.hasNext()) {
466         String keyString = it.next().getName();
467         byte[] key = Bytes.toBytes(keyString);
468         HRegionLocation loc = conn.relocateRegion(tableName, key);
469         LOG.error("unreferred row " + keyString + ", " + loc);
470       }
471     }
472   }
473 
  /**
   * Loop variant: generate -> copy into common table -> verify -> delete per
   * label pair, re-verifying after every delete.
   */
  static class VisibilityLoop extends Loop {
    private static final int SLEEP_IN_MS = 5000; // pause between MR jobs
    private static final Log LOG = LogFactory.getLog(VisibilityLoop.class);
    IntegrationTestBigLinkedListWithVisibility it;
478 
479     @Override
480     protected void runGenerator(int numMappers, long numNodes, String outputDir, Integer width,
481         Integer wrapMuplitplier) throws Exception {
482       Path outputPath = new Path(outputDir);
483       UUID uuid = UUID.randomUUID(); // create a random UUID.
484       Path generatorOutput = new Path(outputPath, uuid.toString());
485 
486       Generator generator = new VisibilityGenerator();
487       generator.setConf(getConf());
488       int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMuplitplier);
489       if (retCode > 0) {
490         throw new RuntimeException("Generator failed with return code: " + retCode);
491       }
492     }
493 
494     protected void runDelete(int numMappers, long numNodes, String outputDir, Integer width,
495         Integer wrapMuplitplier, int tableIndex) throws Exception {
496       LOG.info("Running copier on table "+IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex));
497       Copier copier = new Copier(
498           IntegrationTestBigLinkedListWithVisibility.getTableName(tableIndex), tableIndex, true);
499       copier.setConf(getConf());
500       copier.runCopier(outputDir);
501       Thread.sleep(SLEEP_IN_MS);
502     }
503 
504     protected void runVerify(String outputDir, int numReducers, long expectedNumNodes,
505         boolean allTables) throws Exception {
506       Path outputPath = new Path(outputDir);
507 
508       if (allTables) {
509         for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
510           LOG.info("Verifying table " + i);
511           sleep(SLEEP_IN_MS);
512           UUID uuid = UUID.randomUUID(); // create a random UUID.
513           Path iterationOutput = new Path(outputPath, uuid.toString());
514           Verify verify = new VisibilityVerify(getTableName(i).getNameAsString(), i);
515           verify(numReducers, expectedNumNodes, iterationOutput, verify);
516         }
517       }
518       for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
519         runVerifyCommonTable(outputDir, numReducers, expectedNumNodes, i);
520       }
521     }
522 
523     private void runVerify(String outputDir, int numReducers, long expectedNodes, int tableIndex)
524         throws Exception {
525       long temp = expectedNodes;
526       for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
527         if (i <= tableIndex) {
528           expectedNodes = 0;
529         } else {
530           expectedNodes = temp;
531         }
532         LOG.info("Verifying data in the table with index "+i+ " and expected nodes is "+expectedNodes);
533         runVerifyCommonTable(outputDir, numReducers, expectedNodes, i);
534       }
535     }
536 
    // Thin wrapper over Thread.sleep, used to space out consecutive MR jobs.
    private void sleep(long ms) throws InterruptedException {
      Thread.sleep(ms);
    }
540 
541     protected void runVerifyCommonTable(String outputDir, int numReducers, long expectedNumNodes,
542         int index) throws Exception {
543       LOG.info("Verifying common table with index " + index);
544       sleep(SLEEP_IN_MS);
545       Path outputPath = new Path(outputDir);
546       UUID uuid = UUID.randomUUID(); // create a random UUID.
547       Path iterationOutput = new Path(outputPath, uuid.toString());
548       Verify verify = new VisibilityVerify(TableName.valueOf(COMMON_TABLE_NAME).getNameAsString(),
549           index);
550       verify(numReducers, expectedNumNodes, iterationOutput, verify);
551     }
552 
553     protected void runCopier(String outputDir) throws Exception {
554       for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
555         LOG.info("Running copier " + IntegrationTestBigLinkedListWithVisibility.getTableName(i));
556         sleep(SLEEP_IN_MS);
557         Copier copier = new Copier(IntegrationTestBigLinkedListWithVisibility.getTableName(i), i,
558             false);
559         copier.setConf(getConf());
560         copier.runCopier(outputDir);
561       }
562     }
563 
564     private void verify(int numReducers, long expectedNumNodes, 
565         Path iterationOutput, Verify verify) throws Exception {
566       verify.setConf(getConf());
567       int retCode = verify.run(iterationOutput, numReducers);
568       if (retCode > 0) {
569         throw new RuntimeException("Verify.run failed with return code: " + retCode);
570       }
571 
572       if (!verify.verify(expectedNumNodes)) {
573         throw new RuntimeException("Verify.verify failed");
574       }
575 
576       LOG.info("Verify finished with succees. Total nodes=" + expectedNumNodes);
577     }
578 
    /**
     * Command-line entry: parses Loop arguments, then for each iteration
     * generates data, copies it into the common table, verifies all tables,
     * and finally deletes per label pair, re-verifying after every delete.
     *
     * @return 0 on success, 1 on bad usage
     */
    @Override
    public int run(String[] args) throws Exception {
      if (args.length < 5) {
        System.err
            .println("Usage: Loop <num iterations> " +
                "<num mappers> <num nodes per mapper> <output dir> " +
                "<num reducers> [<width> <wrap multiplier>]");
        return 1;
      }
      LOG.info("Running Loop with args:" + Arrays.deepToString(args));

      int numIterations = Integer.parseInt(args[0]);
      int numMappers = Integer.parseInt(args[1]);
      long numNodes = Long.parseLong(args[2]);
      String outputDir = args[3];
      int numReducers = Integer.parseInt(args[4]);
      // Optional tuning parameters; null lets the generator use its defaults.
      Integer width = (args.length < 6) ? null : Integer.parseInt(args[5]);
      Integer wrapMuplitplier = (args.length < 7) ? null : Integer.parseInt(args[6]);

      long expectedNumNodes = 0;

      if (numIterations < 0) {
        numIterations = Integer.MAX_VALUE; // run indefinitely (kind of)
      }

      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);
        LOG.info("Generating data");
        runGenerator(numMappers, numNodes, outputDir, width, wrapMuplitplier);
        expectedNumNodes += numMappers * numNodes;
        // Copying won't work because expressions are not returned back to the
        // client
        LOG.info("Running copier");
        sleep(SLEEP_IN_MS);
        runCopier(outputDir);
        LOG.info("Verifying copied data");
        sleep(SLEEP_IN_MS);
        runVerify(outputDir, numReducers, expectedNumNodes, true);
        sleep(SLEEP_IN_MS);
        // Delete one label pair at a time; after each delete, the common
        // table must be empty for the deleted pairs and full for the rest.
        for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) {
          LOG.info("Deleting data on table with index: "+j);
          runDelete(numMappers, numNodes, outputDir, width, wrapMuplitplier, j);
          sleep(SLEEP_IN_MS);
          LOG.info("Verifying common table after deleting");
          runVerify(outputDir, numReducers, expectedNumNodes, j);
          sleep(SLEEP_IN_MS);
        }
      }
      return 0;
    }
629   }
630 
  /**
   * JUnit entry point: runs one full VisibilityLoop iteration (1 mapper,
   * 20000 nodes per mapper, 1 reducer) and asserts the loop exited cleanly.
   */
  @Override
  @Test
  public void testContinuousIngest() throws IOException, Exception {
    // Loop <num iterations> <num mappers> <num nodes per mapper> <output dir>
    // <num reducers>
    int ret = ToolRunner.run(
        getTestingUtil(getConf()).getConfiguration(),
        new VisibilityLoop(),
        new String[] { "1", "1", "20000",
            util.getDataTestDirOnTestFS("IntegrationTestBigLinkedListWithVisibility").toString(),
            "1", "10000" });
    org.junit.Assert.assertEquals(0, ret);
  }
644 
645   public static void main(String[] args) throws Exception {
646     Configuration conf = HBaseConfiguration.create();
647     IntegrationTestingUtility.setUseDistributedCluster(conf);
648     int ret = ToolRunner.run(conf, new IntegrationTestBigLinkedListWithVisibility(), args);
649     System.exit(ret);
650   }
651 
  @Override
  protected MonkeyFactory getDefaultMonkeyFactory() {
    // Default to the CALM monkey factory for this test.
    return MonkeyFactory.getFactory(MonkeyFactory.CALM);
  }
656 
657   @Override
658   public int runTestFromCommandLine() throws Exception {
659     Tool tool = null;
660     Loop loop = new VisibilityLoop();
661     loop.it = this;
662     tool = loop;
663     return ToolRunner.run(getConf(), tool, otherArgs);
664   }
665 }