1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNotSame;
25  import static org.junit.Assert.assertTrue;
26  import static org.junit.Assert.fail;
27  
28  import java.io.IOException;
29  import java.util.Arrays;
30  import java.util.HashMap;
31  import java.util.Map;
32  import java.util.Map.Entry;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.concurrent.Callable;
36  
37  import junit.framework.Assert;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.hbase.Cell;
46  import org.apache.hadoop.hbase.CellUtil;
47  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
48  import org.apache.hadoop.hbase.HBaseConfiguration;
49  import org.apache.hadoop.hbase.HBaseTestingUtility;
50  import org.apache.hadoop.hbase.HColumnDescriptor;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.HadoopShims;
54  import org.apache.hadoop.hbase.KeyValue;
55  import org.apache.hadoop.hbase.PerformanceEvaluation;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.client.HBaseAdmin;
58  import org.apache.hadoop.hbase.client.HTable;
59  import org.apache.hadoop.hbase.client.Put;
60  import org.apache.hadoop.hbase.client.RegionLocator;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.ResultScanner;
63  import org.apache.hadoop.hbase.client.Scan;
64  import org.apache.hadoop.hbase.client.Table;
65  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
66  import org.apache.hadoop.hbase.io.compress.Compression;
67  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
68  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
69  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
70  import org.apache.hadoop.hbase.io.hfile.HFile;
71  import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
72  import org.apache.hadoop.hbase.regionserver.BloomType;
73  import org.apache.hadoop.hbase.regionserver.HStore;
74  import org.apache.hadoop.hbase.regionserver.StoreFile;
75  import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
76  import org.apache.hadoop.hbase.testclassification.LargeTests;
77  import org.apache.hadoop.hbase.util.Bytes;
78  import org.apache.hadoop.hbase.util.FSUtils;
79  import org.apache.hadoop.hbase.util.Threads;
80  import org.apache.hadoop.hbase.util.Writables;
81  import org.apache.hadoop.io.NullWritable;
82  import org.apache.hadoop.mapreduce.Job;
83  import org.apache.hadoop.mapreduce.Mapper;
84  import org.apache.hadoop.mapreduce.RecordWriter;
85  import org.apache.hadoop.mapreduce.TaskAttemptContext;
86  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
87  import org.junit.Ignore;
88  import org.junit.Test;
89  import org.junit.experimental.categories.Category;
90  import org.mockito.Mockito;
91  
92  /**
93   * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
94   * Sets up and runs a MapReduce job that writes HFile output.
95   * Creates a few inner classes to implement splits and an InputFormat that
96   * emits keys and values like those of {@link PerformanceEvaluation}.
97   */
98  @Category(LargeTests.class)
99  public class TestHFileOutputFormat  {
100   private final static int ROWSPERSPLIT = 1024;
101 
102   private static final byte[][] FAMILIES
103     = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
104       , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
105   private static final TableName TABLE_NAME =
106       TableName.valueOf("TestTable");
107 
108   private HBaseTestingUtility util = new HBaseTestingUtility();
109 
110   private static Log LOG = LogFactory.getLog(TestHFileOutputFormat.class);
111 
112   /**
113    * Simple mapper that makes KeyValue output.
114    */
115   static class RandomKVGeneratingMapper
116   extends Mapper<NullWritable, NullWritable,
117                  ImmutableBytesWritable, KeyValue> {
118 
119     private int keyLength;
120     private static final int KEYLEN_DEFAULT=10;
121     private static final String KEYLEN_CONF="randomkv.key.length";
122 
123     private int valLength;
124     private static final int VALLEN_DEFAULT=10;
125     private static final String VALLEN_CONF="randomkv.val.length";
126     private static final byte [] QUALIFIER = Bytes.toBytes("data");
127 
128     @Override
129     protected void setup(Context context) throws IOException,
130         InterruptedException {
131       super.setup(context);
132 
133       Configuration conf = context.getConfiguration();
134       keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
135       valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
136     }
137 
138     protected void map(
139         NullWritable n1, NullWritable n2,
140         Mapper<NullWritable, NullWritable,
141                ImmutableBytesWritable,KeyValue>.Context context)
142         throws java.io.IOException ,InterruptedException
143     {
144 
145       byte keyBytes[] = new byte[keyLength];
146       byte valBytes[] = new byte[valLength];
147 
148       int taskId = context.getTaskAttemptID().getTaskID().getId();
149       assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
150 
151       Random random = new Random();
152       for (int i = 0; i < ROWSPERSPLIT; i++) {
153 
154         random.nextBytes(keyBytes);
155         // Ensure that unique tasks generate unique keys
156         keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
157         random.nextBytes(valBytes);
158         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
159 
160         for (byte[] family : TestHFileOutputFormat.FAMILIES) {
161           KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
162           context.write(key, kv);
163         }
164       }
165     }
166   }
167 
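  // NMapInputFormat is a test-only InputFormat that launches a configurable number of map tasks
  // over NullWritable input with no real splits, so RandomKVGeneratingMapper can synthesize all
  // of its own rows.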
168   private void setupRandomGeneratorMapper(Job job) {
169     job.setInputFormatClass(NMapInputFormat.class);
170     job.setMapperClass(RandomKVGeneratingMapper.class);
171     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
172     job.setMapOutputValueClass(KeyValue.class);
173   }
174 
175   /**
176    * Test that {@link HFileOutputFormat} RecordWriter amends timestamps if
177    * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
178    * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
179    */
180   @Test
181   public void test_LATEST_TIMESTAMP_isReplaced()
182   throws Exception {
183     Configuration conf = new Configuration(this.util.getConfiguration());
184     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
185     TaskAttemptContext context = null;
186     Path dir =
187       util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
188     try {
189       Job job = new Job(conf);
190       FileOutputFormat.setOutputPath(job, dir);
191       context = createTestTaskAttemptContext(job);
192       HFileOutputFormat hof = new HFileOutputFormat();
193       writer = hof.getRecordWriter(context);
194       final byte [] b = Bytes.toBytes("b");
195 
196       // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  It should be
197       // changed by the call to write.  Check that everything in the kv is unchanged except the ts.
198       KeyValue kv = new KeyValue(b, b, b);
199       KeyValue original = kv.clone();
200       writer.write(new ImmutableBytesWritable(), kv);
201       assertFalse(original.equals(kv));
202       assertTrue(Bytes.equals(original.getRow(), kv.getRow()));
203       assertTrue(CellUtil.matchingColumn(original, kv.getFamily(), kv.getQualifier()));
204       assertNotSame(original.getTimestamp(), kv.getTimestamp());
205       assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
206 
207       // Test 2. Now pass a kv that has an explicit ts.  It should not be
208       // changed by the call to write.
209       kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
210       original = kv.clone();
211       writer.write(new ImmutableBytesWritable(), kv);
212       assertTrue(original.equals(kv));
213     } finally {
214       if (writer != null && context != null) writer.close(context);
215       dir.getFileSystem(conf).delete(dir, true);
216     }
217   }
218 
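  // HadoopShims (obtained through CompatibilitySingletonFactory) papers over the MapReduce API
  // differences between Hadoop versions, so the test can build a TaskAttemptContext without
  // compiling against a specific Hadoop release.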
219   private TaskAttemptContext createTestTaskAttemptContext(final Job job)
220   throws Exception {
221     HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
222     TaskAttemptContext context = hadoop.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0");
223     return context;
224   }
225 
226   /*
227    * Test that {@link HFileOutputFormat} creates an HFile with TIMERANGE
228    * metadata used by time-restricted scans.
229    */
230   @Test
231   public void test_TIMERANGE() throws Exception {
232     Configuration conf = new Configuration(this.util.getConfiguration());
233     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
234     TaskAttemptContext context = null;
235     Path dir =
236       util.getDataTestDir("test_TIMERANGE_present");
237     LOG.info("Writing HFile with TIMERANGE metadata to dir: " + dir);
238     try {
239       // build a record writer using HFileOutputFormat
240       Job job = new Job(conf);
241       FileOutputFormat.setOutputPath(job, dir);
242       context = createTestTaskAttemptContext(job);
243       HFileOutputFormat hof = new HFileOutputFormat();
244       writer = hof.getRecordWriter(context);
245 
246       // Pass two key values with explicit timestamps
247       final byte [] b = Bytes.toBytes("b");
248 
249       // value 1 with timestamp 2000
250       KeyValue kv = new KeyValue(b, b, b, 2000, b);
251       KeyValue original = kv.clone();
252       writer.write(new ImmutableBytesWritable(), kv);
253       assertEquals(original, kv);
254 
255       // value 2 with timestamp 1000
256       kv = new KeyValue(b, b, b, 1000, b);
257       original = kv.clone();
258       writer.write(new ImmutableBytesWritable(), kv);
259       assertEquals(original, kv);
260 
261       // verify that the file has the proper FileInfo.
262       writer.close(context);
263 
264       // the generated file lives 1 directory down from the attempt directory
265       // and is the only file, e.g.
266       // _attempt__0000_r_000000_0/b/1979617994050536795
267       FileSystem fs = FileSystem.get(conf);
268       Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
269       FileStatus[] sub1 = fs.listStatus(attemptDirectory);
270       FileStatus[] file = fs.listStatus(sub1[0].getPath());
271 
272       // open as HFile Reader and pull out TIMERANGE FileInfo.
273       HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
274           new CacheConfig(conf), conf);
275       Map<byte[],byte[]> finfo = rd.loadFileInfo();
276       byte[] range = finfo.get("TIMERANGE".getBytes());
277       assertNotNull(range);
278 
279       // unmarshall and check values.
280       TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
281       Writables.copyWritable(range, timeRangeTracker);
282       LOG.info(timeRangeTracker.getMinimumTimestamp() +
283           "...." + timeRangeTracker.getMaximumTimestamp());
284       assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
285       assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
286       rd.close();
287     } finally {
288       if (writer != null && context != null) writer.close(context);
289       dir.getFileSystem(conf).delete(dir, true);
290     }
291   }
292 
293   /**
294    * Run a small MR job that writes HFiles through SimpleTotalOrderPartitioner and KeyValueSortReducer.
295    */
296   @Test
297   public void testWritingPEData() throws Exception {
298     Configuration conf = util.getConfiguration();
299     Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
300     FileSystem fs = testDir.getFileSystem(conf);
301 
302     // Lower this value or we OOM when running in Eclipse.
303     conf.setInt("mapreduce.task.io.sort.mb", 20);
304     // Write a few files.
305     conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
306 
307     Job job = new Job(conf, "testWritingPEData");
308     setupRandomGeneratorMapper(job);
309     // This partitioner doesn't work well for number keys, but we use it anyway
310     // just to demonstrate how to configure it.
311     byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
312     byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
313 
314     Arrays.fill(startKey, (byte)0);
315     Arrays.fill(endKey, (byte)0xff);
316 
317     job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
318     // Set start and end rows for partitioner.
319     SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
320     SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
321     job.setReducerClass(KeyValueSortReducer.class);
322     job.setOutputFormatClass(HFileOutputFormat.class);
323     job.setNumReduceTasks(4);
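    // Register the HBase serializations so KeyValue (and Mutation/Result) instances can travel
    // through the MapReduce shuffle alongside the default Hadoop serializations.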
324     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
325         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
326         KeyValueSerialization.class.getName());
327 
328     FileOutputFormat.setOutputPath(job, testDir);
329     assertTrue(job.waitForCompletion(false));
330     FileStatus [] files = fs.listStatus(testDir);
331     assertTrue(files.length > 0);
332   }
333 
334   @Test
335   public void testJobConfiguration() throws Exception {
336     Configuration conf = new Configuration(this.util.getConfiguration());
337     conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
338     Job job = new Job(conf);
339     job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
340     HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
341     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
342     setupMockStartKeys(regionLocator);
343     HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
344     assertEquals(4, job.getNumReduceTasks());
345   }
346 
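  // Note the distinction between the two generators below: start keys include the leading empty
  // key (the first region always starts at the empty byte array), while split keys do not, so
  // N split keys produce N + 1 regions.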
347   private byte [][] generateRandomStartKeys(int numKeys) {
348     Random random = new Random();
349     byte[][] ret = new byte[numKeys][];
350     // first region start key is always empty
351     ret[0] = HConstants.EMPTY_BYTE_ARRAY;
352     for (int i = 1; i < numKeys; i++) {
353       ret[i] =
354         PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
355     }
356     return ret;
357   }
358 
359   private byte[][] generateRandomSplitKeys(int numKeys) {
360     Random random = new Random();
361     byte[][] ret = new byte[numKeys][];
362     for (int i = 0; i < numKeys; i++) {
363       ret[i] =
364           PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
365     }
366     return ret;
367   }
368 
369   @Test
370   public void testMRIncrementalLoad() throws Exception {
371     LOG.info("\nStarting test testMRIncrementalLoad\n");
372     doIncrementalLoadTest(false);
373   }
374 
375   @Test
376   public void testMRIncrementalLoadWithSplit() throws Exception {
377     LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
378     doIncrementalLoadTest(true);
379   }
380 
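  // End-to-end incremental load: write HFiles with the MR job, optionally re-split the table to
  // force the loader to split files across new region boundaries, bulk load the files, and verify
  // the data survives a disable/enable cycle of the regions.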
381   private void doIncrementalLoadTest(
382       boolean shouldChangeRegions) throws Exception {
383     util = new HBaseTestingUtility();
384     Configuration conf = util.getConfiguration();
385     byte[][] splitKeys = generateRandomSplitKeys(4);
386     HBaseAdmin admin = null;
387     try {
388       util.startMiniCluster();
389       Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
390       admin = util.getHBaseAdmin();
391       HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
392       assertEquals("Should start with empty table",
393           0, util.countRows(table));
394       int numRegions = -1;
395       try(RegionLocator r = table.getRegionLocator()) {
396         numRegions = r.getStartKeys().length;
397       }
398       assertEquals("Should make 5 regions", 5, numRegions);
399 
400       // Generate the bulk load files
401       util.startMiniMapReduceCluster();
402       runIncrementalPELoad(conf, table, testDir);
403       // This doesn't write into the table, just makes files
404       assertEquals("HFOF should not touch actual table",
405           0, util.countRows(table));
406 
407 
408       // Make sure that a directory was created for every CF
409       int dir = 0;
410       for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
411         for (byte[] family : FAMILIES) {
412           if (Bytes.toString(family).equals(f.getPath().getName())) {
413             ++dir;
414           }
415         }
416       }
417       assertEquals("Column family not found in FS.", FAMILIES.length, dir);
418 
419       // handle the split case
420       if (shouldChangeRegions) {
421         LOG.info("Changing regions in table");
422         admin.disableTable(table.getTableName());
423         while(util.getMiniHBaseCluster().getMaster().getAssignmentManager().
424             getRegionStates().isRegionsInTransition()) {
425           Threads.sleep(200);
426           LOG.info("Waiting on table to finish disabling");
427         }
428         util.deleteTable(table.getName());
429         byte[][] newSplitKeys = generateRandomSplitKeys(14);
430         table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
431         while (table.getRegionLocations().size() != 15 ||
432             !admin.isTableAvailable(table.getTableName())) {
433           Thread.sleep(200);
434           LOG.info("Waiting for new region assignment to happen");
435         }
436       }
437 
438       // Perform the actual load
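      // LoadIncrementalHFiles assigns each HFile to the region containing its first key and, when
      // the region boundaries changed above, splits any HFile that spans a boundary.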
439       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
440 
441       // Ensure data shows up
442       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
443       assertEquals("LoadIncrementalHFiles should put expected data in table",
444           expectedRows, util.countRows(table));
445       Scan scan = new Scan();
446       ResultScanner results = table.getScanner(scan);
447       for (Result res : results) {
448         assertEquals(FAMILIES.length, res.rawCells().length);
449         Cell first = res.rawCells()[0];
450         for (Cell kv : res.rawCells()) {
451           assertTrue(CellUtil.matchingRow(first, kv));
452           assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
453         }
454       }
455       results.close();
456       String tableDigestBefore = util.checksumRows(table);
457 
458       // Cause regions to reopen
459       admin.disableTable(TABLE_NAME);
460       while (!admin.isTableDisabled(TABLE_NAME)) {
461         Thread.sleep(200);
462         LOG.info("Waiting for table to disable");
463       }
464       admin.enableTable(TABLE_NAME);
465       util.waitTableAvailable(TABLE_NAME);
466       assertEquals("Data should remain after reopening of regions",
467           tableDigestBefore, util.checksumRows(table));
468     } finally {
469       if (admin != null) admin.close();
470       util.shutdownMiniMapReduceCluster();
471       util.shutdownMiniCluster();
472     }
473   }
474 
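  // Builds and runs the bulk-load job. configureIncrementalLoad wires in the reducer, installs a
  // total-order partitioner over the table's region start keys, and sets one reduce task per
  // region, which the getNumReduceTasks() assertion below depends on.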
475   private void runIncrementalPELoad(
476       Configuration conf, HTable table, Path outDir)
477   throws Exception {
478     Job job = new Job(conf, "testLocalMRIncrementalLoad");
479     job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
480     job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
481         MutationSerialization.class.getName(), ResultSerialization.class.getName(),
482         KeyValueSerialization.class.getName());
483     setupRandomGeneratorMapper(job);
484     HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
485         table.getRegionLocator());
486     FileOutputFormat.setOutputPath(job, outDir);
487 
488     Assert.assertFalse(util.getTestFileSystem().exists(outDir));
489 
490     assertEquals(table.getRegionLocator().getAllRegionLocations().size(), job.getNumReduceTasks());
491 
492     assertTrue(job.waitForCompletion(true));
493   }
494 
495   /**
496    * Test for {@link HFileOutputFormat#configureCompression(org.apache.hadoop.hbase.client.Table,
497    * Configuration)} and {@link HFileOutputFormat#createFamilyCompressionMap
498    * (Configuration)}.
499    * Tests that the compression map is correctly serialized into
500    * and deserialized from the configuration.
501    *
502    * @throws IOException
503    */
504   @Test
505   public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
506     for (int numCfs = 0; numCfs <= 3; numCfs++) {
507       Configuration conf = new Configuration(this.util.getConfiguration());
508       Map<String, Compression.Algorithm> familyToCompression =
509           getMockColumnFamiliesForCompression(numCfs);
510       Table table = Mockito.mock(HTable.class);
511       setupMockColumnFamiliesForCompression(table, familyToCompression);
512       HFileOutputFormat.configureCompression(table, conf);
513 
514       // read back family specific compression setting from the configuration
515       Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat
516           .createFamilyCompressionMap(conf);
517 
518       // test that we have a value for all column families that matches with the
519       // used mock values
520       for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
521         assertEquals("Compression configuration incorrect for column family:"
522             + entry.getKey(), entry.getValue(),
523             retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
524       }
525     }
526   }
527 
528   private void setupMockColumnFamiliesForCompression(Table table,
529       Map<String, Compression.Algorithm> familyToCompression) throws IOException {
530     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
531     for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
532       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
533           .setMaxVersions(1)
534           .setCompressionType(entry.getValue())
535           .setBlockCacheEnabled(false)
536           .setTimeToLive(0));
537     }
538     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
539   }
540 
541   /**
542    * @return a map from column family names to compression algorithms for
543    *         testing column family compression. Column family names have special characters
544    */
545   private Map<String, Compression.Algorithm>
546       getMockColumnFamiliesForCompression (int numCfs) {
547     Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
548     // use column family names having special characters
549     if (numCfs-- > 0) {
550       familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
551     }
552     if (numCfs-- > 0) {
553       familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
554     }
555     if (numCfs-- > 0) {
556       familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
557     }
558     if (numCfs-- > 0) {
559       familyToCompression.put("Family3", Compression.Algorithm.NONE);
560     }
561     return familyToCompression;
562   }
563 
564 
565   /**
566    * Test for {@link HFileOutputFormat#configureBloomType(org.apache.hadoop.hbase.client.Table,
567    * Configuration)} and {@link HFileOutputFormat#createFamilyBloomTypeMap
568    * (Configuration)}.
569    * Tests that the bloom type map is correctly serialized into
570    * and deserialized from the configuration.
571    *
572    * @throws IOException
573    */
574   @Test
575   public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
576     for (int numCfs = 0; numCfs <= 2; numCfs++) {
577       Configuration conf = new Configuration(this.util.getConfiguration());
578       Map<String, BloomType> familyToBloomType =
579           getMockColumnFamiliesForBloomType(numCfs);
580       Table table = Mockito.mock(HTable.class);
581       setupMockColumnFamiliesForBloomType(table,
582           familyToBloomType);
583       HFileOutputFormat.configureBloomType(table, conf);
584 
585       // read back family specific bloom type settings from the
586       // configuration
587       Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
588           HFileOutputFormat
589               .createFamilyBloomTypeMap(conf);
590 
591       // test that we have a value for all column families that matches with the
592       // used mock values
593       for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
594         assertEquals("BloomType configuration incorrect for column family:"
595             + entry.getKey(), entry.getValue(),
596             retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
597       }
598     }
599   }
600 
601   private void setupMockColumnFamiliesForBloomType(Table table,
602       Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
603     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
604     for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
605       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
606           .setMaxVersions(1)
607           .setBloomFilterType(entry.getValue())
608           .setBlockCacheEnabled(false)
609           .setTimeToLive(0));
610     }
611     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
612   }
613 
614   /**
615    * @return a map from column family names to bloom filter types for
616    *         testing column family bloom type settings. Column family names have special characters.
617    */
618   private Map<String, BloomType>
619   getMockColumnFamiliesForBloomType (int numCfs) {
620     Map<String, BloomType> familyToBloomType =
621         new HashMap<String, BloomType>();
622     // use column family names having special characters
623     if (numCfs-- > 0) {
624       familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
625     }
626     if (numCfs-- > 0) {
627       familyToBloomType.put("Family2=asdads&!AASD",
628           BloomType.ROWCOL);
629     }
630     if (numCfs-- > 0) {
631       familyToBloomType.put("Family3", BloomType.NONE);
632     }
633     return familyToBloomType;
634   }
635 
636   /**
637    * Test for {@link HFileOutputFormat#configureBlockSize(org.apache.hadoop.hbase.client.Table,
638    * Configuration)} and {@link HFileOutputFormat#createFamilyBlockSizeMap
639    * (Configuration)}.
640    * Tests that the block size map is correctly serialized into
641    * and deserialized from the configuration.
642    *
643    * @throws IOException
644    */
645   @Test
646   public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
647     for (int numCfs = 0; numCfs <= 3; numCfs++) {
648       Configuration conf = new Configuration(this.util.getConfiguration());
649       Map<String, Integer> familyToBlockSize =
650           getMockColumnFamiliesForBlockSize(numCfs);
651       Table table = Mockito.mock(HTable.class);
652       setupMockColumnFamiliesForBlockSize(table,
653           familyToBlockSize);
654       HFileOutputFormat.configureBlockSize(table, conf);
655 
656       // read back family specific block size settings from the
657       // configuration
658       Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
659           HFileOutputFormat
660               .createFamilyBlockSizeMap(conf);
661 
662       // test that we have a value for all column families that matches with the
663       // used mock values
664       for (Entry<String, Integer> entry : familyToBlockSize.entrySet()
665           ) {
666         assertEquals("BlockSize configuration incorrect for column family:"
667             + entry.getKey(), entry.getValue(),
668             retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
669       }
670     }
671   }
672 
673   private void setupMockColumnFamiliesForBlockSize(Table table,
674       Map<String, Integer> familyToDataBlockEncoding) throws IOException {
675     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
676     for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
677       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
678           .setMaxVersions(1)
679           .setBlocksize(entry.getValue())
680           .setBlockCacheEnabled(false)
681           .setTimeToLive(0));
682     }
683     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
684   }
685 
686   /**
687    * @return a map from column family names to block sizes for
688    *         testing column family block size settings. Column family names have special characters.
689    */
690   private Map<String, Integer>
691   getMockColumnFamiliesForBlockSize (int numCfs) {
692     Map<String, Integer> familyToBlockSize =
693         new HashMap<String, Integer>();
694     // use column family names having special characters
695     if (numCfs-- > 0) {
696       familyToBlockSize.put("Family1!@#!@#&", 1234);
697     }
698     if (numCfs-- > 0) {
699       familyToBlockSize.put("Family2=asdads&!AASD",
700           Integer.MAX_VALUE);
701     }
702     if (numCfs-- > 0) {
703       familyToBlockSize.put("Family2=asdads&!AASD",
704           Integer.MAX_VALUE);
705     }
706     if (numCfs-- > 0) {
707       familyToBlockSize.put("Family3", 0);
708     }
709     return familyToBlockSize;
710   }
711 
712   /**
713    * Test for {@link HFileOutputFormat#configureDataBlockEncoding(org.apache.hadoop.hbase.client.Table,
714    * Configuration)} and {@link HFileOutputFormat#createFamilyDataBlockEncodingMap
715    * (Configuration)}.
716    * Tests that the data block encoding map is correctly serialized into
717    * and deserialized from the configuration.
718    *
719    * @throws IOException
720    */
721   @Test
722   public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
723     for (int numCfs = 0; numCfs <= 3; numCfs++) {
724       Configuration conf = new Configuration(this.util.getConfiguration());
725       Map<String, DataBlockEncoding> familyToDataBlockEncoding =
726           getMockColumnFamiliesForDataBlockEncoding(numCfs);
727       Table table = Mockito.mock(HTable.class);
728       setupMockColumnFamiliesForDataBlockEncoding(table,
729           familyToDataBlockEncoding);
730       HFileOutputFormat.configureDataBlockEncoding(table, conf);
731 
732       // read back family specific data block encoding settings from the
733       // configuration
734       Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
735           HFileOutputFormat
736           .createFamilyDataBlockEncodingMap(conf);
737 
738       // test that we have a value for all column families that matches with the
739       // used mock values
740       for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
741         assertEquals("DataBlockEncoding configuration incorrect for column family:"
742             + entry.getKey(), entry.getValue(),
743             retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
744       }
745     }
746   }
747 
748   private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
749       Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
750     HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
751     for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
752       mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
753           .setMaxVersions(1)
754           .setDataBlockEncoding(entry.getValue())
755           .setBlockCacheEnabled(false)
756           .setTimeToLive(0));
757     }
758     Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
759   }
760 
761   /**
762    * @return a map from column family names to data block encodings for
763    *         testing column family data block encoding settings. Column family names have special characters.
764    */
765   private Map<String, DataBlockEncoding>
766       getMockColumnFamiliesForDataBlockEncoding (int numCfs) {
767     Map<String, DataBlockEncoding> familyToDataBlockEncoding =
768         new HashMap<String, DataBlockEncoding>();
769     // use column family names having special characters
770     if (numCfs-- > 0) {
771       familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
772     }
773     if (numCfs-- > 0) {
774       familyToDataBlockEncoding.put("Family2=asdads&!AASD",
775           DataBlockEncoding.FAST_DIFF);
776     }
777     if (numCfs-- > 0) {
778       familyToDataBlockEncoding.put("Family2=asdads&!AASD",
779           DataBlockEncoding.PREFIX);
780     }
781     if (numCfs-- > 0) {
782       familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
783     }
784     return familyToDataBlockEncoding;
785   }
786 
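  // Stub RegionLocator.getStartKeys() with four region start keys; configureIncrementalLoad turns
  // these into four reduce partitions, which is what testJobConfiguration() asserts.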
787   private void setupMockStartKeys(RegionLocator regionLocator) throws IOException {
788     byte[][] mockKeys = new byte[][] {
789         HConstants.EMPTY_BYTE_ARRAY,
790         Bytes.toBytes("aaa"),
791         Bytes.toBytes("ggg"),
792         Bytes.toBytes("zzz")
793     };
794     Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();
795   }
796 
797   /**
798    * Test that {@link HFileOutputFormat} RecordWriter uses compression and
799    * bloom filter settings from the column family descriptor
800    */
801   @Test
802   public void testColumnFamilySettings() throws Exception {
803     Configuration conf = new Configuration(this.util.getConfiguration());
804     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
805     TaskAttemptContext context = null;
806     Path dir = util.getDataTestDir("testColumnFamilySettings");
807 
808     // Setup table descriptor
809     HTable table = Mockito.mock(HTable.class);
810     RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
811     HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
812     Mockito.doReturn(htd).when(table).getTableDescriptor();
813     for (HColumnDescriptor hcd: this.util.generateColumnDescriptors()) {
814       htd.addFamily(hcd);
815     }
816 
817     // set up the table to return some mock keys
818     setupMockStartKeys(regionLocator);
819 
820     try {
821       // partial map red setup to get an operational writer for testing
822       // We turn off the sequence file compression, because DefaultCodec
823       // pollutes the GZip codec pool with an incompatible compressor.
824       conf.set("io.seqfile.compression.type", "NONE");
825       conf.set("hbase.fs.tmp.dir", dir.toString());
826       Job job = new Job(conf, "testLocalMRIncrementalLoad");
827       job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
828       setupRandomGeneratorMapper(job);
829       HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
830       FileOutputFormat.setOutputPath(job, dir);
831       context = createTestTaskAttemptContext(job);
832       HFileOutputFormat hof = new HFileOutputFormat();
833       writer = hof.getRecordWriter(context);
834 
835       // write out random rows
836       writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
837       writer.close(context);
838 
839       // Make sure that a directory was created for every CF
840       FileSystem fs = dir.getFileSystem(conf);
841 
842       // commit so that the filesystem has one directory per column family
843       hof.getOutputCommitter(context).commitTask(context);
844       hof.getOutputCommitter(context).commitJob(context);
845       FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
846       assertEquals(htd.getFamilies().size(), families.length);
847       for (FileStatus f : families) {
848         String familyStr = f.getPath().getName();
849         HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
850         // verify that the compression on this file matches the configured
851         // compression
852         Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
853         Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
854         Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
855 
856         byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
857         if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
858         assertEquals("Incorrect bloom filter used for column family " + familyStr +
859           "(reader: " + reader + ")",
860           hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
861         assertEquals("Incorrect compression used for column family " + familyStr +
862           "(reader: " + reader + ")", hcd.getCompression(), reader.getFileContext().getCompression());
863       }
864     } finally {
865       dir.getFileSystem(conf).delete(dir, true);
866     }
867   }
868 
869   /**
870    * Write random values to the writer, one KeyValue per row for each of the
871    * given column families, using a fixed qualifier and random values.
872    */
873   private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
874       TaskAttemptContext context, Set<byte[]> families, int numRows)
875       throws IOException, InterruptedException {
876     byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
877     int valLength = 10;
878     byte valBytes[] = new byte[valLength];
879 
880     int taskId = context.getTaskAttemptID().getTaskID().getId();
881     assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
882     final byte [] qualifier = Bytes.toBytes("data");
883     Random random = new Random();
884     for (int i = 0; i < numRows; i++) {
885 
886       Bytes.putInt(keyBytes, 0, i);
887       random.nextBytes(valBytes);
888       ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
889 
890       for (byte[] family : families) {
891         KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
892         writer.write(key, kv);
893       }
894     }
895   }
896 
897   /**
898    * This test covers the scenario reported in HBASE-6901:
899    * all files are bulk loaded and excluded from minor compaction.
900    * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
901    * will be thrown.
902    */
903   @Ignore ("Flakey: See HBASE-9051") @Test
904   public void testExcludeAllFromMinorCompaction() throws Exception {
905     Configuration conf = util.getConfiguration();
906     conf.setInt("hbase.hstore.compaction.min", 2);
907     generateRandomStartKeys(5);
908 
909     try {
910       util.startMiniCluster();
911       final FileSystem fs = util.getDFSCluster().getFileSystem();
912       HBaseAdmin admin = new HBaseAdmin(conf);
913       HTable table = util.createTable(TABLE_NAME, FAMILIES);
914       assertEquals("Should start with empty table", 0, util.countRows(table));
915 
916       // deep inspection: get the StoreFile dir
917       final Path storePath = HStore.getStoreHomedir(
918           FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
919           admin.getTableRegions(TABLE_NAME).get(0),
920           FAMILIES[0]);
921       assertEquals(0, fs.listStatus(storePath).length);
922 
923       // Generate two bulk load files
924       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
925           true);
926       util.startMiniMapReduceCluster();
927 
928       for (int i = 0; i < 2; i++) {
929         Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
930         runIncrementalPELoad(conf, table, testDir);
931         // Perform the actual load
932         new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
933       }
934 
935       // Ensure data shows up
936       int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
937       assertEquals("LoadIncrementalHFiles should put expected data in table",
938           expectedRows, util.countRows(table));
939 
940       // should have a second StoreFile now
941       assertEquals(2, fs.listStatus(storePath).length);
942 
943       // minor compactions shouldn't get rid of the file
944       admin.compact(TABLE_NAME.getName());
945       try {
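        // quickPoll throws an AssertionError if the store never collapses to a single file within
        // five seconds; for files excluded from minor compaction that failure is the expected outcome.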
946         quickPoll(new Callable<Boolean>() {
947           public Boolean call() throws Exception {
948             return fs.listStatus(storePath).length == 1;
949           }
950         }, 5000);
951         throw new IOException("SF# = " + fs.listStatus(storePath).length);
952       } catch (AssertionError ae) {
953         // this is expected behavior
954       }
955 
956       // a major compaction should work though
957       admin.majorCompact(TABLE_NAME.getName());
958       quickPoll(new Callable<Boolean>() {
959         public Boolean call() throws Exception {
960           return fs.listStatus(storePath).length == 1;
961         }
962       }, 5000);
963 
964     } finally {
965       util.shutdownMiniMapReduceCluster();
966       util.shutdownMiniCluster();
967     }
968   }
969 
970   @Test
971   public void testExcludeMinorCompaction() throws Exception {
972     Configuration conf = util.getConfiguration();
973     conf.setInt("hbase.hstore.compaction.min", 2);
974     generateRandomStartKeys(5);
975 
976     try {
977       util.startMiniCluster();
978       Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
979       final FileSystem fs = util.getDFSCluster().getFileSystem();
980       HBaseAdmin admin = new HBaseAdmin(conf);
981       HTable table = util.createTable(TABLE_NAME, FAMILIES);
982       assertEquals("Should start with empty table", 0, util.countRows(table));
983 
984       // deep inspection: get the StoreFile dir
985       final Path storePath = HStore.getStoreHomedir(
986           FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
987           admin.getTableRegions(TABLE_NAME).get(0),
988           FAMILIES[0]);
989       assertEquals(0, fs.listStatus(storePath).length);
990 
991       // put some data in it and flush to create a storefile
992       Put p = new Put(Bytes.toBytes("test"));
993       p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
994       table.put(p);
995       admin.flush(TABLE_NAME.getName());
996       assertEquals(1, util.countRows(table));
997       quickPoll(new Callable<Boolean>() {
998         public Boolean call() throws Exception {
999           return fs.listStatus(storePath).length == 1;
1000         }
1001       }, 5000);
1002 
1003       // Generate a bulk load file with more rows
1004       conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1005           true);
1006       util.startMiniMapReduceCluster();
1007       runIncrementalPELoad(conf, table, testDir);
1008 
1009       // Perform the actual load
1010       new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
1011 
1012       // Ensure data shows up
1013       int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1014       assertEquals("LoadIncrementalHFiles should put expected data in table",
1015           expectedRows + 1, util.countRows(table));
1016 
1017       // should have a second StoreFile now
1018       assertEquals(2, fs.listStatus(storePath).length);
1019 
1020       // minor compactions shouldn't get rid of the file
1021       admin.compact(TABLE_NAME.getName());
1022       try {
1023         quickPoll(new Callable<Boolean>() {
1024           public Boolean call() throws Exception {
1025             return fs.listStatus(storePath).length == 1;
1026           }
1027         }, 5000);
1028         throw new IOException("SF# = " + fs.listStatus(storePath).length);
1029       } catch (AssertionError ae) {
1030         // this is expected behavior
1031       }
1032 
1033       // a major compaction should work though
1034       admin.majorCompact(TABLE_NAME.getName());
1035       quickPoll(new Callable<Boolean>() {
1036         public Boolean call() throws Exception {
1037           return fs.listStatus(storePath).length == 1;
1038         }
1039       }, 5000);
1040 
1041     } finally {
1042       util.shutdownMiniMapReduceCluster();
1043       util.shutdownMiniCluster();
1044     }
1045   }
1046 
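  // Polls the condition roughly every 10 ms until it returns true or waitMs elapses, failing the
  // test if it never does.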
1047   private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1048     int sleepMs = 10;
1049     int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1050     while (retries-- > 0) {
1051       if (c.call().booleanValue()) {
1052         return;
1053       }
1054       Thread.sleep(sleepMs);
1055     }
1056     fail();
1057   }
1058 
1059   public static void main(String args[]) throws Exception {
1060     new TestHFileOutputFormat().manualTest(args);
1061   }
1062 
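  // Manual driver, not run by JUnit: "newtable <name>" creates a pre-split table, "incremental
  // <name>" runs the bulk-load job against an existing table and writes HFiles to ./incremental-out.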
1063   public void manualTest(String args[]) throws Exception {
1064     Configuration conf = HBaseConfiguration.create();
1065     util = new HBaseTestingUtility(conf);
1066     if ("newtable".equals(args[0])) {
1067       TableName tname = TableName.valueOf(args[1]);
1068       byte[][] splitKeys = generateRandomSplitKeys(4);
1069       HTable table = util.createTable(tname, FAMILIES, splitKeys);
1070     } else if ("incremental".equals(args[0])) {
1071       TableName tname = TableName.valueOf(args[1]);
1072       HTable table = new HTable(conf, tname);
1073       Path outDir = new Path("incremental-out");
1074       runIncrementalPELoad(conf, table, outDir);
1075     } else {
1076       throw new RuntimeException(
1077           "usage: TestHFileOutputFormat newtable | incremental");
1078     }
1079   }
1080 
1081 }
1082