View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob.mapreduce;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.HColumnDescriptor;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.KeyValueUtil;
34  import org.apache.hadoop.hbase.Tag;
35  import org.apache.hadoop.hbase.TagType;
36  import org.apache.hadoop.hbase.client.BufferedMutator;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.io.crypto.Encryption;
39  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
40  import org.apache.hadoop.hbase.mob.MobConstants;
41  import org.apache.hadoop.hbase.mob.MobUtils;
42  import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
43  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
44  import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
45  import org.apache.hadoop.hbase.regionserver.MemStore;
46  import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
47  import org.apache.hadoop.hbase.regionserver.StoreFile;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.mapreduce.Reducer.Context;
50  
51  /**
52   * The wrapper of a DefaultMemStore.
53   * This wrapper is used in the sweep reducer to buffer and sort the cells written from
54   * the invalid and small mob files.
55   * It's flushed when it's full, the mob data are written to the mob files, and their file names
56   * are written back to store files of HBase.
57   * This memStore is used to sort the cells in mob files.
58   * In a reducer of sweep tool, the mob files are grouped by the same prefix (start key and date),
59   * in each group, the reducer iterates the files and read the cells to a new and bigger mob file.
60   * The cells in the same mob file are ordered, but cells across mob files are not.
61   * So we need this MemStoreWrapper to sort those cells come from different mob files before
62   * flushing them to the disk, when the memStore is big enough it's flushed as a new mob file.
63   */
64  @InterfaceAudience.Private
65  public class MemStoreWrapper {
66  
67    private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
68  
69    private MemStore memstore;
70    private long flushSize;
71    private CompactionPartitionId partitionId;
72    private Context context;
73    private Configuration conf;
74    private BufferedMutator table;
75    private HColumnDescriptor hcd;
76    private Path mobFamilyDir;
77    private FileSystem fs;
78    private CacheConfig cacheConfig;
79    private Encryption.Context cryptoContext = Encryption.Context.NONE;
80  
81    public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
82      HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
83      this.memstore = memstore;
84      this.context = context;
85      this.fs = fs;
86      this.table = table;
87      this.hcd = hcd;
88      this.conf = context.getConfiguration();
89      this.cacheConfig = cacheConfig;
90      flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
91          MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
92      mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
93      cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
94    }
95  
96    public void setPartitionId(CompactionPartitionId partitionId) {
97      this.partitionId = partitionId;
98    }
99  
100   /**
101    * Flushes the memstore if the size is large enough.
102    * @throws IOException
103    */
104   private void flushMemStoreIfNecessary() throws IOException {
105     if (memstore.heapSize() >= flushSize) {
106       flushMemStore();
107     }
108   }
109 
110   /**
111    * Flushes the memstore anyway.
112    * @throws IOException
113    */
114   public void flushMemStore() throws IOException {
115     MemStoreSnapshot snapshot = memstore.snapshot();
116     internalFlushCache(snapshot);
117     memstore.clearSnapshot(snapshot.getId());
118   }
119 
120   /**
121    * Flushes the snapshot of the memstore.
122    * Flushes the mob data to the mob files, and flushes the name of these mob files to HBase.
123    * @param snapshot The snapshot of the memstore.
124    * @throws IOException
125    */
126   private void internalFlushCache(final MemStoreSnapshot snapshot)
127       throws IOException {
128     if (snapshot.getCellsCount() == 0) {
129       return;
130     }
131     // generate the files into a temp directory.
132     String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
133     StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
134       new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
135       partitionId.getStartKey(), cacheConfig, cryptoContext);
136 
137     String relativePath = mobFileWriter.getPath().getName();
138     LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
139 
140     byte[] referenceValue = Bytes.toBytes(relativePath);
141     KeyValueScanner scanner = snapshot.getScanner();
142     Cell cell = null;
143     while (null != (cell = scanner.next())) {
144       mobFileWriter.append(cell);
145     }
146     scanner.close();
147     // Write out the log sequence number that corresponds to this output
148     // hfile. The hfile is current up to and including logCacheFlushId.
149     mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
150     mobFileWriter.close();
151 
152     MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
153     context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
154     // write reference/fileName back to the store files of HBase.
155     scanner = snapshot.getScanner();
156     scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
157     cell = null;
158     Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName()
159       .toString()));
160     long updatedCount = 0;
161     while (null != (cell = scanner.next())) {
162       Cell reference = MobUtils.createMobRefCell(cell, referenceValue, tableNameTag);
163       Put put =
164           new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
165       put.add(reference);
166       table.mutate(put);
167       updatedCount++;
168     }
169     table.flush();
170     context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
171     scanner.close();
172   }
173 
174   /**
175    * Adds a KeyValue into the memstore.
176    * @param kv The KeyValue to be added.
177    * @throws IOException
178    */
179   public void addToMemstore(KeyValue kv) throws IOException {
180     memstore.add(kv);
181     // flush the memstore if it's full.
182     flushMemStoreIfNecessary();
183   }
184 
185 }