package org.apache.hadoop.hbase.replication.master;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
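
// A minimal sketch of how this cleaner is typically enabled (illustrative only; in a real
// deployment these values are set in hbase-site.xml rather than in code, and HBaseConfiguration
// and HFileCleaner below are the standard HBase classes this cleaner already references, not
// something it defines itself):
//
//   Configuration conf = HBaseConfiguration.create();
//   conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
//   conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
//   conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, ReplicationHFileCleaner.class.getName());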

/**
 * Implementation of a file cleaner that checks whether an hfile in the archive directory is
 * still referenced by a replication queue (i.e. still scheduled to be shipped to a peer as a
 * bulk-loaded file) before allowing it to be deleted.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
  private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class);
  private ZooKeeperWatcher zkw;
  private ReplicationQueuesClient rqc;
  private boolean stopped = false;

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // All members of this class are null if replication is disabled,
    // so do not block any deletions in that case.
    if (this.getConf() == null) {
      return files;
    }

    final Set<String> hfileRefs;
    try {
      // hfile references created concurrently with this call may not be included in the
      // snapshot, but they will not be deleted either since they are not in the filter input.
      hfileRefs = loadHFileRefsFromPeers();
    } catch (KeeperException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files",
          e);
      return Collections.emptyList();
    }
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        String hfile = file.getPath().getName();
        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
        if (LOG.isDebugEnabled()) {
          if (foundHFileRefInQueue) {
            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
          } else {
            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
          }
        }
        return !foundHFileRefInQueue;
      }
    });
  }
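
  // A minimal usage sketch (illustrative only; "archivedHFiles" is a hypothetical collection of
  // FileStatus entries listed from the hfile archive directory, not something this class
  // produces):
  //
  //   ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
  //   cleaner.setConf(conf);  // wires up the ZooKeeper-backed ReplicationQueuesClient
  //   for (FileStatus f : cleaner.getDeletableFiles(archivedHFiles)) {
  //     // f is not referenced by any peer's hfile-refs queue and is safe to delete
  //   }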

  /**
   * Load all hfile references from all replication queues in ZooKeeper. The read is retried until
   * the hfile-refs znode's cversion is unchanged across a full scan, so the returned set is a
   * consistent snapshot of the references that existed when the call started; references created
   * concurrently with the call may not be included.
   */
  private Set<String> loadHFileRefsFromPeers() throws KeeperException {
    Set<String> hfileRefs = Sets.newHashSet();
    List<String> listOfPeers;
    for (int retry = 0;; retry++) {
      // Compare the hfile-refs znode cversion before and after the scan; if it changed, a peer
      // queue was added or removed concurrently and the scan is retried.
      int v0 = rqc.getHFileRefsNodeChangeVersion();
      hfileRefs.clear();
      listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
      if (listOfPeers == null) {
        LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
        return ImmutableSet.of();
      }
      for (String id : listOfPeers) {
        List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
        if (peerHFileRefs != null) {
          hfileRefs.addAll(peerHFileRefs);
        }
      }
      int v1 = rqc.getHFileRefsNodeChangeVersion();
      if (v0 == v1) {
        return hfileRefs;
      }
      LOG.debug(String.format("Replication hfile references node cversion changed from "
          + "%d to %d, retry = %d", v0, v1, retry));
    }
  }

  @Override
  public void setConf(Configuration config) {
    // If either replication or replication of bulk-loaded hfiles is disabled, leave all members
    // null and skip creating the ZooKeeper connection.
    if (!(config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT) && config.getBoolean(
        HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
        HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
      LOG.warn(HConstants.REPLICATION_ENABLE_KEY + " or "
          + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
          + " is not enabled, so allowing all hfile references to be deleted. Better to remove "
          + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
          + " configuration.");
      return;
    }

    // Make our own Configuration so this cleaner gets its own ZooKeeper connection instead of
    // sharing (and potentially stomping on) the master's connection.
    Configuration conf = new Configuration(config);
    try {
      setConf(conf, new ZooKeeperWatcher(conf, "replicationHFileCleaner", null));
    } catch (IOException e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }

  @VisibleForTesting
  public void setConf(Configuration conf, ZooKeeperWatcher zk) {
    super.setConf(conf);
    try {
      initReplicationQueuesClient(conf, zk);
    } catch (IOException e) {
      LOG.error("Error while configuring " + this.getClass().getName(), e);
    }
  }
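
  // A minimal test-style sketch (illustrative only; "testWatcher" and "archivedHFileStatus" are
  // hypothetical objects created by a test harness) showing how the @VisibleForTesting overload
  // above can inject a dedicated ZooKeeper connection:
  //
  //   ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
  //   cleaner.setConf(conf, testWatcher);
  //   boolean deletable = cleaner.isFileDeletable(archivedHFileStatus);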

  private void initReplicationQueuesClient(Configuration conf, ZooKeeperWatcher zk)
      throws ZooKeeperConnectionException, IOException {
    this.zkw = zk;
    this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, new WarnOnlyAbortable());
  }

  @Override
  public void stop(String why) {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.zkw != null) {
      LOG.info("Stopping " + this.zkw);
      this.zkw.close();
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    Set<String> hfileRefsFromQueue;
    // All members of this class are null if replication is disabled,
    // so the file is deletable in that case.
    if (getConf() == null) {
      return true;
    }

    try {
      hfileRefsFromQueue = loadHFileRefsFromPeers();
    } catch (KeeperException e) {
      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
          + "file for " + fStat.getPath(), e);
      return false;
    }
    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
  }

  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("ReplicationHFileCleaner received abort, ignoring. Reason: " + why);
      if (LOG.isDebugEnabled()) {
        LOG.debug(e);
      }
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}