1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.InetSocketAddress;
24 import java.security.Key;
25 import java.security.KeyException;
26 import java.security.PrivilegedExceptionAction;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Iterator;
33 import java.util.List;
34 import java.util.NavigableSet;
35 import java.util.Set;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.CompletionService;
38 import java.util.concurrent.ConcurrentHashMap;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.ExecutorCompletionService;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.ThreadPoolExecutor;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.locks.ReentrantReadWriteLock;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.Cell;
52 import org.apache.hadoop.hbase.CellComparator;
53 import org.apache.hadoop.hbase.CellUtil;
54 import org.apache.hadoop.hbase.CompoundConfiguration;
55 import org.apache.hadoop.hbase.HColumnDescriptor;
56 import org.apache.hadoop.hbase.HConstants;
57 import org.apache.hadoop.hbase.HRegionInfo;
58 import org.apache.hadoop.hbase.KeyValue;
59 import org.apache.hadoop.hbase.RemoteExceptionHandler;
60 import org.apache.hadoop.hbase.TableName;
61 import org.apache.hadoop.hbase.KeyValue.KVComparator;
62 import org.apache.hadoop.hbase.Tag;
63 import org.apache.hadoop.hbase.TagType;
64 import org.apache.hadoop.hbase.classification.InterfaceAudience;
65 import org.apache.hadoop.hbase.client.Scan;
66 import org.apache.hadoop.hbase.conf.ConfigurationManager;
67 import org.apache.hadoop.hbase.io.compress.Compression;
68 import org.apache.hadoop.hbase.io.crypto.Cipher;
69 import org.apache.hadoop.hbase.io.crypto.Encryption;
70 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
71 import org.apache.hadoop.hbase.io.hfile.HFile;
72 import org.apache.hadoop.hbase.io.hfile.HFileContext;
73 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
74 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
75 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
76 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
77 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
78 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
79 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
80 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
81 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
82 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
83 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
84 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
85 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
86 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
87 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
88 import org.apache.hadoop.hbase.security.EncryptionUtil;
89 import org.apache.hadoop.hbase.security.User;
90 import org.apache.hadoop.hbase.util.Bytes;
91 import org.apache.hadoop.hbase.util.ChecksumType;
92 import org.apache.hadoop.hbase.util.ClassSize;
93 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
94 import org.apache.hadoop.hbase.util.Pair;
95 import org.apache.hadoop.hbase.util.ReflectionUtils;
96 import org.apache.hadoop.util.StringUtils;
97 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
98
99 import com.google.common.annotations.VisibleForTesting;
100 import com.google.common.base.Preconditions;
101 import com.google.common.collect.ImmutableCollection;
102 import com.google.common.collect.ImmutableList;
103 import com.google.common.collect.Lists;
104 import com.google.common.collect.Sets;
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 @InterfaceAudience.Private
130 public class HStore implements Store {
/** Configuration key read by the constructor to choose the MemStore class (default DefaultMemStore). */
private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
/** Configuration key for the compaction-check multiplier; non-positive values fall back to the default. */
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
    "hbase.server.compactchecker.interval.multiplier";
/** Configuration key read by the constructor into {@link #blockingFileCount}. */
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
/** Default used when {@link #COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY} is unset or not positive. */
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
/** Default used when {@link #BLOCKING_STOREFILES_KEY} is unset. */
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;

static final Log LOG = LogFactory.getLog(HStore.class);

/** In-memory store; instantiated reflectively in the constructor from MEMSTORE_CLASS_NAME. */
protected final MemStore memstore;

/** The region this store was constructed for. */
protected final HRegion region;
/** Descriptor of the column family served by this store. */
private final HColumnDescriptor family;
/** Region file system obtained from {@code region.getRegionFileSystem()}. */
private final HRegionFileSystem fs;
/** Layered configuration: base conf, then table, then family settings (see constructor). */
protected Configuration conf;
/** Block cache configuration; assigned by {@link #createCacheConf(HColumnDescriptor)}. */
protected CacheConfig cacheConf;
// NOTE(review): not touched in the portion of the class reviewed here; presumably the size of
// the last compaction - confirm against the compaction code.
private long lastCompactSize = 0;
// NOTE(review): presumably requests a forced major compaction - confirm against its readers.
volatile boolean forceMajor = false;

/**
 * Shared by all stores. While still 0, the constructor sets it from
 * "hbase.hstore.close.check.interval".
 */
static int closeCheckInterval = 0;
// Running totals accumulated from StoreFile.Reader#length() and #getTotalUncompressedBytes()
// whenever files are opened, bulk loaded or committed. They are updated with "+=", which is not
// an atomic operation on a volatile field.
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;

/**
 * Store-wide lock. The read lock is taken for memstore add/delete/rollback and while selecting
 * scanners; the write lock is taken for close, snapshot and when new store files are inserted.
 */
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/** From "hbase.hstore.bulkload.verify"; enables the full cell scan in assertBulkLoadHFileOk. */
private final boolean verifyBulkLoads;

/** Scan parameters (family, TTL, time to purge deletes, comparator) built in the constructor. */
private ScanInfo scanInfo;

// NOTE(review): presumably the store files currently selected for compaction - confirm.
final List<StoreFile> filesCompacting = Lists.newArrayList();

// Observers whose updateReaders() is invoked after the set of store files changes.
// Backed by a ConcurrentHashMap-based set.
private final Set<ChangedReadersObserver> changedReaderObservers =
    Collections.newSetFromMap(new ConcurrentHashMap<ChangedReadersObserver, Boolean>());

/** Block size taken from {@code family.getBlocksize()}; passed to new HFile contexts. */
protected final int blocksize;
/** Data block encoder built from {@code family.getDataBlockEncoding()}; replaceable in tests. */
private HFileDataBlockEncoder dataBlockEncoder;

/** Checksum settings resolved from the configuration at construction time. */
protected ChecksumType checksumType;
protected int bytesPerChecksum;

/** Comparator taken from the region info; handed to the memstore, store engine and writers. */
private final KeyValue.KVComparator comparator;

/** Engine providing the store file manager and store flusher used throughout this class. */
final StoreEngine<?, ?, ?, ?> storeEngine;

// NOTE(review): not used in the portion of the class reviewed here - confirm its purpose.
private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
/** Obtained from {@code OffPeakHours.getInstance(conf)} in the constructor. */
private volatile OffPeakHours offPeakHours;

/** Default for "hbase.hstore.flush.retries.number". */
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
/** Number of flush attempts made by flushCache; validated to be > 0 in the constructor. */
private int flushRetriesNumber;
/** Sleep between flush attempts, in milliseconds (passed to Thread.sleep). */
private int pauseTime;

/** Value of {@link #BLOCKING_STOREFILES_KEY}; exposed through getBlockingFileCount(). */
private long blockingFileCount;
/** Validated compaction-check multiplier; exposed through getCompactionCheckMultiplier(). */
private int compactionCheckMultiplier;

/** Encryption context for new files; stays NONE unless the family declares an encryption type. */
protected Encryption.Context cryptoContext = Encryption.Context.NONE;

// NOTE(review): counters below are not updated in the portion of the class reviewed here;
// presumably flush/compaction metrics - confirm against their writers.
private volatile long flushedCellsCount = 0;
private volatile long compactedCellsCount = 0;
private volatile long majorCompactedCellsCount = 0;
private volatile long flushedCellsSize = 0;
private volatile long flushedOutputFileSize = 0;
private volatile long compactedCellsSize = 0;
private volatile long majorCompactedCellsSize = 0;
206
207
208
209
210
211
212
213
214
215 protected HStore(final HRegion region, final HColumnDescriptor family,
216 final Configuration confParam) throws IOException {
217
218 HRegionInfo info = region.getRegionInfo();
219 this.fs = region.getRegionFileSystem();
220
221
222 fs.createStoreDir(family.getNameAsString());
223 this.region = region;
224 this.family = family;
225
226
227
228 this.conf = new CompoundConfiguration()
229 .add(confParam)
230 .addStringMap(region.getTableDesc().getConfiguration())
231 .addStringMap(family.getConfiguration())
232 .addWritableMap(family.getValues());
233 this.blocksize = family.getBlocksize();
234
235 this.dataBlockEncoder =
236 new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
237
238 this.comparator = info.getComparator();
239
240 long timeToPurgeDeletes =
241 Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
242 LOG.trace("Time to purge deletes set to " + timeToPurgeDeletes +
243 "ms in store " + this);
244
245 long ttl = determineTTLFromFamily(family);
246
247
248 scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
249 String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
250 this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
251 Configuration.class, KeyValue.KVComparator.class }, new Object[] { conf, this.comparator });
252 this.offPeakHours = OffPeakHours.getInstance(conf);
253
254
255 createCacheConf(family);
256
257 this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
258
259 this.blockingFileCount =
260 conf.getInt(BLOCKING_STOREFILES_KEY, DEFAULT_BLOCKING_STOREFILE_COUNT);
261 this.compactionCheckMultiplier = conf.getInt(
262 COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
263 if (this.compactionCheckMultiplier <= 0) {
264 LOG.error("Compaction check period multiplier must be positive, setting default: "
265 + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER);
266 this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
267 }
268
269 if (HStore.closeCheckInterval == 0) {
270 HStore.closeCheckInterval = conf.getInt(
271 "hbase.hstore.close.check.interval", 10*1000*1000
272 }
273
274 this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
275 this.storeEngine.getStoreFileManager().loadFiles(loadStoreFiles());
276
277
278 this.checksumType = getChecksumType(conf);
279
280 this.bytesPerChecksum = getBytesPerChecksum(conf);
281 flushRetriesNumber = conf.getInt(
282 "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
283 pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
284 if (flushRetriesNumber <= 0) {
285 throw new IllegalArgumentException(
286 "hbase.hstore.flush.retries.number must be > 0, not "
287 + flushRetriesNumber);
288 }
289
290
291 String cipherName = family.getEncryptionType();
292 if (cipherName != null) {
293 Cipher cipher;
294 Key key;
295 byte[] keyBytes = family.getEncryptionKey();
296 if (keyBytes != null) {
297
298 String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY,
299 User.getCurrent().getShortName());
300 try {
301
302 key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
303 } catch (KeyException e) {
304
305
306 if (LOG.isDebugEnabled()) {
307 LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
308 }
309 String alternateKeyName =
310 conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
311 if (alternateKeyName != null) {
312 try {
313 key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
314 } catch (KeyException ex) {
315 throw new IOException(ex);
316 }
317 } else {
318 throw new IOException(e);
319 }
320 }
321
322 cipher = Encryption.getCipher(conf, key.getAlgorithm());
323 if (cipher == null) {
324 throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
325 }
326
327
328
329 if (!cipher.getName().equalsIgnoreCase(cipherName)) {
330 throw new RuntimeException("Encryption for family '" + family.getNameAsString() +
331 "' configured with type '" + cipherName +
332 "' but key specifies algorithm '" + cipher.getName() + "'");
333 }
334 } else {
335
336 cipher = Encryption.getCipher(conf, cipherName);
337 if (cipher == null) {
338 throw new RuntimeException("Cipher '" + cipherName + "' is not available");
339 }
340 key = cipher.getRandomKey();
341 }
342 cryptoContext = Encryption.newContext(conf);
343 cryptoContext.setCipher(cipher);
344 cryptoContext.setKey(key);
345 }
346 }
347
348
349
350
351
352 protected void createCacheConf(final HColumnDescriptor family) {
353 this.cacheConf = new CacheConfig(conf, family);
354 }
355
356
357
358
359
360
361
362
363
364 protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
365 KVComparator kvComparator) throws IOException {
366 return StoreEngine.create(store, conf, comparator);
367 }
368
369
370
371
372
373 public static long determineTTLFromFamily(final HColumnDescriptor family) {
374
375 long ttl = family.getTimeToLive();
376 if (ttl == HConstants.FOREVER) {
377
378 ttl = Long.MAX_VALUE;
379 } else if (ttl == -1) {
380 ttl = Long.MAX_VALUE;
381 } else {
382
383 ttl *= 1000;
384 }
385 return ttl;
386 }
387
388 @Override
389 public String getColumnFamilyName() {
390 return this.family.getNameAsString();
391 }
392
393 @Override
394 public TableName getTableName() {
395 return this.getRegionInfo().getTable();
396 }
397
398 @Override
399 public FileSystem getFileSystem() {
400 return this.fs.getFileSystem();
401 }
402
403 public HRegionFileSystem getRegionFileSystem() {
404 return this.fs;
405 }
406
407
408 @Override
409 public long getStoreFileTtl() {
410
411 return (this.scanInfo.getMinVersions() == 0) ? this.scanInfo.getTtl() : Long.MAX_VALUE;
412 }
413
414 @Override
415 public long getMemstoreFlushSize() {
416
417 return this.region.memstoreFlushSize;
418 }
419
420 @Override
421 public long getFlushableSize() {
422 return this.memstore.getFlushableSize();
423 }
424
425 @Override
426 public long getSnapshotSize() {
427 return this.memstore.getSnapshotSize();
428 }
429
430 @Override
431 public long getCompactionCheckMultiplier() {
432 return this.compactionCheckMultiplier;
433 }
434
435 @Override
436 public long getBlockingFileCount() {
437 return blockingFileCount;
438 }
439
440
441
442
443
444
445
446 public static int getBytesPerChecksum(Configuration conf) {
447 return conf.getInt(HConstants.BYTES_PER_CHECKSUM,
448 HFile.DEFAULT_BYTES_PER_CHECKSUM);
449 }
450
451
452
453
454
455
456 public static ChecksumType getChecksumType(Configuration conf) {
457 String checksumName = conf.get(HConstants.CHECKSUM_TYPE_NAME);
458 if (checksumName == null) {
459 return HFile.DEFAULT_CHECKSUM_TYPE;
460 } else {
461 return ChecksumType.nameToType(checksumName);
462 }
463 }
464
465
466
467
468 public static int getCloseCheckInterval() {
469 return closeCheckInterval;
470 }
471
472 @Override
473 public HColumnDescriptor getFamily() {
474 return this.family;
475 }
476
477
478
479
480 @Override
481 public long getMaxSequenceId() {
482 return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
483 }
484
485 @Override
486 public long getMaxMemstoreTS() {
487 return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
488 }
489
490
491
492
493
494
495
496 @Deprecated
497 public static Path getStoreHomedir(final Path tabledir,
498 final HRegionInfo hri, final byte[] family) {
499 return getStoreHomedir(tabledir, hri.getEncodedName(), family);
500 }
501
502
503
504
505
506
507
508 @Deprecated
509 public static Path getStoreHomedir(final Path tabledir,
510 final String encodedName, final byte[] family) {
511 return new Path(tabledir, new Path(encodedName, Bytes.toString(family)));
512 }
513
514 @Override
515 public HFileDataBlockEncoder getDataBlockEncoder() {
516 return dataBlockEncoder;
517 }
518
519
520
521
522
523 void setDataBlockEncoderInTest(HFileDataBlockEncoder blockEncoder) {
524 this.dataBlockEncoder = blockEncoder;
525 }
526
527
528
529
530
531
532 private List<StoreFile> loadStoreFiles() throws IOException {
533 Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName());
534 return openStoreFiles(files);
535 }
536
537 private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException {
538 if (files == null || files.size() == 0) {
539 return new ArrayList<StoreFile>();
540 }
541
542 ThreadPoolExecutor storeFileOpenerThreadPool =
543 this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
544 this.getColumnFamilyName());
545 CompletionService<StoreFile> completionService =
546 new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
547
548 int totalValidStoreFile = 0;
549 for (final StoreFileInfo storeFileInfo: files) {
550
551 completionService.submit(new Callable<StoreFile>() {
552 @Override
553 public StoreFile call() throws IOException {
554 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
555 return storeFile;
556 }
557 });
558 totalValidStoreFile++;
559 }
560
561 ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
562 IOException ioe = null;
563 try {
564 for (int i = 0; i < totalValidStoreFile; i++) {
565 try {
566 Future<StoreFile> future = completionService.take();
567 StoreFile storeFile = future.get();
568 long length = storeFile.getReader().length();
569 this.storeSize += length;
570 this.totalUncompressedBytes +=
571 storeFile.getReader().getTotalUncompressedBytes();
572 if (LOG.isDebugEnabled()) {
573 LOG.debug("loaded " + storeFile.toStringDetailed());
574 }
575 results.add(storeFile);
576 } catch (InterruptedException e) {
577 if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
578 } catch (ExecutionException e) {
579 if (ioe == null) ioe = new IOException(e.getCause());
580 }
581 }
582 } finally {
583 storeFileOpenerThreadPool.shutdownNow();
584 }
585 if (ioe != null) {
586
587 boolean evictOnClose =
588 cacheConf != null? cacheConf.shouldEvictOnClose(): true;
589 for (StoreFile file : results) {
590 try {
591 if (file != null) file.closeReader(evictOnClose);
592 } catch (IOException e) {
593 LOG.warn(e.getMessage());
594 }
595 }
596 throw ioe;
597 }
598
599 return results;
600 }
601
602
603
604
605
606
607
608
609 @Override
610 public void refreshStoreFiles() throws IOException {
611 Collection<StoreFileInfo> newFiles = fs.getStoreFiles(getColumnFamilyName());
612 refreshStoreFilesInternal(newFiles);
613 }
614
615 @Override
616 public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
617 List<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(newFiles.size());
618 for (String file : newFiles) {
619 storeFiles.add(fs.getStoreFileInfo(getColumnFamilyName(), file));
620 }
621 refreshStoreFilesInternal(storeFiles);
622 }
623
624
625
626
627
628
629
630
631 private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
632 StoreFileManager sfm = storeEngine.getStoreFileManager();
633 Collection<StoreFile> currentFiles = sfm.getStorefiles();
634 if (currentFiles == null) currentFiles = new ArrayList<StoreFile>(0);
635
636 if (newFiles == null) newFiles = new ArrayList<StoreFileInfo>(0);
637
638 HashMap<StoreFileInfo, StoreFile> currentFilesSet =
639 new HashMap<StoreFileInfo, StoreFile>(currentFiles.size());
640 for (StoreFile sf : currentFiles) {
641 currentFilesSet.put(sf.getFileInfo(), sf);
642 }
643 HashSet<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
644
645 Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
646 Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
647
648 if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
649 return;
650 }
651
652 LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString()
653 + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
654
655 Set<StoreFile> toBeRemovedStoreFiles = new HashSet<StoreFile>(toBeRemovedFiles.size());
656 for (StoreFileInfo sfi : toBeRemovedFiles) {
657 toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
658 }
659
660
661 List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles);
662
663
664 replaceStoreFiles(toBeRemovedStoreFiles, openedFiles);
665
666
667
668
669 if (!toBeAddedFiles.isEmpty()) {
670 region.getMVCC().advanceMemstoreReadPointIfNeeded(this.getMaxSequenceId());
671 }
672
673
674 completeCompaction(toBeRemovedStoreFiles, false);
675 }
676
677 private StoreFile createStoreFileAndReader(final Path p) throws IOException {
678 StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
679 return createStoreFileAndReader(info);
680 }
681
682 private StoreFile createStoreFileAndReader(final StoreFileInfo info)
683 throws IOException {
684 info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
685 StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
686 this.family.getBloomFilterType());
687 StoreFile.Reader r = storeFile.createReader();
688 r.setReplicaStoreFile(isPrimaryReplicaStore());
689 return storeFile;
690 }
691
692 @Override
693 public Pair<Long, Cell> add(final Cell cell) {
694 lock.readLock().lock();
695 try {
696 return this.memstore.add(cell);
697 } finally {
698 lock.readLock().unlock();
699 }
700 }
701
702 @Override
703 public long timeOfOldestEdit() {
704 return memstore.timeOfOldestEdit();
705 }
706
707
708
709
710
711
712
713 protected long delete(final KeyValue kv) {
714 lock.readLock().lock();
715 try {
716 return this.memstore.delete(kv);
717 } finally {
718 lock.readLock().unlock();
719 }
720 }
721
722 @Override
723 public void rollback(final Cell cell) {
724 lock.readLock().lock();
725 try {
726 this.memstore.rollback(cell);
727 } finally {
728 lock.readLock().unlock();
729 }
730 }
731
732
733
734
735 @Override
736 public Collection<StoreFile> getStorefiles() {
737 return this.storeEngine.getStoreFileManager().getStorefiles();
738 }
739
740 @Override
741 public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
742 HFile.Reader reader = null;
743 try {
744 LOG.info("Validating hfile at " + srcPath + " for inclusion in "
745 + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
746 reader = HFile.createReader(srcPath.getFileSystem(conf),
747 srcPath, cacheConf, conf);
748 reader.loadFileInfo();
749
750 byte[] firstKey = reader.getFirstRowKey();
751 Preconditions.checkState(firstKey != null, "First key can not be null");
752 byte[] lk = reader.getLastKey();
753 Preconditions.checkState(lk != null, "Last key can not be null");
754 byte[] lastKey = KeyValue.createKeyValueFromKey(lk).getRow();
755
756 LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
757 " last=" + Bytes.toStringBinary(lastKey));
758 LOG.debug("Region bounds: first=" +
759 Bytes.toStringBinary(getRegionInfo().getStartKey()) +
760 " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey()));
761
762 if (!this.getRegionInfo().containsRange(firstKey, lastKey)) {
763 throw new WrongRegionException(
764 "Bulk load file " + srcPath.toString() + " does not fit inside region "
765 + this.getRegionInfo().getRegionNameAsString());
766 }
767
768 if(reader.length() > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
769 HConstants.DEFAULT_MAX_FILE_SIZE)) {
770 LOG.warn("Trying to bulk load hfile " + srcPath.toString() + " with size: " +
771 reader.length() + " bytes can be problematic as it may lead to oversplitting.");
772 }
773
774 if (verifyBulkLoads) {
775 long verificationStartTime = EnvironmentEdgeManager.currentTime();
776 LOG.info("Full verification started for bulk load hfile: " + srcPath.toString());
777 Cell prevCell = null;
778 HFileScanner scanner = reader.getScanner(false, false, false);
779 scanner.seekTo();
780 do {
781 Cell cell = scanner.getKeyValue();
782 if (prevCell != null) {
783 if (CellComparator.compareRows(prevCell, cell) > 0) {
784 throw new InvalidHFileException("Previous row is greater than"
785 + " current row: path=" + srcPath + " previous="
786 + CellUtil.getCellKeyAsString(prevCell) + " current="
787 + CellUtil.getCellKeyAsString(cell));
788 }
789 if (CellComparator.compareFamilies(prevCell, cell) != 0) {
790 throw new InvalidHFileException("Previous key had different"
791 + " family compared to current key: path=" + srcPath
792 + " previous="
793 + Bytes.toStringBinary(prevCell.getFamilyArray(), prevCell.getFamilyOffset(),
794 prevCell.getFamilyLength())
795 + " current="
796 + Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
797 cell.getFamilyLength()));
798 }
799 }
800 prevCell = cell;
801 } while (scanner.next());
802 LOG.info("Full verification complete for bulk load hfile: " + srcPath.toString()
803 + " took " + (EnvironmentEdgeManager.currentTime() - verificationStartTime)
804 + " ms");
805 }
806 } finally {
807 if (reader != null) reader.close();
808 }
809 }
810
811 public Pair<Path, Path> preBulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
812 Path srcPath = new Path(srcPathStr);
813 return fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
814 }
815
816 @Override
817 public Path bulkLoadHFile(byte[] family, String srcPathStr, Path dstPath) throws IOException {
818 Path srcPath = new Path(srcPathStr);
819 try {
820 fs.commitStoreFile(srcPath, dstPath);
821 } finally {
822 if (this.getCoprocessorHost() != null) {
823 this.getCoprocessorHost().postCommitStoreFile(family, srcPath, dstPath);
824 }
825 }
826 LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
827 + dstPath + " - updating store file list.");
828
829 StoreFile sf = createStoreFileAndReader(dstPath);
830 bulkLoadHFile(sf);
831
832 LOG.info("Successfully loaded store file " + srcPath + " into store " + this
833 + " (new location: " + dstPath + ")");
834
835 return dstPath;
836 }
837
838 @Override
839 public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
840 StoreFile sf = createStoreFileAndReader(fileInfo);
841 bulkLoadHFile(sf);
842 }
843
844 private void bulkLoadHFile(StoreFile sf) throws IOException {
845 StoreFile.Reader r = sf.getReader();
846 this.storeSize += r.length();
847 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
848
849
850 this.lock.writeLock().lock();
851 try {
852 this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
853 } finally {
854
855
856
857
858
859 this.lock.writeLock().unlock();
860 }
861 notifyChangedReadersObservers();
862 LOG.info("Loaded HFile " + sf.getFileInfo() + " into store " + getColumnFamilyName());
863 if (LOG.isTraceEnabled()) {
864 String traceMessage = "BULK LOAD time,size,store size,store files ["
865 + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
866 + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
867 LOG.trace(traceMessage);
868 }
869 }
870
871 @Override
872 public ImmutableCollection<StoreFile> close() throws IOException {
873 this.lock.writeLock().lock();
874 try {
875
876 ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
877
878 if (!result.isEmpty()) {
879
880 ThreadPoolExecutor storeFileCloserThreadPool = this.region
881 .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
882 + this.getColumnFamilyName());
883
884
885 CompletionService<Void> completionService =
886 new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
887 for (final StoreFile f : result) {
888 completionService.submit(new Callable<Void>() {
889 @Override
890 public Void call() throws IOException {
891 boolean evictOnClose =
892 cacheConf != null? cacheConf.shouldEvictOnClose(): true;
893 f.closeReader(evictOnClose);
894 return null;
895 }
896 });
897 }
898
899 IOException ioe = null;
900 try {
901 for (int i = 0; i < result.size(); i++) {
902 try {
903 Future<Void> future = completionService.take();
904 future.get();
905 } catch (InterruptedException e) {
906 if (ioe == null) {
907 ioe = new InterruptedIOException();
908 ioe.initCause(e);
909 }
910 } catch (ExecutionException e) {
911 if (ioe == null) ioe = new IOException(e.getCause());
912 }
913 }
914 } finally {
915 storeFileCloserThreadPool.shutdownNow();
916 }
917 if (ioe != null) throw ioe;
918 }
919 LOG.info("Closed " + this);
920 return result;
921 } finally {
922 this.lock.writeLock().unlock();
923 }
924 }
925
926
927
928
929
930
931 void snapshot() {
932 this.lock.writeLock().lock();
933 try {
934 this.memstore.snapshot();
935 } finally {
936 this.lock.writeLock().unlock();
937 }
938 }
939
940
941
942
943
944
945
946
947
948
949 protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
950 MonitoredTask status) throws IOException {
951
952
953
954
955
956 StoreFlusher flusher = storeEngine.getStoreFlusher();
957 IOException lastException = null;
958 for (int i = 0; i < flushRetriesNumber; i++) {
959 try {
960 List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
961 Path lastPathName = null;
962 try {
963 for (Path pathName : pathNames) {
964 lastPathName = pathName;
965 validateStoreFile(pathName);
966 }
967 return pathNames;
968 } catch (Exception e) {
969 LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
970 if (e instanceof IOException) {
971 lastException = (IOException) e;
972 } else {
973 lastException = new IOException(e);
974 }
975 }
976 } catch (IOException e) {
977 LOG.warn("Failed flushing store file, retrying num=" + i, e);
978 lastException = e;
979 }
980 if (lastException != null && i < (flushRetriesNumber - 1)) {
981 try {
982 Thread.sleep(pauseTime);
983 } catch (InterruptedException e) {
984 IOException iie = new InterruptedIOException();
985 iie.initCause(e);
986 throw iie;
987 }
988 }
989 }
990 throw lastException;
991 }
992
993
994
995
996
997
998
999
1000 private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status)
1001 throws IOException {
1002
1003 Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
1004
1005 status.setStatus("Flushing " + this + ": reopening flushed file");
1006 StoreFile sf = createStoreFileAndReader(dstPath);
1007
1008 StoreFile.Reader r = sf.getReader();
1009 this.storeSize += r.length();
1010 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1011
1012 if (LOG.isInfoEnabled()) {
1013 LOG.info("Added " + sf + ", entries=" + r.getEntries() +
1014 ", sequenceid=" + logCacheFlushId +
1015 ", filesize=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1));
1016 }
1017 return sf;
1018 }
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028 @Override
1029 public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
1030 boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
1031 throws IOException {
1032 final CacheConfig writerCacheConf;
1033 if (isCompaction) {
1034
1035 writerCacheConf = new CacheConfig(cacheConf);
1036 writerCacheConf.setCacheDataOnWrite(false);
1037 } else {
1038 writerCacheConf = cacheConf;
1039 }
1040 InetSocketAddress[] favoredNodes = null;
1041 if (region.getRegionServerServices() != null) {
1042 favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
1043 region.getRegionInfo().getEncodedName());
1044 }
1045 HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
1046 cryptoContext);
1047 StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
1048 this.getFileSystem())
1049 .withFilePath(fs.createTempName())
1050 .withComparator(comparator)
1051 .withBloomType(family.getBloomFilterType())
1052 .withMaxKeyCount(maxKeyCount)
1053 .withFavoredNodes(favoredNodes)
1054 .withFileContext(hFileContext)
1055 .build();
1056 return w;
1057 }
1058
1059 private HFileContext createFileContext(Compression.Algorithm compression,
1060 boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context cryptoContext) {
1061 if (compression == null) {
1062 compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
1063 }
1064 HFileContext hFileContext = new HFileContextBuilder()
1065 .withIncludesMvcc(includeMVCCReadpoint)
1066 .withIncludesTags(includesTag)
1067 .withCompression(compression)
1068 .withCompressTags(family.isCompressTags())
1069 .withChecksumType(checksumType)
1070 .withBytesPerCheckSum(bytesPerChecksum)
1071 .withBlockSize(blocksize)
1072 .withHBaseCheckSum(true)
1073 .withDataBlockEncoding(family.getDataBlockEncoding())
1074 .withEncryptionContext(cryptoContext)
1075 .withCreateTime(EnvironmentEdgeManager.currentTime())
1076 .build();
1077 return hFileContext;
1078 }
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088 private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
1089 throws IOException {
1090 this.lock.writeLock().lock();
1091 try {
1092 this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
1093 if (snapshotId > 0) {
1094 this.memstore.clearSnapshot(snapshotId);
1095 }
1096 } finally {
1097
1098
1099
1100
1101
1102 this.lock.writeLock().unlock();
1103 }
1104
1105
1106 notifyChangedReadersObservers();
1107
1108 if (LOG.isTraceEnabled()) {
1109 long totalSize = 0;
1110 for (StoreFile sf : sfs) {
1111 totalSize += sf.getReader().length();
1112 }
1113 String traceMessage = "FLUSH time,count,size,store size,store files ["
1114 + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
1115 + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
1116 LOG.trace(traceMessage);
1117 }
1118 return needsCompaction();
1119 }
1120
1121
1122
1123
1124
1125 private void notifyChangedReadersObservers() throws IOException {
1126 for (ChangedReadersObserver o: this.changedReaderObservers) {
1127 o.updateReaders();
1128 }
1129 }
1130
1131
1132
1133
1134
1135
1136 @Override
1137 public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet,
1138 boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
1139 byte[] stopRow, long readPt) throws IOException {
1140 Collection<StoreFile> storeFilesToScan;
1141 List<KeyValueScanner> memStoreScanners;
1142 this.lock.readLock().lock();
1143 try {
1144 storeFilesToScan =
1145 this.storeEngine.getStoreFileManager().getFilesForScanOrGet(isGet, startRow, stopRow);
1146 memStoreScanners = this.memstore.getScanners(readPt);
1147 } finally {
1148 this.lock.readLock().unlock();
1149 }
1150
1151
1152
1153
1154
1155
1156 List<StoreFileScanner> sfScanners = StoreFileScanner
1157 .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
1158 readPt, isPrimaryReplicaStore());
1159 List<KeyValueScanner> scanners =
1160 new ArrayList<KeyValueScanner>(sfScanners.size()+1);
1161 scanners.addAll(sfScanners);
1162
1163 scanners.addAll(memStoreScanners);
1164 return scanners;
1165 }
1166
1167 @Override
1168 public void addChangedReaderObserver(ChangedReadersObserver o) {
1169 this.changedReaderObservers.add(o);
1170 }
1171
1172 @Override
1173 public void deleteChangedReaderObserver(ChangedReadersObserver o) {
1174
1175 this.changedReaderObservers.remove(o);
1176 }
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225 @Override
1226 public List<StoreFile> compact(CompactionContext compaction,
1227 CompactionThroughputController throughputController) throws IOException {
1228 return compact(compaction, throughputController, null);
1229 }
1230
1231 @Override
1232 public List<StoreFile> compact(CompactionContext compaction,
1233 CompactionThroughputController throughputController, User user) throws IOException {
1234 assert compaction != null;
1235 List<StoreFile> sfs = null;
1236 CompactionRequest cr = compaction.getRequest();;
1237 try {
1238
1239
1240
1241 long compactionStartTime = EnvironmentEdgeManager.currentTime();
1242 assert compaction.hasSelection();
1243 Collection<StoreFile> filesToCompact = cr.getFiles();
1244 assert !filesToCompact.isEmpty();
1245 synchronized (filesCompacting) {
1246
1247
1248 Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
1249 }
1250
1251
1252 LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
1253 + this + " of " + this.getRegionInfo().getRegionNameAsString()
1254 + " into tmpdir=" + fs.getTempDir() + ", totalSize="
1255 + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
1256
1257
1258 List<Path> newFiles = compaction.compact(throughputController, user);
1259
1260 long outputBytes = 0L;
1261
1262 if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
1263 LOG.warn("hbase.hstore.compaction.complete is set to false");
1264 sfs = new ArrayList<StoreFile>(newFiles.size());
1265 final boolean evictOnClose =
1266 cacheConf != null? cacheConf.shouldEvictOnClose(): true;
1267 for (Path newFile : newFiles) {
1268
1269 StoreFile sf = createStoreFileAndReader(newFile);
1270 sf.closeReader(evictOnClose);
1271 sfs.add(sf);
1272 }
1273 return sfs;
1274 }
1275
1276 sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
1277 writeCompactionWalRecord(filesToCompact, sfs);
1278 replaceStoreFiles(filesToCompact, sfs);
1279 if (cr.isMajor()) {
1280 majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
1281 majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
1282 } else {
1283 compactedCellsCount += getCompactionProgress().totalCompactingKVs;
1284 compactedCellsSize += getCompactionProgress().totalCompactedSize;
1285 }
1286
1287 for (StoreFile sf : sfs) {
1288 outputBytes += sf.getReader().length();
1289 }
1290
1291
1292 completeCompaction(filesToCompact, true);
1293
1294 long now = EnvironmentEdgeManager.currentTime();
1295 if (region.getRegionServerServices() != null
1296 && region.getRegionServerServices().getMetrics() != null) {
1297 region.getRegionServerServices().getMetrics().updateCompaction(
1298 region.getTableDesc().getTableName().getNameAsString(),
1299 cr.isMajor(), now - compactionStartTime, cr.getFiles().size(),
1300 newFiles.size(), cr.getSize(), outputBytes);
1301 }
1302
1303 logCompactionEndMessage(cr, sfs, now, compactionStartTime);
1304 return sfs;
1305 } finally {
1306 finishCompactionRequest(cr);
1307 }
1308 }
1309
1310 private List<StoreFile> moveCompatedFilesIntoPlace(
1311 final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
1312 List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
1313 for (Path newFile : newFiles) {
1314 assert newFile != null;
1315 final StoreFile sf = moveFileIntoPlace(newFile);
1316 if (this.getCoprocessorHost() != null) {
1317 final Store thisStore = this;
1318 if (user == null) {
1319 getCoprocessorHost().postCompact(thisStore, sf, cr);
1320 } else {
1321 try {
1322 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1323 @Override
1324 public Void run() throws Exception {
1325 getCoprocessorHost().postCompact(thisStore, sf, cr);
1326 return null;
1327 }
1328 });
1329 } catch (InterruptedException ie) {
1330 InterruptedIOException iioe = new InterruptedIOException();
1331 iioe.initCause(ie);
1332 throw iioe;
1333 }
1334 }
1335 }
1336 assert sf != null;
1337 sfs.add(sf);
1338 }
1339 return sfs;
1340 }
1341
1342
1343 StoreFile moveFileIntoPlace(final Path newFile) throws IOException {
1344 validateStoreFile(newFile);
1345
1346 Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile);
1347 return createStoreFileAndReader(destPath);
1348 }
1349
1350
1351
1352
1353
1354
1355 private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted,
1356 Collection<StoreFile> newFiles) throws IOException {
1357 if (region.getWAL() == null) return;
1358 List<Path> inputPaths = new ArrayList<Path>(filesCompacted.size());
1359 for (StoreFile f : filesCompacted) {
1360 inputPaths.add(f.getPath());
1361 }
1362 List<Path> outputPaths = new ArrayList<Path>(newFiles.size());
1363 for (StoreFile f : newFiles) {
1364 outputPaths.add(f.getPath());
1365 }
1366 HRegionInfo info = this.region.getRegionInfo();
1367 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
1368 family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
1369 WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
1370 this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
1371 }
1372
1373 @VisibleForTesting
1374 void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
1375 final Collection<StoreFile> result) throws IOException {
1376 this.lock.writeLock().lock();
1377 try {
1378 this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
1379 filesCompacting.removeAll(compactedFiles);
1380 } finally {
1381 this.lock.writeLock().unlock();
1382 }
1383 }
1384
1385
1386
1387
1388
1389
1390
1391 private void logCompactionEndMessage(
1392 CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) {
1393 StringBuilder message = new StringBuilder(
1394 "Completed" + (cr.isMajor() ? " major" : "") + " compaction of "
1395 + cr.getFiles().size() + (cr.isAllFiles() ? " (all)" : "") + " file(s) in "
1396 + this + " of " + this.getRegionInfo().getRegionNameAsString() + " into ");
1397 if (sfs.isEmpty()) {
1398 message.append("none, ");
1399 } else {
1400 for (StoreFile sf: sfs) {
1401 message.append(sf.getPath().getName());
1402 message.append("(size=");
1403 message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1));
1404 message.append("), ");
1405 }
1406 }
1407 message.append("total size for store is ")
1408 .append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
1409 .append(". This selection was in queue for ")
1410 .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
1411 .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
1412 .append(" to execute.");
1413 LOG.info(message.toString());
1414 if (LOG.isTraceEnabled()) {
1415 int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
1416 long resultSize = 0;
1417 for (StoreFile sf : sfs) {
1418 resultSize += sf.getReader().length();
1419 }
1420 String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
1421 + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
1422 + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
1423 LOG.trace(traceMessage);
1424 }
1425 }
1426
1427
1428
1429
1430
1431
1432
1433 @Override
1434 public void replayCompactionMarker(CompactionDescriptor compaction,
1435 boolean pickCompactionFiles, boolean removeFiles)
1436 throws IOException {
1437 LOG.debug("Completing compaction from the WAL marker");
1438 List<String> compactionInputs = compaction.getCompactionInputList();
1439 List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList());
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455 String familyName = this.getColumnFamilyName();
1456 List<String> inputFiles = new ArrayList<String>(compactionInputs.size());
1457 for (String compactionInput : compactionInputs) {
1458 Path inputPath = fs.getStoreFilePath(familyName, compactionInput);
1459 inputFiles.add(inputPath.getName());
1460 }
1461
1462
1463 List<StoreFile> inputStoreFiles = new ArrayList<StoreFile>(compactionInputs.size());
1464 for (StoreFile sf : this.getStorefiles()) {
1465 if (inputFiles.contains(sf.getPath().getName())) {
1466 inputStoreFiles.add(sf);
1467 }
1468 }
1469
1470
1471 List<StoreFile> outputStoreFiles = new ArrayList<StoreFile>(compactionOutputs.size());
1472
1473 if (pickCompactionFiles) {
1474 for (StoreFile sf : this.getStorefiles()) {
1475 compactionOutputs.remove(sf.getPath().getName());
1476 }
1477 for (String compactionOutput : compactionOutputs) {
1478 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput);
1479 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
1480 outputStoreFiles.add(storeFile);
1481 }
1482 }
1483
1484 if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
1485 LOG.info("Replaying compaction marker, replacing input files: " +
1486 inputStoreFiles + " with output files : " + outputStoreFiles);
1487 this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
1488 this.completeCompaction(inputStoreFiles, removeFiles);
1489 }
1490 }
1491
1492
1493
1494
1495
1496
1497
1498
1499 public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
1500 List<StoreFile> filesToCompact;
1501 boolean isMajor;
1502
1503 this.lock.readLock().lock();
1504 try {
1505 synchronized (filesCompacting) {
1506 filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
1507 if (!filesCompacting.isEmpty()) {
1508
1509
1510 StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
1511 int idx = filesToCompact.indexOf(last);
1512 Preconditions.checkArgument(idx != -1);
1513 filesToCompact.subList(0, idx + 1).clear();
1514 }
1515 int count = filesToCompact.size();
1516 if (N > count) {
1517 throw new RuntimeException("Not enough files");
1518 }
1519
1520 filesToCompact = filesToCompact.subList(count - N, count);
1521 isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
1522 filesCompacting.addAll(filesToCompact);
1523 Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
1524 .getStoreFileComparator());
1525 }
1526 } finally {
1527 this.lock.readLock().unlock();
1528 }
1529
1530 try {
1531
1532 List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
1533 .compactForTesting(filesToCompact, isMajor);
1534 for (Path newFile: newFiles) {
1535
1536 StoreFile sf = moveFileIntoPlace(newFile);
1537 if (this.getCoprocessorHost() != null) {
1538 this.getCoprocessorHost().postCompact(this, sf, null);
1539 }
1540 replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
1541 completeCompaction(filesToCompact, true);
1542 }
1543 } finally {
1544 synchronized (filesCompacting) {
1545 filesCompacting.removeAll(filesToCompact);
1546 }
1547 }
1548 }
1549
1550 @Override
1551 public boolean hasReferences() {
1552 return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
1553 }
1554
1555 @Override
1556 public CompactionProgress getCompactionProgress() {
1557 return this.storeEngine.getCompactor().getProgress();
1558 }
1559
1560 @Override
1561 public boolean isMajorCompaction() throws IOException {
1562 for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1563
1564 if (sf.getReader() == null) {
1565 LOG.debug("StoreFile " + sf + " has null Reader");
1566 return false;
1567 }
1568 }
1569 return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
1570 this.storeEngine.getStoreFileManager().getStorefiles());
1571 }
1572
1573 @Override
1574 public CompactionContext requestCompaction() throws IOException {
1575 return requestCompaction(Store.NO_PRIORITY, null);
1576 }
1577
1578 @Override
1579 public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
1580 throws IOException {
1581 return requestCompaction(priority, baseRequest, null);
1582 }
1583 @Override
1584 public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
1585 User user) throws IOException {
1586
1587 if (!this.areWritesEnabled()) {
1588 return null;
1589 }
1590
1591
1592 removeUnneededFiles();
1593
1594 final CompactionContext compaction = storeEngine.createCompaction();
1595 CompactionRequest request = null;
1596 this.lock.readLock().lock();
1597 try {
1598 synchronized (filesCompacting) {
1599 final Store thisStore = this;
1600
1601 if (this.getCoprocessorHost() != null) {
1602 final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
1603 boolean override = false;
1604 if (user == null) {
1605 override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
1606 baseRequest);
1607 } else {
1608 try {
1609 override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
1610 @Override
1611 public Boolean run() throws Exception {
1612 return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
1613 baseRequest);
1614 }
1615 });
1616 } catch (InterruptedException ie) {
1617 InterruptedIOException iioe = new InterruptedIOException();
1618 iioe.initCause(ie);
1619 throw iioe;
1620 }
1621 }
1622 if (override) {
1623
1624 compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
1625 }
1626 }
1627
1628
1629 if (!compaction.hasSelection()) {
1630 boolean isUserCompaction = priority == Store.PRIORITY_USER;
1631 boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
1632 offPeakCompactionTracker.compareAndSet(false, true);
1633 try {
1634 compaction.select(this.filesCompacting, isUserCompaction,
1635 mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
1636 } catch (IOException e) {
1637 if (mayUseOffPeak) {
1638 offPeakCompactionTracker.set(false);
1639 }
1640 throw e;
1641 }
1642 assert compaction.hasSelection();
1643 if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
1644
1645 offPeakCompactionTracker.set(false);
1646 }
1647 }
1648 if (this.getCoprocessorHost() != null) {
1649 if (user == null) {
1650 this.getCoprocessorHost().postCompactSelection(
1651 this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
1652 } else {
1653 try {
1654 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1655 @Override
1656 public Void run() throws Exception {
1657 getCoprocessorHost().postCompactSelection(
1658 thisStore,ImmutableList.copyOf(compaction.getRequest().getFiles()),baseRequest);
1659 return null;
1660 }
1661 });
1662 } catch (InterruptedException ie) {
1663 InterruptedIOException iioe = new InterruptedIOException();
1664 iioe.initCause(ie);
1665 throw iioe;
1666 }
1667 }
1668 }
1669
1670
1671 if (baseRequest != null) {
1672
1673
1674 compaction.forceSelect(
1675 baseRequest.combineWith(compaction.getRequest()));
1676 }
1677
1678 request = compaction.getRequest();
1679 final Collection<StoreFile> selectedFiles = request.getFiles();
1680 if (selectedFiles.isEmpty()) {
1681 return null;
1682 }
1683
1684 addToCompactingFiles(selectedFiles);
1685
1686
1687 this.forceMajor = this.forceMajor && !request.isMajor();
1688
1689
1690
1691 request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
1692 request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
1693 }
1694 } finally {
1695 this.lock.readLock().unlock();
1696 }
1697
1698 LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
1699 + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
1700 + (request.isAllFiles() ? " (all files)" : ""));
1701 this.region.reportCompactionRequestStart(request.isMajor());
1702 return compaction;
1703 }
1704
1705
1706 private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) {
1707 if (filesToAdd == null) return;
1708
1709 if (!Collections.disjoint(filesCompacting, filesToAdd)) {
1710 Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
1711 }
1712 filesCompacting.addAll(filesToAdd);
1713 Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
1714 }
1715
1716 private void removeUnneededFiles() throws IOException {
1717 if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) return;
1718 if (getFamily().getMinVersions() > 0) {
1719 LOG.debug("Skipping expired store file removal due to min version being " +
1720 getFamily().getMinVersions());
1721 return;
1722 }
1723 this.lock.readLock().lock();
1724 Collection<StoreFile> delSfs = null;
1725 try {
1726 synchronized (filesCompacting) {
1727 long cfTtl = getStoreFileTtl();
1728 if (cfTtl != Long.MAX_VALUE) {
1729 delSfs = storeEngine.getStoreFileManager().getUnneededFiles(
1730 EnvironmentEdgeManager.currentTime() - cfTtl, filesCompacting);
1731 addToCompactingFiles(delSfs);
1732 }
1733 }
1734 } finally {
1735 this.lock.readLock().unlock();
1736 }
1737 if (delSfs == null || delSfs.isEmpty()) return;
1738
1739 Collection<StoreFile> newFiles = new ArrayList<StoreFile>();
1740 writeCompactionWalRecord(delSfs, newFiles);
1741 replaceStoreFiles(delSfs, newFiles);
1742 completeCompaction(delSfs);
1743 LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
1744 + this + " of " + this.getRegionInfo().getRegionNameAsString()
1745 + "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
1746 }
1747
1748 @Override
1749 public void cancelRequestedCompaction(CompactionContext compaction) {
1750 finishCompactionRequest(compaction.getRequest());
1751 }
1752
1753 private void finishCompactionRequest(CompactionRequest cr) {
1754 this.region.reportCompactionRequestEnd(cr.isMajor(), cr.getFiles().size(), cr.getSize());
1755 if (cr.isOffPeak()) {
1756 offPeakCompactionTracker.set(false);
1757 cr.setOffPeak(false);
1758 }
1759 synchronized (filesCompacting) {
1760 filesCompacting.removeAll(cr.getFiles());
1761 }
1762 }
1763
1764
1765
1766
1767
1768
1769
1770 private void validateStoreFile(Path path)
1771 throws IOException {
1772 StoreFile storeFile = null;
1773 try {
1774 storeFile = createStoreFileAndReader(path);
1775 } catch (IOException e) {
1776 LOG.error("Failed to open store file : " + path
1777 + ", keeping it in tmp location", e);
1778 throw e;
1779 } finally {
1780 if (storeFile != null) {
1781 storeFile.closeReader(false);
1782 }
1783 }
1784 }
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800 @VisibleForTesting
1801 protected void completeCompaction(final Collection<StoreFile> compactedFiles)
1802 throws IOException {
1803 completeCompaction(compactedFiles, true);
1804 }
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821 @VisibleForTesting
1822 protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
1823 throws IOException {
1824 try {
1825
1826
1827
1828
1829 notifyChangedReadersObservers();
1830
1831
1832
1833 LOG.debug("Removing store files after compaction...");
1834 boolean evictOnClose =
1835 cacheConf != null? cacheConf.shouldEvictOnClose(): true;
1836 for (StoreFile compactedFile : compactedFiles) {
1837 compactedFile.closeReader(evictOnClose);
1838 }
1839 if (removeFiles) {
1840 this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
1841 }
1842 } catch (IOException e) {
1843 e = RemoteExceptionHandler.checkIOException(e);
1844 LOG.error("Failed removing compacted files in " + this +
1845 ". Files we were trying to remove are " + compactedFiles.toString() +
1846 "; some of them may have been already removed", e);
1847 }
1848
1849
1850 this.storeSize = 0L;
1851 this.totalUncompressedBytes = 0L;
1852 for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
1853 StoreFile.Reader r = hsf.getReader();
1854 if (r == null) {
1855 LOG.warn("StoreFile " + hsf + " has a null Reader");
1856 continue;
1857 }
1858 this.storeSize += r.length();
1859 this.totalUncompressedBytes += r.getTotalUncompressedBytes();
1860 }
1861 }
1862
1863
1864
1865
1866
1867 int versionsToReturn(final int wantedVersions) {
1868 if (wantedVersions <= 0) {
1869 throw new IllegalArgumentException("Number of versions must be > 0");
1870 }
1871
1872 int maxVersions = this.family.getMaxVersions();
1873 return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1874 }
1875
1876
1877
1878
1879
1880
1881 static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
1882
1883
1884 if (cell.getTagsLength() > 0) {
1885
1886
1887
1888 Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
1889 cell.getTagsLength());
1890 while (i.hasNext()) {
1891 Tag t = i.next();
1892 if (TagType.TTL_TAG_TYPE == t.getType()) {
1893
1894
1895 long ts = cell.getTimestamp();
1896 assert t.getTagLength() == Bytes.SIZEOF_LONG;
1897 long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
1898 if (ts + ttl < now) {
1899 return true;
1900 }
1901
1902
1903 break;
1904 }
1905 }
1906 }
1907 return false;
1908 }
1909
1910 @Override
1911 public Cell getRowKeyAtOrBefore(final byte[] row) throws IOException {
1912
1913
1914
1915
1916
1917
1918 long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.scanInfo.getTtl();
1919
1920 KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1921
1922 GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1923 this.comparator, kv, ttlToUse, this.getRegionInfo().isMetaRegion());
1924 this.lock.readLock().lock();
1925 try {
1926
1927 this.memstore.getRowKeyAtOrBefore(state);
1928
1929
1930 Iterator<StoreFile> sfIterator = this.storeEngine.getStoreFileManager()
1931 .getCandidateFilesForRowKeyBefore(state.getTargetKey());
1932 while (sfIterator.hasNext()) {
1933 StoreFile sf = sfIterator.next();
1934 sfIterator.remove();
1935 boolean haveNewCandidate = rowAtOrBeforeFromStoreFile(sf, state);
1936 Cell candidate = state.getCandidate();
1937
1938 if (candidate != null && CellUtil.matchingRow(candidate, row)) {
1939 return candidate;
1940 }
1941 if (haveNewCandidate) {
1942 sfIterator = this.storeEngine.getStoreFileManager().updateCandidateFilesForRowKeyBefore(
1943 sfIterator, state.getTargetKey(), candidate);
1944 }
1945 }
1946 return state.getCandidate();
1947 } finally {
1948 this.lock.readLock().unlock();
1949 }
1950 }
1951
1952
1953
1954
1955
1956
1957
1958
1959 private boolean rowAtOrBeforeFromStoreFile(final StoreFile f,
1960 final GetClosestRowBeforeTracker state)
1961 throws IOException {
1962 StoreFile.Reader r = f.getReader();
1963 if (r == null) {
1964 LOG.warn("StoreFile " + f + " has a null Reader");
1965 return false;
1966 }
1967 if (r.getEntries() == 0) {
1968 LOG.warn("StoreFile " + f + " is a empty store file");
1969 return false;
1970 }
1971
1972 byte [] fk = r.getFirstKey();
1973 if (fk == null) return false;
1974 KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1975 byte [] lk = r.getLastKey();
1976 KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1977 KeyValue firstOnRow = state.getTargetKey();
1978 if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1979
1980
1981 if (!state.isTargetTable(lastKV)) return false;
1982
1983
1984 firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1985 }
1986
1987 HFileScanner scanner = r.getScanner(true, true, false);
1988
1989 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
1990
1991
1992 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
1993
1994 while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1995 firstOnRow.getKeyLength())) {
1996 Cell kv = scanner.getKeyValue();
1997 if (!state.isTargetTable(kv)) break;
1998 if (!state.isBetterCandidate(kv)) break;
1999
2000 firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
2001
2002 if (!seekToScanner(scanner, firstOnRow, firstKV)) return false;
2003
2004 if (walkForwardInSingleRow(scanner, firstOnRow, state)) return true;
2005 }
2006 return false;
2007 }
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017 private boolean seekToScanner(final HFileScanner scanner,
2018 final KeyValue firstOnRow,
2019 final KeyValue firstKV)
2020 throws IOException {
2021 KeyValue kv = firstOnRow;
2022
2023 if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
2024 int result = scanner.seekTo(kv);
2025 return result != -1;
2026 }
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038 private boolean walkForwardInSingleRow(final HFileScanner scanner,
2039 final KeyValue firstOnRow,
2040 final GetClosestRowBeforeTracker state)
2041 throws IOException {
2042 boolean foundCandidate = false;
2043 do {
2044 Cell kv = scanner.getKeyValue();
2045
2046 if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
2047
2048 if (state.isTooFar(kv, firstOnRow)) break;
2049 if (state.isExpired(kv)) {
2050 continue;
2051 }
2052
2053 if (state.handle(kv)) {
2054 foundCandidate = true;
2055 break;
2056 }
2057 } while(scanner.next());
2058 return foundCandidate;
2059 }
2060
2061 @Override
2062 public boolean canSplit() {
2063 this.lock.readLock().lock();
2064 try {
2065
2066 boolean result = !hasReferences();
2067 if (!result && LOG.isDebugEnabled()) {
2068 LOG.debug("Cannot split region due to reference files being there");
2069 }
2070 return result;
2071 } finally {
2072 this.lock.readLock().unlock();
2073 }
2074 }
2075
2076 @Override
2077 public byte[] getSplitPoint() {
2078 this.lock.readLock().lock();
2079 try {
2080
2081 assert !this.getRegionInfo().isMetaRegion();
2082
2083 if (hasReferences()) {
2084 return null;
2085 }
2086 return this.storeEngine.getStoreFileManager().getSplitPoint();
2087 } catch(IOException e) {
2088 LOG.warn("Failed getting store size for " + this, e);
2089 } finally {
2090 this.lock.readLock().unlock();
2091 }
2092 return null;
2093 }
2094
2095 @Override
2096 public long getLastCompactSize() {
2097 return this.lastCompactSize;
2098 }
2099
2100 @Override
2101 public long getSize() {
2102 return storeSize;
2103 }
2104
2105 @Override
2106 public void triggerMajorCompaction() {
2107 this.forceMajor = true;
2108 }
2109
2110
2111
2112
2113
2114
2115 @Override
2116 public KeyValueScanner getScanner(Scan scan,
2117 final NavigableSet<byte []> targetCols, long readPt) throws IOException {
2118 lock.readLock().lock();
2119 try {
2120 KeyValueScanner scanner = null;
2121 if (this.getCoprocessorHost() != null) {
2122 scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
2123 }
2124 scanner = createScanner(scan, targetCols, readPt, scanner);
2125 return scanner;
2126 } finally {
2127 lock.readLock().unlock();
2128 }
2129 }
2130
2131 protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
2132 long readPt, KeyValueScanner scanner) throws IOException {
2133 if (scanner == null) {
2134 scanner = scan.isReversed() ? new ReversedStoreScanner(this,
2135 getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
2136 getScanInfo(), scan, targetCols, readPt);
2137 }
2138 return scanner;
2139 }
2140
2141 @Override
2142 public String toString() {
2143 return this.getColumnFamilyName();
2144 }
2145
2146 @Override
2147 public int getStorefilesCount() {
2148 return this.storeEngine.getStoreFileManager().getStorefileCount();
2149 }
2150
2151 @Override
2152 public long getMaxStoreFileAge() {
2153 long earliestTS = Long.MAX_VALUE;
2154 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2155 StoreFile.Reader r = s.getReader();
2156 if (r == null) {
2157 LOG.warn("StoreFile " + s + " has a null Reader");
2158 continue;
2159 }
2160 if (!s.isHFile()) {
2161 continue;
2162 }
2163 long createdTS = s.getFileInfo().getCreatedTimestamp();
2164 earliestTS = (createdTS < earliestTS) ? createdTS : earliestTS;
2165 }
2166 long now = EnvironmentEdgeManager.currentTime();
2167 return now - earliestTS;
2168 }
2169
2170 @Override
2171 public long getMinStoreFileAge() {
2172 long latestTS = 0;
2173 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2174 StoreFile.Reader r = s.getReader();
2175 if (r == null) {
2176 LOG.warn("StoreFile " + s + " has a null Reader");
2177 continue;
2178 }
2179 if (!s.isHFile()) {
2180 continue;
2181 }
2182 long createdTS = s.getFileInfo().getCreatedTimestamp();
2183 latestTS = (createdTS > latestTS) ? createdTS : latestTS;
2184 }
2185 long now = EnvironmentEdgeManager.currentTime();
2186 return latestTS == 0 ? 0 : now - latestTS;
2187 }
2188
2189 @Override
2190 public long getAvgStoreFileAge() {
2191 long sum = 0, count = 0;
2192 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2193 StoreFile.Reader r = s.getReader();
2194 if (r == null) {
2195 LOG.warn("StoreFile " + s + " has a null Reader");
2196 continue;
2197 }
2198 if (!s.isHFile()) {
2199 continue;
2200 }
2201 sum += s.getFileInfo().getCreatedTimestamp();
2202 count++;
2203 }
2204 if (count == 0) {
2205 return 0;
2206 }
2207 long avgTS = sum / count;
2208 long now = EnvironmentEdgeManager.currentTime();
2209 return now - avgTS;
2210 }
2211
2212 @Override
2213 public long getNumReferenceFiles() {
2214 long numRefFiles = 0;
2215 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2216 if (s.isReference()) {
2217 numRefFiles++;
2218 }
2219 }
2220 return numRefFiles;
2221 }
2222
2223 @Override
2224 public long getNumHFiles() {
2225 long numHFiles = 0;
2226 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2227 if (s.isHFile()) {
2228 numHFiles++;
2229 }
2230 }
2231 return numHFiles;
2232 }
2233
2234 @Override
2235 public long getStoreSizeUncompressed() {
2236 return this.totalUncompressedBytes;
2237 }
2238
2239 @Override
2240 public long getStorefilesSize() {
2241 long size = 0;
2242 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2243 StoreFile.Reader r = s.getReader();
2244 if (r == null) {
2245 LOG.warn("StoreFile " + s + " has a null Reader");
2246 continue;
2247 }
2248 size += r.length();
2249 }
2250 return size;
2251 }
2252
2253 @Override
2254 public long getStorefilesIndexSize() {
2255 long size = 0;
2256 for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {
2257 StoreFile.Reader r = s.getReader();
2258 if (r == null) {
2259 LOG.warn("StoreFile " + s + " has a null Reader");
2260 continue;
2261 }
2262 size += r.indexSize();
2263 }
2264 return size;
2265 }
2266
2267 @Override
2268 public long getTotalStaticIndexSize() {
2269 long size = 0;
2270 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2271 StoreFile.Reader r = s.getReader();
2272 if (r == null) {
2273 continue;
2274 }
2275 size += r.getUncompressedDataIndexSize();
2276 }
2277 return size;
2278 }
2279
2280 @Override
2281 public long getTotalStaticBloomSize() {
2282 long size = 0;
2283 for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) {
2284 StoreFile.Reader r = s.getReader();
2285 if (r == null) {
2286 continue;
2287 }
2288 size += r.getTotalBloomSize();
2289 }
2290 return size;
2291 }
2292
2293 @Override
2294 public long getMemStoreSize() {
2295 return this.memstore.size();
2296 }
2297
2298 @Override
2299 public int getCompactPriority() {
2300 int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
2301 if (priority == PRIORITY_USER) {
2302 LOG.warn("Compaction priority is USER despite there being no user compaction");
2303 }
2304 return priority;
2305 }
2306
2307 @Override
2308 public boolean throttleCompaction(long compactionSize) {
2309 return storeEngine.getCompactionPolicy().throttleCompaction(compactionSize);
2310 }
2311
2312 public HRegion getHRegion() {
2313 return this.region;
2314 }
2315
2316 @Override
2317 public RegionCoprocessorHost getCoprocessorHost() {
2318 return this.region.getCoprocessorHost();
2319 }
2320
2321 @Override
2322 public HRegionInfo getRegionInfo() {
2323 return this.fs.getRegionInfo();
2324 }
2325
2326 @Override
2327 public boolean areWritesEnabled() {
2328 return this.region.areWritesEnabled();
2329 }
2330
2331 @Override
2332 public long getSmallestReadPoint() {
2333 return this.region.getSmallestReadPoint();
2334 }
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349 public long updateColumnValue(byte [] row, byte [] f,
2350 byte [] qualifier, long newValue)
2351 throws IOException {
2352
2353 this.lock.readLock().lock();
2354 try {
2355 long now = EnvironmentEdgeManager.currentTime();
2356
2357 return this.memstore.updateColumnValue(row,
2358 f,
2359 qualifier,
2360 newValue,
2361 now);
2362
2363 } finally {
2364 this.lock.readLock().unlock();
2365 }
2366 }
2367
2368 @Override
2369 public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
2370 this.lock.readLock().lock();
2371 try {
2372 return this.memstore.upsert(cells, readpoint);
2373 } finally {
2374 this.lock.readLock().unlock();
2375 }
2376 }
2377
2378 @Override
2379 public StoreFlushContext createFlushContext(long cacheFlushId) {
2380 return new StoreFlusherImpl(cacheFlushId);
2381 }
2382
2383 private class StoreFlusherImpl implements StoreFlushContext {
2384
2385 private long cacheFlushSeqNum;
2386 private MemStoreSnapshot snapshot;
2387 private List<Path> tempFiles;
2388 private List<Path> committedFiles;
2389 private long cacheFlushCount;
2390 private long cacheFlushSize;
2391 private long outputFileSize;
2392
2393 private StoreFlusherImpl(long cacheFlushSeqNum) {
2394 this.cacheFlushSeqNum = cacheFlushSeqNum;
2395 }
2396
2397
2398
2399
2400
2401 @Override
2402 public void prepare() {
2403 this.snapshot = memstore.snapshot();
2404 this.cacheFlushCount = snapshot.getCellsCount();
2405 this.cacheFlushSize = snapshot.getSize();
2406 committedFiles = new ArrayList<Path>(1);
2407 }
2408
2409 @Override
2410 public void flushCache(MonitoredTask status) throws IOException {
2411 tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
2412 }
2413
2414 @Override
2415 public boolean commit(MonitoredTask status) throws IOException {
2416 if (this.tempFiles == null || this.tempFiles.isEmpty()) {
2417 return false;
2418 }
2419 List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
2420 for (Path storeFilePath : tempFiles) {
2421 try {
2422 StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status);
2423 outputFileSize += sf.getReader().length();
2424 storeFiles.add(sf);
2425 } catch (IOException ex) {
2426 LOG.error("Failed to commit store file " + storeFilePath, ex);
2427
2428 for (StoreFile sf : storeFiles) {
2429 Path pathToDelete = sf.getPath();
2430 try {
2431 sf.deleteReader();
2432 } catch (IOException deleteEx) {
2433 LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
2434 Runtime.getRuntime().halt(1);
2435 }
2436 }
2437 throw new IOException("Failed to commit the flush", ex);
2438 }
2439 }
2440
2441 for (StoreFile sf : storeFiles) {
2442 if (HStore.this.getCoprocessorHost() != null) {
2443 HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
2444 }
2445 committedFiles.add(sf.getPath());
2446 }
2447
2448 HStore.this.flushedCellsCount += cacheFlushCount;
2449 HStore.this.flushedCellsSize += cacheFlushSize;
2450 HStore.this.flushedOutputFileSize += outputFileSize;
2451
2452
2453 return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
2454 }
2455
2456 @Override
2457 public long getOutputFileSize() {
2458 return outputFileSize;
2459 }
2460
2461 @Override
2462 public List<Path> getCommittedFiles() {
2463 return committedFiles;
2464 }
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474 @Override
2475 public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot)
2476 throws IOException {
2477 List<StoreFile> storeFiles = new ArrayList<StoreFile>(fileNames.size());
2478 for (String file : fileNames) {
2479
2480 StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
2481 StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
2482 storeFiles.add(storeFile);
2483 HStore.this.storeSize += storeFile.getReader().length();
2484 HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
2485 if (LOG.isInfoEnabled()) {
2486 LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
2487 " added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +
2488 ", sequenceid=" + + storeFile.getReader().getSequenceID() +
2489 ", filesize=" + StringUtils.humanReadableInt(storeFile.getReader().length()));
2490 }
2491 }
2492
2493 long snapshotId = -1;
2494 if (dropMemstoreSnapshot && snapshot != null) {
2495 snapshotId = snapshot.getId();
2496 }
2497 HStore.this.updateStorefiles(storeFiles, snapshotId);
2498 }
2499
2500
2501
2502
2503
2504 @Override
2505 public void abort() throws IOException {
2506 if (snapshot == null) {
2507 return;
2508 }
2509 HStore.this.updateStorefiles(new ArrayList<StoreFile>(0), snapshot.getId());
2510 }
2511 }
2512
2513 @Override
2514 public boolean needsCompaction() {
2515 return this.storeEngine.needsCompaction(this.filesCompacting);
2516 }
2517
2518 @Override
2519 public CacheConfig getCacheConfig() {
2520 return this.cacheConf;
2521 }
2522
2523 public static final long FIXED_OVERHEAD =
2524 ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)
2525 + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
2526
2527 public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
2528 + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
2529 + ClassSize.CONCURRENT_SKIPLISTMAP
2530 + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
2531 + ScanInfo.FIXED_OVERHEAD);
2532
2533 @Override
2534 public long heapSize() {
2535 return DEEP_OVERHEAD + this.memstore.heapSize();
2536 }
2537
2538 @Override
2539 public KeyValue.KVComparator getComparator() {
2540 return comparator;
2541 }
2542
2543 @Override
2544 public ScanInfo getScanInfo() {
2545 return scanInfo;
2546 }
2547
2548
2549
2550
2551
2552 void setScanInfo(ScanInfo scanInfo) {
2553 this.scanInfo = scanInfo;
2554 }
2555
2556 @Override
2557 public boolean hasTooManyStoreFiles() {
2558 return getStorefilesCount() > this.blockingFileCount;
2559 }
2560
2561 @Override
2562 public long getFlushedCellsCount() {
2563 return flushedCellsCount;
2564 }
2565
2566 @Override
2567 public long getFlushedCellsSize() {
2568 return flushedCellsSize;
2569 }
2570
2571 @Override
2572 public long getFlushedOutputFileSize() {
2573 return flushedOutputFileSize;
2574 }
2575
2576 @Override
2577 public long getCompactedCellsCount() {
2578 return compactedCellsCount;
2579 }
2580
2581 @Override
2582 public long getCompactedCellsSize() {
2583 return compactedCellsSize;
2584 }
2585
2586 @Override
2587 public long getMajorCompactedCellsCount() {
2588 return majorCompactedCellsCount;
2589 }
2590
2591 @Override
2592 public long getMajorCompactedCellsSize() {
2593 return majorCompactedCellsSize;
2594 }
2595
2596
2597
2598
2599
2600 @VisibleForTesting
2601 public StoreEngine<?, ?, ?, ?> getStoreEngine() {
2602 return this.storeEngine;
2603 }
2604
2605 protected OffPeakHours getOffPeakHours() {
2606 return this.offPeakHours;
2607 }
2608
2609
2610
2611
2612 @Override
2613 public void onConfigurationChange(Configuration conf) {
2614 this.conf = new CompoundConfiguration()
2615 .add(conf)
2616 .addWritableMap(family.getValues());
2617 this.storeEngine.compactionPolicy.setConf(conf);
2618 this.offPeakHours = OffPeakHours.getInstance(conf);
2619 }
2620
2621
2622
2623
2624 @Override
2625 public void registerChildren(ConfigurationManager manager) {
2626
2627 }
2628
2629
2630
2631
2632 @Override
2633 public void deregisterChildren(ConfigurationManager manager) {
2634
2635 }
2636
2637 @Override
2638 public double getCompactionPressure() {
2639 return storeEngine.getStoreFileManager().getCompactionPressure();
2640 }
2641
2642 @Override
2643 public boolean isPrimaryReplicaStore() {
2644 return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
2645 }
2646 }