/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
 * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint, this
 * class contains lower-level tests using callables.
 */
@Category(MediumTests.class)
public class TestRegionReplicaReplicationEndpointNoMaster {

  private static final Log LOG = LogFactory.getLog(
    TestRegionReplicaReplicationEndpointNoMaster.class);

  private static final int NB_SERVERS = 2;
  private static TableName tableName = TableName.valueOf(
    TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
  private static HTable table;
  private static final byte[] row = "TestRegionReplicaReplicator".getBytes();

  private static HRegionServer rs0;
  private static HRegionServer rs1;

  private static HRegionInfo hriPrimary;
  private static HRegionInfo hriSecondary;

  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
  private static final byte[] f = HConstants.CATALOG_FAMILY;

  @BeforeClass
  public static void beforeClass() throws Exception {
    Configuration conf = HTU.getConfiguration();
    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);

    // install WALObserver coprocessor for tests
    String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
    if (walCoprocs == null) {
      walCoprocs = WALEditCopro.class.getName();
    } else {
      walCoprocs += "," + WALEditCopro.class.getName();
    }
    HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
      walCoprocs);
    HTU.startMiniCluster(NB_SERVERS);

    // Create table then get the single region for our new table.
    HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
    table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration());

    hriPrimary = table.getRegionLocation(row, false).getRegionInfo();

    // mock a secondary region info to open
    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);

    // No master
    TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
    rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
    rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
  }

  @AfterClass
  public static void afterClass() throws Exception {
    table.close();
    HTU.shutdownMiniCluster();
  }

  @Before
  public void before() throws Exception {
    entries.clear();
  }

  @After
  public void after() throws Exception {
  }

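  // WAL entries captured from the primary replica by the WALEditCopro coprocessor below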
  static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<Entry>();

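  /**
   * WAL coprocessor that captures the primary region's WAL edits into the shared queue so the
   * tests can replay them against the secondary replica.
   */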
  public static class WALEditCopro extends BaseWALObserver {
    public WALEditCopro() {
      entries.clear();
    }
    @Override
    public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
        HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
      // only keep primary region's edits
      if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) {
        entries.add(new Entry(logKey, logEdit));
      }
    }
  }

  @Test (timeout = 240000)
  public void testReplayCallable() throws Exception {
    // tests replaying the edits to a secondary region replica using the Callable directly
    openRegion(HTU, rs0, hriSecondary);
    ClusterConnection connection =
        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());

    // load some data to primary
    HTU.loadNumericRows(table, f, 0, 1000);

    Assert.assertEquals(1000, entries.size());
    // replay the edits to the secondary using replay callable
    replicateUsingCallable(connection, entries);

    Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
    HTU.verifyNumericRows(region, f, 0, 1000);

    HTU.deleteNumericRows(table, f, 0, 1000);
    closeRegion(HTU, rs0, hriSecondary);
    connection.close();
  }

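  // Drains the given queue and replays each entry against the secondary replica (replica_id 1)
  // through a RegionReplicaReplayCallable, one entry at a time.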
  private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
      throws IOException, RuntimeException {
    Entry entry;
    while ((entry = entries.poll()) != null) {
      byte[] row = entry.getEdit().getCells().get(0).getRow();
      RegionLocations locations = connection.locateRegion(tableName, row, true, true);
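      // target the secondary replica (replica_id 1) of the located region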
      RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
        RpcControllerFactory.instantiate(connection.getConfiguration()),
        table.getName(), locations.getRegionLocation(1),
        locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
        new AtomicLong());

      RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
        connection.getConfiguration());
      factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
    }
  }

  @Test (timeout = 240000)
  public void testReplayCallableWithRegionMove() throws Exception {
    // tests replaying the edits to a secondary region replica using the Callable directly while
    // the region is moved to another location. It tests handling of RegionMovedException.
    openRegion(HTU, rs0, hriSecondary);
    ClusterConnection connection =
        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
    // load some data to primary
    HTU.loadNumericRows(table, f, 0, 1000);

    Assert.assertEquals(1000, entries.size());
    // replay the edits to the secondary using replay callable
    replicateUsingCallable(connection, entries);

    Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
    HTU.verifyNumericRows(region, f, 0, 1000);

    HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary

    // move the secondary region from RS0 to RS1
    closeRegion(HTU, rs0, hriSecondary);
    openRegion(HTU, rs1, hriSecondary);

    // replicate the new data
    replicateUsingCallable(connection, entries);

    region = rs1.getFromOnlineRegions(hriSecondary.getEncodedName());
    // verify the new data. old data may or may not be there
    HTU.verifyNumericRows(region, f, 1000, 2000);

    HTU.deleteNumericRows(table, f, 0, 2000);
    closeRegion(HTU, rs1, hriSecondary);
    connection.close();
  }

  @Test (timeout = 240000)
  public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
    // tests replaying the edits to a secondary region replica using
    // RegionReplicaReplicationEndpoint.replicate()
    openRegion(HTU, rs0, hriSecondary);
    ClusterConnection connection =
        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();

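    // mock just enough of the ReplicationEndpoint.Context (configuration and metrics)
    // for the endpoint to initialize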
    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));

    replicator.init(context);
    replicator.start();

    // load some data to primary
    HTU.loadNumericRows(table, f, 0, 1000);

    Assert.assertEquals(1000, entries.size());
    // replay the edits to the secondary using the replication endpoint
    replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)));

    Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
    HTU.verifyNumericRows(region, f, 0, 1000);

    HTU.deleteNumericRows(table, f, 0, 1000);
    closeRegion(HTU, rs0, hriSecondary);
    connection.close();
  }

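  // Verifies that the endpoint's WALEntryFilter skips edits that carry an original sequence id,
  // i.e. edits that were already replayed.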
  @Test (timeout = 240000)
  public void testReplayedEditsAreSkipped() throws Exception {
    openRegion(HTU, rs0, hriSecondary);
    ClusterConnection connection =
        (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
    RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();

    ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
    when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
    when(context.getMetrics()).thenReturn(mock(MetricsSource.class));

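    // a peer with no per-table CF map, so the filter does not restrict which tables are replicated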
    ReplicationPeer mockPeer = mock(ReplicationPeer.class);
    when(mockPeer.getTableCFs()).thenReturn(null);
    when(context.getReplicationPeer()).thenReturn(mockPeer);

    replicator.init(context);
    replicator.start();

    // test the ReplicationEndpoint's WAL entry filter, not actual replication
    WALEntryFilter filter = replicator.getWALEntryfilter();

    // load some data to primary
    HTU.loadNumericRows(table, f, 0, 1000);

    Assert.assertEquals(1000, entries.size());
    for (Entry e : entries) {
      if (Integer.parseInt(Bytes.toString(e.getEdit().getCells().get(0).getValue())) % 2 == 0) {
        e.getKey().setOrigLogSeqNum(1); // simulate dist log replay by setting orig seq id
      }
    }

    long skipped = 0, replayed = 0;
    for (Entry e : entries) {
      if (filter.filter(e) == null) {
        skipped++;
      } else {
        replayed++;
      }
    }

    assertEquals(500, skipped);
    assertEquals(500, replayed);

    HTU.deleteNumericRows(table, f, 0, 1000);
    closeRegion(HTU, rs0, hriSecondary);
    connection.close();
  }

}