View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob.mapreduce;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.UUID;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FSDataOutputStream;
35  import org.apache.hadoop.fs.FileStatus;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.fs.PathFilter;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.InvalidFamilyOperationException;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.KeyValueUtil;
45  import org.apache.hadoop.hbase.TableName;
46  import org.apache.hadoop.hbase.client.Admin;
47  import org.apache.hadoop.hbase.client.BufferedMutator;
48  import org.apache.hadoop.hbase.client.BufferedMutatorParams;
49  import org.apache.hadoop.hbase.client.Connection;
50  import org.apache.hadoop.hbase.client.ConnectionFactory;
51  import org.apache.hadoop.hbase.io.HFileLink;
52  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
53  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
54  import org.apache.hadoop.hbase.mob.MobConstants;
55  import org.apache.hadoop.hbase.mob.MobFile;
56  import org.apache.hadoop.hbase.mob.MobFileName;
57  import org.apache.hadoop.hbase.mob.MobUtils;
58  import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
59  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
60  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
61  import org.apache.hadoop.hbase.regionserver.BloomType;
62  import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
63  import org.apache.hadoop.hbase.regionserver.StoreFile;
64  import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.FSUtils;
67  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
68  import org.apache.hadoop.io.IOUtils;
69  import org.apache.hadoop.io.SequenceFile;
70  import org.apache.hadoop.io.SequenceFile.CompressionType;
71  import org.apache.hadoop.io.Text;
72  import org.apache.hadoop.io.Writable;
73  import org.apache.hadoop.mapreduce.Reducer;
74  import org.apache.zookeeper.KeeperException;
75  
76  /**
77   * The reducer of a sweep job.
78   * This reducer merges the small mob files into bigger ones, and write visited
79   * names of mob files to a sequence file which is used by the sweep job to delete
80   * the unused mob files.
81   * The key of the input is a file name, the value is a collection of KeyValues
82   * (the value format of KeyValue is valueLength + fileName) in HBase.
83   * In this reducer, we could know how many cells exist in HBase for a mob file.
84   * If the existCellSize/mobFileSize < compactionRatio, this mob
85   * file needs to be merged.
86   */
87  @InterfaceAudience.Private
88  public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
89  
90    private static final Log LOG = LogFactory.getLog(SweepReducer.class);
91  
92    private SequenceFile.Writer writer = null;
93    private MemStoreWrapper memstore;
94    private Configuration conf;
95    private FileSystem fs;
96  
97    private Path familyDir;
98    private CacheConfig cacheConfig;
99    private long compactionBegin;
100   private BufferedMutator table;
101   private HColumnDescriptor family;
102   private long mobCompactionDelay;
103   private Path mobTableDir;
104 
105   @Override
106   protected void setup(Context context) throws IOException, InterruptedException {
107     this.conf = context.getConfiguration();
108     Connection c = ConnectionFactory.createConnection(this.conf);
109     this.fs = FileSystem.get(conf);
110     // the MOB_SWEEP_JOB_DELAY is ONE_DAY by default. Its value is only changed when testing.
111     mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
112     String tableName = conf.get(TableInputFormat.INPUT_TABLE);
113     String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
114     TableName tn = TableName.valueOf(tableName);
115     this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
116     Admin admin = c.getAdmin();
117     try {
118       family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
119       if (family == null) {
120         // this column family might be removed, directly return.
121         throw new InvalidFamilyOperationException("Column family '" + familyName
122             + "' does not exist. It might be removed.");
123       }
124     } finally {
125       try {
126         admin.close();
127       } catch (IOException e) {
128         LOG.warn("Failed to close the HBaseAdmin", e);
129       }
130     }
131     // disable the block cache.
132     Configuration copyOfConf = new Configuration(conf);
133     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
134     this.cacheConfig = new CacheConfig(copyOfConf);
135 
136     table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024));
137     memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
138 
139     // The start time of the sweep tool.
140     // Only the mob files whose creation time is older than startTime-oneDay will be handled by the
141     // reducer since it brings inconsistency to handle the latest mob files.
142     this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
143     mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
144   }
145 
146   private SweepPartition createPartition(CompactionPartitionId id, Context context)
147     throws IOException {
148     return new SweepPartition(id, context);
149   }
150 
151   @Override
152   public void run(Context context) throws IOException, InterruptedException {
153     String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
154     String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
155     String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
156     ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
157         new DummyMobAbortable());
158     FSDataOutputStream fout = null;
159     try {
160       SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
161       tracker.start();
162       setup(context);
163       // create a sequence contains all the visited file names in this reducer.
164       String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
165       Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
166           .replace("-", MobConstants.EMPTY_STRING));
167       fout = fs.create(nameFilePath, true);
168       writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
169           String.class, CompressionType.NONE, null);
170       CompactionPartitionId id;
171       SweepPartition partition = null;
172       // the mob files which have the same start key and date are in the same partition.
173       while (context.nextKey()) {
174         Text key = context.getCurrentKey();
175         String keyString = key.toString();
176         id = createPartitionId(keyString);
177         if (null == partition || !id.equals(partition.getId())) {
178           // It's the first mob file in the current partition.
179           if (null != partition) {
180             // this mob file is in different partitions with the previous mob file.
181             // directly close.
182             partition.close();
183           }
184           // create a new one
185           partition = createPartition(id, context);
186         }
187         if (partition != null) {
188           // run the partition
189           partition.execute(key, context.getValues());
190         }
191       }
192       if (null != partition) {
193         partition.close();
194       }
195       writer.hflush();
196     } catch (KeeperException e) {
197       throw new IOException(e);
198     } finally {
199       cleanup(context);
200       zkw.close();
201       if (writer != null) {
202         IOUtils.closeStream(writer);
203       }
204       if (fout != null) {
205         IOUtils.closeStream(fout);
206       }
207       if (table != null) {
208         try {
209           table.close();
210         } catch (IOException e) {
211           LOG.warn(e);
212         }
213       }
214     }
215 
216   }
217 
218   /**
219    * The mob files which have the same start key and date are in the same partition.
220    * The files in the same partition are merged together into bigger ones.
221    */
222   public class SweepPartition {
223 
224     private final CompactionPartitionId id;
225     private final Context context;
226     private boolean memstoreUpdated = false;
227     private boolean mergeSmall = false;
228     private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
229     private final List<Path> toBeDeleted = new ArrayList<Path>();
230 
231     public SweepPartition(CompactionPartitionId id, Context context) throws IOException {
232       this.id = id;
233       this.context = context;
234       memstore.setPartitionId(id);
235       init();
236     }
237 
238     public CompactionPartitionId getId() {
239       return this.id;
240     }
241 
242     /**
243      * Prepares the map of files.
244      *
245      * @throws IOException
246      */
247     private void init() throws IOException {
248       FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
249       if (null == fileStats) {
250         return;
251       }
252 
253       int smallFileCount = 0;
254       float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
255           MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
256       long compactionMergeableSize = conf.getLong(
257           MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
258           MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
259       // list the files. Just merge the hfiles, don't merge the hfile links.
260       // prepare the map of mob files. The key is the file name, the value is the file status.
261       for (FileStatus fileStat : fileStats) {
262         MobFileStatus mobFileStatus = null;
263         if (!HFileLink.isHFileLink(fileStat.getPath())) {
264           mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
265           if (mobFileStatus.needMerge()) {
266             smallFileCount++;
267           }
268           // key is file name (not hfile name), value is hfile status.
269           fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
270         }
271       }
272       if (smallFileCount >= 2) {
273         // merge the files only when there're more than 1 files in the same partition.
274         this.mergeSmall = true;
275       }
276     }
277 
278     /**
279      * Flushes the data into mob files and store files, and archives the small
280      * files after they're merged.
281      * @throws IOException
282      */
283     public void close() throws IOException {
284       if (null == id) {
285         return;
286       }
287       // flush remain key values into mob files
288       if (memstoreUpdated) {
289         memstore.flushMemStore();
290       }
291       List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
292       // delete samll files after compaction
293       for (Path path : toBeDeleted) {
294         LOG.info("[In Partition close] Delete the file " + path + " in partition close");
295         storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
296       }
297       if (!storeFiles.isEmpty()) {
298         try {
299           MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
300               storeFiles);
301           context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
302         } catch (IOException e) {
303           LOG.error("Failed to archive the store files " + storeFiles, e);
304         }
305         storeFiles.clear();
306       }
307       fileStatusMap.clear();
308     }
309 
310     /**
311      * Merges the small mob files into bigger ones.
312      * @param fileName The current mob file name.
313      * @param values The collection of KeyValues in this mob file.
314      * @throws IOException
315      */
316     public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
317       if (null == values) {
318         return;
319       }
320       MobFileName mobFileName = MobFileName.create(fileName.toString());
321       LOG.info("[In reducer] The file name: " + fileName.toString());
322       MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
323       if (null == mobFileStat) {
324         LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
325         return;
326       }
327       // only handle the files that are older then one day.
328       if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
329           <= mobCompactionDelay) {
330         return;
331       }
332       // write the hfile name
333       writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
334       Set<KeyValue> kvs = new HashSet<KeyValue>();
335       for (KeyValue kv : values) {
336         if (kv.getValueLength() > Bytes.SIZEOF_INT) {
337           mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
338               Bytes.SIZEOF_INT));
339         }
340         kvs.add(kv.createKeyOnly(false));
341       }
342       // If the mob file is a invalid one or a small one, merge it into new/bigger ones.
343       if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
344         context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
345         MobFile file = MobFile.create(fs,
346             new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
347         StoreFileScanner scanner = null;
348         file.open();
349         try {
350           scanner = file.getScanner();
351           scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
352           Cell cell;
353           while (null != (cell = scanner.next())) {
354             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
355             KeyValue keyOnly = kv.createKeyOnly(false);
356             if (kvs.contains(keyOnly)) {
357               // write the KeyValue existing in HBase to the memstore.
358               memstore.addToMemstore(kv);
359               memstoreUpdated = true;
360             }
361           }
362         } finally {
363           if (scanner != null) {
364             scanner.close();
365           }
366           file.close();
367         }
368         toBeDeleted.add(mobFileStat.getFileStatus().getPath());
369       }
370     }
371 
372     /**
373      * Lists the files with the same prefix.
374      * @param p The file path.
375      * @param prefix The prefix.
376      * @return The files with the same prefix.
377      * @throws IOException
378      */
379     private FileStatus[] listStatus(Path p, String prefix) throws IOException {
380       return fs.listStatus(p, new PathPrefixFilter(prefix));
381     }
382   }
383 
384   static class PathPrefixFilter implements PathFilter {
385 
386     private final String prefix;
387 
388     public PathPrefixFilter(String prefix) {
389       this.prefix = prefix;
390     }
391 
392     public boolean accept(Path path) {
393       return path.getName().startsWith(prefix, 0);
394     }
395 
396   }
397 
398   /**
399    * Creates the partition id.
400    * @param fileNameAsString The current file name, in string.
401    * @return The partition id.
402    */
403   private CompactionPartitionId createPartitionId(String fileNameAsString) {
404     MobFileName fileName = MobFileName.create(fileNameAsString);
405     return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
406   }
407 
408   /**
409    * The mob file status used in the sweep reduecer.
410    */
411   private static class MobFileStatus {
412     private FileStatus fileStatus;
413     private int validSize;
414     private long size;
415 
416     private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
417     private long compactionMergeableSize =
418         MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
419 
420     /**
421      * @param fileStatus The current FileStatus.
422      * @param compactionRatio compactionRatio the invalid ratio.
423      * If there're too many cells deleted in a mob file, it's regarded as invalid,
424      * and needs to be written to a new one.
425      * If existingCellSize/fileSize < compactionRatio, it's regarded as a invalid one.
426      * @param compactionMergeableSize compactionMergeableSize If the size of a mob file is less
427      * than this value, it's regarded as a small file and needs to be merged
428      */
429     public MobFileStatus(FileStatus fileStatus, float compactionRatio,
430         long compactionMergeableSize) {
431       this.fileStatus = fileStatus;
432       this.size = fileStatus.getLen();
433       validSize = 0;
434       this.compactionRatio = compactionRatio;
435       this.compactionMergeableSize = compactionMergeableSize;
436     }
437 
438     /**
439      * Add size to this file.
440      * @param size The size to be added.
441      */
442     public void addValidSize(int size) {
443       this.validSize += size;
444     }
445 
446     /**
447      * Whether the mob files need to be cleaned.
448      * If there're too many cells deleted in this mob file, it needs to be cleaned.
449      * @return True if it needs to be cleaned.
450      */
451     public boolean needClean() {
452       return validSize < compactionRatio * size;
453     }
454 
455     /**
456      * Whether the mob files need to be merged.
457      * If this mob file is too small, it needs to be merged.
458      * @return True if it needs to be merged.
459      */
460     public boolean needMerge() {
461       return this.size < compactionMergeableSize;
462     }
463 
464     /**
465      * Gets the file status.
466      * @return The file status.
467      */
468     public FileStatus getFileStatus() {
469       return fileStatus;
470     }
471   }
472 }