1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNotSame;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.io.UnsupportedEncodingException;
30  import java.util.Arrays;
31  import java.util.HashMap;
32  import java.util.Iterator;
33  import java.util.Map;
34  import java.util.Map.Entry;
35  import java.util.Random;
36  import java.util.Set;
37  import java.util.concurrent.Callable;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.LocatedFileStatus;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.fs.RemoteIterator;
47  import org.apache.hadoop.hbase.Cell;
48  import org.apache.hadoop.hbase.CellUtil;
49  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
50  import org.apache.hadoop.hbase.HBaseConfiguration;
51  import org.apache.hadoop.hbase.HBaseTestingUtility;
52  import org.apache.hadoop.hbase.HColumnDescriptor;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.HadoopShims;
56  import org.apache.hadoop.hbase.KeyValue;
57  import org.apache.hadoop.hbase.testclassification.LargeTests;
58  import org.apache.hadoop.hbase.PerformanceEvaluation;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.Tag;
61  import org.apache.hadoop.hbase.TagType;
62  import org.apache.hadoop.hbase.client.Admin;
63  import org.apache.hadoop.hbase.client.Connection;
64  import org.apache.hadoop.hbase.client.ConnectionFactory;
65  import org.apache.hadoop.hbase.client.HBaseAdmin;
66  import org.apache.hadoop.hbase.client.HTable;
67  import org.apache.hadoop.hbase.client.Put;
68  import org.apache.hadoop.hbase.client.RegionLocator;
69  import org.apache.hadoop.hbase.client.Result;
70  import org.apache.hadoop.hbase.client.ResultScanner;
71  import org.apache.hadoop.hbase.client.Scan;
72  import org.apache.hadoop.hbase.client.Table;
73  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
74  import org.apache.hadoop.hbase.io.compress.Compression;
75  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
76  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
77  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
78  import org.apache.hadoop.hbase.io.hfile.HFile;
79  import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
80  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
81  import org.apache.hadoop.hbase.regionserver.BloomType;
82  import org.apache.hadoop.hbase.regionserver.StoreFile;
83  import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
84  import org.apache.hadoop.hbase.util.Bytes;
85  import org.apache.hadoop.hbase.util.FSUtils;
86  import org.apache.hadoop.hbase.util.Threads;
87  import org.apache.hadoop.hbase.util.Writables;
88  import org.apache.hadoop.io.NullWritable;
89  import org.apache.hadoop.mapreduce.Job;
90  import org.apache.hadoop.mapreduce.Mapper;
91  import org.apache.hadoop.mapreduce.RecordWriter;
92  import org.apache.hadoop.mapreduce.TaskAttemptContext;
93  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
94  import org.junit.Ignore;
95  import org.junit.Test;
96  import org.junit.experimental.categories.Category;
97  import org.mockito.Mockito;
98  
99  /**
100  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat2}.
101  * Sets up and runs a mapreduce job that writes hfile output.
102  * Creates a few inner classes to implement splits and an inputformat that
103  * emits keys and values like those of {@link PerformanceEvaluation}.
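     *
     * <p>For orientation, below is a minimal sketch (assuming an existing pre-split table, an open
     * RegionLocator and an output Path; the variable names are illustrative only) of how these
     * pieces are typically combined for a bulk load.  It uses only calls exercised by the tests
     * in this class:
     * <pre>
     *   Job job = new Job(conf, "bulk-load-sketch");
     *   job.setInputFormatClass(NMapInputFormat.class);
     *   job.setMapperClass(RandomKVGeneratingMapper.class);   // any mapper emitting KeyValues
     *   HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
     *   FileOutputFormat.setOutputPath(job, outDir);
     *   if (job.waitForCompletion(true)) {
     *     new LoadIncrementalHFiles(conf).doBulkLoad(outDir, table);
     *   }
     * </pre>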
104  */
105 @Category(LargeTests.class)
106 public class TestHFileOutputFormat2  {
107   private final static int ROWSPERSPLIT = 1024;
108 
109   private static final byte[][] FAMILIES
110     = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
111       , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
112   private static final TableName TABLE_NAME =
113       TableName.valueOf("TestTable");
114 
115   private HBaseTestingUtility util = new HBaseTestingUtility();
116 
117   private static final Log LOG = LogFactory.getLog(TestHFileOutputFormat2.class);
118 
119   /**
120    * Simple mapper that makes KeyValue output.
121    */
122   static class RandomKVGeneratingMapper
123       extends Mapper<NullWritable, NullWritable,
124                  ImmutableBytesWritable, Cell> {
125 
126     private int keyLength;
127     private static final int KEYLEN_DEFAULT=10;
128     private static final String KEYLEN_CONF="randomkv.key.length";
129 
130     private int valLength;
131     private static final int VALLEN_DEFAULT=10;
132     private static final String VALLEN_CONF="randomkv.val.length";
133     private static final byte [] QUALIFIER = Bytes.toBytes("data");
134 
135     @Override
136     protected void setup(Context context) throws IOException,
137         InterruptedException {
138       super.setup(context);
139 
140       Configuration conf = context.getConfiguration();
141       keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
142       valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
143     }
144 
145     @Override
146     protected void map(
147         NullWritable n1, NullWritable n2,
148         Mapper<NullWritable, NullWritable,
149                ImmutableBytesWritable,Cell>.Context context)
150         throws IOException, InterruptedException
151     {
152 
153       byte keyBytes[] = new byte[keyLength];
154       byte valBytes[] = new byte[valLength];
155 
156       int taskId = context.getTaskAttemptID().getTaskID().getId();
157       assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
158 
159       Random random = new Random();
160       for (int i = 0; i < ROWSPERSPLIT; i++) {
161 
162         random.nextBytes(keyBytes);
163         // Ensure that unique tasks generate unique keys
164         keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
165         random.nextBytes(valBytes);
166         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
167 
168         for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
169           Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
170           context.write(key, kv);
171         }
172       }
173     }
174   }
175 
176   private void setupRandomGeneratorMapper(Job job) {
177     job.setInputFormatClass(NMapInputFormat.class);
178     job.setMapperClass(RandomKVGeneratingMapper.class);
179     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
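        // The mapper is declared to emit Cells, but every value it actually writes is a
        // KeyValue, so KeyValue is used as the concrete map output value class.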
180     job.setMapOutputValueClass(KeyValue.class);
181   }
182 
183   /**
184    * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
185    * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
186    * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
187    */
188   @Test
189   public void test_LATEST_TIMESTAMP_isReplaced()
190   throws Exception {
191     Configuration conf = new Configuration(this.util.getConfiguration());
192     RecordWriter<ImmutableBytesWritable, Cell> writer = null;
193     TaskAttemptContext context = null;
194     Path dir =
195       util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
196     try {
197       Job job = new Job(conf);
198       FileOutputFormat.setOutputPath(job, dir);
199       context = createTestTaskAttemptContext(job);
200       HFileOutputFormat2 hof = new HFileOutputFormat2();
201       writer = hof.getRecordWriter(context);
202       final byte [] b = Bytes.toBytes("b");
203 
204       // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  It should be
205       // changed by the call to write.  Everything in the kv but the ts should be unchanged.
206       KeyValue kv = new KeyValue(b, b, b);
207       KeyValue original = kv.clone();
208       writer.write(new ImmutableBytesWritable(), kv);
209       assertFalse(original.equals(kv));
210       assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
211       assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
212       assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
213       assertTrue(original.getTimestamp() != kv.getTimestamp());
214       assertTrue(HConstants.LATEST_TIMESTAMP != kv.getTimestamp());
215 
216       // Test 2. Now test passing a kv that has explicit ts.  It should not be
217       // changed by call to record write.
218       kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
219       original = kv.clone();
220       writer.write(new ImmutableBytesWritable(), kv);
221       assertTrue(original.equals(kv));
222     } finally {
223       if (writer != null && context != null) writer.close(context);
224       dir.getFileSystem(conf).delete(dir, true);
225     }
226   }
227 
228   private TaskAttemptContext createTestTaskAttemptContext(final Job job)
229   throws Exception {
230     HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
231     TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
232       job, "attempt_201402131733_0001_m_000000_0");
233     return context;
234   }
235 
236   /**
237    * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
238    * metadata used by time-restricted scans.
239    */
240   @Test
241   public void test_TIMERANGE() throws Exception {
242     Configuration conf = new Configuration(this.util.getConfiguration());
243     RecordWriter<ImmutableBytesWritable, Cell> writer = null;
244     TaskAttemptContext context = null;
245     Path dir =
246       util.getDataTestDir("test_TIMERANGE_present");
247     LOG.info("Writing to dir: " + dir);
248     try {
249       // build a record writer using HFileOutputFormat2
250       Job job = new Job(conf);
251       FileOutputFormat.setOutputPath(job, dir);
252       context = createTestTaskAttemptContext(job);
253       HFileOutputFormat2 hof = new HFileOutputFormat2();
254       writer = hof.getRecordWriter(context);
255 
256       // Pass two key values with explicit timestamps
257       final byte [] b = Bytes.toBytes("b");
258 
259       // value 1 with timestamp 2000
260       KeyValue kv = new KeyValue(b, b, b, 2000, b);
261       KeyValue original = kv.clone();
262       writer.write(new ImmutableBytesWritable(), kv);
263       assertEquals(original, kv);
264 
265       // value 2 with timestamp 1000
266       kv = new KeyValue(b, b, b, 1000, b);
267       original = kv.clone();
268       writer.write(new ImmutableBytesWritable(), kv);
269       assertEquals(original, kv);
270 
271       // verify that the file has the proper FileInfo.
272       writer.close(context);
273 
274       // the generated file lives 1 directory down from the attempt directory
275       // and is the only file, e.g.
276       // _attempt__0000_r_000000_0/b/1979617994050536795
277       FileSystem fs = FileSystem.get(conf);
278       Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
279       FileStatus[] sub1 = fs.listStatus(attemptDirectory);
280       FileStatus[] file = fs.listStatus(sub1[0].getPath());
281 
282       // open as HFile Reader and pull out TIMERANGE FileInfo.
283       HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
284           new CacheConfig(conf), conf);
285       Map<byte[],byte[]> finfo = rd.loadFileInfo();
286       byte[] range = finfo.get("TIMERANGE".getBytes());
287       assertNotNull(range);
288 
289       // unmarshall and check values.
290       TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
291       Writables.copyWritable(range, timeRangeTracker);
292       LOG.info(timeRangeTracker.getMinimumTimestamp() +
293           "...." + timeRangeTracker.getMaximumTimestamp());
294       assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
295       assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
296       rd.close();
297     } finally {
298       if (writer != null && context != null) writer.close(context);
299       dir.getFileSystem(conf).delete(dir, true);
300     }
301   }
302 
303   /**
304    * Run small MR job.
305    */
306   @Test
307   public void testWritingPEData() throws Exception {
308     Configuration conf = util.getConfiguration();
309     Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
310     FileSystem fs = testDir.getFileSystem(conf);
311 
312     // Lower this value or we OOME in Eclipse.
313     conf.setInt("mapreduce.task.io.sort.mb", 20);
314     // Write a few files.
315     conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
316 
317     Job job = new Job(conf, "testWritingPEData");
318     setupRandomGeneratorMapper(job);
319     // This partitioner doesn't work well for number keys, but we use it anyway
320     // just to demonstrate how to configure it.
321     byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
322     byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
323 
324     Arrays.fill(startKey, (byte)0);
325     Arrays.fill(endKey, (byte)0xff);
326 
327     job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
328     // Set start and end rows for partitioner.
329     SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
330     SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
331     job.setReducerClass(KeyValueSortReducer.class);
332     job.setOutputFormatClass(HFileOutputFormat2.class);
333     job.setNumReduceTasks(4);
334     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
335         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
336         KeyValueSerialization.class.getName());
337 
338     FileOutputFormat.setOutputPath(job, testDir);
339     assertTrue(job.waitForCompletion(false));
340     FileStatus [] files = fs.listStatus(testDir);
341     assertTrue(files.length > 0);
342   }
343 
344   /**
345    * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into
346    * hfile.
347    */
348   @Test
349   public void test_WritingTagData()
350       throws Exception {
351     Configuration conf = new Configuration(this.util.getConfiguration());
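        // Tags are only persisted to hfiles written with format version >= 3
        // (MIN_FORMAT_VERSION_WITH_TAGS), so force that version for this test.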
352     final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
353     conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
354     RecordWriter<ImmutableBytesWritable, Cell> writer = null;
355     TaskAttemptContext context = null;
356     Path dir =
357         util.getDataTestDir("WritingTagData");
358     try {
359       Job job = new Job(conf);
360       FileOutputFormat.setOutputPath(job, dir);
361       context = createTestTaskAttemptContext(job);
362       HFileOutputFormat2 hof = new HFileOutputFormat2();
363       writer = hof.getRecordWriter(context);
364       final byte [] b = Bytes.toBytes("b");
365 
366       KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, new Tag[] {
367           new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)) });
368       writer.write(new ImmutableBytesWritable(), kv);
369       writer.close(context);
370       writer = null;
371       FileSystem fs = dir.getFileSystem(conf);
372       RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
373       while(iterator.hasNext()) {
374         LocatedFileStatus keyFileStatus = iterator.next();
375         HFile.Reader reader = HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf),
376             conf);
377         HFileScanner scanner = reader.getScanner(false, false, false);
378         scanner.seekTo();
379         Cell cell = scanner.getKeyValue();
380 
381         Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(),
382             cell.getTagsOffset(), cell.getTagsLength());
383         assertTrue(tagsIterator.hasNext());
384         assertTrue(tagsIterator.next().getType() == TagType.TTL_TAG_TYPE);
385       }
386     } finally {
387       if (writer != null && context != null) writer.close(context);
388       dir.getFileSystem(conf).delete(dir, true);
389     }
390   }
391 
392   @Test
393   public void testJobConfiguration() throws Exception {
394     Configuration conf = new Configuration(this.util.getConfiguration());
395     conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
396     Job job = new Job(conf);
397     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
398     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
399     setupMockStartKeys(regionLocator);
400     HFileOutputFormat2.configureIncrementalLoad(job, new HTableDescriptor(), regionLocator);
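        // configureIncrementalLoad creates one reduce task per region; the mocked
        // region locator returns four start keys, so four reducers are expected.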
401     assertEquals(4, job.getNumReduceTasks());
402   }
403 
404   private byte [][] generateRandomStartKeys(int numKeys) {
405     Random random = new Random();
406     byte[][] ret = new byte[numKeys][];
407     // first region start key is always empty
408     ret[0] = HConstants.EMPTY_BYTE_ARRAY;
409     for (int i = 1; i < numKeys; i++) {
410       ret[i] =
411         PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
412     }
413     return ret;
414   }
415 
416   private byte[][] generateRandomSplitKeys(int numKeys) {
417     Random random = new Random();
418     byte[][] ret = new byte[numKeys][];
419     for (int i = 0; i < numKeys; i++) {
420       ret[i] =
421           PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
422     }
423     return ret;
424   }
425 
426   @Test
427   public void testMRIncrementalLoad() throws Exception {
428     LOG.info("\nStarting test testMRIncrementalLoad\n");
429     doIncrementalLoadTest(false);
430   }
431 
432   @Test
433   public void testMRIncrementalLoadWithSplit() throws Exception {
434     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
435     doIncrementalLoadTest(true);
436   }
437 
438   private void doIncrementalLoadTest(
439       boolean shouldChangeRegions) throws Exception {
440     util = new HBaseTestingUtility();
441     Configuration conf = util.getConfiguration();
442     byte[][] splitKeys = generateRandomSplitKeys(4);
443     util.startMiniCluster();
444     try {
445       HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
446       Admin admin = table.getConnection().getAdmin();
447       Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
448       assertEquals("Should start with empty table",
449           0, util.countRows(table));
450       int numRegions = -1;
451       try (RegionLocator r = table.getRegionLocator()) {
452         numRegions = r.getStartKeys().length;
453       }
454       assertEquals("Should make 5 regions", 5, numRegions);
455 
456       // Generate the bulk load files
457       util.startMiniMapReduceCluster();
458       runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
459       // This doesn't write into the table, just makes files
460       assertEquals("HFOF should not touch actual table",
461           0, util.countRows(table));
462 
463 
464       // Make sure that a directory was created for every CF
465       int dir = 0;
466       for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
467         for (byte[] family : FAMILIES) {
468           if (Bytes.toString(family).equals(f.getPath().getName())) {
469             ++dir;
470           }
471         }
472       }
473       assertEquals("Column family not found in FS.", FAMILIES.length, dir);
474 
475       // handle the split case
476       if (shouldChangeRegions) {
477         LOG.info("Changing regions in table");
478         admin.disableTable(table.getName());
479         while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
480             getRegionStates().isRegionsInTransition()) {
481           Threads.sleep(200);
482           LOG.info("Waiting on table to finish disabling");
483         }
484         util.deleteTable(table.getName());
485         byte[][] newSplitKeys = generateRandomSplitKeys(14);
486         table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
487 
488         while (table.getRegionLocator().getAllRegionLocations().size() != 15 ||
489             !admin.isTableAvailable(table.getName())) {
490           Thread.sleep(200);
491           LOG.info("Waiting for new region assignment to happen");
492         }
493       }
494 
495       // Perform the actual load
496       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
497 
498       // Ensure data shows up
499       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
500       assertEquals("LoadIncrementalHFiles should put expected data in table",
501           expectedRows, util.countRows(table));
502       Scan scan = new Scan();
503       ResultScanner results = table.getScanner(scan);
504       for (Result res : results) {
505         assertEquals(FAMILIES.length, res.rawCells().length);
506         Cell first = res.rawCells()[0];
507         for (Cell kv : res.rawCells()) {
508           assertTrue(CellUtil.matchingRow(first, kv));
509           assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
510         }
511       }
512       results.close();
513       String tableDigestBefore = util.checksumRows(table);
514 
515       // Cause regions to reopen
516       admin.disableTable(TABLE_NAME);
517       while (!admin.isTableDisabled(TABLE_NAME)) {
518         Thread.sleep(200);
519         LOG.info("Waiting for table to disable");
520       }
521       admin.enableTable(TABLE_NAME);
522       util.waitTableAvailable(TABLE_NAME);
523       assertEquals("Data should remain after reopening of regions",
524           tableDigestBefore, util.checksumRows(table));
525     } finally {
526       util.shutdownMiniMapReduceCluster();
527       util.shutdownMiniCluster();
528     }
529   }
530 
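      /**
       * Runs a small MR job that uses {@link RandomKVGeneratingMapper} together with
       * {@link HFileOutputFormat2#configureIncrementalLoad} to write hfiles under the given
       * output dir, asserting that one reduce task is created per region.
       */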
531   private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
532       RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
533       InterruptedException, ClassNotFoundException {
534     Job job = new Job(conf, "testLocalMRIncrementalLoad");
535     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
536     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
537         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
538         KeyValueSerialization.class.getName());
539     setupRandomGeneratorMapper(job);
540     HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
541     FileOutputFormat.setOutputPath(job, outDir);
542 
543     assertFalse(util.getTestFileSystem().exists(outDir));
544 
545     assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
546 
547     assertTrue(job.waitForCompletion(true));
548   }
549 
550   /**
551    * Test for
552    * {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
553    * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
554    * Tests that the family compression map is correctly serialized into
555    * and deserialized from the configuration.
556    *
557    * @throws IOException
558    */
559   @Test
560   public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
561     for (int numCfs = 0; numCfs <= 3; numCfs++) {
562       Configuration conf = new Configuration(this.util.getConfiguration());
563       Map<String, Compression.Algorithm> familyToCompression =
564           getMockColumnFamiliesForCompression(numCfs);
565       Table table = Mockito.mock(HTable.class);
566       setupMockColumnFamiliesForCompression(table, familyToCompression);
567       HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());
568 
569       // read back family specific compression setting from the configuration
570       Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
571           .createFamilyCompressionMap(conf);
572 
573       // test that we have a value for all column families that matches with the
574       // used mock values
575       for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
576         assertEquals("Compression configuration incorrect for column family:"
577             + entry.getKey(), entry.getValue(),
578             retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
579       }
580     }
581   }
582 
583   private void setupMockColumnFamiliesForCompression(Table table,
584       Map<String, Compression.Algorithm> familyToCompression) throws IOException {
585     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
586     for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
587       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
588           .setMaxVersions(1)
589           .setCompressionType(entry.getValue())
590           .setBlockCacheEnabled(false)
591           .setTimeToLive(0));
592     }
593     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
594   }
595 
596   /**
597    * @return a map from column family names to compression algorithms for
598    *         testing column family compression. Column family names have special characters
599    */
600   private Map<String, Compression.Algorithm>
601       getMockColumnFamiliesForCompression (int numCfs) {
602     Map<String, Compression.Algorithm> familyToCompression
603       = new HashMap<String, Compression.Algorithm>();
604     // use column family names having special characters
605     if (numCfs-- > 0) {
606       familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
607     }
608     if (numCfs-- > 0) {
609       familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
610     }
611     if (numCfs-- > 0) {
612       familyToCompression.put("Family3", Compression.Algorithm.GZ);
613     }
614     if (numCfs-- > 0) {
615       familyToCompression.put("Family4", Compression.Algorithm.NONE);
616     }
617     return familyToCompression;
618   }
619 
620 
621   /**
622    * Test for
623    * {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
624    * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
625    * Tests that the family bloom type map is correctly serialized into
626    * and deserialized from the configuration.
627    *
628    * @throws IOException
629    */
630   @Test
631   public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
632     for (int numCfs = 0; numCfs <= 2; numCfs++) {
633       Configuration conf = new Configuration(this.util.getConfiguration());
634       Map<String, BloomType> familyToBloomType =
635           getMockColumnFamiliesForBloomType(numCfs);
636       Table table = Mockito.mock(HTable.class);
637       setupMockColumnFamiliesForBloomType(table,
638           familyToBloomType);
639       HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);
640 
641       // read back family specific data block encoding settings from the
642       // configuration
643       Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
644           HFileOutputFormat2
645               .createFamilyBloomTypeMap(conf);
646 
647       // test that we have a value for all column families that matches with the
648       // used mock values
649       for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
650         assertEquals("BloomType configuration incorrect for column family:"
651             + entry.getKey(), entry.getValue(),
652             retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
653       }
654     }
655   }
656 
657   private void setupMockColumnFamiliesForBloomType(Table table,
658       Map<String, BloomType> familyToBloomType) throws IOException {
659     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
660     for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
661       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
662           .setMaxVersions(1)
663           .setBloomFilterType(entry.getValue())
664           .setBlockCacheEnabled(false)
665           .setTimeToLive(0));
666     }
667     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
668   }
669 
670   /**
671    * @return a map from column family names to bloom filter types for
672    *         testing column family bloom filter settings. Column family names have special characters
673    */
674   private Map<String, BloomType>
675   getMockColumnFamiliesForBloomType (int numCfs) {
676     Map<String, BloomType> familyToBloomType =
677         new HashMap<String, BloomType>();
678     // use column family names having special characters
679     if (numCfs-- > 0) {
680       familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
681     }
682     if (numCfs-- > 0) {
683       familyToBloomType.put("Family2=asdads&!AASD",
684           BloomType.ROWCOL);
685     }
686     if (numCfs-- > 0) {
687       familyToBloomType.put("Family3", BloomType.NONE);
688     }
689     return familyToBloomType;
690   }
691 
692   /**
693    * Test for
694    * {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
695    * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
696    * Tests that the family block size map is correctly serialized into
697    * and deserialized from the configuration.
698    *
699    * @throws IOException
700    */
701   @Test
702   public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
703     for (int numCfs = 0; numCfs <= 3; numCfs++) {
704       Configuration conf = new Configuration(this.util.getConfiguration());
705       Map<String, Integer> familyToBlockSize =
706           getMockColumnFamiliesForBlockSize(numCfs);
707       Table table = Mockito.mock(HTable.class);
708       setupMockColumnFamiliesForBlockSize(table,
709           familyToBlockSize);
710       HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);
711 
712       // read back family specific data block encoding settings from the
713       // configuration
714       Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
715           HFileOutputFormat2
716               .createFamilyBlockSizeMap(conf);
717 
718       // test that we have a value for all column families that matches with the
719       // used mock values
720       for (Entry<String, Integer> entry :
721           familyToBlockSize.entrySet()) {
722         assertEquals("BlockSize configuration incorrect for column family:"
723             + entry.getKey(), entry.getValue(),
724             retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
725       }
726     }
727   }
728 
729   private void setupMockColumnFamiliesForBlockSize(Table table,
730       Map<String, Integer> familyToBlockSize) throws IOException {
731     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
732     for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
733       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
734           .setMaxVersions(1)
735           .setBlocksize(entry.getValue())
736           .setBlockCacheEnabled(false)
737           .setTimeToLive(0));
738     }
739     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
740   }
741 
742   /**
743    * @return a map from column family names to block sizes for
744    *         testing column family block size settings. Column family names have special characters
745    */
746   private Map<String, Integer>
747   getMockColumnFamiliesForBlockSize (int numCfs) {
748     Map<String, Integer> familyToBlockSize =
749         new HashMap<String, Integer>();
750     // use column family names having special characters
751     if (numCfs-- > 0) {
752       familyToBlockSize.put("Family1!@#!@#&", 1234);
753     }
754     if (numCfs-- > 0) {
755       familyToBlockSize.put("Family2=asdads&!AASD",
756           Integer.MAX_VALUE);
757     }
758     if (numCfs-- > 0) {
759       familyToBlockSize.put("Family3",
760           Integer.MAX_VALUE);
761     }
762     if (numCfs-- > 0) {
763       familyToBlockSize.put("Family4", 0);
764     }
765     return familyToBlockSize;
766   }
767 
768   /**
769    * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
770    * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
771    * Tests that the family data block encoding map is correctly serialized into
772    * and deserialized from the configuration.
773    *
774    * @throws IOException
775    */
776   @Test
777   public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
778     for (int numCfs = 0; numCfs <= 3; numCfs++) {
779       Configuration conf = new Configuration(this.util.getConfiguration());
780       Map<String, DataBlockEncoding> familyToDataBlockEncoding =
781           getMockColumnFamiliesForDataBlockEncoding(numCfs);
782       Table table = Mockito.mock(HTable.class);
783       setupMockColumnFamiliesForDataBlockEncoding(table,
784           familyToDataBlockEncoding);
785       HTableDescriptor tableDescriptor = table.getTableDescriptor();
786       HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);
787 
788       // read back family specific data block encoding settings from the
789       // configuration
790       Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
791           HFileOutputFormat2
792           .createFamilyDataBlockEncodingMap(conf);
793 
794       // test that we have a value for all column families that matches with the
795       // used mock values
796       for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
797         assertEquals("DataBlockEncoding configuration incorrect for column family:"
798             + entry.getKey(), entry.getValue(),
799             retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
800       }
801     }
802   }
803 
804   private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
805       Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
806     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
807     for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
808       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
809           .setMaxVersions(1)
810           .setDataBlockEncoding(entry.getValue())
811           .setBlockCacheEnabled(false)
812           .setTimeToLive(0));
813     }
814     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
815   }
816 
817   /**
818    * @return a map from column family names to data block encodings for
819    *         testing column family data block encoding settings. Column family names have special characters
820    */
821   private Map<String, DataBlockEncoding>
822       getMockColumnFamiliesForDataBlockEncoding (int numCfs) {
823     Map<String, DataBlockEncoding> familyToDataBlockEncoding =
824         new HashMap<String, DataBlockEncoding>();
825     // use column family names having special characters
826     if (numCfs-- > 0) {
827       familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
828     }
829     if (numCfs-- > 0) {
830       familyToDataBlockEncoding.put("Family2=asdads&!AASD",
831           DataBlockEncoding.FAST_DIFF);
832     }
833     if (numCfs-- > 0) {
834       familyToDataBlockEncoding.put("Family3",
835           DataBlockEncoding.PREFIX);
836     }
837     if (numCfs-- > 0) {
838       familyToDataBlockEncoding.put("Family4", DataBlockEncoding.NONE);
839     }
840     return familyToDataBlockEncoding;
841   }
842 
843   private void setupMockStartKeys(RegionLocator table) throws IOException {
844     byte[][] mockKeys = new byte[][] {
845         HConstants.EMPTY_BYTE_ARRAY,
846         Bytes.toBytes("aaa"),
847         Bytes.toBytes("ggg"),
848         Bytes.toBytes("zzz")
849     };
850     Mockito.doReturn(mockKeys).when(table).getStartKeys();
851   }
852 
853   /**
854    * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
855    * bloom filter settings from the column family descriptor
856    */
857   @Test
858   public void testColumnFamilySettings() throws Exception {
859     Configuration conf = new Configuration(this.util.getConfiguration());
860     RecordWriter<ImmutableBytesWritable, Cell> writer = null;
861     TaskAttemptContext context = null;
862     Path dir = util.getDataTestDir("testColumnFamilySettings");
863 
864     // Setup table descriptor
865     Table table = Mockito.mock(Table.class);
866     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
867     HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
868     Mockito.doReturn(htd).when(table).getTableDescriptor();
869     for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
870       htd.addFamily(hcd);
871     }
872 
873     // set up the table to return some mock keys
874     setupMockStartKeys(regionLocator);
875 
876     try {
877       // partial map red setup to get an operational writer for testing
878       // We turn off the sequence file compression, because DefaultCodec
879       // pollutes the GZip codec pool with an incompatible compressor.
880       conf.set("io.seqfile.compression.type", "NONE");
881       conf.set("hbase.fs.tmp.dir", dir.toString());
882       Job job = new Job(conf, "testLocalMRIncrementalLoad");
883       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
884       setupRandomGeneratorMapper(job);
885       HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
886       FileOutputFormat.setOutputPath(job, dir);
887       context = createTestTaskAttemptContext(job);
888       HFileOutputFormat2 hof = new HFileOutputFormat2();
889       writer = hof.getRecordWriter(context);
890 
891       // write out random rows
892       writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
893       writer.close(context);
894 
895       // Make sure that a directory was created for every CF
896       FileSystem fs = dir.getFileSystem(conf);
897 
898       // commit so that the filesystem has one directory per column family
899       hof.getOutputCommitter(context).commitTask(context);
900       hof.getOutputCommitter(context).commitJob(context);
901       FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
902       assertEquals(htd.getFamilies().size(), families.length);
903       for (FileStatus f : families) {
904         String familyStr = f.getPath().getName();
905         HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
906         // verify that the compression on this file matches the configured
907         // compression
908         Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
909         Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
910         Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
911 
912         byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
913         if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
914         assertEquals("Incorrect bloom filter used for column family " + familyStr +
915           "(reader: " + reader + ")",
916           hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
917         assertEquals("Incorrect compression used for column family " + familyStr +
918           "(reader: " + reader + ")", hcd.getCompression(), reader.getFileContext().getCompression());
919       }
920     } finally {
921       dir.getFileSystem(conf).delete(dir, true);
922     }
923   }
924 
925   /**
926    * Write random values to the writer, emitting one KeyValue per row for each
927    * of the given column families.
928    */
929   private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
930       TaskAttemptContext context, Set<byte[]> families, int numRows)
931       throws IOException, InterruptedException {
932     byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
933     int valLength = 10;
934     byte valBytes[] = new byte[valLength];
935 
936     int taskId = context.getTaskAttemptID().getTaskID().getId();
937     assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
938     final byte [] qualifier = Bytes.toBytes("data");
939     Random random = new Random();
940     for (int i = 0; i < numRows; i++) {
941 
942       Bytes.putInt(keyBytes, 0, i);
943       random.nextBytes(valBytes);
944       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
945 
946       for (byte[] family : families) {
947         Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
948         writer.write(key, kv);
949       }
950     }
951   }
952 
953   /**
954    * Tests the scenario reported in HBASE-6901.
955    * All files are bulk loaded and excluded from minor compaction.
956    * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
957    * will be thrown.
958    */
959   @Ignore("Flaky: See HBASE-9051") @Test
960   public void testExcludeAllFromMinorCompaction() throws Exception {
961     Configuration conf = util.getConfiguration();
962     conf.setInt("hbase.hstore.compaction.min", 2);
963     generateRandomStartKeys(5);
964 
965     util.startMiniCluster();
966     try (Connection conn = ConnectionFactory.createConnection();
967         Admin admin = conn.getAdmin()) {
968       final FileSystem fs = util.getDFSCluster().getFileSystem();
969       HTable table = util.createTable(TABLE_NAME, FAMILIES);
970       assertEquals("Should start with empty table", 0, util.countRows(table));
971 
972       // deep inspection: get the StoreFile dir
973       final Path storePath = new Path(
974         FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
975           new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
976             Bytes.toString(FAMILIES[0])));
977       assertEquals(0, fs.listStatus(storePath).length);
978 
979       // Generate two bulk load files
980       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
981           true);
982       util.startMiniMapReduceCluster();
983 
984       for (int i = 0; i < 2; i++) {
985         Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
986         runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
987             testDir);
988         // Perform the actual load
989         new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
990       }
991 
992       // Ensure data shows up
993       int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
994       assertEquals("LoadIncrementalHFiles should put expected data in table",
995           expectedRows, util.countRows(table));
996 
997       // should have a second StoreFile now
998       assertEquals(2, fs.listStatus(storePath).length);
999 
1000       // minor compactions shouldn't get rid of the file
1001       admin.compact(TABLE_NAME);
1002       try {
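             // The poll below is expected to time out and throw AssertionError from fail():
             // both store files were bulk loaded with compaction.exclude=true, so a minor
             // compaction must not merge them into a single file.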
1003         quickPoll(new Callable<Boolean>() {
1004           @Override
1005           public Boolean call() throws Exception {
1006             return fs.listStatus(storePath).length == 1;
1007           }
1008         }, 5000);
1009         throw new IOException("SF# = " + fs.listStatus(storePath).length);
1010       } catch (AssertionError ae) {
1011         // this is expected behavior
1012       }
1013 
1014       // a major compaction should work though
1015       admin.majorCompact(TABLE_NAME);
1016       quickPoll(new Callable<Boolean>() {
1017         @Override
1018         public Boolean call() throws Exception {
1019           return fs.listStatus(storePath).length == 1;
1020         }
1021       }, 5000);
1022 
1023     } finally {
1024       util.shutdownMiniMapReduceCluster();
1025       util.shutdownMiniCluster();
1026     }
1027   }
1028 
1029   @Test
1030   public void testExcludeMinorCompaction() throws Exception {
1031     Configuration conf = util.getConfiguration();
1032     conf.setInt("hbase.hstore.compaction.min", 2);
1033     generateRandomStartKeys(5);
1034 
1035     util.startMiniCluster();
1036     try (Connection conn = ConnectionFactory.createConnection(conf);
1037         Admin admin = conn.getAdmin()){
1038       Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1039       final FileSystem fs = util.getDFSCluster().getFileSystem();
1040       Table table = util.createTable(TABLE_NAME, FAMILIES);
1041       assertEquals("Should start with empty table", 0, util.countRows(table));
1042 
1043       // deep inspection: get the StoreFile dir
1044       final Path storePath = new Path(
1045         FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
1046           new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
1047             Bytes.toString(FAMILIES[0])));
1048       assertEquals(0, fs.listStatus(storePath).length);
1049 
1050       // put some data in it and flush to create a storefile
1051       Put p = new Put(Bytes.toBytes("test"));
1052       p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1053       table.put(p);
1054       admin.flush(TABLE_NAME);
1055       assertEquals(1, util.countRows(table));
1056       quickPoll(new Callable<Boolean>() {
1057         @Override
1058         public Boolean call() throws Exception {
1059           return fs.listStatus(storePath).length == 1;
1060         }
1061       }, 5000);
1062 
1063       // Generate a bulk load file with more rows
1064       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1065           true);
1066       util.startMiniMapReduceCluster();
1067 
1068       RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
1069       runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);
1070 
1071       // Perform the actual load
1072       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1073 
1074       // Ensure data shows up
1075       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1076       assertEquals("LoadIncrementalHFiles should put expected data in table",
1077           expectedRows + 1, util.countRows(table));
1078 
1079       // should have a second StoreFile now
1080       assertEquals(2, fs.listStatus(storePath).length);
1081 
1082       // minor compactions shouldn't get rid of the file
1083       admin.compact(TABLE_NAME);
1084       try {
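             // The poll below is expected to time out and throw AssertionError from fail():
             // the bulk-loaded file was written with compaction.exclude=true, so a minor
             // compaction must not merge the store back down to a single file.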
1085         quickPoll(new Callable<Boolean>() {
1086           @Override
1087           public Boolean call() throws Exception {
1088             return fs.listStatus(storePath).length == 1;
1089           }
1090         }, 5000);
1091         throw new IOException("SF# = " + fs.listStatus(storePath).length);
1092       } catch (AssertionError ae) {
1093         // this is expected behavior
1094       }
1095 
1096       // a major compaction should work though
1097       admin.majorCompact(TABLE_NAME);
1098       quickPoll(new Callable<Boolean>() {
1099         @Override
1100         public Boolean call() throws Exception {
1101           return fs.listStatus(storePath).length == 1;
1102         }
1103       }, 5000);
1104 
1105     } finally {
1106       util.shutdownMiniMapReduceCluster();
1107       util.shutdownMiniCluster();
1108     }
1109   }
1110 
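       /**
        * Polls the callable every 10 ms until it returns true or waitMs elapses,
        * failing the test on timeout.
        */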
1111   private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1112     int sleepMs = 10;
1113     int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1114     while (retries-- > 0) {
1115       if (c.call().booleanValue()) {
1116         return;
1117       }
1118       Thread.sleep(sleepMs);
1119     }
1120     fail();
1121   }
1122 
1123   public static void main(String args[]) throws Exception {
1124     new TestHFileOutputFormat2().manualTest(args);
1125   }
1126 
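       /**
        * Manual driver: "newtable &lt;tableName&gt;" creates a pre-split table;
        * "incremental &lt;tableName&gt;" runs the incremental-load MR job against an existing table.
        */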
1127   public void manualTest(String args[]) throws Exception {
1128     Configuration conf = HBaseConfiguration.create();
1129     util = new HBaseTestingUtility(conf);
1130     if ("newtable".equals(args[0])) {
1131       TableName tname = TableName.valueOf(args[1]);
1132       byte[][] splitKeys = generateRandomSplitKeys(4);
1133       try (HTable table = util.createTable(tname, FAMILIES, splitKeys)) {
1134       }
1135     } else if ("incremental".equals(args[0])) {
1136       TableName tname = TableName.valueOf(args[1]);
1137       try(Connection c = ConnectionFactory.createConnection(conf);
1138           Admin admin = c.getAdmin();
1139           RegionLocator regionLocator = c.getRegionLocator(tname)) {
1140         Path outDir = new Path("incremental-out");
1141         runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
1142       }
1143     } else {
1144       throw new RuntimeException(
1145           "usage: TestHFileOutputFormat2 newtable | incremental");
1146     }
1147   }
1148 
1149 }
1150