View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.fail;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.testclassification.LargeTests;
28  import org.apache.hadoop.hbase.MiniHBaseCluster;
29  import org.apache.hadoop.hbase.client.Get;
30  import org.apache.hadoop.hbase.client.Put;
31  import org.apache.hadoop.hbase.client.Result;
32  import org.apache.hadoop.hbase.client.ResultScanner;
33  import org.apache.hadoop.hbase.client.Scan;
34  import org.apache.hadoop.hbase.util.Bytes;
35  import org.apache.hadoop.hbase.util.JVMClusterUtil;
36  import org.junit.Before;
37  import org.junit.Test;
38  import org.junit.experimental.categories.Category;
39  
40  import java.io.IOException;
41  
42  /**
43   * Test handling of changes to the number of a peer's regionservers.
44   */
45  @Category(LargeTests.class)
46  public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
47  
48    private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
49  
50    /**
51     * @throws java.lang.Exception
52     */
53    @Before
54    public void setUp() throws Exception {
55      // Starting and stopping replication can make us miss new logs,
56      // rolling like this makes sure the most recent one gets added to the queue
57      for (JVMClusterUtil.RegionServerThread r :
58                            utility1.getHBaseCluster().getRegionServerThreads()) {
59        utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
60      }
61      utility1.deleteTableData(tableName);
62      // truncating the table will send one Delete per row to the slave cluster
63      // in an async fashion, which is why we cannot just call deleteTableData on
64      // utility2 since late writes could make it to the slave in some way.
65      // Instead, we truncate the first table and wait for all the Deletes to
66      // make it to the slave.
67      Scan scan = new Scan();
68      int lastCount = 0;
69      for (int i = 0; i < NB_RETRIES; i++) {
70        if (i == NB_RETRIES - 1) {
71          fail("Waited too much time for truncate");
72        }
73        ResultScanner scanner = htable2.getScanner(scan);
74        Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
75        scanner.close();
76        if (res.length != 0) {
77          if (res.length < lastCount) {
78            i--; // Don't increment timeout if we make progress
79          }
80          lastCount = res.length;
81          LOG.info("Still got " + res.length + " rows");
82          Thread.sleep(SLEEP_TIME);
83        } else {
84          break;
85        }
86      }
87    }
88  
89    @Test(timeout = 300000)
90    public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
91  
92      LOG.info("testSimplePutDelete");
93      MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
94  
95      doPutTest(Bytes.toBytes(1));
96  
97      int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
98      peerCluster.stopRegionServer(rsToStop);
99      peerCluster.waitOnRegionServer(rsToStop);
100 
101     // Sanity check
102     assertEquals(1, peerCluster.getRegionServerThreads().size());
103 
104     doPutTest(Bytes.toBytes(2));
105 
106     peerCluster.startRegionServer();
107 
108     // Sanity check
109     assertEquals(2, peerCluster.getRegionServerThreads().size());
110 
111     doPutTest(Bytes.toBytes(3));
112 
113   }
114 
115   private void doPutTest(byte[] row) throws IOException, InterruptedException {
116     Put put = new Put(row);
117     put.add(famName, row, row);
118 
119     if (htable1 == null) {
120       htable1 = utility1.getConnection().getTable(tableName);
121     }
122 
123     htable1.put(put);
124 
125     Get get = new Get(row);
126     for (int i = 0; i < NB_RETRIES; i++) {
127       if (i == NB_RETRIES - 1) {
128         fail("Waited too much time for put replication");
129       }
130       Result res = htable2.get(get);
131       if (res.size() == 0) {
132         LOG.info("Row not available");
133         Thread.sleep(SLEEP_TIME);
134       } else {
135         assertArrayEquals(res.value(), row);
136         break;
137       }
138     }
139 
140   }
141 
142 }