1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84 @InterfaceAudience.Private
85 public class HMobStore extends HStore {
86 private static final Log LOG = LogFactory.getLog(HMobStore.class);
87 private MobCacheConfig mobCacheConfig;
88 private Path homePath;
89 private Path mobFamilyPath;
90 private volatile long cellsCountCompactedToMob = 0;
91 private volatile long cellsCountCompactedFromMob = 0;
92 private volatile long cellsSizeCompactedToMob = 0;
93 private volatile long cellsSizeCompactedFromMob = 0;
94 private volatile long mobFlushCount = 0;
95 private volatile long mobFlushedCellsCount = 0;
96 private volatile long mobFlushedCellsSize = 0;
97 private volatile long mobScanCellsCount = 0;
98 private volatile long mobScanCellsSize = 0;
99 private HColumnDescriptor family;
100 private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>();
101 private final IdLock keyLock = new IdLock();
102
103 public HMobStore(final HRegion region, final HColumnDescriptor family,
104 final Configuration confParam) throws IOException {
105 super(region, family, confParam);
106 this.family = family;
107 this.mobCacheConfig = (MobCacheConfig) cacheConf;
108 this.homePath = MobUtils.getMobHome(conf);
109 this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
110 family.getNameAsString());
111 List<Path> locations = new ArrayList<Path>(2);
112 locations.add(mobFamilyPath);
113 TableName tn = region.getTableDesc().getTableName();
114 locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
115 .getEncodedName(), family.getNameAsString()));
116 map.put(Bytes.toString(tn.getName()), locations);
117 }
118
119
120
121
122 @Override
123 protected void createCacheConf(HColumnDescriptor family) {
124 cacheConf = new MobCacheConfig(conf, family);
125 }
126
127
128
129
130 public Configuration getConfiguration() {
131 return this.conf;
132 }
133
134
135
136
137
138 @Override
139 protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
140 long readPt, KeyValueScanner scanner) throws IOException {
141 if (scanner == null) {
142 if (MobUtils.isRefOnlyScan(scan)) {
143 Filter refOnlyFilter = new MobReferenceOnlyFilter();
144 Filter filter = scan.getFilter();
145 if (filter != null) {
146 scan.setFilter(new FilterList(filter, refOnlyFilter));
147 } else {
148 scan.setFilter(refOnlyFilter);
149 }
150 }
151 scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
152 targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
153 }
154 return scanner;
155 }
156
157
158
159
160 @Override
161 protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
162 KVComparator cellComparator) throws IOException {
163 MobStoreEngine engine = new MobStoreEngine();
164 engine.createComponents(conf, store, cellComparator);
165 return engine;
166 }
167
168
169
170
171
172 private Path getTempDir() {
173 return new Path(homePath, MobConstants.TEMP_DIR_NAME);
174 }
175
176
177
178
179
180
181
182
183
184
185 public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
186 Compression.Algorithm compression, byte[] startKey) throws IOException {
187 if (startKey == null) {
188 startKey = HConstants.EMPTY_START_ROW;
189 }
190 Path path = getTempDir();
191 return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205 public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
206 Compression.Algorithm compression, byte[] startKey) throws IOException {
207 if (startKey == null) {
208 startKey = HConstants.EMPTY_START_ROW;
209 }
210 Path path = getTempDir();
211 String suffix = UUID
212 .randomUUID().toString().replaceAll("-", "") + "_del";
213 MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
214 return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
215 }
216
217
218
219
220
221
222
223
224
225
226
227 public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
228 Compression.Algorithm compression, byte[] startKey) throws IOException {
229 MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
230 .toString().replaceAll("-", ""));
231 return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
232 }
233
234
235
236
237
238
239
240
241
242
243 public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath, long maxKeyCount,
244 Compression.Algorithm compression) throws IOException {
245 final CacheConfig writerCacheConf = mobCacheConfig;
246 HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
247 .withIncludesMvcc(true).withIncludesTags(true)
248 .withCompressTags(family.isCompressTags())
249 .withChecksumType(checksumType)
250 .withBytesPerCheckSum(bytesPerChecksum)
251 .withBlockSize(blocksize)
252 .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding())
253 .withEncryptionContext(cryptoContext)
254 .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
255
256 StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
257 .withFilePath(new Path(basePath, mobFileName.getFileName()))
258 .withComparator(KeyValue.COMPARATOR).withBloomType(BloomType.NONE)
259 .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
260 return w;
261 }
262
263
264
265
266
267
268
269 public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
270 if (sourceFile == null) {
271 return;
272 }
273 Path dstPath = new Path(targetPath, sourceFile.getName());
274 validateMobFile(sourceFile);
275 String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
276 LOG.info(msg);
277 Path parent = dstPath.getParent();
278 if (!region.getFilesystem().exists(parent)) {
279 region.getFilesystem().mkdirs(parent);
280 }
281 if (!region.getFilesystem().rename(sourceFile, dstPath)) {
282 throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
283 }
284 }
285
286
287
288
289
290
291 private void validateMobFile(Path path) throws IOException {
292 StoreFile storeFile = null;
293 try {
294 storeFile =
295 new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
296 storeFile.createReader();
297 } catch (IOException e) {
298 LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
299 throw e;
300 } finally {
301 if (storeFile != null) {
302 storeFile.closeReader(false);
303 }
304 }
305 }
306
307
308
309
310
311
312
313
314
315 public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
316 return resolve(reference, cacheBlocks, -1, true);
317 }
318
319
320
321
322
323
324
325
326
327
328
329 public Cell resolve(Cell reference, boolean cacheBlocks, long readPt,
330 boolean readEmptyValueOnMobCellMiss) throws IOException {
331 Cell result = null;
332 if (MobUtils.hasValidMobRefCellValue(reference)) {
333 String fileName = MobUtils.getMobFileName(reference);
334 Tag tableNameTag = MobUtils.getTableNameTag(reference);
335 if (tableNameTag != null) {
336 byte[] tableName = tableNameTag.getValue();
337 String tableNameString = Bytes.toString(tableName);
338 List<Path> locations = map.get(tableNameString);
339 if (locations == null) {
340 IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
341 try {
342 locations = map.get(tableNameString);
343 if (locations == null) {
344 locations = new ArrayList<Path>(2);
345 TableName tn = TableName.valueOf(tableName);
346 locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
347 locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
348 .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
349 map.put(tableNameString, locations);
350 }
351 } finally {
352 keyLock.releaseLockEntry(lockEntry);
353 }
354 }
355 result = readCell(locations, fileName, reference, cacheBlocks, readPt,
356 readEmptyValueOnMobCellMiss);
357 }
358 }
359 if (result == null) {
360 LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
361 + "qualifier,timestamp,type and tags but with an empty value to return.");
362 result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
363 reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
364 reference.getFamilyLength(), reference.getQualifierArray(),
365 reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
366 Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
367 0, 0, reference.getTagsArray(), reference.getTagsOffset(),
368 reference.getTagsLength());
369 }
370 return result;
371 }
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389 private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
390 long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
391 FileSystem fs = getFileSystem();
392 Throwable throwable = null;
393 for (Path location : locations) {
394 MobFile file = null;
395 Path path = new Path(location, fileName);
396 try {
397 file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
398 return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
399 cacheMobBlocks);
400 } catch (IOException e) {
401 mobCacheConfig.getMobFileCache().evictFile(fileName);
402 throwable = e;
403 if ((e instanceof FileNotFoundException) ||
404 (e.getCause() instanceof FileNotFoundException)) {
405 LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
406 } else if (e instanceof CorruptHFileException) {
407 LOG.error("The mob file " + path + " is corrupt", e);
408 break;
409 } else {
410 throw e;
411 }
412 } catch (NullPointerException e) {
413 mobCacheConfig.getMobFileCache().evictFile(fileName);
414 LOG.warn("Fail to read the cell", e);
415 throwable = e;
416 } catch (AssertionError e) {
417 mobCacheConfig.getMobFileCache().evictFile(fileName);
418 LOG.warn("Fail to read the cell", e);
419 throwable = e;
420 } finally {
421 if (file != null) {
422 mobCacheConfig.getMobFileCache().closeFile(file);
423 }
424 }
425 }
426 LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
427 + " or it is corrupt");
428 if (readEmptyValueOnMobCellMiss) {
429 return null;
430 } else if (throwable instanceof IOException) {
431 throw (IOException) throwable;
432 } else {
433 throw new IOException(throwable);
434 }
435 }
436
437
438
439
440
441 public Path getPath() {
442 return mobFamilyPath;
443 }
444
445 public void updateCellsCountCompactedToMob(long count) {
446 cellsCountCompactedToMob += count;
447 }
448
449 public long getCellsCountCompactedToMob() {
450 return cellsCountCompactedToMob;
451 }
452
453 public void updateCellsCountCompactedFromMob(long count) {
454 cellsCountCompactedFromMob += count;
455 }
456
457 public long getCellsCountCompactedFromMob() {
458 return cellsCountCompactedFromMob;
459 }
460
461 public void updateCellsSizeCompactedToMob(long size) {
462 cellsSizeCompactedToMob += size;
463 }
464
465 public long getCellsSizeCompactedToMob() {
466 return cellsSizeCompactedToMob;
467 }
468
469 public void updateCellsSizeCompactedFromMob(long size) {
470 cellsSizeCompactedFromMob += size;
471 }
472
473 public long getCellsSizeCompactedFromMob() {
474 return cellsSizeCompactedFromMob;
475 }
476
477 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT")
478 public void updateMobFlushCount() {
479 mobFlushCount++;
480 }
481
482 public long getMobFlushCount() {
483 return mobFlushCount;
484 }
485
486 public void updateMobFlushedCellsCount(long count) {
487 mobFlushedCellsCount += count;
488 }
489
490 public long getMobFlushedCellsCount() {
491 return mobFlushedCellsCount;
492 }
493
494 public void updateMobFlushedCellsSize(long size) {
495 mobFlushedCellsSize += size;
496 }
497
498 public long getMobFlushedCellsSize() {
499 return mobFlushedCellsSize;
500 }
501
502 public void updateMobScanCellsCount(long count) {
503 mobScanCellsCount += count;
504 }
505
506 public long getMobScanCellsCount() {
507 return mobScanCellsCount;
508 }
509
510 public void updateMobScanCellsSize(long size) {
511 mobScanCellsSize += size;
512 }
513
514 public long getMobScanCellsSize() {
515 return mobScanCellsSize;
516 }
517 }