View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  
22  import java.io.IOException;
23  import java.util.Arrays;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.TableName;
33  import org.apache.hadoop.hbase.client.HTable;
34  import org.apache.hadoop.hbase.client.Put;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.ResultScanner;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
40  import org.apache.hadoop.hbase.testclassification.LargeTests;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.mapreduce.Counters;
43  import org.junit.AfterClass;
44  import org.junit.Assert;
45  import org.junit.BeforeClass;
46  import org.junit.Test;
47  import org.junit.experimental.categories.Category;
48  
49  import com.google.common.base.Throwables;
50  
51  /**
52   * Basic test for the SyncTable M/R tool
53   */
54  @Category(LargeTests.class)
55  public class TestSyncTable {
56    
57    private static final Log LOG = LogFactory.getLog(TestSyncTable.class);
58    
59    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();  
60    
61    @BeforeClass
62    public static void beforeClass() throws Exception {
63      TEST_UTIL.startMiniCluster(3);
64      TEST_UTIL.startMiniMapReduceCluster();
65    }
66    
67    @AfterClass
68    public static void afterClass() throws Exception {
69      TEST_UTIL.shutdownMiniMapReduceCluster();
70      TEST_UTIL.shutdownMiniCluster();
71    }
72    
73    private static byte[][] generateSplits(int numRows, int numRegions) {
74      byte[][] splitRows = new byte[numRegions-1][];
75      for (int i = 1; i < numRegions; i++) {
76        splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
77      }
78      return splitRows;
79    }
80    
81    @Test
82    public void testSyncTable() throws Exception {
83      String sourceTableName = "testSourceTable";
84      String targetTableName = "testTargetTable";
85      Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
86      
87      writeTestData(sourceTableName, targetTableName);
88      hashSourceTable(sourceTableName, testDir);
89      Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
90      assertEqualTables(90, sourceTableName, targetTableName);
91      
92      assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
93      assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
94      assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
95      assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
96      assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
97      assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
98      
99      TEST_UTIL.deleteTable(sourceTableName);
100     TEST_UTIL.deleteTable(targetTableName);
101     TEST_UTIL.cleanupDataTestDirOnTestFS();
102   }
103 
104   private void assertEqualTables(int expectedRows, String sourceTableName, String targetTableName) 
105       throws Exception {
106     Table sourceTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(sourceTableName));
107     Table targetTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(targetTableName));
108     
109     ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
110     ResultScanner targetScanner = targetTable.getScanner(new Scan());
111     
112     for (int i = 0; i < expectedRows; i++) {
113       Result sourceRow = sourceScanner.next();
114       Result targetRow = targetScanner.next();
115       
116       LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
117           + " cells:" + sourceRow);
118       LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
119           + " cells:" + targetRow);
120       
121       if (sourceRow == null) {
122         Assert.fail("Expected " + expectedRows
123             + " source rows but only found " + i); 
124       }
125       if (targetRow == null) {
126         Assert.fail("Expected " + expectedRows
127             + " target rows but only found " + i); 
128       }
129       Cell[] sourceCells = sourceRow.rawCells();
130       Cell[] targetCells = targetRow.rawCells();
131       if (sourceCells.length != targetCells.length) {
132         LOG.debug("Source cells: " + Arrays.toString(sourceCells));
133         LOG.debug("Target cells: " + Arrays.toString(targetCells));
134         Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
135             + " has " + sourceCells.length
136             + " cells in source table but " + targetCells.length
137             + " cells in target table");
138       }
139       for (int j = 0; j < sourceCells.length; j++) {
140         Cell sourceCell = sourceCells[j];
141         Cell targetCell = targetCells[j];
142         try {
143           if (!CellUtil.matchingRow(sourceCell, targetCell)) {
144             Assert.fail("Rows don't match");
145           }
146           if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
147             Assert.fail("Families don't match");
148           }
149           if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
150             Assert.fail("Qualifiers don't match");
151           }
152           if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
153             Assert.fail("Timestamps don't match");
154           }
155           if (!CellUtil.matchingValue(sourceCell, targetCell)) {
156             Assert.fail("Values don't match");
157           }
158         } catch (Throwable t) {
159           LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
160           Throwables.propagate(t);
161         }
162       }
163     }
164     Result sourceRow = sourceScanner.next();
165     if (sourceRow != null) {
166       Assert.fail("Source table has more than " + expectedRows
167           + " rows.  Next row: " + Bytes.toInt(sourceRow.getRow()));
168     }
169     Result targetRow = targetScanner.next();
170     if (targetRow != null) {
171       Assert.fail("Target table has more than " + expectedRows
172           + " rows.  Next row: " + Bytes.toInt(targetRow.getRow()));
173     }
174     sourceScanner.close();
175     targetScanner.close();
176     sourceTable.close();
177     targetTable.close();
178   }
179 
180   private Counters syncTables(String sourceTableName, String targetTableName,
181       Path testDir) throws Exception {
182     SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
183     int code = syncTable.run(new String[] { 
184         testDir.toString(),
185         sourceTableName,
186         targetTableName
187         });
188     assertEquals("sync table job failed", 0, code);
189     
190     LOG.info("Sync tables completed");
191     return syncTable.counters;
192   }
193 
194   private void hashSourceTable(String sourceTableName, Path testDir)
195       throws Exception, IOException {
196     int numHashFiles = 3;
197     long batchSize = 100;  // should be 2 batches per region
198     int scanBatch = 1;
199     HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
200     int code = hashTable.run(new String[] { 
201         "--batchsize=" + batchSize,
202         "--numhashfiles=" + numHashFiles,
203         "--scanbatch=" + scanBatch,
204         sourceTableName,
205         testDir.toString()});
206     assertEquals("hash table job failed", 0, code);
207     
208     FileSystem fs = TEST_UTIL.getTestFileSystem();
209     
210     HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
211     assertEquals(sourceTableName, tableHash.tableName);
212     assertEquals(batchSize, tableHash.batchSize);
213     assertEquals(numHashFiles, tableHash.numHashFiles);
214     assertEquals(numHashFiles - 1, tableHash.partitions.size());
215 
216     LOG.info("Hash table completed");
217   }
218 
219   private void writeTestData(String sourceTableName, String targetTableName)
220       throws Exception {
221     final byte[] family = Bytes.toBytes("family");
222     final byte[] column1 = Bytes.toBytes("c1");
223     final byte[] column2 = Bytes.toBytes("c2");
224     final byte[] value1 = Bytes.toBytes("val1");
225     final byte[] value2 = Bytes.toBytes("val2");
226     final byte[] value3 = Bytes.toBytes("val3");
227     
228     int numRows = 100;
229     int sourceRegions = 10;
230     int targetRegions = 6;
231     
232     HTable sourceTable = TEST_UTIL.createTable(TableName.valueOf(sourceTableName),
233         family, generateSplits(numRows, sourceRegions));
234 
235     HTable targetTable = TEST_UTIL.createTable(TableName.valueOf(targetTableName),
236         family, generateSplits(numRows, targetRegions));
237 
238     long timestamp = 1430764183454L;
239 
240     int rowIndex = 0;
241     // a bunch of identical rows
242     for (; rowIndex < 40; rowIndex++) {
243       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
244       sourcePut.addColumn(family, column1, timestamp, value1);
245       sourcePut.addColumn(family, column2, timestamp, value2);
246       sourceTable.put(sourcePut);
247      
248       Put targetPut = new Put(Bytes.toBytes(rowIndex));
249       targetPut.addColumn(family, column1, timestamp, value1);
250       targetPut.addColumn(family, column2, timestamp, value2);
251       targetTable.put(targetPut);
252     }
253     // some rows only in the source table
254     // ROWSWITHDIFFS: 10
255     // TARGETMISSINGROWS: 10
256     // TARGETMISSINGCELLS: 20
257     for (; rowIndex < 50; rowIndex++) {
258       Put put = new Put(Bytes.toBytes(rowIndex));
259       put.addColumn(family, column1, timestamp, value1);
260       put.addColumn(family, column2, timestamp, value2);
261       sourceTable.put(put);
262     }
263     // some rows only in the target table
264     // ROWSWITHDIFFS: 10
265     // SOURCEMISSINGROWS: 10
266     // SOURCEMISSINGCELLS: 20
267     for (; rowIndex < 60; rowIndex++) {
268       Put put = new Put(Bytes.toBytes(rowIndex));
269       put.addColumn(family, column1, timestamp, value1);
270       put.addColumn(family, column2, timestamp, value2);
271       targetTable.put(put);
272     }
273     // some rows with 1 missing cell in target table
274     // ROWSWITHDIFFS: 10
275     // TARGETMISSINGCELLS: 10
276     for (; rowIndex < 70; rowIndex++) {
277       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
278       sourcePut.addColumn(family, column1, timestamp, value1);
279       sourcePut.addColumn(family, column2, timestamp, value2);
280       sourceTable.put(sourcePut);
281 
282       Put targetPut = new Put(Bytes.toBytes(rowIndex));
283       targetPut.addColumn(family, column1, timestamp, value1);
284       targetTable.put(targetPut);
285     }
286     // some rows with 1 missing cell in source table
287     // ROWSWITHDIFFS: 10
288     // SOURCEMISSINGCELLS: 10
289     for (; rowIndex < 80; rowIndex++) {
290       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
291       sourcePut.addColumn(family, column1, timestamp, value1);
292       sourceTable.put(sourcePut);
293 
294       Put targetPut = new Put(Bytes.toBytes(rowIndex));
295       targetPut.addColumn(family, column1, timestamp, value1);
296       targetPut.addColumn(family, column2, timestamp, value2);
297       targetTable.put(targetPut);
298     }
299     // some rows differing only in timestamp
300     // ROWSWITHDIFFS: 10
301     // SOURCEMISSINGCELLS: 20
302     // TARGETMISSINGCELLS: 20
303     for (; rowIndex < 90; rowIndex++) {
304       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
305       sourcePut.addColumn(family, column1, timestamp, column1);
306       sourcePut.addColumn(family, column2, timestamp, value2);
307       sourceTable.put(sourcePut);
308 
309       Put targetPut = new Put(Bytes.toBytes(rowIndex));
310       targetPut.addColumn(family, column1, timestamp+1, column1);
311       targetPut.addColumn(family, column2, timestamp-1, value2);
312       targetTable.put(targetPut);
313     }
314     // some rows with different values
315     // ROWSWITHDIFFS: 10
316     // DIFFERENTCELLVALUES: 20
317     for (; rowIndex < numRows; rowIndex++) {
318       Put sourcePut = new Put(Bytes.toBytes(rowIndex));
319       sourcePut.addColumn(family, column1, timestamp, value1);
320       sourcePut.addColumn(family, column2, timestamp, value2);
321       sourceTable.put(sourcePut);
322       
323       Put targetPut = new Put(Bytes.toBytes(rowIndex));
324       targetPut.addColumn(family, column1, timestamp, value3);
325       targetPut.addColumn(family, column2, timestamp, value3);
326       targetTable.put(targetPut);
327     }
328     
329     sourceTable.close();
330     targetTable.close();
331   }
332   
333 
334 }