1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mob.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.mockito.Matchers.any;
22 import static org.mockito.Mockito.doAnswer;
23 import static org.mockito.Mockito.mock;
24 import static org.mockito.Mockito.when;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HBaseTestingUtility;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.testclassification.SmallTests;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34 import org.apache.hadoop.hbase.master.TableLockManager;
35 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
36 import org.apache.hadoop.hbase.mob.MobUtils;
37 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
40 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
41 import org.apache.hadoop.io.Text;
42 import org.apache.hadoop.mapreduce.Mapper;
43 import org.junit.AfterClass;
44 import org.junit.BeforeClass;
45 import org.junit.Test;
46 import org.junit.experimental.categories.Category;
47 import org.mockito.invocation.InvocationOnMock;
48 import org.mockito.stubbing.Answer;
49
50 @Category(SmallTests.class)
51 public class TestMobSweepMapper {
52
53 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
54
55 @BeforeClass
56 public static void setUpBeforeClass() throws Exception {
57 TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
58 TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
59 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
60 TEST_UTIL.startMiniCluster(1);
61 }
62
63 @AfterClass
64 public static void tearDownAfterClass() throws Exception {
65 TEST_UTIL.shutdownMiniCluster();
66 }
67
68 @Test
69 public void TestMap() throws Exception {
70 String prefix = "0000";
71 final String fileName = "19691231f2cd014ea28f42788214560a21a44cef";
72 final String mobFilePath = prefix + fileName;
73
74 ImmutableBytesWritable r = new ImmutableBytesWritable(Bytes.toBytes("r"));
75 final KeyValue[] kvList = new KeyValue[1];
76 kvList[0] = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"),
77 Bytes.toBytes("column"), Bytes.toBytes(mobFilePath));
78
79 Result columns = mock(Result.class);
80 when(columns.rawCells()).thenReturn(kvList);
81
82 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
83 ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
84 TableName tn = TableName.valueOf("testSweepMapper");
85 TableName lockName = MobUtils.getTableLockName(tn);
86 String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
87 configuration.set(SweepJob.SWEEP_JOB_ID, "1");
88 configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
89 ServerName serverName = SweepJob.getCurrentServerName(configuration);
90 configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
91
92 TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
93 serverName);
94 TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
95 lock.acquire();
96 try {
97 Mapper<ImmutableBytesWritable, Result, Text, KeyValue>.Context ctx =
98 mock(Mapper.Context.class);
99 when(ctx.getConfiguration()).thenReturn(configuration);
100 SweepMapper map = new SweepMapper();
101 doAnswer(new Answer<Void>() {
102
103 @Override
104 public Void answer(InvocationOnMock invocation) throws Throwable {
105 Text text = (Text) invocation.getArguments()[0];
106 KeyValue kv = (KeyValue) invocation.getArguments()[1];
107
108 assertEquals(Bytes.toString(text.getBytes(), 0, text.getLength()), fileName);
109 assertEquals(0, Bytes.compareTo(kv.getKey(), kvList[0].getKey()));
110
111 return null;
112 }
113 }).when(ctx).write(any(Text.class), any(KeyValue.class));
114
115 map.map(r, columns, ctx);
116 } finally {
117 lock.release();
118 }
119 }
120 }