1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
22 import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
23 import static org.junit.Assert.*;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.when;
26
27 import java.io.IOException;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentLinkedQueue;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.HTableDescriptor;
39 import org.apache.hadoop.hbase.testclassification.MediumTests;
40 import org.apache.hadoop.hbase.RegionLocations;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.client.ClusterConnection;
43 import org.apache.hadoop.hbase.client.ConnectionFactory;
44 import org.apache.hadoop.hbase.client.HTable;
45 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
46 import org.apache.hadoop.hbase.coprocessor.BaseWALObserver;
47 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
48 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
49 import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
50 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
51 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
52 import org.apache.hadoop.hbase.regionserver.HRegion;
53 import org.apache.hadoop.hbase.regionserver.HRegionServer;
54 import org.apache.hadoop.hbase.regionserver.Region;
55 import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
56 import org.apache.hadoop.hbase.wal.WAL;
57 import org.apache.hadoop.hbase.wal.WAL.Entry;
58 import org.apache.hadoop.hbase.wal.WALKey;
59 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
60 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
61 import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
62 import org.apache.hadoop.hbase.replication.ReplicationPeer;
63 import org.apache.hadoop.hbase.replication.WALEntryFilter;
64 import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
65 import org.apache.hadoop.hbase.testclassification.MediumTests;
66 import org.apache.hadoop.hbase.util.Bytes;
67 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Assert;
71 import org.junit.Before;
72 import org.junit.BeforeClass;
73 import org.junit.Test;
74 import org.junit.experimental.categories.Category;
75
76 import com.google.common.collect.Lists;
77
78
79
80
81
82 @Category(MediumTests.class)
83 public class TestRegionReplicaReplicationEndpointNoMaster {
84
85 private static final Log LOG = LogFactory.getLog(
86 TestRegionReplicaReplicationEndpointNoMaster.class);
87
88 private static final int NB_SERVERS = 2;
89 private static TableName tableName = TableName.valueOf(
90 TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
91 private static HTable table;
92 private static final byte[] row = "TestRegionReplicaReplicator".getBytes();
93
94 private static HRegionServer rs0;
95 private static HRegionServer rs1;
96
97 private static HRegionInfo hriPrimary;
98 private static HRegionInfo hriSecondary;
99
100 private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
101 private static final byte[] f = HConstants.CATALOG_FAMILY;
102
103 @BeforeClass
104 public static void beforeClass() throws Exception {
105 Configuration conf = HTU.getConfiguration();
106 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
107 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
108 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
109
110
111 String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
112 if (walCoprocs == null) {
113 walCoprocs = WALEditCopro.class.getName();
114 } else {
115 walCoprocs += "," + WALEditCopro.class.getName();
116 }
117 HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
118 walCoprocs);
119 HTU.startMiniCluster(NB_SERVERS);
120
121
122 HTableDescriptor htd = HTU.createTableDescriptor(tableName.toString());
123 table = HTU.createTable(htd, new byte[][]{f}, HTU.getConfiguration());
124
125 hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
126
127
128 hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
129 hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
130
131
132 TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
133 rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
134 rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
135 }
136
137 @AfterClass
138 public static void afterClass() throws Exception {
139 table.close();
140 HTU.shutdownMiniCluster();
141 }
142
143 @Before
144 public void before() throws Exception{
145 entries.clear();
146 }
147
148 @After
149 public void after() throws Exception {
150 }
151
152 static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<Entry>();
153
154 public static class WALEditCopro extends BaseWALObserver {
155 public WALEditCopro() {
156 entries.clear();
157 }
158 @Override
159 public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
160 HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
161
162 if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) {
163 entries.add(new Entry(logKey, logEdit));
164 }
165 }
166 }
167
168 @Test (timeout = 240000)
169 public void testReplayCallable() throws Exception {
170
171 openRegion(HTU, rs0, hriSecondary);
172 ClusterConnection connection =
173 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
174
175
176 HTU.loadNumericRows(table, f, 0, 1000);
177
178 Assert.assertEquals(1000, entries.size());
179
180 replicateUsingCallable(connection, entries);
181
182 Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
183 HTU.verifyNumericRows(region, f, 0, 1000);
184
185 HTU.deleteNumericRows(table, f, 0, 1000);
186 closeRegion(HTU, rs0, hriSecondary);
187 connection.close();
188 }
189
190 private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
191 throws IOException, RuntimeException {
192 Entry entry;
193 while ((entry = entries.poll()) != null) {
194 byte[] row = entry.getEdit().getCells().get(0).getRow();
195 RegionLocations locations = connection.locateRegion(tableName, row, true, true);
196 RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
197 RpcControllerFactory.instantiate(connection.getConfiguration()),
198 table.getName(), locations.getRegionLocation(1),
199 locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
200 new AtomicLong());
201
202 RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
203 connection.getConfiguration());
204 factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
205 }
206 }
207
208 @Test (timeout = 240000)
209 public void testReplayCallableWithRegionMove() throws Exception {
210
211
212 openRegion(HTU, rs0, hriSecondary);
213 ClusterConnection connection =
214 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
215
216 HTU.loadNumericRows(table, f, 0, 1000);
217
218 Assert.assertEquals(1000, entries.size());
219
220 replicateUsingCallable(connection, entries);
221
222 Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
223 HTU.verifyNumericRows(region, f, 0, 1000);
224
225 HTU.loadNumericRows(table, f, 1000, 2000);
226
227
228 closeRegion(HTU, rs0, hriSecondary);
229 openRegion(HTU, rs1, hriSecondary);
230
231
232 replicateUsingCallable(connection, entries);
233
234 region = rs1.getFromOnlineRegions(hriSecondary.getEncodedName());
235
236 HTU.verifyNumericRows(region, f, 1000, 2000);
237
238 HTU.deleteNumericRows(table, f, 0, 2000);
239 closeRegion(HTU, rs1, hriSecondary);
240 connection.close();
241 }
242
243 @Test (timeout = 240000)
244 public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
245
246 openRegion(HTU, rs0, hriSecondary);
247 ClusterConnection connection =
248 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
249 RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
250
251 ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
252 when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
253 when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
254
255 replicator.init(context);
256 replicator.start();
257
258
259 HTU.loadNumericRows(table, f, 0, 1000);
260
261 Assert.assertEquals(1000, entries.size());
262
263 replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries)));
264
265 Region region = rs0.getFromOnlineRegions(hriSecondary.getEncodedName());
266 HTU.verifyNumericRows(region, f, 0, 1000);
267
268 HTU.deleteNumericRows(table, f, 0, 1000);
269 closeRegion(HTU, rs0, hriSecondary);
270 connection.close();
271 }
272
273 @Test (timeout = 240000)
274 public void testReplayedEditsAreSkipped() throws Exception {
275 openRegion(HTU, rs0, hriSecondary);
276 ClusterConnection connection =
277 (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
278 RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
279
280 ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
281 when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
282 when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
283
284 ReplicationPeer mockPeer = mock(ReplicationPeer.class);
285 when(mockPeer.getTableCFs()).thenReturn(null);
286 when(context.getReplicationPeer()).thenReturn(mockPeer);
287
288 replicator.init(context);
289 replicator.start();
290
291
292 WALEntryFilter filter = replicator.getWALEntryfilter();
293
294
295 HTU.loadNumericRows(table, f, 0, 1000);
296
297 Assert.assertEquals(1000, entries.size());
298 for (Entry e: entries) {
299 if (Integer.parseInt(Bytes.toString(e.getEdit().getCells().get(0).getValue())) % 2 == 0) {
300 e.getKey().setOrigLogSeqNum(1);
301 }
302 }
303
304 long skipped = 0, replayed = 0;
305 for (Entry e : entries) {
306 if (filter.filter(e) == null) {
307 skipped++;
308 } else {
309 replayed++;
310 }
311 }
312
313 assertEquals(500, skipped);
314 assertEquals(500, replayed);
315
316 HTU.deleteNumericRows(table, f, 0, 1000);
317 closeRegion(HTU, rs0, hriSecondary);
318 connection.close();
319 }
320
321 }