View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.TreeMap;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ConcurrentMap;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.HBaseConfiguration;
33  import org.apache.hadoop.hbase.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Abortable;
36  import org.apache.hadoop.hbase.CompoundConfiguration;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.exceptions.DeserializationException;
39  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
40  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
41  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
42  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
43  import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Pair;
46  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
47  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
48  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
49  import org.apache.zookeeper.KeeperException;
50  import org.apache.zookeeper.KeeperException.NoNodeException;
51  
52  import com.google.protobuf.ByteString;
53  
54  /**
55   * This class provides an implementation of the ReplicationPeers interface using Zookeeper. The
56   * peers znode contains a list of all peer replication clusters and the current replication state of
57   * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
58   * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
59   * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
60   * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
61   * For example:
62   *
63   *  /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
64   *  /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
65   *
66   * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
67   * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
68   * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
69   * ReplicationPeer.PeerStateTracker class. For example:
70   *
71   * /hbase/replication/peers/1/peer-state [Value: ENABLED]
72   *
73   * Each of these peer znodes has a child znode that indicates which data will be replicated
74   * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
75   * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
76   * class. For example:
77   *
78   * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
79   */
80  @InterfaceAudience.Private
81  public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
82  
83    // Map of peer clusters keyed by their id
84    private Map<String, ReplicationPeerZKImpl> peerClusters;
85    private final String tableCFsNodeName;
86  
87    private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
88  
89    public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
90        Abortable abortable) {
91      super(zk, conf, abortable);
92      this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
93      this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
94    }
95  
96    @Override
97    public void init() throws ReplicationException {
98      try {
99        if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
100         ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
101       }
102     } catch (KeeperException e) {
103       throw new ReplicationException("Could not initialize replication peers", e);
104     }
105     addExistingPeers();
106   }
107 
108   @Override
109   public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
110       throws ReplicationException {
111     try {
112       if (peerExists(id)) {
113         throw new IllegalArgumentException("Cannot add a peer with id=" + id
114             + " because that id already exists.");
115       }
116 
117       if(id.contains("-")){
118         throw new IllegalArgumentException("Found invalid peer name:" + id);
119       }
120 
121       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
122       
123       // If only bulk load hfile replication is enabled then add peerId node to hfile-refs node
124       if (replicationForBulkLoadEnabled) {
125         try {
126           String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
127           LOG.info("Adding peer " + peerId + " to hfile reference queue.");
128           ZKUtil.createWithParents(this.zookeeper, peerId);
129         } catch (KeeperException e) {
130           throw new ReplicationException("Failed to add peer with id=" + id
131             + ", node under hfile references node.", e);
132         }
133       }
134       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
135       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
136         toByteArray(peerConfig));
137       // There is a race (if hbase.zookeeper.useMulti is false)
138       // b/w PeerWatcher and ReplicationZookeeper#add method to create the
139       // peer-state znode. This happens while adding a peer
140       // The peer state data is set as "ENABLED" by default.
141       ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
142       String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
143       ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
144       listOfOps.add(op1);
145       listOfOps.add(op2);
146       listOfOps.add(op3);
147       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
148       // A peer is enabled by default
149     } catch (KeeperException e) {
150       throw new ReplicationException("Could not add peer with id=" + id
151           + ", peerConfif=>" + peerConfig, e);
152     }
153   }
154 
155   @Override
156   public void removePeer(String id) throws ReplicationException {
157     try {
158       if (!peerExists(id)) {
159         throw new IllegalArgumentException("Cannot remove peer with id=" + id
160             + " because that id does not exist.");
161       }
162       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
163       // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
164       // replication is enabled or not
165 
166       String peerId = ZKUtil.joinZNode(this.hfileRefsZNode, id);
167       try {
168         LOG.info("Removing peer " + peerId + " from hfile reference queue.");
169         ZKUtil.deleteNodeRecursively(this.zookeeper, peerId);
170       } catch (NoNodeException e) {
171         LOG.info("Did not find node " + peerId + " to delete.", e);
172       }
173     } catch (KeeperException e) {
174       throw new ReplicationException("Could not remove peer with id=" + id, e);
175     }
176   }
177 
178   @Override
179   public void enablePeer(String id) throws ReplicationException {
180     changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
181     LOG.info("peer " + id + " is enabled");
182   }
183 
184   @Override
185   public void disablePeer(String id) throws ReplicationException {
186     changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
187     LOG.info("peer " + id + " is disabled");
188   }
189 
190   @Override
191   public String getPeerTableCFsConfig(String id) throws ReplicationException {
192     try {
193       if (!peerExists(id)) {
194         throw new IllegalArgumentException("peer " + id + " doesn't exist");
195       }
196       try {
197         return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
198       } catch (Exception e) {
199         throw new ReplicationException(e);
200       }
201     } catch (KeeperException e) {
202       throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
203     }
204   }
205 
206   @Override
207   public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
208     try {
209       if (!peerExists(id)) {
210         throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
211             + " does not exist.");
212       }
213       String tableCFsZKNode = getTableCFsNode(id);
214       byte[] tableCFs = Bytes.toBytes(tableCFsStr);
215       if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
216         ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
217       } else {
218         ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
219       }
220       LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
221     } catch (KeeperException e) {
222       throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
223     }
224   }
225 
226   @Override
227   public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
228     ReplicationPeer replicationPeer = this.peerClusters.get(id);
229     if (replicationPeer == null) {
230       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
231     }
232     return replicationPeer.getTableCFs();
233   }
234 
235   @Override
236   public boolean getStatusOfPeer(String id) {
237     ReplicationPeer replicationPeer = this.peerClusters.get(id);
238     if (replicationPeer == null) {
239       throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
240     }
241     return replicationPeer.getPeerState() == PeerState.ENABLED;
242   }
243 
244   @Override
245   public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
246     try {
247       if (!peerExists(id)) {
248         throw new IllegalArgumentException("peer " + id + " doesn't exist");
249       }
250       String peerStateZNode = getPeerStateNode(id);
251       try {
252         return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
253       } catch (KeeperException e) {
254         throw new ReplicationException(e);
255       } catch (DeserializationException e) {
256         throw new ReplicationException(e);
257       }
258     } catch (KeeperException e) {
259       throw new ReplicationException("Unable to get status of the peer with id=" + id +
260           " from backing store", e);
261     } catch (InterruptedException e) {
262       throw new ReplicationException(e);
263     }
264   }
265 
266   @Override
267   public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
268     Map<String, ReplicationPeerConfig> peers = new TreeMap<String, ReplicationPeerConfig>();
269     List<String> ids = null;
270     try {
271       ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
272       for (String id : ids) {
273         ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
274         if (peerConfig == null) {
275           LOG.warn("Failed to get replication peer configuration of clusterid=" + id
276             + " znode content, continuing.");
277           continue;
278         }
279         peers.put(id, peerConfig);
280       }
281     } catch (KeeperException e) {
282       this.abortable.abort("Cannot get the list of peers ", e);
283     } catch (ReplicationException e) {
284       this.abortable.abort("Cannot get the list of peers ", e);
285     }
286     return peers;
287   }
288 
289   @Override
290   public ReplicationPeer getPeer(String peerId) {
291     return peerClusters.get(peerId);
292   }
293 
294   @Override
295   public Set<String> getPeerIds() {
296     return peerClusters.keySet(); // this is not thread-safe
297   }
298 
299   /**
300    * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
301    */
302   @Override
303   public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
304       throws ReplicationException {
305     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
306     byte[] data = null;
307     try {
308       data = ZKUtil.getData(this.zookeeper, znode);
309     } catch (InterruptedException e) {
310       LOG.warn("Could not get configuration for peer because the thread " +
311           "was interrupted. peerId=" + peerId);
312       Thread.currentThread().interrupt();
313       return null;
314     } catch (KeeperException e) {
315       throw new ReplicationException("Error getting configuration for peer with id="
316           + peerId, e);
317     }
318     if (data == null) {
319       LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
320       return null;
321     }
322 
323     try {
324       return parsePeerFrom(data);
325     } catch (DeserializationException e) {
326       LOG.warn("Failed to parse cluster key from peerId=" + peerId
327           + ", specifically the content from the following znode: " + znode);
328       return null;
329     }
330   }
331 
332   @Override
333   public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
334       throws ReplicationException {
335     ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
336 
337     if (peerConfig == null) {
338       return null;
339     }
340 
341     Configuration otherConf;
342     try {
343       otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
344     } catch (IOException e) {
345       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
346       return null;
347     }
348 
349     if (!peerConfig.getConfiguration().isEmpty()) {
350       CompoundConfiguration compound = new CompoundConfiguration();
351       compound.add(otherConf);
352       compound.addStringMap(peerConfig.getConfiguration());
353       return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, compound);
354     }
355 
356     return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
357   }
358 
359   /**
360    * List all registered peer clusters and set a watch on their znodes.
361    */
362   @Override
363   public List<String> getAllPeerIds() {
364     List<String> ids = null;
365     try {
366       ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
367     } catch (KeeperException e) {
368       this.abortable.abort("Cannot get the list of peers ", e);
369     }
370     return ids;
371   }
372 
373   /**
374    * A private method used during initialization. This method attempts to add all registered
375    * peer clusters. This method does not set a watch on the peer cluster znodes.
376    */
377   private void addExistingPeers() throws ReplicationException {
378     List<String> znodes = null;
379     try {
380       znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
381     } catch (KeeperException e) {
382       throw new ReplicationException("Error getting the list of peer clusters.", e);
383     }
384     if (znodes != null) {
385       for (String z : znodes) {
386         createAndAddPeer(z);
387       }
388     }
389   }
390 
391   @Override
392   public boolean peerAdded(String peerId) throws ReplicationException {
393     return createAndAddPeer(peerId);
394   }
395 
396   @Override
397   public void peerRemoved(String peerId) {
398     ReplicationPeer rp = this.peerClusters.get(peerId);
399     if (rp != null) {
400       ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
401     }
402   }
403 
404   /**
405    * Attempt to connect to a new remote slave cluster.
406    * @param peerId a short that identifies the cluster
407    * @return true if a new connection was made, false if no new connection was made.
408    */
409   public boolean createAndAddPeer(String peerId) throws ReplicationException {
410     if (peerClusters == null) {
411       return false;
412     }
413     if (this.peerClusters.containsKey(peerId)) {
414       return false;
415     }
416 
417     ReplicationPeerZKImpl peer = null;
418     try {
419       peer = createPeer(peerId);
420     } catch (Exception e) {
421       throw new ReplicationException("Error adding peer with id=" + peerId, e);
422     }
423     if (peer == null) {
424       return false;
425     }
426     ReplicationPeerZKImpl previous =
427       ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
428     if (previous == null) {
429       LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
430     } else {
431       LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
432         ", new cluster=" + peer.getPeerConfig().getClusterKey());
433     }
434     return true;
435   }
436 
437   private String getTableCFsNode(String id) {
438     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
439   }
440 
441   private String getPeerStateNode(String id) {
442     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
443   }
444 
445   /**
446    * Update the state znode of a peer cluster.
447    * @param id
448    * @param state
449    */
450   private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
451       throws ReplicationException {
452     try {
453       if (!peerExists(id)) {
454         throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
455             + " does not exist.");
456       }
457       String peerStateZNode = getPeerStateNode(id);
458       byte[] stateBytes =
459           (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
460               : DISABLED_ZNODE_BYTES;
461       if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
462         ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
463       } else {
464         ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
465       }
466       LOG.info("Peer with id= " + id + " is now " + state.name());
467     } catch (KeeperException e) {
468       throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
469     }
470   }
471 
472   /**
473    * Helper method to connect to a peer
474    * @param peerId peer's identifier
475    * @return object representing the peer
476    * @throws ReplicationException
477    */
478   private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
479     Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
480     if (pair == null) {
481       return null;
482     }
483     Configuration peerConf = pair.getSecond();
484 
485     ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
486     try {
487       peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
488     } catch (KeeperException e) {
489       throw new ReplicationException("Error starting the peer state tracker for peerId=" +
490           peerId, e);
491     }
492 
493     try {
494       peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
495     } catch (KeeperException e) {
496       throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
497           peerId, e);
498     }
499 
500     return peer;
501   }
502 
503   /**
504    * @param bytes Content of a peer znode.
505    * @return ClusterKey parsed from the passed bytes.
506    * @throws DeserializationException
507    */
508   private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
509       throws DeserializationException {
510     if (ProtobufUtil.isPBMagicPrefix(bytes)) {
511       int pblen = ProtobufUtil.lengthOfPBMagic();
512       ZooKeeperProtos.ReplicationPeer.Builder builder =
513           ZooKeeperProtos.ReplicationPeer.newBuilder();
514       ZooKeeperProtos.ReplicationPeer peer;
515       try {
516         ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
517         peer = builder.build();
518       } catch (IOException e) {
519         throw new DeserializationException(e);
520       }
521       return convert(peer);
522     } else {
523       if (bytes.length > 0) {
524         return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
525       }
526       return new ReplicationPeerConfig().setClusterKey("");
527     }
528   }
529 
530   private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
531     ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
532     if (peer.hasClusterkey()) {
533       peerConfig.setClusterKey(peer.getClusterkey());
534     }
535     if (peer.hasReplicationEndpointImpl()) {
536       peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
537     }
538 
539     for (BytesBytesPair pair : peer.getDataList()) {
540       peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
541     }
542 
543     for (NameStringPair pair : peer.getConfigurationList()) {
544       peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
545     }
546     return peerConfig;
547   }
548 
549   private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig  peerConfig) {
550     ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
551     if (peerConfig.getClusterKey() != null) {
552       builder.setClusterkey(peerConfig.getClusterKey());
553     }
554     if (peerConfig.getReplicationEndpointImpl() != null) {
555       builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
556     }
557 
558     for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
559       builder.addData(BytesBytesPair.newBuilder()
560         .setFirst(ByteString.copyFrom(entry.getKey()))
561         .setSecond(ByteString.copyFrom(entry.getValue()))
562           .build());
563     }
564 
565     for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
566       builder.addConfiguration(NameStringPair.newBuilder()
567         .setName(entry.getKey())
568         .setValue(entry.getValue())
569         .build());
570     }
571 
572     return builder.build();
573   }
574 
575   /**
576    * @param peerConfig
577    * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
578    *         for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
579    *         /hbase/replication/peers/PEER_ID
580    */
581   private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
582     byte[] bytes = convert(peerConfig).toByteArray();
583     return ProtobufUtil.prependPBMagic(bytes);
584   }
585 
586 
587 }