1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Random;
28 import java.util.Set;
29 import java.util.UUID;
30 import java.util.concurrent.TimeUnit;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.conf.Configured;
37 import org.apache.hadoop.fs.FileStatus;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HColumnDescriptor;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.MockRegionServerServices;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.client.Put;
50 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
51 import org.apache.hadoop.hbase.regionserver.HRegion;
52 import org.apache.hadoop.hbase.regionserver.LogRoller;
53 import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
54 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
55 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
56 import org.apache.hadoop.hbase.wal.WAL;
57 import org.apache.hadoop.hbase.util.Bytes;
58 import org.apache.hadoop.hbase.util.FSUtils;
59 import org.apache.hadoop.hbase.util.Threads;
60 import org.apache.hadoop.util.Tool;
61 import org.apache.hadoop.util.ToolRunner;
62 import org.apache.htrace.HTraceConfiguration;
63 import org.apache.htrace.Sampler;
64 import org.apache.htrace.Trace;
65 import org.apache.htrace.TraceScope;
66 import org.apache.htrace.impl.ProbabilitySampler;
67
68 import com.yammer.metrics.core.Histogram;
69 import com.yammer.metrics.core.Meter;
70 import com.yammer.metrics.core.MetricsRegistry;
71 import com.yammer.metrics.reporting.ConsoleReporter;
72
73
74 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
75 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
76 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
77 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
78
79
80
81
82
83
84 @InterfaceAudience.Private
85 public final class WALPerformanceEvaluation extends Configured implements Tool {
86 static final Log LOG = LogFactory.getLog(WALPerformanceEvaluation.class.getName());
87 private final MetricsRegistry metrics = new MetricsRegistry();
88 private final Meter syncMeter =
89 metrics.newMeter(WALPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS);
90 private final Histogram syncHistogram =
91 metrics.newHistogram(WALPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs",
92 true);
93 private final Histogram syncCountHistogram =
94 metrics.newHistogram(WALPerformanceEvaluation.class, "syncCountHistogram", "countPerSync",
95 true);
96 private final Meter appendMeter =
97 metrics.newMeter(WALPerformanceEvaluation.class, "appendMeter", "bytes",
98 TimeUnit.MILLISECONDS);
99 private final Histogram latencyHistogram =
100 metrics.newHistogram(WALPerformanceEvaluation.class, "latencyHistogram", "nanos", true);
101
102 private HBaseTestingUtility TEST_UTIL;
103
104 static final String TABLE_NAME = "WALPerformanceEvaluation";
105 static final String QUALIFIER_PREFIX = "q";
106 static final String FAMILY_PREFIX = "cf";
107
108 private int numQualifiers = 1;
109 private int valueSize = 512;
110 private int keySize = 16;
111
112 @Override
113 public void setConf(Configuration conf) {
114 super.setConf(conf);
115 TEST_UTIL = new HBaseTestingUtility(conf);
116 }
117
118
119
120
121
122
123 class WALPutBenchmark implements Runnable {
124 private final long numIterations;
125 private final int numFamilies;
126 private final boolean noSync;
127 private final HRegion region;
128 private final int syncInterval;
129 private final HTableDescriptor htd;
130 private final Sampler loopSampler;
131
132 WALPutBenchmark(final HRegion region, final HTableDescriptor htd,
133 final long numIterations, final boolean noSync, final int syncInterval,
134 final double traceFreq) {
135 this.numIterations = numIterations;
136 this.noSync = noSync;
137 this.syncInterval = syncInterval;
138 this.numFamilies = htd.getColumnFamilies().length;
139 this.region = region;
140 this.htd = htd;
141 String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
142 if (spanReceivers == null || spanReceivers.isEmpty()) {
143 loopSampler = Sampler.NEVER;
144 } else {
145 if (traceFreq <= 0.0) {
146 LOG.warn("Tracing enabled but traceFreq=0.");
147 loopSampler = Sampler.NEVER;
148 } else if (traceFreq >= 1.0) {
149 loopSampler = Sampler.ALWAYS;
150 if (numIterations > 1000) {
151 LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
152 + " SpanReciever can keep up.");
153 }
154 } else {
155 getConf().setDouble("hbase.sampler.fraction", traceFreq);
156 loopSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(getConf()));
157 }
158 }
159 }
160
161 @Override
162 public void run() {
163 byte[] key = new byte[keySize];
164 byte[] value = new byte[valueSize];
165 Random rand = new Random(Thread.currentThread().getId());
166 WAL wal = region.getWAL();
167
168 TraceScope threadScope =
169 Trace.startSpan("WALPerfEval." + Thread.currentThread().getName());
170 try {
171 long startTime = System.currentTimeMillis();
172 int lastSync = 0;
173 for (int i = 0; i < numIterations; ++i) {
174 assert Trace.currentSpan() == threadScope.getSpan() : "Span leak detected.";
175 TraceScope loopScope = Trace.startSpan("runLoopIter" + i, loopSampler);
176 try {
177 long now = System.nanoTime();
178 Put put = setupPut(rand, key, value, numFamilies);
179 WALEdit walEdit = new WALEdit();
180 addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
181 HRegionInfo hri = region.getRegionInfo();
182 final WALKey logkey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
183 wal.append(htd, hri, logkey, walEdit, region.getSequenceId(), true, null);
184 if (!this.noSync) {
185 if (++lastSync >= this.syncInterval) {
186 wal.sync();
187 lastSync = 0;
188 }
189 }
190 latencyHistogram.update(System.nanoTime() - now);
191 } finally {
192 loopScope.close();
193 }
194 }
195 long totalTime = (System.currentTimeMillis() - startTime);
196 logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
197 } catch (Exception e) {
198 LOG.error(getClass().getSimpleName() + " Thread failed", e);
199 } finally {
200 threadScope.close();
201 }
202 }
203 }
204
205 @Override
206 public int run(String[] args) throws Exception {
207 Path rootRegionDir = null;
208 int numThreads = 1;
209 long numIterations = 1000000;
210 int numFamilies = 1;
211 int syncInterval = 0;
212 boolean noSync = false;
213 boolean verify = false;
214 boolean verbose = false;
215 boolean cleanup = true;
216 boolean noclosefs = false;
217 long roll = Long.MAX_VALUE;
218 boolean compress = false;
219 String cipher = null;
220 int numRegions = 1;
221 String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
222 boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
223 double traceFreq = 1.0;
224
225 for (int i = 0; i < args.length; i++) {
226 String cmd = args[i];
227 try {
228 if (cmd.equals("-threads")) {
229 numThreads = Integer.parseInt(args[++i]);
230 } else if (cmd.equals("-iterations")) {
231 numIterations = Long.parseLong(args[++i]);
232 } else if (cmd.equals("-path")) {
233 rootRegionDir = new Path(args[++i]);
234 } else if (cmd.equals("-families")) {
235 numFamilies = Integer.parseInt(args[++i]);
236 } else if (cmd.equals("-qualifiers")) {
237 numQualifiers = Integer.parseInt(args[++i]);
238 } else if (cmd.equals("-keySize")) {
239 keySize = Integer.parseInt(args[++i]);
240 } else if (cmd.equals("-valueSize")) {
241 valueSize = Integer.parseInt(args[++i]);
242 } else if (cmd.equals("-syncInterval")) {
243 syncInterval = Integer.parseInt(args[++i]);
244 } else if (cmd.equals("-nosync")) {
245 noSync = true;
246 } else if (cmd.equals("-verify")) {
247 verify = true;
248 } else if (cmd.equals("-verbose")) {
249 verbose = true;
250 } else if (cmd.equals("-nocleanup")) {
251 cleanup = false;
252 } else if (cmd.equals("-noclosefs")) {
253 noclosefs = true;
254 } else if (cmd.equals("-roll")) {
255 roll = Long.parseLong(args[++i]);
256 } else if (cmd.equals("-compress")) {
257 compress = true;
258 } else if (cmd.equals("-encryption")) {
259 cipher = args[++i];
260 } else if (cmd.equals("-regions")) {
261 numRegions = Integer.parseInt(args[++i]);
262 } else if (cmd.equals("-traceFreq")) {
263 traceFreq = Double.parseDouble(args[++i]);
264 } else if (cmd.equals("-h")) {
265 printUsageAndExit();
266 } else if (cmd.equals("--help")) {
267 printUsageAndExit();
268 } else {
269 System.err.println("UNEXPECTED: " + cmd);
270 printUsageAndExit();
271 }
272 } catch (Exception e) {
273 printUsageAndExit();
274 }
275 }
276
277 if (compress) {
278 Configuration conf = getConf();
279 conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
280 }
281
282 if (cipher != null) {
283
284 Configuration conf = getConf();
285 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
286 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
287 conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
288 WAL.Reader.class);
289 conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
290 Writer.class);
291 conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
292 conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
293 }
294
295 if (numThreads < numRegions) {
296 LOG.warn("Number of threads is less than the number of regions; some regions will sit idle.");
297 }
298
299
300
301 getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
302
303
304
305 FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
306 FileSystem fs = FileSystem.get(getConf());
307 LOG.info("FileSystem: " + fs);
308
309 SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
310 final Sampler<?> sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
311 TraceScope scope = Trace.startSpan("WALPerfEval", sampler);
312
313 try {
314 if (rootRegionDir == null) {
315 rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("WALPerformanceEvaluation");
316 }
317 rootRegionDir = rootRegionDir.makeQualified(fs);
318 cleanRegionRootDir(fs, rootRegionDir);
319 FSUtils.setRootDir(getConf(), rootRegionDir);
320 final WALFactory wals = new WALFactory(getConf(), null, "wals");
321 final HRegion[] regions = new HRegion[numRegions];
322 final Runnable[] benchmarks = new Runnable[numRegions];
323 final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
324 final LogRoller roller = new LogRoller(mockServices, mockServices);
325 Threads.setDaemonThreadRunning(roller.getThread(), "WALPerfEval.logRoller");
326
327 try {
328 for(int i = 0; i < numRegions; i++) {
329
330
331 final HTableDescriptor htd = createHTableDescriptor(i, numFamilies);
332 regions[i] = openRegion(fs, rootRegionDir, htd, wals, roll, roller);
333 benchmarks[i] = Trace.wrap(new WALPutBenchmark(regions[i], htd, numIterations, noSync,
334 syncInterval, traceFreq));
335 }
336 ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
337 long putTime = runBenchmark(benchmarks, numThreads);
338 logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
339 ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
340
341 for (int i = 0; i < numRegions; i++) {
342 if (regions[i] != null) {
343 closeRegion(regions[i]);
344 regions[i] = null;
345 }
346 }
347 if (verify) {
348 LOG.info("verifying written log entries.");
349 Path dir = new Path(FSUtils.getWALRootDir(getConf()),
350 DefaultWALProvider.getWALDirectoryName("wals"));
351 long editCount = 0;
352 FileStatus [] fsss = fs.listStatus(dir);
353 if (fsss.length == 0) throw new IllegalStateException("No WAL found");
354 for (FileStatus fss: fsss) {
355 Path p = fss.getPath();
356 if (!fs.exists(p)) throw new IllegalStateException(p.toString());
357 editCount += verify(wals, p, verbose);
358 }
359 long expected = numIterations * numThreads;
360 if (editCount != expected) {
361 throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
362 }
363 }
364 } finally {
365 mockServices.stop("test clean up.");
366 for (int i = 0; i < numRegions; i++) {
367 if (regions[i] != null) {
368 closeRegion(regions[i]);
369 }
370 }
371 if (null != roller) {
372 LOG.info("shutting down log roller.");
373 Threads.shutdown(roller.getThread());
374 }
375 wals.shutdown();
376
377 if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
378 }
379 } finally {
380
381 if (!noclosefs) fs.close();
382 scope.close();
383 if (receiverHost != null) receiverHost.closeReceivers();
384 }
385
386 return(0);
387 }
388
389 private static HTableDescriptor createHTableDescriptor(final int regionNum,
390 final int numFamilies) {
391 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME + ":" + regionNum));
392 for (int i = 0; i < numFamilies; ++i) {
393 HColumnDescriptor colDef = new HColumnDescriptor(FAMILY_PREFIX + i);
394 htd.addFamily(colDef);
395 }
396 return htd;
397 }
398
399
400
401
402
403
404
405
406
407 private long verify(final WALFactory wals, final Path wal, final boolean verbose)
408 throws IOException {
409 WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
410 long count = 0;
411 Map<String, Long> sequenceIds = new HashMap<String, Long>();
412 try {
413 while (true) {
414 WAL.Entry e = reader.next();
415 if (e == null) {
416 LOG.debug("Read count=" + count + " from " + wal);
417 break;
418 }
419 count++;
420 long seqid = e.getKey().getLogSeqNum();
421 if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
422
423 if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
424 throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
425 + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
426 + ", current seqid = " + seqid);
427 }
428 }
429
430 sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
431 if (verbose) LOG.info("seqid=" + seqid);
432 }
433 } finally {
434 reader.close();
435 }
436 return count;
437 }
438
439 private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
440 float tsec = totalTime / 1000.0f;
441 LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
442
443 }
444
445 private void printUsageAndExit() {
446 System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
447 System.err.println(" where [options] are:");
448 System.err.println(" -h|-help Show this help and exit.");
449 System.err.println(" -threads <N> Number of threads writing on the WAL.");
450 System.err.println(" -regions <N> Number of regions to open in the WAL. Default: 1");
451 System.err.println(" -iterations <N> Number of iterations per thread.");
452 System.err.println(" -path <PATH> Path where region's root directory is created.");
453 System.err.println(" -families <N> Number of column families to write.");
454 System.err.println(" -qualifiers <N> Number of qualifiers to write.");
455 System.err.println(" -keySize <N> Row key size in byte.");
456 System.err.println(" -valueSize <N> Row/Col value size in byte.");
457 System.err.println(" -nocleanup Do NOT remove test data when done.");
458 System.err.println(" -noclosefs Do NOT close the filesystem when done.");
459 System.err.println(" -nosync Append without syncing");
460 System.err.println(" -syncInterval <N> Append N edits and then sync. " +
461 "Default=0, i.e. sync every edit.");
462 System.err.println(" -verify Verify edits written in sequence");
463 System.err.println(" -verbose Output extra info; " +
464 "e.g. all edit seq ids when verifying");
465 System.err.println(" -roll <N> Roll the way every N appends");
466 System.err.println(" -encryption <A> Encrypt the WAL with algorithm A, e.g. AES");
467 System.err.println(" -traceFreq <N> Rate of trace sampling. Default: 1.0, " +
468 "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
469 System.err.println("");
470 System.err.println("Examples:");
471 System.err.println("");
472 System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
473 "verification afterward do:");
474 System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.wal." +
475 "WALPerformanceEvaluation \\");
476 System.err.println(" -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
477 "-threads 100 -roll 10000 -verify");
478 System.exit(1);
479 }
480
481 private final Set<WAL> walsListenedTo = new HashSet<WAL>();
482
483 private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd,
484 final WALFactory wals, final long whenToRoll, final LogRoller roller) throws IOException {
485
486 HRegionInfo regionInfo = new HRegionInfo(htd.getTableName());
487
488 final WAL wal = wals.getWAL(regionInfo.getEncodedNameAsBytes());
489
490 if (walsListenedTo.add(wal)) {
491 roller.addWAL(wal);
492 wal.registerWALActionsListener(new WALActionsListener.Base() {
493 private int appends = 0;
494
495 @Override
496 public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
497 WALEdit logEdit) {
498 this.appends++;
499 if (this.appends % whenToRoll == 0) {
500 LOG.info("Rolling after " + appends + " edits");
501
502
503
504 DefaultWALProvider.requestLogRoll(wal);
505 }
506 }
507
508 @Override
509 public void postSync(final long timeInNanos, final int handlerSyncs) {
510 syncMeter.mark();
511 syncHistogram.update(timeInNanos);
512 syncCountHistogram.update(handlerSyncs);
513 }
514
515 @Override
516 public void postAppend(final long size, final long elapsedTime, final WALKey logkey,
517 final WALEdit logEdit) {
518 appendMeter.mark(size);
519 }
520 });
521 }
522
523 return HRegion.createHRegion(regionInfo, dir, getConf(), htd, wal);
524 }
525
526 private void closeRegion(final HRegion region) throws IOException {
527 if (region != null) {
528 region.close();
529 WAL wal = region.getWAL();
530 if (wal != null) {
531 wal.shutdown();
532 }
533 }
534 }
535
536 private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
537 if (fs.exists(dir)) {
538 fs.delete(dir, true);
539 }
540 }
541
542 private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
543 rand.nextBytes(key);
544 Put put = new Put(key);
545 for (int cf = 0; cf < numFamilies; ++cf) {
546 for (int q = 0; q < numQualifiers; ++q) {
547 rand.nextBytes(value);
548 put.add(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q), value);
549 }
550 }
551 return put;
552 }
553
554 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
555 WALEdit walEdit) {
556 for (List<Cell> edits : familyMap.values()) {
557 for (Cell cell : edits) {
558 walEdit.add(cell);
559 }
560 }
561 }
562
563 private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
564 Thread[] threads = new Thread[numThreads];
565 long startTime = System.currentTimeMillis();
566 for (int i = 0; i < numThreads; ++i) {
567 threads[i] = new Thread(runnable[i%runnable.length], "t" + i + ",r" + (i%runnable.length));
568 threads[i].start();
569 }
570 for (Thread t : threads) t.join();
571 long endTime = System.currentTimeMillis();
572 return(endTime - startTime);
573 }
574
575
576
577
578
579
580
581
582 static int innerMain(final Configuration c, final String [] args) throws Exception {
583 return ToolRunner.run(c, new WALPerformanceEvaluation(), args);
584 }
585
586 public static void main(String[] args) throws Exception {
587 System.exit(innerMain(HBaseConfiguration.create(), args));
588 }
589 }