/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

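/**
 * Tests for {@link TableSnapshotInputFormat}: best-location calculation for splits,
 * snapshot job configuration, and reading snapshot data back through both a mocked
 * MapReduce flow and a full job.
 */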
@Category(LargeTests.class)
public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {

  private static final byte[] bbb = Bytes.toBytes("bbb");
  private static final byte[] yyy = Bytes.toBytes("yyy");

  @Override
  protected byte[] getStartRow() {
    return bbb;
  }

  @Override
  protected byte[] getEndRow() {
    return yyy;
  }

  @After
  public void tearDown() throws Exception {
  }

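  // getBestLocations should return hosts ordered by descending block weight, keeping
  // only hosts whose weight is within a cutoff fraction (by default roughly 80%) of
  // the heaviest host's weight; the exact cutoff is an implementation detail of
  // TableSnapshotInputFormatImpl.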
  @Test
  public void testGetBestLocations() throws IOException {
    TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
    Configuration conf = UTIL.getConfiguration();

    HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
    Assert.assertEquals(Lists.newArrayList(), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution = new HDFSBlocksDistribution();
    blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
    Assert.assertEquals(Lists.newArrayList("h1"), tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
    Assert.assertEquals(Lists.newArrayList("h1", "h2"),
      tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
    Assert.assertEquals(Lists.newArrayList("h2", "h1"),
      tsif.getBestLocations(conf, blockDistribution));

    blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
    blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
    Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"),
      tsif.getBestLocations(conf, blockDistribution));
  }

  public static enum TestTableSnapshotCounters {
    VALIDATION_ERROR
  }

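  // Mapper that verifies each row it receives against the expected test data before
  // emitting the row key.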
  public static class TestTableSnapshotMapper
      extends TableMapper<ImmutableBytesWritable, NullWritable> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {
      verifyRowFromMap(key, value);
      context.write(key, NullWritable.get());
    }
  }

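  // Reducer that records every row key it sees and, in cleanup(), validates that the
  // whole expected key range from bbb up to yyy was covered.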
  public static class TestTableSnapshotReducer
      extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(bbb, yyy);

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {
      rowTracker.addRow(key.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      rowTracker.validate();
    }
  }

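  // initTableSnapshotMapperJob should leave the job configured for the default
  // LruBlockCache and must not enable BucketCache.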
  @Test
  public void testInitTableSnapshotMapperJobConfig() throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testInitTableSnapshotMapperJobConfig");
    String snapshotName = "foo";

    try {
      createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
      Job job = new Job(UTIL.getConfiguration());
      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

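      // Ideally we would examine the cache instance created from this configuration,
      // but it is not observable from the job, so we only assert that the expected
      // config values were set.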
      Assert.assertEquals(
        "Snapshot job should be configured for default LruBlockCache.",
        HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
        job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
      Assert.assertEquals(
        "Snapshot job should not use BucketCache.",
        0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
    } finally {
      UTIL.getHBaseAdmin().deleteSnapshot(snapshotName);
      UTIL.deleteTable(tableName);
      tearDownCluster();
    }
  }

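  // Hook for the base class test: sets up a snapshot-backed job so the base test can
  // verify that restoring the snapshot does not create back-reference links.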
  @Override
  public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
      String snapshotName, Path tmpTableDir) throws Exception {
    Job job = new Job(UTIL.getConfiguration());
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
      new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
      NullWritable.class, job, false, tmpTableDir);
  }

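  // Exercises the input format without running a real job: splits are generated from
  // the snapshot, and each split's RecordReader is driven by hand against a mocked
  // TaskAttemptContext.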
  @Override
  public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
      int numRegions, int expectedNumSplits) throws Exception {
    setupCluster();
    TableName tableName = TableName.valueOf("testWithMockedMapReduce");
    try {
      createTableAndSnapshot(
        util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);

      Job job = new Job(util.getConfiguration());
      Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
      Scan scan = new Scan(getStartRow(), getEndRow());

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);

      verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
    } finally {
      util.getHBaseAdmin().deleteSnapshot(snapshotName);
      util.deleteTable(tableName);
      tearDownCluster();
    }
  }

  private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
      byte[] startRow, byte[] stopRow) throws IOException, InterruptedException {
    TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
    List<InputSplit> splits = tsif.getSplits(job);

    Assert.assertEquals(expectedNumSplits, splits.size());

    HBaseTestingUtility.SeenRowTracker rowTracker =
        new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);

    for (int i = 0; i < splits.size(); i++) {
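      // validate input split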
      InputSplit split = splits.get(i);
      Assert.assertTrue(split instanceof TableSnapshotRegionSplit);

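      // create the record reader with a mocked TaskAttemptContext that supplies the
      // job configuration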
      TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
      when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
      RecordReader<ImmutableBytesWritable, Result> rr =
          tsif.createRecordReader(split, taskAttemptContext);
      rr.initialize(split, taskAttemptContext);

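      // read everything back through the record reader and verify each row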
      while (rr.nextKeyValue()) {
        byte[] row = rr.getCurrentKey().get();
        verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
        rowTracker.addRow(row);
      }

      rr.close();
    }

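    // validate that all rows in the range were seen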
    rowTracker.validate();
  }

  @Override
  protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
      String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
      boolean shutdownCluster) throws Exception {
    doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
      numRegions, expectedNumSplits, shutdownCluster);
  }

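  // Runs an actual MapReduce job over the restored snapshot; when shutdownCluster is
  // true, the HBase cluster is stopped before the job runs, so the job must read the
  // snapshot files directly from the filesystem.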
  public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
      String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
      int expectedNumSplits, boolean shutdownCluster) throws Exception {

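    // create the table and snapshot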
    createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);

    if (shutdownCluster) {
      util.shutdownMiniHBaseCluster();
    }

    try {
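      // create the job and configure it against the restored snapshot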
      Job job = new Job(util.getConfiguration());
      Scan scan = new Scan(startRow, endRow);

      job.setJarByClass(util.getClass());
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        TestTableSnapshotInputFormat.class);

      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, true, tableDir);

      job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
      job.setNumReduceTasks(1);
      job.setOutputFormatClass(NullOutputFormat.class);

      Assert.assertTrue(job.waitForCompletion(true));
    } finally {
      if (!shutdownCluster) {
        util.getHBaseAdmin().deleteSnapshot(snapshotName);
        util.deleteTable(tableName);
      }
    }
  }
}