1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ConcurrentMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.hbase.classification.InterfaceAudience;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Abortable;
36 import org.apache.hadoop.hbase.CompoundConfiguration;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.exceptions.DeserializationException;
39 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
41 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
42 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
43 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.Pair;
46 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
47 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
49 import org.apache.zookeeper.KeeperException;
50 import org.apache.zookeeper.KeeperException.NoNodeException;
51
52 import com.google.protobuf.ByteString;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 @InterfaceAudience.Private
81 public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
82
83
// Peers this instance currently tracks, keyed by peer id. Entries are inserted by
// createAndAddPeer() and dropped by peerRemoved(); the constructor backs it with a
// ConcurrentHashMap.
84 private Map<String, ReplicationPeerZKImpl> peerClusters;
// Name of the child znode, under each peer's znode, that stores the peer's tableCFs string.
// Read from "zookeeper.znode.replication.peers.tableCFs" in the constructor (default "tableCFs").
85 private final String tableCFsNodeName;
86
// Class-wide logger.
87 private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
88
89 public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
90 Abortable abortable) {
91 super(zk, conf, abortable);
92 this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
93 this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
94 }
95
96 @Override
97 public void init() throws ReplicationException {
98 try {
99 if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
100 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
101 }
102 } catch (KeeperException e) {
103 throw new ReplicationException("Could not initialize replication peers", e);
104 }
105 addExistingPeers();
106 }
107
108 @Override
109 public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
110 throws ReplicationException {
111 try {
112 if (peerExists(id)) {
113 throw new IllegalArgumentException("Cannot add a peer with id=" + id
114 + " because that id already exists.");
115 }
116
117 if(id.contains("-")){
118 throw new IllegalArgumentException("Found invalid peer name:" + id);
119 }
120
121 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
122
123
124 if (replicationForBulkLoadEnabled) {
125 try {
126 String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
127 LOG.info("Adding peer " + peerId + " to hfile reference queue.");
128 ZKUtil.createWithParents(this.zookeeper, peerId);
129 } catch (KeeperException e) {
130 throw new ReplicationException("Failed to add peer with id=" + id
131 + ", node under hfile references node.", e);
132 }
133 }
134 List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
135 ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
136 toByteArray(peerConfig));
137
138
139
140
141 ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
142 String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
143 ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
144 listOfOps.add(op1);
145 listOfOps.add(op2);
146 listOfOps.add(op3);
147 ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
148
149 } catch (KeeperException e) {
150 throw new ReplicationException("Could not add peer with id=" + id
151 + ", peerConfif=>" + peerConfig, e);
152 }
153 }
154
155 @Override
156 public void removePeer(String id) throws ReplicationException {
157 try {
158 if (!peerExists(id)) {
159 throw new IllegalArgumentException("Cannot remove peer with id=" + id
160 + " because that id does not exist.");
161 }
162 ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
163
164
165
166 String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
167 try {
168 LOG.info("Removing peer " + peerId + " from hfile reference queue.");
169 ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
170 } catch (NoNodeException e) {
171 LOG.info("Did not find node " + peerId + " to delete.", e);
172 }
173 } catch (KeeperException e) {
174 throw new ReplicationException("Could not remove peer with id=" + id, e);
175 }
176 }
177
178 @Override
179 public void enablePeer(String id) throws ReplicationException {
180 changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
181 LOG.info("peer " + id + " is enabled");
182 }
183
184 @Override
185 public void disablePeer(String id) throws ReplicationException {
186 changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
187 LOG.info("peer " + id + " is disabled");
188 }
189
190 @Override
191 public String getPeerTableCFsConfig(String id) throws ReplicationException {
192 try {
193 if (!peerExists(id)) {
194 throw new IllegalArgumentException("peer " + id + " doesn't exist");
195 }
196 try {
197 return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
198 } catch (Exception e) {
199 throw new ReplicationException(e);
200 }
201 } catch (KeeperException e) {
202 throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
203 }
204 }
205
206 @Override
207 public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
208 try {
209 if (!peerExists(id)) {
210 throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
211 + " does not exist.");
212 }
213 String tableCFsZKNode = getTableCFsNode(id);
214 byte[] tableCFs = Bytes.toBytes(tableCFsStr);
215 if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
216 ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
217 } else {
218 ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
219 }
220 LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
221 } catch (KeeperException e) {
222 throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
223 }
224 }
225
226 @Override
227 public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
228 ReplicationPeer replicationPeer = this.peerClusters.get(id);
229 if (replicationPeer == null) {
230 throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
231 }
232 return replicationPeer.getTableCFs();
233 }
234
235 @Override
236 public boolean getStatusOfPeer(String id) {
237 ReplicationPeer replicationPeer = this.peerClusters.get(id);
238 if (replicationPeer == null) {
239 throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
240 }
241 return replicationPeer.getPeerState() == PeerState.ENABLED;
242 }
243
244 @Override
245 public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
246 try {
247 if (!peerExists(id)) {
248 throw new IllegalArgumentException("peer " + id + " doesn't exist");
249 }
250 String peerStateZNode = getPeerStateNode(id);
251 try {
252 return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
253 } catch (KeeperException e) {
254 throw new ReplicationException(e);
255 } catch (DeserializationException e) {
256 throw new ReplicationException(e);
257 }
258 } catch (KeeperException e) {
259 throw new ReplicationException("Unable to get status of the peer with id=" + id +
260 " from backing store", e);
261 } catch (InterruptedException e) {
262 throw new ReplicationException(e);
263 }
264 }
265
266 @Override
267 public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
268 Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
269 List<String> ids = null;
270 try {
271 ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
272 for (String id : ids) {
273 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
274 if (peerConfig == null) {
275 LOG.warn("Failed to get replication peer configuration of clusterid=" + id
276 + " znode content, continuing.");
277 continue;
278 }
279 peers.put(id, peerConfig);
280 }
281 } catch (KeeperException e) {
282 this.abortable.abort("Cannot get the list of peers ", e);
283 } catch (ReplicationException e) {
284 this.abortable.abort("Cannot get the list of peers ", e);
285 }
286 return peers;
287 }
288
289 @Override
290 public ReplicationPeer getPeer(String peerId) {
291 return peerClusters.get(peerId);
292 }
293
294 @Override
295 public Set<String> getPeerIds() {
296 return peerClusters.keySet();
297 }
298
299
300
301
302 @Override
303 public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
304 throws ReplicationException {
305 String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
306 byte[] data = null;
307 try {
308 data = ZKUtil.getData(this.zookeeper, znode);
309 } catch (InterruptedException e) {
310 LOG.warn("Could not get configuration for peer because the thread " +
311 "was interrupted. peerId=" + peerId);
312 Thread.currentThread().interrupt();
313 return null;
314 } catch (KeeperException e) {
315 throw new ReplicationException("Error getting configuration for peer with id="
316 + peerId, e);
317 }
318 if (data == null) {
319 LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
320 return null;
321 }
322
323 try {
324 return parsePeerFrom(data);
325 } catch (DeserializationException e) {
326 LOG.warn("Failed to parse cluster key from peerId=" + peerId
327 + ", specifically the content from the following znode: " + znode);
328 return null;
329 }
330 }
331
332 @Override
333 public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
334 throws ReplicationException {
335 ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
336
337 if (peerConfig == null) {
338 return null;
339 }
340
341 Configuration otherConf;
342 try {
343 otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
344 } catch (IOException e) {
345 LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
346 return null;
347 }
348
349 if (!peerConfig.getConfiguration().isEmpty()) {
350 CompoundConfiguration compound = new CompoundConfiguration();
351 compound.add(otherConf);
352 compound.addStringMap(peerConfig.getConfiguration());
353 return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
354 }
355
356 return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
357 }
358
359
360
361
362 @Override
363 public List<String> getAllPeerIds() {
364 List<String> ids = null;
365 try {
366 ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
367 } catch (KeeperException e) {
368 this.abortable.abort("Cannot get the list of peers ", e);
369 }
370 return ids;
371 }
372
373
374
375
376
377 private void addExistingPeers() throws ReplicationException {
378 List<String> znodes = null;
379 try {
380 znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
381 } catch (KeeperException e) {
382 throw new ReplicationException("Error getting the list of peer clusters.", e);
383 }
384 if (znodes != null) {
385 for (String z : znodes) {
386 createAndAddPeer(z);
387 }
388 }
389 }
390
391 @Override
392 public boolean peerAdded(String peerId) throws ReplicationException {
393 return createAndAddPeer(peerId);
394 }
395
396 @Override
397 public void peerRemoved(String peerId) {
398 ReplicationPeer rp = this.peerClusters.get(peerId);
399 if (rp != null) {
400 ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
401 }
402 }
403
404
405
406
407
408
409 public boolean createAndAddPeer(String peerId) throws ReplicationException {
410 if (peerClusters == null) {
411 return false;
412 }
413 if (this.peerClusters.containsKey(peerId)) {
414 return false;
415 }
416
417 ReplicationPeerZKImpl peer = null;
418 try {
419 peer = createPeer(peerId);
420 } catch (Exception e) {
421 throw new ReplicationException("Error adding peer with id=" + peerId, e);
422 }
423 if (peer == null) {
424 return false;
425 }
426 ReplicationPeerZKImpl previous =
427 ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
428 if (previous == null) {
429 LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
430 } else {
431 LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
432 ", new cluster=" + peer.getPeerConfig().getClusterKey());
433 }
434 return true;
435 }
436
437 private String getTableCFsNode(String id) {
438 return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
439 }
440
441 private String getPeerStateNode(String id) {
442 return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
443 }
444
445
446
447
448
449
450 private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
451 throws ReplicationException {
452 try {
453 if (!peerExists(id)) {
454 throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
455 + " does not exist.");
456 }
457 String peerStateZNode = getPeerStateNode(id);
458 byte[] stateBytes =
459 (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
460 : DISABLED_ZNODE_BYTES;
461 if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
462 ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
463 } else {
464 ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
465 }
466 LOG.info("Peer with id= " + id + " is now " + state.name());
467 } catch (KeeperException e) {
468 throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
469 }
470 }
471
472
473
474
475
476
477
478 private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
479 Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
480 if (pair == null) {
481 return null;
482 }
483 Configuration peerConf = pair.getSecond();
484
485 ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
486 try {
487 peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
488 } catch (KeeperException e) {
489 throw new ReplicationException("Error starting the peer state tracker for peerId=" +
490 peerId, e);
491 }
492
493 try {
494 peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
495 } catch (KeeperException e) {
496 throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
497 peerId, e);
498 }
499
500 return peer;
501 }
502
503
504
505
506
507
508 private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
509 throws DeserializationException {
510 if (ProtobufUtil.isPBMagicPrefix(bytes)) {
511 int pblen = ProtobufUtil.lengthOfPBMagic();
512 ZooKeeperProtos.ReplicationPeer.Builder builder =
513 ZooKeeperProtos.ReplicationPeer.newBuilder();
514 ZooKeeperProtos.ReplicationPeer peer;
515 try {
516 ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
517 peer = builder.build();
518 } catch (IOException e) {
519 throw new DeserializationException(e);
520 }
521 return convert(peer);
522 } else {
523 if (bytes.length > 0) {
524 return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
525 }
526 return new ReplicationPeerConfig().setClusterKey("");
527 }
528 }
529
530 private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
531 ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
532 if (peer.hasClusterkey()) {
533 peerConfig.setClusterKey(peer.getClusterkey());
534 }
535 if (peer.hasReplicationEndpointImpl()) {
536 peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
537 }
538
539 for (BytesBytesPair pair : peer.getDataList()) {
540 peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
541 }
542
543 for (NameStringPair pair : peer.getConfigurationList()) {
544 peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
545 }
546 return peerConfig;
547 }
548
549 private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
550 ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
551 if (peerConfig.getClusterKey() != null) {
552 builder.setClusterkey(peerConfig.getClusterKey());
553 }
554 if (peerConfig.getReplicationEndpointImpl() != null) {
555 builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
556 }
557
558 for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
559 builder.addData(BytesBytesPair.newBuilder()
560 .setFirst(ByteString.copyFrom(entry.getKey()))
561 .setSecond(ByteString.copyFrom(entry.getValue()))
562 .build());
563 }
564
565 for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
566 builder.addConfiguration(NameStringPair.newBuilder()
567 .setName(entry.getKey())
568 .setValue(entry.getValue())
569 .build());
570 }
571
572 return builder.build();
573 }
574
575
576
577
578
579
580
581 private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
582 byte[] bytes = convert(peerConfig).toByteArray();
583 return ProtobufUtil.prependPBMagic(bytes);
584 }
585
586
587 }