View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mob;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Date;
23  import java.util.List;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.CellUtil;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.Tag;
35  import org.apache.hadoop.hbase.TagType;
36  import org.apache.hadoop.hbase.client.Scan;
37  import org.apache.hadoop.hbase.regionserver.*;
38  import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
39  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
40  import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
41  import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
42  import org.apache.hadoop.hbase.security.User;
43  import org.apache.hadoop.hbase.util.Bytes;
44  
45  /**
46   * Compact passed set of files in the mob-enabled column family.
47   */
48  @InterfaceAudience.Private
49  public class DefaultMobStoreCompactor extends DefaultCompactor {
50  
51    private static final Log LOG = LogFactory.getLog(DefaultMobStoreCompactor.class);
52    private long mobSizeThreshold;
53    private HMobStore mobStore;
54  
55    private final InternalScannerFactory scannerFactory = new InternalScannerFactory() {
56  
57      @Override
58      public ScanType getScanType(CompactionRequest request) {
59        return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
60            : ScanType.COMPACT_RETAIN_DELETES;
61      }
62  
63      @Override
64      public InternalScanner createScanner(List<StoreFileScanner> scanners,
65          ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
66        Scan scan = new Scan();
67        scan.setMaxVersions(store.getFamily().getMaxVersions());
68        // retain the delete markers until they are expired.
69        return new StoreScanner(store, store.getScanInfo(), scan, scanners,
70            ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs);
71      }
72    };
73  
74    private final CellSinkFactory<Writer> writerFactory = new CellSinkFactory<Writer>() {
75  
76      @Override
77      public Writer createWriter(InternalScanner scanner,
78          org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
79          boolean shouldDropBehind) throws IOException {
80        // make this writer with tags always because of possible new cells with tags.
81        return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true);
82      }
83    };
84  
85    public DefaultMobStoreCompactor(Configuration conf, Store store) {
86      super(conf, store);
87      // The mob cells reside in the mob-enabled column family which is held by HMobStore.
88      // During the compaction, the compactor reads the cells from the mob files and
89      // probably creates new mob files. All of these operations are included in HMobStore,
90      // so we need to cast the Store to HMobStore.
91      if (!(store instanceof HMobStore)) {
92        throw new IllegalArgumentException("The store " + store + " is not a HMobStore");
93      }
94      mobStore = (HMobStore) store;
95      mobSizeThreshold = store.getFamily().getMobThreshold();
96    }
97  
98    @Override
99    public List<Path> compact(CompactionRequest request,
100     CompactionThroughputController throughputController, User user) throws IOException {
101     return compact(request, scannerFactory, writerFactory, throughputController, user);
102   }
103 
104   // TODO refactor to take advantage of the throughput controller.
105 
106   /**
107    * Performs compaction on a column family with the mob flag enabled.
108    * This is for when the mob threshold size has changed or if the mob
109    * column family mode has been toggled via an alter table statement.
110    * Compacts the files by the following rules.
111    * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file.
112    * <ol>
113    * <li>
114    * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
115    * directly copy the (with mob tag) cell into the new store file.
116    * </li>
117    * <li>
118    * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into
119    * the new store file.
120    * </li>
121    * </ol>
122    * 2. If the Put cell doesn't have a reference tag.
123    * <ol>
124    * <li>
125    * If the value size of a cell is larger than the threshold, this cell is regarded as a mob,
126    * write this cell to a mob file, and write the path of this mob file to the store file.
127    * </li>
128    * <li>
129    * Otherwise, directly write this cell into the store file.
130    * </li>
131    * </ol>
132    * 3. Decide how to write a Delete cell.
133    * <ol>
134    * <li>
135    * If a Delete cell does not have a mob reference tag which means this delete marker have not
136    * been written to the mob del file, write this cell to the mob del file, and write this cell
137    * with a ref tag to a store file.
138    * </li>
139    * <li>
140    * Otherwise, directly write it to a store file.
141    * </li>
142    * </ol>
143    * After the major compaction on the normal hfiles, we have a guarantee that we have purged all
144    * deleted or old version mob refs, and the delete markers are written to a del file with the
145    * suffix _del. Because of this, it is safe to use the del file in the mob compaction.
146    * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the
147    * mob files. When the small mob files are merged into bigger ones, the del file is added into
148    * the scanner to filter the deleted cells.
149    * @param fd File details
150    * @param scanner Where to read from.
151    * @param writer Where to write to.
152    * @param smallestReadPoint Smallest read point.
153    * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
154    * @param throughputController The compaction throughput controller.
155    * @param major Is a major compaction.
156    * @return Whether compaction ended; false if it was interrupted for any reason.
157    */
158   @Override
159   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
160       long smallestReadPoint, boolean cleanSeqId,
161       CompactionThroughputController throughputController,  boolean major) throws IOException {
162     int bytesWritten = 0;
163     // Since scanner.next() can return 'false' but still be delivering data,
164     // we have to use a do/while loop.
165     List<Cell> cells = new ArrayList<Cell>();
166     // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
167     int closeCheckInterval = HStore.getCloseCheckInterval();
168     boolean hasMore;
169     Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
170     byte[] fileName = null;
171     Writer mobFileWriter = null, delFileWriter = null;
172     long mobCells = 0;
173     long deleteMarkersCount = 0;
174     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
175             .getName());
176     long cellsCountCompactedToMob = 0;
177     long cellsCountCompactedFromMob = 0;
178     long cellsSizeCompactedToMob = 0;
179     long cellsSizeCompactedFromMob = 0;
180     try {
181       try {
182         // If the mob file writer could not be created, directly write the cell to the store file.
183         mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
184             store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
185         fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
186       } catch (IOException e) {
187         LOG.error(
188             "Failed to create mob writer, "
189                 + "we will continue the compaction by writing MOB cells directly in store files",
190             e);
191       }
192       delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
193           store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
194       ScannerContext scannerContext =
195               ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
196       do {
197         hasMore = scanner.next(cells, scannerContext);
198         // output to writer:
199         for (Cell c : cells) {
200           if (major && CellUtil.isDelete(c)) {
201             if (MobUtils.isMobReferenceCell(c)) {
202               // Directly write it to a store file
203               writer.append(c);
204             } else {
205               // Add a ref tag to this cell and write it to a store file.
206               writer.append(MobUtils.createMobRefDeleteMarker(c));
207               // Write the cell to a del file
208               delFileWriter.append(c);
209               deleteMarkersCount++;
210             }
211           } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
212             // If the mob file writer is null or the kv type is not put, directly write the cell
213             // to the store file.
214             writer.append(c);
215           } else if (MobUtils.isMobReferenceCell(c)) {
216             if (MobUtils.hasValidMobRefCellValue(c)) {
217               int size = MobUtils.getMobValueLength(c);
218               if (size > mobSizeThreshold) {
219                 // If the value size is larger than the threshold, it's regarded as a mob. Since
220                 // its value is already in the mob file, directly write this cell to the store file
221                 writer.append(c);
222               } else {
223                 // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
224                 // the mob cell from the mob file, and write it back to the store file.
225                 Cell mobCell = mobStore.resolve(c, false);
226                 if (mobCell.getValueLength() != 0) {
227                   // put the mob data back to the store file
228                   CellUtil.setSequenceId(mobCell, c.getSequenceId());
229                   writer.append(mobCell);
230                   cellsCountCompactedFromMob++;
231                   cellsSizeCompactedFromMob += mobCell.getValueLength();
232                 } else {
233                   // If the value of a file is empty, there might be issues when retrieving,
234                   // directly write the cell to the store file, and leave it to be handled by the
235                   // next compaction.
236                   writer.append(c);
237                 }
238               }
239             } else {
240               LOG.warn("The value format of the KeyValue " + c
241                   + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
242               writer.append(c);
243             }
244           } else if (c.getValueLength() <= mobSizeThreshold) {
245             // If the value size of a cell is not larger than the threshold, directly write it to
246             // the store file.
247             writer.append(c);
248           } else {
249             // If the value size of a cell is larger than the threshold, it's regarded as a mob,
250             // write this cell to a mob file, and write the path to the store file.
251             mobCells++;
252             // append the original keyValue in the mob file.
253             mobFileWriter.append(c);
254             Cell reference = MobUtils.createMobRefCell(c, fileName, tableNameTag);
255             // write the cell whose value is the path of a mob file to the store file.
256             writer.append(reference);
257             cellsCountCompactedToMob++;
258             cellsSizeCompactedToMob += c.getValueLength();
259           }
260           ++progress.currentCompactedKVs;
261 
262           // check periodically to see if a system stop is requested
263           if (closeCheckInterval > 0) {
264             bytesWritten += KeyValueUtil.length(c);
265             if (bytesWritten > closeCheckInterval) {
266               bytesWritten = 0;
267               if (!store.areWritesEnabled()) {
268                 progress.cancel();
269                 return false;
270               }
271             }
272           }
273         }
274         cells.clear();
275       } while (hasMore);
276     } finally {
277       if (mobFileWriter != null) {
278         mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
279         mobFileWriter.close();
280       }
281       if (delFileWriter != null) {
282         delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
283         delFileWriter.close();
284       }
285     }
286     if (mobFileWriter != null) {
287       if (mobCells > 0) {
288         // If the mob file is not empty, commit it.
289         mobStore.commitFile(mobFileWriter.getPath(), path);
290       } else {
291         try {
292           // If the mob file is empty, delete it instead of committing.
293           store.getFileSystem().delete(mobFileWriter.getPath(), true);
294         } catch (IOException e) {
295           LOG.error("Failed to delete the temp mob file", e);
296         }
297       }
298     }
299     if (delFileWriter != null) {
300       if (deleteMarkersCount > 0) {
301         // If the del file is not empty, commit it.
302         // If the commit fails, the compaction is re-performed again.
303         mobStore.commitFile(delFileWriter.getPath(), path);
304       } else {
305         try {
306           // If the del file is empty, delete it instead of committing.
307           store.getFileSystem().delete(delFileWriter.getPath(), true);
308         } catch (IOException e) {
309           LOG.error("Failed to delete the temp del file", e);
310         }
311       }
312     }
313     mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
314     mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
315     mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
316     mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
317     progress.complete();
318     return true;
319   }
320 }