View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.util.ArrayList;
29  import java.util.Deque;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.TreeMap;
33  
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FSDataOutputStream;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.NamespaceDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.TableNotFoundException;
46  import org.apache.hadoop.hbase.client.Connection;
47  import org.apache.hadoop.hbase.client.ConnectionFactory;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.Table;
50  import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
51  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
52  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
53  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
54  import org.apache.hadoop.hbase.io.hfile.HFile;
55  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
56  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
57  import org.apache.hadoop.hbase.regionserver.BloomType;
58  import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
59  import org.apache.hadoop.hbase.testclassification.LargeTests;
60  import org.apache.hadoop.hbase.util.Bytes;
61  import org.apache.hadoop.hbase.util.HFileTestUtil;
62  import org.junit.AfterClass;
63  import org.junit.BeforeClass;
64  import org.junit.Rule;
65  import org.junit.Test;
66  import org.junit.experimental.categories.Category;
67  import org.junit.rules.TestName;
68  
69  /**
70   * Test cases for the "load" half of the HFileOutputFormat bulk load
71   * functionality. These tests run faster than the full MR cluster
72   * tests in TestHFileOutputFormat
73   */
74  @Category(LargeTests.class)
75  public class TestLoadIncrementalHFiles {
  // Per-test method name; used to derive table names and test-FS paths.
  @Rule
  public TestName tn = new TestName();

  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  // Namespace created in setUpBeforeClass(); each runTest also bulk-loads into it.
  private static final String NAMESPACE = "bulkNS";

  // Substring expected in the exception when HFile family dirs don't match table families.
  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  // Mirrors the LoadIncrementalHFiles limit configured in setUpBeforeClass().
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  // Pre-split points for tests that create the table up front.
  private static final byte[][] SPLIT_KEYS = new byte[][] {
    Bytes.toBytes("ddd"),
    Bytes.toBytes("ppp")
  };

  static HBaseTestingUtility util = new HBaseTestingUtility();
92  
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Disable any default region coprocessors; these tests exercise only the bulk-load path.
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,"");
    // Cap files-per-region-per-family so testLoadTooMayHFiles can trip the limit cheaply.
    util.getConfiguration().setInt(
      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
      MAX_FILES_PER_REGION_PER_FAMILY);
    // change default behavior so that tag values are returned with normal rpcs
    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
        KeyValueCodecWithTags.class.getCanonicalName());
    util.startMiniCluster();

    setupNamespace();
  }

  /** Creates the namespace used by the namespaced half of each runTest invocation. */
  protected static void setupNamespace() throws Exception {
    util.getHBaseAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }
115 
116   @Test(timeout = 120000)
117   public void testSimpleLoadWithMap() throws Exception {
118     runTest("testSimpleLoadWithMap", BloomType.NONE,
119         new byte[][][] {
120           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
121           new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
122     },  true);
123   }
124 
  /**
   * Test case that creates some regions and loads
   * HFiles that fit snugly inside those regions
   */
  @Test(timeout = 120000)
  public void testSimpleLoad() throws Exception {
    // Two non-overlapping HFiles covering [aaaa,cccc] and [ddd,ooo]; table is
    // auto-created by the loader (no split keys supplied).
    runTest("testSimpleLoad", BloomType.NONE,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
    });
  }
137 
138   @Test(timeout = 120000)
139   public void testSimpleLoadWithFileCopy() throws Exception {
140     String testName = "mytable_testSimpleLoadWithFileCopy";
141     final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
142     runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
143         false, null, new byte[][][] {
144           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
145           new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
146     }, false, true);
147   }
148 
  /**
   * Test case that creates some regions and loads
   * HFiles that cross the boundaries of those regions
   */
  @Test(timeout = 120000)
  public void testRegionCrossingLoad() throws Exception {
    // Each HFile spans multiple regions, forcing the loader to split it.
    runTest("testRegionCrossingLoad", BloomType.NONE,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
    });
  }

  /**
   * Test loading into a column family that has a ROW bloom filter.
   */
  // NOTE(review): 60s timeout here vs 120s on the sibling bloom tests — looks
  // arbitrary; confirm whether it should be aligned.
  @Test(timeout = 60000)
  public void testRegionCrossingRowBloom() throws Exception {
    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
    });
  }

  /**
   * Test loading into a column family that has a ROWCOL bloom filter.
   */
  @Test(timeout = 120000)
  public void testRegionCrossingRowColBloom() throws Exception {
    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
        new byte[][][] {
          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
    });
  }
185 
186   /**
187    * Test case that creates some regions and loads HFiles that have
188    * different region boundaries than the table pre-split.
189    */
190   @Test(timeout = 120000)
191   public void testSimpleHFileSplit() throws Exception {
192     runTest("testHFileSplit", BloomType.NONE,
193         new byte[][] {
194           Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
195           Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
196         },
197         new byte[][][] {
198           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
199           new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
200         }
201     );
202   }
203 
  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries
   * and have different region boundaries than the table pre-split.
   */
  @Test(timeout = 60000)
  public void testRegionCrossingHFileSplit() throws Exception {
    testRegionCrossingHFileSplit(BloomType.NONE);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries
   * have a ROW bloom filter and a different region boundaries than the table pre-split.
   */
  @Test(timeout = 120000)
  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROW);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries
   * have a ROWCOL bloom filter and a different region boundaries than the table pre-split.
   */
  @Test(timeout = 120000)
  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROWCOL);
  }
230 
231   private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
232     runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
233         new byte[][] {
234           Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
235           Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
236         },
237         new byte[][][] {
238           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
239           new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
240         }
241     );
242   }
243 
244   private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
245     HTableDescriptor htd = new HTableDescriptor(tableName);
246     HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
247     familyDesc.setBloomFilterType(bloomType);
248     htd.addFamily(familyDesc);
249     return htd;
250   }
251 
  // Convenience overload: no pre-split keys, directory-based load (useMap=false).
  private void runTest(String testName, BloomType bloomType,
      byte[][][] hfileRanges) throws Exception {
    runTest(testName, bloomType, null, hfileRanges);
  }

  // Convenience overload: no pre-split keys, caller chooses map-based vs directory load.
  private void runTest(String testName, BloomType bloomType,
      byte[][][] hfileRanges, boolean useMap) throws Exception {
    runTest(testName, bloomType, null, hfileRanges, useMap);
  }

  // Convenience overload: optional pre-split keys, directory-based load.
  private void runTest(String testName, BloomType bloomType,
      byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
  }
265 
  /**
   * Runs the load twice: once against "mytable_&lt;testName&gt;" in the default
   * namespace and once against the same table in the {@code bulkNS} namespace.
   * The table is pre-created only when split keys are supplied.
   */
  private void runTest(String testName, BloomType bloomType,
      byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception {
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
    final boolean preCreateTable = tableSplitKeys != null;

    // Run the test bulkloading the table to the default namespace
    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
        useMap);

    // Run the test bulkloading the table to the specified namespace
    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
        useMap);
  }
281 
  // Builds the descriptor for tableName/bloomType and delegates to the full
  // runTest variant with copyFiles disabled.
  private void runTest(String testName, TableName tableName, BloomType bloomType,
      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
          throws Exception {
    HTableDescriptor htd = buildHTD(tableName, bloomType);
    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
  }
288 
  /**
   * Creates one HFile per entry of {@code hfileRanges} (each with {@code factor}
   * rows), bulk-loads them into {@code htd}'s table, and verifies the row count.
   *
   * @param preCreateTable create the table (with {@code tableSplitKeys}) before loading
   * @param useMap         drive the loader via a family-to-files map instead of a directory
   * @param deleteFile     delete the last generated HFile before loading (map mode only)
   * @param copyFiles      set ALWAYS_COPY_FILES so the source HFiles survive the load
   * @param managed        use a managed HTable vs an unmanaged Connection-based table
   * @return the number of rows expected to have been loaded
   */
  public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util,
      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
      byte[][][] hfileRanges, boolean useMap, boolean deleteFile,
      boolean copyFiles, int initRowCount, int factor, boolean managed) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    Map<byte[], List<Path>> map = null;
    List<Path> list = null;
    // list is shared by both modes: it backs the map in map mode and is used for
    // the post-load existence check in copyFiles mode.
    if (useMap || copyFiles) {
      list = new ArrayList<>();
    }
    if (useMap) {
      map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
      map.put(fam, list);
    }
    Path last = null;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
      if (useMap) {
        // track the most recently written file so deleteFile can remove it below
        last = path;
        list.add(path);
        map.put(fam, list);
      }
    }
    int expectedRows = hfileIdx * factor;

    final TableName tableName = htd.getTableName();
    // Pre-create with split keys when requested (or in map mode, which bypasses
    // the loader's own table creation); otherwise fall back to a plain create.
    if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
      util.getHBaseAdmin().createTable(htd, tableSplitKeys);
    }

    if (!util.getHBaseAdmin().tableExists(tableName)) {
      util.getHBaseAdmin().createTable(htd);
    }
    Configuration conf = util.getConfiguration();
    if (copyFiles) {
      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
    }
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String [] args= { dir.toString(), tableName.toString() };

    if (managed) {
      try (HTable table = new HTable(util.getConfiguration(), tableName)) {
        if (useMap) {
          if (deleteFile) fs.delete(last);
          Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
          if (deleteFile) {
            // NOTE(review): assumes the deleted file held exactly 1000 rows — true for
            // the current callers (factor == 1000); confirm if factor ever changes.
            expectedRows -= 1000;
            for (LoadQueueItem item : loaded.keySet()) {
              if (item.hfilePath.getName().equals(last.getName())) {
                fail(last + " should be missing");
              }
            }
          }
        } else {
          loader.run(args);
        }
        assertEquals(initRowCount + expectedRows, util.countRows(table));
      }
    } else {
      // Unmanaged path: same load, but row-count verification is skipped here.
      try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
          HTable table = (HTable) conn.getTable(tableName)) {
        if (useMap) {
          loader.run(null, map, tableName);
        } else {
          loader.run(args);
        }
      }
    }

    if (copyFiles) {
      // With ALWAYS_COPY_FILES the originals must not have been moved away.
      for (Path p : list) {
        assertTrue(p + " should exist", fs.exists(p));
      }
    }
    return expectedRows;
  }
373   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
374       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
375       boolean copyFiles) throws Exception {
376     for (boolean managed : new boolean[] { true, false }) {
377       loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys,
378           hfileRanges, useMap, true, copyFiles, 0, 1000, managed);
379 
380       final TableName tableName = htd.getTableName();
381       // verify staging folder has been cleaned up
382       Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
383       FileSystem fs = util.getTestFileSystem();
384       if (fs.exists(stagingBasePath)) {
385         FileStatus[] files = fs.listStatus(stagingBasePath);
386         for (FileStatus file : files) {
387           assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
388               file.getPath().getName() != "DONOTERASE");
389         }
390       }
391 
392       util.deleteTable(tableName);
393     }
394   }
395 
  /**
   * Test that tags survive through a bulk load that needs to split hfiles.
   *
   * This test depends on the "hbase.client.rpc.codec" =  KeyValueCodecWithTags so that the client
   * can get tags in the responses.
   */
  // NOTE(review): the stray 'h' prefix in the method name looks like a JUnit3-era
  // way of disabling a test, but with the @Test annotation the method still runs
  // under JUnit4 — confirm whether this test is meant to be enabled.
  @Test(timeout = 60000)
  public void htestTagsSurviveBulkLoadSplit() throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    // table has these split points
    byte [][] tableSplitKeys = new byte[][] {
            Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
            Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
    };

    // creating an hfile that has values that span the split points.
    byte[] from = Bytes.toBytes("ddd");
    byte[] to = Bytes.toBytes("ooo");
    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
        new Path(familyDir, tn.getMethodName()+"_hfile"),
        FAMILY, QUALIFIER, from, to, 1000);
    int expectedRows = 1000;

    TableName tableName = TableName.valueOf(tn.getMethodName());
    HTableDescriptor htd = buildHTD(tableName, BloomType.NONE);
    util.getHBaseAdmin().createTable(htd, tableSplitKeys);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String [] args= {dir.toString(), tableName.toString()};
    loader.run(args);

    Table table = util.getConnection().getTable(tableName);
    try {
      // all rows must be present, and each cell must still carry its tags
      assertEquals(expectedRows, util.countRows(table));
      HFileTestUtil.verifyTags(table);
    } finally {
      table.close();
    }

    util.deleteTable(tableName);
  }
440 
441   /**
442    * Test loading into a column family that does not exist.
443    */
444   @Test(timeout = 60000)
445   public void testNonexistentColumnFamilyLoad() throws Exception {
446     String testName = "testNonexistentColumnFamilyLoad";
447     byte[][][] hFileRanges = new byte[][][] {
448       new byte[][]{ Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
449       new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
450     };
451 
452     final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
453     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
454     // set real family name to upper case in purpose to simulate the case that
455     // family name in HFiles is invalid
456     HColumnDescriptor family =
457         new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase()));
458     htd.addFamily(family);
459 
460     try {
461       runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
462       assertTrue("Loading into table with non-existent family should have failed", false);
463     } catch (Exception e) {
464       assertTrue("IOException expected", e instanceof IOException);
465       // further check whether the exception message is correct
466       String errMsg = e.getMessage();
467       assertTrue("Incorrect exception message, expected message: ["
468           + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
469           errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
470     }
471   }
472 
  // Non-HFile junk in the load dir, table pre-created before the load.
  @Test(timeout = 120000)
  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
  }

  // Non-HFile junk in the load dir, table created by the loader itself.
  @Test(timeout = 120000)
  public void testNonHfileFolder() throws Exception {
    testNonHfileFolder("testNonHfileFolder", false);
  }
482 
  /**
   * Write a random data file and a non-file in a dir with a valid family name
   * but not part of the table families. we should we able to bulkload without
   * getting the unmatched family exception. HBASE-13037/HBASE-13227
   */
  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tableName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);

    // One real HFile plus a non-HFile in the family dir — the loader must skip the junk.
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
        FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);

    // Underscore-prefixed dirs (e.g. MapReduce "_logs") must be ignored entirely.
    final String NON_FAMILY_FOLDER = "_logs";
    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
    fs.mkdirs(nonFamilyDir);
    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);

    Table table = null;
    try {
      if (preCreateTable) {
        table = util.createTable(TableName.valueOf(tableName), FAMILY);
      } else {
        table = util.getConnection().getTable(TableName.valueOf(tableName));
      }

      final String[] args = {dir.toString(), tableName};
      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
      // only the 500 rows from the genuine HFile should have loaded
      assertEquals(500, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
      fs.delete(dir, true);
    }
  }
522 
523   private static void createRandomDataFile(FileSystem fs, Path path, int size)
524       throws IOException {
525     FSDataOutputStream stream = fs.create(path);
526     try {
527       byte[] data = new byte[1024];
528       for (int i = 0; i < data.length; ++i) {
529         data[i] = (byte)(i & 0xff);
530       }
531       while (size >= data.length) {
532         stream.write(data, 0, data.length);
533         size -= data.length;
534       }
535       if (size > 0) {
536         stream.write(data, 0, size);
537       }
538     } finally {
539       stream.close();
540     }
541   }
542 
  /**
   * Splits a 1000-row HFile at "ggg" into bottom/top halves and verifies the two
   * halves together still contain all 1000 rows.
   */
  @Test(timeout = 120000)
  public void testSplitStoreFile() throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(
        util.getConfiguration(), testIn,
        familyDesc, Bytes.toBytes("ggg"),
        bottomOut,
        topOut);

    // no row may be lost or duplicated by the split
    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }
565 
  // Encoding matrix for splitStoreFile: bulkload-file encoding vs CF encoding.
  @Test
  public void testSplitStoreFileWithNoneToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
  }

  @Test
  public void testSplitStoreFileWithEncodedToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
  }
585 
  /**
   * Creates a 1000-row HFile written with {@code bulkloadEncoding}, splits it at
   * "ggg" for a column family configured with {@code cfEncoding}, and verifies
   * the two halves still hold all 1000 rows.
   */
  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
      DataBlockEncoding cfEncoding) throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
    familyDesc.setDataBlockEncoding(cfEncoding);
    HFileTestUtil.createHFileWithDataBlockEncoding(
        util.getConfiguration(), fs, testIn, bulkloadEncoding,
        FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(
        util.getConfiguration(), testIn,
        familyDesc, Bytes.toBytes("ggg"),
        bottomOut,
        topOut);

    // the split must preserve every row regardless of encoding combination
    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }
610 
  // Opens the HFile at p, scans every cell, asserts the file is non-empty,
  // and returns the number of cells found.
  private int verifyHFile(Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = HFile.createReader(
        p.getFileSystem(conf), p, new CacheConfig(conf), conf);
    reader.loadFileInfo();
    HFileScanner scanner = reader.getScanner(false, false);
    // NOTE(review): seekTo()'s return value is ignored and the do/while below
    // counts at least 1 — this assumes the file is never empty; confirm callers.
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    assertTrue(count > 0);
    reader.close();
    return count;
  }
626 
627   private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
628     Integer value = map.containsKey(first)?map.get(first):0;
629     map.put(first, value+1);
630 
631     value = map.containsKey(last)?map.get(last):0;
632     map.put(last, value-1);
633   }
634 
635   @Test(timeout = 120000)
636   public void testInferBoundaries() {
637     TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
638 
639     /* Toy example
640      *     c---------i            o------p          s---------t     v------x
641      * a------e    g-----k   m-------------q   r----s            u----w
642      *
643      * Should be inferred as:
644      * a-----------------k   m-------------q   r--------------t  u---------x
645      *
646      * The output should be (m,r,u)
647      */
648 
649     String first;
650     String last;
651 
652     first = "a"; last = "e";
653     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
654 
655     first = "r"; last = "s";
656     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
657 
658     first = "o"; last = "p";
659     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
660 
661     first = "g"; last = "k";
662     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
663 
664     first = "v"; last = "x";
665     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
666 
667     first = "c"; last = "i";
668     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
669 
670     first = "m"; last = "q";
671     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
672 
673     first = "s"; last = "t";
674     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
675 
676     first = "u"; last = "w";
677     addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
678 
679     byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
680     byte[][] compare = new byte[3][];
681     compare[0] = "m".getBytes();
682     compare[1] = "r".getBytes();
683     compare[2] = "u".getBytes();
684 
685     assertEquals(keysArray.length, 3);
686 
687     for (int row = 0; row<keysArray.length; row++){
688       assertArrayEquals(keysArray[row], compare[row]);
689     }
690   }
691 
  /**
   * Loading MAX_FILES_PER_REGION_PER_FAMILY + 1 files for one family must fail.
   * (The "TooMay" typo in the name is retained — renaming a public test method
   * would change the class's external interface.)
   */
  @Test(timeout = 60000)
  public void testLoadTooMayHFiles() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    // <= produces exactly one file more than the configured limit
    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
          + i), FAMILY, QUALIFIER, from, to, 1000);
    }

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String [] args= {dir.toString(), "mytable_testLoadTooMayHFiles"};
    try {
      loader.run(args);
      fail("Bulk loading too many files should fail");
    } catch (IOException ie) {
      assertTrue(ie.getMessage().contains("Trying to load more than "
        + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
    }
  }
716 
  // With table auto-creation disabled ("no"), loading into a missing table must
  // surface TableNotFoundException (checked via the @Test(expected=...) clause).
  @Test(expected = TableNotFoundException.class)
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { "directory", "nonExistingTable" };
    loader.run(args);
  }
725 
  /**
   * A family whose name starts with an underscore ("_cf") must still be loadable,
   * even though underscore-prefixed dirs like "_logs" are normally skipped.
   */
  @Test(timeout = 120000)
  public void testTableWithCFNameStartWithUnderScore() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String family = "_cf";
    Path familyDir = new Path(dir, family);

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    Configuration conf = util.getConfiguration();
    String tableName = "mytable_cfNameStartWithUnderScore";
    Table table = util.createTable(TableName.valueOf(tableName), family);
    HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
      QUALIFIER, from, to, 1000);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { dir.toString(), tableName };
    try {
      loader.run(args);
      assertEquals(1000, util.countRows(table));
    } finally {
      if (null != table) {
        table.close();
      }
    }
  }
753 }
754