View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23  import static org.junit.Assert.assertEquals;
24  import static org.junit.Assert.assertTrue;
25  
26  import java.io.IOException;
27  import java.util.ArrayList;
28  import java.util.HashSet;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Random;
32  import java.util.Set;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileStatus;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.*;
41  import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
42  import org.apache.hadoop.hbase.client.Delete;
43  import org.apache.hadoop.hbase.client.Durability;
44  import org.apache.hadoop.hbase.client.Put;
45  import org.apache.hadoop.hbase.client.Scan;
46  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
47  import org.apache.hadoop.hbase.io.hfile.HFile;
48  import org.apache.hadoop.hbase.io.hfile.HFileContext;
49  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
50  import org.apache.hadoop.hbase.mob.MobConstants;
51  import org.apache.hadoop.hbase.mob.MobUtils;
52  import org.apache.hadoop.hbase.testclassification.MediumTests;
53  import org.apache.hadoop.hbase.util.Bytes;
54  import org.apache.hadoop.hbase.util.FSUtils;
55  import org.apache.hadoop.hbase.util.Pair;
56  import org.junit.After;
57  import org.junit.AfterClass;
58  import org.junit.BeforeClass;
59  import org.junit.Rule;
60  import org.junit.Test;
61  import org.junit.experimental.categories.Category;
62  import org.junit.rules.TestName;
63  
64  /**
65   * Test mob store compaction
66   */
67  @Category(MediumTests.class)
68  public class TestMobStoreCompaction {
69    @Rule
70    public TestName name = new TestName();
71    static final Log LOG = LogFactory.getLog(TestMobStoreCompaction.class.getName());
72    private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
73    private Configuration conf = null;
74  
75    private HRegion region = null;
76    private HTableDescriptor htd = null;
77    private HColumnDescriptor hcd = null;
78    private long mobCellThreshold = 1000;
79  
80    private FileSystem fs;
81  
82    private static final byte[] COLUMN_FAMILY = fam1;
83    private final byte[] STARTROW = Bytes.toBytes(START_KEY);
84    private int compactionThreshold;
85  
86    @BeforeClass
87    public static void setUpBeforeClass() throws Exception {
88      UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
89      UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
90      UTIL.startMiniCluster(1);
91    }
92  
93    @AfterClass
94    public static void tearDownAfterClass() throws Exception {
95      UTIL.shutdownMiniCluster();
96    }
97  
98    private void init(Configuration conf, long mobThreshold) throws Exception {
99      this.conf = conf;
100     this.mobCellThreshold = mobThreshold;
101     HBaseTestingUtility UTIL = new HBaseTestingUtility(conf);
102 
103     compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
104     htd = UTIL.createTableDescriptor(name.getMethodName());
105     hcd = new HColumnDescriptor(COLUMN_FAMILY);
106     hcd.setMobEnabled(true);
107     hcd.setMobThreshold(mobThreshold);
108     hcd.setMaxVersions(1);
109     htd.modifyFamily(hcd);
110 
111     region = UTIL.createLocalHRegion(htd, null, null);
112     fs = FileSystem.get(conf);
113   }
114 
115   @After
116   public void tearDown() throws Exception {
117     region.close();
118     fs.delete(UTIL.getDataTestDir(), true);
119   }
120 
121   /**
122    * During compaction, cells smaller than the threshold won't be affected.
123    */
124   @Test
125   public void testSmallerValue() throws Exception {
126     init(UTIL.getConfiguration(), 500);
127     byte[] dummyData = makeDummyData(300); // smaller than mob threshold
128     HRegionIncommon loader = new HRegionIncommon(region);
129     // one hfile per row
130     for (int i = 0; i < compactionThreshold; i++) {
131       Put p = createPut(i, dummyData);
132       loader.put(p);
133       loader.flushcache();
134     }
135     assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
136     assertEquals("Before compaction: mob file count", 0, countMobFiles());
137     assertEquals("Before compaction: rows", compactionThreshold, countRows());
138     assertEquals("Before compaction: mob rows", 0, countMobRows());
139 
140     region.compactStores();
141 
142     assertEquals("After compaction: store files", 1, countStoreFiles());
143     assertEquals("After compaction: mob file count", 0, countMobFiles());
144     assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
145     assertEquals("After compaction: rows", compactionThreshold, countRows());
146     assertEquals("After compaction: mob rows", 0, countMobRows());
147   }
148 
149   /**
150    * During compaction, the mob threshold size is changed.
151    */
152   @Test
153   public void testLargerValue() throws Exception {
154     init(UTIL.getConfiguration(), 200);
155     byte[] dummyData = makeDummyData(300); // larger than mob threshold
156     HRegionIncommon loader = new HRegionIncommon(region);
157     for (int i = 0; i < compactionThreshold; i++) {
158       Put p = createPut(i, dummyData);
159       loader.put(p);
160       loader.flushcache();
161     }
162     assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
163     assertEquals("Before compaction: mob file count", compactionThreshold, countMobFiles());
164     assertEquals("Before compaction: rows", compactionThreshold, countRows());
165     assertEquals("Before compaction: mob rows", compactionThreshold, countMobRows());
166     assertEquals("Before compaction: number of mob cells", compactionThreshold,
167         countMobCellsInMetadata());
168     // Change the threshold larger than the data size
169     region.getTableDesc().getFamily(COLUMN_FAMILY).setMobThreshold(500);
170     region.initialize();
171     region.compactStores();
172 
173     assertEquals("After compaction: store files", 1, countStoreFiles());
174     assertEquals("After compaction: mob file count", compactionThreshold, countMobFiles());
175     assertEquals("After compaction: referenced mob file count", 0, countReferencedMobFiles());
176     assertEquals("After compaction: rows", compactionThreshold, countRows());
177     assertEquals("After compaction: mob rows", 0, countMobRows());
178   }
179 
180   /**
181    * This test will first generate store files, then bulk load them and trigger the compaction.
182    * When compaction, the cell value will be larger than the threshold.
183    */
184   @Test
185   public void testMobCompactionWithBulkload() throws Exception {
186     // The following will produce store files of 600.
187     init(UTIL.getConfiguration(), 300);
188     byte[] dummyData = makeDummyData(600);
189 
190     Path hbaseRootDir = FSUtils.getRootDir(conf);
191     Path basedir = new Path(hbaseRootDir, htd.getNameAsString());
192     List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
193     for (int i = 0; i < compactionThreshold; i++) {
194       Path hpath = new Path(basedir, "hfile" + i);
195       hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
196       createHFile(hpath, i, dummyData);
197     }
198 
199     // The following will bulk load the above generated store files and compact, with 600(fileSize)
200     // > 300(threshold)
201     Map<byte[], List<Path>> map = region.bulkLoadHFiles(hfiles, true, null);
202     assertTrue("Bulkload result:", !map.isEmpty());
203     assertEquals("Before compaction: store files", compactionThreshold, countStoreFiles());
204     assertEquals("Before compaction: mob file count", 0, countMobFiles());
205     assertEquals("Before compaction: rows", compactionThreshold, countRows());
206     assertEquals("Before compaction: mob rows", 0, countMobRows());
207     assertEquals("Before compaction: referenced mob file count", 0, countReferencedMobFiles());
208 
209     region.compactStores();
210 
211     assertEquals("After compaction: store files", 1, countStoreFiles());
212     assertEquals("After compaction: mob file count:", 1, countMobFiles());
213     assertEquals("After compaction: rows", compactionThreshold, countRows());
214     assertEquals("After compaction: mob rows", compactionThreshold, countMobRows());
215     assertEquals("After compaction: referenced mob file count", 1, countReferencedMobFiles());
216     assertEquals("After compaction: number of mob cells", compactionThreshold,
217         countMobCellsInMetadata());
218   }
219 
220   @Test
221   public void testMajorCompactionAfterDelete() throws Exception {
222     init(UTIL.getConfiguration(), 100);
223     byte[] dummyData = makeDummyData(200); // larger than mob threshold
224     HRegionIncommon loader = new HRegionIncommon(region);
225     // create hfiles and mob hfiles but don't trigger compaction
226     int numHfiles = compactionThreshold - 1;
227     byte[] deleteRow = Bytes.add(STARTROW, Bytes.toBytes(0));
228     for (int i = 0; i < numHfiles; i++) {
229       Put p = createPut(i, dummyData);
230       loader.put(p);
231       loader.flushcache();
232     }
233     assertEquals("Before compaction: store files", numHfiles, countStoreFiles());
234     assertEquals("Before compaction: mob file count", numHfiles, countMobFiles());
235     assertEquals("Before compaction: rows", numHfiles, countRows());
236     assertEquals("Before compaction: mob rows", numHfiles, countMobRows());
237     assertEquals("Before compaction: number of mob cells", numHfiles, countMobCellsInMetadata());
238     // now let's delete some cells that contain mobs
239     Delete delete = new Delete(deleteRow);
240     delete.addFamily(COLUMN_FAMILY);
241     region.delete(delete);
242     loader.flushcache();
243 
244     assertEquals("Before compaction: store files", numHfiles + 1, countStoreFiles());
245     assertEquals("Before compaction: mob files", numHfiles, countMobFiles());
246     // region.compactStores();
247     region.compact(true);
248     assertEquals("After compaction: store files", 1, countStoreFiles());
249     // still have original mob hfiles and now added a mob del file
250     assertEquals("After compaction: mob files", numHfiles + 1, countMobFiles());
251 
252     Scan scan = new Scan();
253     scan.setRaw(true);
254     InternalScanner scanner = region.getScanner(scan);
255     List<Cell> results = new ArrayList<>();
256     scanner.next(results);
257     int deleteCount = 0;
258     while (!results.isEmpty()) {
259       for (Cell c : results) {
260         if (c.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
261           deleteCount++;
262           assertTrue(Bytes.equals(CellUtil.cloneRow(c), deleteRow));
263         }
264       }
265       results.clear();
266       scanner.next(results);
267     }
268     // assert the delete mark is retained after the major compaction
269     assertEquals(1, deleteCount);
270     scanner.close();
271     // assert the deleted cell is not counted
272     assertEquals("The cells in mob files", numHfiles - 1, countMobCellsInMobFiles(1));
273   }
274 
275   private int countStoreFiles() throws IOException {
276     Store store = region.getStore(COLUMN_FAMILY);
277     return store.getStorefilesCount();
278   }
279 
280   private int countMobFiles() throws IOException {
281     Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
282         hcd.getNameAsString());
283     if (fs.exists(mobDirPath)) {
284       FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
285       return files.length;
286     }
287     return 0;
288   }
289 
290   private long countMobCellsInMetadata() throws IOException {
291     long mobCellsCount = 0;
292     Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
293         hcd.getNameAsString());
294     Configuration copyOfConf = new Configuration(conf);
295     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
296     CacheConfig cacheConfig = new CacheConfig(copyOfConf);
297     if (fs.exists(mobDirPath)) {
298       FileStatus[] files = UTIL.getTestFileSystem().listStatus(mobDirPath);
299       for (FileStatus file : files) {
300         StoreFile sf = new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE);
301         Map<byte[], byte[]> fileInfo = sf.createReader().loadFileInfo();
302         byte[] count = fileInfo.get(StoreFile.MOB_CELLS_COUNT);
303         assertTrue(count != null);
304         mobCellsCount += Bytes.toLong(count);
305       }
306     }
307     return mobCellsCount;
308   }
309 
310   private Put createPut(int rowIdx, byte[] dummyData) throws IOException {
311     Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)));
312     p.setDurability(Durability.SKIP_WAL);
313     p.addColumn(COLUMN_FAMILY, Bytes.toBytes("colX"), dummyData);
314     return p;
315   }
316 
317   /**
318    * Create an HFile with the given number of bytes
319    */
320   private void createHFile(Path path, int rowIdx, byte[] dummyData) throws IOException {
321     HFileContext meta = new HFileContextBuilder().build();
322     HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
323         .withFileContext(meta).create();
324     long now = System.currentTimeMillis();
325     try {
326       KeyValue kv = new KeyValue(Bytes.add(STARTROW, Bytes.toBytes(rowIdx)), COLUMN_FAMILY,
327           Bytes.toBytes("colX"), now, dummyData);
328       writer.append(kv);
329     } finally {
330       writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
331       writer.close();
332     }
333   }
334 
335   private int countMobRows() throws IOException {
336     Scan scan = new Scan();
337     // Do not retrieve the mob data when scanning
338     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
339     InternalScanner scanner = region.getScanner(scan);
340 
341     int scannedCount = 0;
342     List<Cell> results = new ArrayList<>();
343     boolean hasMore = true;
344     while (hasMore) {
345       hasMore = scanner.next(results);
346       for (Cell c : results) {
347         if (MobUtils.isMobReferenceCell(c)) {
348           scannedCount++;
349         }
350       }
351       results.clear();
352     }
353     scanner.close();
354 
355     return scannedCount;
356   }
357 
358   private int countRows() throws IOException {
359     Scan scan = new Scan();
360     // Do not retrieve the mob data when scanning
361     InternalScanner scanner = region.getScanner(scan);
362 
363     int scannedCount = 0;
364     List<Cell> results = new ArrayList<Cell>();
365     boolean hasMore = true;
366     while (hasMore) {
367       hasMore = scanner.next(results);
368       scannedCount += results.size();
369       results.clear();
370     }
371     scanner.close();
372 
373     return scannedCount;
374   }
375 
376   private byte[] makeDummyData(int size) {
377     byte[] dummyData = new byte[size];
378     new Random().nextBytes(dummyData);
379     return dummyData;
380   }
381 
382   private int countReferencedMobFiles() throws IOException {
383     Scan scan = new Scan();
384     // Do not retrieve the mob data when scanning
385     scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
386     InternalScanner scanner = region.getScanner(scan);
387 
388     List<Cell> kvs = new ArrayList<>();
389     boolean hasMore = true;
390     String fileName;
391     Set<String> files = new HashSet<>();
392     do {
393       kvs.clear();
394       hasMore = scanner.next(kvs);
395       for (Cell kv : kvs) {
396         if (!MobUtils.isMobReferenceCell(kv)) {
397           continue;
398         }
399         if (!MobUtils.hasValidMobRefCellValue(kv)) {
400           continue;
401         }
402         int size = MobUtils.getMobValueLength(kv);
403         if (size <= mobCellThreshold) {
404           continue;
405         }
406         fileName = MobUtils.getMobFileName(kv);
407         if (fileName.isEmpty()) {
408           continue;
409         }
410         files.add(fileName);
411         Path familyPath = MobUtils.getMobFamilyPath(conf, htd.getTableName(),
412             hcd.getNameAsString());
413         assertTrue(fs.exists(new Path(familyPath, fileName)));
414       }
415     } while (hasMore);
416 
417     scanner.close();
418 
419     return files.size();
420   }
421 
422   private int countMobCellsInMobFiles(int expectedNumDelfiles) throws IOException {
423     Configuration copyOfConf = new Configuration(conf);
424     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
425     CacheConfig cacheConfig = new CacheConfig(copyOfConf);
426     Path mobDirPath = new Path(MobUtils.getMobRegionPath(conf, htd.getTableName()),
427         hcd.getNameAsString());
428     List<StoreFile> sfs = new ArrayList<>();
429     int numDelfiles = 0;
430     int size = 0;
431     if (fs.exists(mobDirPath)) {
432       for (FileStatus f : fs.listStatus(mobDirPath)) {
433         StoreFile sf = new StoreFile(fs, f.getPath(), conf, cacheConfig, BloomType.NONE);
434         sfs.add(sf);
435         if (StoreFileInfo.isDelFile(sf.getPath())) {
436           numDelfiles++;
437         }
438       }
439       List scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, null,
440           HConstants.LATEST_TIMESTAMP);
441       Scan scan = new Scan();
442       scan.setMaxVersions(hcd.getMaxVersions());
443       long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
444       long ttl = HStore.determineTTLFromFamily(hcd);
445       ScanInfo scanInfo = new ScanInfo(hcd, ttl, timeToPurgeDeletes, KeyValue.COMPARATOR);
446       StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null,
447           scanners, 0L, HConstants.LATEST_TIMESTAMP);
448       List<Cell> results = new ArrayList<>();
449       boolean hasMore = true;
450       while (hasMore) {
451         hasMore = scanner.next(results);
452         size += results.size();
453         results.clear();
454       }
455     }
456     // assert the number of the existing del files
457     assertEquals(expectedNumDelfiles, numDelfiles);
458     return size;
459   }
460 }