1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21 import java.lang.reflect.Constructor;
22 import java.net.InetAddress;
23 import java.security.SecureRandom;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.List;
27 import java.util.Properties;
28 import java.util.Random;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import javax.crypto.spec.SecretKeySpec;
32
33 import org.apache.commons.cli.CommandLine;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.classification.InterfaceAudience;
45 import org.apache.hadoop.hbase.client.Admin;
46 import org.apache.hadoop.hbase.client.ConnectionFactory;
47 import org.apache.hadoop.hbase.client.Durability;
48 import org.apache.hadoop.hbase.client.HBaseAdmin;
49 import org.apache.hadoop.hbase.io.compress.Compression;
50 import org.apache.hadoop.hbase.io.crypto.Cipher;
51 import org.apache.hadoop.hbase.io.crypto.Encryption;
52 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
53 import org.apache.hadoop.hbase.regionserver.BloomType;
54 import org.apache.hadoop.hbase.security.EncryptionUtil;
55 import org.apache.hadoop.hbase.security.User;
56 import org.apache.hadoop.hbase.security.access.AccessControlClient;
57 import org.apache.hadoop.hbase.security.access.Permission;
58 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
59 import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
60 import org.apache.hadoop.security.SecurityUtil;
61 import org.apache.hadoop.security.UserGroupInformation;
62 import org.apache.hadoop.util.ToolRunner;
63
64
65
66
67
68
69 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
70 public class LoadTestTool extends AbstractHBaseTool {
71
72 private static final Log LOG = LogFactory.getLog(LoadTestTool.class);
73 private static final String COLON = ":";
74
75
76 private TableName tableName;
77
78
79 private byte[][] families;
80
81
82 protected static final String DEFAULT_TABLE_NAME = "cluster_test";
83
84
85 public static byte[] DEFAULT_COLUMN_FAMILY = Bytes.toBytes("test_cf");
86
87
88 public static final byte[][] DEFAULT_COLUMN_FAMILIES = { DEFAULT_COLUMN_FAMILY };
89
90
91 protected static final int DEFAULT_DATA_SIZE = 64;
92
93
94 protected static final int DEFAULT_NUM_THREADS = 20;
95
96
97 protected static final String OPT_USAGE_LOAD =
98 "<avg_cols_per_key>:<avg_data_size>" +
99 "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
100
101
102 protected static final String OPT_USAGE_READ =
103 "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
104
105
106 protected static final String OPT_USAGE_UPDATE =
107 "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
108 + ">][:<#whether to ignore nonce collisions=0>]";
109
110 protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
111 Arrays.toString(BloomType.values());
112
113 protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
114 "one of " + Arrays.toString(Compression.Algorithm.values());
115
116 public static final String OPT_DATA_BLOCK_ENCODING_USAGE =
117 "Encoding algorithm (e.g. prefix "
118 + "compression) to use for data blocks in the test column family, "
119 + "one of " + Arrays.toString(DataBlockEncoding.values()) + ".";
120
121 public static final String OPT_BLOOM = "bloom";
122 public static final String OPT_COMPRESSION = "compression";
123 public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
124 public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
125
126 public static final String OPT_DATA_BLOCK_ENCODING =
127 HColumnDescriptor.DATA_BLOCK_ENCODING.toLowerCase();
128
129 public static final String OPT_INMEMORY = "in_memory";
130 public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
131 "inmemory as far as possible. Not guaranteed that reads are always served from inmemory";
132
133 public static final String OPT_GENERATOR = "generator";
134 public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
135 + " Any args for this class can be passed as colon separated after class name";
136
137 public static final String OPT_WRITER = "writer";
138 public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
139
140 public static final String OPT_UPDATER = "updater";
141 public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
142
143 public static final String OPT_READER = "reader";
144 public static final String OPT_READER_USAGE = "The class for executing the read requests";
145
146 protected static final String OPT_KEY_WINDOW = "key_window";
147 protected static final String OPT_WRITE = "write";
148 protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
149 public static final String OPT_MULTIPUT = "multiput";
150 public static final String OPT_MULTIGET = "multiget_batchsize";
151 protected static final String OPT_NUM_KEYS = "num_keys";
152 protected static final String OPT_READ = "read";
153 protected static final String OPT_START_KEY = "start_key";
154 public static final String OPT_TABLE_NAME = "tn";
155 public static final String OPT_COLUMN_FAMILIES = "families";
156 protected static final String OPT_ZK_QUORUM = "zk";
157 protected static final String OPT_ZK_PARENT_NODE = "zk_root";
158 protected static final String OPT_SKIP_INIT = "skip_init";
159 protected static final String OPT_INIT_ONLY = "init_only";
160 protected static final String NUM_TABLES = "num_tables";
161 protected static final String OPT_REGIONS_PER_SERVER = "regions_per_server";
162 protected static final String OPT_BATCHUPDATE = "batchupdate";
163 protected static final String OPT_UPDATE = "update";
164
165 public static final String OPT_ENCRYPTION = "encryption";
166 protected static final String OPT_ENCRYPTION_USAGE =
167 "Enables transparent encryption on the test table, one of " +
168 Arrays.toString(Encryption.getSupportedCiphers());
169
170 public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
171 protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
172 = "Desired number of regions per region server. Defaults to 5.";
173 public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
174
175 public static final String OPT_REGION_REPLICATION = "region_replication";
176 protected static final String OPT_REGION_REPLICATION_USAGE =
177 "Desired number of replicas per region";
178
179 public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
180 protected static final String OPT_REGION_REPLICA_ID_USAGE =
181 "Region replica id to do the reads from";
182
183 public static final String OPT_MOB_THRESHOLD = "mob_threshold";
184 protected static final String OPT_MOB_THRESHOLD_USAGE =
185 "Desired cell size to exceed in bytes that will use the MOB write path";
186
187 protected static final long DEFAULT_START_KEY = 0;
188
189
190 protected CommandLine cmd;
191
192 protected MultiThreadedWriter writerThreads = null;
193 protected MultiThreadedReader readerThreads = null;
194 protected MultiThreadedUpdater updaterThreads = null;
195
196 protected long startKey, endKey;
197
198 protected boolean isWrite, isRead, isUpdate;
199 protected boolean deferredLogFlush;
200
201
202 protected DataBlockEncoding dataBlockEncodingAlgo;
203 protected Compression.Algorithm compressAlgo;
204 protected BloomType bloomType;
205 private boolean inMemoryCF;
206
207 private User userOwner;
208
209 protected int numWriterThreads = DEFAULT_NUM_THREADS;
210 protected int minColsPerKey, maxColsPerKey;
211 protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
212 protected boolean isMultiPut;
213
214
215 protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
216 protected int updatePercent;
217 protected boolean ignoreConflicts = false;
218 protected boolean isBatchUpdate;
219
220
221 private int numReaderThreads = DEFAULT_NUM_THREADS;
222 private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
223 private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
224 private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
225 private int verifyPercent;
226
227 private int numTables = 1;
228
229 private String superUser;
230
231 private String userNames;
232
233 private String authnFileName;
234
235 private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
236 private int regionReplication = -1;
237 private int regionReplicaId = -1;
238
239 private int mobThreshold = -1;
240
241
242
243 protected boolean isSkipInit = false;
244 protected boolean isInitOnly = false;
245
246 protected Cipher cipher = null;
247
248 protected String[] splitColonSeparated(String option,
249 int minNumCols, int maxNumCols) {
250 String optVal = cmd.getOptionValue(option);
251 String[] cols = optVal.split(COLON);
252 if (cols.length < minNumCols || cols.length > maxNumCols) {
253 throw new IllegalArgumentException("Expected at least "
254 + minNumCols + " columns but no more than " + maxNumCols +
255 " in the colon-separated value '" + optVal + "' of the " +
256 "-" + option + " option");
257 }
258 return cols;
259 }
260
261 protected int getNumThreads(String numThreadsStr) {
262 return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
263 }
264
265 public byte[][] getColumnFamilies() {
266 return families;
267 }
268
269
270
271
272
273 protected void applyColumnFamilyOptions(TableName tableName,
274 byte[][] columnFamilies) throws IOException {
275 Admin admin = new HBaseAdmin(conf);
276 HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
277 LOG.info("Disabling table " + tableName);
278 admin.disableTable(tableName);
279 for (byte[] cf : columnFamilies) {
280 HColumnDescriptor columnDesc = tableDesc.getFamily(cf);
281 boolean isNewCf = columnDesc == null;
282 if (isNewCf) {
283 columnDesc = new HColumnDescriptor(cf);
284 }
285 if (bloomType != null) {
286 columnDesc.setBloomFilterType(bloomType);
287 }
288 if (compressAlgo != null) {
289 columnDesc.setCompressionType(compressAlgo);
290 }
291 if (dataBlockEncodingAlgo != null) {
292 columnDesc.setDataBlockEncoding(dataBlockEncodingAlgo);
293 }
294 if (inMemoryCF) {
295 columnDesc.setInMemory(inMemoryCF);
296 }
297 if (cipher != null) {
298 byte[] keyBytes = new byte[cipher.getKeyLength()];
299 new SecureRandom().nextBytes(keyBytes);
300 columnDesc.setEncryptionType(cipher.getName());
301 columnDesc.setEncryptionKey(EncryptionUtil.wrapKey(conf,
302 User.getCurrent().getShortName(),
303 new SecretKeySpec(keyBytes, cipher.getName())));
304 }
305 if (mobThreshold >= 0) {
306 columnDesc.setMobEnabled(true);
307 columnDesc.setMobThreshold(mobThreshold);
308 }
309 if (isNewCf) {
310 admin.addColumn(tableName, columnDesc);
311 } else {
312 admin.modifyColumn(tableName, columnDesc);
313 }
314 }
315 LOG.info("Enabling table " + tableName);
316 admin.enableTable(tableName);
317 admin.close();
318 }
319
320 @Override
321 protected void addOptions() {
322 addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
323 "without port numbers");
324 addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
325 addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
326 addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
327 addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
328 addOptWithArg(OPT_READ, OPT_USAGE_READ);
329 addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
330 addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
331 addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
332 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
333 addOptWithArg(OPT_DATA_BLOCK_ENCODING, OPT_DATA_BLOCK_ENCODING_USAGE);
334 addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
335 "to tolerate before terminating all reader threads. The default is " +
336 MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
337 addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
338 "separate gets for every column in a row");
339 addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
340 "reads and writes for concurrent write/read workload. The default " +
341 "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
342
343 addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
344 "separate puts for every column in a row");
345 addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
346 "separate updates for every column in a row");
347 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
348 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
349 addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
350 addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
351 addOptWithArg(OPT_READER, OPT_READER_USAGE);
352
353 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
354 addOptWithArg(OPT_START_KEY, "The first key to read/write " +
355 "(a 0-based index). The default value is " +
356 DEFAULT_START_KEY + ".");
357 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
358 + "already exists");
359
360 addOptWithArg(NUM_TABLES,
361 "A positive integer number. When a number n is speicfied, load test "
362 + "tool will load n table parallely. -tn parameter value becomes "
363 + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
364
365 addOptWithArg(OPT_REGIONS_PER_SERVER,
366 "A positive integer number. When a number n is specified, load test "
367 + "tool will create the test table with n regions per server");
368
369 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
370 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
371 addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
372 addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
373 addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
374 addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
375 }
376
377 @Override
378 protected void processOptions(CommandLine cmd) {
379 this.cmd = cmd;
380
381 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
382 DEFAULT_TABLE_NAME));
383
384 if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
385 String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
386 families = new byte[list.length][];
387 for (int i = 0; i < list.length; i++) {
388 families[i] = Bytes.toBytes(list[i]);
389 }
390 } else {
391 families = DEFAULT_COLUMN_FAMILIES;
392 }
393
394 isWrite = cmd.hasOption(OPT_WRITE);
395 isRead = cmd.hasOption(OPT_READ);
396 isUpdate = cmd.hasOption(OPT_UPDATE);
397 isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
398 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
399
400 if (!isWrite && !isRead && !isUpdate && !isInitOnly) {
401 throw new IllegalArgumentException("Either -" + OPT_WRITE + " or " +
402 "-" + OPT_UPDATE + " or -" + OPT_READ + " has to be specified");
403 }
404
405 if (isInitOnly && (isRead || isWrite || isUpdate)) {
406 throw new IllegalArgumentException(OPT_INIT_ONLY + " cannot be specified with"
407 + " either -" + OPT_WRITE + " or -" + OPT_UPDATE + " or -" + OPT_READ);
408 }
409
410 if (!isInitOnly) {
411 if (!cmd.hasOption(OPT_NUM_KEYS)) {
412 throw new IllegalArgumentException(OPT_NUM_KEYS + " must be specified in "
413 + "read or write mode");
414 }
415 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
416 String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
417 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
418 Long.MAX_VALUE - startKey);
419 endKey = startKey + numKeys;
420 isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
421 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
422 }
423
424 parseColumnFamilyOptions(cmd);
425
426 if (isWrite) {
427 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
428
429 int colIndex = 0;
430 minColsPerKey = 1;
431 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
432 int avgColDataSize =
433 parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
434 minColDataSize = avgColDataSize / 2;
435 maxColDataSize = avgColDataSize * 3 / 2;
436
437 if (colIndex < writeOpts.length) {
438 numWriterThreads = getNumThreads(writeOpts[colIndex++]);
439 }
440
441 isMultiPut = cmd.hasOption(OPT_MULTIPUT);
442
443 mobThreshold = -1;
444 if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
445 mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
446 }
447
448 System.out.println("Multi-puts: " + isMultiPut);
449 System.out.println("Columns per key: " + minColsPerKey + ".."
450 + maxColsPerKey);
451 System.out.println("Data size per column: " + minColDataSize + ".."
452 + maxColDataSize);
453 }
454
455 if (isUpdate) {
456 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
457 int colIndex = 0;
458 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
459 if (colIndex < mutateOpts.length) {
460 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
461 }
462 if (colIndex < mutateOpts.length) {
463 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
464 }
465
466 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
467
468 System.out.println("Batch updates: " + isBatchUpdate);
469 System.out.println("Percent of keys to update: " + updatePercent);
470 System.out.println("Updater threads: " + numUpdaterThreads);
471 System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
472 }
473
474 if (isRead) {
475 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
476 int colIndex = 0;
477 verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
478 if (colIndex < readOpts.length) {
479 numReaderThreads = getNumThreads(readOpts[colIndex++]);
480 }
481
482 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
483 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
484 0, Integer.MAX_VALUE);
485 }
486
487 if (cmd.hasOption(OPT_KEY_WINDOW)) {
488 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
489 0, Integer.MAX_VALUE);
490 }
491
492 if (cmd.hasOption(OPT_MULTIGET)) {
493 multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
494 0, Integer.MAX_VALUE);
495 }
496
497 System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
498 System.out.println("Percent of keys to verify: " + verifyPercent);
499 System.out.println("Reader threads: " + numReaderThreads);
500 }
501
502 numTables = 1;
503 if (cmd.hasOption(NUM_TABLES)) {
504 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
505 }
506
507 numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
508 if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
509 numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
510 }
511
512 regionReplication = 1;
513 if (cmd.hasOption(OPT_REGION_REPLICATION)) {
514 regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
515 }
516
517 regionReplicaId = -1;
518 if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
519 regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
520 }
521 }
522
523 private void parseColumnFamilyOptions(CommandLine cmd) {
524 String dataBlockEncodingStr = cmd.getOptionValue(OPT_DATA_BLOCK_ENCODING);
525 dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
526 DataBlockEncoding.valueOf(dataBlockEncodingStr);
527
528 String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
529 compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
530 Compression.Algorithm.valueOf(compressStr);
531
532 String bloomStr = cmd.getOptionValue(OPT_BLOOM);
533 bloomType = bloomStr == null ? BloomType.ROW :
534 BloomType.valueOf(bloomStr);
535
536 inMemoryCF = cmd.hasOption(OPT_INMEMORY);
537 if (cmd.hasOption(OPT_ENCRYPTION)) {
538 cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
539 }
540
541 }
542
543 public void initTestTable() throws IOException {
544 Durability durability = Durability.USE_DEFAULT;
545 if (deferredLogFlush) {
546 durability = Durability.ASYNC_WAL;
547 }
548
549 HBaseTestingUtility.createPreSplitLoadTestTable(conf, tableName,
550 getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
551 regionReplication, durability);
552 applyColumnFamilyOptions(tableName, getColumnFamilies());
553 }
554
555 @Override
556 protected int doWork() throws IOException {
557 if (numTables > 1) {
558 return parallelLoadTables();
559 } else {
560 return loadTable();
561 }
562 }
563
564 protected int loadTable() throws IOException {
565 if (cmd.hasOption(OPT_ZK_QUORUM)) {
566 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
567 }
568 if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
569 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
570 }
571
572 if (isInitOnly) {
573 LOG.info("Initializing only; no reads or writes");
574 initTestTable();
575 return 0;
576 }
577
578 if (!isSkipInit) {
579 initTestTable();
580 }
581 LoadTestDataGenerator dataGen = null;
582 if (cmd.hasOption(OPT_GENERATOR)) {
583 String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
584 dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
585 String[] args;
586 if (dataGen instanceof LoadTestDataGeneratorWithACL) {
587 LOG.info("Using LoadTestDataGeneratorWithACL");
588 if (User.isHBaseSecurityEnabled(conf)) {
589 LOG.info("Security is enabled");
590 authnFileName = clazzAndArgs[1];
591 superUser = clazzAndArgs[2];
592 userNames = clazzAndArgs[3];
593 args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
594 Properties authConfig = new Properties();
595 authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
596 try {
597 addAuthInfoToConf(authConfig, conf, superUser, userNames);
598 } catch (IOException exp) {
599 LOG.error(exp);
600 return EXIT_FAILURE;
601 }
602 userOwner = User.create(loginAndReturnUGI(conf, superUser));
603 } else {
604 superUser = clazzAndArgs[1];
605 userNames = clazzAndArgs[2];
606 args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
607 userOwner = User.createUserForTesting(conf, superUser, new String[0]);
608 }
609 } else {
610 args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
611 clazzAndArgs.length);
612 }
613 dataGen.initialize(args);
614 } else {
615
616 dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
617 minColsPerKey, maxColsPerKey, families);
618 }
619
620 if (userOwner != null) {
621 LOG.info("Granting permissions for user " + userOwner.getShortName());
622 Permission.Action[] actions = {
623 Permission.Action.ADMIN, Permission.Action.CREATE,
624 Permission.Action.READ, Permission.Action.WRITE };
625 try {
626 AccessControlClient.grant(ConnectionFactory.createConnection(conf),
627 tableName, userOwner.getShortName(), null, null, actions);
628 } catch (Throwable e) {
629 LOG.fatal("Error in granting permission for the user " + userOwner.getShortName(), e);
630 return EXIT_FAILURE;
631 }
632 }
633
634 if (userNames != null) {
635
636 String users[] = userNames.split(",");
637 User user = null;
638 for (String userStr : users) {
639 if (User.isHBaseSecurityEnabled(conf)) {
640 user = User.create(loginAndReturnUGI(conf, userStr));
641 } else {
642 user = User.createUserForTesting(conf, userStr, new String[0]);
643 }
644 }
645 }
646
647 if (isWrite) {
648 if (userOwner != null) {
649 writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
650 } else {
651 String writerClass = null;
652 if (cmd.hasOption(OPT_WRITER)) {
653 writerClass = cmd.getOptionValue(OPT_WRITER);
654 } else {
655 writerClass = MultiThreadedWriter.class.getCanonicalName();
656 }
657
658 writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
659 }
660 writerThreads.setMultiPut(isMultiPut);
661 }
662
663 if (isUpdate) {
664 if (userOwner != null) {
665 updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
666 userOwner, userNames);
667 } else {
668 String updaterClass = null;
669 if (cmd.hasOption(OPT_UPDATER)) {
670 updaterClass = cmd.getOptionValue(OPT_UPDATER);
671 } else {
672 updaterClass = MultiThreadedUpdater.class.getCanonicalName();
673 }
674 updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
675 }
676 updaterThreads.setBatchUpdate(isBatchUpdate);
677 updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
678 }
679
680 if (isRead) {
681 if (userOwner != null) {
682 readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
683 userNames);
684 } else {
685 String readerClass = null;
686 if (cmd.hasOption(OPT_READER)) {
687 readerClass = cmd.getOptionValue(OPT_READER);
688 } else {
689 readerClass = MultiThreadedReader.class.getCanonicalName();
690 }
691 readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
692 }
693 readerThreads.setMaxErrors(maxReadErrors);
694 readerThreads.setKeyWindow(keyWindow);
695 readerThreads.setMultiGetBatchSize(multiGetBatchSize);
696 readerThreads.setRegionReplicaId(regionReplicaId);
697 }
698
699 if (isUpdate && isWrite) {
700 LOG.info("Concurrent write/update workload: making updaters aware of the " +
701 "write point");
702 updaterThreads.linkToWriter(writerThreads);
703 }
704
705 if (isRead && (isUpdate || isWrite)) {
706 LOG.info("Concurrent write/read workload: making readers aware of the " +
707 "write point");
708 readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
709 }
710
711 if (isWrite) {
712 System.out.println("Starting to write data...");
713 writerThreads.start(startKey, endKey, numWriterThreads);
714 }
715
716 if (isUpdate) {
717 LOG.info("Starting to mutate data...");
718 System.out.println("Starting to mutate data...");
719
720
721 updaterThreads.start(startKey, endKey, numUpdaterThreads);
722 }
723
724 if (isRead) {
725 System.out.println("Starting to read data...");
726 readerThreads.start(startKey, endKey, numReaderThreads);
727 }
728
729 if (isWrite) {
730 writerThreads.waitForFinish();
731 }
732
733 if (isUpdate) {
734 updaterThreads.waitForFinish();
735 }
736
737 if (isRead) {
738 readerThreads.waitForFinish();
739 }
740
741 boolean success = true;
742 if (isWrite) {
743 success = success && writerThreads.getNumWriteFailures() == 0;
744 }
745 if (isUpdate) {
746 success = success && updaterThreads.getNumWriteFailures() == 0;
747 }
748 if (isRead) {
749 success = success && readerThreads.getNumReadErrors() == 0
750 && readerThreads.getNumReadFailures() == 0;
751 }
752 return success ? EXIT_SUCCESS : EXIT_FAILURE;
753 }
754
755 private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
756 try {
757 Class<?> clazz = Class.forName(clazzName);
758 Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
759 byte[][].class);
760 return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
761 minColsPerKey, maxColsPerKey, families);
762 } catch (Exception e) {
763 throw new IOException(e);
764 }
765 }
766
767 private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
768 , LoadTestDataGenerator dataGen) throws IOException {
769 try {
770 Class<?> clazz = Class.forName(clazzName);
771 Constructor<?> constructor = clazz.getConstructor(
772 LoadTestDataGenerator.class, Configuration.class, TableName.class);
773 return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
774 } catch (Exception e) {
775 throw new IOException(e);
776 }
777 }
778
779 private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
780 , LoadTestDataGenerator dataGen) throws IOException {
781 try {
782 Class<?> clazz = Class.forName(clazzName);
783 Constructor<?> constructor = clazz.getConstructor(
784 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
785 return (MultiThreadedUpdater) constructor.newInstance(
786 dataGen, conf, tableName, updatePercent);
787 } catch (Exception e) {
788 throw new IOException(e);
789 }
790 }
791
792 private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName
793 , LoadTestDataGenerator dataGen) throws IOException {
794 try {
795 Class<?> clazz = Class.forName(clazzName);
796 Constructor<?> constructor = clazz.getConstructor(
797 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
798 return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
799 } catch (Exception e) {
800 throw new IOException(e);
801 }
802 }
803
804 public static byte[] generateData(final Random r, int length) {
805 byte [] b = new byte [length];
806 int i = 0;
807
808 for(i = 0; i < (length-8); i += 8) {
809 b[i] = (byte) (65 + r.nextInt(26));
810 b[i+1] = b[i];
811 b[i+2] = b[i];
812 b[i+3] = b[i];
813 b[i+4] = b[i];
814 b[i+5] = b[i];
815 b[i+6] = b[i];
816 b[i+7] = b[i];
817 }
818
819 byte a = (byte) (65 + r.nextInt(26));
820 for(; i < length; i++) {
821 b[i] = a;
822 }
823 return b;
824 }
825 public static void main(String[] args) {
826 new LoadTestTool().doStaticMain(args);
827 }
828
829
830
831
832
833
834
835
836
837 private int parallelLoadTables()
838 throws IOException {
839
840 String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
841 String[] newArgs = null;
842 if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
843 newArgs = new String[cmdLineArgs.length + 2];
844 newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
845 newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
846 System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
847 } else {
848 newArgs = cmdLineArgs;
849 }
850
851 int tableNameValueIndex = -1;
852 for (int j = 0; j < newArgs.length; j++) {
853 if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
854 tableNameValueIndex = j + 1;
855 } else if (newArgs[j].endsWith(NUM_TABLES)) {
856
857 newArgs[j + 1] = "1";
858 }
859 }
860
861
862 List<WorkerThread> workers = new ArrayList<WorkerThread>();
863 for (int i = 0; i < numTables; i++) {
864 String[] workerArgs = newArgs.clone();
865 workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
866 WorkerThread worker = new WorkerThread(i, workerArgs);
867 workers.add(worker);
868 LOG.info(worker + " starting");
869 worker.start();
870 }
871
872
873 LOG.info("Waiting for worker threads to finish");
874 for (WorkerThread t : workers) {
875 try {
876 t.join();
877 } catch (InterruptedException ie) {
878 IOException iie = new InterruptedIOException();
879 iie.initCause(ie);
880 throw iie;
881 }
882 checkForErrors();
883 }
884
885 return EXIT_SUCCESS;
886 }
887
888
889
890 protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
891
892 private void workerThreadError(Throwable t) {
893 thrown.compareAndSet(null, t);
894 }
895
896
897
898
899 private void checkForErrors() throws IOException {
900 Throwable thrown = this.thrown.get();
901 if (thrown == null) return;
902 if (thrown instanceof IOException) {
903 throw (IOException) thrown;
904 } else {
905 throw new RuntimeException(thrown);
906 }
907 }
908
909 class WorkerThread extends Thread {
910 private String[] workerArgs;
911
912 WorkerThread(int i, String[] args) {
913 super("WorkerThread-" + i);
914 workerArgs = args;
915 }
916
917 @Override
918 public void run() {
919 try {
920 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
921 if (ret != 0) {
922 throw new RuntimeException("LoadTestTool exit with non-zero return code.");
923 }
924 } catch (Exception ex) {
925 LOG.error("Error in worker thread", ex);
926 workerThreadError(ex);
927 }
928 }
929 }
930
931 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
932 String userList) throws IOException {
933 List<String> users = new ArrayList(Arrays.asList(userList.split(",")));
934 users.add(owner);
935 for (String user : users) {
936 String keyTabFileConfKey = "hbase." + user + ".keytab.file";
937 String principalConfKey = "hbase." + user + ".kerberos.principal";
938 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
939 throw new IOException("Authentication configs missing for user : " + user);
940 }
941 }
942 for (String key : authConfig.stringPropertyNames()) {
943 conf.set(key, authConfig.getProperty(key));
944 }
945 LOG.debug("Added authentication properties to config successfully.");
946 }
947
948 public static UserGroupInformation loginAndReturnUGI(Configuration conf, String username)
949 throws IOException {
950 String hostname = InetAddress.getLocalHost().getHostName();
951 String keyTabFileConfKey = "hbase." + username + ".keytab.file";
952 String keyTabFileLocation = conf.get(keyTabFileConfKey);
953 String principalConfKey = "hbase." + username + ".kerberos.principal";
954 String principal = SecurityUtil.getServerPrincipal(conf.get(principalConfKey), hostname);
955 if (keyTabFileLocation == null || principal == null) {
956 LOG.warn("Principal or key tab file null for : " + principalConfKey + ", "
957 + keyTabFileConfKey);
958 }
959 UserGroupInformation ugi =
960 UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFileLocation);
961 return ugi;
962 }
963 }