1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HColumnDescriptor;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.HTableDescriptor;
39 import org.apache.hadoop.hbase.KeyValue;
40 import org.apache.hadoop.hbase.testclassification.MediumTests;
41 import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
42 import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
43 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44 import org.apache.hadoop.hbase.util.FSUtils;
45 import org.apache.hadoop.hbase.wal.WAL;
46 import org.apache.hadoop.hbase.wal.WALFactory;
47 import org.apache.hadoop.hbase.wal.WALKey;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.mapreduce.InputSplit;
50 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
51 import org.junit.AfterClass;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56
57
58
59
60 @Category(MediumTests.class)
61 public class TestWALRecordReader {
62 private final Log LOG = LogFactory.getLog(getClass());
63 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
64 private static Configuration conf;
65 private static FileSystem fs;
66 private static Path hbaseDir;
67 private static FileSystem walFs;
68 private static Path walRootDir;
69
70 static final TableName tableName = TableName.valueOf(getName());
71 private static final byte [] rowName = tableName.getName();
72
73 static final HRegionInfo info = new HRegionInfo(tableName,
74 Bytes.toBytes(""), Bytes.toBytes(""), false);
75 private static final byte [] family = Bytes.toBytes("column");
76 private static final byte [] value = Bytes.toBytes("value");
77 private static HTableDescriptor htd;
78 private static Path logDir;
79
80 private static String getName() {
81 return "TestWALRecordReader";
82 }
83
84 @Before
85 public void setUp() throws Exception {
86 fs.delete(hbaseDir, true);
87 walFs.delete(walRootDir, true);
88
89 }
90 @BeforeClass
91 public static void setUpBeforeClass() throws Exception {
92
93 conf = TEST_UTIL.getConfiguration();
94 conf.setInt("dfs.blocksize", 1024 * 1024);
95 conf.setInt("dfs.replication", 1);
96 TEST_UTIL.startMiniDFSCluster(1);
97
98 conf = TEST_UTIL.getConfiguration();
99 fs = TEST_UTIL.getDFSCluster().getFileSystem();
100
101 hbaseDir = TEST_UTIL.createRootDir();
102
103 walRootDir = TEST_UTIL.createWALRootDir();
104 walFs = FSUtils.getWALFileSystem(conf);
105 logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
106
107 htd = new HTableDescriptor(tableName);
108 htd.addFamily(new HColumnDescriptor(family));
109 }
110
111 @AfterClass
112 public static void tearDownAfterClass() throws Exception {
113 fs.delete(hbaseDir, true);
114 walFs.delete(walRootDir, true);
115 TEST_UTIL.shutdownMiniCluster();
116 }
117
118
119
120
121
122 @Test
123 public void testPartialRead() throws Exception {
124 final WALFactory walfactory = new WALFactory(conf, null, getName());
125 WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
126
127
128 long ts = System.currentTimeMillis();
129 WALEdit edit = new WALEdit();
130 final AtomicLong sequenceId = new AtomicLong(0);
131 edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
132 log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
133 edit = new WALEdit();
134 edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
135 log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
136 true, null);
137 log.sync();
138 LOG.info("Before 1st WAL roll " + log.toString());
139 log.rollWriter();
140 LOG.info("Past 1st WAL roll " + log.toString());
141
142 Thread.sleep(1);
143 long ts1 = System.currentTimeMillis();
144
145 edit = new WALEdit();
146 edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
147 log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
148 true, null);
149 edit = new WALEdit();
150 edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
151 log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
152 true, null);
153 log.sync();
154 log.shutdown();
155 walfactory.shutdown();
156 LOG.info("Closed WAL " + log.toString());
157
158
159 WALInputFormat input = new WALInputFormat();
160 Configuration jobConf = new Configuration(conf);
161 jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
162 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
163
164
165 List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
166
167 assertEquals(1, splits.size());
168 testSplit(splits.get(0), Bytes.toBytes("1"));
169
170 jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
171 jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
172 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
173
174 assertEquals(2, splits.size());
175
176 testSplit(splits.get(0), Bytes.toBytes("2"));
177
178 testSplit(splits.get(1), Bytes.toBytes("3"));
179 }
180
181
182
183
184
185 @Test
186 public void testWALRecordReader() throws Exception {
187 final WALFactory walfactory = new WALFactory(conf, null, getName());
188 WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
189 byte [] value = Bytes.toBytes("value");
190 final AtomicLong sequenceId = new AtomicLong(0);
191 WALEdit edit = new WALEdit();
192 edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
193 System.currentTimeMillis(), value));
194 long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
195 null);
196 log.sync(txid);
197
198 Thread.sleep(1);
199 long secondTs = System.currentTimeMillis();
200 log.rollWriter();
201
202 edit = new WALEdit();
203 edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
204 System.currentTimeMillis(), value));
205 txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
206 null);
207 log.sync(txid);
208 log.shutdown();
209 walfactory.shutdown();
210 long thirdTs = System.currentTimeMillis();
211
212
213 WALInputFormat input = new WALInputFormat();
214 Configuration jobConf = new Configuration(conf);
215 jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
216
217
218 List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
219 assertEquals(2, splits.size());
220
221
222 testSplit(splits.get(0), Bytes.toBytes("1"));
223
224 testSplit(splits.get(1), Bytes.toBytes("2"));
225
226
227
228
229 jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
230 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
231 assertEquals(1, splits.size());
232 testSplit(splits.get(0), Bytes.toBytes("1"));
233
234
235 jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
236 jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
237 splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
238
239 assertEquals(2, splits.size());
240
241 testSplit(splits.get(0));
242 testSplit(splits.get(1));
243 }
244
245 protected WALKey getWalKey(final long sequenceid) {
246 return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
247 }
248
249 protected WALRecordReader getReader() {
250 return new WALKeyRecordReader();
251 }
252
253
254
255
256 private void testSplit(InputSplit split, byte[]... columns) throws Exception {
257 final WALRecordReader reader = getReader();
258 reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
259
260 for (byte[] column : columns) {
261 assertTrue(reader.nextKeyValue());
262 Cell cell = reader.getCurrentValue().getCells().get(0);
263 if (!Bytes.equals(column, cell.getQualifier())) {
264 assertTrue("expected [" + Bytes.toString(column) + "], actual ["
265 + Bytes.toString(cell.getQualifier()) + "]", false);
266 }
267 }
268 assertFalse(reader.nextKeyValue());
269 reader.close();
270 }
271
272 }