/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
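/**
 * Tests {@link HFileOutputFormat}. Sets up and runs mapreduce jobs that write
 * hfile output using a mapper that emits random keys and values in the style of
 * {@link PerformanceEvaluation}, and verifies timestamp handling, TIMERANGE
 * metadata, per-family settings, and incremental (bulk) load behavior.
 */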
@Category(LargeTests.class)
public class TestHFileOutputFormat {
  private final static int ROWSPERSPLIT = 1024;

  private static final byte[][] FAMILIES = {
      Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
      Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName TABLE_NAME =
      TableName.valueOf("TestTable");

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static Log LOG = LogFactory.getLog(TestHFileOutputFormat.class);
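  /**
   * Simple mapper that makes KeyValue output.
   */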
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                     ImmutableBytesWritable, KeyValue> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
    }

    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable,
               ImmutableBytesWritable, KeyValue>.Context context)
        throws IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      for (int i = 0; i < ROWSPERSPLIT; i++) {

        random.nextBytes(keyBytes);
        // Ensure that different tasks generate different keys.
        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
        random.nextBytes(valBytes);
        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

        for (byte[] family : TestHFileOutputFormat.FAMILIES) {
          KeyValue kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
          context.write(key, kv);
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job) {
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(RandomKVGeneratingMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
  }

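  /**
   * Test that {@link HFileOutputFormat} RecordWriter amends timestamps if
   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   */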
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced()
      throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat hof = new HFileOutputFormat();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by the call to write. Check that all of the kv is the same
      // except for the timestamp.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(original.getRow(), kv.getRow()));
      assertTrue(CellUtil.matchingColumn(original, kv.getFamily(), kv.getQualifier()));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now pass a kv that has an explicit ts. It should not be
      // changed by the call to write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
      throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
        job, "attempt_200707121733_0001_m_000000_0");
    return context;
  }

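  /**
   * Test that {@link HFileOutputFormat} creates an HFile with TIMERANGE
   * metadata covering the timestamps of the written cells.
   */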
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Writing to dir: " + dir);
    try {
      // Build a record writer using HFileOutputFormat.
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat hof = new HFileOutputFormat();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps.
      final byte[] b = Bytes.toBytes("b");

      // Value 1 with timestamp 2000.
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // Value 2 with timestamp 1000.
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // Verify that the file has the timerange metadata set correctly.
      writer.close(context);

      // The generated file lives one directory down from the attempt directory
      // and is the only file there, so list down one level to find it.
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // Open as an HFile Reader and pull out the TIMERANGE FileInfo.
      HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
          new CacheConfig(conf), conf);
      Map<byte[], byte[]> finfo = rd.loadFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes());
      assertNotNull(range);

      // Unmarshall and check values.
      TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
      Writables.copyWritable(range, timeRangeTracker);
      LOG.info(timeRangeTracker.getMinimumTimestamp() +
          "...." + timeRangeTracker.getMaximumTimestamp());
      assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
      assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
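  /**
   * Run a small MR job that writes PerformanceEvaluation-style data through
   * {@link HFileOutputFormat} and verify that output files were produced.
   */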
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Keep the MR sort buffer small so the test stays light on memory.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Keep the max filesize small so output is spread across many files.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job);
    // Partition across the whole key space, from all-zero to all-0xff keys.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for the partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    HTableDescriptor tableDescriptor = Mockito.mock(HTableDescriptor.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
    // One reducer per mocked region (setupMockStartKeys supplies four start keys).
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // The first region start key is always empty.
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false);
  }

  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true);
  }

  private void doIncrementalLoadTest(
      boolean shouldChangeRegions) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    byte[][] splitKeys = generateRandomSplitKeys(4);
    HBaseAdmin admin = null;
    try {
      util.startMiniCluster();
      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
      admin = util.getHBaseAdmin();
      HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
      assertEquals("Should start with empty table",
          0, util.countRows(table));
      int numRegions = -1;
      try (RegionLocator r = table.getRegionLocator()) {
        numRegions = r.getStartKeys().length;
      }
      // Four split keys yield five regions.
      assertEquals("Should make 5 regions", 5, numRegions);

      // Generate the bulk load files.
      util.startMiniMapReduceCluster();
      runIncrementalPELoad(conf, table, testDir);
      // This doesn't write into the table, it just makes files.
      assertEquals("HFOF should not touch actual table",
          0, util.countRows(table));

      // Make sure that a directory was created for every CF.
      int dir = 0;
      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);

      // Optionally recreate the table with different regions before loading.
      if (shouldChangeRegions) {
        LOG.info("Changing regions in table");
        admin.disableTable(table.getTableName());
        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().
            getRegionStates().isRegionsInTransition()) {
          Threads.sleep(200);
          LOG.info("Waiting on table to finish disabling");
        }
        util.deleteTable(table.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);
        while (table.getRegionLocations().size() != 15 ||
            !admin.isTableAvailable(table.getTableName())) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load.
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);

      // Ensure data shows up.
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));
      Scan scan = new Scan();
      ResultScanner results = table.getScanner(scan);
      for (Result res : results) {
        assertEquals(FAMILIES.length, res.rawCells().length);
        Cell first = res.rawCells()[0];
        for (Cell kv : res.rawCells()) {
          assertTrue(CellUtil.matchingRow(first, kv));
          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
        }
      }
      results.close();
      String tableDigestBefore = util.checksumRows(table);

      // Cause regions to reopen and verify data survives.
      admin.disableTable(TABLE_NAME);
      while (!admin.isTableDisabled(TABLE_NAME)) {
        Thread.sleep(200);
        LOG.info("Waiting for table to disable");
      }
      admin.enableTable(TABLE_NAME);
      util.waitTableAvailable(TABLE_NAME);
      assertEquals("Data should remain after reopening of regions",
          tableDigestBefore, util.checksumRows(table));
    } finally {
      if (admin != null) admin.close();
      util.shutdownMiniMapReduceCluster();
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(
      Configuration conf, HTable table, Path outDir)
      throws Exception {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    setupRandomGeneratorMapper(job);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
        table.getRegionLocator());
    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertEquals(table.getRegionLocator().getAllRegionLocations().size(), job.getNumReduceTasks());

    assertTrue(job.waitForCompletion(true));
  }
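  /**
   * Test that the family-to-compression map is correctly serialized into
   * and deserialized from configuration.
   */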
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
          getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      HFileOutputFormat.configureCompression(table, conf);

      // Read back the family-specific compression settings from the configuration.
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat
          .createFamilyCompressionMap(conf);

      // Test that we have a value for every column family and that it matches
      // the mocked value.
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setCompressionType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
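  /**
   * @return a map from column family names to compression algorithms for
   *         testing column family compression. Column family names have
   *         special characters.
   */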
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression =
        new HashMap<String, Compression.Algorithm>();

    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }
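  /**
   * Test that the family-to-bloom-type map is correctly serialized into
   * and deserialized from configuration.
   */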
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType =
          getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBloomType(table,
          familyToBloomType);
      HFileOutputFormat.configureBloomType(table, conf);

      // Read back the family-specific bloom type settings from the configuration.
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
          HFileOutputFormat
              .createFamilyBloomTypeMap(conf);

      // Test that we have a value for every column family and that it matches
      // the mocked value.
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBloomFilterType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
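  /**
   * @return a map from column family names to bloom filter types for testing
   *         column family bloom type settings.
   */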
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType =
        new HashMap<String, BloomType>();

    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD",
          BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }
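  /**
   * Test that the family-to-block-size map is correctly serialized into
   * and deserialized from configuration.
   */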
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize =
          getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBlockSize(table,
          familyToBlockSize);
      HFileOutputFormat.configureBlockSize(table, conf);

      // Read back the family-specific block size settings from the configuration.
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
          HFileOutputFormat
              .createFamilyBlockSizeMap(conf);

      // Test that we have a value for every column family and that it matches
      // the mocked value.
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBlocksize(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
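  /**
   * @return a map from column family names to block sizes for testing
   *         column family block size settings.
   */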
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize =
        new HashMap<String, Integer>();

    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }
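  /**
   * Test that the family-to-data-block-encoding map is correctly serialized
   * into and deserialized from configuration.
   */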
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
          getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForDataBlockEncoding(table,
          familyToDataBlockEncoding);
      HFileOutputFormat.configureDataBlockEncoding(table, conf);

      // Read back the family-specific data block encoding settings from the configuration.
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
          HFileOutputFormat
              .createFamilyDataBlockEncodingMap(conf);

      // Test that we have a value for every column family and that it matches
      // the mocked value.
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals("DataBlockEncoding configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
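  /**
   * @return a map from column family names to data block encodings for testing
   *         column family data block encoding settings.
   */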
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        new HashMap<String, DataBlockEncoding>();

    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator regionLocator) throws IOException {
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(regionLocator).getStartKeys();
  }
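  /**
   * Test that {@link HFileOutputFormat} RecordWriter uses the compression and
   * bloom filter settings from the column family descriptors.
   */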
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Set up a table descriptor with a variety of column family settings.
    HTable table = Mockito.mock(HTable.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : this.util.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // Set up the mocked region start keys used to configure the job.
    setupMockStartKeys(regionLocator);

    try {
      // Partial mapreduce setup to get an operational writer for testing.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat hof = new HFileOutputFormat();
      writer = hof.getRecordWriter(context);

      // Write out random rows for all column families.
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF.
      FileSystem fs = dir.getFileSystem(conf);

      // Commit so the filesystem has one directory per column family.
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));

        // Verify that the compression and bloom type on disk match what the
        // column family descriptor asked for.
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();

        byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
            "(reader: " + reader + ")",
            hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
            "(reader: " + reader + ")", hcd.getCompression(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
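  /**
   * Write random key/values for the given column families through the
   * supplied record writer.
   */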
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer,
      TaskAttemptContext context, Set<byte[]> families, int numRows)
      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {
      // Use the row number as the key so rows are written in sorted order.
      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        KeyValue kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }
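  /**
   * Test that when every store file is bulk loaded with the compaction-exclude
   * flag set, minor compaction leaves the files alone and only a major
   * compaction collapses them into a single store file.
   */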
  @Ignore("Flakey: See HBASE-9051") @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    try {
      util.startMiniCluster();
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      HBaseAdmin admin = new HBaseAdmin(conf);
      HTable table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // Deep inspection: get the StoreFile dir.
      final Path storePath = HStore.getStoreHomedir(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          admin.getTableRegions(TABLE_NAME).get(0),
          FAMILIES[0]);
      assertEquals(0, fs.listStatus(storePath).length);

      // Exclude bulk loaded files from minor compaction and generate two
      // rounds of bulk load files.
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
          true);
      util.startMiniMapReduceCluster();

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf, table, testDir);
        // Perform the actual load.
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
      }

      // Ensure data shows up.
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));

      // Should have two StoreFiles now, one per bulk load.
      assertEquals(2, fs.listStatus(storePath).length);

      // Minor compactions shouldn't get rid of the files.
      admin.compact(TABLE_NAME.getName());
      try {
        quickPoll(new Callable<Boolean>() {
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // This is expected behavior: the poll times out because the bulk
        // loaded files were excluded from minor compaction.
      }

      // A major compaction should work though.
      admin.majorCompact(TABLE_NAME.getName());
      quickPoll(new Callable<Boolean>() {
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniMapReduceCluster();
      util.shutdownMiniCluster();
    }
  }

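  /**
   * Test that a bulk loaded hfile marked for compaction exclusion is skipped
   * by minor compaction but merged away by a major compaction.
   */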
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    try {
      util.startMiniCluster();
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      HBaseAdmin admin = new HBaseAdmin(conf);
      HTable table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // Deep inspection: get the StoreFile dir.
      final Path storePath = HStore.getStoreHomedir(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          admin.getTableRegions(TABLE_NAME).get(0),
          FAMILIES[0]);
      assertEquals(0, fs.listStatus(storePath).length);

      // Put a single row and flush so there is one ordinary store file.
      Put p = new Put(Bytes.toBytes("test"));
      p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAME.getName());
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with more rows, excluded from minor compaction.
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
          true);
      util.startMiniMapReduceCluster();
      runIncrementalPELoad(conf, table, testDir);

      // Perform the actual load.
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);

      // Ensure data shows up.
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows + 1, util.countRows(table));

      // Should have a second StoreFile now.
      assertEquals(2, fs.listStatus(storePath).length);

      // Minor compactions shouldn't get rid of the file.
      admin.compact(TABLE_NAME.getName());
      try {
        quickPoll(new Callable<Boolean>() {
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // This is expected behavior: the poll times out because the bulk
        // loaded file was excluded from minor compaction.
      }

      // A major compaction should work though.
      admin.majorCompact(TABLE_NAME.getName());
      quickPoll(new Callable<Boolean>() {
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniMapReduceCluster();
      util.shutdownMiniCluster();
    }
  }

  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String[] args) throws Exception {
    new TestHFileOutputFormat().manualTest(args);
  }

  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      HTable table = util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      HTable table = new HTable(conf, tname);
      Path outDir = new Path("incremental-out");
      runIncrementalPELoad(conf, table, outDir);
    } else {
      throw new RuntimeException(
          "usage: TestHFileOutputFormat newtable | incremental");
    }
  }

}