View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.wal;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Random;
28  import java.util.Set;
29  import java.util.UUID;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.conf.Configured;
37  import org.apache.hadoop.fs.FileStatus;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.HBaseConfiguration;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.MockRegionServerServices;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
51  import org.apache.hadoop.hbase.regionserver.HRegion;
52  import org.apache.hadoop.hbase.regionserver.LogRoller;
53  import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
54  import org.apache.hadoop.hbase.trace.SpanReceiverHost;
55  import org.apache.hadoop.hbase.wal.WALProvider.Writer;
56  import org.apache.hadoop.hbase.wal.WAL;
57  import org.apache.hadoop.hbase.util.Bytes;
58  import org.apache.hadoop.hbase.util.FSUtils;
59  import org.apache.hadoop.hbase.util.Threads;
60  import org.apache.hadoop.util.Tool;
61  import org.apache.hadoop.util.ToolRunner;
62  import org.apache.htrace.HTraceConfiguration;
63  import org.apache.htrace.Sampler;
64  import org.apache.htrace.Trace;
65  import org.apache.htrace.TraceScope;
66  import org.apache.htrace.impl.ProbabilitySampler;
67  
68  import com.yammer.metrics.core.Histogram;
69  import com.yammer.metrics.core.Meter;
70  import com.yammer.metrics.core.MetricsRegistry;
71  import com.yammer.metrics.reporting.ConsoleReporter;
72  
73  // imports for things that haven't moved from regionserver.wal yet.
74  import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
75  import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
76  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
77  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
78  
79  /**
80   * This class runs performance benchmarks for {@link WAL}.
81   * See usage for this tool by running:
82   * <code>$ hbase org.apache.hadoop.hbase.wal.WALPerformanceEvaluation -h</code>
83   */
84  @InterfaceAudience.Private
85  public final class WALPerformanceEvaluation extends Configured implements Tool {
86    static final Log LOG = LogFactory.getLog(WALPerformanceEvaluation.class.getName());
87    private final MetricsRegistry metrics = new MetricsRegistry();
88    private final Meter syncMeter =
89      metrics.newMeter(WALPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS);
90    private final Histogram syncHistogram =
91      metrics.newHistogram(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs",
92        true);
93    private final Histogram syncCountHistogram =
94        metrics.newHistogram(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync",
95          true);
96    private final Meter appendMeter =
97      metrics.newMeter(WALPerformanceEvaluation.class, "appendMeter", "bytes",
98        TimeUnit.MILLISECONDS);
99    private final Histogram latencyHistogram =
100     metrics.newHistogram(WALPerformanceEvaluation.class, "latencyHistogram", "nanos", true);
101 
102   private HBaseTestingUtility TEST_UTIL;
103 
104   static final String TABLE_NAME = "WALPerformanceEvaluation";
105   static final String QUALIFIER_PREFIX = "q";
106   static final String FAMILY_PREFIX = "cf";
107 
108   private int numQualifiers = 1;
109   private int valueSize = 512;
110   private int keySize = 16;
111 
112   @Override
113   public void setConf(Configuration conf) {
114     super.setConf(conf);
115     TEST_UTIL = new HBaseTestingUtility(conf);
116   }
117 
118   /**
119    * Perform WAL.append() of Put object, for the number of iterations requested.
120    * Keys and Vaues are generated randomly, the number of column families,
121    * qualifiers and key/value size is tunable by the user.
122    */
123   class WALPutBenchmark implements Runnable {
124     private final long numIterations;
125     private final int numFamilies;
126     private final boolean noSync;
127     private final HRegion region;
128     private final int syncInterval;
129     private final HTableDescriptor htd;
130     private final Sampler loopSampler;
131 
132     WALPutBenchmark(final HRegion region, final HTableDescriptor htd,
133         final long numIterations, final boolean noSync, final int syncInterval,
134         final double traceFreq) {
135       this.numIterations = numIterations;
136       this.noSync = noSync;
137       this.syncInterval = syncInterval;
138       this.numFamilies = htd.getColumnFamilies().length;
139       this.region = region;
140       this.htd = htd;
141       String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
142       if (spanReceivers == null || spanReceivers.isEmpty()) {
143         loopSampler = Sampler.NEVER;
144       } else {
145         if (traceFreq <= 0.0) {
146           LOG.warn("Tracing enabled but traceFreq=0.");
147           loopSampler = Sampler.NEVER;
148         } else if (traceFreq >= 1.0) {
149           loopSampler = Sampler.ALWAYS;
150           if (numIterations > 1000) {
151             LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
152               + " SpanReciever can keep up.");
153           }
154         } else {
155           getConf().setDouble("hbase.sampler.fraction", traceFreq);
156           loopSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(getConf()));
157         }
158       }
159     }
160 
161     @Override
162     public void run() {
163       byte[] key = new byte[keySize];
164       byte[] value = new byte[valueSize];
165       Random rand = new Random(Thread.currentThread().getId());
166       WAL wal = region.getWAL();
167 
168       TraceScope threadScope =
169         Trace.startSpan("WALPerfEval." + Thread.currentThread().getName());
170       try {
171         long startTime = System.currentTimeMillis();
172         int lastSync = 0;
173         for (int i = 0; i < numIterations; ++i) {
174           assert Trace.currentSpan() == threadScope.getSpan() : "Span leak detected.";
175           TraceScope loopScope = Trace.startSpan("runLoopIter" + i, loopSampler);
176           try {
177             long now = System.nanoTime();
178             Put put = setupPut(rand, key, value, numFamilies);
179             WALEdit walEdit = new WALEdit();
180             addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
181             HRegionInfo hri = region.getRegionInfo();
182             final WALKey logkey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
183             wal.append(htd, hri, logkey, walEdit, region.getSequenceId(), true, null);
184             if (!this.noSync) {
185               if (++lastSync >= this.syncInterval) {
186                 wal.sync();
187                 lastSync = 0;
188               }
189             }
190             latencyHistogram.update(System.nanoTime() - now);
191           } finally {
192             loopScope.close();
193           }
194         }
195         long totalTime = (System.currentTimeMillis() - startTime);
196         logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
197       } catch (Exception e) {
198         LOG.error(getClass().getSimpleName() + " Thread failed", e);
199       } finally {
200         threadScope.close();
201       }
202     }
203   }
204 
205   @Override
206   public int run(String[] args) throws Exception {
207     Path rootRegionDir = null;
208     int numThreads = 1;
209     long numIterations = 1000000;
210     int numFamilies = 1;
211     int syncInterval = 0;
212     boolean noSync = false;
213     boolean verify = false;
214     boolean verbose = false;
215     boolean cleanup = true;
216     boolean noclosefs = false;
217     long roll = Long.MAX_VALUE;
218     boolean compress = false;
219     String cipher = null;
220     int numRegions = 1;
221     String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
222     boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
223     double traceFreq = 1.0;
224     // Process command line args
225     for (int i = 0; i < args.length; i++) {
226       String cmd = args[i];
227       try {
228         if (cmd.equals("-threads")) {
229           numThreads = Integer.parseInt(args[++i]);
230         } else if (cmd.equals("-iterations")) {
231           numIterations = Long.parseLong(args[++i]);
232         } else if (cmd.equals("-path")) {
233           rootRegionDir = new Path(args[++i]);
234         } else if (cmd.equals("-families")) {
235           numFamilies = Integer.parseInt(args[++i]);
236         } else if (cmd.equals("-qualifiers")) {
237           numQualifiers = Integer.parseInt(args[++i]);
238         } else if (cmd.equals("-keySize")) {
239           keySize = Integer.parseInt(args[++i]);
240         } else if (cmd.equals("-valueSize")) {
241           valueSize = Integer.parseInt(args[++i]);
242         } else if (cmd.equals("-syncInterval")) {
243           syncInterval = Integer.parseInt(args[++i]);
244         } else if (cmd.equals("-nosync")) {
245           noSync = true;
246         } else if (cmd.equals("-verify")) {
247           verify = true;
248         } else if (cmd.equals("-verbose")) {
249           verbose = true;
250         } else if (cmd.equals("-nocleanup")) {
251           cleanup = false;
252         } else if (cmd.equals("-noclosefs")) {
253           noclosefs = true;
254         } else if (cmd.equals("-roll")) {
255           roll = Long.parseLong(args[++i]);
256         } else if (cmd.equals("-compress")) {
257           compress = true;
258         } else if (cmd.equals("-encryption")) {
259           cipher = args[++i];
260         } else if (cmd.equals("-regions")) {
261           numRegions = Integer.parseInt(args[++i]);
262         } else if (cmd.equals("-traceFreq")) {
263           traceFreq = Double.parseDouble(args[++i]);
264         } else if (cmd.equals("-h")) {
265           printUsageAndExit();
266         } else if (cmd.equals("--help")) {
267           printUsageAndExit();
268         } else {
269           System.err.println("UNEXPECTED: " + cmd);
270           printUsageAndExit();
271         }
272       } catch (Exception e) {
273         printUsageAndExit();
274       }
275     }
276 
277     if (compress) {
278       Configuration conf = getConf();
279       conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
280     }
281 
282     if (cipher != null) {
283       // Set up WAL for encryption
284       Configuration conf = getConf();
285       conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
286       conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
287       conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
288         WAL.Reader.class);
289       conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
290         Writer.class);
291       conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
292       conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
293     }
294 
295     if (numThreads < numRegions) {
296       LOG.warn("Number of threads is less than the number of regions; some regions will sit idle.");
297     }
298 
299     // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
300     // In regionserver, number of handlers == number of threads.
301     getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
302 
303     // Run WAL Performance Evaluation
304     // First set the fs from configs.  In case we are on hadoop1
305     FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
306     FileSystem fs = FileSystem.get(getConf());
307     LOG.info("FileSystem: " + fs);
308 
309     SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
310     final Sampler<?> sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
311     TraceScope scope = Trace.startSpan("WALPerfEval", sampler);
312 
313     try {
314       if (rootRegionDir == null) {
315         rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
316       }
317       rootRegionDir = rootRegionDir.makeQualified(fs);
318       cleanRegionRootDir(fs, rootRegionDir);
319       FSUtils.setRootDir(getConf(), rootRegionDir);
320       final WALFactory wals = new WALFactory(getConf(), null, "wals");
321       final HRegion[] regions = new HRegion[numRegions];
322       final Runnable[] benchmarks = new Runnable[numRegions];
323       final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
324       final LogRoller roller = new LogRoller(mockServices, mockServices);
325       Threads.setDaemonThreadRunning(roller.getThread(), "WALPerfEval.logRoller");
326 
327       try {
328         for(int i = 0; i < numRegions; i++) {
329           // Initialize Table Descriptor
330           // a table per desired region means we can avoid carving up the key space
331           final HTableDescriptor htd = createHTableDescriptor(i, numFamilies);
332           regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
333           benchmarks[i] = Trace.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync,
334               syncInterval, traceFreq));
335         }
336         ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
337         long putTime = runBenchmark(benchmarks, numThreads);
338         logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
339           ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
340         
341         for (int i = 0; i < numRegions; i++) {
342           if (regions[i] != null) {
343             closeRegion(regions[i]);
344             regions[i] = null;
345           }
346         }
347         if (verify) {
348           LOG.info("verifying written log entries.");
349           Path dir = new Path(FSUtils.getWALRootDir(getConf()),
350               DefaultWALProvider.getWALDirectoryName("wals"));
351           long editCount = 0;
352           FileStatus [] fsss = fs.listStatus(dir);
353           if (fsss.length == 0) throw new IllegalStateException("No WAL found");
354           for (FileStatus fss: fsss) {
355             Path p = fss.getPath();
356             if (!fs.exists(p)) throw new IllegalStateException(p.toString());
357             editCount += verify(wals, p, verbose);
358           }
359           long expected = numIterations * numThreads;
360           if (editCount != expected) {
361             throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
362           }
363         }
364       } finally {
365         mockServices.stop("test clean up.");
366         for (int i = 0; i < numRegions; i++) {
367           if (regions[i] != null) {
368             closeRegion(regions[i]);
369           }
370         }
371         if (null != roller) {
372           LOG.info("shutting down log roller.");
373           Threads.shutdown(roller.getThread());
374         }
375         wals.shutdown();
376         // Remove the root dir for this test region
377         if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
378       }
379     } finally {
380       // We may be called inside a test that wants to keep on using the fs.
381       if (!noclosefs) fs.close();
382       scope.close();
383       if (receiverHost != null) receiverHost.closeReceivers();
384     }
385 
386     return(0);
387   }
388 
389   private static HTableDescriptor createHTableDescriptor(final int regionNum,
390       final int numFamilies) {
391     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME + ":" + regionNum));
392     for (int i = 0; i < numFamilies; ++i) {
393       HColumnDescriptor colDef = new HColumnDescriptor(FAMILY_PREFIX + i);
394       htd.addFamily(colDef);
395     }
396     return htd;
397   }
398 
399   /**
400    * Verify the content of the WAL file.
401    * Verify that the file has expected number of edits.
402    * @param wals may not be null
403    * @param wal
404    * @return Count of edits.
405    * @throws IOException
406    */
407   private long verify(final WALFactory wals, final Path wal, final boolean verbose)
408       throws IOException {
409     WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
410     long count = 0;
411     Map<String, Long> sequenceIds = new HashMap<String, Long>();
412     try {
413       while (true) {
414         WAL.Entry e = reader.next();
415         if (e == null) {
416           LOG.debug("Read count=" + count + " from " + wal);
417           break;
418         }
419         count++;
420         long seqid = e.getKey().getLogSeqNum();
421         if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
422           // sequenceIds should be increasing for every regions
423           if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
424             throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
425                 + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
426                 + ", current seqid = " + seqid);
427           }
428         }
429         // update the sequence Id.
430         sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
431         if (verbose) LOG.info("seqid=" + seqid);
432       }
433     } finally {
434       reader.close();
435     }
436     return count;
437   }
438 
439   private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
440     float tsec = totalTime / 1000.0f;
441     LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
442     
443   }
444 
445   private void printUsageAndExit() {
446     System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
447     System.err.println(" where [options] are:");
448     System.err.println("  -h|-help         Show this help and exit.");
449     System.err.println("  -threads <N>     Number of threads writing on the WAL.");
450     System.err.println("  -regions <N>     Number of regions to open in the WAL. Default: 1");
451     System.err.println("  -iterations <N>  Number of iterations per thread.");
452     System.err.println("  -path <PATH>     Path where region's root directory is created.");
453     System.err.println("  -families <N>    Number of column families to write.");
454     System.err.println("  -qualifiers <N>  Number of qualifiers to write.");
455     System.err.println("  -keySize <N>     Row key size in byte.");
456     System.err.println("  -valueSize <N>   Row/Col value size in byte.");
457     System.err.println("  -nocleanup       Do NOT remove test data when done.");
458     System.err.println("  -noclosefs       Do NOT close the filesystem when done.");
459     System.err.println("  -nosync          Append without syncing");
460     System.err.println("  -syncInterval <N> Append N edits and then sync. " +
461       "Default=0, i.e. sync every edit.");
462     System.err.println("  -verify          Verify edits written in sequence");
463     System.err.println("  -verbose         Output extra info; " +
464       "e.g. all edit seq ids when verifying");
465     System.err.println("  -roll <N>        Roll the way every N appends");
466     System.err.println("  -encryption <A>  Encrypt the WAL with algorithm A, e.g. AES");
467     System.err.println("  -traceFreq <N>   Rate of trace sampling. Default: 1.0, " +
468       "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
469     System.err.println("");
470     System.err.println("Examples:");
471     System.err.println("");
472     System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
473       "verification afterward do:");
474     System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.wal." +
475       "WALPerformanceEvaluation \\");
476     System.err.println("    -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
477       "-threads 100 -roll 10000 -verify");
478     System.exit(1);
479   }
480 
481   private final Set<WAL> walsListenedTo = new HashSet<WAL>();
482 
483   private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd,
484       final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
485     // Initialize HRegion
486     HRegionInfo regionInfo = new HRegionInfo(htd.getTableName());
487     // Initialize WAL
488     final WAL wal = wals.getWAL(regionInfo.getEncodedNameAsBytes());
489     // If we haven't already, attach a listener to this wal to handle rolls and metrics.
490     if (walsListenedTo.add(wal)) {
491       roller.addWAL(wal);
492       wal.registerWALActionsListener(new WALActionsListener.Base() {
493         private int appends = 0;
494 
495         @Override
496         public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
497             WALEdit logEdit) {
498           this.appends++;
499           if (this.appends % whenToRoll == 0) {
500             LOG.info("Rolling after " + appends + " edits");
501             // We used to do explicit call to rollWriter but changed it to a request
502             // to avoid dead lock (there are less threads going on in this class than
503             // in the regionserver -- regionserver does not have the issue).
504             DefaultWALProvider.requestLogRoll(wal);
505           }
506         }
507 
508         @Override
509         public void postSync(final long timeInNanos, final int handlerSyncs) {
510           syncMeter.mark();
511           syncHistogram.update(timeInNanos);
512           syncCountHistogram.update(handlerSyncs);
513         }
514 
515         @Override
516         public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
517             final WALEdit logEdit) {
518           appendMeter.mark(size);
519         }
520       });
521     }
522      
523     return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
524   }
525 
526   private void closeRegion(final HRegion region) throws IOException {
527     if (region != null) {
528       region.close();
529       WAL wal = region.getWAL();
530       if (wal != null) {
531         wal.shutdown();
532       }
533     }
534   }
535 
536   private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
537     if (fs.exists(dir)) {
538       fs.delete(dir, true);
539     }
540   }
541 
542   private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
543     rand.nextBytes(key);
544     Put put = new Put(key);
545     for (int cf = 0; cf < numFamilies; ++cf) {
546       for (int q = 0; q < numQualifiers; ++q) {
547         rand.nextBytes(value);
548         put.add(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q), value);
549       }
550     }
551     return put;
552   }
553 
554   private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
555       WALEdit walEdit) {
556     for (List<Cell> edits : familyMap.values()) {
557       for (Cell cell : edits) {
558         walEdit.add(cell);
559       }
560     }
561   }
562 
563   private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
564     Thread[] threads = new Thread[numThreads];
565     long startTime = System.currentTimeMillis();
566     for (int i = 0; i < numThreads; ++i) {
567       threads[i] = new Thread(runnable[i%runnable.length], "t" + i + ",r" + (i%runnable.length));
568       threads[i].start();
569     }
570     for (Thread t : threads) t.join();
571     long endTime = System.currentTimeMillis();
572     return(endTime - startTime);
573   }
574 
575   /**
576    * The guts of the {@link #main} method.
577    * Call this method to avoid the {@link #main(String[])} System.exit.
578    * @param args
579    * @return errCode
580    * @throws Exception
581    */
582   static int innerMain(final Configuration c, final String [] args) throws Exception {
583     return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
584   }
585 
586   public static void main(String[] args) throws Exception {
587      System.exit(innerMain(HBaseConfiguration.create(), args));
588   }
589 }