1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.backup.example;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.ChoreService;
36 import org.apache.hadoop.hbase.HBaseTestingUtility;
37 import org.apache.hadoop.hbase.HColumnDescriptor;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.testclassification.MediumTests;
40 import org.apache.hadoop.hbase.Stoppable;
41 import org.apache.hadoop.hbase.client.ClusterConnection;
42 import org.apache.hadoop.hbase.client.ConnectionFactory;
43 import org.apache.hadoop.hbase.client.Put;
44 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
45 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
46 import org.apache.hadoop.hbase.regionserver.Region;
47 import org.apache.hadoop.hbase.regionserver.Store;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.FSUtils;
50 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
51 import org.apache.hadoop.hbase.util.StoppableImplementation;
52 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
53 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
54 import org.apache.zookeeper.KeeperException;
55 import org.junit.After;
56 import org.junit.AfterClass;
57 import org.junit.BeforeClass;
58 import org.junit.Test;
59 import org.junit.experimental.categories.Category;
60 import org.mockito.Mockito;
61 import org.mockito.invocation.InvocationOnMock;
62 import org.mockito.stubbing.Answer;
63
64
65
66
67
68 @Category(MediumTests.class)
69 public class TestZooKeeperTableArchiveClient {
70
71 private static final Log LOG = LogFactory.getLog(TestZooKeeperTableArchiveClient.class);
72 private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
73 private static final String STRING_TABLE_NAME = "test";
74 private static final byte[] TEST_FAM = Bytes.toBytes("fam");
75 private static final byte[] TABLE_NAME = Bytes.toBytes(STRING_TABLE_NAME);
76 private static ZKTableArchiveClient archivingClient;
77 private final List<Path> toCleanup = new ArrayList<Path>();
78 private static ClusterConnection CONNECTION;
79
80
81
82
83 @BeforeClass
84 public static void setupCluster() throws Exception {
85 setupConf(UTIL.getConfiguration());
86 UTIL.startMiniZKCluster();
87 CONNECTION = (ClusterConnection)ConnectionFactory.createConnection(UTIL.getConfiguration());
88 archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
89
90 ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
91 String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
92 ZKUtil.createWithParents(watcher, archivingZNode);
93 }
94
95 private static void setupConf(Configuration conf) {
96
97 conf.setInt("hbase.hstore.compaction.min", 3);
98 }
99
100 @After
101 public void tearDown() throws Exception {
102 try {
103 FileSystem fs = UTIL.getTestFileSystem();
104
105 for (Path file : toCleanup) {
106
107 FSUtils.delete(fs, file, true);
108 }
109 } catch (IOException e) {
110 LOG.warn("Failure to delete archive directory", e);
111 } finally {
112 toCleanup.clear();
113 }
114
115 archivingClient.disableHFileBackup();
116 }
117
118 @AfterClass
119 public static void cleanupTest() throws Exception {
120 try {
121 CONNECTION.close();
122 UTIL.shutdownMiniZKCluster();
123 } catch (Exception e) {
124 LOG.warn("problem shutting down cluster", e);
125 }
126 }
127
128
129
130
131 @Test (timeout=300000)
132 public void testArchivingEnableDisable() throws Exception {
133
134 LOG.debug("----Starting archiving");
135 archivingClient.enableHFileBackupAsync(TABLE_NAME);
136 assertTrue("Archving didn't get turned on", archivingClient
137 .getArchivingEnabled(TABLE_NAME));
138
139
140 archivingClient.disableHFileBackup();
141 assertFalse("Archving didn't get turned off.", archivingClient.getArchivingEnabled(TABLE_NAME));
142
143
144 archivingClient.enableHFileBackupAsync(TABLE_NAME);
145 assertTrue("Archving didn't get turned on", archivingClient
146 .getArchivingEnabled(TABLE_NAME));
147
148
149 archivingClient.disableHFileBackup(TABLE_NAME);
150 assertFalse("Archving didn't get turned off for " + STRING_TABLE_NAME,
151 archivingClient.getArchivingEnabled(TABLE_NAME));
152 }
153
154 @Test (timeout=300000)
155 public void testArchivingOnSingleTable() throws Exception {
156 createArchiveDirectory();
157 FileSystem fs = UTIL.getTestFileSystem();
158 Path archiveDir = getArchiveDir();
159 Path tableDir = getTableDir(STRING_TABLE_NAME);
160 toCleanup.add(archiveDir);
161 toCleanup.add(tableDir);
162
163 Configuration conf = UTIL.getConfiguration();
164
165 Stoppable stop = new StoppableImplementation();
166 HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
167 List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
168 final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
169
170
171 HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
172 Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
173
174 loadFlushAndCompact(region, TEST_FAM);
175
176
177 List<Path> files = getAllFiles(fs, archiveDir);
178 if (files == null) {
179 FSUtils.logFileSystemState(fs, UTIL.getDataTestDir(), LOG);
180 throw new RuntimeException("Didn't archive any files!");
181 }
182 CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size());
183
184 runCleaner(cleaner, finished, stop);
185
186
187 List<Path> archivedFiles = getAllFiles(fs, archiveDir);
188 assertEquals("Archived files changed after running archive cleaner.", files, archivedFiles);
189
190
191 assertTrue(fs.exists(HFileArchiveUtil.getArchivePath(UTIL.getConfiguration())));
192 }
193
194
195
196
197
198 @Test (timeout=300000)
199 public void testMultipleTables() throws Exception {
200 createArchiveDirectory();
201 String otherTable = "otherTable";
202
203 FileSystem fs = UTIL.getTestFileSystem();
204 Path archiveDir = getArchiveDir();
205 Path tableDir = getTableDir(STRING_TABLE_NAME);
206 Path otherTableDir = getTableDir(otherTable);
207
208
209 toCleanup.add(archiveDir);
210 toCleanup.add(tableDir);
211 toCleanup.add(otherTableDir);
212 Configuration conf = UTIL.getConfiguration();
213
214 Stoppable stop = new StoppableImplementation();
215 final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
216 HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
217 List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
218 final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
219
220
221 HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
222 Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
223 loadFlushAndCompact(region, TEST_FAM);
224
225
226 hcd = new HColumnDescriptor(TEST_FAM);
227 Region otherRegion = UTIL.createTestRegion(otherTable, hcd);
228 loadFlushAndCompact(otherRegion, TEST_FAM);
229
230
231 List<Path> files = getAllFiles(fs, archiveDir);
232 if (files == null) {
233 FSUtils.logFileSystemState(fs, archiveDir, LOG);
234 throw new RuntimeException("Didn't load archive any files!");
235 }
236
237
238 int initialCountForPrimary = 0;
239 int initialCountForOtherTable = 0;
240 for (Path file : files) {
241 String tableName = file.getParent().getParent().getParent().getName();
242
243 if (tableName.equals(otherTable)) initialCountForOtherTable++;
244 else if (tableName.equals(STRING_TABLE_NAME)) initialCountForPrimary++;
245 }
246
247 assertTrue("Didn't archive files for:" + STRING_TABLE_NAME, initialCountForPrimary > 0);
248 assertTrue("Didn't archive files for:" + otherTable, initialCountForOtherTable > 0);
249
250
251
252 CountDownLatch finished = setupCleanerWatching(delegate, cleaners, files.size() + 3);
253
254 choreService.scheduleChore(cleaner);
255
256 finished.await();
257
258 stop.stop("");
259
260
261 List<Path> archivedFiles = getAllFiles(fs, archiveDir);
262 int archivedForPrimary = 0;
263 for(Path file: archivedFiles) {
264 String tableName = file.getParent().getParent().getParent().getName();
265
266 assertFalse("Have a file from the non-archived table: " + file, tableName.equals(otherTable));
267 if (tableName.equals(STRING_TABLE_NAME)) archivedForPrimary++;
268 }
269
270 assertEquals("Not all archived files for the primary table were retained.", initialCountForPrimary,
271 archivedForPrimary);
272
273
274 assertTrue("Archive directory was deleted via archiver", fs.exists(archiveDir));
275 }
276
277
278 private void createArchiveDirectory() throws IOException {
279
280 FileSystem fs = UTIL.getTestFileSystem();
281 Path archiveDir = getArchiveDir();
282 fs.mkdirs(archiveDir);
283 }
284
285 private Path getArchiveDir() throws IOException {
286 return new Path(UTIL.getDataTestDir(), HConstants.HFILE_ARCHIVE_DIRECTORY);
287 }
288
289 private Path getTableDir(String tableName) throws IOException {
290 Path testDataDir = UTIL.getDataTestDir();
291 FSUtils.setRootDir(UTIL.getConfiguration(), testDataDir);
292 return new Path(testDataDir, tableName);
293 }
294
295 private HFileCleaner setupAndCreateCleaner(Configuration conf, FileSystem fs, Path archiveDir,
296 Stoppable stop) {
297 conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
298 LongTermArchivingHFileCleaner.class.getCanonicalName());
299 return new HFileCleaner(1000, stop, conf, fs, archiveDir);
300 }
301
302
303
304
305
306
307
308
309
310 private List<BaseHFileCleanerDelegate> turnOnArchiving(String tableName, HFileCleaner cleaner)
311 throws IOException, KeeperException {
312
313 LOG.debug("----Starting archiving for table:" + tableName);
314 archivingClient.enableHFileBackupAsync(Bytes.toBytes(tableName));
315 assertTrue("Archving didn't get turned on", archivingClient.getArchivingEnabled(tableName));
316
317
318 List<BaseHFileCleanerDelegate> cleaners = cleaner.getDelegatesForTesting();
319 LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
320 while (!delegate.archiveTracker.keepHFiles(STRING_TABLE_NAME)) {
321
322 }
323 return cleaners;
324 }
325
326
327
328
329
330
331
332 private CountDownLatch setupCleanerWatching(LongTermArchivingHFileCleaner cleaner,
333 List<BaseHFileCleanerDelegate> cleaners, final int expected) {
334
335 BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
336 final int[] counter = new int[] { 0 };
337 final CountDownLatch finished = new CountDownLatch(1);
338 Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {
339
340 @Override
341 public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
342 counter[0]++;
343 LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
344 + invocation.getArguments()[0]);
345
346 @SuppressWarnings("unchecked")
347 Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
348 if (counter[0] >= expected) finished.countDown();
349 return ret;
350 }
351 }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
352 cleaners.set(0, delegateSpy);
353
354 return finished;
355 }
356
357
358
359
360
361
362 private List<Path> getAllFiles(FileSystem fs, Path dir) throws IOException {
363 FileStatus[] files = FSUtils.listStatus(fs, dir, null);
364 if (files == null) {
365 LOG.warn("No files under:" + dir);
366 return null;
367 }
368
369 List<Path> allFiles = new ArrayList<Path>();
370 for (FileStatus file : files) {
371 if (file.isDirectory()) {
372 List<Path> subFiles = getAllFiles(fs, file.getPath());
373 if (subFiles != null) allFiles.addAll(subFiles);
374 continue;
375 }
376 allFiles.add(file.getPath());
377 }
378 return allFiles;
379 }
380
381 private void loadFlushAndCompact(Region region, byte[] family) throws IOException {
382
383 createHFileInRegion(region, family);
384 createHFileInRegion(region, family);
385
386 Store s = region.getStore(family);
387 int count = s.getStorefilesCount();
388 assertTrue("Don't have the expected store files, wanted >= 2 store files, but was:" + count,
389 count >= 2);
390
391
392 LOG.debug("Compacting stores");
393 region.compact(true);
394 }
395
396
397
398
399
400
401
402 private void createHFileInRegion(Region region, byte[] columnFamily) throws IOException {
403
404 Put p = new Put(Bytes.toBytes("row"));
405 p.add(columnFamily, Bytes.toBytes("Qual"), Bytes.toBytes("v1"));
406 region.put(p);
407
408 region.flush(true);
409 }
410
411
412
413
414 private void runCleaner(HFileCleaner cleaner, CountDownLatch finished, Stoppable stop)
415 throws InterruptedException {
416 final ChoreService choreService = new ChoreService("CLEANER_SERVER_NAME");
417
418 choreService.scheduleChore(cleaner);
419
420 finished.await();
421
422 stop.stop("");
423 }
424 }