View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.util.ArrayList;
22  import java.util.List;
23  import java.util.SortedMap;
24  import java.util.SortedSet;
25  import java.util.TreeMap;
26  import java.util.TreeSet;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.Abortable;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.exceptions.DeserializationException;
36  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
37  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.Pair;
40  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
41  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
42  import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
43  import org.apache.zookeeper.KeeperException;
44  
45  /**
46   * This class provides an implementation of the ReplicationQueues interface using Zookeeper. The
47   * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
48   * all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
49   * the regionserver name (a concatenation of the region server’s hostname, client port and start
50   * code). For example:
51   *
52   * /hbase/replication/rs/hostname.example.org,6020,1234
53   *
54   * Within this znode, the region server maintains a set of WAL replication queues. These queues are
55   * represented by child znodes named using there give queue id. For example:
56   *
57   * /hbase/replication/rs/hostname.example.org,6020,1234/1
58   * /hbase/replication/rs/hostname.example.org,6020,1234/2
59   *
60   * Each queue has one child znode for every WAL that still needs to be replicated. The value of
61   * these WAL child znodes is the latest position that has been replicated. This position is updated
62   * every time a WAL entry is replicated. For example:
63   *
64   * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
65   */
66  @InterfaceAudience.Private
67  public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
68  
69    /** Znode containing all replication queues for this region server. */
70    private String myQueuesZnode;
71    /** Name of znode we use to lock during failover */
72    private final static String RS_LOCK_ZNODE = "lock";
73  
74    private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
75  
76    public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
77        Abortable abortable) {
78      super(zk, conf, abortable);
79    }
80  
81    @Override
82    public void init(String serverName) throws ReplicationException {
83      this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
84      try {
85        ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
86      } catch (KeeperException e) {
87        throw new ReplicationException("Could not initialize replication queues.", e);
88      }
89      // If only bulk load hfile replication is enabled then create the hfile-refs znode
90      if (replicationForBulkLoadEnabled) {
91        try {
92          ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
93        } catch (KeeperException e) {
94          throw new ReplicationException("Could not initialize hfile references replication queue.",
95              e);
96        }
97      }
98    }
99  
100   @Override
101   public void removeQueue(String queueId) {
102     try {
103       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.myQueuesZnode, queueId));
104     } catch (KeeperException e) {
105       this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
106     }
107   }
108 
109   @Override
110   public void addLog(String queueId, String filename) throws ReplicationException {
111     String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
112     znode = ZKUtil.joinZNode(znode, filename);
113     try {
114       ZKUtil.createWithParents(this.zookeeper, znode);
115     } catch (KeeperException e) {
116       throw new ReplicationException(
117           "Could not add log because znode could not be created. queueId=" + queueId
118               + ", filename=" + filename);
119     }
120   }
121 
122   @Override
123   public void removeLog(String queueId, String filename) {
124     try {
125       String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
126       znode = ZKUtil.joinZNode(znode, filename);
127       ZKUtil.deleteNode(this.zookeeper, znode);
128     } catch (KeeperException e) {
129       this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
130           + filename + ")", e);
131     }
132   }
133 
134   @Override
135   public void setLogPosition(String queueId, String filename, long position) {
136     try {
137       String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
138       znode = ZKUtil.joinZNode(znode, filename);
139       // Why serialize String of Long and not Long as bytes?
140       ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
141     } catch (KeeperException e) {
142       this.abortable.abort("Failed to write replication wal position (filename=" + filename
143           + ", position=" + position + ")", e);
144     }
145   }
146 
147   @Override
148   public long getLogPosition(String queueId, String filename) throws ReplicationException {
149     String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
150     String znode = ZKUtil.joinZNode(clusterZnode, filename);
151     byte[] bytes = null;
152     try {
153       bytes = ZKUtil.getData(this.zookeeper, znode);
154     } catch (KeeperException e) {
155       throw new ReplicationException("Internal Error: could not get position in log for queueId="
156           + queueId + ", filename=" + filename, e);
157     } catch (InterruptedException e) {
158       Thread.currentThread().interrupt();
159       return 0;
160     }
161     try {
162       return ZKUtil.parseWALPositionFrom(bytes);
163     } catch (DeserializationException de) {
164       LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
165           + "znode content, continuing.");
166     }
167     // if we can not parse the position, start at the beginning of the wal file
168     // again
169     return 0;
170   }
171 
172   @Override
173   public boolean isThisOurZnode(String znode) {
174     return ZKUtil.joinZNode(this.queuesZNode, znode).equals(this.myQueuesZnode);
175   }
176 
177   @Override
178   public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
179     SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
180     // check whether there is multi support. If yes, use it.
181     if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
182       LOG.info("Atomically moving " + regionserverZnode + "'s wals to my queue");
183       newQueues = copyQueuesFromRSUsingMulti(regionserverZnode);
184     } else {
185       LOG.info("Moving " + regionserverZnode + "'s wals to my queue");
186       if (!lockOtherRS(regionserverZnode)) {
187         return newQueues;
188       }
189       newQueues = copyQueuesFromRS(regionserverZnode);
190       deleteAnotherRSQueues(regionserverZnode);
191     }
192     return newQueues;
193   }
194 
195   @Override
196   public void removeAllQueues() {
197     try {
198       ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
199     } catch (KeeperException e) {
200       // if the znode is already expired, don't bother going further
201       if (e instanceof KeeperException.SessionExpiredException) {
202         return;
203       }
204       this.abortable.abort("Failed to delete replication queues for region server: "
205           + this.myQueuesZnode, e);
206     }
207   }
208 
209   @Override
210   public List<String> getLogsInQueue(String queueId) {
211     String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
212     List<String> result = null;
213     try {
214       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
215     } catch (KeeperException e) {
216       this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
217     }
218     return result;
219   }
220 
221   @Override
222   public List<String> getAllQueues() {
223     List<String> listOfQueues = null;
224     try {
225       listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
226     } catch (KeeperException e) {
227       this.abortable.abort("Failed to get a list of queues for region server: "
228           + this.myQueuesZnode, e);
229     }
230     return listOfQueues;
231   }
232 
233   /**
234    * Try to set a lock in another region server's znode.
235    * @param znode the server names of the other server
236    * @return true if the lock was acquired, false in every other cases
237    */
238   private boolean lockOtherRS(String znode) {
239     try {
240       String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
241       if (parent.equals(this.myQueuesZnode)) {
242         LOG.warn("Won't lock because this is us, we're dead!");
243         return false;
244       }
245       String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
246       ZKUtil.createAndWatch(this.zookeeper, p, lockToByteArray(this.myQueuesZnode));
247     } catch (KeeperException e) {
248       // This exception will pop up if the znode under which we're trying to
249       // create the lock is already deleted by another region server, meaning
250       // that the transfer already occurred.
251       // NoNode => transfer is done and znodes are already deleted
252       // NodeExists => lock znode already created by another RS
253       if (e instanceof KeeperException.NoNodeException
254           || e instanceof KeeperException.NodeExistsException) {
255         LOG.info("Won't transfer the queue," + " another RS took care of it because of: "
256             + e.getMessage());
257       } else {
258         LOG.info("Failed lock other rs", e);
259       }
260       return false;
261     }
262     return true;
263   }
264 
265   /**
266    * Delete all the replication queues for a given region server.
267    * @param regionserverZnode The znode of the region server to delete.
268    */
269   private void deleteAnotherRSQueues(String regionserverZnode) {
270     String fullpath = ZKUtil.joinZNode(this.queuesZNode, regionserverZnode);
271     try {
272       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
273       for (String cluster : clusters) {
274         // No need to delete, it will be deleted later.
275         if (cluster.equals(RS_LOCK_ZNODE)) {
276           continue;
277         }
278         String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
279         ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
280       }
281       // Finish cleaning up
282       ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
283     } catch (KeeperException e) {
284       if (e instanceof KeeperException.NoNodeException
285           || e instanceof KeeperException.NotEmptyException) {
286         // Testing a special case where another region server was able to
287         // create a lock just after we deleted it, but then was also able to
288         // delete the RS znode before us or its lock znode is still there.
289         if (e.getPath().equals(fullpath)) {
290           return;
291         }
292       }
293       this.abortable.abort("Failed to delete replication queues for region server: "
294           + regionserverZnode, e);
295     }
296   }
297 
298   /**
299    * It "atomically" copies all the wals queues from another region server and returns them all
300    * sorted per peer cluster (appended with the dead server's znode).
301    * @param znode pertaining to the region server to copy the queues from
302    * @return WAL queues sorted per peer cluster
303    */
304   private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
305     SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
306     // hbase/replication/rs/deadrs
307     String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
308     List<String> peerIdsToProcess = null;
309     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
310     try {
311       peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
312       if (peerIdsToProcess == null) return queues; // node already processed
313       for (String peerId : peerIdsToProcess) {
314         ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
315         if (!peerExists(replicationQueueInfo.getPeerId())) {
316           // the orphaned queues must be moved, otherwise the delete op of dead rs will fail,
317           // this will cause the whole multi op fail.
318           // NodeFailoverWorker will skip the orphaned queues.
319           LOG.warn("Peer " + peerId
320               + " didn't exist, will move its queue to avoid the failure of multi op");
321         }
322         String newPeerId = peerId + "-" + znode;
323         String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
324         // check the logs queue for the old peer cluster
325         String oldClusterZnode = ZKUtil.joinZNode(deadRSZnodePath, peerId);
326         List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
327         if (wals == null || wals.size() == 0) {
328           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
329           continue; // empty log queue.
330         }
331         // create the new cluster znode
332         SortedSet<String> logQueue = new TreeSet<String>();
333         queues.put(newPeerId, logQueue);
334         ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
335         listOfOps.add(op);
336         // get the offset of the logs and set it to new znodes
337         for (String wal : wals) {
338           String oldWalZnode = ZKUtil.joinZNode(oldClusterZnode, wal);
339           byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
340           LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
341           String newLogZnode = ZKUtil.joinZNode(newPeerZnode, wal);
342           listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
343           // add ops for deleting
344           listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
345           logQueue.add(wal);
346         }
347         // add delete op for peer
348         listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
349       }
350       // add delete op for dead rs, this will update the cversion of the parent.
351       // The reader will make optimistic locking with this to get a consistent
352       // snapshot
353       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(deadRSZnodePath));
354       LOG.debug(" The multi list size is: " + listOfOps.size());
355       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
356       LOG.info("Atomically moved the dead regionserver logs. ");
357     } catch (KeeperException e) {
358       // Multi call failed; it looks like some other regionserver took away the logs.
359       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
360       queues.clear();
361     } catch (InterruptedException e) {
362       LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
363       queues.clear();
364       Thread.currentThread().interrupt();
365     }
366     return queues;
367   }
368 
369   /**
370    * This methods copies all the wals queues from another region server and returns them all sorted
371    * per peer cluster (appended with the dead server's znode)
372    * @param znode server names to copy
373    * @return all wals for all peers of that cluster, null if an error occurred
374    */
375   private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
376     // TODO this method isn't atomic enough, we could start copying and then
377     // TODO fail for some reason and we would end up with znodes we don't want.
378     SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
379     try {
380       String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
381       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
382       // We have a lock znode in there, it will count as one.
383       if (clusters == null || clusters.size() <= 1) {
384         return queues;
385       }
386       // The lock isn't a peer cluster, remove it
387       clusters.remove(RS_LOCK_ZNODE);
388       for (String cluster : clusters) {
389         ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
390         if (!peerExists(replicationQueueInfo.getPeerId())) {
391           LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
392           // Protection against moving orphaned queues
393           continue;
394         }
395         // We add the name of the recovered RS to the new znode, we can even
396         // do that for queues that were recovered 10 times giving a znode like
397         // number-startcode-number-otherstartcode-number-anotherstartcode-etc
398         String newCluster = cluster + "-" + znode;
399         String newClusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, newCluster);
400         String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
401         List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
402         // That region server didn't have anything to replicate for this cluster
403         if (wals == null || wals.size() == 0) {
404           continue;
405         }
406         ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
407           HConstants.EMPTY_BYTE_ARRAY);
408         SortedSet<String> logQueue = new TreeSet<String>();
409         queues.put(newCluster, logQueue);
410         for (String wal : wals) {
411           String z = ZKUtil.joinZNode(clusterPath, wal);
412           byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
413           long position = 0;
414           try {
415             position = ZKUtil.parseWALPositionFrom(positionBytes);
416           } catch (DeserializationException e) {
417             LOG.warn("Failed parse of wal position from the following znode: " + z
418                 + ", Exception: " + e);
419           }
420           LOG.debug("Creating " + wal + " with data " + position);
421           String child = ZKUtil.joinZNode(newClusterZnode, wal);
422           // Position doesn't actually change, we are just deserializing it for
423           // logging, so just use the already serialized version
424           ZKUtil.createAndWatch(this.zookeeper, child, positionBytes);
425           logQueue.add(wal);
426         }
427       }
428     } catch (KeeperException e) {
429       this.abortable.abort("Copy queues from rs", e);
430     } catch (InterruptedException e) {
431       LOG.warn(e);
432       Thread.currentThread().interrupt();
433     }
434     return queues;
435   }
436 
437   /**
438    * @param lockOwner
439    * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
440    *         for use as content of an replication lock during region server fail over.
441    */
442   static byte[] lockToByteArray(final String lockOwner) {
443     byte[] bytes =
444         ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
445     return ProtobufUtil.prependPBMagic(bytes);
446   }
447 
448   @Override
449   public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
450       throws ReplicationException {
451     String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
452     boolean debugEnabled = LOG.isDebugEnabled();
453     if (debugEnabled) {
454       LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
455     }
456     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
457     int size = pairs.size();
458     for (int i = 0; i < size; i++) {
459       listOfOps.add(ZKUtilOp.createAndFailSilent(
460         ZKUtil.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
461         HConstants.EMPTY_BYTE_ARRAY));
462     }
463     if (debugEnabled) {
464       LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
465           + " is " + listOfOps.size());
466     }
467     try {
468       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
469     } catch (KeeperException e) {
470       throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
471     }
472   }
473 
474   @Override
475   public void removeHFileRefs(String peerId, List<String> files) {
476     String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
477     boolean debugEnabled = LOG.isDebugEnabled();
478     if (debugEnabled) {
479       LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
480     }
481     List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
482     int size = files.size();
483     for (int i = 0; i < size; i++) {
484       listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
485     }
486     if (debugEnabled) {
487       LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
488           + " is " + listOfOps.size());
489     }
490     try {
491       ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
492     } catch (KeeperException e) {
493       LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
494     }
495   }
496 
497   @Override
498   public void addPeerToHFileRefs(String peerId) throws ReplicationException {
499     String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
500     try {
501       if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
502         LOG.info("Adding peer " + peerId + " to hfile reference queue.");
503         ZKUtil.createWithParents(this.zookeeper, peerZnode);
504       }
505     } catch (KeeperException e) {
506       throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
507           e);
508     }
509   }
510 }