View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import static org.junit.Assert.*;
22  
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.SortedMap;
26  import java.util.SortedSet;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.ServerName;
32  import org.apache.hadoop.hbase.util.Pair;
33  import org.apache.hadoop.hbase.zookeeper.ZKConfig;
34  import org.apache.zookeeper.KeeperException;
35  import org.junit.Before;
36  import org.junit.Test;
37  
38  /**
39   * White box testing for replication state interfaces. Implementations should extend this class, and
40   * initialize the interfaces properly.
41   */
42  public abstract class TestReplicationStateBasic {
43  
44    protected ReplicationQueues rq1;
45    protected ReplicationQueues rq2;
46    protected ReplicationQueues rq3;
47    protected ReplicationQueuesClient rqc;
48    protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
49    protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
50    protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
51    protected ReplicationPeers rp;
52    protected static final String ID_ONE = "1";
53    protected static final String ID_TWO = "2";
54    protected static String KEY_ONE;
55    protected static String KEY_TWO;
56  
57    // For testing when we try to replicate to ourself
58    protected String OUR_ID = "3";
59    protected String OUR_KEY;
60  
61    protected static int zkTimeoutCount;
62    protected static final int ZK_MAX_COUNT = 300;
63    protected static final int ZK_SLEEP_INTERVAL = 100; // millis
64  
65    private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
66  
67    @Before
68    public void setUp() {
69      zkTimeoutCount = 0;
70    }
71  
72    @Test
73    public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
74      rqc.init();
75      // Test methods with empty state
76      assertEquals(0, rqc.getListOfReplicators().size());
77      assertNull(rqc.getLogsInQueue(server1, "qId1"));
78      assertNull(rqc.getAllQueues(server1));
79  
80      /*
81       * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
82       * server2: zero queues
83       */
84      rq1.init(server1);
85      rq2.init(server2);
86      rq1.addLog("qId1", "trash");
87      rq1.removeLog("qId1", "trash");
88      rq1.addLog("qId2", "filename1");
89      rq1.addLog("qId3", "filename2");
90      rq1.addLog("qId3", "filename3");
91      rq2.addLog("trash", "trash");
92      rq2.removeQueue("trash");
93  
94      List<String> reps = rqc.getListOfReplicators();
95      assertEquals(2, reps.size());
96      assertTrue(server1, reps.contains(server1));
97      assertTrue(server2, reps.contains(server2));
98  
99      assertNull(rqc.getLogsInQueue("bogus", "bogus"));
100     assertNull(rqc.getLogsInQueue(server1, "bogus"));
101     assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
102     assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
103     assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
104 
105     assertNull(rqc.getAllQueues("bogus"));
106     assertEquals(0, rqc.getAllQueues(server2).size());
107     List<String> list = rqc.getAllQueues(server1);
108     assertEquals(3, list.size());
109     assertTrue(list.contains("qId2"));
110     assertTrue(list.contains("qId3"));
111   }
112 
113   @Test
114   public void testReplicationQueues() throws ReplicationException {
115     rq1.init(server1);
116     rq2.init(server2);
117     rq3.init(server3);
118     //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
119     rp.init();
120 
121     // 3 replicators should exist
122     assertEquals(3, rq1.getListOfReplicators().size());
123     rq1.removeQueue("bogus");
124     rq1.removeLog("bogus", "bogus");
125     rq1.removeAllQueues();
126     assertNull(rq1.getAllQueues());
127     assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
128     assertNull(rq1.getLogsInQueue("bogus"));
129     assertEquals(0, rq1.claimQueues(ServerName.valueOf("bogus", 1234, -1L).toString()).size());
130 
131     rq1.setLogPosition("bogus", "bogus", 5L);
132 
133     populateQueues();
134 
135     assertEquals(3, rq1.getListOfReplicators().size());
136     assertEquals(0, rq2.getLogsInQueue("qId1").size());
137     assertEquals(5, rq3.getLogsInQueue("qId5").size());
138     assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
139     rq3.setLogPosition("qId5", "filename4", 354L);
140     assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
141 
142     assertEquals(5, rq3.getLogsInQueue("qId5").size());
143     assertEquals(0, rq2.getLogsInQueue("qId1").size());
144     assertEquals(0, rq1.getAllQueues().size());
145     assertEquals(1, rq2.getAllQueues().size());
146     assertEquals(5, rq3.getAllQueues().size());
147 
148     assertEquals(0, rq3.claimQueues(server1).size());
149     assertEquals(2, rq3.getListOfReplicators().size());
150 
151     SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
152     assertEquals(5, queues.size());
153     assertEquals(1, rq2.getListOfReplicators().size());
154 
155     // Try to claim our own queues
156     assertEquals(0, rq2.claimQueues(server2).size());
157 
158     assertEquals(6, rq2.getAllQueues().size());
159 
160     rq2.removeAllQueues();
161 
162     assertEquals(0, rq2.getListOfReplicators().size());
163   }
164 
165   @Test
166   public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
167     rp.init();
168     rq1.init(server1);
169     rqc.init();
170 
171     List<Pair<Path, Path>> files1 = new ArrayList<>(3);
172     files1.add(new Pair<Path, Path>(null, new Path("file_1")));
173     files1.add(new Pair<Path, Path>(null, new Path("file_2")));
174     files1.add(new Pair<Path, Path>(null, new Path("file_3")));
175     assertNull(rqc.getReplicableHFiles(ID_ONE));
176     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
177     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
178     rq1.addHFileRefs(ID_ONE, files1);
179     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
180     assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
181     List<String> hfiles2 = new ArrayList<>();
182     for (Pair<Path, Path> p : files1) {
183       hfiles2.add(p.getSecond().getName());
184     }
185     String removedString = hfiles2.remove(0);
186     rq1.removeHFileRefs(ID_ONE, hfiles2);
187     assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
188     hfiles2 = new ArrayList<>(1);
189     hfiles2.add(removedString);
190     rq1.removeHFileRefs(ID_ONE, hfiles2);
191     assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
192     rp.removePeer(ID_ONE);
193   }
194 
195   @Test
196   public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
197     rq1.init(server1);
198     rqc.init();
199 
200     rp.init();
201     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
202     rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
203 
204     List<Pair<Path, Path>> files1 = new ArrayList<>(3);
205     files1.add(new Pair<Path, Path>(null, new Path("file_1")));
206     files1.add(new Pair<Path, Path>(null, new Path("file_2")));
207     files1.add(new Pair<Path, Path>(null, new Path("file_3")));
208     rq1.addHFileRefs(ID_ONE, files1);
209     rq1.addHFileRefs(ID_TWO, files1);
210     assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
211     assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
212     assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
213 
214     rp.removePeer(ID_ONE);
215     assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
216     assertNull(rqc.getReplicableHFiles(ID_ONE));
217     assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
218 
219     rp.removePeer(ID_TWO);
220     assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
221     assertNull(rqc.getReplicableHFiles(ID_TWO));
222   }
223 
224   @Test
225   public void testReplicationPeers() throws Exception {
226     rp.init();
227 
228     // Test methods with non-existent peer ids
229     try {
230       rp.removePeer("bogus");
231       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
232     } catch (IllegalArgumentException e) {
233     }
234     try {
235       rp.enablePeer("bogus");
236       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
237     } catch (IllegalArgumentException e) {
238     }
239     try {
240       rp.disablePeer("bogus");
241       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
242     } catch (IllegalArgumentException e) {
243     }
244     try {
245       rp.getStatusOfPeer("bogus");
246       fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
247     } catch (IllegalArgumentException e) {
248     }
249     assertFalse(rp.peerAdded("bogus"));
250     rp.peerRemoved("bogus");
251 
252     assertNull(rp.getPeerConf("bogus"));
253     assertNumberOfPeers(0);
254 
255     // Add some peers
256     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
257     assertNumberOfPeers(1);
258     rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
259     assertNumberOfPeers(2);
260 
261     // Test methods with a peer that is added but not connected
262     try {
263       rp.getStatusOfPeer(ID_ONE);
264       fail("There are no connected peers, should have thrown an IllegalArgumentException");
265     } catch (IllegalArgumentException e) {
266     }
267     assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
268     rp.removePeer(ID_ONE);
269     rp.peerRemoved(ID_ONE);
270     assertNumberOfPeers(1);
271 
272     // Add one peer
273     rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
274     rp.peerAdded(ID_ONE);
275     assertNumberOfPeers(2);
276     assertTrue(rp.getStatusOfPeer(ID_ONE));
277     rp.disablePeer(ID_ONE);
278     assertConnectedPeerStatus(false, ID_ONE);
279     rp.enablePeer(ID_ONE);
280     assertConnectedPeerStatus(true, ID_ONE);
281 
282     // Disconnect peer
283     rp.peerRemoved(ID_ONE);
284     assertNumberOfPeers(2);
285     try {
286       rp.getStatusOfPeer(ID_ONE);
287       fail("There are no connected peers, should have thrown an IllegalArgumentException");
288     } catch (IllegalArgumentException e) {
289     }
290   }
291 
292   protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
293     // we can first check if the value was changed in the store, if it wasn't then fail right away
294     if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
295       fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
296     }
297     while (true) {
298       if (status == rp.getStatusOfPeer(peerId)) {
299         return;
300       }
301       if (zkTimeoutCount < ZK_MAX_COUNT) {
302         LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
303             + ", sleeping and trying again.");
304         Thread.sleep(ZK_SLEEP_INTERVAL);
305       } else {
306         fail("Timed out waiting for ConnectedPeerStatus to be " + status);
307       }
308     }
309   }
310 
311   protected void assertNumberOfPeers(int total) {
312     assertEquals(total, rp.getAllPeerConfigs().size());
313     assertEquals(total, rp.getAllPeerIds().size());
314     assertEquals(total, rp.getAllPeerIds().size());
315   }
316 
317   /*
318    * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
319    * 3, 4, 5 log files respectively
320    */
321   protected void populateQueues() throws ReplicationException {
322     rq1.addLog("trash", "trash");
323     rq1.removeQueue("trash");
324 
325     rq2.addLog("qId1", "trash");
326     rq2.removeLog("qId1", "trash");
327 
328     for (int i = 1; i < 6; i++) {
329       for (int j = 0; j < i; j++) {
330         rq3.addLog("qId" + i, "filename" + j);
331       }
332       //Add peers for the corresponding queues so they are not orphans
333       rp.addPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("bogus" + i), null);
334     }
335   }
336 }
337