View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  package org.apache.hadoop.hbase.replication.master;
12  
13  import com.google.common.annotations.VisibleForTesting;
14  import com.google.common.base.Predicate;
15  import com.google.common.collect.ImmutableSet;
16  import com.google.common.collect.Iterables;
17  import com.google.common.collect.Sets;
18  
19  import java.io.IOException;
20  import java.util.Collections;
21  import java.util.List;
22  import java.util.Set;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.fs.FileStatus;
28  import org.apache.hadoop.hbase.Abortable;
29  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
34  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
35  import org.apache.hadoop.hbase.replication.ReplicationFactory;
36  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
37  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
38  import org.apache.zookeeper.KeeperException;
39  
40  /**
41   * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before
42   * deleting it from hfile archive directory.
43   */
44  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
45  public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
46    private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class);
47    private ZooKeeperWatcher zkw;
48    private ReplicationQueuesClient rqc;
49    private boolean stopped = false;
50  
51    @Override
52    public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
53      // all members of this class are null if replication is disabled,
54      // so we cannot filter the files
55      if (this.getConf() == null) {
56        return files;
57      }
58  
59      final Set<String> hfileRefs;
60      try {
61        // The concurrently created new hfile entries in ZK may not be included in the return list,
62        // but they won't be deleted because they're not in the checking set.
63        hfileRefs = loadHFileRefsFromPeers();
64      } catch (KeeperException e) {
65        LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
66        return Collections.emptyList();
67      }
68      return Iterables.filter(files, new Predicate<FileStatus>() {
69        @Override
70        public boolean apply(FileStatus file) {
71          String hfile = file.getPath().getName();
72          boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
73          if (LOG.isDebugEnabled()) {
74            if (foundHFileRefInQueue) {
75              LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
76            } else {
77              LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
78            }
79          }
80          return !foundHFileRefInQueue;
81        }
82      });
83    }
84  
85    /**
86     * Load all hfile references in all replication queues from ZK. This method guarantees to return a
87     * snapshot which contains all hfile references in the zookeeper at the start of this call.
88     * However, some newly created hfile references during the call may not be included.
89     */
90    private Set<String> loadHFileRefsFromPeers() throws KeeperException {
91      Set<String> hfileRefs = Sets.newHashSet();
92      List<String> listOfPeers;
93      for (int retry = 0;; retry++) {
94        int v0 = rqc.getHFileRefsNodeChangeVersion();
95        hfileRefs.clear();
96        listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
97        if (listOfPeers == null) {
98          LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
99          return ImmutableSet.of();
100       }
101       for (String id : listOfPeers) {
102         List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
103         if (peerHFileRefs != null) {
104           hfileRefs.addAll(peerHFileRefs);
105         }
106       }
107       int v1 = rqc.getHFileRefsNodeChangeVersion();
108       if (v0 == v1) {
109         return hfileRefs;
110       }
111       LOG.debug(String.format("Replication hfile references node cversion changed from "
112           + "%d to %d, retry = %d", v0, v1, retry));
113     }
114   }
115 
116   @Override
117   public void setConf(Configuration config) {
118     // If either replication or replication of bulk load hfiles is disabled, keep all members null
119     if (!(config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
120       HConstants.REPLICATION_ENABLE_DEFAULT) && config.getBoolean(
121       HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
122       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
123       LOG.warn(HConstants.REPLICATION_ENABLE_KEY
124           + " is not enabled so allowing all hfile references to be deleted. Better to remove "
125           + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
126           + " configuration.");
127       return;
128     }
129     // Make my own Configuration. Then I'll have my own connection to zk that
130     // I can close myself when time comes.
131     Configuration conf = new Configuration(config);
132     try {
133       setConf(conf, new ZooKeeperWatcher(conf, "replicationHFileCleaner", null));
134     } catch (IOException e) {
135       LOG.error("Error while configuring " + this.getClass().getName(), e);
136     }
137   }
138 
139   @VisibleForTesting
140   public void setConf(Configuration conf, ZooKeeperWatcher zk) {
141     super.setConf(conf);
142     try {
143       initReplicationQueuesClient(conf, zk);
144     } catch (IOException e) {
145       LOG.error("Error while configuring " + this.getClass().getName(), e);
146     }
147   }
148 
149   private void initReplicationQueuesClient(Configuration conf, ZooKeeperWatcher zk)
150       throws ZooKeeperConnectionException, IOException {
151     this.zkw = zk;
152     this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, new WarnOnlyAbortable());
153   }
154 
155   @Override
156   public void stop(String why) {
157     if (this.stopped) {
158       return;
159     }
160     this.stopped = true;
161     if (this.zkw != null) {
162       LOG.info("Stopping " + this.zkw);
163       this.zkw.close();
164     }
165   }
166 
167   @Override
168   public boolean isStopped() {
169     return this.stopped;
170   }
171 
172   @Override
173   public boolean isFileDeletable(FileStatus fStat) {
174     Set<String> hfileRefsFromQueue;
175     // all members of this class are null if replication is disabled,
176     // so do not stop from deleting the file
177     if (getConf() == null) {
178       return true;
179     }
180 
181     try {
182       hfileRefsFromQueue = loadHFileRefsFromPeers();
183     } catch (KeeperException e) {
184       LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
185           + "file for " + fStat.getPath());
186       return false;
187     }
188     return !hfileRefsFromQueue.contains(fStat.getPath().getName());
189   }
190 
191   private static class WarnOnlyAbortable implements Abortable {
192     @Override
193     public void abort(String why, Throwable e) {
194       LOG.warn("ReplicationHFileCleaner received abort, ignoring.  Reason: " + why);
195       if (LOG.isDebugEnabled()) {
196         LOG.debug(e);
197       }
198     }
199 
200     @Override
201     public boolean isAborted() {
202       return false;
203     }
204   }
205 }