/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
 * Tests the {@link RegionReplicaReplicationEndpoint} class by setting up region replicas and
 * verifying that async WAL replication replays the edits to the secondary regions in various
 * scenarios.
 */
@Category(MediumTests.class)
public class TestRegionReplicaReplicationEndpoint {

  private static final Log LOG = LogFactory.getLog(TestRegionReplicaReplicationEndpoint.class);

  static {
    ((Log4JLogger)RpcRetryingCaller.LOG).getLogger().setLevel(Level.ALL);
  }

  private static final int NB_SERVERS = 2;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();

  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HTU.getConfiguration();
    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
    conf.setInt("replication.source.size.capacity", 10240);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // fewer retries are needed
    conf.setInt("hbase.client.serverside.retries.multiplier", 1);

    HTU.startMiniCluster(NB_SERVERS);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    HTU.shutdownMiniCluster();
  }

  @Test
  public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException {
    // create a table with region replicas. Check whether the replication peer is created
    // and replication started.
    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
    String peerId = "region_replica_replication";

    if (admin.getPeerConfig(peerId) != null) {
      admin.removePeer(peerId);
    }

    HTableDescriptor htd = HTU.createTableDescriptor(
      "testReplicationPeerIsCreated_no_region_replicas");
    HTU.getHBaseAdmin().createTable(htd);
    ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
    assertNull(peerConfig);

    htd = HTU.createTableDescriptor("testReplicationPeerIsCreated");
    htd.setRegionReplication(2);
    HTU.getHBaseAdmin().createTable(htd);

    // assert peer configuration is correct
    peerConfig = admin.getPeerConfig(peerId);
    assertNotNull(peerConfig);
    assertEquals(ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()),
        peerConfig.getClusterKey());
    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
        peerConfig.getReplicationEndpointImpl());
    admin.close();
  }

  @Test (timeout=240000)
  public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
    // modify a table by adding region replicas. Check whether the replication peer is created
    // and replication started.
    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
    String peerId = "region_replica_replication";

    if (admin.getPeerConfig(peerId) != null) {
      admin.removePeer(peerId);
    }

    HTableDescriptor htd
      = HTU.createTableDescriptor("testRegionReplicaReplicationPeerIsCreatedForModifyTable");
    HTU.getHBaseAdmin().createTable(htd);

    // assert that replication peer is not created yet
    ReplicationPeerConfig peerConfig = admin.getPeerConfig(peerId);
    assertNull(peerConfig);

    HTU.getHBaseAdmin().disableTable(htd.getTableName());
    htd.setRegionReplication(2);
    HTU.getHBaseAdmin().modifyTable(htd.getTableName(), htd);
    HTU.getHBaseAdmin().enableTable(htd.getTableName());

    // assert peer configuration is correct
    peerConfig = admin.getPeerConfig(peerId);
    assertNotNull(peerConfig);
    assertEquals(ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()),
        peerConfig.getClusterKey());
    assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
        peerConfig.getReplicationEndpointImpl());
    admin.close();
  }

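  /**
   * Creates a table with the given number of region replicas plus a control table without
   * replicas, loads rows into both, and verifies that the rows become visible in every
   * secondary replica of the replicated table.
   */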
  public void testRegionReplicaReplication(int regionReplication) throws Exception {
    // test region replica replication. Create a table with a single region, write some data,
    // and ensure that the data is replicated to the secondary regions.
    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
        + regionReplication);
    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
    htd.setRegionReplication(regionReplication);
    HTU.getHBaseAdmin().createTable(htd);
    TableName tableNameNoReplicas =
        TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
    HTU.deleteTableIfAny(tableNameNoReplicas);
    HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);

    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
    Table table = connection.getTable(tableName);
    Table tableNoReplicas = connection.getTable(tableNameNoReplicas);

    try {
      // load some data to the non-replicated table
      HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);

      // load the data to the table
      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);

      verifyReplication(tableName, regionReplication, 0, 1000);

    } finally {
      table.close();
      tableNoReplicas.close();
      HTU.deleteTableIfAny(tableNameNoReplicas);
      connection.close();
    }
  }

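  /**
   * Convenience overload that expects the rows to be present in every secondary replica.
   */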
  private void verifyReplication(TableName tableName, int regionReplication,
      final int startRow, final int endRow) throws Exception {
    verifyReplication(tableName, regionReplication, startRow, endRow, true);
  }

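  /**
   * Finds every replica of the given table on the mini cluster's region servers, then waits
   * (up to 90 seconds per replica) until the numeric rows between startRow and endRow are
   * present in (or, when {@code present} is false, absent from) each secondary replica.
   */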
  private void verifyReplication(TableName tableName, int regionReplication,
      final int startRow, final int endRow, final boolean present) throws Exception {
    // find the regions
    final Region[] regions = new Region[regionReplication];

    for (int i = 0; i < NB_SERVERS; i++) {
      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
      List<Region> onlineRegions = rs.getOnlineRegions(tableName);
      for (Region region : onlineRegions) {
        regions[region.getRegionInfo().getReplicaId()] = region;
      }
    }

    for (Region region : regions) {
      assertNotNull(region);
    }

    for (int i = 1; i < regionReplication; i++) {
      final Region region = regions[i];
      // wait until all the data is replicated to all secondary regions
      Waiter.waitFor(HTU.getConfiguration(), 90000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
          try {
            HTU.verifyNumericRows(region, HBaseTestingUtility.fam1, startRow, endRow, present);
          } catch (Throwable ex) {
            LOG.warn("Verification from secondary region is not complete yet", ex);
            // still wait
            return false;
          }
          return true;
        }
      });
    }
  }

  @Test(timeout = 240000)
  public void testRegionReplicaReplicationWith2Replicas() throws Exception {
    testRegionReplicaReplication(2);
  }

  @Test(timeout = 240000)
  public void testRegionReplicaReplicationWith3Replicas() throws Exception {
    testRegionReplicaReplication(3);
  }

  @Test(timeout = 240000)
  public void testRegionReplicaReplicationWith10Replicas() throws Exception {
    testRegionReplicaReplication(10);
  }

  @Test (timeout = 240000)
  public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
    int regionReplication = 3;
    TableName tableName = TableName.valueOf("testRegionReplicaWithoutMemstoreReplication");
    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
    htd.setRegionReplication(regionReplication);
    htd.setRegionMemstoreReplication(false);
    HTU.getHBaseAdmin().createTable(htd);

    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
    Table table = connection.getTable(tableName);
    try {
      // write data to the primary. The replicas should not receive the data
      final int STEP = 100;
      for (int i = 0; i < 3; ++i) {
        final int startRow = i * STEP;
        final int endRow = (i + 1) * STEP;
        LOG.info("Writing data from " + startRow + " to " + endRow);
        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, startRow, endRow);
        verifyReplication(tableName, regionReplication, startRow, endRow, false);

        // Flush the table; now the data should show up in the replicas
        LOG.info("flushing table");
        HTU.flush(tableName);
        verifyReplication(tableName, regionReplication, 0, endRow, true);
      }
    } finally {
      table.close();
      connection.close();
    }
  }

  @Test (timeout = 240000)
  public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
    // Tests a table with region replication 3. Writes some data, and causes flushes and
    // compactions. Verifies that the data is readable from the replicas. Note that this
    // does not test whether the replicas actually pick up flushed files and apply compaction
    // to their stores.
    int regionReplication = 3;
    TableName tableName = TableName.valueOf("testRegionReplicaReplicationForFlushAndCompaction");
    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
    htd.setRegionReplication(regionReplication);
    HTU.getHBaseAdmin().createTable(htd);

    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
    Table table = connection.getTable(tableName);

    try {
      // load the data to the table
      for (int i = 0; i < 6000; i += 1000) {
        LOG.info("Writing data from " + i + " to " + (i + 1000));
        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, i, i + 1000);
        LOG.info("flushing table");
        HTU.flush(tableName);
        LOG.info("compacting table");
        HTU.compact(tableName, false);
      }

      verifyReplication(tableName, regionReplication, 0, 1000);
    } finally {
      table.close();
      connection.close();
    }
  }

  @Test (timeout = 240000)
  public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
    testRegionReplicaReplicationIgnoresDisabledTables(false);
  }

  @Test (timeout = 240000)
  public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
    testRegionReplicaReplicationIgnoresDisabledTables(true);
  }

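  /**
   * Verifies that edits for a table that has been disabled (or dropped, when {@code dropTable}
   * is true) are skipped by the sink writer, and that replication for other tables continues
   * without problems.
   */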
  public void testRegionReplicaReplicationIgnoresDisabledTables(boolean dropTable)
      throws Exception {
    // tests that edits from a disabled or dropped table are handled correctly by skipping those
    // entries, and that further edits arriving after the edits from the dropped/disabled table
    // can be replicated without problems.
    TableName tableName = TableName.valueOf("testRegionReplicaReplicationIgnoresDisabledTables"
      + dropTable);
    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
    int regionReplication = 3;
    htd.setRegionReplication(regionReplication);
    HTU.deleteTableIfAny(tableName);
    HTU.getHBaseAdmin().createTable(htd);
    TableName toBeDisabledTable = TableName.valueOf(dropTable ? "droppedTable" : "disabledTable");
    HTU.deleteTableIfAny(toBeDisabledTable);
    htd = HTU.createTableDescriptor(toBeDisabledTable.toString());
    htd.setRegionReplication(regionReplication);
    HTU.getHBaseAdmin().createTable(htd);

    // both tables are created, now pause replication
    ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
    admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());

    // now that replication is disabled, write to the table to be dropped, then drop the table.
    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
    Table table = connection.getTable(tableName);
    Table tableToBeDisabled = connection.getTable(toBeDisabledTable);

    HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);

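    // Build a sink writer around a mocked output sink so that the edits skipped for the
    // disabled/dropped table can be counted through the shared skipped-edits counter.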
    AtomicLong skippedEdits = new AtomicLong();
    RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
        mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
    when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
    RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
        new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
          (ClusterConnection) connection,
          Executors.newSingleThreadExecutor(), Integer.MAX_VALUE);
    RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
    HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
    byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();

    Entry entry = new Entry(
      new WALKey(encodedRegionName, toBeDisabledTable, 1),
      new WALEdit());

    HTU.getHBaseAdmin().disableTable(toBeDisabledTable); // disable the table
    if (dropTable) {
      HTU.getHBaseAdmin().deleteTable(toBeDisabledTable);
    }

    sinkWriter.append(toBeDisabledTable, encodedRegionName,
      HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));

    assertEquals(2, skippedEdits.get());

    try {
      // load some data to the table with replicas
      HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);

      // now enable the replication peer again
      admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());

      verifyReplication(tableName, regionReplication, 0, 1000);

    } finally {
      admin.close();
      table.close();
      rl.close();
      tableToBeDisabled.close();
      HTU.deleteTableIfAny(toBeDisabledTable);
      connection.close();
    }
  }
}