1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.HRegionInfo;
31 import org.apache.hadoop.hbase.testclassification.MediumTests;
32 import org.apache.hadoop.hbase.MiniHBaseCluster;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.ServerName;
35 import org.apache.hadoop.hbase.client.Admin;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.regionserver.HRegion;
40 import org.apache.hadoop.hbase.regionserver.Region;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.JVMClusterUtil;
43 import org.junit.AfterClass;
44 import org.junit.BeforeClass;
45 import org.junit.Ignore;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49 @Category(MediumTests.class)
50 public class TestAssignmentListener {
51 private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
52
53 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
54
55 static class DummyListener {
56 protected AtomicInteger modified = new AtomicInteger(0);
57
58 public void awaitModifications(int count) throws InterruptedException {
59 while (!modified.compareAndSet(count, 0)) {
60 Thread.sleep(100);
61 }
62 }
63 }
64
65 static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
66 private AtomicInteger closeCount = new AtomicInteger(0);
67 private AtomicInteger openCount = new AtomicInteger(0);
68
69 public DummyAssignmentListener() {
70 }
71
72 @Override
73 public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
74 LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
75 openCount.incrementAndGet();
76 modified.incrementAndGet();
77 }
78
79 @Override
80 public void regionClosed(final HRegionInfo regionInfo) {
81 LOG.info("Assignment close region=" + regionInfo);
82 closeCount.incrementAndGet();
83 modified.incrementAndGet();
84 }
85
86 public void reset() {
87 openCount.set(0);
88 closeCount.set(0);
89 }
90
91 public int getLoadCount() {
92 return openCount.get();
93 }
94
95 public int getCloseCount() {
96 return closeCount.get();
97 }
98 }
99
100 static class DummyServerListener extends DummyListener implements ServerListener {
101 private AtomicInteger removedCount = new AtomicInteger(0);
102 private AtomicInteger addedCount = new AtomicInteger(0);
103
104 public DummyServerListener() {
105 }
106
107 @Override
108 public void serverAdded(final ServerName serverName) {
109 LOG.info("Server added " + serverName);
110 addedCount.incrementAndGet();
111 modified.incrementAndGet();
112 }
113
114 @Override
115 public void serverRemoved(final ServerName serverName) {
116 LOG.info("Server removed " + serverName);
117 removedCount.incrementAndGet();
118 modified.incrementAndGet();
119 }
120
121 public void reset() {
122 addedCount.set(0);
123 removedCount.set(0);
124 }
125
126 public int getAddedCount() {
127 return addedCount.get();
128 }
129
130 public int getRemovedCount() {
131 return removedCount.get();
132 }
133 }
134
135 @BeforeClass
136 public static void beforeAllTests() throws Exception {
137 TEST_UTIL.startMiniCluster(2);
138 }
139
140 @AfterClass
141 public static void afterAllTests() throws Exception {
142 TEST_UTIL.shutdownMiniCluster();
143 }
144
145 @Test(timeout=60000)
146 public void testServerListener() throws IOException, InterruptedException {
147 ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
148
149 DummyServerListener listener = new DummyServerListener();
150 serverManager.registerListener(listener);
151 try {
152 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
153
154
155 miniCluster.startRegionServer();
156 listener.awaitModifications(1);
157 assertEquals(1, listener.getAddedCount());
158 assertEquals(0, listener.getRemovedCount());
159
160
161 listener.reset();
162 miniCluster.startRegionServer();
163 listener.awaitModifications(1);
164 assertEquals(1, listener.getAddedCount());
165 assertEquals(0, listener.getRemovedCount());
166
167 int nrs = miniCluster.getRegionServerThreads().size();
168
169
170 listener.reset();
171 miniCluster.stopRegionServer(nrs - 1);
172 listener.awaitModifications(1);
173 assertEquals(0, listener.getAddedCount());
174 assertEquals(1, listener.getRemovedCount());
175
176
177 listener.reset();
178 miniCluster.stopRegionServer(nrs - 2);
179 listener.awaitModifications(1);
180 assertEquals(0, listener.getAddedCount());
181 assertEquals(1, listener.getRemovedCount());
182 } finally {
183 serverManager.unregisterListener(listener);
184 }
185 }
186
187 @Ignore @Test(timeout=60000)
188 public void testAssignmentListener() throws IOException, InterruptedException {
189 AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
190 Admin admin = TEST_UTIL.getHBaseAdmin();
191
192 DummyAssignmentListener listener = new DummyAssignmentListener();
193 am.registerListener(listener);
194 try {
195 final String TABLE_NAME_STR = "testtb";
196 final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
197 final byte[] FAMILY = Bytes.toBytes("cf");
198
199
200 LOG.info("Create Table");
201 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
202 listener.awaitModifications(1);
203 assertEquals(1, listener.getLoadCount());
204 assertEquals(0, listener.getCloseCount());
205
206
207 Table table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
208 try {
209 for (int i = 0; i < 10; ++i) {
210 byte[] key = Bytes.toBytes("row-" + i);
211 Put put = new Put(key);
212 put.add(FAMILY, null, key);
213 table.put(put);
214 }
215 } finally {
216 table.close();
217 }
218
219
220 LOG.info("Split Table");
221 listener.reset();
222 admin.split(TABLE_NAME, Bytes.toBytes("row-3"));
223 listener.awaitModifications(3);
224 assertEquals(2, listener.getLoadCount());
225 assertEquals(1, listener.getCloseCount());
226
227
228 MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
229 int mergeable = 0;
230 while (mergeable < 2) {
231 Thread.sleep(100);
232 admin.majorCompact(TABLE_NAME);
233 mergeable = 0;
234 for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
235 for (Region region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
236 mergeable += ((HRegion)region).isMergeable() ? 1 : 0;
237 }
238 }
239 }
240
241
242 LOG.info("Merge Regions");
243 listener.reset();
244 List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
245 assertEquals(2, regions.size());
246 admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
247 regions.get(1).getEncodedNameAsBytes(), true);
248 listener.awaitModifications(3);
249 assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
250 assertEquals(1, listener.getLoadCount());
251 assertEquals(2, listener.getCloseCount());
252
253
254 LOG.info("Drop Table");
255 listener.reset();
256 TEST_UTIL.deleteTable(TABLE_NAME);
257 listener.awaitModifications(1);
258 assertEquals(0, listener.getLoadCount());
259 assertEquals(1, listener.getCloseCount());
260 } finally {
261 am.unregisterListener(listener);
262 }
263 }
264 }