1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.coprocessor;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.security.PrivilegedExceptionAction;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.Coprocessor;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HColumnDescriptor;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.KeyValue;
48 import org.apache.hadoop.hbase.testclassification.MediumTests;
49 import org.apache.hadoop.hbase.TableName;
50 import org.apache.hadoop.hbase.client.Put;
51 import org.apache.hadoop.hbase.regionserver.HRegion;
52 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
53 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
54 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
55 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
56 import org.apache.hadoop.hbase.wal.WAL;
57 import org.apache.hadoop.hbase.wal.WALFactory;
58 import org.apache.hadoop.hbase.wal.WALKey;
59 import org.apache.hadoop.hbase.wal.WALSplitter;
60 import org.apache.hadoop.hbase.security.User;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.EnvironmentEdge;
63 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64 import org.apache.hadoop.hbase.util.FSUtils;
65 import org.junit.After;
66 import org.junit.AfterClass;
67 import org.junit.Before;
68 import org.junit.BeforeClass;
69 import org.junit.Rule;
70 import org.junit.Test;
71 import org.junit.rules.TestName;
72 import org.junit.experimental.categories.Category;
73
74
75
76
77
78
79 @Category(MediumTests.class)
80 public class TestWALObserver {
81 private static final Log LOG = LogFactory.getLog(TestWALObserver.class);
82 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
83
84 private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
85 private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
86 Bytes.toBytes("fam2"), Bytes.toBytes("fam3"), };
87 private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
88 Bytes.toBytes("q2"), Bytes.toBytes("q3"), };
89 private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
90 Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
91 private static byte[] TEST_ROW = Bytes.toBytes("testRow");
92
93 @Rule
94 public TestName currentTest = new TestName();
95
96 private Configuration conf;
97 private FileSystem fs;
98 private Path dir;
99 private Path hbaseRootDir;
100 private Path hbaseWALRootDir;
101 private String logName;
102 private Path oldLogDir;
103 private Path logDir;
104 private WALFactory wals;
105
106 @BeforeClass
107 public static void setupBeforeClass() throws Exception {
108 Configuration conf = TEST_UTIL.getConfiguration();
109 conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
110 SampleRegionWALObserver.class.getName(), SampleRegionWALObserver.Legacy.class.getName());
111 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
112 SampleRegionWALObserver.class.getName());
113 conf.setBoolean("dfs.support.append", true);
114 conf.setInt("dfs.client.block.recovery.retries", 2);
115
116 TEST_UTIL.startMiniCluster(1);
117 Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
118 .makeQualified(new Path("/hbase"));
119 Path hbaseWALRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
120 .makeQualified(new Path("/hbaseLogRoot"));
121 LOG.info("hbase.rootdir=" + hbaseRootDir);
122 FSUtils.setRootDir(conf, hbaseRootDir);
123 FSUtils.setWALRootDir(conf, hbaseWALRootDir);
124 }
125
126 @AfterClass
127 public static void teardownAfterClass() throws Exception {
128 TEST_UTIL.shutdownMiniCluster();
129 }
130
131 @Before
132 public void setUp() throws Exception {
133 this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
134
135 this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
136 this.hbaseRootDir = FSUtils.getRootDir(conf);
137 this.hbaseWALRootDir = FSUtils.getWALRootDir(conf);
138 this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
139 this.oldLogDir = new Path(this.hbaseWALRootDir,
140 HConstants.HREGION_OLDLOGDIR_NAME);
141 this.logDir = new Path(this.hbaseWALRootDir,
142 DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
143 this.logName = HConstants.HREGION_LOGDIR_NAME;
144
145 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
146 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
147 }
148 if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseWALRootDir)) {
149 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
150 }
151 this.wals = new WALFactory(conf, null, currentTest.getMethodName());
152 }
153
154 @After
155 public void tearDown() throws Exception {
156 try {
157 wals.shutdown();
158 } catch (IOException exception) {
159
160 LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage());
161 LOG.debug("details of failure to close wal factory.", exception);
162 }
163 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
164 TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseWALRootDir, true);
165 }
166
167
168
169
170
171
172 @Test
173 public void testWALObserverWriteToWAL() throws Exception {
174 final WAL log = wals.getWAL(UNSPECIFIED_REGION);
175 verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.class), false);
176 }
177
178
179
180
181
182
183 @Test
184 public void testLegacyWALObserverWriteToWAL() throws Exception {
185 final WAL log = wals.getWAL(UNSPECIFIED_REGION);
186 verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.Legacy.class), true);
187 }
188
189 private void verifyWritesSeen(final WAL log, final SampleRegionWALObserver cp,
190 final boolean seesLegacy) throws Exception {
191 HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
192 final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
193 .toString(TEST_TABLE));
194
195 Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
196 deleteDir(basedir);
197 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
198 final AtomicLong sequenceId = new AtomicLong(0);
199
200
201
202
203 cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
204 TEST_FAMILY[1], TEST_QUALIFIER[1], TEST_FAMILY[2], TEST_QUALIFIER[2]);
205
206 assertFalse(cp.isPreWALWriteCalled());
207 assertFalse(cp.isPostWALWriteCalled());
208 assertFalse(cp.isPreWALWriteDeprecatedCalled());
209 assertFalse(cp.isPostWALWriteDeprecatedCalled());
210
211
212
213
214 Put p = creatPutWith2Families(TEST_ROW);
215
216 Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
217 WALEdit edit = new WALEdit();
218 addFamilyMapToWALEdit(familyMap, edit);
219
220 boolean foundFamily0 = false;
221 boolean foundFamily2 = false;
222 boolean modifiedFamily1 = false;
223
224 List<Cell> cells = edit.getCells();
225
226 for (Cell cell : cells) {
227 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
228 foundFamily0 = true;
229 }
230 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
231 foundFamily2 = true;
232 }
233 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
234 if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
235 modifiedFamily1 = true;
236 }
237 }
238 }
239 assertTrue(foundFamily0);
240 assertFalse(foundFamily2);
241 assertFalse(modifiedFamily1);
242
243
244 long now = EnvironmentEdgeManager.currentTime();
245
246 long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
247 edit, sequenceId, true, null);
248 log.sync(txid);
249
250
251 foundFamily0 = false;
252 foundFamily2 = false;
253 modifiedFamily1 = false;
254 for (Cell cell : cells) {
255 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
256 foundFamily0 = true;
257 }
258 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
259 foundFamily2 = true;
260 }
261 if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
262 if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
263 modifiedFamily1 = true;
264 }
265 }
266 }
267 assertFalse(foundFamily0);
268 assertTrue(foundFamily2);
269 assertTrue(modifiedFamily1);
270
271 assertTrue(cp.isPreWALWriteCalled());
272 assertTrue(cp.isPostWALWriteCalled());
273 assertEquals(seesLegacy, cp.isPreWALWriteDeprecatedCalled());
274 assertEquals(seesLegacy, cp.isPostWALWriteDeprecatedCalled());
275 }
276
277 @Test
278 public void testNonLegacyWALKeysDoNotExplode() throws Exception {
279 TableName tableName = TableName.valueOf(TEST_TABLE);
280 final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
281 .toString(TEST_TABLE));
282 final HRegionInfo hri = new HRegionInfo(tableName, null, null);
283 final AtomicLong sequenceId = new AtomicLong(0);
284
285 fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
286
287 final Configuration newConf = HBaseConfiguration.create(this.conf);
288
289 final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
290 final SampleRegionWALObserver newApi = getCoprocessor(wal, SampleRegionWALObserver.class);
291 newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
292 final SampleRegionWALObserver oldApi = getCoprocessor(wal,
293 SampleRegionWALObserver.Legacy.class);
294 oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
295
296 LOG.debug("ensuring wal entries haven't happened before we start");
297 assertFalse(newApi.isPreWALWriteCalled());
298 assertFalse(newApi.isPostWALWriteCalled());
299 assertFalse(newApi.isPreWALWriteDeprecatedCalled());
300 assertFalse(newApi.isPostWALWriteDeprecatedCalled());
301 assertFalse(oldApi.isPreWALWriteCalled());
302 assertFalse(oldApi.isPostWALWriteCalled());
303 assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
304 assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
305
306 LOG.debug("writing to WAL with non-legacy keys.");
307 final int countPerFamily = 5;
308 for (HColumnDescriptor hcd : htd.getFamilies()) {
309 addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
310 EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
311 }
312
313 LOG.debug("Verify that only the non-legacy CP saw edits.");
314 assertTrue(newApi.isPreWALWriteCalled());
315 assertTrue(newApi.isPostWALWriteCalled());
316 assertFalse(newApi.isPreWALWriteDeprecatedCalled());
317 assertFalse(newApi.isPostWALWriteDeprecatedCalled());
318
319 assertFalse(oldApi.isPreWALWriteCalled());
320 assertFalse(oldApi.isPostWALWriteCalled());
321 assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
322 assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
323
324 LOG.debug("reseting cp state.");
325 newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
326 oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
327
328 LOG.debug("write a log edit that supports legacy cps.");
329 final long now = EnvironmentEdgeManager.currentTime();
330 final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
331 final WALEdit edit = new WALEdit();
332 final byte[] nonce = Bytes.toBytes("1772");
333 edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
334 final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
335 wal.sync(txid);
336
337 LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
338 assertTrue("non-legacy WALObserver didn't see pre-write.", newApi.isPreWALWriteCalled());
339 assertTrue("non-legacy WALObserver didn't see post-write.", newApi.isPostWALWriteCalled());
340 assertFalse("non-legacy WALObserver shouldn't have seen legacy pre-write.",
341 newApi.isPreWALWriteDeprecatedCalled());
342 assertFalse("non-legacy WALObserver shouldn't have seen legacy post-write.",
343 newApi.isPostWALWriteDeprecatedCalled());
344 assertTrue("legacy WALObserver didn't see pre-write.", oldApi.isPreWALWriteCalled());
345 assertTrue("legacy WALObserver didn't see post-write.", oldApi.isPostWALWriteCalled());
346 assertTrue("legacy WALObserver didn't see legacy pre-write.",
347 oldApi.isPreWALWriteDeprecatedCalled());
348 assertTrue("legacy WALObserver didn't see legacy post-write.",
349 oldApi.isPostWALWriteDeprecatedCalled());
350 }
351
352
353
354
355 @Test
356 public void testEmptyWALEditAreNotSeen() throws Exception {
357 final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
358 final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
359 final AtomicLong sequenceId = new AtomicLong(0);
360
361 WAL log = wals.getWAL(UNSPECIFIED_REGION);
362 try {
363 SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
364
365 cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
366
367 assertFalse(cp.isPreWALWriteCalled());
368 assertFalse(cp.isPostWALWriteCalled());
369
370 final long now = EnvironmentEdgeManager.currentTime();
371 long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
372 new WALEdit(), sequenceId, true, null);
373 log.sync(txid);
374
375 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
376 assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled());
377 } finally {
378 log.close();
379 }
380 }
381
382
383
384
385 @Test
386 public void testWALCoprocessorReplay() throws Exception {
387
388
389 TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
390 final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
391 final AtomicLong sequenceId = new AtomicLong(0);
392
393
394
395
396 final HRegionInfo hri = new HRegionInfo(tableName, null, null);
397
398 final Path basedir =
399 FSUtils.getTableDir(this.hbaseRootDir, tableName);
400 deleteDir(basedir);
401 fs.mkdirs(new Path(basedir, hri.getEncodedName()));
402
403 final Configuration newConf = HBaseConfiguration.create(this.conf);
404
405
406 WAL wal = wals.getWAL(UNSPECIFIED_REGION);
407
408 WALEdit edit = new WALEdit();
409 long now = EnvironmentEdgeManager.currentTime();
410
411 final int countPerFamily = 1000;
412
413 for (HColumnDescriptor hcd : htd.getFamilies()) {
414 addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
415 EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
416 }
417 wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
418 true, null);
419
420 wal.sync();
421
422 User user = HBaseTestingUtility.getDifferentUser(newConf,
423 ".replay.wal.secondtime");
424 user.runAs(new PrivilegedExceptionAction() {
425 public Object run() throws Exception {
426 Path p = runWALSplit(newConf);
427 LOG.info("WALSplit path == " + p);
428 FileSystem newFS = FileSystem.get(newConf);
429
430 final WALFactory wals2 = new WALFactory(conf, null, currentTest.getMethodName()+"2");
431 WAL wal2 = wals2.getWAL(UNSPECIFIED_REGION);;
432 HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir,
433 hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
434 long seqid2 = region.getOpenSeqNum();
435
436 SampleRegionWALObserver cp2 =
437 (SampleRegionWALObserver)region.getCoprocessorHost().findCoprocessor(
438 SampleRegionWALObserver.class.getName());
439
440 assertNotNull(cp2);
441 assertTrue(cp2.isPreWALRestoreCalled());
442 assertTrue(cp2.isPostWALRestoreCalled());
443 assertFalse(cp2.isPreWALRestoreDeprecatedCalled());
444 assertFalse(cp2.isPostWALRestoreDeprecatedCalled());
445 region.close();
446 wals2.close();
447 return null;
448 }
449 });
450 }
451
452
453
454
455
456
457 @Test
458 public void testWALObserverLoaded() throws Exception {
459 WAL log = wals.getWAL(UNSPECIFIED_REGION);
460 assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class));
461 }
462
463 private SampleRegionWALObserver getCoprocessor(WAL wal,
464 Class<? extends SampleRegionWALObserver> clazz) throws Exception {
465 WALCoprocessorHost host = wal.getCoprocessorHost();
466 Coprocessor c = host.findCoprocessor(clazz.getName());
467 return (SampleRegionWALObserver) c;
468 }
469
470
471
472
473
474
475
476 private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
477 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
478
479 for (int i = 0; i < TEST_FAMILY.length; i++) {
480 HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
481 htd.addFamily(a);
482 }
483 return new HRegionInfo(htd.getTableName(), null, null, false);
484 }
485
486
487
488
489 private void deleteDir(final Path p) throws IOException {
490 if (this.fs.exists(p)) {
491 if (!this.fs.delete(p, true)) {
492 throw new IOException("Failed remove of " + p);
493 }
494 }
495 }
496
497 private Put creatPutWith2Families(byte[] row) throws IOException {
498 Put p = new Put(row);
499 for (int i = 0; i < TEST_FAMILY.length - 1; i++) {
500 p.add(TEST_FAMILY[i], TEST_QUALIFIER[i], TEST_VALUE[i]);
501 }
502 return p;
503 }
504
505
506
507
508
509
510
511
512
513 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
514 WALEdit walEdit) {
515 for (List<Cell> edits : familyMap.values()) {
516 for (Cell cell : edits) {
517
518 walEdit.add(cell);
519 }
520 }
521 }
522
523 private Path runWALSplit(final Configuration c) throws IOException {
524 List<Path> splits = WALSplitter.split(
525 hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
526
527 assertEquals(1, splits.size());
528
529 assertTrue(fs.exists(splits.get(0)));
530 LOG.info("Split file=" + splits.get(0));
531 return splits.get(0);
532 }
533
534 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
535
536 private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
537 final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
538 final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
539 String familyStr = Bytes.toString(family);
540 long txid = -1;
541 for (int j = 0; j < count; j++) {
542 byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
543 byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
544 WALEdit edit = new WALEdit();
545 edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
546
547
548 txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
549 ee.currentTime()), edit, sequenceId, true, null);
550 }
551 if (-1 != txid) {
552 wal.sync(txid);
553 }
554 }
555
556 private HTableDescriptor getBasic3FamilyHTableDescriptor(
557 final TableName tableName) {
558 HTableDescriptor htd = new HTableDescriptor(tableName);
559
560 for (int i = 0; i < TEST_FAMILY.length; i++) {
561 HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
562 htd.addFamily(a);
563 }
564 return htd;
565 }
566
567 private HTableDescriptor createBasic3FamilyHTD(final String tableName) {
568 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
569 HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a"));
570 htd.addFamily(a);
571 HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b"));
572 htd.addFamily(b);
573 HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c"));
574 htd.addFamily(c);
575 return htd;
576 }
577 }