/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import com.google.common.io.Closeables;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

/**
 * A compactor is a compaction algorithm associated with a given policy. The base class also
 * contains reusable parts for implementing compactors (what is common and what isn't is evolving).
 */
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
  private static final Log LOG = LogFactory.getLog(Compactor.class);

  protected volatile CompactionProgress progress;

  protected final Configuration conf;
  protected final Store store;

  protected final int compactionKVMax;
  protected final Compression.Algorithm compactionCompression;

  /** Specifies how many days to keep MVCC values during a major compaction. */
  protected int keepSeqIdPeriod;

  // TODO: depending on Store is not good but, realistically, all compactors currently do.
  Compactor(final Configuration conf, final Store store) {
    this.conf = conf;
    this.store = store;
    this.compactionKVMax =
      this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
    this.compactionCompression = (this.store.getFamily() == null) ?
        Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
    this.keepSeqIdPeriod = Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD,
      HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
  }
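
  // Illustrative only (not part of the original file): a sketch of how a test
  // or tool might tune the two knobs read in the constructor above. The
  // constant names are the ones used there; the values are arbitrary examples.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt(HConstants.COMPACTION_KV_MAX, 10); // max cells per scanner.next() batch
  //   conf.setInt(HConstants.KEEP_SEQID_PERIOD, 5);  // days to keep MVCC/seqId values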

  public interface CellSink {
    void append(Cell cell) throws IOException;
  }
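
  // A minimal sketch (hypothetical, for illustration) of a CellSink that merely
  // counts appended cells; in practice the sink is a StoreFile.Writer produced
  // through a CellSinkFactory below.
  //
  //   CellSink countingSink = new CellSink() {
  //     private long count = 0;
  //     @Override
  //     public void append(Cell cell) throws IOException {
  //       count++;
  //     }
  //   };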

  protected interface CellSinkFactory<S> {
    S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException;
  }

  public CompactionProgress getProgress() {
    return this.progress;
  }

  /** The sole reason this class exists is that Java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Latest put timestamp */
    public long latestPutTs = HConstants.LATEST_TIMESTAMP;
    /** Maximum sequence id among all of the files we're compacting. */
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    public long maxMVCCReadpoint = 0;
    /** Max tags length */
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction */
    public long minSeqIdToKeep = 0;
  }

  /**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {
    FileDetails fd = new FileDetails();
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);

    for (StoreFile file : filesToCompact) {
      if (allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // when allFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep
        if (fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }
      long seqNum = file.getMaxSequenceId();
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms can cause progress to be miscalculated, e.g. if the user switches bloom
      // type (say, from ROW to ROWCOL).
      long keyCount = r.getEntries();
      fd.maxKeyCount += keyCount;
      // calculate the latest MVCC readpoint in any of the involved store files
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();
      byte[] tmp = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      } else {
        tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete markers during compaction.
      long earliestPutTs = 0;
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      tmp = fileInfo.get(StoreFile.TIMERANGE_KEY);
      TimeRangeTracker trt = new TimeRangeTracker();
      if (tmp == null) {
        fd.latestPutTs = HConstants.LATEST_TIMESTAMP;
      } else {
        Writables.copyWritable(tmp, trt);
        fd.latestPutTs = trt.getMaximumTimestamp();
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + TraditionalBinaryPrefix.long2String(r.length(), "", 1) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
      }
    }
    return fd;
  }

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @param smallestReadPoint Smallest MVCC read point.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
      smallestReadPoint);
  }

  protected long getSmallestReadPoint() {
    return store.getSmallestReadPoint();
  }

  protected interface InternalScannerFactory {

    ScanType getScanType(CompactionRequest request);

    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
        FileDetails fd, long smallestReadPoint) throws IOException;
  }

  protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {

    @Override
    public ScanType getScanType(CompactionRequest request) {
      return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
          : ScanType.COMPACT_RETAIN_DELETES;
    }

    @Override
    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
        FileDetails fd, long smallestReadPoint) throws IOException {
      return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
        fd.earliestPutTs);
    }
  };

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
   * @param shouldDropBehind Whether the writer should drop pages behind writes (ignored by this
   *          default implementation).
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if creation failed
   */
  protected Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
      /* isCompaction = */ true,
      /* includeMVCCReadpoint = */ fd.maxMVCCReadpoint > 0,
      /* includesTags = */ fd.maxTagsLength > 0);
  }

  protected List<Path> compact(final CompactionRequest request,
      InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
      CompactionThroughputController throughputController, User user) throws IOException {
    FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
    this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = getSmallestReadPoint();

    List<StoreFileScanner> scanners;
    Collection<StoreFile> readersToClose;
    T writer = null;
    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
      // clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
      // HFiles, and their readers
      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
      for (StoreFile f : request.getFiles()) {
        StoreFile clonedStoreFile = f.cloneForReader();
        // create the reader after the store file is cloned in case
        // the sequence id is used for sorting in scanners
        clonedStoreFile.createReader();
        readersToClose.add(clonedStoreFile);
      }
      scanners = createFileScanners(readersToClose, smallestReadPoint);
    } else {
      readersToClose = Collections.emptyList();
      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
    }
    InternalScanner scanner = null;
    boolean finished = false;
    try {
      /* Include deletes, unless we are doing a major compaction */
      ScanType scanType = scannerFactory.getScanType(request);
      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
      if (scanner == null) {
        scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
      }
      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
      if (scanner == null) {
        // NULL scanner returned from coprocessor hooks means skip normal processing.
        return new ArrayList<Path>();
      }
      boolean cleanSeqId = false;
      if (fd.minSeqIdToKeep > 0) {
        smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
        cleanSeqId = true;
      }
      writer = sinkFactory.createWriter(scanner, fd, store.throttleCompaction(request.getSize()));
      finished =
          performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
            throughputController, request.isAllFiles());
      if (!finished) {
        throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
            + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
      }
    } finally {
      Closeables.close(scanner, true);
      for (StoreFile f : readersToClose) {
        try {
          f.closeReader(true);
        } catch (IOException e) {
          LOG.warn("Exception closing " + f, e);
        }
      }
      if (!finished && writer != null) {
        abortWriter(writer);
      }
    }
    assert finished : "We should have exited the method on all error paths";
    assert writer != null : "Writer should be non-null if no error";
    return commitWriter(writer, fd, request);
  }

  protected abstract List<Path> commitWriter(T writer, FileDetails fd, CompactionRequest request)
      throws IOException;

  protected abstract void abortWriter(T writer) throws IOException;
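
  // A rough sketch, loosely modeled on HBase's DefaultCompactor, of how a
  // concrete subclass wires the template method above. The class and variable
  // names below are illustrative, not the exact upstream code.
  //
  //   public class SimpleCompactor extends Compactor<StoreFile.Writer> {
  //     SimpleCompactor(Configuration conf, Store store) {
  //       super(conf, store);
  //     }
  //
  //     public List<Path> compact(CompactionRequest request,
  //         CompactionThroughputController controller, User user) throws IOException {
  //       return compact(request, defaultScannerFactory,
  //         new CellSinkFactory<StoreFile.Writer>() {
  //           @Override
  //           public StoreFile.Writer createWriter(InternalScanner scanner, FileDetails fd,
  //               boolean shouldDropBehind) throws IOException {
  //             return createTmpWriter(fd, shouldDropBehind);
  //           }
  //         }, controller, user);
  //     }
  //
  //     @Override
  //     protected List<Path> commitWriter(StoreFile.Writer writer, FileDetails fd,
  //         CompactionRequest request) throws IOException {
  //       appendMetadataAndCloseWriter(writer, fd, request.isAllFiles());
  //       return Collections.singletonList(writer.getPath());
  //     }
  //
  //     @Override
  //     protected void abortWriter(StoreFile.Writer writer) throws IOException {
  //       Path leftover = writer.getPath();
  //       writer.close();
  //       store.getFileSystem().delete(leftover, false);
  //     }
  //   }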

  /**
   * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param earliestPutTs Earliest put ts.
   * @param scanners File scanners for compaction files.
   * @return Scanner overridden by coprocessor; null if not overriding.
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
    return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
  }

  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
      User user) throws IOException {
    if (store.getCoprocessorHost() == null) return null;
    if (user == null) {
      return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
        earliestPutTs, request);
    } else {
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
              scanType, earliestPutTs, request);
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  /**
   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
   * @param request Compaction request.
   * @param scanType Scan type.
   * @param scanner The default scanner created for compaction.
   * @return Scanner to use (usually the default); null if compaction should not proceed.
   */
  protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
    if (store.getCoprocessorHost() == null) {
      return scanner;
    }
    if (user == null) {
      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
    } else {
      try {
        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
          @Override
          public InternalScanner run() throws Exception {
            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
          }
        });
      } catch (InterruptedException ie) {
        InterruptedIOException iioe = new InterruptedIOException();
        iioe.initCause(ie);
        throw iioe;
      }
    }
  }

  /**
   * Used to prevent compaction name conflicts when multiple compactions are running in parallel
   * on the same store.
   */
  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);

  private String generateCompactionName() {
    int counter;
    for (;;) {
      counter = NAME_COUNTER.get();
      int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
      if (NAME_COUNTER.compareAndSet(counter, next)) {
        break;
      }
    }
    return store.getRegionInfo().getRegionNameAsString() + "#"
        + store.getFamily().getNameAsString() + "#" + counter;
  }
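
  // The CAS loop above wraps the counter explicitly from Integer.MAX_VALUE back
  // to 0, rather than letting a plain getAndIncrement() overflow to a negative
  // value, so generated compaction names stay non-negative.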

  /**
   * Performs the compaction.
   * @param fd FileDetails of cell sink writer
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove the seqId (formerly mvcc) value if it is
   *          <= smallestReadPoint
   * @param throughputController The controller used to throttle this compaction.
   * @param major Is a major compaction.
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
      long smallestReadPoint, boolean cleanSeqId,
      CompactionThroughputController throughputController, boolean major) throws IOException {
    long bytesWritten = 0;
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.
    List<Cell> cells = new ArrayList<Cell>();
    long closeCheckInterval = HStore.getCloseCheckInterval();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    String compactionName = generateCompactionName();
    long now = 0;
    boolean hasMore;
    ScannerContext scannerContext =
        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();

    throughputController.start(compactionName);
    try {
      do {
        hasMore = scanner.next(cells, scannerContext);
        if (LOG.isDebugEnabled()) {
          now = EnvironmentEdgeManager.currentTime();
        }
        // output to writer:
        Cell lastCleanCell = null;
        long lastCleanCellSeqId = 0;
        for (Cell c : cells) {
          if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
            lastCleanCell = c;
            lastCleanCellSeqId = c.getSequenceId();
            CellUtil.setSequenceId(c, 0);
          } else {
            lastCleanCell = null;
            lastCleanCellSeqId = 0;
          }
          writer.append(c);
          int len = KeyValueUtil.length(c);
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgress += len;
          }
          throughputController.control(compactionName, len);
          // check periodically to see if a system stop is requested
          if (closeCheckInterval > 0) {
            bytesWritten += len;
            if (bytesWritten > closeCheckInterval) {
              bytesWritten = 0;
              if (!store.areWritesEnabled()) {
                progress.cancel();
                return false;
              }
            }
          }
        }
        if (lastCleanCell != null) {
          // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
          CellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= 60 * 1000) {
            LOG.debug("Compaction progress: "
                + compactionName
                + " "
                + progress
                + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0)
                    / ((now - lastMillis) / 1000.0)) + ", throughputController is "
                + throughputController);
            lastMillis = now;
            bytesWrittenProgress = 0;
          }
        }
        cells.clear();
      } while (hasMore);
    } catch (InterruptedException e) {
      progress.cancel();
      throw new InterruptedIOException("Interrupted while controlling the throughput of "
          + "compaction " + compactionName);
    } finally {
      throughputController.finish(compactionName);
    }
    progress.complete();
    return true;
  }
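
  // The do/while above is the standard InternalScanner consumption pattern:
  // next() can return false while still having filled 'cells' for this call,
  // so each batch must be processed before checking hasMore. A distilled form
  // of the loop (illustrative only; 'sink' stands for any CellSink):
  //
  //   List<Cell> batch = new ArrayList<Cell>();
  //   boolean more;
  //   do {
  //     more = scanner.next(batch, scannerContext); // up to compactionKVMax cells per call
  //     for (Cell c : batch) {
  //       sink.append(c);
  //     }
  //     batch.clear();
  //   } while (more);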

  /**
   * @param store store
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

  /**
   * @param store The store.
   * @param scanners Store file scanners.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
   * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
      byte[] dropDeletesToRow) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getFamily().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
  }

  /**
   * Appends the metadata and closes the writer.
   * @param writer The current store writer.
   * @param fd The file details.
   * @param isMajor Is a major compaction.
   * @throws IOException
   */
  protected void appendMetadataAndCloseWriter(StoreFile.Writer writer, FileDetails fd,
      boolean isMajor) throws IOException {
    writer.appendMetadata(fd.maxSeqId, isMajor);
    writer.close();
  }
}