1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob.mapreduce;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.UUID;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FSDataOutputStream;
35 import org.apache.hadoop.fs.FileStatus;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.fs.PathFilter;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
43 import org.apache.hadoop.hbase.KeyValue;
44 import org.apache.hadoop.hbase.KeyValueUtil;
45 import org.apache.hadoop.hbase.TableName;
46 import org.apache.hadoop.hbase.client.Admin;
47 import org.apache.hadoop.hbase.client.BufferedMutator;
48 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
49 import org.apache.hadoop.hbase.client.Connection;
50 import org.apache.hadoop.hbase.client.ConnectionFactory;
51 import org.apache.hadoop.hbase.io.HFileLink;
52 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
53 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
54 import org.apache.hadoop.hbase.mob.MobConstants;
55 import org.apache.hadoop.hbase.mob.MobFile;
56 import org.apache.hadoop.hbase.mob.MobFileName;
57 import org.apache.hadoop.hbase.mob.MobUtils;
58 import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
59 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
60 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
61 import org.apache.hadoop.hbase.regionserver.BloomType;
62 import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
63 import org.apache.hadoop.hbase.regionserver.StoreFile;
64 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
65 import org.apache.hadoop.hbase.util.Bytes;
66 import org.apache.hadoop.hbase.util.FSUtils;
67 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
68 import org.apache.hadoop.io.IOUtils;
69 import org.apache.hadoop.io.SequenceFile;
70 import org.apache.hadoop.io.SequenceFile.CompressionType;
71 import org.apache.hadoop.io.Text;
72 import org.apache.hadoop.io.Writable;
73 import org.apache.hadoop.mapreduce.Reducer;
74 import org.apache.zookeeper.KeeperException;
75
76
77
78
79
80
81
82
83
84
85
86
87 @InterfaceAudience.Private
88 public class SweepReducer extends Reducer<Text, KeyValue, Writable, Writable> {
89
90 private static final Log LOG = LogFactory.getLog(SweepReducer.class);
91
92 private SequenceFile.Writer writer = null;
93 private MemStoreWrapper memstore;
94 private Configuration conf;
95 private FileSystem fs;
96
97 private Path familyDir;
98 private CacheConfig cacheConfig;
99 private long compactionBegin;
100 private BufferedMutator table;
101 private HColumnDescriptor family;
102 private long mobCompactionDelay;
103 private Path mobTableDir;
104
105 @Override
106 protected void setup(Context context) throws IOException, InterruptedException {
107 this.conf = context.getConfiguration();
108 Connection c = ConnectionFactory.createConnection(this.conf);
109 this.fs = FileSystem.get(conf);
110
111 mobCompactionDelay = conf.getLong(SweepJob.MOB_SWEEP_JOB_DELAY, SweepJob.ONE_DAY);
112 String tableName = conf.get(TableInputFormat.INPUT_TABLE);
113 String familyName = conf.get(TableInputFormat.SCAN_COLUMN_FAMILY);
114 TableName tn = TableName.valueOf(tableName);
115 this.familyDir = MobUtils.getMobFamilyPath(conf, tn, familyName);
116 Admin admin = c.getAdmin();
117 try {
118 family = admin.getTableDescriptor(tn).getFamily(Bytes.toBytes(familyName));
119 if (family == null) {
120
121 throw new InvalidFamilyOperationException("Column family '" + familyName
122 + "' does not exist. It might be removed.");
123 }
124 } finally {
125 try {
126 admin.close();
127 } catch (IOException e) {
128 LOG.warn("Failed to close the HBaseAdmin", e);
129 }
130 }
131
132 Configuration copyOfConf = new Configuration(conf);
133 copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
134 this.cacheConfig = new CacheConfig(copyOfConf);
135
136 table = c.getBufferedMutator(new BufferedMutatorParams(tn).writeBufferSize(1*1024*1024));
137 memstore = new MemStoreWrapper(context, fs, table, family, new DefaultMemStore(), cacheConfig);
138
139
140
141
142 this.compactionBegin = conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE, 0);
143 mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), tn);
144 }
145
146 private SweepPartition createPartition(CompactionPartitionId id, Context context)
147 throws IOException {
148 return new SweepPartition(id, context);
149 }
150
151 @Override
152 public void run(Context context) throws IOException, InterruptedException {
153 String jobId = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
154 String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
155 String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
156 ZooKeeperWatcher zkw = new ZooKeeperWatcher(context.getConfiguration(), jobId,
157 new DummyMobAbortable());
158 FSDataOutputStream fout = null;
159 try {
160 SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
161 tracker.start();
162 setup(context);
163
164 String dir = this.conf.get(SweepJob.WORKING_VISITED_DIR_KEY);
165 Path nameFilePath = new Path(dir, UUID.randomUUID().toString()
166 .replace("-", MobConstants.EMPTY_STRING));
167 fout = fs.create(nameFilePath, true);
168 writer = SequenceFile.createWriter(context.getConfiguration(), fout, String.class,
169 String.class, CompressionType.NONE, null);
170 CompactionPartitionId id;
171 SweepPartition partition = null;
172
173 while (context.nextKey()) {
174 Text key = context.getCurrentKey();
175 String keyString = key.toString();
176 id = createPartitionId(keyString);
177 if (null == partition || !id.equals(partition.getId())) {
178
179 if (null != partition) {
180
181
182 partition.close();
183 }
184
185 partition = createPartition(id, context);
186 }
187 if (partition != null) {
188
189 partition.execute(key, context.getValues());
190 }
191 }
192 if (null != partition) {
193 partition.close();
194 }
195 writer.hflush();
196 } catch (KeeperException e) {
197 throw new IOException(e);
198 } finally {
199 cleanup(context);
200 zkw.close();
201 if (writer != null) {
202 IOUtils.closeStream(writer);
203 }
204 if (fout != null) {
205 IOUtils.closeStream(fout);
206 }
207 if (table != null) {
208 try {
209 table.close();
210 } catch (IOException e) {
211 LOG.warn(e);
212 }
213 }
214 }
215
216 }
217
218
219
220
221
222 public class SweepPartition {
223
224 private final CompactionPartitionId id;
225 private final Context context;
226 private boolean memstoreUpdated = false;
227 private boolean mergeSmall = false;
228 private final Map<String, MobFileStatus> fileStatusMap = new HashMap<String, MobFileStatus>();
229 private final List<Path> toBeDeleted = new ArrayList<Path>();
230
231 public SweepPartition(CompactionPartitionId id, Context context) throws IOException {
232 this.id = id;
233 this.context = context;
234 memstore.setPartitionId(id);
235 init();
236 }
237
238 public CompactionPartitionId getId() {
239 return this.id;
240 }
241
242
243
244
245
246
247 private void init() throws IOException {
248 FileStatus[] fileStats = listStatus(familyDir, id.getStartKey());
249 if (null == fileStats) {
250 return;
251 }
252
253 int smallFileCount = 0;
254 float compactionRatio = conf.getFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO,
255 MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO);
256 long compactionMergeableSize = conf.getLong(
257 MobConstants.MOB_SWEEP_TOOL_COMPACTION_MERGEABLE_SIZE,
258 MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE);
259
260
261 for (FileStatus fileStat : fileStats) {
262 MobFileStatus mobFileStatus = null;
263 if (!HFileLink.isHFileLink(fileStat.getPath())) {
264 mobFileStatus = new MobFileStatus(fileStat, compactionRatio, compactionMergeableSize);
265 if (mobFileStatus.needMerge()) {
266 smallFileCount++;
267 }
268
269 fileStatusMap.put(fileStat.getPath().getName(), mobFileStatus);
270 }
271 }
272 if (smallFileCount >= 2) {
273
274 this.mergeSmall = true;
275 }
276 }
277
278
279
280
281
282
283 public void close() throws IOException {
284 if (null == id) {
285 return;
286 }
287
288 if (memstoreUpdated) {
289 memstore.flushMemStore();
290 }
291 List<StoreFile> storeFiles = new ArrayList<StoreFile>(toBeDeleted.size());
292
293 for (Path path : toBeDeleted) {
294 LOG.info("[In Partition close] Delete the file " + path + " in partition close");
295 storeFiles.add(new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE));
296 }
297 if (!storeFiles.isEmpty()) {
298 try {
299 MobUtils.removeMobFiles(conf, fs, table.getName(), mobTableDir, family.getName(),
300 storeFiles);
301 context.getCounter(SweepCounter.FILE_TO_BE_MERGE_OR_CLEAN).increment(storeFiles.size());
302 } catch (IOException e) {
303 LOG.error("Failed to archive the store files " + storeFiles, e);
304 }
305 storeFiles.clear();
306 }
307 fileStatusMap.clear();
308 }
309
310
311
312
313
314
315
316 public void execute(Text fileName, Iterable<KeyValue> values) throws IOException {
317 if (null == values) {
318 return;
319 }
320 MobFileName mobFileName = MobFileName.create(fileName.toString());
321 LOG.info("[In reducer] The file name: " + fileName.toString());
322 MobFileStatus mobFileStat = fileStatusMap.get(mobFileName.getFileName());
323 if (null == mobFileStat) {
324 LOG.info("[In reducer] Cannot find the file, probably this record is obsolete");
325 return;
326 }
327
328 if (compactionBegin - mobFileStat.getFileStatus().getModificationTime()
329 <= mobCompactionDelay) {
330 return;
331 }
332
333 writer.append(mobFileName.getFileName(), MobConstants.EMPTY_STRING);
334 Set<KeyValue> kvs = new HashSet<KeyValue>();
335 for (KeyValue kv : values) {
336 if (kv.getValueLength() > Bytes.SIZEOF_INT) {
337 mobFileStat.addValidSize(Bytes.toInt(kv.getValueArray(), kv.getValueOffset(),
338 Bytes.SIZEOF_INT));
339 }
340 kvs.add(kv.createKeyOnly(false));
341 }
342
343 if (mobFileStat.needClean() || (mergeSmall && mobFileStat.needMerge())) {
344 context.getCounter(SweepCounter.INPUT_FILE_COUNT).increment(1);
345 MobFile file = MobFile.create(fs,
346 new Path(familyDir, mobFileName.getFileName()), conf, cacheConfig);
347 StoreFileScanner scanner = null;
348 file.open();
349 try {
350 scanner = file.getScanner();
351 scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_BYTE_ARRAY));
352 Cell cell;
353 while (null != (cell = scanner.next())) {
354 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
355 KeyValue keyOnly = kv.createKeyOnly(false);
356 if (kvs.contains(keyOnly)) {
357
358 memstore.addToMemstore(kv);
359 memstoreUpdated = true;
360 }
361 }
362 } finally {
363 if (scanner != null) {
364 scanner.close();
365 }
366 file.close();
367 }
368 toBeDeleted.add(mobFileStat.getFileStatus().getPath());
369 }
370 }
371
372
373
374
375
376
377
378
379 private FileStatus[] listStatus(Path p, String prefix) throws IOException {
380 return fs.listStatus(p, new PathPrefixFilter(prefix));
381 }
382 }
383
384 static class PathPrefixFilter implements PathFilter {
385
386 private final String prefix;
387
388 public PathPrefixFilter(String prefix) {
389 this.prefix = prefix;
390 }
391
392 public boolean accept(Path path) {
393 return path.getName().startsWith(prefix, 0);
394 }
395
396 }
397
398
399
400
401
402
403 private CompactionPartitionId createPartitionId(String fileNameAsString) {
404 MobFileName fileName = MobFileName.create(fileNameAsString);
405 return new CompactionPartitionId(fileName.getStartKey(), fileName.getDate());
406 }
407
408
409
410
411 private static class MobFileStatus {
412 private FileStatus fileStatus;
413 private int validSize;
414 private long size;
415
416 private float compactionRatio = MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_RATIO;
417 private long compactionMergeableSize =
418 MobConstants.DEFAULT_SWEEP_TOOL_MOB_COMPACTION_MERGEABLE_SIZE;
419
420
421
422
423
424
425
426
427
428
429 public MobFileStatus(FileStatus fileStatus, float compactionRatio,
430 long compactionMergeableSize) {
431 this.fileStatus = fileStatus;
432 this.size = fileStatus.getLen();
433 validSize = 0;
434 this.compactionRatio = compactionRatio;
435 this.compactionMergeableSize = compactionMergeableSize;
436 }
437
438
439
440
441
442 public void addValidSize(int size) {
443 this.validSize += size;
444 }
445
446
447
448
449
450
451 public boolean needClean() {
452 return validSize < compactionRatio * size;
453 }
454
455
456
457
458
459
460 public boolean needMerge() {
461 return this.size < compactionMergeableSize;
462 }
463
464
465
466
467
468 public FileStatus getFileStatus() {
469 return fileStatus;
470 }
471 }
472 }