1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertTrue;
27 import static org.junit.Assert.fail;
28
29 import java.io.IOException;
30 import java.lang.reflect.Method;
31 import java.net.BindException;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FSDataInputStream;
39 import org.apache.hadoop.fs.FSDataOutputStream;
40 import org.apache.hadoop.fs.FileStatus;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.hbase.Cell;
44 import org.apache.hadoop.hbase.Coprocessor;
45 import org.apache.hadoop.hbase.HBaseTestingUtility;
46 import org.apache.hadoop.hbase.HColumnDescriptor;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.HTableDescriptor;
50 import org.apache.hadoop.hbase.KeyValue;
51 import org.apache.hadoop.hbase.testclassification.MediumTests;
52 import org.apache.hadoop.hbase.TableName;
53 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
54 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.apache.hadoop.hbase.util.FSUtils;
57 import org.apache.hadoop.hbase.util.Threads;
58 import org.apache.hadoop.hdfs.DistributedFileSystem;
59 import org.apache.hadoop.hdfs.MiniDFSCluster;
60 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
61 import org.junit.After;
62 import org.junit.AfterClass;
63 import org.junit.Before;
64 import org.junit.BeforeClass;
65 import org.junit.Rule;
66 import org.junit.Test;
67 import org.junit.experimental.categories.Category;
68 import org.junit.rules.TestName;
69
70
71 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
72 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
73 import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
74 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
75 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
76 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
77
78
79
80
81 @Category(MediumTests.class)
82 public class TestWALFactory {
83 protected static final Log LOG = LogFactory.getLog(TestWALFactory.class);
84
85 protected static Configuration conf;
86 private static MiniDFSCluster cluster;
87 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
88 protected static Path hbaseDir;
89 protected static Path hbaseWALDir;
90
91 protected FileSystem fs;
92 protected Path dir;
93 protected WALFactory wals;
94
95 @Rule
96 public final TestName currentTest = new TestName();
97
98 @Before
99 public void setUp() throws Exception {
100 fs = cluster.getFileSystem();
101 dir = new Path(hbaseDir, currentTest.getMethodName());
102 wals = new WALFactory(conf, null, currentTest.getMethodName());
103 }
104
105 @After
106 public void tearDown() throws Exception {
107
108 try {
109 wals.close();
110 } catch (IOException exception) {
111 LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" +
112 " may be the cause. Message: " + exception);
113 LOG.debug("Exception details for failure to close wal factory.", exception);
114 }
115 FileStatus[] entries = fs.listStatus(new Path("/"));
116 for (FileStatus dir : entries) {
117 fs.delete(dir.getPath(), true);
118 }
119 }
120
121 @BeforeClass
122 public static void setUpBeforeClass() throws Exception {
123
124 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
125
126 TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
127 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
128
129 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
130 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
131 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
132
133
134 TEST_UTIL.getConfiguration()
135 .setInt("hbase.ipc.client.connect.max.retries", 1);
136 TEST_UTIL.getConfiguration().setInt(
137 "dfs.client.block.recovery.retries", 1);
138 TEST_UTIL.getConfiguration().setInt(
139 "hbase.ipc.client.connection.maxidletime", 500);
140 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
141 SampleRegionWALObserver.class.getName());
142 TEST_UTIL.startMiniDFSCluster(3);
143
144 conf = TEST_UTIL.getConfiguration();
145 cluster = TEST_UTIL.getDFSCluster();
146
147 hbaseDir = TEST_UTIL.createRootDir();
148 hbaseWALDir = TEST_UTIL.createWALRootDir();
149 }
150
151 @AfterClass
152 public static void tearDownAfterClass() throws Exception {
153 TEST_UTIL.shutdownMiniCluster();
154 }
155
156 @Test
157 public void canCloseSingleton() throws IOException {
158 WALFactory.getInstance(conf).close();
159 }
160
161
162
163
164
165
166 @Test
167 public void testSplit() throws IOException {
168 final TableName tableName = TableName.valueOf(currentTest.getMethodName());
169 final byte [] rowName = tableName.getName();
170 final Path logdir = new Path(hbaseWALDir,
171 DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
172 Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME);
173 final int howmany = 3;
174 HRegionInfo[] infos = new HRegionInfo[3];
175 Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName);
176 fs.mkdirs(tabledir);
177 for(int i = 0; i < howmany; i++) {
178 infos[i] = new HRegionInfo(tableName,
179 Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
180 fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
181 LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
182 }
183 HTableDescriptor htd = new HTableDescriptor(tableName);
184 htd.addFamily(new HColumnDescriptor("column"));
185
186
187 final AtomicLong sequenceId = new AtomicLong(1);
188 for (int ii = 0; ii < howmany; ii++) {
189 for (int i = 0; i < howmany; i++) {
190 final WAL log = wals.getWAL(infos[i].getEncodedNameAsBytes());
191 for (int j = 0; j < howmany; j++) {
192 WALEdit edit = new WALEdit();
193 byte [] family = Bytes.toBytes("column");
194 byte [] qualifier = Bytes.toBytes(Integer.toString(j));
195 byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
196 edit.add(new KeyValue(rowName, family, qualifier,
197 System.currentTimeMillis(), column));
198 LOG.info("Region " + i + ": " + edit);
199 log.append(htd, infos[i], new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
200 System.currentTimeMillis()), edit, sequenceId, true, null);
201 }
202 log.sync();
203 log.rollWriter();
204 }
205 }
206 wals.shutdown();
207 List<Path> splits = WALSplitter.split(hbaseWALDir, logdir, oldLogDir, fs, conf, wals);
208 verifySplits(splits, howmany);
209 }
210
211
212
213
214
215 @Test
216 public void Broken_testSync() throws Exception {
217 TableName tableName = TableName.valueOf(currentTest.getMethodName());
218
219 Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
220 FSDataOutputStream out = fs.create(p);
221 out.write(tableName.getName());
222 Method syncMethod = null;
223 try {
224 syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
225 } catch (NoSuchMethodException e) {
226 try {
227 syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
228 } catch (NoSuchMethodException ex) {
229 fail("This version of Hadoop supports neither Syncable.sync() " +
230 "nor Syncable.hflush().");
231 }
232 }
233 syncMethod.invoke(out, new Object[]{});
234 FSDataInputStream in = fs.open(p);
235 assertTrue(in.available() > 0);
236 byte [] buffer = new byte [1024];
237 int read = in.read(buffer);
238 assertEquals(tableName.getName().length, read);
239 out.close();
240 in.close();
241
242 final AtomicLong sequenceId = new AtomicLong(1);
243 final int total = 20;
244 WAL.Reader reader = null;
245
246 try {
247 HRegionInfo info = new HRegionInfo(tableName,
248 null,null, false);
249 HTableDescriptor htd = new HTableDescriptor();
250 htd.addFamily(new HColumnDescriptor(tableName.getName()));
251 final WAL wal = wals.getWAL(info.getEncodedNameAsBytes());
252
253 for (int i = 0; i < total; i++) {
254 WALEdit kvs = new WALEdit();
255 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
256 wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
257 System.currentTimeMillis()), kvs, sequenceId, true, null);
258 }
259
260
261 wal.sync();
262
263 Path walPath = DefaultWALProvider.getCurrentFileName(wal);
264 reader = wals.createReader(fs, walPath);
265 int count = 0;
266 WAL.Entry entry = new WAL.Entry();
267 while ((entry = reader.next(entry)) != null) count++;
268 assertEquals(total, count);
269 reader.close();
270
271
272 for (int i = 0; i < total; i++) {
273 WALEdit kvs = new WALEdit();
274 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
275 wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
276 System.currentTimeMillis()), kvs, sequenceId, true, null);
277 }
278 wal.sync();
279 reader = wals.createReader(fs, walPath);
280 count = 0;
281 while((entry = reader.next(entry)) != null) count++;
282 assertTrue(count >= total);
283 reader.close();
284
285 wal.sync();
286 reader = wals.createReader(fs, walPath);
287 count = 0;
288 while((entry = reader.next(entry)) != null) count++;
289 assertEquals(total * 2, count);
290 reader.close();
291
292
293 final byte [] value = new byte[1025 * 1024];
294 for (int i = 0; i < total; i++) {
295 WALEdit kvs = new WALEdit();
296 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
297 wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
298 System.currentTimeMillis()), kvs, sequenceId, true, null);
299 }
300
301 wal.sync();
302 reader = wals.createReader(fs, walPath);
303 count = 0;
304 while((entry = reader.next(entry)) != null) count++;
305 assertEquals(total * 3, count);
306 reader.close();
307
308 wal.shutdown();
309 reader = wals.createReader(fs, walPath);
310 count = 0;
311 while((entry = reader.next(entry)) != null) count++;
312 assertEquals(total * 3, count);
313 reader.close();
314 } finally {
315 if (reader != null) reader.close();
316 }
317 }
318
319 private void verifySplits(final List<Path> splits, final int howmany)
320 throws IOException {
321 assertEquals(howmany * howmany, splits.size());
322 for (int i = 0; i < splits.size(); i++) {
323 LOG.info("Verifying=" + splits.get(i));
324 WAL.Reader reader = wals.createReader(fs, splits.get(i));
325 try {
326 int count = 0;
327 String previousRegion = null;
328 long seqno = -1;
329 WAL.Entry entry = new WAL.Entry();
330 while((entry = reader.next(entry)) != null) {
331 WALKey key = entry.getKey();
332 String region = Bytes.toString(key.getEncodedRegionName());
333
334 if (previousRegion != null) {
335 assertEquals(previousRegion, region);
336 }
337 LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
338 assertTrue(seqno < key.getLogSeqNum());
339 seqno = key.getLogSeqNum();
340 previousRegion = region;
341 count++;
342 }
343 assertEquals(howmany, count);
344 } finally {
345 reader.close();
346 }
347 }
348 }
349
350
351
352
353
354
355
356
357
358
359 @Test (timeout=300000)
360 public void testAppendClose() throws Exception {
361 TableName tableName =
362 TableName.valueOf(currentTest.getMethodName());
363 HRegionInfo regioninfo = new HRegionInfo(tableName,
364 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
365
366 final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
367 final AtomicLong sequenceId = new AtomicLong(1);
368 final int total = 20;
369
370 HTableDescriptor htd = new HTableDescriptor();
371 htd.addFamily(new HColumnDescriptor(tableName.getName()));
372
373 for (int i = 0; i < total; i++) {
374 WALEdit kvs = new WALEdit();
375 kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
376 wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
377 System.currentTimeMillis()), kvs, sequenceId, true, null);
378 }
379
380 wal.sync();
381 int namenodePort = cluster.getNameNodePort();
382 final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
383
384
385
386 try {
387 DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
388 dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
389 TEST_UTIL.shutdownMiniDFSCluster();
390 try {
391
392
393 wal.shutdown();
394 } catch (IOException e) {
395 LOG.info(e);
396 }
397 fs.close();
398 LOG.info("STOPPED first instance of the cluster");
399 } finally {
400
401 while (cluster.isClusterUp()){
402 LOG.error("Waiting for cluster to go down");
403 Thread.sleep(1000);
404 }
405 assertFalse(cluster.isClusterUp());
406 cluster = null;
407 for (int i = 0; i < 100; i++) {
408 try {
409 cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
410 break;
411 } catch (BindException e) {
412 LOG.info("Sleeping. BindException bringing up new cluster");
413 Threads.sleep(1000);
414 }
415 }
416 cluster.waitActive();
417 fs = cluster.getFileSystem();
418 LOG.info("STARTED second instance.");
419 }
420
421
422
423 Method setLeasePeriod = cluster.getClass()
424 .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
425 setLeasePeriod.setAccessible(true);
426 setLeasePeriod.invoke(cluster, 1000L, 1000L);
427 try {
428 Thread.sleep(1000);
429 } catch (InterruptedException e) {
430 LOG.info(e);
431 }
432
433
434 final FileSystem recoveredFs = fs;
435 final Configuration rlConf = conf;
436
437 class RecoverLogThread extends Thread {
438 public Exception exception = null;
439 public void run() {
440 try {
441 FSUtils.getInstance(fs, rlConf)
442 .recoverFileLease(recoveredFs, walPath, rlConf, null);
443 } catch (IOException e) {
444 exception = e;
445 }
446 }
447 }
448
449 RecoverLogThread t = new RecoverLogThread();
450 t.start();
451
452 t.join(60 * 1000);
453 if(t.isAlive()) {
454 t.interrupt();
455 throw new Exception("Timed out waiting for WAL.recoverLog()");
456 }
457
458 if (t.exception != null)
459 throw t.exception;
460
461
462 WAL.Reader reader = wals.createReader(fs, walPath);
463 int count = 0;
464 WAL.Entry entry = new WAL.Entry();
465 while (reader.next(entry) != null) {
466 count++;
467 assertTrue("Should be one KeyValue per WALEdit",
468 entry.getEdit().getCells().size() == 1);
469 }
470 assertEquals(total, count);
471 reader.close();
472
473
474 setLeasePeriod.invoke(cluster, new Object[]{new Long(60000), new Long(3600000)});
475 }
476
477
478
479
480
481 @Test
482 public void testEditAdd() throws IOException {
483 final int COL_COUNT = 10;
484 final HTableDescriptor htd =
485 new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
486 "column"));
487 final byte [] row = Bytes.toBytes("row");
488 WAL.Reader reader = null;
489 try {
490 final AtomicLong sequenceId = new AtomicLong(1);
491
492
493
494 long timestamp = System.currentTimeMillis();
495 WALEdit cols = new WALEdit();
496 for (int i = 0; i < COL_COUNT; i++) {
497 cols.add(new KeyValue(row, Bytes.toBytes("column"),
498 Bytes.toBytes(Integer.toString(i)),
499 timestamp, new byte[] { (byte)(i + '0') }));
500 }
501 HRegionInfo info = new HRegionInfo(htd.getTableName(),
502 row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
503 final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
504
505 final long txid = log.append(htd, info,
506 new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
507 cols, sequenceId, true, null);
508 log.sync(txid);
509 log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
510 log.completeCacheFlush(info.getEncodedNameAsBytes());
511 log.shutdown();
512 Path filename = DefaultWALProvider.getCurrentFileName(log);
513
514 reader = wals.createReader(fs, filename);
515
516
517 for (int i = 0; i < 1; i++) {
518 WAL.Entry entry = reader.next(null);
519 if (entry == null) break;
520 WALKey key = entry.getKey();
521 WALEdit val = entry.getEdit();
522 assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
523 assertTrue(htd.getTableName().equals(key.getTablename()));
524 Cell cell = val.getCells().get(0);
525 assertTrue(Bytes.equals(row, cell.getRow()));
526 assertEquals((byte)(i + '0'), cell.getValue()[0]);
527 System.out.println(key + " " + val);
528 }
529 } finally {
530 if (reader != null) {
531 reader.close();
532 }
533 }
534 }
535
536
537
538
539 @Test
540 public void testAppend() throws IOException {
541 final int COL_COUNT = 10;
542 final HTableDescriptor htd =
543 new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
544 "column"));
545 final byte [] row = Bytes.toBytes("row");
546 WAL.Reader reader = null;
547 final AtomicLong sequenceId = new AtomicLong(1);
548 try {
549
550
551 long timestamp = System.currentTimeMillis();
552 WALEdit cols = new WALEdit();
553 for (int i = 0; i < COL_COUNT; i++) {
554 cols.add(new KeyValue(row, Bytes.toBytes("column"),
555 Bytes.toBytes(Integer.toString(i)),
556 timestamp, new byte[] { (byte)(i + '0') }));
557 }
558 HRegionInfo hri = new HRegionInfo(htd.getTableName(),
559 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
560 final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
561 final long txid = log.append(htd, hri,
562 new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
563 cols, sequenceId, true, null);
564 log.sync(txid);
565 log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
566 log.completeCacheFlush(hri.getEncodedNameAsBytes());
567 log.shutdown();
568 Path filename = DefaultWALProvider.getCurrentFileName(log);
569
570 reader = wals.createReader(fs, filename);
571 WAL.Entry entry = reader.next();
572 assertEquals(COL_COUNT, entry.getEdit().size());
573 int idx = 0;
574 for (Cell val : entry.getEdit().getCells()) {
575 assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
576 entry.getKey().getEncodedRegionName()));
577 assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
578 assertTrue(Bytes.equals(row, val.getRow()));
579 assertEquals((byte)(idx + '0'), val.getValue()[0]);
580 System.out.println(entry.getKey() + " " + val);
581 idx++;
582 }
583 } finally {
584 if (reader != null) {
585 reader.close();
586 }
587 }
588 }
589
590
591
592
593
594 @Test
595 public void testVisitors() throws Exception {
596 final int COL_COUNT = 10;
597 final TableName tableName =
598 TableName.valueOf("tablename");
599 final byte [] row = Bytes.toBytes("row");
600 final DumbWALActionsListener visitor = new DumbWALActionsListener();
601 final AtomicLong sequenceId = new AtomicLong(1);
602 long timestamp = System.currentTimeMillis();
603 HTableDescriptor htd = new HTableDescriptor();
604 htd.addFamily(new HColumnDescriptor("column"));
605
606 HRegionInfo hri = new HRegionInfo(tableName,
607 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
608 final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
609 log.registerWALActionsListener(visitor);
610 for (int i = 0; i < COL_COUNT; i++) {
611 WALEdit cols = new WALEdit();
612 cols.add(new KeyValue(row, Bytes.toBytes("column"),
613 Bytes.toBytes(Integer.toString(i)),
614 timestamp, new byte[]{(byte) (i + '0')}));
615 log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
616 System.currentTimeMillis()), cols, sequenceId, true, null);
617 }
618 log.sync();
619 assertEquals(COL_COUNT, visitor.increments);
620 log.unregisterWALActionsListener(visitor);
621 WALEdit cols = new WALEdit();
622 cols.add(new KeyValue(row, Bytes.toBytes("column"),
623 Bytes.toBytes(Integer.toString(11)),
624 timestamp, new byte[]{(byte) (11 + '0')}));
625 log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
626 System.currentTimeMillis()), cols, sequenceId, true, null);
627 log.sync();
628 assertEquals(COL_COUNT, visitor.increments);
629 }
630
631
632
633
634 @Test
635 public void testWALCoprocessorLoaded() throws Exception {
636
637 WALCoprocessorHost host = wals.getWAL(UNSPECIFIED_REGION).getCoprocessorHost();
638 Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
639 assertNotNull(c);
640 }
641
642
643
644
645 @Test
646 public void testReadLegacyLog() throws IOException {
647 final int columnCount = 5;
648 final int recordCount = 5;
649 final TableName tableName =
650 TableName.valueOf("tablename");
651 final byte[] row = Bytes.toBytes("row");
652 long timestamp = System.currentTimeMillis();
653 Path path = new Path(dir, "tempwal");
654 SequenceFileLogWriter sflw = null;
655 WAL.Reader reader = null;
656 try {
657 HRegionInfo hri = new HRegionInfo(tableName,
658 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
659 HTableDescriptor htd = new HTableDescriptor(tableName);
660 fs.mkdirs(dir);
661
662 sflw = new SequenceFileLogWriter();
663 sflw.init(fs, path, conf, false);
664 for (int i = 0; i < recordCount; ++i) {
665 WALKey key = new HLogKey(
666 hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
667 WALEdit edit = new WALEdit();
668 for (int j = 0; j < columnCount; ++j) {
669 if (i == 0) {
670 htd.addFamily(new HColumnDescriptor("column" + j));
671 }
672 String value = i + "" + j;
673 edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
674 }
675 sflw.append(new WAL.Entry(key, edit));
676 }
677 sflw.sync();
678 sflw.close();
679
680
681 reader = wals.createReader(fs, path);
682 assertTrue(reader instanceof SequenceFileLogReader);
683 for (int i = 0; i < recordCount; ++i) {
684 WAL.Entry entry = reader.next();
685 assertNotNull(entry);
686 assertEquals(columnCount, entry.getEdit().size());
687 assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
688 assertEquals(tableName, entry.getKey().getTablename());
689 int idx = 0;
690 for (Cell val : entry.getEdit().getCells()) {
691 assertTrue(Bytes.equals(row, val.getRow()));
692 String value = i + "" + idx;
693 assertArrayEquals(Bytes.toBytes(value), val.getValue());
694 idx++;
695 }
696 }
697 WAL.Entry entry = reader.next();
698 assertNull(entry);
699 } finally {
700 if (sflw != null) {
701 sflw.close();
702 }
703 if (reader != null) {
704 reader.close();
705 }
706 }
707 }
708
709 static class DumbWALActionsListener extends WALActionsListener.Base {
710 int increments = 0;
711
712 @Override
713 public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
714 WALEdit logEdit) {
715 increments++;
716 }
717
718 @Override
719 public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
720
721 increments++;
722 }
723 }
724
725 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
726
727 }