1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce.replication;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.conf.Configured;
27 import org.apache.hadoop.hbase.*;
28 import org.apache.hadoop.hbase.client.HConnectable;
29 import org.apache.hadoop.hbase.client.HConnection;
30 import org.apache.hadoop.hbase.client.HConnectionManager;
31 import org.apache.hadoop.hbase.client.HTable;
32 import org.apache.hadoop.hbase.client.Put;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
38 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
39 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
40 import org.apache.hadoop.hbase.mapreduce.TableMapper;
41 import org.apache.hadoop.hbase.mapreduce.TableSplit;
42 import org.apache.hadoop.hbase.replication.ReplicationException;
43 import org.apache.hadoop.hbase.replication.ReplicationFactory;
44 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
45 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
46 import org.apache.hadoop.hbase.replication.ReplicationPeers;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.Pair;
49 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50 import org.apache.hadoop.mapreduce.Job;
51 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
52 import org.apache.hadoop.util.Tool;
53 import org.apache.hadoop.util.ToolRunner;
54
55
56
57
58
59
60
61
62
63
64
65 public class VerifyReplication extends Configured implements Tool {
66
67 private static final Log LOG =
68 LogFactory.getLog(VerifyReplication.class);
69
70 public final static String NAME = "verifyrep";
71 private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
72 static long startTime = 0;
73 static long endTime = Long.MAX_VALUE;
74 static int versions = -1;
75 static String tableName = null;
76 static String families = null;
77 static String peerId = null;
78
79
80
81
82 public static class Verifier
83 extends TableMapper<ImmutableBytesWritable, Put> {
84
85 public static enum Counters {
86 GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
87
88 private ResultScanner replicatedScanner;
89 private Result currentCompareRowInPeerTable;
90 private Table replicatedTable;
91
92
93
94
95
96
97
98
99
100 @Override
101 public void map(ImmutableBytesWritable row, final Result value,
102 Context context)
103 throws IOException {
104 if (replicatedScanner == null) {
105 Configuration conf = context.getConfiguration();
106 final Scan scan = new Scan();
107 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
108 long startTime = conf.getLong(NAME + ".startTime", 0);
109 long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
110 String families = conf.get(NAME + ".families", null);
111 if(families != null) {
112 String[] fams = families.split(",");
113 for(String fam : fams) {
114 scan.addFamily(Bytes.toBytes(fam));
115 }
116 }
117 scan.setTimeRange(startTime, endTime);
118 if (versions >= 0) {
119 scan.setMaxVersions(versions);
120 }
121
122 final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
123 HConnectionManager.execute(new HConnectable<Void>(conf) {
124 @Override
125 public Void connect(HConnection conn) throws IOException {
126 String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
127 Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
128 zkClusterKey, PEER_CONFIG_PREFIX);
129
130 TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
131 replicatedTable = new HTable(peerConf, tableName);
132 scan.setStartRow(value.getRow());
133 scan.setStopRow(tableSplit.getEndRow());
134 replicatedScanner = replicatedTable.getScanner(scan);
135 return null;
136 }
137 });
138 currentCompareRowInPeerTable = replicatedScanner.next();
139 }
140 while (true) {
141 if (currentCompareRowInPeerTable == null) {
142
143 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
144 break;
145 }
146 int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
147 if (rowCmpRet == 0) {
148
149 try {
150 Result.compareResults(value, currentCompareRowInPeerTable);
151 context.getCounter(Counters.GOODROWS).increment(1);
152 } catch (Exception e) {
153 logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
154 }
155 currentCompareRowInPeerTable = replicatedScanner.next();
156 break;
157 } else if (rowCmpRet < 0) {
158
159 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
160 break;
161 } else {
162
163 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
164 currentCompareRowInPeerTable);
165 currentCompareRowInPeerTable = replicatedScanner.next();
166 }
167 }
168 }
169
170 private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
171 context.getCounter(counter).increment(1);
172 context.getCounter(Counters.BADROWS).increment(1);
173 LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
174 }
175
176 @Override
177 protected void cleanup(Context context) {
178 if (replicatedScanner != null) {
179 try {
180 while (currentCompareRowInPeerTable != null) {
181 logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
182 currentCompareRowInPeerTable);
183 currentCompareRowInPeerTable = replicatedScanner.next();
184 }
185 } catch (Exception e) {
186 LOG.error("fail to scan peer table in cleanup", e);
187 } finally {
188 replicatedScanner.close();
189 replicatedScanner = null;
190 }
191 }
192 if (replicatedTable != null) {
193 TableName tableName = replicatedTable.getName();
194 try {
195 replicatedTable.close();
196 } catch (IOException ioe) {
197 LOG.warn("Exception closing " + tableName, ioe);
198 }
199 }
200 }
201 }
202
203 private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
204 final Configuration conf) throws IOException {
205 ZooKeeperWatcher localZKW = null;
206 ReplicationPeerZKImpl peer = null;
207 try {
208 localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
209 new Abortable() {
210 @Override public void abort(String why, Throwable e) {}
211 @Override public boolean isAborted() {return false;}
212 });
213
214 ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
215 rp.init();
216
217 Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
218 if (pair == null) {
219 throw new IOException("Couldn't get peer conf!");
220 }
221
222 return pair;
223 } catch (ReplicationException e) {
224 throw new IOException(
225 "An error occured while trying to connect to the remove peer cluster", e);
226 } finally {
227 if (peer != null) {
228 peer.close();
229 }
230 if (localZKW != null) {
231 localZKW.close();
232 }
233 }
234 }
235
236
237
238
239
240
241
242
243
244 public static Job createSubmittableJob(Configuration conf, String[] args)
245 throws IOException {
246 if (!doCommandLine(args)) {
247 return null;
248 }
249 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
250 HConstants.REPLICATION_ENABLE_DEFAULT)) {
251 throw new IOException("Replication needs to be enabled to verify it.");
252 }
253 conf.set(NAME+".peerId", peerId);
254 conf.set(NAME+".tableName", tableName);
255 conf.setLong(NAME+".startTime", startTime);
256 conf.setLong(NAME+".endTime", endTime);
257 if (families != null) {
258 conf.set(NAME+".families", families);
259 }
260
261 Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
262 ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
263 String peerQuorumAddress = peerConfig.getClusterKey();
264 LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
265 peerConfig.getConfiguration());
266 conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
267 HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
268 peerConfig.getConfiguration().entrySet());
269
270 Job job = new Job(conf, NAME + "_" + tableName);
271 job.setJarByClass(VerifyReplication.class);
272
273 Scan scan = new Scan();
274 scan.setTimeRange(startTime, endTime);
275 if (versions >= 0) {
276 scan.setMaxVersions(versions);
277 }
278 if(families != null) {
279 String[] fams = families.split(",");
280 for(String fam : fams) {
281 scan.addFamily(Bytes.toBytes(fam));
282 }
283 }
284 TableMapReduceUtil.initTableMapperJob(tableName, scan,
285 Verifier.class, null, null, job);
286
287 Configuration peerClusterConf = peerConfigPair.getSecond();
288
289 TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
290
291 job.setOutputFormatClass(NullOutputFormat.class);
292 job.setNumReduceTasks(0);
293 return job;
294 }
295
296 private static boolean doCommandLine(final String[] args) {
297 if (args.length < 2) {
298 printUsage(null);
299 return false;
300 }
301 try {
302 for (int i = 0; i < args.length; i++) {
303 String cmd = args[i];
304 if (cmd.equals("-h") || cmd.startsWith("--h")) {
305 printUsage(null);
306 return false;
307 }
308
309 final String startTimeArgKey = "--starttime=";
310 if (cmd.startsWith(startTimeArgKey)) {
311 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
312 continue;
313 }
314
315 final String endTimeArgKey = "--endtime=";
316 if (cmd.startsWith(endTimeArgKey)) {
317 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
318 continue;
319 }
320
321 final String versionsArgKey = "--versions=";
322 if (cmd.startsWith(versionsArgKey)) {
323 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
324 continue;
325 }
326
327 final String familiesArgKey = "--families=";
328 if (cmd.startsWith(familiesArgKey)) {
329 families = cmd.substring(familiesArgKey.length());
330 continue;
331 }
332
333 if (cmd.startsWith("--")) {
334 printUsage("Invalid argument '" + cmd + "'");
335 }
336
337 if (i == args.length-2) {
338 peerId = cmd;
339 }
340
341 if (i == args.length-1) {
342 tableName = cmd;
343 }
344 }
345 } catch (Exception e) {
346 e.printStackTrace();
347 printUsage("Can't start because " + e.getMessage());
348 return false;
349 }
350 return true;
351 }
352
353
354
355
356 private static void printUsage(final String errorMsg) {
357 if (errorMsg != null && errorMsg.length() > 0) {
358 System.err.println("ERROR: " + errorMsg);
359 }
360 System.err.println("Usage: verifyrep [--starttime=X]" +
361 " [--endtime=Y] [--families=A] <peerid> <tablename>");
362 System.err.println();
363 System.err.println("Options:");
364 System.err.println(" starttime beginning of the time range");
365 System.err.println(" without endtime means from starttime to forever");
366 System.err.println(" endtime end of the time range");
367 System.err.println(" versions number of cell versions to verify");
368 System.err.println(" families comma-separated list of families to copy");
369 System.err.println();
370 System.err.println("Args:");
371 System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
372 System.err.println(" tablename Name of the table to verify");
373 System.err.println();
374 System.err.println("Examples:");
375 System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
376 System.err.println(" $ bin/hbase " +
377 "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
378 " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
379 }
380
381 @Override
382 public int run(String[] args) throws Exception {
383 Configuration conf = this.getConf();
384 Job job = createSubmittableJob(conf, args);
385 if (job != null) {
386 return job.waitForCompletion(true) ? 0 : 1;
387 }
388 return 1;
389 }
390
391
392
393
394
395
396
397 public static void main(String[] args) throws Exception {
398 int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
399 System.exit(res);
400 }
401 }