1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.InterruptedIOException;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.OutputStream;
27 import java.io.Reader;
28 import java.net.BindException;
29 import java.net.InetSocketAddress;
30 import java.net.Socket;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Random;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.zookeeper.server.NIOServerCnxnFactory;
42 import org.apache.zookeeper.server.ZooKeeperServer;
43 import org.apache.zookeeper.server.persistence.FileTxnLog;
44
45 import com.google.common.annotations.VisibleForTesting;
46
47
48
49
50
51
52 @InterfaceAudience.Public
53 @InterfaceStability.Evolving
54 public class MiniZooKeeperCluster {
55 private static final Log LOG = LogFactory.getLog(MiniZooKeeperCluster.class);
56
57 private static final int TICK_TIME = 2000;
58 private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
59 private int connectionTimeout;
60
61 private boolean started;
62
63
64 private int defaultClientPort = 0;
65
66 private List<NIOServerCnxnFactory> standaloneServerFactoryList;
67 private List<ZooKeeperServer> zooKeeperServers;
68 private List<Integer> clientPortList;
69
70 private int activeZKServerIndex;
71 private int tickTime = 0;
72
73 private Configuration configuration;
74
75 public MiniZooKeeperCluster() {
76 this(new Configuration());
77 }
78
79 public MiniZooKeeperCluster(Configuration configuration) {
80 this.started = false;
81 this.configuration = configuration;
82 activeZKServerIndex = -1;
83 zooKeeperServers = new ArrayList<ZooKeeperServer>();
84 clientPortList = new ArrayList<Integer>();
85 standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
86 connectionTimeout = configuration.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster",
87 DEFAULT_CONNECTION_TIMEOUT);
88 }
89
90
91
92
93
94
95 public void addClientPort(int clientPort) {
96 clientPortList.add(clientPort);
97 }
98
99
100
101
102
103 @VisibleForTesting
104 public List<Integer> getClientPortList() {
105 return clientPortList;
106 }
107
108
109
110
111
112
113 private boolean hasValidClientPortInList(int index) {
114 return (clientPortList.size() > index && clientPortList.get(index) > 0);
115 }
116
117 public void setDefaultClientPort(int clientPort) {
118 if (clientPort <= 0) {
119 throw new IllegalArgumentException("Invalid default ZK client port: "
120 + clientPort);
121 }
122 this.defaultClientPort = clientPort;
123 }
124
125
126
127
128
129
130
131 private int selectClientPort(int seedPort) {
132 int i;
133 int returnClientPort = seedPort + 1;
134 if (returnClientPort == 0) {
135
136
137
138
139 if (defaultClientPort > 0) {
140 returnClientPort = defaultClientPort;
141 } else {
142 returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
143 }
144 }
145
146 while (true) {
147 for (i = 0; i < clientPortList.size(); i++) {
148 if (returnClientPort == clientPortList.get(i)) {
149
150 returnClientPort++;
151 break;
152 }
153 }
154 if (i == clientPortList.size()) {
155 break;
156 }
157 }
158 return returnClientPort;
159 }
160
161 public void setTickTime(int tickTime) {
162 this.tickTime = tickTime;
163 }
164
165 public int getBackupZooKeeperServerNum() {
166 return zooKeeperServers.size()-1;
167 }
168
169 public int getZooKeeperServerNum() {
170 return zooKeeperServers.size();
171 }
172
173
174 private static void setupTestEnv() {
175
176
177
178
179 System.setProperty("zookeeper.preAllocSize", "100");
180 FileTxnLog.setPreallocSize(100 * 1024);
181 }
182
183 public int startup(File baseDir) throws IOException, InterruptedException {
184 int numZooKeeperServers = clientPortList.size();
185 if (numZooKeeperServers == 0) {
186 numZooKeeperServers = 1;
187 }
188 return startup(baseDir, numZooKeeperServers);
189 }
190
191
192
193
194
195
196
197
198
199 public int startup(File baseDir, int numZooKeeperServers) throws IOException,
200 InterruptedException {
201 if (numZooKeeperServers <= 0)
202 return -1;
203
204 setupTestEnv();
205 shutdown();
206
207 int tentativePort = -1;
208 int currentClientPort;
209
210
211 for (int i = 0; i < numZooKeeperServers; i++) {
212 File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
213 createDir(dir);
214 int tickTimeToUse;
215 if (this.tickTime > 0) {
216 tickTimeToUse = this.tickTime;
217 } else {
218 tickTimeToUse = TICK_TIME;
219 }
220
221
222 if (hasValidClientPortInList(i)) {
223 currentClientPort = clientPortList.get(i);
224 } else {
225 tentativePort = selectClientPort(tentativePort);
226 currentClientPort = tentativePort;
227 }
228
229 ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
230 NIOServerCnxnFactory standaloneServerFactory;
231 while (true) {
232 try {
233 standaloneServerFactory = new NIOServerCnxnFactory();
234 standaloneServerFactory.configure(
235 new InetSocketAddress(currentClientPort),
236 configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, 1000));
237 } catch (BindException e) {
238 LOG.debug("Failed binding ZK Server to client port: " +
239 currentClientPort, e);
240
241 if (hasValidClientPortInList(i)) {
242 return -1;
243 }
244
245 tentativePort = selectClientPort(tentativePort);
246 currentClientPort = tentativePort;
247 continue;
248 }
249 break;
250 }
251
252
253 standaloneServerFactory.startup(server);
254
255 if (!waitForServerUp(currentClientPort, connectionTimeout)) {
256 throw new IOException("Waiting for startup of standalone server");
257 }
258
259
260 if (clientPortList.size() <= i) {
261 clientPortList.add(currentClientPort);
262 }
263 else if (clientPortList.get(i) <= 0) {
264 clientPortList.remove(i);
265 clientPortList.add(i, currentClientPort);
266 }
267
268 standaloneServerFactoryList.add(standaloneServerFactory);
269 zooKeeperServers.add(server);
270 }
271
272
273 activeZKServerIndex = 0;
274 started = true;
275 int clientPort = clientPortList.get(activeZKServerIndex);
276 LOG.info("Started MiniZooKeeperCluster and ran successful 'stat' " +
277 "on client port=" + clientPort);
278 return clientPort;
279 }
280
281 private void createDir(File dir) throws IOException {
282 try {
283 if (!dir.exists()) {
284 dir.mkdirs();
285 }
286 } catch (SecurityException e) {
287 throw new IOException("creating dir: " + dir, e);
288 }
289 }
290
291
292
293
294 public void shutdown() throws IOException {
295
296 for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
297 NIOServerCnxnFactory standaloneServerFactory =
298 standaloneServerFactoryList.get(i);
299 int clientPort = clientPortList.get(i);
300
301 standaloneServerFactory.shutdown();
302 if (!waitForServerDown(clientPort, connectionTimeout)) {
303 throw new IOException("Waiting for shutdown of standalone server");
304 }
305 }
306 standaloneServerFactoryList.clear();
307
308 for (ZooKeeperServer zkServer: zooKeeperServers) {
309
310 zkServer.getZKDatabase().close();
311 }
312 zooKeeperServers.clear();
313
314
315 if (started) {
316 started = false;
317 activeZKServerIndex = 0;
318 clientPortList.clear();
319 LOG.info("Shutdown MiniZK cluster with all ZK servers");
320 }
321 }
322
323
324
325
326
327
328 public int killCurrentActiveZooKeeperServer() throws IOException,
329 InterruptedException {
330 if (!started || activeZKServerIndex < 0) {
331 return -1;
332 }
333
334
335 NIOServerCnxnFactory standaloneServerFactory =
336 standaloneServerFactoryList.get(activeZKServerIndex);
337 int clientPort = clientPortList.get(activeZKServerIndex);
338
339 standaloneServerFactory.shutdown();
340 if (!waitForServerDown(clientPort, connectionTimeout)) {
341 throw new IOException("Waiting for shutdown of standalone server");
342 }
343
344 zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
345
346
347 standaloneServerFactoryList.remove(activeZKServerIndex);
348 clientPortList.remove(activeZKServerIndex);
349 zooKeeperServers.remove(activeZKServerIndex);
350 LOG.info("Kill the current active ZK servers in the cluster " +
351 "on client port: " + clientPort);
352
353 if (standaloneServerFactoryList.size() == 0) {
354
355 return -1;
356 }
357 clientPort = clientPortList.get(activeZKServerIndex);
358 LOG.info("Activate a backup zk server in the cluster " +
359 "on client port: " + clientPort);
360
361 return clientPort;
362 }
363
364
365
366
367
368
369 public void killOneBackupZooKeeperServer() throws IOException,
370 InterruptedException {
371 if (!started || activeZKServerIndex < 0 ||
372 standaloneServerFactoryList.size() <= 1) {
373 return ;
374 }
375
376 int backupZKServerIndex = activeZKServerIndex+1;
377
378 NIOServerCnxnFactory standaloneServerFactory =
379 standaloneServerFactoryList.get(backupZKServerIndex);
380 int clientPort = clientPortList.get(backupZKServerIndex);
381
382 standaloneServerFactory.shutdown();
383 if (!waitForServerDown(clientPort, connectionTimeout)) {
384 throw new IOException("Waiting for shutdown of standalone server");
385 }
386
387 zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
388
389
390 standaloneServerFactoryList.remove(backupZKServerIndex);
391 clientPortList.remove(backupZKServerIndex);
392 zooKeeperServers.remove(backupZKServerIndex);
393 LOG.info("Kill one backup ZK servers in the cluster " +
394 "on client port: " + clientPort);
395 }
396
397
398 private static boolean waitForServerDown(int port, long timeout) throws IOException {
399 long start = System.currentTimeMillis();
400 while (true) {
401 try {
402 Socket sock = new Socket("localhost", port);
403 try {
404 OutputStream outstream = sock.getOutputStream();
405 outstream.write("stat".getBytes());
406 outstream.flush();
407 } finally {
408 sock.close();
409 }
410 } catch (IOException e) {
411 return true;
412 }
413
414 if (System.currentTimeMillis() > start + timeout) {
415 break;
416 }
417 try {
418 Thread.sleep(250);
419 } catch (InterruptedException e) {
420 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
421 }
422 }
423 return false;
424 }
425
426
427 private static boolean waitForServerUp(int port, long timeout) throws IOException {
428 long start = System.currentTimeMillis();
429 while (true) {
430 try {
431 Socket sock = new Socket("localhost", port);
432 BufferedReader reader = null;
433 try {
434 OutputStream outstream = sock.getOutputStream();
435 outstream.write("stat".getBytes());
436 outstream.flush();
437
438 Reader isr = new InputStreamReader(sock.getInputStream());
439 reader = new BufferedReader(isr);
440 String line = reader.readLine();
441 if (line != null && line.startsWith("Zookeeper version:")) {
442 return true;
443 }
444 } finally {
445 sock.close();
446 if (reader != null) {
447 reader.close();
448 }
449 }
450 } catch (IOException e) {
451
452 LOG.info("server localhost:" + port + " not up " + e);
453 }
454
455 if (System.currentTimeMillis() > start + timeout) {
456 break;
457 }
458 try {
459 Thread.sleep(250);
460 } catch (InterruptedException e) {
461 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
462 }
463 }
464 return false;
465 }
466
467 public int getClientPort() {
468 return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
469 : clientPortList.get(activeZKServerIndex);
470 }
471 }