View Javadoc

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.apache.hadoop.hbase.master.cleaner;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  import static org.mockito.Mockito.doThrow;
24  import static org.mockito.Mockito.spy;
25  
26  import java.io.IOException;
27  import java.lang.reflect.Field;
28  import java.net.URLEncoder;
29  import java.util.Iterator;
30  import java.util.LinkedList;
31  import java.util.List;
32  
33  import com.google.common.collect.Lists;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FileStatus;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.Abortable;
39  import org.apache.hadoop.hbase.ChoreService;
40  import org.apache.hadoop.hbase.CoordinatedStateManager;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.testclassification.MediumTests;
44  import org.apache.hadoop.hbase.Server;
45  import org.apache.hadoop.hbase.ServerName;
46  import org.apache.hadoop.hbase.Waiter;
47  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
48  import org.apache.hadoop.hbase.client.ClusterConnection;
49  import org.apache.hadoop.hbase.replication.ReplicationFactory;
50  import org.apache.hadoop.hbase.replication.ReplicationQueues;
51  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
52  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
53  import org.apache.hadoop.hbase.replication.regionserver.Replication;
54  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
55  import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
56  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57  import org.apache.zookeeper.KeeperException;
58  import org.apache.zookeeper.data.Stat;
59  import org.junit.AfterClass;
60  import org.junit.BeforeClass;
61  import org.junit.Test;
62  import org.junit.experimental.categories.Category;
63  import org.mockito.Mockito;
64  
65  @Category(MediumTests.class)
66  public class TestLogsCleaner {
67  
68    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
69  
70    /**
71     * @throws java.lang.Exception
72     */
73    @BeforeClass
74    public static void setUpBeforeClass() throws Exception {
75      TEST_UTIL.startMiniZKCluster();
76    }
77  
78    /**
79     * @throws java.lang.Exception
80     */
81    @AfterClass
82    public static void tearDownAfterClass() throws Exception {
83      TEST_UTIL.shutdownMiniZKCluster();
84    }
85  
86    @Test
87    public void testLogCleaning() throws Exception{
88      Configuration conf = TEST_UTIL.getConfiguration();
89      // set TTL
90      long ttl = 10000;
91      conf.setLong("hbase.master.logcleaner.ttl", ttl);
92      conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
93      Replication.decorateMasterConfiguration(conf);
94      Server server = new DummyServer();
95      ReplicationQueues repQueues =
96          ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
97      repQueues.init(server.getServerName().toString());
98      final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
99          HConstants.HREGION_OLDLOGDIR_NAME);
100     String fakeMachineName =
101       URLEncoder.encode(server.getServerName().toString(), "UTF8");
102 
103     final FileSystem fs = FileSystem.get(conf);
104 
105     // Create 2 invalid files, 1 "recent" file, 1 very new file and 30 old files
106     long now = System.currentTimeMillis();
107     fs.delete(oldLogDir, true);
108     fs.mkdirs(oldLogDir);
109     // Case 1: 2 invalid files, which would be deleted directly
110     fs.createNewFile(new Path(oldLogDir, "a"));
111     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
112     // Case 2: 1 "recent" file, not even deletable for the first log cleaner
113     // (TimeToLiveLogCleaner), so we are not going down the chain
114     System.out.println("Now is: " + now);
115     for (int i = 1; i < 31; i++) {
116       // Case 3: old files which would be deletable for the first log cleaner
117       // (TimeToLiveLogCleaner), and also for the second (ReplicationLogCleaner)
118       Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i) );
119       fs.createNewFile(fileName);
120       // Case 4: put 3 old log files in ZK indicating that they are scheduled
121       // for replication so these files would pass the first log cleaner
122       // (TimeToLiveLogCleaner) but would be rejected by the second
123       // (ReplicationLogCleaner)
124       if (i % (30/3) == 1) {
125         repQueues.addLog(fakeMachineName, fileName.getName());
126         System.out.println("Replication log file: " + fileName);
127       }
128     }
129 
130     // sleep for sometime to get newer modifcation time
131     Thread.sleep(ttl);
132     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
133 
134     // Case 2: 1 newer file, not even deletable for the first log cleaner
135     // (TimeToLiveLogCleaner), so we are not going down the chain
136     fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) ));
137 
138     for (FileStatus stat : fs.listStatus(oldLogDir)) {
139       System.out.println(stat.getPath().toString());
140     }
141 
142     assertEquals(34, fs.listStatus(oldLogDir).length);
143 
144     LogCleaner cleaner  = new LogCleaner(1000, server, conf, fs, oldLogDir);
145 
146     cleaner.chore();
147 
148     // We end up with the current log file, a newer one and the 3 old log
149     // files which are scheduled for replication
150     TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
151       @Override
152       public boolean evaluate() throws Exception {
153         return 5 == fs.listStatus(oldLogDir).length;
154       }
155     });
156 
157     for (FileStatus file : fs.listStatus(oldLogDir)) {
158       System.out.println("Kept log files: " + file.getPath().getName());
159     }
160   }
161 
162   @Test(timeout=5000)
163   public void testZnodeCversionChange() throws Exception {
164     Configuration conf = TEST_UTIL.getConfiguration();
165     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
166     cleaner.setConf(conf);
167 
168     ReplicationQueuesClient rqcMock = Mockito.mock(ReplicationQueuesClient.class);
169     Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
170 
171     Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
172     rqc.setAccessible(true);
173 
174     rqc.set(cleaner, rqcMock);
175 
176     // This should return eventually when cversion stabilizes
177     cleaner.getDeletableFiles(new LinkedList<FileStatus>());
178   }
179 
180   /**
181    * ReplicationLogCleaner should be able to ride over ZooKeeper errors without
182    * aborting.
183    */
184   @Test
185   public void testZooKeeperAbort() throws Exception {
186     Configuration conf = TEST_UTIL.getConfiguration();
187     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
188 
189     List<FileStatus> dummyFiles = Lists.newArrayList(
190         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
191         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
192     );
193 
194     FaultyZooKeeperWatcher faultyZK =
195         new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
196     try {
197       faultyZK.init();
198       cleaner.setConf(conf, faultyZK);
199       // should keep all files due to a ConnectionLossException getting the queues znodes
200       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
201       assertFalse(toDelete.iterator().hasNext());
202       assertFalse(cleaner.isStopped());
203     } finally {
204       faultyZK.close();
205     }
206 
207     // when zk is working both files should be returned
208     cleaner = new ReplicationLogCleaner();
209     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
210     try {
211       cleaner.setConf(conf, zkw);
212       Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
213       Iterator<FileStatus> iter = filesToDelete.iterator();
214       assertTrue(iter.hasNext());
215       assertEquals(new Path("log1"), iter.next().getPath());
216       assertTrue(iter.hasNext());
217       assertEquals(new Path("log2"), iter.next().getPath());
218       assertFalse(iter.hasNext());
219     } finally {
220       zkw.close();
221     }
222   }
223 
224   static class DummyServer implements Server {
225 
226     @Override
227     public Configuration getConfiguration() {
228       return TEST_UTIL.getConfiguration();
229     }
230 
231     @Override
232     public ZooKeeperWatcher getZooKeeper() {
233       try {
234         return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
235       } catch (IOException e) {
236         e.printStackTrace();
237       }
238       return null;
239     }
240 
241     @Override
242     public CoordinatedStateManager getCoordinatedStateManager() {
243       return null;
244     }
245 
246     @Override
247     public ClusterConnection getConnection() {
248       return null;
249     }
250 
251     @Override
252     public MetaTableLocator getMetaTableLocator() {
253       return null;
254     }
255 
256     @Override
257     public ServerName getServerName() {
258       return ServerName.valueOf("regionserver,60020,000000");
259     }
260 
261     @Override
262     public void abort(String why, Throwable e) {}
263 
264     @Override
265     public boolean isAborted() {
266       return false;
267     }
268 
269     @Override
270     public void stop(String why) {}
271 
272     @Override
273     public boolean isStopped() {
274       return false;
275     }
276 
277     @Override
278     public ChoreService getChoreService() {
279       return null;
280     }
281   }
282 
283   static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
284     private RecoverableZooKeeper zk;
285 
286     public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
287         throws ZooKeeperConnectionException, IOException {
288       super(conf, identifier, abortable);
289     }
290 
291     public void init() throws Exception {
292       this.zk = spy(super.getRecoverableZooKeeper());
293       doThrow(new KeeperException.ConnectionLossException())
294           .when(zk).getData("/hbase/replication/rs", null, new Stat());
295     }
296 
297     public RecoverableZooKeeper getRecoverableZooKeeper() {
298       return zk;
299     }
300   }
301 }