/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.test;

import com.google.common.base.Joiner;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;


/**
 * This is an integration test for replication. It is derived from
 * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}: it creates a large circular
 * linked list in a source cluster and verifies that the data is correct in a sink cluster. The test
 * handles creating the tables and schema and setting up the replication.
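 *
 * A sketch of a typical invocation through the hbase launcher (the ZooKeeper quorums, ports, and
 * the output path below are placeholders, not defaults):
 *
 *   hbase org.apache.hadoop.hbase.test.IntegrationTestReplication \
 *       -s source-zk:2181:/hbase -r sink-zk:2181:/hbase -d /tmp/replication-test-keys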
 */
public class IntegrationTestReplication extends IntegrationTestBigLinkedList {
  protected String sourceClusterIdString;
  protected String sinkClusterIdString;
  protected int numIterations;
  protected int numMappers;
  protected long numNodes;
  protected String outputDir;
  protected int numReducers;
  protected int generateVerifyGap;
  protected Integer width;
  protected Integer wrapMultiplier;
  protected boolean noReplicationSetup = false;

  private final String SOURCE_CLUSTER_OPT = "sourceCluster";
  private final String DEST_CLUSTER_OPT = "destCluster";
  private final String ITERATIONS_OPT = "iterations";
  private final String NUM_MAPPERS_OPT = "numMappers";
  private final String OUTPUT_DIR_OPT = "outputDir";
  private final String NUM_REDUCERS_OPT = "numReducers";
  private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup";

  /**
   * The gap (in seconds) from when data is finished being generated at the source
   * to when it can be verified. This is the replication lag we are willing to tolerate
   */
  private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap";

  /**
   * The width of the linked list.
   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
   */
  private final String WIDTH_OPT = "width";

  /**
   * The number of rows after which the linked list points to the first row.
   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
   */
  private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier";

  /**
   * The number of nodes in the test setup. This has to be a multiple of WRAP_MULTIPLIER * WIDTH
   * in order to ensure that the linked list is complete.
   * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details
   */
  private final String NUM_NODES_OPT = "numNodes";

  private final int DEFAULT_NUM_MAPPERS = 1;
  private final int DEFAULT_NUM_REDUCERS = 1;
  private final int DEFAULT_NUM_ITERATIONS = 1;
  private final int DEFAULT_GENERATE_VERIFY_GAP = 60;
  private final int DEFAULT_WIDTH = 1000000;
  private final int DEFAULT_WRAP_MULTIPLIER = 25;
  private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER;
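  // With the defaults above each iteration generates 1,000,000 * 25 = 25,000,000 nodes per mapper;
  // run() below expects numMappers * numNodes rows to appear in the sink per iteration.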

  /**
   * Wrapper around an HBase ClusterID allowing us
   * to get admin connections and configurations for it
   */
  protected class ClusterID {
    private final Configuration configuration;
    private Connection connection = null;

    /**
     * This creates a new ClusterID wrapper that will automatically build connections and
     * configurations to be able to talk to the specified cluster
     *
     * @param base the base configuration that this class will add to
     * @param key the cluster key in the form of zk_quorum:zk_port:zk_parent_node
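     *            for example {@code zk1,zk2,zk3:2181:/hbase} (host names here are illustrative)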
     */
    public ClusterID(Configuration base,
                     String key) {
      configuration = new Configuration(base);
      String[] parts = key.split(":");
      configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
      configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
      configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
    }

    @Override
    public String toString() {
      return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM),
                                 configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT),
                                 configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
    }

    public Configuration getConfiguration() {
      return this.configuration;
    }

    public Connection getConnection() throws Exception {
      if (this.connection == null) {
        this.connection = ConnectionFactory.createConnection(this.configuration);
      }
      return this.connection;
    }

    public void closeConnection() throws Exception {
      this.connection.close();
      this.connection = null;
    }

    public boolean equals(ClusterID other) {
      return this.toString().equalsIgnoreCase(other.toString());
    }
  }

  /**
   * The main runner loop for the test. It uses
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}
   * for the generation and verification of the linked list. It is heavily based on
   * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop}
   */
  protected class VerifyReplicationLoop extends Configured implements Tool {
    private final Log LOG = LogFactory.getLog(VerifyReplicationLoop.class);
    protected ClusterID source;
    protected ClusterID sink;

    IntegrationTestBigLinkedList integrationTestBigLinkedList;

    /**
     * This tears down any tables that existed from before and rebuilds the tables and schemas on
     * the source cluster. It then sets up replication from the source to the sink cluster by using
     * the {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin}
     * connection.
     *
     * @throws Exception
     */
    protected void setupTablesAndReplication() throws Exception {
      TableName tableName = getTableName(source.getConfiguration());

      ClusterID[] clusters = {source, sink};

      // delete any old tables in the source and sink
      for (ClusterID cluster : clusters) {
        Admin admin = cluster.getConnection().getAdmin();

        if (admin.tableExists(tableName)) {
          if (admin.isTableEnabled(tableName)) {
            admin.disableTable(tableName);
          }

          /**
           * TODO: This is a workaround for a replication bug (HBASE-13416).
           * When we recreate a table that has recently been
           * deleted, the contents of the logs are replayed even though
           * they should not be. This ensures that we flush the logs
           * before the table gets deleted. Eventually the bug should be
           * fixed and this should be removed.
           */
          Set<ServerName> regionServers = new TreeSet<>();
          for (HRegionLocation rl :
               cluster.getConnection().getRegionLocator(tableName).getAllRegionLocations()) {
            regionServers.add(rl.getServerName());
          }

          for (ServerName server : regionServers) {
            source.getConnection().getAdmin().rollWALWriter(server);
          }

          admin.deleteTable(tableName);
        }
      }

      // create the schema
      Generator generator = new Generator();
      generator.setConf(source.getConfiguration());
      generator.createSchema();

      // setup the replication on the source
      if (!source.equals(sink)) {
        ReplicationAdmin replicationAdmin = new ReplicationAdmin(source.getConfiguration());
        // remove any old replication peers
        for (String oldPeer : replicationAdmin.listPeerConfigs().keySet()) {
          replicationAdmin.removePeer(oldPeer);
        }

        // set the sink to be the target
        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
        peerConfig.setClusterKey(sink.toString());

        // set the test table to be the table to replicate
        HashMap<TableName, ArrayList<String>> toReplicate = new HashMap<>();
        toReplicate.put(tableName, new ArrayList<String>(0));
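        // an empty column family list means every column family of the table is replicated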

        replicationAdmin.addPeer("TestPeer", peerConfig, toReplicate);

        replicationAdmin.enableTableRep(tableName);
        replicationAdmin.close();
      }

      for (ClusterID cluster : clusters) {
        cluster.closeConnection();
      }
    }

    protected void waitForReplication() throws Exception {
      // TODO: we shouldn't be sleeping here. It would be better to query the region servers
      // and wait for them to report 0 replication lag.
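      // generateVerifyGap is specified in seconds, so convert to milliseconds for sleep()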
      Thread.sleep(generateVerifyGap * 1000);
    }

    /**
     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator} in the
     * source cluster. This assumes that the tables have been setup via setupTablesAndReplication.
     *
     * @throws Exception
     */
    protected void runGenerator() throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); //create a random UUID.
      Path generatorOutput = new Path(outputPath, uuid.toString());

      Generator generator = new Generator();
      generator.setConf(source.getConfiguration());

      int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier);
      if (retCode > 0) {
        throw new RuntimeException("Generator failed with return code: " + retCode);
      }
    }


    /**
     * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify}
     * in the sink cluster. If replication is working properly the data written at the source
     * cluster should be available in the sink cluster after a reasonable gap
     *
     * @param expectedNumNodes the number of nodes we are expecting to see in the sink cluster
     * @throws Exception
     */
    protected void runVerify(long expectedNumNodes) throws Exception {
      Path outputPath = new Path(outputDir);
      UUID uuid = UUID.randomUUID(); //create a random UUID.
      Path iterationOutput = new Path(outputPath, uuid.toString());

      Verify verify = new Verify();
      verify.setConf(sink.getConfiguration());

      int retCode = verify.run(iterationOutput, numReducers);
      if (retCode > 0) {
        throw new RuntimeException("Verify.run failed with return code: " + retCode);
      }

      if (!verify.verify(expectedNumNodes)) {
        throw new RuntimeException("Verify.verify failed");
      }

      LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes);
    }

    /**
     * The main test runner
     *
     * This test has 4 steps:
     *  1: setupTablesAndReplication
     *  2: generate the data into the source cluster
     *  3: wait for replication to propagate
     *  4: verify that the data is available in the sink cluster
     *
     * @param args should be empty
     * @return 0 on success
     * @throws Exception on an error
     */
    @Override
    public int run(String[] args) throws Exception {
      source = new ClusterID(getConf(), sourceClusterIdString);
      sink = new ClusterID(getConf(), sinkClusterIdString);

      if (!noReplicationSetup) {
        setupTablesAndReplication();
      }
      long expectedNumNodes = 0;
      for (int i = 0; i < numIterations; i++) {
        LOG.info("Starting iteration = " + i);

        expectedNumNodes += numMappers * numNodes;

        runGenerator();
        waitForReplication();
        runVerify(expectedNumNodes);
      }

      /**
       * we are always returning 0 because exceptions are thrown when there is an error
       * in the verification step.
       */
      return 0;
    }
  }

  @Override
  protected void addOptions() {
    super.addOptions();
    addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT,
                          "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)");
    addRequiredOptWithArg("r", DEST_CLUSTER_OPT,
                          "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)");
    addRequiredOptWithArg("d", OUTPUT_DIR_OPT,
                          "Temporary directory where to write keys for the test");

    addOptWithArg("nm", NUM_MAPPERS_OPT,
                  "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")");
    addOptWithArg("nr", NUM_REDUCERS_OPT,
                  "Number of reducers (default: " + DEFAULT_NUM_REDUCERS + ")");
    addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT,
                  "Don't setup tables or configure replication before starting test");
    addOptWithArg("n", NUM_NODES_OPT,
                  "Number of nodes. This should be a multiple of width * wrapMultiplier."  +
                  " (default: " + DEFAULT_NUM_NODES + ")");
    addOptWithArg("i", ITERATIONS_OPT, "Number of iterations to run (default: " +
                  DEFAULT_NUM_ITERATIONS +  ")");
    addOptWithArg("t", GENERATE_VERIFY_GAP_OPT,
                  "Gap between generate and verify steps in seconds (default: " +
                  DEFAULT_GENERATE_VERIFY_GAP + ")");
    addOptWithArg("w", WIDTH_OPT,
                  "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")");
    addOptWithArg("wm", WRAP_MULTIPLIER_OPT, "How many times to wrap around (default: " +
                  DEFAULT_WRAP_MULTIPLIER + ")");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    processBaseOptions(cmd);

    sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT);
    sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT);
    outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT);

    /** This uses parseInt from {@link org.apache.hadoop.hbase.util.AbstractHBaseTool} */
    numMappers = parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT,
                                             Integer.toString(DEFAULT_NUM_MAPPERS)),
                          1, Integer.MAX_VALUE);
    numReducers = parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT,
                                              Integer.toString(DEFAULT_NUM_REDUCERS)),
                           1, Integer.MAX_VALUE);
    numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)),
                        1, Integer.MAX_VALUE);
    generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT,
                                                    Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)),
                                 1, Integer.MAX_VALUE);
    numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT,
                                                Integer.toString(DEFAULT_NUM_ITERATIONS)),
                             1, Integer.MAX_VALUE);
    width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)),
                                        1, Integer.MAX_VALUE);
    wrapMultiplier = parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT,
                                                 Integer.toString(DEFAULT_WRAP_MULTIPLIER)),
                              1, Integer.MAX_VALUE);

    if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) {
      noReplicationSetup = true;
    }

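    // the generated chains only wrap after width * wrapMultiplier rows, so numNodes must divide
    // evenly into complete wrapped chains (see NUM_NODES_OPT above)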
    if (numNodes % (width * wrapMultiplier) != 0) {
      throw new RuntimeException("numNodes must be a multiple of width * wrapMultiplier");
    }
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    VerifyReplicationLoop tool = new VerifyReplicationLoop();
    tool.integrationTestBigLinkedList = this;
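    // all options were already parsed in processOptions(); no extra arguments are passed to the tool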
    return ToolRunner.run(getConf(), tool, null);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args);
    System.exit(ret);
  }
}