/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.master.cleaner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import com.google.common.collect.Lists;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

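/**
 * Tests {@link ReplicationHFileCleaner}: bulk-loaded HFiles that still have an hfile-refs entry
 * in ZooKeeper for a replication peer must be kept, while files without such an entry may be
 * deleted by the cleaner chore.
 */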
@Category({ SmallTests.class })
public class TestReplicationHFileCleaner {
  private static final Log LOG = LogFactory.getLog(TestReplicationHFileCleaner.class);
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Server server;
  private static ReplicationQueues rq;
  private static ReplicationPeers rp;
  private static final String peerId = "TestReplicationHFileCleaner";
  private static Configuration conf = TEST_UTIL.getConfiguration();
  static FileSystem fs = null;
  Path root;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniZKCluster();
    server = new DummyServer();
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    Replication.decorateMasterConfiguration(conf);
    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
    rp.init();

    rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
    rq.init(server.getServerName().toString());
    // Keep the shared FileSystem open; the tests below still use it.
    fs = FileSystem.get(conf);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniZKCluster();
  }

  @Before
  public void setup() throws ReplicationException, IOException {
    root = TEST_UTIL.getDataTestDirOnTestFS();
    rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
  }

  @After
  public void cleanup() throws ReplicationException {
    try {
      fs.delete(root, true);
    } catch (IOException e) {
      LOG.warn("Failed to delete files recursively from path " + root);
    }
    rp.removePeer(peerId);
  }

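  /**
   * An HFile should be deletable while it has no hfile-refs entry for the peer, and must no
   * longer be deletable once a reference for it has been queued.
   */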
  @Test
  public void testIsFileDeletable() throws IOException, ReplicationException {
    Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
    fs.createNewFile(file);
    assertTrue("Test file not created!", fs.exists(file));

    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
    cleaner.setConf(conf);
    assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
        + "for it in the queue.",
      cleaner.isFileDeletable(fs.getFileStatus(file)));

    List<Pair<Path, Path>> files = new ArrayList<>(1);
    files.add(new Pair<Path, Path>(null, file));
    rq.addHFileRefs(peerId, files);
    assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
        + "for it in the queue.",
      cleaner.isFileDeletable(fs.getFileStatus(file)));
  }

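  /**
   * getDeletableFiles() should return only the files without an hfile-refs entry: one file is
   * referenced by the peer and one is not, and only the unreferenced one may be deleted.
   */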
  @Test
  public void testGetDeletableFiles() throws Exception {
    Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
    fs.createNewFile(notDeletablefile);
    assertTrue("Test file not created!", fs.exists(notDeletablefile));
    Path deletablefile = new Path(root, "testGetDeletableFiles_2");
    fs.createNewFile(deletablefile);
    assertTrue("Test file not created!", fs.exists(deletablefile));

    List<FileStatus> files = new ArrayList<FileStatus>(2);
    FileStatus f = new FileStatus();
    f.setPath(deletablefile);
    files.add(f);
    f = new FileStatus();
    f.setPath(notDeletablefile);
    files.add(f);

    List<Pair<Path, Path>> hfiles = new ArrayList<>(1);
    hfiles.add(new Pair<Path, Path>(null, notDeletablefile));
    rq.addHFileRefs(peerId, hfiles);

    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
    cleaner.setConf(conf);
    Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
    assertTrue("File " + deletablefile + " should be deletable as it has no hfile reference node.",
      deletableFilesIterator.hasNext());
    assertEquals(deletablefile, deletableFilesIterator.next().getPath());
    assertFalse("File " + notDeletablefile
        + " should not be deletable as its hfile reference node exists in the queue.",
      deletableFilesIterator.hasNext());
  }

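  /**
   * The cleaner should cope with the hfile-refs znode version changing between reads (the mocked
   * client reports version 1, then 2) and still complete isFileDeletable within the timeout.
   */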
  @Test(timeout = 15000)
  public void testForDifferentHFileRefsZnodeVersion() throws Exception {
    Path file = new Path(root, "testForDifferentHFileRefsZnodeVersion");
    fs.createNewFile(file);
    assertTrue("Test file not created!", fs.exists(file));

    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
    cleaner.setConf(conf);

    ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
    // Report different znode versions on successive calls.
    Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);

    Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
    Field rqc = cleanerClass.getDeclaredField("rqc");
    rqc.setAccessible(true);
    rqc.set(cleaner, replicationQueuesClient);

    cleaner.isFileDeletable(fs.getFileStatus(file));
  }

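  /**
   * When ZooKeeper is unreachable the cleaner should err on the side of keeping files (return no
   * deletable files) without stopping itself; with a healthy ZooKeeper all unreferenced files are
   * returned as deletable.
   */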
  @Test
  public void testZooKeeperAbort() throws Exception {
    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();

    List<FileStatus> dummyFiles =
        Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
            "hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
            "hfile2")));

    FaultyZooKeeperWatcher faultyZK =
        new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
    try {
      faultyZK.init();
      cleaner.setConf(conf, faultyZK);
      // With a faulty ZK connection, no files should be reported deletable and the cleaner
      // should not stop itself.
      Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
      assertFalse(toDelete.iterator().hasNext());
      assertFalse(cleaner.isStopped());
    } finally {
      faultyZK.close();
    }

    // With a working ZK connection, both dummy files should be returned as deletable.
    cleaner = new ReplicationHFileCleaner();
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
    try {
      cleaner.setConf(conf, zkw);
      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
      Iterator<FileStatus> iter = filesToDelete.iterator();
      assertTrue(iter.hasNext());
      assertEquals(new Path("hfile1"), iter.next().getPath());
      assertTrue(iter.hasNext());
      assertEquals(new Path("hfile2"), iter.next().getPath());
      assertFalse(iter.hasNext());
    } finally {
      zkw.close();
    }
  }

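  /**
   * Minimal {@link Server} implementation: only the configuration, ZooKeeper watcher and server
   * name are real, everything else is a stub.
   */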
  static class DummyServer implements Server {

    @Override
    public Configuration getConfiguration() {
      return TEST_UTIL.getConfiguration();
    }

    @Override
    public ZooKeeperWatcher getZooKeeper() {
      try {
        return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return null;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      return null;
    }

    @Override
    public ClusterConnection getConnection() {
      return null;
    }

    @Override
    public MetaTableLocator getMetaTableLocator() {
      return null;
    }

    @Override
    public ServerName getServerName() {
      return ServerName.valueOf("regionserver,60020,000000");
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public ChoreService getChoreService() {
      return null;
    }
  }

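  /**
   * ZooKeeper watcher whose {@link RecoverableZooKeeper} is a Mockito spy that throws
   * ConnectionLossException when the hfile-refs znode is read, simulating a lost ZK connection.
   */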
  static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
    private RecoverableZooKeeper zk;

    public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
        throws ZooKeeperConnectionException, IOException {
      super(conf, identifier, abortable);
    }

    public void init() throws Exception {
      this.zk = spy(super.getRecoverableZooKeeper());
      doThrow(new KeeperException.ConnectionLossException())
          .when(zk).getData("/hbase/replication/hfile-refs", null, new Stat());
    }

    public RecoverableZooKeeper getRecoverableZooKeeper() {
      return zk;
    }
  }
}