1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21
22 import java.io.IOException;
23 import java.util.Arrays;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
40 import org.apache.hadoop.hbase.testclassification.LargeTests;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.mapreduce.Counters;
43 import org.junit.AfterClass;
44 import org.junit.Assert;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49 import com.google.common.base.Throwables;
50
51
52
53
54 @Category(LargeTests.class)
55 public class TestSyncTable {
56
57 private static final Log LOG = LogFactory.getLog(TestSyncTable.class);
58
59 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
60
61 @BeforeClass
62 public static void beforeClass() throws Exception {
63 TEST_UTIL.startMiniCluster(3);
64 TEST_UTIL.startMiniMapReduceCluster();
65 }
66
67 @AfterClass
68 public static void afterClass() throws Exception {
69 TEST_UTIL.shutdownMiniMapReduceCluster();
70 TEST_UTIL.shutdownMiniCluster();
71 }
72
73 private static byte[][] generateSplits(int numRows, int numRegions) {
74 byte[][] splitRows = new byte[numRegions-1][];
75 for (int i = 1; i < numRegions; i++) {
76 splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
77 }
78 return splitRows;
79 }
80
81 @Test
82 public void testSyncTable() throws Exception {
83 String sourceTableName = "testSourceTable";
84 String targetTableName = "testTargetTable";
85 Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
86
87 writeTestData(sourceTableName, targetTableName);
88 hashSourceTable(sourceTableName, testDir);
89 Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
90 assertEqualTables(90, sourceTableName, targetTableName);
91
92 assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
93 assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
94 assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
95 assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
96 assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
97 assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
98
99 TEST_UTIL.deleteTable(sourceTableName);
100 TEST_UTIL.deleteTable(targetTableName);
101 TEST_UTIL.cleanupDataTestDirOnTestFS();
102 }
103
104 private void assertEqualTables(int expectedRows, String sourceTableName, String targetTableName)
105 throws Exception {
106 Table sourceTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(sourceTableName));
107 Table targetTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(targetTableName));
108
109 ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
110 ResultScanner targetScanner = targetTable.getScanner(new Scan());
111
112 for (int i = 0; i < expectedRows; i++) {
113 Result sourceRow = sourceScanner.next();
114 Result targetRow = targetScanner.next();
115
116 LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
117 + " cells:" + sourceRow);
118 LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
119 + " cells:" + targetRow);
120
121 if (sourceRow == null) {
122 Assert.fail("Expected " + expectedRows
123 + " source rows but only found " + i);
124 }
125 if (targetRow == null) {
126 Assert.fail("Expected " + expectedRows
127 + " target rows but only found " + i);
128 }
129 Cell[] sourceCells = sourceRow.rawCells();
130 Cell[] targetCells = targetRow.rawCells();
131 if (sourceCells.length != targetCells.length) {
132 LOG.debug("Source cells: " + Arrays.toString(sourceCells));
133 LOG.debug("Target cells: " + Arrays.toString(targetCells));
134 Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
135 + " has " + sourceCells.length
136 + " cells in source table but " + targetCells.length
137 + " cells in target table");
138 }
139 for (int j = 0; j < sourceCells.length; j++) {
140 Cell sourceCell = sourceCells[j];
141 Cell targetCell = targetCells[j];
142 try {
143 if (!CellUtil.matchingRow(sourceCell, targetCell)) {
144 Assert.fail("Rows don't match");
145 }
146 if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
147 Assert.fail("Families don't match");
148 }
149 if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
150 Assert.fail("Qualifiers don't match");
151 }
152 if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
153 Assert.fail("Timestamps don't match");
154 }
155 if (!CellUtil.matchingValue(sourceCell, targetCell)) {
156 Assert.fail("Values don't match");
157 }
158 } catch (Throwable t) {
159 LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
160 Throwables.propagate(t);
161 }
162 }
163 }
164 Result sourceRow = sourceScanner.next();
165 if (sourceRow != null) {
166 Assert.fail("Source table has more than " + expectedRows
167 + " rows. Next row: " + Bytes.toInt(sourceRow.getRow()));
168 }
169 Result targetRow = targetScanner.next();
170 if (targetRow != null) {
171 Assert.fail("Target table has more than " + expectedRows
172 + " rows. Next row: " + Bytes.toInt(targetRow.getRow()));
173 }
174 sourceScanner.close();
175 targetScanner.close();
176 sourceTable.close();
177 targetTable.close();
178 }
179
180 private Counters syncTables(String sourceTableName, String targetTableName,
181 Path testDir) throws Exception {
182 SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
183 int code = syncTable.run(new String[] {
184 testDir.toString(),
185 sourceTableName,
186 targetTableName
187 });
188 assertEquals("sync table job failed", 0, code);
189
190 LOG.info("Sync tables completed");
191 return syncTable.counters;
192 }
193
194 private void hashSourceTable(String sourceTableName, Path testDir)
195 throws Exception, IOException {
196 int numHashFiles = 3;
197 long batchSize = 100;
198 int scanBatch = 1;
199 HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
200 int code = hashTable.run(new String[] {
201 "--batchsize=" + batchSize,
202 "--numhashfiles=" + numHashFiles,
203 "--scanbatch=" + scanBatch,
204 sourceTableName,
205 testDir.toString()});
206 assertEquals("hash table job failed", 0, code);
207
208 FileSystem fs = TEST_UTIL.getTestFileSystem();
209
210 HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
211 assertEquals(sourceTableName, tableHash.tableName);
212 assertEquals(batchSize, tableHash.batchSize);
213 assertEquals(numHashFiles, tableHash.numHashFiles);
214 assertEquals(numHashFiles - 1, tableHash.partitions.size());
215
216 LOG.info("Hash table completed");
217 }
218
219 private void writeTestData(String sourceTableName, String targetTableName)
220 throws Exception {
221 final byte[] family = Bytes.toBytes("family");
222 final byte[] column1 = Bytes.toBytes("c1");
223 final byte[] column2 = Bytes.toBytes("c2");
224 final byte[] value1 = Bytes.toBytes("val1");
225 final byte[] value2 = Bytes.toBytes("val2");
226 final byte[] value3 = Bytes.toBytes("val3");
227
228 int numRows = 100;
229 int sourceRegions = 10;
230 int targetRegions = 6;
231
232 HTable sourceTable = TEST_UTIL.createTable(TableName.valueOf(sourceTableName),
233 family, generateSplits(numRows, sourceRegions));
234
235 HTable targetTable = TEST_UTIL.createTable(TableName.valueOf(targetTableName),
236 family, generateSplits(numRows, targetRegions));
237
238 long timestamp = 1430764183454L;
239
240 int rowIndex = 0;
241
242 for (; rowIndex < 40; rowIndex++) {
243 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
244 sourcePut.addColumn(family, column1, timestamp, value1);
245 sourcePut.addColumn(family, column2, timestamp, value2);
246 sourceTable.put(sourcePut);
247
248 Put targetPut = new Put(Bytes.toBytes(rowIndex));
249 targetPut.addColumn(family, column1, timestamp, value1);
250 targetPut.addColumn(family, column2, timestamp, value2);
251 targetTable.put(targetPut);
252 }
253
254
255
256
257 for (; rowIndex < 50; rowIndex++) {
258 Put put = new Put(Bytes.toBytes(rowIndex));
259 put.addColumn(family, column1, timestamp, value1);
260 put.addColumn(family, column2, timestamp, value2);
261 sourceTable.put(put);
262 }
263
264
265
266
267 for (; rowIndex < 60; rowIndex++) {
268 Put put = new Put(Bytes.toBytes(rowIndex));
269 put.addColumn(family, column1, timestamp, value1);
270 put.addColumn(family, column2, timestamp, value2);
271 targetTable.put(put);
272 }
273
274
275
276 for (; rowIndex < 70; rowIndex++) {
277 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
278 sourcePut.addColumn(family, column1, timestamp, value1);
279 sourcePut.addColumn(family, column2, timestamp, value2);
280 sourceTable.put(sourcePut);
281
282 Put targetPut = new Put(Bytes.toBytes(rowIndex));
283 targetPut.addColumn(family, column1, timestamp, value1);
284 targetTable.put(targetPut);
285 }
286
287
288
289 for (; rowIndex < 80; rowIndex++) {
290 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
291 sourcePut.addColumn(family, column1, timestamp, value1);
292 sourceTable.put(sourcePut);
293
294 Put targetPut = new Put(Bytes.toBytes(rowIndex));
295 targetPut.addColumn(family, column1, timestamp, value1);
296 targetPut.addColumn(family, column2, timestamp, value2);
297 targetTable.put(targetPut);
298 }
299
300
301
302
303 for (; rowIndex < 90; rowIndex++) {
304 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
305 sourcePut.addColumn(family, column1, timestamp, column1);
306 sourcePut.addColumn(family, column2, timestamp, value2);
307 sourceTable.put(sourcePut);
308
309 Put targetPut = new Put(Bytes.toBytes(rowIndex));
310 targetPut.addColumn(family, column1, timestamp+1, column1);
311 targetPut.addColumn(family, column2, timestamp-1, value2);
312 targetTable.put(targetPut);
313 }
314
315
316
317 for (; rowIndex < numRows; rowIndex++) {
318 Put sourcePut = new Put(Bytes.toBytes(rowIndex));
319 sourcePut.addColumn(family, column1, timestamp, value1);
320 sourcePut.addColumn(family, column2, timestamp, value2);
321 sourceTable.put(sourcePut);
322
323 Put targetPut = new Put(Bytes.toBytes(rowIndex));
324 targetPut.addColumn(family, column1, timestamp, value3);
325 targetPut.addColumn(family, column2, timestamp, value3);
326 targetTable.put(targetPut);
327 }
328
329 sourceTable.close();
330 targetTable.close();
331 }
332
333
334 }