View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.util;
18  
19  import java.io.IOException;
20  import java.io.InterruptedIOException;
21  import java.lang.reflect.Constructor;
22  import java.net.InetAddress;
23  import java.security.SecureRandom;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.List;
27  import java.util.Properties;
28  import java.util.Random;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import javax.crypto.spec.SecretKeySpec;
32  
33  import org.apache.commons.cli.CommandLine;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.HBaseConfiguration;
38  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.hbase.client.Admin;
46  import org.apache.hadoop.hbase.client.ConnectionFactory;
47  import org.apache.hadoop.hbase.client.Durability;
48  import org.apache.hadoop.hbase.client.HBaseAdmin;
49  import org.apache.hadoop.hbase.io.compress.Compression;
50  import org.apache.hadoop.hbase.io.crypto.Cipher;
51  import org.apache.hadoop.hbase.io.crypto.Encryption;
52  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
53  import org.apache.hadoop.hbase.regionserver.BloomType;
54  import org.apache.hadoop.hbase.security.EncryptionUtil;
55  import org.apache.hadoop.hbase.security.User;
56  import org.apache.hadoop.hbase.security.access.AccessControlClient;
57  import org.apache.hadoop.hbase.security.access.Permission;
58  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
59  import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
60  import org.apache.hadoop.security.SecurityUtil;
61  import org.apache.hadoop.security.UserGroupInformation;
62  import org.apache.hadoop.util.ToolRunner;
63  
64  /**
65   * A command-line utility that reads, writes, and verifies data. Unlike
66   * {@link PerformanceEvaluation}, this tool validates the data written,
67   * and supports simultaneously writing and reading the same set of keys.
68   */
69  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
70  public class LoadTestTool extends AbstractHBaseTool {
71  
72    private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
73    private static final String COLON = ":";
74  
75    /** Table name for the test */
76    private TableName tableName;
77  
78    /** Column families for the test */
79    private byte[][] families;
80  
81    /** Table name to use if not overridden on the command line */
82    protected static final String DEFAULT_TABLE_NAME = "cluster_test";
83  
84    /** Column family used by the test */
85    public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
86  
87    /** Column families used by the test */
88    public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
89  
90    /** The default data size if not specified */
91    protected static final int DEFAULT_DATA_SIZE = 64;
92  
93    /** The number of reader/writer threads if not specified */
94    protected static final int DEFAULT_NUM_THREADS = 20;
95  
96    /** Usage string for the load option */
97    protected static final String OPT_USAGE_LOAD =
98        "<avg_cols_per_key>:<avg_data_size>" +
99        "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
100 
101   /** Usage string for the read option */
102   protected static final String OPT_USAGE_READ =
103       "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
104 
105   /** Usage string for the update option */
106   protected static final String OPT_USAGE_UPDATE =
107       "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
108       + ">][:<#whether to ignore nonce collisions=0>]";
109 
110   protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
111       Arrays.toString(BloomType.values());
112 
113   protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
114       "one of " + Arrays.toString(Compression.Algorithm.values());
115 
116   public static final String OPT_DATA_BLOCK_ENCODING_USAGE =
117     "Encoding algorithm (e.g. prefix "
118         + "compression) to use for data blocks in the test column family, "
119         + "one of " + Arrays.toString(DataBlockEncoding.values()) + ".";
120 
121   public static final String OPT_BLOOM = "bloom";
122   public static final String OPT_COMPRESSION = "compression";
123   public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
124   public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
125 
126   public static final String OPT_DATA_BLOCK_ENCODING =
127       HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase();
128 
129   public static final String OPT_INMEMORY = "in_memory";
130   public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
131       "inmemory as far as possible.  Not guaranteed that reads are always served from inmemory";
132 
133   public static final String OPT_GENERATOR = "generator";
134   public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
135       + " Any args for this class can be passed as colon separated after class name";
136 
137   public static final String OPT_WRITER = "writer";
138   public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
139 
140   public static final String OPT_UPDATER = "updater";
141   public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
142 
143   public static final String OPT_READER = "reader";
144   public static final String OPT_READER_USAGE = "The class for executing the read requests";
145 
146   protected static final String OPT_KEY_WINDOW = "key_window";
147   protected static final String OPT_WRITE = "write";
148   protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
149   public static final String OPT_MULTIPUT = "multiput";
150   public static final String OPT_MULTIGET = "multiget_batchsize";
151   protected static final String OPT_NUM_KEYS = "num_keys";
152   protected static final String OPT_READ = "read";
153   protected static final String OPT_START_KEY = "start_key";
154   public static final String OPT_TABLE_NAME = "tn";
155   public static final String OPT_COLUMN_FAMILIES = "families";
156   protected static final String OPT_ZK_QUORUM = "zk";
157   protected static final String OPT_ZK_PARENT_NODE = "zk_root";
158   protected static final String OPT_SKIP_INIT = "skip_init";
159   protected static final String OPT_INIT_ONLY = "init_only";
160   protected static final String NUM_TABLES = "num_tables";
161   protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server";
162   protected static final String OPT_BATCHUPDATE = "batchupdate";
163   protected static final String OPT_UPDATE = "update";
164 
165   public static final String OPT_ENCRYPTION = "encryption";
166   protected static final String OPT_ENCRYPTION_USAGE =
167     "Enables transparent encryption on the test table, one of " +
168     Arrays.toString(Encryption.getSupportedCiphers());
169 
170   public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
171   protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
172     = "Desired number of regions per region server. Defaults to 5.";
173   public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
174 
175   public static final String OPT_REGION_REPLICATION = "region_replication";
176   protected static final String OPT_REGION_REPLICATION_USAGE =
177       "Desired number of replicas per region";
178 
179   public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
180   protected static final String OPT_REGION_REPLICA_ID_USAGE =
181       "Region replica id to do the reads from";
182 
183   public static final String OPT_MOB_THRESHOLD = "mob_threshold";
184   protected static final String OPT_MOB_THRESHOLD_USAGE =
185       "Desired cell size to exceed in bytes that will use the MOB write path";
186 
187   protected static final long DEFAULT_START_KEY = 0;
188 
189   /** This will be removed as we factor out the dependency on command line */
190   protected CommandLine cmd;
191 
192   protected MultiThreadedWriter writerThreads = null;
193   protected MultiThreadedReader readerThreads = null;
194   protected MultiThreadedUpdater updaterThreads = null;
195 
196   protected long startKey, endKey;
197 
198   protected boolean isWrite, isRead, isUpdate;
199   protected boolean deferredLogFlush;
200 
201   // Column family options
202   protected DataBlockEncoding dataBlockEncodingAlgo;
203   protected Compression.Algorithm compressAlgo;
204   protected BloomType bloomType;
205   private boolean inMemoryCF;
206 
207   private User userOwner;
208   // Writer options
209   protected int numWriterThreads = DEFAULT_NUM_THREADS;
210   protected int minColsPerKey, maxColsPerKey;
211   protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
212   protected boolean isMultiPut;
213 
214   // Updater options
215   protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
216   protected int updatePercent;
217   protected boolean ignoreConflicts = false;
218   protected boolean isBatchUpdate;
219 
220   // Reader options
221   private int numReaderThreads = DEFAULT_NUM_THREADS;
222   private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
223   private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
224   private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
225   private int verifyPercent;
226 
227   private int numTables = 1;
228 
229   private String superUser;
230 
231   private String userNames;
232   //This file is used to read authentication information in secure clusters.
233   private String authnFileName;
234 
235   private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
236   private int regionReplication = -1; // not set
237   private int regionReplicaId = -1; // not set
238 
239   private int mobThreshold = -1; // not set
240 
241   // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
242   //       console tool itself should only be used from console.
243   protected boolean isSkipInit = false;
244   protected boolean isInitOnly = false;
245 
246   protected Cipher cipher = null;
247 
248   protected String[] splitColonSeparated(String option,
249       int minNumCols, int maxNumCols) {
250     String optVal = cmd.getOptionValue(option);
251     String[] cols = optVal.split(COLON);
252     if (cols.length < minNumCols || cols.length > maxNumCols) {
253       throw new IllegalArgumentException("Expected at least "
254           + minNumCols + " columns but no more than " + maxNumCols +
255           " in the colon-separated value '" + optVal + "' of the " +
256           "-" + option + " option");
257     }
258     return cols;
259   }
260 
261   protected int getNumThreads(String numThreadsStr) {
262     return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
263   }
264 
265   public byte[][] getColumnFamilies() {
266     return families;
267   }
268 
269   /**
270    * Apply column family options such as Bloom filters, compression, and data
271    * block encoding.
272    */
273   protected void applyColumnFamilyOptions(TableName tableName,
274       byte[][] columnFamilies) throws IOException {
275     Admin admin = new HBaseAdmin(conf);
276     HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
277     LOG.info("Disabling table " + tableName);
278     admin.disableTable(tableName);
279     for (byte[] cf : columnFamilies) {
280       HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
281       boolean isNewCf = columnDesc == null;
282       if (isNewCf) {
283         columnDesc = new HColumnDescriptor(cf);
284       }
285       if (bloomType != null) {
286         columnDesc.setBloomFilterType(bloomType);
287       }
288       if (compressAlgo != null) {
289         columnDesc.setCompressionType(compressAlgo);
290       }
291       if (dataBlockEncodingAlgo != null) {
292         columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo);
293       }
294       if (inMemoryCF) {
295         columnDesc.setInMemory(inMemoryCF);
296       }
297       if (cipher != null) {
298         byte[] keyBytes = new byte[cipher.getKeyLength()];
299         new SecureRandom().nextBytes(keyBytes);
300         columnDesc.setEncryptionType(cipher.getName());
301         columnDesc.setEncryptionKey(EncryptionUtil.wrapKey(conf,
302           User.getCurrent().getShortName(),
303           new SecretKeySpec(keyBytes, cipher.getName())));
304       }
305       if (mobThreshold >= 0) {
306         columnDesc.setMobEnabled(true);
307         columnDesc.setMobThreshold(mobThreshold);
308       }
309       if (isNewCf) {
310         admin.addColumn(tableName, columnDesc);
311       } else {
312         admin.modifyColumn(tableName, columnDesc);
313       }
314     }
315     LOG.info("Enabling table " + tableName);
316     admin.enableTable(tableName);
317     admin.close();
318   }
319 
320   @Override
321   protected void addOptions() {
322     addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
323         "without port numbers");
324     addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
325     addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
326     addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
327     addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
328     addOptWithArg(OPT_READ, OPT_USAGE_READ);
329     addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
330     addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
331     addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
332     addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
333     addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
334     addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
335         "to tolerate before terminating all reader threads. The default is " +
336         MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
337     addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
338         "separate gets for every column in a row");
339     addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
340         "reads and writes for concurrent write/read workload. The default " +
341         "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
342 
343     addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
344         "separate puts for every column in a row");
345     addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
346         "separate updates for every column in a row");
347     addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
348     addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
349     addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
350     addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
351     addOptWithArg(OPT_READER, OPT_READER_USAGE);
352 
353     addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
354     addOptWithArg(OPT_START_KEY, "The first key to read/write " +
355         "(a 0-based index). The default value is " +
356         DEFAULT_START_KEY + ".");
357     addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
358         + "already exists");
359 
360     addOptWithArg(NUM_TABLES,
361       "A positive integer number. When a number n is speicfied, load test "
362           + "tool  will load n table parallely. -tn parameter value becomes "
363           + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
364 
365     addOptWithArg(OPT_REGIONS_PER_SERVER,
366       "A positive integer number. When a number n is specified, load test "
367           + "tool will create the test table with n regions per server");
368 
369     addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
370     addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
371     addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
372     addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
373     addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
374     addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
375   }
376 
377   @Override
378   protected void processOptions(CommandLine cmd) {
379     this.cmd = cmd;
380 
381     tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
382         DEFAULT_TABLE_NAME));
383 
384     if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
385       String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
386       families = new byte[list.length][];
387       for (int i = 0; i < list.length; i++) {
388         families[i] = Bytes.toBytes(list[i]);
389       }
390     } else {
391       families = DEFAULT_COLUMN_FAMILIES;
392     }
393 
394     isWrite = cmd.hasOption(OPT_WRITE);
395     isRead = cmd.hasOption(OPT_READ);
396     isUpdate = cmd.hasOption(OPT_UPDATE);
397     isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
398     deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
399 
400     if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
401       throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
402         "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
403     }
404 
405     if (isInitOnly && (isRead || isWrite || isUpdate)) {
406       throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
407           + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
408     }
409 
410     if (!isInitOnly) {
411       if (!cmd.hasOption(OPT_NUM_KEYS)) {
412         throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
413             + "read or write mode");
414       }
415       startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
416           String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
417       long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
418           Long.MAX_VALUE - startKey);
419       endKey = startKey + numKeys;
420       isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
421       System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
422     }
423 
424     parseColumnFamilyOptions(cmd);
425 
426     if (isWrite) {
427       String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
428 
429       int colIndex = 0;
430       minColsPerKey = 1;
431       maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
432       int avgColDataSize =
433           parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
434       minColDataSize = avgColDataSize / 2;
435       maxColDataSize = avgColDataSize * 3 / 2;
436 
437       if (colIndex < writeOpts.length) {
438         numWriterThreads = getNumThreads(writeOpts[colIndex++]);
439       }
440 
441       isMultiPut = cmd.hasOption(OPT_MULTIPUT);
442 
443       mobThreshold = -1;
444       if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
445         mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
446       }
447 
448       System.out.println("Multi-puts: " + isMultiPut);
449       System.out.println("Columns per key: " + minColsPerKey + ".."
450           + maxColsPerKey);
451       System.out.println("Data size per column: " + minColDataSize + ".."
452           + maxColDataSize);
453     }
454 
455     if (isUpdate) {
456       String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
457       int colIndex = 0;
458       updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
459       if (colIndex < mutateOpts.length) {
460         numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
461       }
462       if (colIndex < mutateOpts.length) {
463         ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
464       }
465 
466       isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
467 
468       System.out.println("Batch updates: " + isBatchUpdate);
469       System.out.println("Percent of keys to update: " + updatePercent);
470       System.out.println("Updater threads: " + numUpdaterThreads);
471       System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
472     }
473 
474     if (isRead) {
475       String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
476       int colIndex = 0;
477       verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
478       if (colIndex < readOpts.length) {
479         numReaderThreads = getNumThreads(readOpts[colIndex++]);
480       }
481 
482       if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
483         maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
484             0, Integer.MAX_VALUE);
485       }
486 
487       if (cmd.hasOption(OPT_KEY_WINDOW)) {
488         keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
489             0, Integer.MAX_VALUE);
490       }
491 
492       if (cmd.hasOption(OPT_MULTIGET)) {
493         multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
494             0, Integer.MAX_VALUE);
495       }
496 
497       System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
498       System.out.println("Percent of keys to verify: " + verifyPercent);
499       System.out.println("Reader threads: " + numReaderThreads);
500     }
501 
502     numTables = 1;
503     if (cmd.hasOption(NUM_TABLES)) {
504       numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
505     }
506 
507     numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
508     if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
509       numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
510     }
511 
512     regionReplication = 1;
513     if (cmd.hasOption(OPT_REGION_REPLICATION)) {
514       regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
515     }
516 
517     regionReplicaId = -1;
518     if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
519       regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
520     }
521   }
522 
523   private void parseColumnFamilyOptions(CommandLine cmd) {
524     String dataBlockEncodingStr = cmd.getOptionValue(OPT_DATA_BLOCK_ENCODING);
525     dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
526         DataBlockEncoding.valueOf(dataBlockEncodingStr);
527 
528     String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
529     compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
530         Compression.Algorithm.valueOf(compressStr);
531 
532     String bloomStr = cmd.getOptionValue(OPT_BLOOM);
533     bloomType = bloomStr == null ? BloomType.ROW :
534         BloomType.valueOf(bloomStr);
535 
536     inMemoryCF = cmd.hasOption(OPT_INMEMORY);
537     if (cmd.hasOption(OPT_ENCRYPTION)) {
538       cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
539     }
540 
541   }
542 
543   public void initTestTable() throws IOException {
544     Durability durability = Durability.USE_DEFAULT;
545     if (deferredLogFlush) {
546       durability = Durability.ASYNC_WAL;
547     }
548 
549     HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
550       getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
551         regionReplication, durability);
552     applyColumnFamilyOptions(tableName, getColumnFamilies());
553   }
554 
555   @Override
556   protected int doWork() throws IOException {
557     if (numTables > 1) {
558       return parallelLoadTables();
559     } else {
560       return loadTable();
561     }
562   }
563 
564   protected int loadTable() throws IOException {
565     if (cmd.hasOption(OPT_ZK_QUORUM)) {
566       conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
567     }
568     if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
569       conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
570     }
571 
572     if (isInitOnly) {
573       LOG.info("Initializing only; no reads or writes");
574       initTestTable();
575       return 0;
576     }
577 
578     if (!isSkipInit) {
579       initTestTable();
580     }
581     LoadTestDataGenerator dataGen = null;
582     if (cmd.hasOption(OPT_GENERATOR)) {
583       String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
584       dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
585       String[] args;
586       if (dataGen instanceof LoadTestDataGeneratorWithACL) {
587         LOG.info("Using LoadTestDataGeneratorWithACL");
588         if (User.isHBaseSecurityEnabled(conf)) {
589           LOG.info("Security is enabled");
590           authnFileName = clazzAndArgs[1];
591           superUser = clazzAndArgs[2];
592           userNames = clazzAndArgs[3];
593           args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
594           Properties authConfig = new Properties();
595           authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
596           try {
597             addAuthInfoToConf(authConfig, conf, superUser, userNames);
598           } catch (IOException exp) {
599             LOG.error(exp);
600             return EXIT_FAILURE;
601           }
602           userOwner = User.create(loginAndReturnUGI(conf, superUser));
603         } else {
604           superUser = clazzAndArgs[1];
605           userNames = clazzAndArgs[2];
606           args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
607           userOwner = User.createUserForTesting(conf, superUser, new String[0]);
608         }
609       } else {
610         args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
611             clazzAndArgs.length);
612       }
613       dataGen.initialize(args);
614     } else {
615       // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
616       dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
617           minColsPerKey, maxColsPerKey, families);
618     }
619 
620     if (userOwner != null) {
621       LOG.info("Granting permissions for user " + userOwner.getShortName());
622       Permission.Action[] actions = {
623         Permission.Action.ADMIN, Permission.Action.CREATE,
624         Permission.Action.READ, Permission.Action.WRITE };
625       try {
626         AccessControlClient.grant(ConnectionFactory.createConnection(conf),
627             tableName, userOwner.getShortName(), null, null, actions);
628       } catch (Throwable e) {
629         LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
630         return EXIT_FAILURE;
631       }
632     }
633 
634     if (userNames != null) {
635       // This will be comma separated list of expressions.
636       String users[] = userNames.split(",");
637       User user = null;
638       for (String userStr : users) {
639         if (User.isHBaseSecurityEnabled(conf)) {
640           user = User.create(loginAndReturnUGI(conf, userStr));
641         } else {
642           user = User.createUserForTesting(conf, userStr, new String[0]);
643         }
644       }
645     }
646 
647     if (isWrite) {
648       if (userOwner != null) {
649         writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
650       } else {
651         String writerClass = null;
652         if (cmd.hasOption(OPT_WRITER)) {
653           writerClass = cmd.getOptionValue(OPT_WRITER);
654         } else {
655           writerClass = MultiThreadedWriter.class.getCanonicalName();
656         }
657 
658         writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
659       }
660       writerThreads.setMultiPut(isMultiPut);
661     }
662 
663     if (isUpdate) {
664       if (userOwner != null) {
665         updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
666             userOwner, userNames);
667       } else {
668         String updaterClass = null;
669         if (cmd.hasOption(OPT_UPDATER)) {
670           updaterClass = cmd.getOptionValue(OPT_UPDATER);
671         } else {
672           updaterClass = MultiThreadedUpdater.class.getCanonicalName();
673         }
674         updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
675       }
676       updaterThreads.setBatchUpdate(isBatchUpdate);
677       updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
678     }
679 
680     if (isRead) {
681       if (userOwner != null) {
682         readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
683             userNames);
684       } else {
685         String readerClass = null;
686         if (cmd.hasOption(OPT_READER)) {
687           readerClass = cmd.getOptionValue(OPT_READER);
688         } else {
689           readerClass = MultiThreadedReader.class.getCanonicalName();
690         }
691         readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
692       }
693       readerThreads.setMaxErrors(maxReadErrors);
694       readerThreads.setKeyWindow(keyWindow);
695       readerThreads.setMultiGetBatchSize(multiGetBatchSize);
696       readerThreads.setRegionReplicaId(regionReplicaId);
697     }
698 
699     if (isUpdate && isWrite) {
700       LOG.info("Concurrent write/update workload: making updaters aware of the " +
701         "write point");
702       updaterThreads.linkToWriter(writerThreads);
703     }
704 
705     if (isRead && (isUpdate || isWrite)) {
706       LOG.info("Concurrent write/read workload: making readers aware of the " +
707         "write point");
708       readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
709     }
710 
711     if (isWrite) {
712       System.out.println("Starting to write data...");
713       writerThreads.start(startKey, endKey, numWriterThreads);
714     }
715 
716     if (isUpdate) {
717       LOG.info("Starting to mutate data...");
718       System.out.println("Starting to mutate data...");
719       // TODO : currently append and increment operations not tested with tags
720       // Will update this aftet it is done
721       updaterThreads.start(startKey, endKey, numUpdaterThreads);
722     }
723 
724     if (isRead) {
725       System.out.println("Starting to read data...");
726       readerThreads.start(startKey, endKey, numReaderThreads);
727     }
728 
729     if (isWrite) {
730       writerThreads.waitForFinish();
731     }
732 
733     if (isUpdate) {
734       updaterThreads.waitForFinish();
735     }
736 
737     if (isRead) {
738       readerThreads.waitForFinish();
739     }
740 
741     boolean success = true;
742     if (isWrite) {
743       success = success && writerThreads.getNumWriteFailures() == 0;
744     }
745     if (isUpdate) {
746       success = success && updaterThreads.getNumWriteFailures() == 0;
747     }
748     if (isRead) {
749       success = success && readerThreads.getNumReadErrors() == 0
750           && readerThreads.getNumReadFailures() == 0;
751     }
752     return success ? EXIT_SUCCESS : EXIT_FAILURE;
753   }
754 
755   private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
756     try {
757       Class<?> clazz = Class.forName(clazzName);
758       Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
759           byte[][].class);
760       return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
761           minColsPerKey, maxColsPerKey, families);
762     } catch (Exception e) {
763       throw new IOException(e);
764     }
765   }
766 
767   private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
768       , LoadTestDataGenerator dataGen) throws IOException {
769     try {
770       Class<?> clazz = Class.forName(clazzName);
771       Constructor<?> constructor = clazz.getConstructor(
772         LoadTestDataGenerator.class, Configuration.class, TableName.class);
773       return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
774     } catch (Exception e) {
775       throw new IOException(e);
776     }
777   }
778 
779   private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
780       , LoadTestDataGenerator dataGen) throws IOException {
781     try {
782       Class<?> clazz = Class.forName(clazzName);
783       Constructor<?> constructor = clazz.getConstructor(
784         LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
785       return (MultiThreadedUpdater) constructor.newInstance(
786         dataGen, conf, tableName, updatePercent);
787     } catch (Exception e) {
788       throw new IOException(e);
789     }
790   }
791 
792   private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName
793       , LoadTestDataGenerator dataGen) throws IOException {
794     try {
795       Class<?> clazz = Class.forName(clazzName);
796       Constructor<?> constructor = clazz.getConstructor(
797         LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
798       return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
799     } catch (Exception e) {
800       throw new IOException(e);
801     }
802   }
803 
804   public static byte[] generateData(final Random r, int length) {
805     byte [] b = new byte [length];
806     int i = 0;
807 
808     for(i = 0; i < (length-8); i += 8) {
809       b[i] = (byte) (65 + r.nextInt(26));
810       b[i+1] = b[i];
811       b[i+2] = b[i];
812       b[i+3] = b[i];
813       b[i+4] = b[i];
814       b[i+5] = b[i];
815       b[i+6] = b[i];
816       b[i+7] = b[i];
817     }
818 
819     byte a = (byte) (65 + r.nextInt(26));
820     for(; i < length; i++) {
821       b[i] = a;
822     }
823     return b;
824   }
825   public static void main(String[] args) {
826     new LoadTestTool().doStaticMain(args);
827   }
828 
829   /**
830    * When NUM_TABLES is specified, the function starts multiple worker threads
831    * which individually start a LoadTestTool instance to load a table. Each
832    * table name is in format <tn>_<index>. For example, "-tn test -num_tables 2"
833    * , table names will be "test_1", "test_2"
834    *
835    * @throws IOException
836    */
837   private int parallelLoadTables()
838       throws IOException {
839     // create new command args
840     String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
841     String[] newArgs = null;
842     if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
843       newArgs = new String[cmdLineArgs.length + 2];
844       newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
845       newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
846       System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
847     } else {
848       newArgs = cmdLineArgs;
849     }
850 
851     int tableNameValueIndex = -1;
852     for (int j = 0; j < newArgs.length; j++) {
853       if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
854         tableNameValueIndex = j + 1;
855       } else if (newArgs[j].endsWith(NUM_TABLES)) {
856         // change NUM_TABLES to 1 so that each worker loads one table
857         newArgs[j + 1] = "1";
858       }
859     }
860 
861     // starting to load multiple tables
862     List<WorkerThread> workers = new ArrayList<WorkerThread>();
863     for (int i = 0; i < numTables; i++) {
864       String[] workerArgs = newArgs.clone();
865       workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
866       WorkerThread worker = new WorkerThread(i, workerArgs);
867       workers.add(worker);
868       LOG.info(worker + " starting");
869       worker.start();
870     }
871 
872     // wait for all workers finish
873     LOG.info("Waiting for worker threads to finish");
874     for (WorkerThread t : workers) {
875       try {
876         t.join();
877       } catch (InterruptedException ie) {
878         IOException iie = new InterruptedIOException();
879         iie.initCause(ie);
880         throw iie;
881       }
882       checkForErrors();
883     }
884 
885     return EXIT_SUCCESS;
886   }
887 
888   // If an exception is thrown by one of worker threads, it will be
889   // stored here.
890   protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
891 
892   private void workerThreadError(Throwable t) {
893     thrown.compareAndSet(null, t);
894   }
895 
896   /**
897    * Check for errors in the writer threads. If any is found, rethrow it.
898    */
899   private void checkForErrors() throws IOException {
900     Throwable thrown = this.thrown.get();
901     if (thrown == null) return;
902     if (thrown instanceof IOException) {
903       throw (IOException) thrown;
904     } else {
905       throw new RuntimeException(thrown);
906     }
907   }
908 
909   class WorkerThread extends Thread {
910     private String[] workerArgs;
911 
912     WorkerThread(int i, String[] args) {
913       super("WorkerThread-" + i);
914       workerArgs = args;
915     }
916 
917     @Override
918     public void run() {
919       try {
920         int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
921         if (ret != 0) {
922           throw new RuntimeException("LoadTestTool exit with non-zero return code.");
923         }
924       } catch (Exception ex) {
925         LOG.error("Error in worker thread", ex);
926         workerThreadError(ex);
927       }
928     }
929   }
930 
931   private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
932       String userList) throws IOException {
933     List<String> users = new ArrayList(Arrays.asList(userList.split(",")));
934     users.add(owner);
935     for (String user : users) {
936       String keyTabFileConfKey = "hbase." + user + ".keytab.file";
937       String principalConfKey = "hbase." + user + ".kerberos.principal";
938       if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
939         throw new IOException("Authentication configs missing for user : " + user);
940       }
941     }
942     for (String key : authConfig.stringPropertyNames()) {
943       conf.set(key, authConfig.getProperty(key));
944     }
945     LOG.debug("Added authentication properties to config successfully.");
946   }
947 
948   public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username)
949       throws IOException {
950     String hostname = InetAddress.getLocalHost().getHostName();
951     String keyTabFileConfKey = "hbase." + username + ".keytab.file";
952     String keyTabFileLocation = conf.get(keyTabFileConfKey);
953     String principalConfKey = "hbase." + username + ".kerberos.principal";
954     String principal = SecurityUtil.getServerPrincipal(conf.get(principalConfKey), hostname);
955     if (keyTabFileLocation == null || principal == null) {
956       LOG.warn("Principal or key tab file null for : " + principalConfKey + ", "
957           + keyTabFileConfKey);
958     }
959     UserGroupInformation ugi =
960         UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
961     return ugi;
962   }
963 }