View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.hbase.HBaseTestingUtility;
25  import org.apache.hadoop.hbase.testclassification.LargeTests;
26  import org.apache.hadoop.hbase.UnknownScannerException;
27  import org.apache.hadoop.hbase.client.HTable;
28  import org.apache.hadoop.hbase.client.Result;
29  import org.apache.hadoop.hbase.client.ResultScanner;
30  import org.apache.hadoop.hbase.client.Scan;
31  import org.junit.experimental.categories.Category;
32  
33  import static org.junit.Assert.fail;
34  
35  @Category(LargeTests.class)
36  public class TestReplicationKillRS extends TestReplicationBase {
37  
38    private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
39  
40    /**
41     * Load up 1 tables over 2 region servers and kill a source during
42     * the upload. The failover happens internally.
43     *
44     * WARNING this test sometimes fails because of HBASE-3515
45     *
46     * @throws Exception
47     */
48    public void loadTableAndKillRS(HBaseTestingUtility util) throws Exception {
49      // killing the RS with hbase:meta can result into failed puts until we solve
50      // IO fencing
51      int rsToKill1 =
52          util.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
53  
54      // Takes about 20 secs to run the full loading, kill around the middle
55      Thread killer = killARegionServer(util, 5000, rsToKill1);
56  
57      LOG.info("Start loading table");
58      int initialCount = utility1.loadTable((HTable)htable1, famName);
59      LOG.info("Done loading table");
60      killer.join(5000);
61      LOG.info("Done waiting for threads");
62  
63      Result[] res;
64      while (true) {
65        try {
66          Scan scan = new Scan();
67          ResultScanner scanner = htable1.getScanner(scan);
68          res = scanner.next(initialCount);
69          scanner.close();
70          break;
71        } catch (UnknownScannerException ex) {
72          LOG.info("Cluster wasn't ready yet, restarting scanner");
73        }
74      }
75      // Test we actually have all the rows, we may miss some because we
76      // don't have IO fencing.
77      if (res.length != initialCount) {
78        LOG.warn("We lost some rows on the master cluster!");
79        // We don't really expect the other cluster to have more rows
80        initialCount = res.length;
81      }
82  
83      int lastCount = 0;
84  
85      final long start = System.currentTimeMillis();
86      int i = 0;
87      while (true) {
88        if (i==NB_RETRIES-1) {
89          fail("Waited too much time for queueFailover replication. " +
90              "Waited "+(System.currentTimeMillis() - start)+"ms.");
91        }
92        Scan scan2 = new Scan();
93        ResultScanner scanner2 = htable2.getScanner(scan2);
94        Result[] res2 = scanner2.next(initialCount * 2);
95        scanner2.close();
96        if (res2.length < initialCount) {
97          if (lastCount < res2.length) {
98            i--; // Don't increment timeout if we make progress
99          } else {
100           i++;
101         }
102         lastCount = res2.length;
103         LOG.info("Only got " + lastCount + " rows instead of " +
104             initialCount + " current i=" + i);
105         Thread.sleep(SLEEP_TIME*2);
106       } else {
107         break;
108       }
109     }
110   }
111 
112   private static Thread killARegionServer(final HBaseTestingUtility utility,
113                                           final long timeout, final int rs) {
114     Thread killer = new Thread() {
115       public void run() {
116         try {
117           Thread.sleep(timeout);
118           utility.getHBaseCluster().getRegionServer(rs).stop("Stopping as part of the test");
119         } catch (Exception e) {
120           LOG.error("Couldn't kill a region server", e);
121         }
122       }
123     };
124     killer.setDaemon(true);
125     killer.start();
126     return killer;
127   }
128 }