/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.compactions;

import com.google.common.collect.Lists;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.security.User;

/**
 * Compact passed set of files. Create an instance and then call
 * {@link #compact(CompactionRequest, CompactionThroughputController, User)}.
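 * <p>
 * A minimal usage sketch; {@code conf}, {@code store}, {@code request} and {@code user} are
 * assumed to be supplied by the caller (for example, by the store's compaction machinery):
 * <pre>
 *   DefaultCompactor compactor = new DefaultCompactor(conf, store);
 *   List&lt;Path&gt; newFiles =
 *       compactor.compact(request, NoLimitCompactionThroughputController.INSTANCE, user);
 * </pre>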
 */
@InterfaceAudience.Private
public class DefaultCompactor extends Compactor<Writer> {
  private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);

  public DefaultCompactor(final Configuration conf, final Store store) {
    super(conf, store);
  }

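  /**
   * Writer factory used by {@link #compact(CompactionRequest, CompactionThroughputController,
   * User)}; it delegates to {@link #createTmpWriter(FileDetails, boolean)} to open the compaction
   * output file in the store's tmp directory.
   */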
  private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
    @Override
    public Writer createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
        throws IOException {
      return createTmpWriter(fd, shouldDropBehind);
    }
  };

  /**
   * Do a minor/major compaction on an explicit set of storefiles from a Store.
   */
  public List<Path> compact(final CompactionRequest request,
      CompactionThroughputController throughputController, User user) throws IOException {
    return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
  }

  /**
   * Creates a writer for a new file in a temporary directory.
   * @param fd The file details.
   * @param shouldDropBehind Whether written data should be dropped from the OS page cache
   *          (drop-behind).
   * @return Writer for a new StoreFile in the tmp dir.
   * @throws IOException if the writer cannot be created
   */
  protected StoreFile.Writer createTmpWriter(FileDetails fd, boolean shouldDropBehind)
      throws IOException {
    // When all MVCC readpoints are 0, don't write them.
    // See HBASE-8166, HBASE-12600, and HBASE-13389.
    // Always create the writer with tags enabled because the compaction may produce new cells
    // with tags.
    return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
        fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
  }
  /**
   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
   * {@link #compact(CompactionRequest, CompactionThroughputController, User)}.
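   * <p>
   * A minimal sketch of a test-only call, assuming the files to compact have already been chosen
   * (for example, all of the store's current files):
   * <pre>
   *   Collection&lt;StoreFile&gt; files = store.getStorefiles();
   *   List&lt;Path&gt; newFiles = compactor.compactForTesting(files, true); // major compaction
   * </pre>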
   * @param filesToCompact the files to compact. These are used as the compactionSelection for
   *          the generated {@link CompactionRequest}.
   * @param isMajor true to major compact (prune all deletes, max versions, etc)
   * @return The product of the compaction, or an empty list if all cells were expired or deleted
   *         and nothing made it through the compaction.
   * @throws IOException if the compaction fails
   */
  public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
      throws IOException {
    CompactionRequest cr = new CompactionRequest(filesToCompact);
    cr.setIsMajor(isMajor, isMajor);
    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
  }

  @Override
  protected List<Path> commitWriter(Writer writer, FileDetails fd,
      CompactionRequest request) throws IOException {
    List<Path> newFiles = Lists.newArrayList(writer.getPath());
    writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
    writer.close();
    return newFiles;
  }

  @Override
  protected void abortWriter(Writer writer) throws IOException {
    Path leftoverFile = writer.getPath();
    try {
      writer.close();
    } catch (IOException e) {
      LOG.warn("Failed to close the writer after an unfinished compaction.", e);
    }
    try {
      store.getFileSystem().delete(leftoverFile, false);
    } catch (IOException e) {
      LOG.warn(
        "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.",
        e);
    }
  }
}