1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.zookeeper;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.CountDownLatch;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.Abortable;
35 import org.apache.hadoop.hbase.AuthUtil;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionInfo;
38 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.security.Superusers;
41 import org.apache.hadoop.security.UserGroupInformation;
42 import org.apache.zookeeper.KeeperException;
43 import org.apache.zookeeper.WatchedEvent;
44 import org.apache.zookeeper.Watcher;
45 import org.apache.zookeeper.ZooDefs;
46 import org.apache.zookeeper.ZooDefs.Ids;
47 import org.apache.zookeeper.ZooDefs.Perms;
48 import org.apache.zookeeper.data.ACL;
49 import org.apache.zookeeper.data.Id;
50 import org.apache.zookeeper.data.Stat;
51
52
53
54
55
56
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
65 private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
66
67
68
69 private String prefix;
70 private String identifier;
71
72
73 private String quorum;
74
75
76 private final RecoverableZooKeeper recoverableZooKeeper;
77
78
79 protected Abortable abortable;
80
81 private boolean aborted = false;
82
83
84 private final List<ZooKeeperListener> listeners =
85 new CopyOnWriteArrayList<ZooKeeperListener>();
86
87
88
89 public CountDownLatch saslLatch = new CountDownLatch(1);
90
91
92
93
94 public String baseZNode;
95
96 private Map<Integer,String> metaReplicaZnodes = new HashMap<Integer, String>();
97
98 public String rsZNode;
99
100 public String drainingZNode;
101
102 private String masterAddressZNode;
103
104 public String backupMasterAddressesZNode;
105
106 public String clusterStateZNode;
107
108 public String assignmentZNode;
109
110 public String tableZNode;
111
112 public String clusterIdZNode;
113
114 public String splitLogZNode;
115
116 public String balancerZNode;
117
118 private String regionNormalizerZNode;
119
120 private String switchZNode;
121
122 public String tableLockZNode;
123
124 public String recoveringRegionsZNode;
125
126 public static String namespaceZNode = "namespace";
127
128 public static String masterMaintZNode = "masterMaintenance";
129
130
131 public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
132 new ArrayList<ACL>() { {
133 add(new ACL(ZooDefs.Perms.READ,ZooDefs.Ids.ANYONE_ID_UNSAFE));
134 add(new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.AUTH_IDS));
135 }};
136
137 public final static String META_ZNODE_PREFIX = "meta-region-server";
138
139 private final Configuration conf;
140
141
142
143
144
145
146
147
148 public ZooKeeperWatcher(Configuration conf, String identifier,
149 Abortable abortable) throws ZooKeeperConnectionException, IOException {
150 this(conf, identifier, abortable, false);
151 }
152
153
154
155
156
157
158
159
160
161
162
163
164 public ZooKeeperWatcher(Configuration conf, String identifier,
165 Abortable abortable, boolean canCreateBaseZNode)
166 throws IOException, ZooKeeperConnectionException {
167 this.conf = conf;
168 this.quorum = ZKConfig.getZKQuorumServersString(conf);
169 this.prefix = identifier;
170
171
172 this.identifier = identifier + "0x0";
173 this.abortable = abortable;
174 setNodeNames(conf);
175 PendingWatcher pendingWatcher = new PendingWatcher();
176 this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, pendingWatcher, identifier);
177 pendingWatcher.prepare(this);
178 if (canCreateBaseZNode) {
179 createBaseZNodes();
180 }
181 }
182
183 private void createBaseZNodes() throws ZooKeeperConnectionException {
184 try {
185
186 ZKUtil.createWithParents(this, baseZNode);
187 if (conf.getBoolean("hbase.assignment.usezk", true)) {
188 ZKUtil.createAndFailSilent(this, assignmentZNode);
189 }
190 ZKUtil.createAndFailSilent(this, rsZNode);
191 ZKUtil.createAndFailSilent(this, drainingZNode);
192 ZKUtil.createAndFailSilent(this, tableZNode);
193 ZKUtil.createAndFailSilent(this, splitLogZNode);
194 ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
195 ZKUtil.createAndFailSilent(this, tableLockZNode);
196 ZKUtil.createAndFailSilent(this, recoveringRegionsZNode);
197 ZKUtil.createAndFailSilent(this, masterMaintZNode);
198 } catch (KeeperException e) {
199 throw new ZooKeeperConnectionException(
200 prefix("Unexpected KeeperException creating base node"), e);
201 }
202 }
203
204
205
206 public boolean isClientReadable(String node) {
207
208
209
210 return
211 node.equals(baseZNode) ||
212 isAnyMetaReplicaZnode(node) ||
213 node.equals(getMasterAddressZNode()) ||
214 node.equals(clusterIdZNode)||
215 node.equals(rsZNode) ||
216
217 node.equals(tableZNode) ||
218 node.startsWith(tableZNode + "/");
219 }
220
221
222
223
224
225
226
227 public void checkAndSetZNodeAcls() {
228 if (!ZKUtil.isSecureZooKeeper(getConfiguration())) {
229 return;
230 }
231
232
233
234 try {
235 List<ACL> actualAcls = recoverableZooKeeper.getAcl(baseZNode, new Stat());
236
237 if (!isBaseZnodeAclSetup(actualAcls)) {
238 LOG.info("setting znode ACLs");
239 setZnodeAclsRecursive(baseZNode);
240 }
241 } catch(KeeperException.NoNodeException nne) {
242 return;
243 } catch(InterruptedException ie) {
244 interruptedException(ie);
245 } catch (IOException|KeeperException e) {
246 LOG.warn("Received exception while checking and setting zookeeper ACLs", e);
247 }
248 }
249
250
251
252
253
254
255 private void setZnodeAclsRecursive(String znode) throws KeeperException, InterruptedException {
256 List<String> children = recoverableZooKeeper.getChildren(znode, false);
257
258 for (String child : children) {
259 setZnodeAclsRecursive(ZKUtil.joinZNode(znode, child));
260 }
261 List<ACL> acls = ZKUtil.createACL(this, znode, true);
262 LOG.info("Setting ACLs for znode:" + znode + " , acl:" + acls);
263 recoverableZooKeeper.setAcl(znode, acls, -1);
264 }
265
266
267
268
269
270
271
272 private boolean isBaseZnodeAclSetup(List<ACL> acls) throws IOException {
273 String[] superUsers = conf.getStrings(Superusers.SUPERUSER_CONF_KEY);
274
275 if (superUsers != null && !checkACLForSuperUsers(superUsers, acls)) {
276 return false;
277 }
278
279
280
281 String hbaseUser = UserGroupInformation.getCurrentUser().getShortUserName();
282
283 if (acls.isEmpty()) {
284 return false;
285 }
286
287 for (ACL acl : acls) {
288 int perms = acl.getPerms();
289 Id id = acl.getId();
290
291
292 if (Ids.ANYONE_ID_UNSAFE.equals(id)) {
293 if (perms != Perms.READ) {
294 return false;
295 }
296 } else if (superUsers != null && isSuperUserId(superUsers, id)) {
297 if (perms != Perms.ALL) {
298 return false;
299 }
300 } else if (new Id("sasl", hbaseUser).equals(id)) {
301 if (perms != Perms.ALL) {
302 return false;
303 }
304 } else {
305 return false;
306 }
307 }
308 return true;
309 }
310
311
312
313
314 private boolean checkACLForSuperUsers(String[] superUsers, List<ACL> acls) {
315 for (String user : superUsers) {
316 boolean hasAccess = false;
317
318 if (!user.startsWith(AuthUtil.GROUP_PREFIX)) {
319 for (ACL acl : acls) {
320 if (user.equals(acl.getId().getId()) && acl.getPerms() == Perms.ALL) {
321 hasAccess = true;
322 break;
323 }
324 }
325 if (!hasAccess) {
326 return false;
327 }
328 }
329 }
330 return true;
331 }
332
333
334
335
336 public static boolean isSuperUserId(String[] superUsers, Id id) {
337 for (String user : superUsers) {
338
339 if (!user.startsWith(AuthUtil.GROUP_PREFIX) && new Id("sasl", user).equals(id)) {
340 return true;
341 }
342 }
343 return false;
344 }
345
346 @Override
347 public String toString() {
348 return this.identifier + ", quorum=" + quorum + ", baseZNode=" + baseZNode;
349 }
350
351
352
353
354
355
356
357 public String prefix(final String str) {
358 return this.toString() + " " + str;
359 }
360
361
362
363
364 private void setNodeNames(Configuration conf) {
365 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
366 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
367 metaReplicaZnodes.put(0, ZKUtil.joinZNode(baseZNode,
368 conf.get("zookeeper.znode.metaserver", "meta-region-server")));
369 int numMetaReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
370 HConstants.DEFAULT_META_REPLICA_NUM);
371 for (int i = 1; i < numMetaReplicas; i++) {
372 String str = ZKUtil.joinZNode(baseZNode,
373 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + i);
374 metaReplicaZnodes.put(i, str);
375 }
376 rsZNode = ZKUtil.joinZNode(baseZNode,
377 conf.get("zookeeper.znode.rs", "rs"));
378 drainingZNode = ZKUtil.joinZNode(baseZNode,
379 conf.get("zookeeper.znode.draining.rs", "draining"));
380 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
381 conf.get("zookeeper.znode.master", "master"));
382 backupMasterAddressesZNode = ZKUtil.joinZNode(baseZNode,
383 conf.get("zookeeper.znode.backup.masters", "backup-masters"));
384 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
385 conf.get("zookeeper.znode.state", "running"));
386 assignmentZNode = ZKUtil.joinZNode(baseZNode,
387 conf.get("zookeeper.znode.unassigned", "region-in-transition"));
388 tableZNode = ZKUtil.joinZNode(baseZNode,
389 conf.get("zookeeper.znode.tableEnableDisable", "table"));
390 clusterIdZNode = ZKUtil.joinZNode(baseZNode,
391 conf.get("zookeeper.znode.clusterId", "hbaseid"));
392 splitLogZNode = ZKUtil.joinZNode(baseZNode,
393 conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
394 balancerZNode = ZKUtil.joinZNode(baseZNode,
395 conf.get("zookeeper.znode.balancer", "balancer"));
396 regionNormalizerZNode = ZKUtil.joinZNode(baseZNode,
397 conf.get("zookeeper.znode.regionNormalizer", "normalizer"));
398 switchZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.switch", "switch"));
399 tableLockZNode = ZKUtil.joinZNode(baseZNode,
400 conf.get("zookeeper.znode.tableLock", "table-lock"));
401 recoveringRegionsZNode = ZKUtil.joinZNode(baseZNode,
402 conf.get("zookeeper.znode.recovering.regions", "recovering-regions"));
403 namespaceZNode = ZKUtil.joinZNode(baseZNode,
404 conf.get("zookeeper.znode.namespace", "namespace"));
405 masterMaintZNode = ZKUtil.joinZNode(baseZNode,
406 conf.get("zookeeper.znode.masterMaintenance", "master-maintenance"));
407 }
408
409
410
411
412
413
414 public boolean isAnyMetaReplicaZnode(String node) {
415 if (metaReplicaZnodes.values().contains(node)) {
416 return true;
417 }
418 return false;
419 }
420
421
422
423
424
425
426 public boolean isDefaultMetaReplicaZnode(String node) {
427 if (getZNodeForReplica(HRegionInfo.DEFAULT_REPLICA_ID).equals(node)) {
428 return true;
429 }
430 return false;
431 }
432
433
434
435
436
437
438 public List<String> getMetaReplicaNodes() throws KeeperException {
439 List<String> childrenOfBaseNode = ZKUtil.listChildrenNoWatch(this, baseZNode);
440 List<String> metaReplicaNodes = new ArrayList<String>(2);
441 if (childrenOfBaseNode != null) {
442 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
443 for (String child : childrenOfBaseNode) {
444 if (child.startsWith(pattern)) metaReplicaNodes.add(child);
445 }
446 }
447 return metaReplicaNodes;
448 }
449
450
451
452
453
454
455 public String getZNodeForReplica(int replicaId) {
456 String str = metaReplicaZnodes.get(replicaId);
457
458
459
460 if (str == null) {
461 str = ZKUtil.joinZNode(baseZNode,
462 conf.get("zookeeper.znode.metaserver", "meta-region-server") + "-" + replicaId);
463 }
464 return str;
465 }
466
467
468
469
470
471
472 public int getMetaReplicaIdFromZnode(String znode) {
473 String pattern = conf.get("zookeeper.znode.metaserver","meta-region-server");
474 if (znode.equals(pattern)) return HRegionInfo.DEFAULT_REPLICA_ID;
475
476 String nonDefaultPattern = pattern + "-";
477 return Integer.parseInt(znode.substring(nonDefaultPattern.length()));
478 }
479
480
481
482
483
484 public void registerListener(ZooKeeperListener listener) {
485 listeners.add(listener);
486 }
487
488
489
490
491
492
493 public void registerListenerFirst(ZooKeeperListener listener) {
494 listeners.add(0, listener);
495 }
496
497 public void unregisterListener(ZooKeeperListener listener) {
498 listeners.remove(listener);
499 }
500
501
502
503
504 public void unregisterAllListeners() {
505 listeners.clear();
506 }
507
508
509
510
511 public List<ZooKeeperListener> getListeners() {
512 return new ArrayList<ZooKeeperListener>(listeners);
513 }
514
515
516
517
518 public int getNumberOfListeners() {
519 return listeners.size();
520 }
521
522
523
524
525
526 public RecoverableZooKeeper getRecoverableZooKeeper() {
527 return recoverableZooKeeper;
528 }
529
530 public void reconnectAfterExpiration() throws IOException, KeeperException, InterruptedException {
531 recoverableZooKeeper.reconnectAfterExpiration();
532 }
533
534
535
536
537
538 public String getQuorum() {
539 return quorum;
540 }
541
542
543
544
545 public String getBaseZNode() {
546 return baseZNode;
547 }
548
549
550
551
552
553
554
555 @Override
556 public void process(WatchedEvent event) {
557 LOG.debug(prefix("Received ZooKeeper Event, " +
558 "type=" + event.getType() + ", " +
559 "state=" + event.getState() + ", " +
560 "path=" + event.getPath()));
561
562 switch(event.getType()) {
563
564
565 case None: {
566 connectionEvent(event);
567 break;
568 }
569
570
571
572 case NodeCreated: {
573 for(ZooKeeperListener listener : listeners) {
574 listener.nodeCreated(event.getPath());
575 }
576 break;
577 }
578
579 case NodeDeleted: {
580 for(ZooKeeperListener listener : listeners) {
581 listener.nodeDeleted(event.getPath());
582 }
583 break;
584 }
585
586 case NodeDataChanged: {
587 for(ZooKeeperListener listener : listeners) {
588 listener.nodeDataChanged(event.getPath());
589 }
590 break;
591 }
592
593 case NodeChildrenChanged: {
594 for(ZooKeeperListener listener : listeners) {
595 listener.nodeChildrenChanged(event.getPath());
596 }
597 break;
598 }
599 }
600 }
601
602
603
604
605
606
607
608
609
610
611
612
613
614 private void connectionEvent(WatchedEvent event) {
615 switch(event.getState()) {
616 case SyncConnected:
617 this.identifier = this.prefix + "-0x" +
618 Long.toHexString(this.recoverableZooKeeper.getSessionId());
619
620 LOG.debug(this.identifier + " connected");
621 break;
622
623
624 case Disconnected:
625 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
626 break;
627
628 case Expired:
629 String msg = prefix(this.identifier + " received expired from " +
630 "ZooKeeper, aborting");
631
632
633 if (this.abortable != null) {
634 this.abortable.abort(msg, new KeeperException.SessionExpiredException());
635 }
636 break;
637
638 case ConnectedReadOnly:
639 case SaslAuthenticated:
640 case AuthFailed:
641 break;
642
643 default:
644 throw new IllegalStateException("Received event is not valid: " + event.getState());
645 }
646 }
647
648
649
650
651
652
653
654
655
656
657
658
659
660 public void sync(String path) throws KeeperException {
661 this.recoverableZooKeeper.sync(path, null, null);
662 }
663
664
665
666
667
668
669
670
671
672
673
674 public void keeperException(KeeperException ke)
675 throws KeeperException {
676 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
677 throw ke;
678 }
679
680
681
682
683
684
685
686
687
688
689
690
691 public void interruptedException(InterruptedException ie) {
692 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
693
694 Thread.currentThread().interrupt();
695
696 }
697
698
699
700
701
702
703 @Override
704 public void close() {
705 try {
706 recoverableZooKeeper.close();
707 } catch (InterruptedException e) {
708 Thread.currentThread().interrupt();
709 }
710 }
711
712 public Configuration getConfiguration() {
713 return conf;
714 }
715
716 @Override
717 public void abort(String why, Throwable e) {
718 if (this.abortable != null) this.abortable.abort(why, e);
719 else this.aborted = true;
720 }
721
722 @Override
723 public boolean isAborted() {
724 return this.abortable == null? this.aborted: this.abortable.isAborted();
725 }
726
727
728
729
730 public String getMasterAddressZNode() {
731 return this.masterAddressZNode;
732 }
733
734
735
736
737 public String getRegionNormalizerZNode() {
738 return regionNormalizerZNode;
739 }
740
741
742
743
744 public String getSwitchZNode() {
745 return switchZNode;
746 }
747 }