1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.fail;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.testclassification.LargeTests;
28 import org.apache.hadoop.hbase.MiniHBaseCluster;
29 import org.apache.hadoop.hbase.client.Get;
30 import org.apache.hadoop.hbase.client.Put;
31 import org.apache.hadoop.hbase.client.Result;
32 import org.apache.hadoop.hbase.client.ResultScanner;
33 import org.apache.hadoop.hbase.client.Scan;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.hbase.util.JVMClusterUtil;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.junit.experimental.categories.Category;
39
40 import java.io.IOException;
41
42
43
44
45 @Category(LargeTests.class)
46 public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
47
48 private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
49
50
51
52
53 @Before
54 public void setUp() throws Exception {
55
56
57 for (JVMClusterUtil.RegionServerThread r :
58 utility1.getHBaseCluster().getRegionServerThreads()) {
59 utility1.getHBaseAdmin().rollWALWriter(r.getRegionServer().getServerName());
60 }
61 utility1.deleteTableData(tableName);
62
63
64
65
66
67 Scan scan = new Scan();
68 int lastCount = 0;
69 for (int i = 0; i < NB_RETRIES; i++) {
70 if (i == NB_RETRIES - 1) {
71 fail("Waited too much time for truncate");
72 }
73 ResultScanner scanner = htable2.getScanner(scan);
74 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
75 scanner.close();
76 if (res.length != 0) {
77 if (res.length < lastCount) {
78 i--;
79 }
80 lastCount = res.length;
81 LOG.info("Still got " + res.length + " rows");
82 Thread.sleep(SLEEP_TIME);
83 } else {
84 break;
85 }
86 }
87 }
88
89 @Test(timeout = 300000)
90 public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
91
92 LOG.info("testSimplePutDelete");
93 MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
94
95 doPutTest(Bytes.toBytes(1));
96
97 int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
98 peerCluster.stopRegionServer(rsToStop);
99 peerCluster.waitOnRegionServer(rsToStop);
100
101
102 assertEquals(1, peerCluster.getRegionServerThreads().size());
103
104 doPutTest(Bytes.toBytes(2));
105
106 peerCluster.startRegionServer();
107
108
109 assertEquals(2, peerCluster.getRegionServerThreads().size());
110
111 doPutTest(Bytes.toBytes(3));
112
113 }
114
115 private void doPutTest(byte[] row) throws IOException, InterruptedException {
116 Put put = new Put(row);
117 put.add(famName, row, row);
118
119 if (htable1 == null) {
120 htable1 = utility1.getConnection().getTable(tableName);
121 }
122
123 htable1.put(put);
124
125 Get get = new Get(row);
126 for (int i = 0; i < NB_RETRIES; i++) {
127 if (i == NB_RETRIES - 1) {
128 fail("Waited too much time for put replication");
129 }
130 Result res = htable2.get(get);
131 if (res.size() == 0) {
132 LOG.info("Row not available");
133 Thread.sleep(SLEEP_TIME);
134 } else {
135 assertArrayEquals(res.value(), row);
136 break;
137 }
138 }
139
140 }
141
142 }