/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
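
/**
 * Simple test for {@link HFileOutputFormat2}: sets up and runs a mapreduce
 * job that writes hfile output, then verifies timestamps, time ranges, tags
 * and per-column-family settings in the files it produces.
 */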
@Category(LargeTests.class)
public class TestHFileOutputFormat2 {
  private final static int ROWSPERSPLIT = 1024;

  private static final byte[][] FAMILIES =
      { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
        Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName TABLE_NAME = TableName.valueOf("TestTable");

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static Log LOG = LogFactory.getLog(TestHFileOutputFormat2.class);
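
  /**
   * Simple mapper that makes KeyValue output: writes ROWSPERSPLIT random rows
   * per map task, one cell per column family in {@link #FAMILIES}. Key and
   * value lengths are taken from the job configuration.
   */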
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                     ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
    }

    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable,
               ImmutableBytesWritable, Cell>.Context context)
        throws IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      for (int i = 0; i < ROWSPERSPLIT; i++) {

        random.nextBytes(keyBytes);
        // Ensure that different tasks generate different keys.
        keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
        random.nextBytes(valBytes);
        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

        for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
          Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
          context.write(key, kv);
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job) {
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(RandomKVGeneratingMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
  }
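
  /**
   * Test that {@link HFileOutputFormat2} RecordWriter replaces
   * {@link HConstants#LATEST_TIMESTAMP} with a concrete timestamp when a cell
   * is written, but leaves cells that carry an explicit timestamp alone.
   */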
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced()
      throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. A cell created with LATEST_TIMESTAMP should have its
      // timestamp rewritten by the record writer.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. A cell carrying an explicit timestamp should pass through
      // unchanged.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
      throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
        job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }
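
  /**
   * Test that the TIMERANGE file info entry written into the HFile matches
   * the timestamps of the cells written through the RecordWriter.
   */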
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // Build a record writer using HFileOutputFormat2.
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      final byte[] b = Bytes.toBytes("b");

      // Write two cells with explicit timestamps 2000 and 1000.
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // Close the writer so the file info gets flushed out.
      writer.close(context);

      // Locate the HFile the writer produced under the task attempt
      // directory.
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // Open the HFile and check that the TIMERANGE file info entry exists.
      HFile.Reader rd = HFile.createReader(fs, file[0].getPath(),
          new CacheConfig(conf), conf);
      Map<byte[], byte[]> finfo = rd.loadFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes());
      assertNotNull(range);

      // Unmarshall the TIMERANGE and assert it covers [1000, 2000].
      TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
      Writables.copyWritable(range, timeRangeTracker);
      LOG.info(timeRangeTracker.getMinimumTimestamp() +
          "...." + timeRangeTracker.getMaximumTimestamp());
      assertEquals(1000, timeRangeTracker.getMinimumTimestamp());
      assertEquals(2000, timeRangeTracker.getMaximumTimestamp());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
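
  /**
   * Run a small MR job that writes PerformanceEvaluation-style random data
   * through {@link HFileOutputFormat2} and check that output files appear.
   */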
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Keep the map output sort buffer small.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Small max file size so the record writer rolls new HFiles frequently.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job);

    // Partition the whole key space across the reduce tasks.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }
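
  /**
   * Test that the record writer preserves cell tags (here, a TTL tag) when
   * the configured HFile format version supports tags.
   */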
  @Test
  public void test_WritingTagData()
      throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("WritingTagData");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, new Tag[] {
          new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)) });
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader = HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf),
            conf);
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getKeyValue();
        // The cell read back should still carry the TTL tag.
        Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(),
            cell.getTagsOffset(), cell.getTagsLength());
        assertTrue(tagsIterator.hasNext());
        assertTrue(tagsIterator.next().getType() == TagType.TTL_TAG_TYPE);
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set("hbase.fs.tmp.dir", util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, new HTableDescriptor(), regionLocator);
    // One reduce task per mocked region start key.
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // The first region's start key is always the empty byte array.
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false);
  }

  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true);
  }

  private void doIncrementalLoadTest(
      boolean shouldChangeRegions) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    byte[][] splitKeys = generateRandomSplitKeys(4);
    util.startMiniCluster();
    try {
      HTable table = util.createTable(TABLE_NAME, FAMILIES, splitKeys);
      Admin admin = table.getConnection().getAdmin();
      Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
      assertEquals("Should start with empty table",
          0, util.countRows(table));
      int numRegions = -1;
      try (RegionLocator r = table.getRegionLocator()) {
        numRegions = r.getStartKeys().length;
      }
      assertEquals("Should make 5 regions", 5, numRegions);

      // Generate the bulk load files.
      util.startMiniMapReduceCluster();
      runIncrementalPELoad(conf, table.getTableDescriptor(), table.getRegionLocator(), testDir);
      // This doesn't write into the table, just makes files.
      assertEquals("HFOF should not touch actual table",
          0, util.countRows(table));

      // Make sure that a directory was created for every column family.
      int dir = 0;
      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);

      // Handle the split case: recreate the table with a different region layout.
      if (shouldChangeRegions) {
        LOG.info("Changing regions in table");
        admin.disableTable(table.getName());
        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager().
            getRegionStates().isRegionsInTransition()) {
          Threads.sleep(200);
          LOG.info("Waiting on table to finish disabling");
        }
        util.deleteTable(table.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        table = util.createTable(TABLE_NAME, FAMILIES, newSplitKeys);

        while (table.getRegionLocator().getAllRegionLocations().size() != 15 ||
            !admin.isTableAvailable(table.getName())) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load.
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);

      // Ensure data shows up.
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));
      Scan scan = new Scan();
      ResultScanner results = table.getScanner(scan);
      for (Result res : results) {
        assertEquals(FAMILIES.length, res.rawCells().length);
        Cell first = res.rawCells()[0];
        for (Cell kv : res.rawCells()) {
          assertTrue(CellUtil.matchingRow(first, kv));
          assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
        }
      }
      results.close();
      String tableDigestBefore = util.checksumRows(table);

      // Cause regions to reopen and verify the data survives.
      admin.disableTable(TABLE_NAME);
      while (!admin.isTableDisabled(TABLE_NAME)) {
        Thread.sleep(200);
        LOG.info("Waiting for table to disable");
      }
      admin.enableTable(TABLE_NAME);
      util.waitTableAvailable(TABLE_NAME);
      assertEquals("Data should remain after reopening of regions",
          tableDigestBefore, util.checksumRows(table));
    } finally {
      util.shutdownMiniMapReduceCluster();
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf, HTableDescriptor tableDescriptor,
      RegionLocator regionLocator, Path outDir) throws IOException, UnsupportedEncodingException,
      InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    setupRandomGeneratorMapper(job);
    HFileOutputFormat2.configureIncrementalLoad(job, tableDescriptor, regionLocator);
    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());

    assertTrue(job.waitForCompletion(true));
  }
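
  /**
   * Test for {@link HFileOutputFormat2#configureCompression} and
   * {@link HFileOutputFormat2#createFamilyCompressionMap}: the per-family
   * compression map should round-trip through the job configuration.
   */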
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
          getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      HFileOutputFormat2.configureCompression(conf, table.getTableDescriptor());

      // Read back the family-specific compression settings from the configuration.
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
          .createFamilyCompressionMap(conf);

      // Test that we have a value for all column families that matter.
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setCompressionType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
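
  /**
   * @return a map from column family names to compression algorithms for
   *         testing column family compression. Column family names have
   *         special characters.
   */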
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression =
        new HashMap<String, Compression.Algorithm>();

    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }
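
  /**
   * Test for {@link HFileOutputFormat2#configureBloomType} and
   * {@link HFileOutputFormat2#createFamilyBloomTypeMap}: the per-family bloom
   * filter type map should round-trip through the job configuration.
   */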
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType =
          getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBloomType(table,
          familyToBloomType);
      HFileOutputFormat2.configureBloomType(table.getTableDescriptor(), conf);

      // Read back the family-specific bloom type settings from the configuration.
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
          HFileOutputFormat2
              .createFamilyBloomTypeMap(conf);

      // Test that we have a value for all column families that matter.
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBloomFilterType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
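
  /**
   * @return a map from column family names to bloom filter types for testing
   *         column family bloom filter settings. Column family names have
   *         special characters.
   */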
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType =
        new HashMap<String, BloomType>();

    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD",
          BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }
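
  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize} and
   * {@link HFileOutputFormat2#createFamilyBlockSizeMap}: the per-family block
   * size map should round-trip through the job configuration.
   */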
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize =
          getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForBlockSize(table,
          familyToBlockSize);
      HFileOutputFormat2.configureBlockSize(table.getTableDescriptor(), conf);

      // Read back the family-specific block size settings from the configuration.
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
          HFileOutputFormat2
              .createFamilyBlockSizeMap(conf);

      // Test that we have a value for all column families that matter.
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBlocksize(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
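
  /**
   * @return a map from column family names to block sizes for testing column
   *         family block size settings. Column family names have special
   *         characters.
   */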
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize =
        new HashMap<String, Integer>();

    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }
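
  /**
   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding} and
   * {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap}: the
   * per-family data block encoding map should round-trip through the job
   * configuration.
   */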
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
          getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(HTable.class);
      setupMockColumnFamiliesForDataBlockEncoding(table,
          familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      HFileOutputFormat2.configureDataBlockEncoding(tableDescriptor, conf);

      // Read back the family-specific data block encoding settings from the
      // configuration.
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
          HFileOutputFormat2
              .createFamilyDataBlockEncodingMap(conf);

      // Test that we have a value for all column families that matter.
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals("DataBlockEncoding configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes()));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }
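
  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding settings. Column family
   *         names have special characters.
   */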
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        new HashMap<String, DataBlockEncoding>();

    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }
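
  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses the compression
   * and bloom filter settings from each column family descriptor when writing
   * the per-family HFiles.
   */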
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Set up a table descriptor with one family per generated column descriptor.
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // Set up the mock region start keys.
    setupMockStartKeys(regionLocator);

    try {
      // Explicitly disable sequence-file compression for this job.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Write cells for every column family and close the writer.
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      FileSystem fs = dir.getFileSystem(conf);

      // Commit the task and job so the HFiles land under the family directories.
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));

        // Verify that the compression and bloom filter type match the family descriptor.
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), conf);
        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();

        byte[] bloomFilter = fileInfo.get(StoreFile.BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
            "(reader: " + reader + ")",
            hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
            "(reader: " + reader + ")", hcd.getCompression(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
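
  /**
   * Write random values through the given record writer, one cell per row for
   * each of the given column families.
   */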
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
      TaskAttemptContext context, Set<byte[]> families, int numRows)
      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {
      // Use the row number as the key so rows are written in sorted order.
      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }
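
  /**
   * Test that bulk-loaded HFiles written with the
   * "hbase.mapreduce.hfileoutputformat.compaction.exclude" flag are skipped by
   * minor compactions and only cleaned up by a major compaction.
   */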
  @Ignore("Flakey: See HBASE-9051") @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection();
        Admin admin = conn.getAdmin()) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      HTable table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // Deep inspection: get the StoreFile dir.
      final Path storePath = new Path(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
              Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files, marking them for exclusion from minor compaction.
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
          true);
      util.startMiniMapReduceCluster();

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf, table.getTableDescriptor(), conn.getRegionLocator(TABLE_NAME),
            testDir);
        // Perform the actual load.
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
      }

      // Ensure data shows up.
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));

      // Should have a store file for each bulk load.
      assertEquals(2, fs.listStatus(storePath).length);

      // Minor compactions shouldn't get rid of the files.
      admin.compact(TABLE_NAME);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // This is expected behavior.
      }

      // A major compaction should work though.
      admin.majorCompact(TABLE_NAME);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniMapReduceCluster();
      util.shutdownMiniCluster();
    }
  }
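
  /**
   * Test that a bulk-loaded HFile marked for compaction exclusion survives
   * minor compactions alongside a normally flushed store file, and is merged
   * only by a major compaction.
   */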
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      Table table = util.createTable(TABLE_NAME, FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // Deep inspection: get the StoreFile dir.
      final Path storePath = new Path(
          FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
          new Path(admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
              Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Put a single cell in the table and flush it so there is one ordinary store file.
      Put p = new Put(Bytes.toBytes("test"));
      p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAME);
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file, marking it for exclusion from minor compaction.
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
          true);
      util.startMiniMapReduceCluster();

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAME);
      runIncrementalPELoad(conf, table.getTableDescriptor(), regionLocator, testDir);

      // Perform the actual load.
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);

      // Ensure data shows up.
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows + 1, util.countRows(table));

      // Should have a store file for the flush and one for the bulk load.
      assertEquals(2, fs.listStatus(storePath).length);

      // Minor compactions shouldn't get rid of the file.
      admin.compact(TABLE_NAME);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // This is expected behavior.
      }

      // A major compaction should work though.
      admin.majorCompact(TABLE_NAME);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniMapReduceCluster();
      util.shutdownMiniCluster();
    }
  }

  // Poll the callable until it returns true or waitMs elapses; fail the test on timeout.
  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }
  public static void main(String[] args) throws Exception {
    new TestHFileOutputFormat2().manualTest(args);
  }

  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      try (HTable table = util.createTable(tname, FAMILIES, splitKeys)) {
      }
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf);
          Admin admin = c.getAdmin();
          RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf, admin.getTableDescriptor(tname), regionLocator, outDir);
      }
    } else {
      throw new RuntimeException(
          "usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

}