View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  package org.apache.hadoop.hbase.master.cleaner;
12  
13  import static org.junit.Assert.assertEquals;
14  import static org.junit.Assert.assertFalse;
15  import static org.junit.Assert.assertTrue;
16  import static org.junit.Assert.fail;
17  import static org.mockito.Mockito.doThrow;
18  import static org.mockito.Mockito.spy;
19  
20  import com.google.common.collect.Lists;
21  
22  import java.io.IOException;
23  import java.lang.reflect.Field;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Abortable;
35  import org.apache.hadoop.hbase.ChoreService;
36  import org.apache.hadoop.hbase.CoordinatedStateManager;
37  import org.apache.hadoop.hbase.HBaseTestingUtility;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.Server;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
42  import org.apache.hadoop.hbase.client.ClusterConnection;
43  import org.apache.hadoop.hbase.replication.ReplicationException;
44  import org.apache.hadoop.hbase.replication.ReplicationFactory;
45  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
46  import org.apache.hadoop.hbase.replication.ReplicationPeers;
47  import org.apache.hadoop.hbase.replication.ReplicationQueues;
48  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
49  import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
50  import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
51  import org.apache.hadoop.hbase.replication.regionserver.Replication;
52  import org.apache.hadoop.hbase.testclassification.SmallTests;
53  import org.apache.hadoop.hbase.util.Pair;
54  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
55  import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
56  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57  import org.apache.zookeeper.KeeperException;
58  import org.apache.zookeeper.data.Stat;
59  import org.junit.After;
60  import org.junit.AfterClass;
61  import org.junit.Before;
62  import org.junit.BeforeClass;
63  import org.junit.Test;
64  import org.junit.experimental.categories.Category;
65  import org.mockito.Mockito;
66  
67  @Category({ SmallTests.class })
68  public class TestReplicationHFileCleaner {
69    private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
70    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
71    private static Server server;
72    private static ReplicationQueues rq;
73    private static ReplicationPeers rp;
74    private static final String peerId = "TestReplicationHFileCleaner";
75    private static Configuration conf = TEST_UTIL.getConfiguration();
76    static FileSystem fs = null;
77    Path root;
78  
79    /**
80     * @throws java.lang.Exception
81     */
82    @BeforeClass
83    public static void setUpBeforeClass() throws Exception {
84      TEST_UTIL.startMiniZKCluster();
85      server = new DummyServer();
86      conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
87      Replication.decorateMasterConfiguration(conf);
88      rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
89      rp.init();
90  
91      rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
92      rq.init(server.getServerName().toString());
93      try {
94        fs = FileSystem.get(conf);
95      } finally {
96        if (fs != null) {
97          fs.close();
98        }
99      }
100   }
101 
102   /**
103    * @throws java.lang.Exception
104    */
105   @AfterClass
106   public static void tearDownAfterClass() throws Exception {
107     TEST_UTIL.shutdownMiniZKCluster();
108   }
109 
110   @Before
111   public void setup() throws ReplicationException, IOException {
112     root = TEST_UTIL.getDataTestDirOnTestFS();
113     rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
114   }
115 
116   @After
117   public void cleanup() throws ReplicationException {
118     try {
119       fs.delete(root, true);
120     } catch (IOException e) {
121       LOG.warn("Failed to delete files recursively from path " + root);
122     }
123     rp.removePeer(peerId);
124   }
125 
126   @Test
127   public void testIsFileDeletable() throws IOException, ReplicationException {
128     // 1. Create a file
129     Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
130     fs.createNewFile(file);
131     // 2. Assert file is successfully created
132     assertTrue("Test file not created!", fs.exists(file));
133     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
134     cleaner.setConf(conf);
135     // 3. Assert that file as is should be deletable
136     assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
137         + "for it in the queue.",
138       cleaner.isFileDeletable(fs.getFileStatus(file)));
139 
140     List<Pair<Path, Path>> files = new ArrayList<>(1);
141     files.add(new Pair<Path, Path>(null, file));
142     // 4. Add the file to hfile-refs queue
143     rq.addHFileRefs(peerId, files);
144     // 5. Assert file should not be deletable
145     assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
146         + "for it in the queue.",
147       cleaner.isFileDeletable(fs.getFileStatus(file)));
148   }
149 
150   @Test
151   public void testGetDeletableFiles() throws Exception {
152     // 1. Create two files and assert that they do not exist
153     Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
154     fs.createNewFile(notDeletablefile);
155     assertTrue("Test file not created!", fs.exists(notDeletablefile));
156     Path deletablefile = new Path(root, "testGetDeletableFiles_2");
157     fs.createNewFile(deletablefile);
158     assertTrue("Test file not created!", fs.exists(deletablefile));
159 
160     List<FileStatus> files = new ArrayList<FileStatus>(2);
161     FileStatus f = new FileStatus();
162     f.setPath(deletablefile);
163     files.add(f);
164     f = new FileStatus();
165     f.setPath(notDeletablefile);
166     files.add(f);
167 
168     List<Pair<Path, Path>> hfiles = new ArrayList<>(1);
169     hfiles.add(new Pair<Path, Path>(null, notDeletablefile));
170     // 2. Add one file to hfile-refs queue
171     rq.addHFileRefs(peerId, hfiles);
172 
173     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
174     cleaner.setConf(conf);
175     Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
176     int i = 0;
177     while (deletableFilesIterator.hasNext() && i < 2) {
178       i++;
179     }
180     // 5. Assert one file should not be deletable and it is present in the list returned
181     if (i > 2) {
182       fail("File " + notDeletablefile
183           + " should not be deletable as its hfile reference node is not added.");
184     }
185     assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
186   }
187 
188   /*
189    * Test for HBASE-14621. This test will not assert directly anything. Without the fix the test
190    * will end up in a infinite loop, so it will timeout.
191    */
192   @Test(timeout = 15000)
193   public void testForDifferntHFileRefsZnodeVersion() throws Exception {
194     // 1. Create a file
195     Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion");
196     fs.createNewFile(file);
197     // 2. Assert file is successfully created
198     assertTrue("Test file not created!", fs.exists(file));
199     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
200     cleaner.setConf(conf);
201 
202     ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
203     //Return different znode version for each call
204     Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
205 
206     Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
207     Field rqc = cleanerClass.getDeclaredField("rqc");
208     rqc.setAccessible(true);
209     rqc.set(cleaner, replicationQueuesClient);
210 
211     cleaner.isFileDeletable(fs.getFileStatus(file));
212   }
213 
214   /**
215    * ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting.
216    */
217   @Test
218   public void testZooKeeperAbort() throws Exception {
219     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
220 
221     List<FileStatus> dummyFiles =
222         Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
223             "hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
224             "hfile2")));
225 
226     FaultyZooKeeperWatcher faultyZK =
227         new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
228     try {
229       faultyZK.init();
230       cleaner.setConf(conf, faultyZK);
231       // should keep all files due to a ConnectionLossException getting the queues znodes
232       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
233       assertFalse(toDelete.iterator().hasNext());
234       assertFalse(cleaner.isStopped());
235     } finally {
236       faultyZK.close();
237     }
238 
239     // when zk is working both files should be returned
240     cleaner = new ReplicationHFileCleaner();
241     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
242     try {
243       cleaner.setConf(conf, zkw);
244       Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
245       Iterator<FileStatus> iter = filesToDelete.iterator();
246       assertTrue(iter.hasNext());
247       assertEquals(new Path("hfile1"), iter.next().getPath());
248       assertTrue(iter.hasNext());
249       assertEquals(new Path("hfile2"), iter.next().getPath());
250       assertFalse(iter.hasNext());
251     } finally {
252       zkw.close();
253     }
254   }
255 
256   static class DummyServer implements Server {
257 
258     @Override
259     public Configuration getConfiguration() {
260       return TEST_UTIL.getConfiguration();
261     }
262 
263     @Override
264     public ZooKeeperWatcher getZooKeeper() {
265       try {
266         return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
267       } catch (IOException e) {
268         e.printStackTrace();
269       }
270       return null;
271     }
272 
273     @Override
274     public CoordinatedStateManager getCoordinatedStateManager() {
275       return null;
276     }
277 
278     @Override
279     public ClusterConnection getConnection() {
280       return null;
281     }
282 
283     @Override
284     public MetaTableLocator getMetaTableLocator() {
285       return null;
286     }
287 
288     @Override
289     public ServerName getServerName() {
290       return ServerName.valueOf("regionserver,60020,000000");
291     }
292 
293     @Override
294     public void abort(String why, Throwable e) {
295     }
296 
297     @Override
298     public boolean isAborted() {
299       return false;
300     }
301 
302     @Override
303     public void stop(String why) {
304     }
305 
306     @Override
307     public boolean isStopped() {
308       return false;
309     }
310 
311     @Override
312     public ChoreService getChoreService() {
313       return null;
314     }
315   }
316 
317   static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
318     private RecoverableZooKeeper zk;
319     public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
320         throws ZooKeeperConnectionException, IOException {
321       super(conf, identifier, abortable);
322     }
323 
324     public void init() throws Exception {
325       this.zk = spy(super.getRecoverableZooKeeper());
326       doThrow(new KeeperException.ConnectionLossException())
327           .when(zk).getData("/hbase/replication/hfile-refs", null, new Stat());
328     }
329 
330     public RecoverableZooKeeper getRecoverableZooKeeper() {
331       return zk;
332     }
333   }
334 }