View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.*;
22  
23  import java.io.IOException;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.atomic.AtomicReference;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.commons.logging.impl.Log4JLogger;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.Waiter.Predicate;
39  import org.apache.hadoop.hbase.client.Admin;
40  import org.apache.hadoop.hbase.client.Connection;
41  import org.apache.hadoop.hbase.client.ConnectionFactory;
42  import org.apache.hadoop.hbase.client.Consistency;
43  import org.apache.hadoop.hbase.client.Get;
44  import org.apache.hadoop.hbase.client.RpcRetryingCaller;
45  import org.apache.hadoop.hbase.client.Table;
46  import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
47  import org.apache.hadoop.hbase.testclassification.LargeTests;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
50  import org.apache.hadoop.hbase.util.Threads;
51  import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
52  import org.apache.log4j.Level;
53  import org.junit.After;
54  import org.junit.Before;
55  import org.junit.Rule;
56  import org.junit.Test;
57  import org.junit.experimental.categories.Category;
58  import org.junit.rules.TestName;
59  import org.junit.runner.RunWith;
60  import org.junit.runners.Parameterized;
61  import org.junit.runners.Parameterized.Parameters;
62  
63  /**
64   * Tests failover of secondary region replicas.
65   */
66  @RunWith(Parameterized.class)
67  @Category(LargeTests.class)
68  public class TestRegionReplicaFailover {
69  
70    private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);
71  
72    static {
73      ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
74    }
75  
76    private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
77  
78    private static final int NB_SERVERS = 3;
79  
80    protected final byte[][] families = new byte[][] {HBaseTestingUtility.fam1,
81        HBaseTestingUtility.fam2, HBaseTestingUtility.fam3};
82    protected final byte[] fam = HBaseTestingUtility.fam1;
83    protected final byte[] qual1 = Bytes.toBytes("qual1");
84    protected final byte[] value1 = Bytes.toBytes("value1");
85    protected final byte[] row = Bytes.toBytes("rowA");
86    protected final byte[] row2 = Bytes.toBytes("rowB");
87  
88    @Rule public TestName name = new TestName();
89  
90    private HTableDescriptor htd;
91  
92    /*
93     * We are testing with dist log split and dist log replay separately
94     */
95    @Parameters
96    public static Collection<Object[]> getParameters() {
97      Object[][] params =
98          new Boolean[][] { {true}, {false} };
99      return Arrays.asList(params);
100   }
101 
102   @Parameterized.Parameter(0)
103   public boolean distributedLogReplay;
104 
105   @Before
106   public void before() throws Exception {
107     Configuration conf = HTU.getConfiguration();
108     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
109     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
110     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, true);
111     conf.setInt("replication.stats.thread.period.seconds", 5);
112     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
113     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
114     conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 60);
115 
116     HTU.startMiniCluster(NB_SERVERS);
117     htd = HTU.createTableDescriptor(
118       name.getMethodName().substring(0, name.getMethodName().length()-3));
119     htd.setRegionReplication(3);
120     HTU.getHBaseAdmin().createTable(htd);
121   }
122 
123   @After
124   public void after() throws Exception {
125     HTU.deleteTableIfAny(htd.getTableName());
126     HTU.shutdownMiniCluster();
127   }
128 
129   /**
130    * Tests the case where a newly created table with region replicas and no data, the secondary
131    * region replicas are available to read immediately.
132    */
133   @Test(timeout = 60000)
134   public void testSecondaryRegionWithEmptyRegion() throws IOException {
135     // Create a new table with region replication, don't put any data. Test that the secondary
136     // region replica is available to read.
137     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
138         Table table = connection.getTable(htd.getTableName())) {
139 
140       Get get = new Get(row);
141       get.setConsistency(Consistency.TIMELINE);
142       get.setReplicaId(1);
143       table.get(get); // this should not block
144     }
145   }
146 
147   /**
148    * Tests the case where if there is some data in the primary region, reopening the region replicas
149    * (enable/disable table, etc) makes the region replicas readable.
150    * @throws IOException
151    */
152   @Test(timeout = 60000)
153   public void testSecondaryRegionWithNonEmptyRegion() throws IOException {
154     // Create a new table with region replication and load some data
155     // than disable and enable the table again and verify the data from secondary
156     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
157         Table table = connection.getTable(htd.getTableName())) {
158 
159       HTU.loadNumericRows(table, fam, 0, 1000);
160 
161       HTU.getHBaseAdmin().disableTable(htd.getTableName());
162       HTU.getHBaseAdmin().enableTable(htd.getTableName());
163 
164       HTU.verifyNumericRows(table, fam, 0, 1000, 1);
165     }
166   }
167 
168   /**
169    * Tests the case where killing a primary region with unflushed data recovers
170    */
171   @Test (timeout = 120000)
172   public void testPrimaryRegionKill() throws Exception {
173     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
174         Table table = connection.getTable(htd.getTableName())) {
175 
176       HTU.loadNumericRows(table, fam, 0, 1000);
177 
178       // wal replication is async, we have to wait until the replication catches up, or we timeout
179       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
180       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
181 
182       // we should not have flushed files now, but data in memstores of primary and secondary
183       // kill the primary region replica now, and ensure that when it comes back up, we can still
184       // read from it the same data from primary and secondaries
185       boolean aborted = false;
186       for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
187         for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
188           if (r.getRegionInfo().getReplicaId() == 0) {
189             LOG.info("Aborting region server hosting primary region replica");
190             rs.getRegionServer().abort("for test");
191             aborted = true;
192           }
193         }
194       }
195       assertTrue(aborted);
196 
197       // wal replication is async, we have to wait until the replication catches up, or we timeout
198       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 0, 30000);
199       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
200       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
201     }
202 
203     // restart the region server
204     HTU.getMiniHBaseCluster().startRegionServer();
205   }
206 
207   /** wal replication is async, we have to wait until the replication catches up, or we timeout
208    */
209   private void verifyNumericRowsWithTimeout(final Table table, final byte[] f, final int startRow,
210       final int endRow, final int replicaId, final long timeout) throws Exception {
211     try {
212       HTU.waitFor(timeout, new Predicate<Exception>() {
213         @Override
214         public boolean evaluate() throws Exception {
215           try {
216             HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
217             return true;
218           } catch (AssertionError ae) {
219             return false;
220           }
221         }
222       });
223     } catch (Throwable t) {
224       // ignore this, but redo the verify do get the actual exception
225       HTU.verifyNumericRows(table, f, startRow, endRow, replicaId);
226     }
227   }
228 
229   /**
230    * Tests the case where killing a secondary region with unflushed data recovers, and the replica
231    * becomes available to read again shortly.
232    */
233   @Test (timeout = 120000)
234   public void testSecondaryRegionKill() throws Exception {
235     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
236         Table table = connection.getTable(htd.getTableName())) {
237       HTU.loadNumericRows(table, fam, 0, 1000);
238 
239       // wait for some time to ensure that async wal replication does it's magic
240       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 1, 30000);
241       verifyNumericRowsWithTimeout(table, fam, 0, 1000, 2, 30000);
242 
243       // we should not have flushed files now, but data in memstores of primary and secondary
244       // kill the secondary region replica now, and ensure that when it comes back up, we can still
245       // read from it the same data
246       boolean aborted = false;
247       for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
248         for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
249           if (r.getRegionInfo().getReplicaId() == 1) {
250             LOG.info("Aborting region server hosting secondary region replica");
251             rs.getRegionServer().abort("for test");
252             aborted = true;
253           }
254         }
255       }
256       assertTrue(aborted);
257 
258       Threads.sleep(5000);
259 
260       HTU.verifyNumericRows(table, fam, 0, 1000, 1);
261       HTU.verifyNumericRows(table, fam, 0, 1000, 2);
262     }
263 
264     // restart the region server
265     HTU.getMiniHBaseCluster().startRegionServer();
266   }
267 
268   /**
269    * Tests the case where there are 3 region replicas and the primary is continuously accepting
270    * new writes while one of the secondaries is killed. Verification is done for both of the
271    * secondary replicas.
272    */
273   @Test (timeout = 120000)
274   public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception {
275     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
276         Table table = connection.getTable(htd.getTableName());
277         Admin admin = connection.getAdmin()) {
278       // start a thread to do the loading of primary
279       HTU.loadNumericRows(table, fam, 0, 1000); // start with some base
280       admin.flush(table.getName());
281       HTU.loadNumericRows(table, fam, 1000, 2000);
282 
283       final AtomicReference<Throwable> ex = new AtomicReference<Throwable>(null);
284       final AtomicBoolean done = new AtomicBoolean(false);
285       final AtomicInteger key = new AtomicInteger(2000);
286 
287       Thread loader = new Thread() {
288         @Override
289         public void run() {
290           while (!done.get()) {
291             try {
292               HTU.loadNumericRows(table, fam, key.get(), key.get()+1000);
293               key.addAndGet(1000);
294             } catch (Throwable e) {
295               ex.compareAndSet(null, e);
296             }
297           }
298         }
299       };
300       loader.start();
301 
302       Thread aborter = new Thread() {
303         @Override
304         public void run() {
305           try {
306             boolean aborted = false;
307             for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
308               for (Region r : rs.getRegionServer().getOnlineRegions(htd.getTableName())) {
309                 if (r.getRegionInfo().getReplicaId() == 1) {
310                   LOG.info("Aborting region server hosting secondary region replica");
311                   rs.getRegionServer().abort("for test");
312                   aborted = true;
313                 }
314               }
315             }
316             assertTrue(aborted);
317           } catch (Throwable e) {
318             ex.compareAndSet(null, e);
319           }
320         };
321       };
322 
323       aborter.start();
324       aborter.join();
325       done.set(true);
326       loader.join();
327 
328       assertNull(ex.get());
329 
330       assertTrue(key.get() > 1000); // assert that the test is working as designed
331       LOG.info("Loaded up to key :" + key.get());
332       verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 0, 30000);
333       verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 1, 30000);
334       verifyNumericRowsWithTimeout(table, fam, 0, key.get(), 2, 30000);
335     }
336 
337     // restart the region server
338     HTU.getMiniHBaseCluster().startRegionServer();
339   }
340 
341   /**
342    * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
343    * replicas should not block handlers on RS indefinitely.
344    */
345   @Test (timeout = 120000)
346   public void testLotsOfRegionReplicas() throws IOException {
347     int numRegions = NB_SERVERS * 20;
348     int regionReplication = 10;
349     String tableName = htd.getTableName().getNameAsString() + "2";
350     htd = HTU.createTableDescriptor(tableName);
351     htd.setRegionReplication(regionReplication);
352 
353     // dont care about splits themselves too much
354     byte[] startKey = Bytes.toBytes("aaa");
355     byte[] endKey = Bytes.toBytes("zzz");
356     byte[][] splits = HTU.getRegionSplitStartKeys(startKey, endKey, numRegions);
357     HTU.getHBaseAdmin().createTable(htd, startKey, endKey, numRegions);
358 
359     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
360         Table table = connection.getTable(htd.getTableName())) {
361 
362       for (int i = 1; i < splits.length; i++) {
363         for (int j = 0; j < regionReplication; j++) {
364           Get get = new Get(splits[i]);
365           get.setConsistency(Consistency.TIMELINE);
366           get.setReplicaId(j);
367           table.get(get); // this should not block. Regions should be coming online
368         }
369       }
370     }
371 
372     HTU.deleteTableIfAny(TableName.valueOf(tableName));
373   }
374 }