1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob.mapreduce;
20
21 import java.io.IOException;
22
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.Cell;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.KeyValueUtil;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
29 import org.apache.hadoop.hbase.mapreduce.TableMapper;
30 import org.apache.hadoop.hbase.mob.MobUtils;
31 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
32 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
33 import org.apache.hadoop.io.Text;
34 import org.apache.zookeeper.KeeperException;
35
36
37
38
39
40
41 @InterfaceAudience.Private
42 public class SweepMapper extends TableMapper<Text, KeyValue> {
43
44 private ZooKeeperWatcher zkw = null;
45
46 @Override
47 protected void setup(Context context) throws IOException,
48 InterruptedException {
49 String id = context.getConfiguration().get(SweepJob.SWEEP_JOB_ID);
50 String owner = context.getConfiguration().get(SweepJob.SWEEP_JOB_SERVERNAME);
51 String sweeperNode = context.getConfiguration().get(SweepJob.SWEEP_JOB_TABLE_NODE);
52 zkw = new ZooKeeperWatcher(context.getConfiguration(), id,
53 new DummyMobAbortable());
54 try {
55 SweepJobNodeTracker tracker = new SweepJobNodeTracker(zkw, sweeperNode, owner);
56 tracker.start();
57 } catch (KeeperException e) {
58 throw new IOException(e);
59 }
60 }
61
62 @Override
63 protected void cleanup(Context context) throws IOException,
64 InterruptedException {
65 if (zkw != null) {
66 zkw.close();
67 }
68 }
69
70 @Override
71 public void map(ImmutableBytesWritable r, Result columns, Context context) throws IOException,
72 InterruptedException {
73 if (columns == null) {
74 return;
75 }
76 Cell[] cells = columns.rawCells();
77 if (cells == null || cells.length == 0) {
78 return;
79 }
80 for (Cell c : cells) {
81 if (MobUtils.hasValidMobRefCellValue(c)) {
82 String fileName = MobUtils.getMobFileName(c);
83 context.write(new Text(fileName), KeyValueUtil.ensureKeyValue(c));
84 }
85 }
86 }
87 }