/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(LargeTests.class)
public class TestMultiSlaveReplication {

  private static final Log LOG = LogFactory.getLog(TestMultiSlaveReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;
  private static Configuration conf3;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;
  private static HBaseTestingUtility utility3;
  private static final long SLEEP_TIME = 500;
  private static final int NB_RETRIES = 100;

  private static final TableName tableName = TableName.valueOf("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] row1 = Bytes.toBytes("row1");
  private static final byte[] row2 = Bytes.toBytes("row2");
  private static final byte[] row3 = Bytes.toBytes("row3");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  private static HTableDescriptor table;

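  /**
   * Starts one mini ZooKeeper cluster and prepares three cluster configurations that share it
   * (znode parents /1, /2 and /3), plus the table descriptor with one replicated and one
   * non-replicated family.
   */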
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // smaller WAL block size, batch capacity and intervals so that log rolls,
    // shipping and cleanup happen often enough to be exercised by the test
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
    conf1.setBoolean("dfs.support.append", true);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
        "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    new ZooKeeperWatcher(conf1, "cluster1", null, true);

    conf2 = new Configuration(conf1);
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");

    conf3 = new Configuration(conf1);
    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    new ZooKeeperWatcher(conf2, "cluster2", null, true);

    utility3 = new HBaseTestingUtility(conf3);
    utility3.setZkCluster(miniZK);
    new ZooKeeperWatcher(conf3, "cluster3", null, true);

    table = new HTableDescriptor(tableName);
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
  }

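  /**
   * Replicates cluster 1 to cluster 2, then adds cluster 3 as a second slave and verifies that
   * only edits still in the WAL that was open when the peer was added (or written afterwards)
   * reach the new peer, while edits from already-rolled logs do not.
   */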
  @Test(timeout=300000)
  public void testMultiSlaveReplication() throws Exception {
    LOG.info("testMultiSlaveReplication");
    MiniHBaseCluster master = utility1.startMiniCluster();
    utility2.startMiniCluster();
    utility3.startMiniCluster();
    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);

    new HBaseAdmin(conf1).createTable(table);
    new HBaseAdmin(conf2).createTable(table);
    new HBaseAdmin(conf3).createTable(table);
    Table htable1 = new HTable(conf1, tableName);
    htable1.setWriteBufferSize(1024);
    Table htable2 = new HTable(conf2, tableName);
    htable2.setWriteBufferSize(1024);
    Table htable3 = new HTable(conf3, tableName);
    htable3.setWriteBufferSize(1024);

    admin1.addPeer("1", utility2.getClusterKey());

    // put "row", wait until it is on cluster 2, then delete it again
    putAndWait(row, famName, htable1, htable2);
    deleteAndWait(row, htable1, htable2);
    // cluster 3 is not a peer yet, so nothing should have arrived there
    checkRow(row, 0, htable3);

    putAndWait(row2, famName, htable1, htable2);
    // roll the WAL so that row2's edit ends up in a closed log
    rollWALAndWait(utility1, htable1.getName(), row2);
    // row3 goes into the newly opened log
    putAndWait(row3, famName, htable1, htable2);

    admin1.addPeer("2", utility3.getClusterKey());

    // a put made after the second peer was added must reach both slaves
    putAndWait(row1, famName, htable1, htable2, htable3);
    deleteAndWait(row1, htable1, htable2, htable3);

    // row2 only lives in an already-rolled log, so it must not show up on cluster 3
    checkRow(row2, 0, htable3);
    // row3 was in the log still open when the peer was added, so it is replicated
    checkRow(row3, 1, htable3);

    Put p = new Put(row);
    p.add(famName, row, row);
    htable1.put(p);
    // roll the WAL again
    rollWALAndWait(utility1, htable1.getName(), row);

    // clean up row2 everywhere; waiting for the delete to replicate also gives
    // the put above time to ship
    deleteAndWait(row2, htable1, htable2, htable3);

    // the log roll in the middle must not have dropped "row" on cluster 2
    checkRow(row, 1, htable2);
    // cluster 3 may still be catching up, so poll for the row instead of a single get
    checkWithWait(row, 1, htable3);

    // clean up the remaining rows on all clusters
    deleteAndWait(row, htable1, htable2, htable3);
    deleteAndWait(row3, htable1, htable2, htable3);

    utility3.shutdownMiniCluster();
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

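  /**
   * Rolls the WAL of the region server hosting the region for the given row and blocks until
   * the postLogRoll notification confirms the roll completed.
   */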
  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
      final byte[] row) throws IOException {
    final Admin admin = utility.getHBaseAdmin();
    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();

    // find the region that carries the row
    HRegion region = null;
    for (HRegion candidate : cluster.getRegions(table)) {
      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
        region = candidate;
        break;
      }
    }
    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);

    final CountDownLatch latch = new CountDownLatch(1);

    // listen for successful log rolls on that region's WAL
    final WALActionsListener listener = new WALActionsListener.Base() {
      @Override
      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
        latch.countDown();
      }
    };
    region.getWAL().registerWALActionsListener(listener);

    // request a roll on the server that hosts the region
    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
        region.getRegionInfo().getRegionName()));

    // wait for the roll to be acknowledged
    try {
      latch.await();
    } catch (InterruptedException exception) {
      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
          "replication tests fail, it's probably because we should still be waiting.");
      Thread.currentThread().interrupt();
    }
    region.getWAL().unregisterWALActionsListener(listener);
  }

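  /**
   * Polls the table until the row is present (or NB_RETRIES is exhausted) and then asserts it
   * holds the expected number of cells.
   */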
  private void checkWithWait(byte[] row, int count, Table table) throws Exception {
    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time while getting the row.");
      }
      Result res = table.get(get);
      if (res.size() >= 1) {
        LOG.info("Row is replicated");
        assertEquals("Table '" + table + "' did not have the expected number of results.",
            count, res.size());
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

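  /**
   * Asserts, without waiting, that each given table returns exactly {@code count} cells for
   * the row.
   */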
  private void checkRow(byte[] row, int count, Table... tables) throws IOException {
    Get get = new Get(row);
    for (Table table : tables) {
      Result res = table.get(get);
      assertEquals("Table '" + table + "' did not have the expected number of results.",
          count, res.size());
    }
  }

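  /**
   * Deletes the row on the source table and waits until the row has disappeared from every
   * target table.
   */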
  private void deleteAndWait(byte[] row, Table source, Table... targets)
      throws Exception {
    Delete del = new Delete(row);
    source.delete(del);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      boolean removedFromAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.size() >= 1) {
          LOG.info("Row not deleted");
          removedFromAll = false;
          break;
        }
      }
      if (removedFromAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

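  /**
   * Puts a single cell on the source table and waits until the same value can be read back
   * from every target table.
   */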
  private void putAndWait(byte[] row, byte[] fam, Table source, Table... targets)
      throws Exception {
    Put put = new Put(row);
    put.add(fam, row, row);
    source.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      boolean replicatedToAll = true;
      for (Table target : targets) {
        Result res = target.get(get);
        if (res.size() == 0) {
          LOG.info("Row not available");
          replicatedToAll = false;
          break;
        } else {
          assertArrayEquals(row, res.value());
        }
      }
      if (replicatedToAll) {
        break;
      } else {
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

}