View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.Tag;
35  import org.apache.hadoop.hbase.TagType;
36  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
37  import org.apache.hadoop.hbase.regionserver.*;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.util.StringUtils;
40  
41  /**
42   * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
43   * If the store is not a mob store, the flusher flushes the MemStore the same with
44   * DefaultStoreFlusher,
45   * If the store is a mob store, the flusher flushes the MemStore into two places.
46   * One is the store files of HBase, the other is the mob files.
47   * <ol>
48   * <li>Cells that are not PUT type or have the delete mark will be directly flushed to HBase.</li>
49   * <li>If the size of a cell value is larger than a threshold, it'll be flushed
50   * to a mob file, another cell with the path of this file will be flushed to HBase.</li>
51   * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
52   * HBase directly.</li>
53   * </ol>
54   *
55   */
56  @InterfaceAudience.Private
57  public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
58  
59    private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
60    private final Object flushLock = new Object();
61    private long mobCellValueSizeThreshold = 0;
62    private Path targetPath;
63    private HMobStore mobStore;
64  
65    public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
66      super(conf, store);
67      mobCellValueSizeThreshold = store.getFamily().getMobThreshold();
68      this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
69          store.getColumnFamilyName());
70      if (!this.store.getFileSystem().exists(targetPath)) {
71        this.store.getFileSystem().mkdirs(targetPath);
72      }
73      this.mobStore = (HMobStore) store;
74    }
75  
76    /**
77     * Flushes the snapshot of the MemStore.
78     * If this store is not a mob store, flush the cells in the snapshot to store files of HBase.
79     * If the store is a mob one, the flusher flushes the MemStore into two places.
80     * One is the store files of HBase, the other is the mob files.
81     * <ol>
82     * <li>Cells that are not PUT type or have the delete mark will be directly flushed to
83     * HBase.</li>
84     * <li>If the size of a cell value is larger than a threshold, it'll be
85     * flushed to a mob file, another cell with the path of this file will be flushed to HBase.</li>
86     * <li>If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to
87     * HBase directly.</li>
88     * </ol>
89     */
90    @Override
91    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
92        MonitoredTask status) throws IOException {
93      ArrayList<Path> result = new ArrayList<Path>();
94      int cellsCount = snapshot.getCellsCount();
95      if (cellsCount == 0) return result; // don't flush if there are no entries
96  
97      // Use a store scanner to find which rows to flush.
98      long smallestReadPoint = store.getSmallestReadPoint();
99      InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
100     if (scanner == null) {
101       return result; // NULL scanner returned from coprocessor hooks means skip normal processing
102     }
103     StoreFile.Writer writer;
104     try {
105       // TODO: We can fail in the below block before we complete adding this flush to
106       // list of store files. Add cleanup of anything put on filesystem if we fail.
107       synchronized (flushLock) {
108         status.setStatus("Flushing " + store + ": creating writer");
109         // Write the map out to the disk
110         writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
111             false, true, true);
112         writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
113         try {
114           // It's a mob store, flush the cells in a mob way. This is the difference of flushing
115           // between a normal and a mob store.
116           performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
117         } finally {
118           finalizeWriter(writer, cacheFlushId, status);
119         }
120       }
121     } finally {
122       scanner.close();
123     }
124     LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
125         + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
126         ", hasBloomFilter=" + writer.hasGeneralBloom() +
127         ", into tmp file " + writer.getPath());
128     result.add(writer.getPath());
129     return result;
130   }
131 
132   /**
133    * Flushes the cells in the mob store.
134    * <ol>In the mob store, the cells with PUT type might have or have no mob tags.
135    * <li>If a cell does not have a mob tag, flushing the cell to different files depends
136    * on the value length. If the length is larger than a threshold, it's flushed to a
137    * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly
138    * flush the cell to a store file in HBase.</li>
139    * <li>If a cell have a mob tag, its value is a mob file name, directly flush it
140    * to a store file in HBase.</li>
141    * </ol>
142    * @param snapshot Memstore snapshot.
143    * @param cacheFlushId Log cache flush sequence number.
144    * @param scanner The scanner of memstore snapshot.
145    * @param writer The store file writer.
146    * @param status Task that represents the flush operation and may be updated with status.
147    * @throws IOException
148    */
149   protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
150       InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
151     StoreFile.Writer mobFileWriter = null;
152     int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
153         HConstants.COMPACTION_KV_MAX_DEFAULT);
154     long mobCount = 0;
155     long mobSize = 0;
156     long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
157     mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
158         store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
159     // the target path is {tableName}/.mob/{cfName}/mobFiles
160     // the relative path is mobFiles
161     byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
162     try {
163       Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
164           .getName());
165       List<Cell> cells = new ArrayList<Cell>();
166       boolean hasMore;
167       ScannerContext scannerContext =
168               ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
169 
170       do {
171         hasMore = scanner.next(cells, scannerContext);
172         if (!cells.isEmpty()) {
173           for (Cell c : cells) {
174             // If we know that this KV is going to be included always, then let us
175             // set its memstoreTS to 0. This will help us save space when writing to
176             // disk.
177             if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
178                 || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
179               writer.append(c);
180             } else {
181               // append the original keyValue in the mob file.
182               mobFileWriter.append(c);
183               mobSize += c.getValueLength();
184               mobCount++;
185 
186               // append the tags to the KeyValue.
187               // The key is same, the value is the filename of the mob file
188               Cell reference = MobUtils.createMobRefCell(c, fileName, tableNameTag);
189               writer.append(reference);
190             }
191           }
192           cells.clear();
193         }
194       } while (hasMore);
195     } finally {
196       status.setStatus("Flushing mob file " + store + ": appending metadata");
197       mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
198       status.setStatus("Flushing mob file " + store + ": closing flushed file");
199       mobFileWriter.close();
200     }
201 
202     if (mobCount > 0) {
203       // commit the mob file from temp folder to target folder.
204       // If the mob file is committed successfully but the store file is not,
205       // the committed mob file will be handled by the sweep tool as an unused
206       // file.
207       mobStore.commitFile(mobFileWriter.getPath(), targetPath);
208       mobStore.updateMobFlushCount();
209       mobStore.updateMobFlushedCellsCount(mobCount);
210       mobStore.updateMobFlushedCellsSize(mobSize);
211     } else {
212       try {
213         // If the mob file is empty, delete it instead of committing.
214         store.getFileSystem().delete(mobFileWriter.getPath(), true);
215       } catch (IOException e) {
216         LOG.error("Failed to delete the temp mob file", e);
217       }
218     }
219   }
220 }