1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.master.cleaner;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23 import static org.mockito.Mockito.doThrow;
24 import static org.mockito.Mockito.spy;
25
26 import java.io.IOException;
27 import java.lang.reflect.Field;
28 import java.net.URLEncoder;
29 import java.util.Iterator;
30 import java.util.LinkedList;
31 import java.util.List;
32
33 import com.google.common.collect.Lists;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FileStatus;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.Abortable;
39 import org.apache.hadoop.hbase.ChoreService;
40 import org.apache.hadoop.hbase.CoordinatedStateManager;
41 import org.apache.hadoop.hbase.HBaseTestingUtility;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.testclassification.MediumTests;
44 import org.apache.hadoop.hbase.Server;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.Waiter;
47 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
48 import org.apache.hadoop.hbase.client.ClusterConnection;
49 import org.apache.hadoop.hbase.replication.ReplicationFactory;
50 import org.apache.hadoop.hbase.replication.ReplicationQueues;
51 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
52 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
53 import org.apache.hadoop.hbase.replication.regionserver.Replication;
54 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
55 import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
56 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57 import org.apache.zookeeper.KeeperException;
58 import org.apache.zookeeper.data.Stat;
59 import org.junit.AfterClass;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63 import org.mockito.Mockito;
64
65 @Category(MediumTests.class)
66 public class TestLogsCleaner {
67
68 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
69
70
71
72
73 @BeforeClass
74 public static void setUpBeforeClass() throws Exception {
75 TEST_UTIL.startMiniZKCluster();
76 }
77
78
79
80
81 @AfterClass
82 public static void tearDownAfterClass() throws Exception {
83 TEST_UTIL.shutdownMiniZKCluster();
84 }
85
86 @Test
87 public void testLogCleaning() throws Exception{
88 Configuration conf = TEST_UTIL.getConfiguration();
89
90 long ttl = 10000;
91 conf.setLong("hbase.master.logcleaner.ttl", ttl);
92 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
93 Replication.decorateMasterConfiguration(conf);
94 Server server = new DummyServer();
95 ReplicationQueues repQueues =
96 ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
97 repQueues.init(server.getServerName().toString());
98 final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
99 HConstants.HREGION_OLDLOGDIR_NAME);
100 String fakeMachineName =
101 URLEncoder.encode(server.getServerName().toString(), "UTF8");
102
103 final FileSystem fs = FileSystem.get(conf);
104
105
106 long now = System.currentTimeMillis();
107 fs.delete(oldLogDir, true);
108 fs.mkdirs(oldLogDir);
109
110 fs.createNewFile(new Path(oldLogDir, "a"));
111 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
112
113
114 System.out.println("Now is: " + now);
115 for (int i = 1; i < 31; i++) {
116
117
118 Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i) );
119 fs.createNewFile(fileName);
120
121
122
123
124 if (i % (30/3) == 1) {
125 repQueues.addLog(fakeMachineName, fileName.getName());
126 System.out.println("Replication log file: " + fileName);
127 }
128 }
129
130
131 Thread.sleep(ttl);
132 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
133
134
135
136 fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + 10000) ));
137
138 for (FileStatus stat : fs.listStatus(oldLogDir)) {
139 System.out.println(stat.getPath().toString());
140 }
141
142 assertEquals(34, fs.listStatus(oldLogDir).length);
143
144 LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
145
146 cleaner.chore();
147
148
149
150 TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
151 @Override
152 public boolean evaluate() throws Exception {
153 return 5 == fs.listStatus(oldLogDir).length;
154 }
155 });
156
157 for (FileStatus file : fs.listStatus(oldLogDir)) {
158 System.out.println("Kept log files: " + file.getPath().getName());
159 }
160 }
161
162 @Test(timeout=5000)
163 public void testZnodeCversionChange() throws Exception {
164 Configuration conf = TEST_UTIL.getConfiguration();
165 ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
166 cleaner.setConf(conf);
167
168 ReplicationQueuesClient rqcMock = Mockito.mock(ReplicationQueuesClient.class);
169 Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
170
171 Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
172 rqc.setAccessible(true);
173
174 rqc.set(cleaner, rqcMock);
175
176
177 cleaner.getDeletableFiles(new LinkedList<FileStatus>());
178 }
179
180
181
182
183
184 @Test
185 public void testZooKeeperAbort() throws Exception {
186 Configuration conf = TEST_UTIL.getConfiguration();
187 ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
188
189 List<FileStatus> dummyFiles = Lists.newArrayList(
190 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
191 new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
192 );
193
194 FaultyZooKeeperWatcher faultyZK =
195 new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
196 try {
197 faultyZK.init();
198 cleaner.setConf(conf, faultyZK);
199
200 Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
201 assertFalse(toDelete.iterator().hasNext());
202 assertFalse(cleaner.isStopped());
203 } finally {
204 faultyZK.close();
205 }
206
207
208 cleaner = new ReplicationLogCleaner();
209 ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
210 try {
211 cleaner.setConf(conf, zkw);
212 Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
213 Iterator<FileStatus> iter = filesToDelete.iterator();
214 assertTrue(iter.hasNext());
215 assertEquals(new Path("log1"), iter.next().getPath());
216 assertTrue(iter.hasNext());
217 assertEquals(new Path("log2"), iter.next().getPath());
218 assertFalse(iter.hasNext());
219 } finally {
220 zkw.close();
221 }
222 }
223
224 static class DummyServer implements Server {
225
226 @Override
227 public Configuration getConfiguration() {
228 return TEST_UTIL.getConfiguration();
229 }
230
231 @Override
232 public ZooKeeperWatcher getZooKeeper() {
233 try {
234 return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
235 } catch (IOException e) {
236 e.printStackTrace();
237 }
238 return null;
239 }
240
241 @Override
242 public CoordinatedStateManager getCoordinatedStateManager() {
243 return null;
244 }
245
246 @Override
247 public ClusterConnection getConnection() {
248 return null;
249 }
250
251 @Override
252 public MetaTableLocator getMetaTableLocator() {
253 return null;
254 }
255
256 @Override
257 public ServerName getServerName() {
258 return ServerName.valueOf("regionserver,60020,000000");
259 }
260
261 @Override
262 public void abort(String why, Throwable e) {}
263
264 @Override
265 public boolean isAborted() {
266 return false;
267 }
268
269 @Override
270 public void stop(String why) {}
271
272 @Override
273 public boolean isStopped() {
274 return false;
275 }
276
277 @Override
278 public ChoreService getChoreService() {
279 return null;
280 }
281 }
282
283 static class FaultyZooKeeperWatcher extends ZooKeeperWatcher {
284 private RecoverableZooKeeper zk;
285
286 public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
287 throws ZooKeeperConnectionException, IOException {
288 super(conf, identifier, abortable);
289 }
290
291 public void init() throws Exception {
292 this.zk = spy(super.getRecoverableZooKeeper());
293 doThrow(new KeeperException.ConnectionLossException())
294 .when(zk).getData("/hbase/replication/rs", null, new Stat());
295 }
296
297 public RecoverableZooKeeper getRecoverableZooKeeper() {
298 return zk;
299 }
300 }
301 }