View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.apache.commons.lang.RandomStringUtils;
29  import org.apache.commons.lang.math.RandomUtils;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.client.Admin;
34  import org.apache.hadoop.hbase.client.Connection;
35  import org.apache.hadoop.hbase.client.ConnectionFactory;
36  import org.apache.hadoop.hbase.client.Put;
37  import org.apache.hadoop.hbase.client.Table;
38  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
39  import org.apache.hadoop.hbase.testclassification.IntegrationTests;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.HBaseFsck;
42  import org.apache.hadoop.hbase.util.Threads;
43  import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
44  import org.apache.hadoop.util.ToolRunner;
45  import org.junit.Assert;
46  import org.junit.Test;
47  import org.junit.experimental.categories.Category;
48  
49  /**
50   *
51   * Integration test that verifies Procedure V2. <br/><br/>
52   *
53   * DDL operations should go through (rollforward or rollback) when primary master is killed by
54   * ChaosMonkey (default MASTER_KILLING)<br/><br/>
55   *
56   * Multiple Worker threads are started to randomly do the following Actions in loops:<br/>
57   * Actions generating and populating tables:
58   * <ul>
59   *     <li>CreateTableAction</li>
60   *     <li>DisableTableAction</li>
61   *     <li>EnableTableAction</li>
62   *     <li>DeleteTableAction</li>
63   *     <li>AddRowAction</li>
64   * </ul>
65   * Actions performing DDL operations:
66   * <ul>
67   *     <li>AddColumnFamilyAction</li>
68   *     <li>AlterColumnFamilyVersionsAction</li>
69   *     <li>AlterColumnFamilyEncodingAction</li>
70   *     <li>DeleteColumnFamilyAction</li>
71   * </ul>
72   * <br/>
73   *
74   * The threads run for a period of time (default 20 minutes) then are stopped at the end of
75   * runtime. Verification is performed towards those checkpoints:
76   * <ol>
77   *     <li>No Actions throw Exceptions.</li>
78   *     <li>No inconsistencies are detected in hbck.</li>
79   * </ol>
80   *
81   * <p>
82   * This test should be run by the hbase user since it invokes hbck at the end
83   * </p><p>
84   * Usage:
85   *  hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
86   *    -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
87   *    -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
88   *    -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
89   */
90  
91  @Category(IntegrationTests.class)
92  public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
93  
94    private static final Log LOG = LogFactory.getLog(IntegrationTestDDLMasterFailover.class);
95  
96    private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
97  
98    protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
99  
100   protected static final int DEFAULT_NUM_THREADS = 20;
101 
102   protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables
103 
104   protected HBaseCluster cluster;
105 
106   protected Connection connection;
107 
108   /**
109    * A soft limit on how long we should run
110    */
111   protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
112   protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
113   protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";
114 
115   protected AtomicBoolean running = new AtomicBoolean(true);
116 
117   protected AtomicBoolean create_table = new AtomicBoolean(true);
118 
119   protected int numThreads, numRegions;
120 
121   ConcurrentHashMap<TableName, HTableDescriptor> enabledTables =
122       new ConcurrentHashMap<TableName, HTableDescriptor>();
123 
124   ConcurrentHashMap<TableName, HTableDescriptor> disabledTables =
125       new ConcurrentHashMap<TableName, HTableDescriptor>();
126 
127   ConcurrentHashMap<TableName, HTableDescriptor> deletedTables =
128       new ConcurrentHashMap<TableName, HTableDescriptor>();
129 
130   @Override
131   public void setUpCluster() throws Exception {
132     util = getTestingUtil(getConf());
133     LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
134     util.initializeCluster(getMinServerCount());
135     LOG.debug("Done initializing/checking cluster");
136     cluster = util.getHBaseClusterInterface();
137   }
138 
139   @Override
140   public void cleanUpCluster() throws Exception {
141     Admin admin = util.getHBaseAdmin();
142     admin.disableTables("ittable-\\d+");
143     admin.deleteTables("ittable-\\d+");
144     Connection connection = getConnection();
145     connection.close();
146     super.cleanUpCluster();
147   }
148 
149   protected int getMinServerCount() {
150     return SERVER_COUNT;
151   }
152 
153   protected synchronized void setConnection(Connection connection){
154     this.connection = connection;
155   }
156 
157   protected synchronized Connection getConnection(){
158     if (this.connection == null) {
159       try {
160         Connection connection = ConnectionFactory.createConnection(getConf());
161         setConnection(connection);
162       } catch (IOException e) {
163         LOG.fatal("Failed to establish connection.", e);
164       }
165     }
166     return connection;
167   }
168 
169   protected void verifyTables() throws  IOException{
170     Connection connection = getConnection();
171     Admin admin = connection.getAdmin();
172     // iterating concurrent map
173     for (TableName tableName : enabledTables.keySet()){
174       Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
175           admin.isTableEnabled(tableName));
176     }
177     for (TableName tableName : disabledTables.keySet()){
178       Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
179           admin.isTableDisabled(tableName));
180     }
181     for (TableName tableName : deletedTables.keySet()){
182       Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
183           admin.tableExists(tableName));
184     }
185     admin.close();
186   }
187 
188   @Test
189   public void testAsUnitTest() throws Exception {
190     runTest();
191   }
192 
193   @Override
194   public int runTestFromCommandLine() throws Exception {
195     int ret = runTest();
196     return ret;
197   }
198 
199   private abstract class MasterAction{
200     Connection connection = getConnection();
201 
202     abstract void perform() throws IOException;
203   }
204 
205   private abstract class TableAction extends  MasterAction{
206     // TableAction has implemented selectTable() shared by multiple table Actions
207     protected HTableDescriptor selectTable(ConcurrentHashMap<TableName, HTableDescriptor> tableMap)
208     {
209       // synchronization to prevent removal from multiple threads
210       synchronized (tableMap){
211         // randomly select table from tableMap
212         if (tableMap.isEmpty()) {
213           return null;
214         }
215         ArrayList<TableName> tableList = new ArrayList<TableName>(tableMap.keySet());
216         TableName randomKey = tableList.get(RandomUtils.nextInt(tableList.size()));
217         HTableDescriptor randomHtd = tableMap.get(randomKey);
218         // remove from tableMap
219         tableMap.remove(randomKey);
220         return randomHtd;
221       }
222     }
223   }
224 
225   private class CreateTableAction extends TableAction {
226 
227     @Override
228     void perform() throws IOException {
229       Admin admin = connection.getAdmin();
230       try {
231         HTableDescriptor htd = createTableDesc();
232         TableName tableName = htd.getTableName();
233         if ( admin.tableExists(tableName)){
234           return;
235         }
236         String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
237         numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
238         byte[] startKey = Bytes.toBytes("row-0000000000");
239         byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
240         LOG.info("Creating table:" + htd);
241         admin.createTable(htd, startKey, endKey, numRegions);
242         Assert.assertTrue("Table: " + htd + " was not created", admin.tableExists(tableName));
243         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
244         Assert.assertTrue(
245           "After create, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName));
246         enabledTables.put(tableName, freshTableDesc);
247         LOG.info("Created table:" + freshTableDesc);
248       } catch (Exception e) {
249         LOG.warn("Caught exception in action: " + this.getClass());
250         // TODO workaround
251         // when master failover happens during CREATE_TABLE, client will do RPC retry and get TableExistsException
252         // ignore for now till better resolution
253         if (e instanceof TableExistsException) {
254           LOG.warn("Caught TableExistsException in action: " + this.getClass(), e);
255         } else {
256           throw e;
257         }
258       } finally {
259         admin.close();
260       }
261     }
262 
263     private HTableDescriptor createTableDesc() {
264       String tableName = "ittable-" + String.format("%010d",
265         RandomUtils.nextInt(Integer.MAX_VALUE));
266       String familyName = "cf-" + Math.abs(RandomUtils.nextInt());
267       HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
268       // add random column family
269       htd.addFamily(new HColumnDescriptor(familyName));
270       return htd;
271     }
272   }
273 
274   private class DisableTableAction extends TableAction {
275 
276     @Override
277     void perform() throws IOException {
278 
279       HTableDescriptor selected = selectTable(enabledTables);
280       if (selected == null) {
281         return;
282       }
283 
284       Admin admin = connection.getAdmin();
285       try {
286         TableName tableName = selected.getTableName();
287         LOG.info("Disabling table :" + selected);
288         admin.disableTable(tableName);
289         Assert.assertTrue("Table: " + selected + " was not disabled",
290             admin.isTableDisabled(tableName));
291         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
292         Assert.assertTrue(
293           "After disable, Table: " + tableName + " is not disabled",
294           admin.isTableDisabled(tableName));
295         disabledTables.put(tableName, freshTableDesc);
296         LOG.info("Disabled table :" + freshTableDesc);
297       } catch (Exception e){
298         LOG.warn("Caught exception in action: " + this.getClass());
299         // TODO workaround
300         // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
301         // operations
302         // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
303         // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
304         // 2) if master failover happens in the middle of the enable/disable operation, the new
305         // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
306         // AssignmentManager#recoverTableInEnablingState() and
307         // AssignmentManager#recoverTableInDisablingState()
308         // 3) after the new master initialization completes, the procedure tries to re-do the
309         // enable/disable operation, which was already done. Ignore those exceptions before change
310         // of behaviors of AssignmentManager in presence of PV2
311         if (e instanceof TableNotEnabledException) {
312           LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
313           e.printStackTrace();
314         } else {
315           throw e;
316         }
317       } finally {
318         admin.close();
319       }
320     }
321   }
322 
323   private class EnableTableAction extends TableAction {
324 
325     @Override
326     void perform() throws IOException {
327 
328       HTableDescriptor selected = selectTable(disabledTables);
329       if (selected == null ) {
330         return;
331       }
332 
333       Admin admin = connection.getAdmin();
334       try {
335         TableName tableName = selected.getTableName();
336         LOG.info("Enabling table :" + selected);
337         admin.enableTable(tableName);
338         Assert.assertTrue("Table: " + selected + " was not enabled",
339             admin.isTableEnabled(tableName));
340         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
341         Assert.assertTrue(
342           "After enable, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName));
343         enabledTables.put(tableName, freshTableDesc);
344         LOG.info("Enabled table :" + freshTableDesc);
345       } catch (Exception e){
346         LOG.warn("Caught exception in action: " + this.getClass());
347         // TODO workaround
348         // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
349         // operations 1) when enable/disable starts, the table state is changed to
350         // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
351         // once the operation completes 2) if master failover happens in the middle of the
352         // enable/disable operation, the new master will try to recover the tables in
353         // ENABLING/DISABLING state, as programmed in
354         // AssignmentManager#recoverTableInEnablingState() and
355         // AssignmentManager#recoverTableInDisablingState()
356         // 3) after the new master initialization completes, the procedure tries to re-do the
357         // enable/disable operation, which was already done. Ignore those exceptions before
358         // change of behaviors of AssignmentManager in presence of PV2
359         if (e instanceof TableNotDisabledException) {
360           LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
361           e.printStackTrace();
362         } else {
363           throw e;
364         }
365       } finally {
366         admin.close();
367       }
368     }
369   }
370 
371   private class DeleteTableAction extends TableAction {
372 
373     @Override
374     void perform() throws IOException {
375 
376       HTableDescriptor selected = selectTable(disabledTables);
377       if (selected == null) {
378         return;
379       }
380 
381       Admin admin = connection.getAdmin();
382       try {
383         TableName tableName = selected.getTableName();
384         LOG.info("Deleting table :" + selected);
385         admin.deleteTable(tableName);
386         Assert.assertFalse("Table: " + selected + " was not deleted",
387                 admin.tableExists(tableName));
388         deletedTables.put(tableName, selected);
389         LOG.info("Deleted table :" + selected);
390       } catch (Exception e){
391         LOG.warn("Caught exception in action: " + this.getClass());
392         // TODO workaround
393         // when master failover happens during DELETE_TABLE, client will do RPC retry and get
394         // TableNotFoundException ignore for now till better resolution
395         if (e instanceof TableNotFoundException) {
396           LOG.warn("Caught TableNotFoundException in action: " + this.getClass());
397           e.printStackTrace();
398         } else {
399           throw e;
400         }
401       } finally {
402         admin.close();
403       }
404     }
405   }
406 
407 
408   private abstract class ColumnAction extends TableAction{
409     // ColumnAction has implemented selectFamily() shared by multiple family Actions
410     protected HColumnDescriptor selectFamily(HTableDescriptor htd) {
411       if (htd == null) {
412         return null;
413       }
414       HColumnDescriptor[] families = htd.getColumnFamilies();
415       if (families.length == 0){
416         LOG.info("No column families in table: " + htd);
417         return null;
418       }
419       HColumnDescriptor randomCfd = families[RandomUtils.nextInt(families.length)];
420       return randomCfd;
421     }
422   }
423 
424   private class AddColumnFamilyAction extends ColumnAction {
425 
426     @Override
427     void perform() throws IOException {
428       HTableDescriptor selected = selectTable(disabledTables);
429       if (selected == null) {
430         return;
431       }
432 
433       Admin admin = connection.getAdmin();
434       try {
435         HColumnDescriptor cfd = createFamilyDesc();
436         if (selected.hasFamily(cfd.getName())){
437           LOG.info(new String(cfd.getName()) + " already exists in table "
438               + selected.getTableName());
439           return;
440         }
441         TableName tableName = selected.getTableName();
442         LOG.info("Adding column family: " + cfd + " to table: " + tableName);
443         admin.addColumn(tableName, cfd);
444         // assertion
445         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
446         Assert.assertTrue("Column family: " + cfd + " was not added",
447             freshTableDesc.hasFamily(cfd.getName()));
448         Assert.assertTrue(
449           "After add column family, Table: " + tableName + " is not disabled",
450           admin.isTableDisabled(tableName));
451         disabledTables.put(tableName, freshTableDesc);
452         LOG.info("Added column family: " + cfd + " to table: " + tableName);
453       } catch (Exception e) {
454         LOG.warn("Caught exception in action: " + this.getClass());
455         // TODO HBASE-13415
456         // loose restriction for InvalidFamilyOperationException thrown in async operations before
457         // HBASE-13415 completes when failover happens, multiple procids may be created from the
458         // same request when 1 procedure succeeds, the others would complain about family already
459         // exists
460         if (e instanceof InvalidFamilyOperationException) {
461           LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass());
462           e.printStackTrace();
463         } else {
464           throw e;
465         }
466       } finally {
467         admin.close();
468       }
469     }
470 
471     private HColumnDescriptor createFamilyDesc() {
472       String familyName = "cf-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE));
473       HColumnDescriptor cfd = new HColumnDescriptor(familyName);
474       return cfd;
475     }
476   }
477 
478   private class AlterFamilyVersionsAction extends ColumnAction {
479 
480     @Override
481     void perform() throws IOException {
482       HTableDescriptor selected = selectTable(disabledTables);
483       if (selected == null) {
484         return;
485       }
486       HColumnDescriptor columnDesc = selectFamily(selected);
487       if (columnDesc == null){
488         return;
489       }
490 
491       Admin admin = connection.getAdmin();
492       int versions = RandomUtils.nextInt(10) + 3;
493       try {
494         TableName tableName = selected.getTableName();
495         LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions +
496             " in table: " + tableName);
497         columnDesc.setMinVersions(versions);
498         columnDesc.setMaxVersions(versions);
499         admin.modifyTable(tableName, selected);
500         // assertion
501         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
502         HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
503         Assert.assertEquals("Column family: " + columnDesc + " was not altered",
504             freshColumnDesc.getMaxVersions(), versions);
505         Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
506             freshColumnDesc.getMinVersions(), versions);
507         Assert.assertTrue(
508           "After alter versions of column family, Table: " + tableName + " is not disabled",
509           admin.isTableDisabled(tableName));
510         disabledTables.put(tableName, freshTableDesc);
511         LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions +
512           " in table: " + tableName);
513       } catch (Exception e) {
514         LOG.warn("Caught exception in action: " + this.getClass());
515         throw e;
516       } finally {
517         admin.close();
518       }
519     }
520   }
521 
522   private class AlterFamilyEncodingAction extends ColumnAction {
523 
524     @Override
525     void perform() throws IOException {
526       HTableDescriptor selected = selectTable(disabledTables);
527       if (selected == null) {
528         return;
529       }
530       HColumnDescriptor columnDesc = selectFamily(selected);
531       if (columnDesc == null){
532         return;
533       }
534 
535       Admin admin = connection.getAdmin();
536       try {
537         TableName tableName = selected.getTableName();
538         // possible DataBlockEncoding ids
539         int[] possibleIds = {0, 2, 3, 4, 6};
540         short id = (short) possibleIds[RandomUtils.nextInt(possibleIds.length)];
541         LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id +
542             " in table: " + tableName);
543         columnDesc.setDataBlockEncoding(DataBlockEncoding.getEncodingById(id));
544         admin.modifyTable(tableName, selected);
545         // assertion
546         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
547         HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
548         Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
549             freshColumnDesc.getDataBlockEncoding().getId(), id);
550         Assert.assertTrue(
551           "After alter encoding of column family, Table: " + tableName + " is not disabled",
552           admin.isTableDisabled(tableName));
553         disabledTables.put(tableName, freshTableDesc);
554         LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id +
555           " in table: " + tableName);
556       } catch (Exception e) {
557         LOG.warn("Caught exception in action: " + this.getClass());
558         throw e;
559       } finally {
560         admin.close();
561       }
562     }
563   }
564 
565   private class DeleteColumnFamilyAction extends ColumnAction {
566 
567     @Override
568     void perform() throws IOException {
569       HTableDescriptor selected = selectTable(disabledTables);
570       HColumnDescriptor cfd = selectFamily(selected);
571       if (selected == null || cfd == null) {
572         return;
573       }
574 
575       Admin admin = connection.getAdmin();
576       try {
577         if (selected.getColumnFamilies().length < 2) {
578           LOG.info("No enough column families to delete in table " + selected.getTableName());
579           return;
580         }
581         TableName tableName = selected.getTableName();
582         LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
583         admin.deleteColumn(tableName, cfd.getName());
584         // assertion
585         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
586         Assert.assertFalse("Column family: " + cfd + " was not added",
587             freshTableDesc.hasFamily(cfd.getName()));
588         Assert.assertTrue(
589           "After delete column family, Table: " + tableName + " is not disabled",
590           admin.isTableDisabled(tableName));
591         disabledTables.put(tableName, freshTableDesc);
592         LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
593       } catch (Exception e) {
594         LOG.warn("Caught exception in action: " + this.getClass());
595         // TODO HBASE-13415
596         // loose restriction for InvalidFamilyOperationException thrown in async operations before
597         // HBASE-13415 completes when failover happens, multiple procids may be created from the
598         //  same request when 1 procedure succeeds, the others would complain about family not
599         // exists
600         if (e instanceof InvalidFamilyOperationException) {
601           LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass());
602           e.printStackTrace();
603         } else {
604           throw e;
605         }
606       } finally {
607         admin.close();
608       }
609     }
610   }
611 
612   private class AddRowAction extends ColumnAction {
613     // populate tables
614     @Override
615     void perform() throws IOException {
616       HTableDescriptor selected = selectTable(enabledTables);
617       if (selected == null ) {
618         return;
619       }
620 
621       Admin admin = connection.getAdmin();
622       TableName tableName = selected.getTableName();
623       try (Table table = connection.getTable(tableName)){
624         ArrayList<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(admin.getTableRegions(
625             selected.getTableName()));
626         int numRegions = regionInfos.size();
627         // average number of rows to be added per action to each region
628         int average_rows = 1;
629         int numRows = average_rows * numRegions;
630         LOG.info("Adding " + numRows + " rows to table: " + selected);
631         for (int i = 0; i < numRows; i++){
632           // nextInt(Integer.MAX_VALUE)) to return positive numbers only
633           byte[] rowKey = Bytes.toBytes(
634               "row-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE)));
635           HColumnDescriptor cfd = selectFamily(selected);
636           if (cfd == null){
637             return;
638           }
639           byte[] family = cfd.getName();
640           byte[] qualifier = Bytes.toBytes("col-" + RandomUtils.nextInt(Integer.MAX_VALUE) % 10);
641           byte[] value = Bytes.toBytes("val-" + RandomStringUtils.randomAlphanumeric(10));
642           Put put = new Put(rowKey);
643           put.addColumn(family, qualifier, value);
644           table.put(put);
645         }
646         HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
647         Assert.assertTrue(
648           "After insert, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName));
649         enabledTables.put(tableName, freshTableDesc);
650         LOG.info("Added " + numRows + " rows to table: " + selected);
651       } catch (Exception e) {
652         LOG.warn("Caught exception in action: " + this.getClass());
653         throw e;
654       } finally {
655         admin.close();
656       }
657     }
658   }
659 
660   private enum ACTION {
661     CREATE_TABLE,
662     DISABLE_TABLE,
663     ENABLE_TABLE,
664     DELETE_TABLE,
665     ADD_COLUMNFAMILY,
666     DELETE_COLUMNFAMILY,
667     ALTER_FAMILYVERSIONS,
668     ALTER_FAMILYENCODING,
669     ADD_ROW
670   }
671 
672   private class Worker extends Thread {
673 
674     private Exception savedException;
675 
676     private ACTION action;
677 
678     @Override
679     public void run() {
680       while (running.get()) {
681         // select random action
682         ACTION selectedAction = ACTION.values()[RandomUtils.nextInt() % ACTION.values().length];
683         this.action = selectedAction;
684         LOG.info("Performing Action: " + selectedAction);
685 
686         try {
687           switch (selectedAction) {
688           case CREATE_TABLE:
689             // stop creating new tables in the later stage of the test to avoid too many empty
690             // tables
691             if (create_table.get()) {
692               new CreateTableAction().perform();
693             }
694             break;
695           case ADD_ROW:
696             new AddRowAction().perform();
697             break;
698           case DISABLE_TABLE:
699             new DisableTableAction().perform();
700             break;
701           case ENABLE_TABLE:
702             new EnableTableAction().perform();
703             break;
704           case DELETE_TABLE:
705             // reduce probability of deleting table to 20%
706             if (RandomUtils.nextInt(100) < 20) {
707               new DeleteTableAction().perform();
708             }
709             break;
710           case ADD_COLUMNFAMILY:
711             new AddColumnFamilyAction().perform();
712             break;
713           case DELETE_COLUMNFAMILY:
714             // reduce probability of deleting column family to 20%
715             if (RandomUtils.nextInt(100) < 20) {
716               new DeleteColumnFamilyAction().perform();
717             }
718             break;
719           case ALTER_FAMILYVERSIONS:
720             new AlterFamilyVersionsAction().perform();
721             break;
722           case ALTER_FAMILYENCODING:
723             new AlterFamilyEncodingAction().perform();
724             break;
725           }
726         } catch (Exception ex) {
727           this.savedException = ex;
728           return;
729         }
730       }
731       LOG.info(this.getName() + " stopped");
732     }
733 
734     public Exception getSavedException(){
735       return this.savedException;
736     }
737 
738     public ACTION getAction(){
739       return this.action;
740     }
741   }
742 
743   private void checkException(List<Worker> workers){
744     if(workers == null || workers.isEmpty())
745       return;
746     for (Worker worker : workers){
747       Exception e = worker.getSavedException();
748       if (e != null) {
749         LOG.error("Found exception in thread: " + worker.getName());
750         e.printStackTrace();
751       }
752       Assert.assertNull("Action failed: " + worker.getAction() + " in thread: "
753           + worker.getName(), e);
754     }
755   }
756 
757   private int runTest() throws Exception {
758     LOG.info("Starting the test");
759 
760     String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
761     long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
762 
763     String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
764     numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
765 
766     ArrayList<Worker> workers = new ArrayList<>();
767     for (int i = 0; i < numThreads; i++) {
768       checkException(workers);
769       Worker worker = new Worker();
770       LOG.info("Launching worker thread " + worker.getName());
771       workers.add(worker);
772       worker.start();
773     }
774 
775     Threads.sleep(runtime / 2);
776     LOG.info("Stopping creating new tables");
777     create_table.set(false);
778     Threads.sleep(runtime / 2);
779     LOG.info("Runtime is up");
780     running.set(false);
781 
782     checkException(workers);
783 
784     for (Worker worker : workers) {
785       worker.join();
786     }
787     LOG.info("All Worker threads stopped");
788 
789     // verify
790     LOG.info("Verify actions of all threads succeeded");
791     checkException(workers);
792     LOG.info("Verify states of all tables");
793     verifyTables();
794 
795     // RUN HBCK
796 
797     HBaseFsck hbck = null;
798     try {
799       LOG.info("Running hbck");
800       hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
801       HbckTestingUtil.assertNoErrors(hbck);
802       LOG.info("Finished hbck");
803     } finally {
804       if (hbck != null) {
805         hbck.close();
806       }
807     }
808      return 0;
809   }
810 
811   @Override
812   public TableName getTablename() {
813     return null;
814   }
815 
816   @Override
817   protected Set<String> getColumnFamilies() {
818     return null;
819   }
820 
821   public static void main(String[] args) throws Exception {
822     Configuration conf = HBaseConfiguration.create();
823     IntegrationTestingUtility.setUseDistributedCluster(conf);
824     IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
825     Connection connection = null;
826     int ret = 1;
827     try {
828       // Initialize connection once, then pass to Actions
829       LOG.debug("Setting up connection ...");
830       connection = ConnectionFactory.createConnection(conf);
831       masterFailover.setConnection(connection);
832       ret = ToolRunner.run(conf, masterFailover, args);
833     } catch (IOException e){
834       LOG.fatal("Failed to establish connection. Aborting test ...", e);
835     } finally {
836       connection = masterFailover.getConnection();
837       if (connection != null){
838         connection.close();
839       }
840       System.exit(ret);
841     }
842   }
843 }