/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

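/**
 * Verifies that scanners see cells from bulk loaded HFiles correctly, including
 * HFiles loaded while a scan is in progress and native HFiles that carry their
 * own sequence ids. A single-node mini cluster is shared across all tests.
 */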
@Category(MediumTests.class)
public class TestScannerWithBulkload {
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster(1);
  }

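  /** Creates a table with a single family 'col' that retains up to three versions. */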
  private static void createTable(Admin admin, TableName tableName) throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor("col");
    hcd.setMaxVersions(3);
    desc.addFamily(hcd);
    admin.createTable(desc);
  }

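  /**
   * Seeds the table via {@link #init}, bulk loads an HFile carrying 'version2' for
   * 'row1,col:q', and verifies the scanner sees it. A follow-up put of 'version3'
   * plus a flush must then win over the bulk loaded cell.
   */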
  @Test
  public void testBulkLoad() throws Exception {
    TableName tableName = TableName.valueOf("testBulkLoad");
    long l = System.currentTimeMillis();
    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
    createTable(admin, tableName);
    Scan scan = createScan();
    final HTable table = init(admin, l, scan, tableName);
    // Bulk load an HFile carrying 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
      false);
    Configuration conf = TEST_UTIL.getConfiguration();
    // Ask the bulk load to assign sequence ids to the loaded files.
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    bulkload.doBulkLoad(hfilePath, table);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (KeyValue _kv : kvs) {
        if (Bytes.toString(_kv.getRow()).equals("row1")) {
          System.out.println(Bytes.toString(_kv.getRow()));
          System.out.println(Bytes.toString(_kv.getQualifier()));
          System.out.println(Bytes.toString(_kv.getValue()));
          Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

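  /**
   * Drains the scanner and asserts that every cell returned for 'row1,col:q'
   * carries the expected value.
   */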
  private Result scanAfterBulkLoad(ResultScanner scanner, Result result, String expectedVal)
      throws IOException {
    while (result != null) {
      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (KeyValue _kv : kvs) {
        if (Bytes.toString(_kv.getRow()).equals("row1")) {
          System.out.println(Bytes.toString(_kv.getRow()));
          System.out.println(Bytes.toString(_kv.getQualifier()));
          System.out.println(Bytes.toString(_kv.getValue()));
          Assert.assertEquals(expectedVal, Bytes.toString(_kv.getValue()));
        }
      }
      result = scanner.next();
    }
    return result;
  }

  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
  // Else, we will set BULKLOAD_TIME_KEY.
  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
      throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    final Path hfilePath = new Path(hFilePath);
    fs.mkdirs(hfilePath);
    Path path = new Path(pathStr);
    HFile.WriterFactory wf = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
    Assert.assertNotNull(wf);
    HFileContext context = new HFileContext();
    HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version2"));

    // Set cell seq id to test bulk load native hfiles.
    if (nativeHFile) {
      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip
      // this kv.
      kv.setSequenceId(9999999);
    }

    writer.append(kv);

    if (nativeHFile) {
      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
      // Scan should only look at the seq id appended at the bulk load time, and not skip its
      // kv.
      writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(9999999L));
    } else {
      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
    }
    writer.close();
    return hfilePath;
  }

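  /**
   * Seeds 'row1' and 'row2' with 'version0' and then 'row1' with 'version1', all at
   * timestamp l, flushing after each put and compacting at the end. Verifies the
   * scan returns a single cell holding 'version1' before handing the table back.
   */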
  private HTable init(HBaseAdmin admin, long l, Scan scan, TableName tableName) throws Exception {
    Connection connection = TEST_UTIL.getConnection();
    HTable table = (HTable) connection.getTable(tableName);
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version0")));
    table.put(put0);
    admin.flush(tableName);
    Put put1 = new Put(Bytes.toBytes("row2"));
    put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version0")));
    table.put(put1);
    admin.flush(tableName);
    put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version1")));
    table.put(put0);
    admin.flush(tableName);
    admin.compact(tableName);

    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
    Assert.assertEquals(1, kvs.size());
    Assert.assertEquals("version1", Bytes.toString(kvs.get(0).getValue()));
    scanner.close();
    return table;
  }

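  /**
   * Opens a scanner first, then bulk loads from a separate thread. Once the load
   * completes, verifies the in-flight scan still returns 'version1', the value in
   * place when the scanner was opened.
   */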
  @Test
  public void testBulkLoadWithParallelScan() throws Exception {
    TableName tableName = TableName.valueOf("testBulkLoadWithParallelScan");
    final long l = System.currentTimeMillis();
    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
    createTable(admin, tableName);
    Scan scan = createScan();
    final HTable table = init(admin, l, scan, tableName);
    // Bulk load an HFile carrying 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
        "/temp/testBulkLoadWithParallelScan/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    // Create the scanner first, then do the bulk load from another thread.
    ResultScanner scanner = table.getScanner(scan);
    final CountDownLatch latch = new CountDownLatch(1);
    new Thread() {
      @Override
      public void run() {
        try {
          Put put1 = new Put(Bytes.toBytes("row5"));
          put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
              Bytes.toBytes("version0")));
          table.put(put1);
          bulkload.doBulkLoad(hfilePath, table);
          latch.countDown();
        } catch (TableNotFoundException e) {
          // Swallowed; if the load fails, the latch never counts down and await() blocks.
        } catch (IOException e) {
          // Swallowed; if the load fails, the latch never counts down and await() blocks.
        }
      }
    }.start();
    latch.await();
    // By the time we call next(), the bulk loaded files have also been added to
    // the kv scanner.
    Result result = scanner.next();
    scanAfterBulkLoad(scanner, result, "version1");
    scanner.close();
    table.close();
  }

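  /**
   * Same flow as {@link #testBulkLoad()}, but the staged file is written as a
   * native HFile carrying its own cell seq id and MAX_SEQ_ID_KEY. The scan must
   * ignore those and use the seq id assigned at bulk load time, so 'version2' and
   * later 'version3' remain visible.
   */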
  @Test
  public void testBulkLoadNativeHFile() throws Exception {
    TableName tableName = TableName.valueOf("testBulkLoadNativeHFile");
    long l = System.currentTimeMillis();
    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
    createTable(admin, tableName);
    Scan scan = createScan();
    final HTable table = init(admin, l, scan, tableName);
    // Bulk load a native HFile carrying 'version2' for row1.
    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    bulkload.doBulkLoad(hfilePath, table);
    ResultScanner scanner = table.getScanner(scan);
    Result result = scanner.next();
    // We had 'version0' and 'version1' for 'row1,col:q' in the table.
    // Bulk load added 'version2'; the scanner should be able to see 'version2'.
    result = scanAfterBulkLoad(scanner, result, "version2");
    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
        Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);
    scanner = table.getScanner(scan);
    result = scanner.next();
    while (result != null) {
      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
      for (KeyValue _kv : kvs) {
        if (Bytes.toString(_kv.getRow()).equals("row1")) {
          System.out.println(Bytes.toString(_kv.getRow()));
          System.out.println(Bytes.toString(_kv.getQualifier()));
          System.out.println(Bytes.toString(_kv.getValue()));
          Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
        }
      }
      result = scanner.next();
    }
    scanner.close();
    table.close();
  }

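  /** Returns a Scan that retrieves up to three versions per cell. */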
  private Scan createScan() {
    Scan scan = new Scan();
    scan.setMaxVersions(3);
    return scan;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
}