1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.UUID;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.testclassification.MediumTests;
33 import org.apache.hadoop.hbase.Waiter;
34 import org.apache.hadoop.hbase.client.Connection;
35 import org.apache.hadoop.hbase.client.ConnectionFactory;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Table;
38 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
39 import org.apache.hadoop.hbase.wal.WAL.Entry;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
42 import org.apache.hadoop.hbase.util.Threads;
43 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
44 import org.junit.AfterClass;
45 import org.junit.Assert;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Ignore;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51
52
53
54
55 @Category(MediumTests.class)
56 public class TestReplicationEndpoint extends TestReplicationBase {
57 static final Log LOG = LogFactory.getLog(TestReplicationEndpoint.class);
58
59 static int numRegionServers;
60
61 @BeforeClass
62 public static void setUpBeforeClass() throws Exception {
63 TestReplicationBase.setUpBeforeClass();
64 utility2.shutdownMiniCluster();
65 admin.removePeer("2");
66 numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
67 }
68
69 @AfterClass
70 public static void tearDownAfterClass() throws Exception {
71 TestReplicationBase.tearDownAfterClass();
72
73 Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
74 }
75
76 @Before
77 public void setup() throws FailedLogCloseException, IOException {
78 ReplicationEndpointForTest.contructedCount.set(0);
79 ReplicationEndpointForTest.startedCount.set(0);
80 ReplicationEndpointForTest.replicateCount.set(0);
81 ReplicationEndpointReturningFalse.replicated.set(false);
82 ReplicationEndpointForTest.lastEntries = null;
83 for (RegionServerThread rs : utility1.getMiniHBaseCluster().getRegionServerThreads()) {
84 utility1.getHBaseAdmin().rollWALWriter(rs.getRegionServer().getServerName());
85 }
86 }
87
88 @Test (timeout=120000)
89 public void testCustomReplicationEndpoint() throws Exception {
90
91 admin.addPeer("testCustomReplicationEndpoint",
92 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
93 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
94
95
96 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
97 @Override
98 public boolean evaluate() throws Exception {
99 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
100 }
101 });
102
103 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
104 @Override
105 public boolean evaluate() throws Exception {
106 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
107 }
108 });
109
110 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
111
112
113 doPut(Bytes.toBytes("row42"));
114
115 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
116 @Override
117 public boolean evaluate() throws Exception {
118 return ReplicationEndpointForTest.replicateCount.get() >= 1;
119 }
120 });
121
122 doAssert(Bytes.toBytes("row42"));
123
124 admin.removePeer("testCustomReplicationEndpoint");
125 }
126
127 @Ignore @Test (timeout=120000)
128 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
129 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
130 Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
131 int peerCount = admin.getPeersCount();
132 final String id = "testReplicationEndpointReturnsFalseOnReplicate";
133 admin.addPeer(id,
134 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
135 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
136
137
138
139 if (admin.getPeersCount() <= peerCount) {
140 LOG.info("Waiting on peercount to go up from " + peerCount);
141 Threads.sleep(100);
142 }
143
144 doPut(row);
145
146 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
147 @Override
148 public boolean evaluate() throws Exception {
149
150
151 int count = ReplicationEndpointForTest.replicateCount.get();
152 LOG.info("count=" + count);
153 return ReplicationEndpointReturningFalse.replicated.get();
154 }
155 });
156 if (ReplicationEndpointReturningFalse.ex.get() != null) {
157 throw ReplicationEndpointReturningFalse.ex.get();
158 }
159
160 admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
161 }
162
163 @Test (timeout=120000)
164 public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
165 admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
166 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
167 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
168
169 try (Connection connection = ConnectionFactory.createConnection(conf1)) {
170 doPut(connection, Bytes.toBytes("row1"));
171 doPut(connection, row);
172 doPut(connection, Bytes.toBytes("row2"));
173 }
174
175 Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
176 @Override
177 public boolean evaluate() throws Exception {
178 return ReplicationEndpointForTest.replicateCount.get() >= 1;
179 }
180 });
181
182 Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
183 admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
184 }
185
186
187 private void doPut(byte[] row) throws IOException {
188 try (Connection connection = ConnectionFactory.createConnection(conf1)) {
189 doPut(connection, row);
190 }
191 }
192
193 private void doPut(final Connection connection, final byte [] row) throws IOException {
194 try (Table t = connection.getTable(tableName)) {
195 Put put = new Put(row);
196 put.add(famName, row, row);
197 t.put(put);
198 }
199 }
200
201 private static void doAssert(byte[] row) throws Exception {
202 if (ReplicationEndpointForTest.lastEntries == null) {
203 return;
204 }
205 Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
206 List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
207 Assert.assertEquals(1, cells.size());
208 Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
209 cells.get(0).getRowLength(), row, 0, row.length));
210 }
211
212 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
213 static UUID uuid = UUID.randomUUID();
214 static AtomicInteger contructedCount = new AtomicInteger();
215 static AtomicInteger startedCount = new AtomicInteger();
216 static AtomicInteger stoppedCount = new AtomicInteger();
217 static AtomicInteger replicateCount = new AtomicInteger();
218 static volatile List<Entry> lastEntries = null;
219
220 public ReplicationEndpointForTest() {
221 contructedCount.incrementAndGet();
222 }
223
224 @Override
225 public UUID getPeerUUID() {
226 return uuid;
227 }
228
229 @Override
230 public boolean replicate(ReplicateContext replicateContext) {
231 replicateCount.incrementAndGet();
232 lastEntries = replicateContext.entries;
233 return true;
234 }
235
236 @Override
237 protected void doStart() {
238 startedCount.incrementAndGet();
239 notifyStarted();
240 }
241
242 @Override
243 protected void doStop() {
244 stoppedCount.incrementAndGet();
245 notifyStopped();
246 }
247 }
248
249 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
250 static int COUNT = 10;
251 static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
252 static AtomicBoolean replicated = new AtomicBoolean(false);
253 @Override
254 public boolean replicate(ReplicateContext replicateContext) {
255 try {
256
257 doAssert(row);
258 } catch (Exception e) {
259 ex.set(e);
260 }
261
262 super.replicate(replicateContext);
263 LOG.info("Replicated " + row + ", count=" + replicateCount.get());
264
265 replicated.set(replicateCount.get() > COUNT);
266 return replicated.get();
267 }
268 }
269
270
271 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
272 static AtomicReference<Exception> ex = new AtomicReference<Exception>(null);
273
274 @Override
275 public boolean replicate(ReplicateContext replicateContext) {
276 try {
277 super.replicate(replicateContext);
278 doAssert(row);
279 } catch (Exception e) {
280 ex.set(e);
281 }
282 return true;
283 }
284
285 @Override
286 public WALEntryFilter getWALEntryfilter() {
287 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
288 @Override
289 public Entry filter(Entry entry) {
290 ArrayList<Cell> cells = entry.getEdit().getCells();
291 int size = cells.size();
292 for (int i = size-1; i >= 0; i--) {
293 Cell cell = cells.get(i);
294 if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
295 row, 0, row.length)) {
296 cells.remove(i);
297 }
298 }
299 return entry;
300 }
301 });
302 }
303 }
304 }