1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mob.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.when;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.TreeSet;
28
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.CommonConfigurationKeys;
31 import org.apache.hadoop.fs.FileStatus;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HColumnDescriptor;
36 import org.apache.hadoop.hbase.HTableDescriptor;
37 import org.apache.hadoop.hbase.KeyValue;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.client.*;
41 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
42 import org.apache.hadoop.hbase.master.TableLockManager;
43 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
44 import org.apache.hadoop.hbase.mob.MobConstants;
45 import org.apache.hadoop.hbase.mob.MobUtils;
46 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
47 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
48 import org.apache.hadoop.hbase.testclassification.MediumTests;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
51 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
52 import org.apache.hadoop.io.SequenceFile;
53 import org.apache.hadoop.io.Text;
54 import org.apache.hadoop.io.Writable;
55 import org.apache.hadoop.io.serializer.JavaSerialization;
56 import org.apache.hadoop.mapreduce.Counter;
57 import org.apache.hadoop.mapreduce.Reducer;
58 import org.apache.hadoop.mapreduce.counters.GenericCounter;
59 import org.junit.After;
60 import org.junit.AfterClass;
61 import org.junit.Before;
62 import org.junit.BeforeClass;
63 import org.junit.Test;
64 import org.junit.experimental.categories.Category;
65 import org.mockito.Matchers;
66
67 @Category(MediumTests.class)
68 public class TestMobSweepReducer {
69
70 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
71 private final static String tableName = "testSweepReducer";
72 private final static String row = "row";
73 private final static String family = "family";
74 private final static String qf = "qf";
75 private static BufferedMutator table;
76 private static Admin admin;
77
78 @BeforeClass
79 public static void setUpBeforeClass() throws Exception {
80 TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
81 TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
82
83 TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
84
85 TEST_UTIL.startMiniCluster(1);
86 }
87
88 @AfterClass
89 public static void tearDownAfterClass() throws Exception {
90 TEST_UTIL.shutdownMiniCluster();
91 }
92
93 @SuppressWarnings("deprecation")
94 @Before
95 public void setUp() throws Exception {
96 HTableDescriptor desc = new HTableDescriptor(tableName);
97 HColumnDescriptor hcd = new HColumnDescriptor(family);
98 hcd.setMobEnabled(true);
99 hcd.setMobThreshold(3L);
100 hcd.setMaxVersions(4);
101 desc.addFamily(hcd);
102
103 admin = TEST_UTIL.getHBaseAdmin();
104 admin.createTable(desc);
105 table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
106 .getBufferedMutator(TableName.valueOf(tableName));
107 }
108
109 @After
110 public void tearDown() throws Exception {
111 admin.disableTable(TableName.valueOf(tableName));
112 admin.deleteTable(TableName.valueOf(tableName));
113 admin.close();
114 }
115
116 private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
117 Configuration conf) throws Exception {
118 List<String> list = new ArrayList<String>();
119 SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
120
121 String next = (String) reader.next((String) null);
122 while (next != null) {
123 list.add(next);
124 next = (String) reader.next((String) null);
125 }
126 reader.close();
127 return list;
128 }
129
130 @Test
131 public void testRun() throws Exception {
132
133 TableName tn = TableName.valueOf(tableName);
134 byte[] mobValueBytes = new byte[100];
135
136
137 Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
138
139 Put put = new Put(Bytes.toBytes(row));
140 put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
141 Put put2 = new Put(Bytes.toBytes(row + "ignore"));
142 put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
143 table.mutate(put);
144 table.mutate(put2);
145 table.flush();
146 admin.flush(tn);
147
148 FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
149
150 assertEquals(1, fileStatuses.length);
151
152 String mobFile1 = fileStatuses[0].getPath().getName();
153
154 Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
155 configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
156 configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
157 configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
158 configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
159 configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
160 configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
161 JavaSerialization.class.getName());
162 configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
163 configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
164 System.currentTimeMillis() + 24 * 3600 * 1000);
165
166 ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
167 TableName lockName = MobUtils.getTableLockName(tn);
168 String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
169 configuration.set(SweepJob.SWEEP_JOB_ID, "1");
170 configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
171 ServerName serverName = SweepJob.getCurrentServerName(configuration);
172 configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
173
174 TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
175 serverName);
176 TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
177 lock.acquire();
178 try {
179
180 Counter counter = new GenericCounter();
181 Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
182 when(ctx.getConfiguration()).thenReturn(configuration);
183 when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
184 when(ctx.nextKey()).thenReturn(true).thenReturn(false);
185 when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
186
187 byte[] refBytes = Bytes.toBytes(mobFile1);
188 long valueLength = refBytes.length;
189 byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
190 KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
191 KeyValue.Type.Put, newValue);
192 List<KeyValue> list = new ArrayList<KeyValue>();
193 list.add(kv2);
194
195 when(ctx.getValues()).thenReturn(list);
196
197 SweepReducer reducer = new SweepReducer();
198 reducer.run(ctx);
199 } finally {
200 lock.release();
201 }
202 FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
203 String mobFile2 = filsStatuses2[0].getPath().getName();
204
205 assertEquals(1, filsStatuses2.length);
206 assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
207
208
209 String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
210 FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
211 Set<String> files = new TreeSet<String>();
212 for (FileStatus st : statuses) {
213 files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
214 st.getPath(), configuration));
215 }
216 assertEquals(1, files.size());
217 assertEquals(true, files.contains(mobFile1));
218 }
219 }