View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations under
15   * the License.
16   */
17  
18  package org.apache.hadoop.hbase.io.hfile;
19  
20  import java.io.IOException;
21  import java.nio.ByteBuffer;
22  import java.util.Random;
23  import java.util.StringTokenizer;
24  
25  import junit.framework.TestCase;
26  
27  import org.apache.commons.cli.CommandLine;
28  import org.apache.commons.cli.CommandLineParser;
29  import org.apache.commons.cli.GnuParser;
30  import org.apache.commons.cli.HelpFormatter;
31  import org.apache.commons.cli.Option;
32  import org.apache.commons.cli.OptionBuilder;
33  import org.apache.commons.cli.Options;
34  import org.apache.commons.cli.ParseException;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FSDataInputStream;
39  import org.apache.hadoop.fs.FSDataOutputStream;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.fs.RawLocalFileSystem;
43  import org.apache.hadoop.hbase.HBaseTestingUtility;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.testclassification.MediumTests;
46  import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
47  import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
48  import org.apache.hadoop.io.BytesWritable;
49  import org.junit.experimental.categories.Category;
50  
51  /**
52   * test the performance for seek.
53   * <p>
54   * Copied from
55   * <a href="https://issues.apache.org/jira/browse/HADOOP-3315">hadoop-3315 tfile</a>.
56   * Remove after tfile is committed and use the tfile version of this class
57   * instead.</p>
58   */
59  @Category(MediumTests.class)
60  public class TestHFileSeek extends TestCase {
61    private static final byte[] CF = "f1".getBytes();
62    private static final byte[] QUAL = "q1".getBytes();
63    private static final boolean USE_PREAD = true;
64    private MyOptions options;
65    private Configuration conf;
66    private Path path;
67    private FileSystem fs;
68    private NanoTimer timer;
69    private Random rng;
70    private RandomDistribution.DiscreteRNG keyLenGen;
71    private KVGenerator kvGen;
72  
73    private static final Log LOG = LogFactory.getLog(TestHFileSeek.class);
74  
75    @Override
76    public void setUp() throws IOException {
77      if (options == null) {
78        options = new MyOptions(new String[0]);
79      }
80  
81      conf = new Configuration();
82      
83      if (options.useRawFs) {
84        conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
85      }
86      
87      conf.setInt("tfile.fs.input.buffer.size", options.fsInputBufferSize);
88      conf.setInt("tfile.fs.output.buffer.size", options.fsOutputBufferSize);
89      path = new Path(new Path(options.rootDir), options.file);
90      fs = path.getFileSystem(conf);
91      timer = new NanoTimer(false);
92      rng = new Random(options.seed);
93      keyLenGen =
94          new RandomDistribution.Zipf(new Random(rng.nextLong()),
95              options.minKeyLen, options.maxKeyLen, 1.2);
96      RandomDistribution.DiscreteRNG valLenGen =
97          new RandomDistribution.Flat(new Random(rng.nextLong()),
98              options.minValLength, options.maxValLength);
99      RandomDistribution.DiscreteRNG wordLenGen =
100         new RandomDistribution.Flat(new Random(rng.nextLong()),
101             options.minWordLen, options.maxWordLen);
102     kvGen =
103         new KVGenerator(rng, true, keyLenGen, valLenGen, wordLenGen,
104             options.dictSize);
105   }
106 
107   @Override
108   public void tearDown() {
109     try {
110       fs.close();
111     }
112     catch (Exception e) {
113       // Nothing
114     }
115   }
116 
117   private static FSDataOutputStream createFSOutput(Path name, FileSystem fs)
118     throws IOException {
119     if (fs.exists(name)) {
120       fs.delete(name, true);
121     }
122     FSDataOutputStream fout = fs.create(name);
123     return fout;
124   }
125 
126   private void createTFile() throws IOException {
127     long totalBytes = 0;
128     FSDataOutputStream fout = createFSOutput(path, fs);
129     try {
130       HFileContext context = new HFileContextBuilder()
131                             .withBlockSize(options.minBlockSize)
132                             .withCompression(AbstractHFileWriter.compressionByName(options.compress))
133                             .build();
134       Writer writer = HFile.getWriterFactoryNoCache(conf)
135           .withOutputStream(fout)
136           .withFileContext(context)
137           .withComparator(new KeyValue.RawBytesComparator())
138           .create();
139       try {
140         BytesWritable key = new BytesWritable();
141         BytesWritable val = new BytesWritable();
142         timer.start();
143         for (long i = 0; true; ++i) {
144           if (i % 1000 == 0) { // test the size for every 1000 rows.
145             if (fs.getFileStatus(path).getLen() >= options.fileSize) {
146               break;
147             }
148           }
149           kvGen.next(key, val, false);
150           byte [] k = new byte [key.getLength()];
151           System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
152           byte [] v = new byte [val.getLength()];
153           System.arraycopy(val.getBytes(), 0, v, 0, key.getLength());
154           KeyValue kv = new KeyValue(k, CF, QUAL, v);
155           writer.append(kv);
156           totalBytes += kv.getKeyLength();
157           totalBytes += kv.getValueLength();
158         }
159         timer.stop();
160       }
161       finally {
162         writer.close();
163       }
164     }
165     finally {
166       fout.close();
167     }
168     double duration = (double)timer.read()/1000; // in us.
169     long fsize = fs.getFileStatus(path).getLen();
170 
171     System.out.printf(
172         "time: %s...uncompressed: %.2fMB...raw thrpt: %.2fMB/s\n",
173         timer.toString(), (double) totalBytes / 1024 / 1024, totalBytes
174             / duration);
175     System.out.printf("time: %s...file size: %.2fMB...disk thrpt: %.2fMB/s\n",
176         timer.toString(), (double) fsize / 1024 / 1024, fsize / duration);
177   }
178 
179   public void seekTFile() throws IOException {
180     int miss = 0;
181     long totalBytes = 0;
182     FSDataInputStream fsdis = fs.open(path);
183     Reader reader = HFile.createReaderFromStream(path, fsdis,
184         fs.getFileStatus(path).getLen(), new CacheConfig(conf), conf);
185     reader.loadFileInfo();
186     KeySampler kSampler =
187         new KeySampler(rng, reader.getFirstKey(), reader.getLastKey(),
188             keyLenGen);
189     HFileScanner scanner = reader.getScanner(false, USE_PREAD);
190     BytesWritable key = new BytesWritable();
191     timer.reset();
192     timer.start();
193     for (int i = 0; i < options.seekCount; ++i) {
194       kSampler.next(key);
195       byte [] k = new byte [key.getLength()];
196       System.arraycopy(key.getBytes(), 0, k, 0, key.getLength());
197       if (scanner.seekTo(KeyValue.createKeyValueFromKey(k)) >= 0) {
198         ByteBuffer bbkey = scanner.getKey();
199         ByteBuffer bbval = scanner.getValue();
200         totalBytes += bbkey.limit();
201         totalBytes += bbval.limit();
202       }
203       else {
204         ++miss;
205       }
206     }
207     timer.stop();
208     System.out.printf(
209         "time: %s...avg seek: %s...%d hit...%d miss...avg I/O size: %.2fKB\n",
210         timer.toString(), NanoTimer.nanoTimeToString(timer.read()
211             / options.seekCount), options.seekCount - miss, miss,
212         (double) totalBytes / 1024 / (options.seekCount - miss));
213 
214   }
215 
216   public void testSeeks() throws IOException {
217     if (options.doCreate()) {
218       createTFile();
219     }
220 
221     if (options.doRead()) {
222       seekTFile();
223     }
224 
225     if (options.doCreate()) {
226       fs.delete(path, true);
227     }
228   }
229 
230   private static class IntegerRange {
231     private final int from, to;
232 
233     public IntegerRange(int from, int to) {
234       this.from = from;
235       this.to = to;
236     }
237 
238     public static IntegerRange parse(String s) throws ParseException {
239       StringTokenizer st = new StringTokenizer(s, " \t,");
240       if (st.countTokens() != 2) {
241         throw new ParseException("Bad integer specification: " + s);
242       }
243       int from = Integer.parseInt(st.nextToken());
244       int to = Integer.parseInt(st.nextToken());
245       return new IntegerRange(from, to);
246     }
247 
248     public int from() {
249       return from;
250     }
251 
252     public int to() {
253       return to;
254     }
255   }
256 
257   private static class MyOptions {
258     // hard coded constants
259     int dictSize = 1000;
260     int minWordLen = 5;
261     int maxWordLen = 20;
262 
263     private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
264     String rootDir =
265       TEST_UTIL.getDataTestDir("TestTFileSeek").toString();
266     String file = "TestTFileSeek";
267     // String compress = "lzo"; DISABLED
268     String compress = "none";
269     int minKeyLen = 10;
270     int maxKeyLen = 50;
271     int minValLength = 1024;
272     int maxValLength = 2 * 1024;
273     int minBlockSize = 1 * 1024 * 1024;
274     int fsOutputBufferSize = 1;
275     int fsInputBufferSize = 0;
276     // Default writing 10MB.
277     long fileSize = 10 * 1024 * 1024;
278     long seekCount = 1000;
279     long trialCount = 1;
280     long seed;
281     boolean useRawFs = false;
282 
283     static final int OP_CREATE = 1;
284     static final int OP_READ = 2;
285     int op = OP_CREATE | OP_READ;
286 
287     boolean proceed = false;
288 
289     public MyOptions(String[] args) {
290       seed = System.nanoTime();
291 
292       try {
293         Options opts = buildOptions();
294         CommandLineParser parser = new GnuParser();
295         CommandLine line = parser.parse(opts, args, true);
296         processOptions(line, opts);
297         validateOptions();
298       }
299       catch (ParseException e) {
300         System.out.println(e.getMessage());
301         System.out.println("Try \"--help\" option for details.");
302         setStopProceed();
303       }
304     }
305 
306     public boolean proceed() {
307       return proceed;
308     }
309 
310     private Options buildOptions() {
311       Option compress =
312           OptionBuilder.withLongOpt("compress").withArgName("[none|lzo|gz|snappy]")
313               .hasArg().withDescription("compression scheme").create('c');
314 
315       Option fileSize =
316           OptionBuilder.withLongOpt("file-size").withArgName("size-in-MB")
317               .hasArg().withDescription("target size of the file (in MB).")
318               .create('s');
319 
320       Option fsInputBufferSz =
321           OptionBuilder.withLongOpt("fs-input-buffer").withArgName("size")
322               .hasArg().withDescription(
323                   "size of the file system input buffer (in bytes).").create(
324                   'i');
325 
326       Option fsOutputBufferSize =
327           OptionBuilder.withLongOpt("fs-output-buffer").withArgName("size")
328               .hasArg().withDescription(
329                   "size of the file system output buffer (in bytes).").create(
330                   'o');
331 
332       Option keyLen =
333           OptionBuilder
334               .withLongOpt("key-length")
335               .withArgName("min,max")
336               .hasArg()
337               .withDescription(
338                   "the length range of the key (in bytes)")
339               .create('k');
340 
341       Option valueLen =
342           OptionBuilder
343               .withLongOpt("value-length")
344               .withArgName("min,max")
345               .hasArg()
346               .withDescription(
347                   "the length range of the value (in bytes)")
348               .create('v');
349 
350       Option blockSz =
351           OptionBuilder.withLongOpt("block").withArgName("size-in-KB").hasArg()
352               .withDescription("minimum block size (in KB)").create('b');
353 
354       Option operation =
355           OptionBuilder.withLongOpt("operation").withArgName("r|w|rw").hasArg()
356               .withDescription(
357                   "action: seek-only, create-only, seek-after-create").create(
358                   'x');
359 
360       Option rootDir =
361           OptionBuilder.withLongOpt("root-dir").withArgName("path").hasArg()
362               .withDescription(
363                   "specify root directory where files will be created.")
364               .create('r');
365 
366       Option file =
367           OptionBuilder.withLongOpt("file").withArgName("name").hasArg()
368               .withDescription("specify the file name to be created or read.")
369               .create('f');
370 
371       Option seekCount =
372           OptionBuilder
373               .withLongOpt("seek")
374               .withArgName("count")
375               .hasArg()
376               .withDescription(
377                   "specify how many seek operations we perform (requires -x r or -x rw.")
378               .create('n');
379       
380       Option trialCount =
381           OptionBuilder 
382               .withLongOpt("trials")
383               .withArgName("n")
384               .hasArg()
385               .withDescription(
386                   "specify how many times to run the whole benchmark")
387               .create('t');
388 
389       Option useRawFs =
390           OptionBuilder
391             .withLongOpt("rawfs")
392             .withDescription("use raw instead of checksummed file system")
393             .create();
394       
395       Option help =
396           OptionBuilder.withLongOpt("help").hasArg(false).withDescription(
397               "show this screen").create("h");
398 
399       return new Options().addOption(compress).addOption(fileSize).addOption(
400           fsInputBufferSz).addOption(fsOutputBufferSize).addOption(keyLen)
401           .addOption(blockSz).addOption(rootDir).addOption(valueLen)
402           .addOption(operation).addOption(seekCount).addOption(file)
403           .addOption(trialCount).addOption(useRawFs).addOption(help);
404 
405     }
406 
407     private void processOptions(CommandLine line, Options opts)
408         throws ParseException {
409       // --help -h and --version -V must be processed first.
410       if (line.hasOption('h')) {
411         HelpFormatter formatter = new HelpFormatter();
412         System.out.println("TFile and SeqFile benchmark.");
413         System.out.println();
414         formatter.printHelp(100,
415             "java ... TestTFileSeqFileComparison [options]",
416             "\nSupported options:", opts, "");
417         return;
418       }
419 
420       if (line.hasOption('c')) {
421         compress = line.getOptionValue('c');
422       }
423 
424       if (line.hasOption('d')) {
425         dictSize = Integer.parseInt(line.getOptionValue('d'));
426       }
427 
428       if (line.hasOption('s')) {
429         fileSize = Long.parseLong(line.getOptionValue('s')) * 1024 * 1024;
430       }
431 
432       if (line.hasOption('i')) {
433         fsInputBufferSize = Integer.parseInt(line.getOptionValue('i'));
434       }
435 
436       if (line.hasOption('o')) {
437         fsOutputBufferSize = Integer.parseInt(line.getOptionValue('o'));
438       }
439 
440       if (line.hasOption('n')) {
441         seekCount = Integer.parseInt(line.getOptionValue('n'));
442       }
443       
444       if (line.hasOption('t')) {
445         trialCount = Integer.parseInt(line.getOptionValue('t'));
446       }
447 
448       if (line.hasOption('k')) {
449         IntegerRange ir = IntegerRange.parse(line.getOptionValue('k'));
450         minKeyLen = ir.from();
451         maxKeyLen = ir.to();
452       }
453 
454       if (line.hasOption('v')) {
455         IntegerRange ir = IntegerRange.parse(line.getOptionValue('v'));
456         minValLength = ir.from();
457         maxValLength = ir.to();
458       }
459 
460       if (line.hasOption('b')) {
461         minBlockSize = Integer.parseInt(line.getOptionValue('b')) * 1024;
462       }
463 
464       if (line.hasOption('r')) {
465         rootDir = line.getOptionValue('r');
466       }
467 
468       if (line.hasOption('f')) {
469         file = line.getOptionValue('f');
470       }
471 
472       if (line.hasOption('S')) {
473         seed = Long.parseLong(line.getOptionValue('S'));
474       }
475 
476       if (line.hasOption('x')) {
477         String strOp = line.getOptionValue('x');
478         if (strOp.equals("r")) {
479           op = OP_READ;
480         }
481         else if (strOp.equals("w")) {
482           op = OP_CREATE;
483         }
484         else if (strOp.equals("rw")) {
485           op = OP_CREATE | OP_READ;
486         }
487         else {
488           throw new ParseException("Unknown action specifier: " + strOp);
489         }
490       }
491       
492       useRawFs = line.hasOption("rawfs");
493 
494       proceed = true;
495     }
496 
497     private void validateOptions() throws ParseException {
498       if (!compress.equals("none") && !compress.equals("lzo")
499           && !compress.equals("gz") && !compress.equals("snappy")) {
500         throw new ParseException("Unknown compression scheme: " + compress);
501       }
502 
503       if (minKeyLen >= maxKeyLen) {
504         throw new ParseException(
505             "Max key length must be greater than min key length.");
506       }
507 
508       if (minValLength >= maxValLength) {
509         throw new ParseException(
510             "Max value length must be greater than min value length.");
511       }
512 
513       if (minWordLen >= maxWordLen) {
514         throw new ParseException(
515             "Max word length must be greater than min word length.");
516       }
517       return;
518     }
519 
520     private void setStopProceed() {
521       proceed = false;
522     }
523 
524     public boolean doCreate() {
525       return (op & OP_CREATE) != 0;
526     }
527 
528     public boolean doRead() {
529       return (op & OP_READ) != 0;
530     }
531   }
532 
533   public static void main(String[] argv) throws IOException {
534     TestHFileSeek testCase = new TestHFileSeek();
535     MyOptions options = new MyOptions(argv);
536 
537     if (options.proceed == false) {
538       return;
539     }
540 
541     testCase.options = options;
542     for (int i = 0; i < options.trialCount; i++) {
543       LOG.info("Beginning trial " + (i+1));
544       testCase.setUp();
545       testCase.testSeeks();
546       testCase.tearDown();
547     }
548   }
549 
550 }
551