1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import static org.junit.Assert.assertEquals;
22
23 import java.io.IOException;
24 import java.util.List;
25 import java.util.NavigableSet;
26 import java.util.Set;
27 import java.util.TreeSet;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.testclassification.LargeTests;
37 import org.apache.hadoop.hbase.MiniHBaseCluster;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.client.HTable;
41 import org.apache.hadoop.hbase.client.RegionLocator;
42 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
45 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
46 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
47 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48 import org.apache.zookeeper.KeeperException;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
51
52
53
54
55 @Category(LargeTests.class)
56 public class TestRollingRestart {
57 private static final Log LOG = LogFactory.getLog(TestRollingRestart.class);
58
59 @Test (timeout=500000)
60 public void testBasicRollingRestart() throws Exception {
61
62
63 final int NUM_MASTERS = 2;
64 final int NUM_RS = 3;
65 final int NUM_REGIONS_TO_CREATE = 20;
66
67 int expectedNumRS = 3;
68
69
70 log("Starting cluster");
71 Configuration conf = HBaseConfiguration.create();
72 conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 40);
73 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
74 TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
75 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
76 log("Waiting for active/ready master");
77 cluster.waitForActiveAndReadyMaster();
78 ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart",
79 null);
80 HMaster master = cluster.getMaster();
81
82
83 TableName table = TableName.valueOf("tableRestart");
84 byte [] family = Bytes.toBytes("family");
85 log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
86 HTable ht = TEST_UTIL.createMultiRegionTable(table, family, NUM_REGIONS_TO_CREATE);
87 int numRegions = -1;
88 try (RegionLocator r = ht.getRegionLocator()) {
89 numRegions = r.getStartKeys().length;
90 }
91 numRegions += 1;
92 log("Waiting for no more RIT\n");
93 blockUntilNoRIT(zkw, master);
94 log("Disabling table\n");
95 TEST_UTIL.getHBaseAdmin().disableTable(table);
96 log("Waiting for no more RIT\n");
97 blockUntilNoRIT(zkw, master);
98 NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
99 log("Verifying only catalog and namespace regions are assigned\n");
100 if (regions.size() != 2) {
101 for (String oregion : regions) log("Region still online: " + oregion);
102 }
103 assertEquals(2, regions.size());
104 log("Enabling table\n");
105 TEST_UTIL.getHBaseAdmin().enableTable(table);
106 log("Waiting for no more RIT\n");
107 blockUntilNoRIT(zkw, master);
108 log("Verifying there are " + numRegions + " assigned on cluster\n");
109 regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
110 assertRegionsAssigned(cluster, regions);
111 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
112
113
114 log("Adding a fourth RS");
115 RegionServerThread restarted = cluster.startRegionServer();
116 expectedNumRS++;
117 restarted.waitForServerOnline();
118 log("Additional RS is online");
119 log("Waiting for no more RIT");
120 blockUntilNoRIT(zkw, master);
121 log("Verifying there are " + numRegions + " assigned on cluster");
122 assertRegionsAssigned(cluster, regions);
123 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
124
125
126 List<MasterThread> masterThreads = cluster.getMasterThreads();
127 MasterThread activeMaster = null;
128 MasterThread backupMaster = null;
129 assertEquals(2, masterThreads.size());
130 if (masterThreads.get(0).getMaster().isActiveMaster()) {
131 activeMaster = masterThreads.get(0);
132 backupMaster = masterThreads.get(1);
133 } else {
134 activeMaster = masterThreads.get(1);
135 backupMaster = masterThreads.get(0);
136 }
137
138
139 log("Stopping backup master\n\n");
140 backupMaster.getMaster().stop("Stop of backup during rolling restart");
141 cluster.hbaseCluster.waitOnMaster(backupMaster);
142
143
144 log("Stopping primary master\n\n");
145 activeMaster.getMaster().stop("Stop of active during rolling restart");
146 cluster.hbaseCluster.waitOnMaster(activeMaster);
147
148
149 log("Restarting primary master\n\n");
150 activeMaster = cluster.startMaster();
151 cluster.waitForActiveAndReadyMaster();
152 master = activeMaster.getMaster();
153
154
155 log("Restarting backup master\n\n");
156 backupMaster = cluster.startMaster();
157
158 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
159
160
161
162
163 List<RegionServerThread> regionServers =
164 cluster.getLiveRegionServerThreads();
165 int num = 1;
166 int total = regionServers.size();
167 for (RegionServerThread rst : regionServers) {
168 ServerName serverName = rst.getRegionServer().getServerName();
169 log("Stopping region server " + num + " of " + total + " [ " +
170 serverName + "]");
171 rst.getRegionServer().stop("Stopping RS during rolling restart");
172 cluster.hbaseCluster.waitOnRegionServer(rst);
173 log("Waiting for RS shutdown to be handled by master");
174 waitForRSShutdownToStartAndFinish(activeMaster, serverName);
175 log("RS shutdown done, waiting for no more RIT");
176 blockUntilNoRIT(zkw, master);
177 log("Verifying there are " + numRegions + " assigned on cluster");
178 assertRegionsAssigned(cluster, regions);
179 expectedNumRS--;
180 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
181 log("Restarting region server " + num + " of " + total);
182 restarted = cluster.startRegionServer();
183 restarted.waitForServerOnline();
184 expectedNumRS++;
185 log("Region server " + num + " is back online");
186 log("Waiting for no more RIT");
187 blockUntilNoRIT(zkw, master);
188 log("Verifying there are " + numRegions + " assigned on cluster");
189 assertRegionsAssigned(cluster, regions);
190 assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
191 num++;
192 }
193 Thread.sleep(1000);
194 assertRegionsAssigned(cluster, regions);
195
196
197
198 ht.close();
199
200 TEST_UTIL.shutdownMiniCluster();
201 }
202
203 private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
204 throws KeeperException, InterruptedException {
205 ZKAssign.blockUntilNoRIT(zkw);
206 master.assignmentManager.waitUntilNoRegionsInTransition(60000);
207 }
208
209 private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
210 ServerName serverName) throws InterruptedException {
211 ServerManager sm = activeMaster.getMaster().getServerManager();
212
213 while (!sm.getDeadServers().isDeadServer(serverName)) {
214 log("Waiting for [" + serverName + "] to be listed as dead in master");
215 Thread.sleep(1);
216 }
217 log("Server [" + serverName + "] marked as dead, waiting for it to " +
218 "finish dead processing");
219 while (sm.areDeadServersInProgress()) {
220 log("Server [" + serverName + "] still being processed, waiting");
221 Thread.sleep(100);
222 }
223 log("Server [" + serverName + "] done with server shutdown processing");
224 }
225
226 private void log(String msg) {
227 LOG.debug("\n\nTRR: " + msg + "\n");
228 }
229
230 private int getNumberOfOnlineRegions(MiniHBaseCluster cluster) {
231 int numFound = 0;
232 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
233 numFound += rst.getRegionServer().getNumberOfOnlineRegions();
234 }
235 for (MasterThread mt : cluster.getMasterThreads()) {
236 numFound += mt.getMaster().getNumberOfOnlineRegions();
237 }
238 return numFound;
239 }
240
241 private void assertRegionsAssigned(MiniHBaseCluster cluster,
242 Set<String> expectedRegions) throws IOException {
243 int numFound = getNumberOfOnlineRegions(cluster);
244 if (expectedRegions.size() > numFound) {
245 log("Expected to find " + expectedRegions.size() + " but only found"
246 + " " + numFound);
247 NavigableSet<String> foundRegions =
248 HBaseTestingUtility.getAllOnlineRegions(cluster);
249 for (String region : expectedRegions) {
250 if (!foundRegions.contains(region)) {
251 log("Missing region: " + region);
252 }
253 }
254 assertEquals(expectedRegions.size(), numFound);
255 } else if (expectedRegions.size() < numFound) {
256 int doubled = numFound - expectedRegions.size();
257 log("Expected to find " + expectedRegions.size() + " but found"
258 + " " + numFound + " (" + doubled + " double assignments?)");
259 NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
260 for (String region : doubleRegions) {
261 log("Region is double assigned: " + region);
262 }
263 assertEquals(expectedRegions.size(), numFound);
264 } else {
265 log("Success! Found expected number of " + numFound + " regions");
266 }
267 }
268
269 private NavigableSet<String> getDoubleAssignedRegions(
270 MiniHBaseCluster cluster) throws IOException {
271 NavigableSet<String> online = new TreeSet<String>();
272 NavigableSet<String> doubled = new TreeSet<String>();
273 for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
274 for (HRegionInfo region : ProtobufUtil.getOnlineRegions(
275 rst.getRegionServer().getRSRpcServices())) {
276 if(!online.add(region.getRegionNameAsString())) {
277 doubled.add(region.getRegionNameAsString());
278 }
279 }
280 }
281 return doubled;
282 }
283
284
285 }
286