1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import static org.junit.Assert.*;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.SortedMap;
26 import java.util.SortedSet;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.util.Pair;
33 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
34 import org.apache.zookeeper.KeeperException;
35 import org.junit.Before;
36 import org.junit.Test;
37
38
39
40
41
42 public abstract class TestReplicationStateBasic {
43
44 protected ReplicationQueues rq1;
45 protected ReplicationQueues rq2;
46 protected ReplicationQueues rq3;
47 protected ReplicationQueuesClient rqc;
48 protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
49 protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
50 protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
51 protected ReplicationPeers rp;
52 protected static final String ID_ONE = "1";
53 protected static final String ID_TWO = "2";
54 protected static String KEY_ONE;
55 protected static String KEY_TWO;
56
57
58 protected String OUR_ID = "3";
59 protected String OUR_KEY;
60
61 protected static int zkTimeoutCount;
62 protected static final int ZK_MAX_COUNT = 300;
63 protected static final int ZK_SLEEP_INTERVAL = 100;
64
65 private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
66
67 @Before
68 public void setUp() {
69 zkTimeoutCount = 0;
70 }
71
72 @Test
73 public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
74 rqc.init();
75
76 assertEquals(0, rqc.getListOfReplicators().size());
77 assertNull(rqc.getLogsInQueue(server1, "qId1"));
78 assertNull(rqc.getAllQueues(server1));
79
80
81
82
83
84 rq1.init(server1);
85 rq2.init(server2);
86 rq1.addLog("qId1", "trash");
87 rq1.removeLog("qId1", "trash");
88 rq1.addLog("qId2", "filename1");
89 rq1.addLog("qId3", "filename2");
90 rq1.addLog("qId3", "filename3");
91 rq2.addLog("trash", "trash");
92 rq2.removeQueue("trash");
93
94 List<String> reps = rqc.getListOfReplicators();
95 assertEquals(2, reps.size());
96 assertTrue(server1, reps.contains(server1));
97 assertTrue(server2, reps.contains(server2));
98
99 assertNull(rqc.getLogsInQueue("bogus", "bogus"));
100 assertNull(rqc.getLogsInQueue(server1, "bogus"));
101 assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
102 assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
103 assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
104
105 assertNull(rqc.getAllQueues("bogus"));
106 assertEquals(0, rqc.getAllQueues(server2).size());
107 List<String> list = rqc.getAllQueues(server1);
108 assertEquals(3, list.size());
109 assertTrue(list.contains("qId2"));
110 assertTrue(list.contains("qId3"));
111 }
112
113 @Test
114 public void testReplicationQueues() throws ReplicationException {
115 rq1.init(server1);
116 rq2.init(server2);
117 rq3.init(server3);
118
119 rp.init();
120
121
122 assertEquals(3, rq1.getListOfReplicators().size());
123 rq1.removeQueue("bogus");
124 rq1.removeLog("bogus", "bogus");
125 rq1.removeAllQueues();
126 assertNull(rq1.getAllQueues());
127 assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
128 assertNull(rq1.getLogsInQueue("bogus"));
129 assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
130
131 rq1.setLogPosition("bogus", "bogus", 5L);
132
133 populateQueues();
134
135 assertEquals(3, rq1.getListOfReplicators().size());
136 assertEquals(0, rq2.getLogsInQueue("qId1").size());
137 assertEquals(5, rq3.getLogsInQueue("qId5").size());
138 assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
139 rq3.setLogPosition("qId5", "filename4", 354L);
140 assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
141
142 assertEquals(5, rq3.getLogsInQueue("qId5").size());
143 assertEquals(0, rq2.getLogsInQueue("qId1").size());
144 assertEquals(0, rq1.getAllQueues().size());
145 assertEquals(1, rq2.getAllQueues().size());
146 assertEquals(5, rq3.getAllQueues().size());
147
148 assertEquals(0, rq3.claimQueues(server1).size());
149 assertEquals(2, rq3.getListOfReplicators().size());
150
151 SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
152 assertEquals(5, queues.size());
153 assertEquals(1, rq2.getListOfReplicators().size());
154
155
156 assertEquals(0, rq2.claimQueues(server2).size());
157
158 assertEquals(6, rq2.getAllQueues().size());
159
160 rq2.removeAllQueues();
161
162 assertEquals(0, rq2.getListOfReplicators().size());
163 }
164
165 @Test
166 public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
167 rp.init();
168 rq1.init(server1);
169 rqc.init();
170
171 List<Pair<Path, Path>> files1 = new ArrayList<>(3);
172 files1.add(new Pair<Path, Path>(null, new Path("file_1")));
173 files1.add(new Pair<Path, Path>(null, new Path("file_2")));
174 files1.add(new Pair<Path, Path>(null, new Path("file_3")));
175 assertNull(rqc.getReplicableHFiles(ID_ONE));
176 assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
177 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
178 rq1.addHFileRefs(ID_ONE, files1);
179 assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
180 assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
181 List<String> hfiles2 = new ArrayList<>();
182 for (Pair<Path, Path> p : files1) {
183 hfiles2.add(p.getSecond().getName());
184 }
185 String removedString = hfiles2.remove(0);
186 rq1.removeHFileRefs(ID_ONE, hfiles2);
187 assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
188 hfiles2 = new ArrayList<>(1);
189 hfiles2.add(removedString);
190 rq1.removeHFileRefs(ID_ONE, hfiles2);
191 assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
192 rp.removePeer(ID_ONE);
193 }
194
195 @Test
196 public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
197 rq1.init(server1);
198 rqc.init();
199
200 rp.init();
201 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
202 rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
203
204 List<Pair<Path, Path>> files1 = new ArrayList<>(3);
205 files1.add(new Pair<Path, Path>(null, new Path("file_1")));
206 files1.add(new Pair<Path, Path>(null, new Path("file_2")));
207 files1.add(new Pair<Path, Path>(null, new Path("file_3")));
208 rq1.addHFileRefs(ID_ONE, files1);
209 rq1.addHFileRefs(ID_TWO, files1);
210 assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
211 assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
212 assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
213
214 rp.removePeer(ID_ONE);
215 assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
216 assertNull(rqc.getReplicableHFiles(ID_ONE));
217 assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
218
219 rp.removePeer(ID_TWO);
220 assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
221 assertNull(rqc.getReplicableHFiles(ID_TWO));
222 }
223
224 @Test
225 public void testReplicationPeers() throws Exception {
226 rp.init();
227
228
229 try {
230 rp.removePeer("bogus");
231 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
232 } catch (IllegalArgumentException e) {
233 }
234 try {
235 rp.enablePeer("bogus");
236 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
237 } catch (IllegalArgumentException e) {
238 }
239 try {
240 rp.disablePeer("bogus");
241 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
242 } catch (IllegalArgumentException e) {
243 }
244 try {
245 rp.getStatusOfPeer("bogus");
246 fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
247 } catch (IllegalArgumentException e) {
248 }
249 assertFalse(rp.peerAdded("bogus"));
250 rp.peerRemoved("bogus");
251
252 assertNull(rp.getPeerConf("bogus"));
253 assertNumberOfPeers(0);
254
255
256 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
257 assertNumberOfPeers(1);
258 rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
259 assertNumberOfPeers(2);
260
261
262 try {
263 rp.getStatusOfPeer(ID_ONE);
264 fail("There are no connected peers, should have thrown an IllegalArgumentException");
265 } catch (IllegalArgumentException e) {
266 }
267 assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
268 rp.removePeer(ID_ONE);
269 rp.peerRemoved(ID_ONE);
270 assertNumberOfPeers(1);
271
272
273 rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
274 rp.peerAdded(ID_ONE);
275 assertNumberOfPeers(2);
276 assertTrue(rp.getStatusOfPeer(ID_ONE));
277 rp.disablePeer(ID_ONE);
278 assertConnectedPeerStatus(false, ID_ONE);
279 rp.enablePeer(ID_ONE);
280 assertConnectedPeerStatus(true, ID_ONE);
281
282
283 rp.peerRemoved(ID_ONE);
284 assertNumberOfPeers(2);
285 try {
286 rp.getStatusOfPeer(ID_ONE);
287 fail("There are no connected peers, should have thrown an IllegalArgumentException");
288 } catch (IllegalArgumentException e) {
289 }
290 }
291
292 protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
293
294 if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
295 fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
296 }
297 while (true) {
298 if (status == rp.getStatusOfPeer(peerId)) {
299 return;
300 }
301 if (zkTimeoutCount < ZK_MAX_COUNT) {
302 LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
303 + ", sleeping and trying again.");
304 Thread.sleep(ZK_SLEEP_INTERVAL);
305 } else {
306 fail("Timed out waiting for ConnectedPeerStatus to be " + status);
307 }
308 }
309 }
310
311 protected void assertNumberOfPeers(int total) {
312 assertEquals(total, rp.getAllPeerConfigs().size());
313 assertEquals(total, rp.getAllPeerIds().size());
314 assertEquals(total, rp.getAllPeerIds().size());
315 }
316
317
318
319
320
321 protected void populateQueues() throws ReplicationException {
322 rq1.addLog("trash", "trash");
323 rq1.removeQueue("trash");
324
325 rq2.addLog("qId1", "trash");
326 rq2.removeLog("qId1", "trash");
327
328 for (int i = 1; i < 6; i++) {
329 for (int j = 0; j < i; j++) {
330 rq3.addLog("qId" + i, "filename" + j);
331 }
332
333 rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
334 }
335 }
336 }
337