/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/**
 * WAL tests that can be reused across providers.
 */
@Category(MediumTests.class)
public class TestWALFactory {
  protected static final Log LOG = LogFactory.getLog(TestWALFactory.class);

  protected static Configuration conf;
  private static MiniDFSCluster cluster;
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  protected static Path hbaseDir;
  protected static Path hbaseWALDir;

  protected FileSystem fs;
  protected Path dir;
  protected WALFactory wals;

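  // Provides the current test method's name; used below to give each test its
  // own WAL directory and WALFactory id so tests don't trample each other.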
  @Rule
  public final TestName currentTest = new TestName();

  @Before
  public void setUp() throws Exception {
    fs = cluster.getFileSystem();
    dir = new Path(hbaseDir, currentTest.getMethodName());
    wals = new WALFactory(conf, null, currentTest.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
    try {
      wals.close();
    } catch (IOException exception) {
      LOG.warn("Encountered exception while closing WAL factory. If you have other errors, this" +
          " may be the cause. Message: " + exception);
      LOG.debug("Exception details for failure to close WAL factory.", exception);
    }
    FileStatus[] entries = fs.listStatus(new Path("/"));
    for (FileStatus dir : entries) {
      fs.delete(dir.getPath(), true);
    }
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
    // needed for testAppendClose()
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    // quicker heartbeat interval for faster DN death notification
    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);

    // faster failover with the cluster.shutdown(); fs.close() idiom
    TEST_UTIL.getConfiguration()
        .setInt("hbase.ipc.client.connect.max.retries", 1);
    TEST_UTIL.getConfiguration().setInt(
        "dfs.client.block.recovery.retries", 1);
    TEST_UTIL.getConfiguration().setInt(
        "hbase.ipc.client.connection.maxidletime", 500);
    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
        SampleRegionWALObserver.class.getName());
    TEST_UTIL.startMiniDFSCluster(3);

    conf = TEST_UTIL.getConfiguration();
    cluster = TEST_UTIL.getDFSCluster();

    hbaseDir = TEST_UTIL.createRootDir();
    hbaseWALDir = TEST_UTIL.createWALRootDir();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

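  /** The shared WALFactory singleton should close cleanly. */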
  @Test
  public void canCloseSingleton() throws IOException {
    WALFactory.getInstance(conf).close();
  }

  /**
   * Just write multiple logs then split.  Before the fix for HADOOP-2283, this
   * would fail.
   * @throws IOException
   */
  @Test
  public void testSplit() throws IOException {
    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
    final byte [] rowName = tableName.getName();
    final Path logdir = new Path(hbaseWALDir,
        DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
    Path oldLogDir = new Path(hbaseWALDir, HConstants.HREGION_OLDLOGDIR_NAME);
    final int howmany = 3;
    HRegionInfo[] infos = new HRegionInfo[howmany];
    Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName);
    fs.mkdirs(tabledir);
    for (int i = 0; i < howmany; i++) {
      infos[i] = new HRegionInfo(tableName,
          Bytes.toBytes("" + i), Bytes.toBytes("" + (i + 1)), false);
      fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
      LOG.info("Created region dir " + new Path(tabledir, infos[i].getEncodedName()).toString());
    }
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("column"));

    // Add edits for three regions.
    final AtomicLong sequenceId = new AtomicLong(1);
    for (int ii = 0; ii < howmany; ii++) {
      for (int i = 0; i < howmany; i++) {
        final WAL log = wals.getWAL(infos[i].getEncodedNameAsBytes());
        for (int j = 0; j < howmany; j++) {
          WALEdit edit = new WALEdit();
          byte [] family = Bytes.toBytes("column");
          byte [] qualifier = Bytes.toBytes(Integer.toString(j));
          byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
          edit.add(new KeyValue(rowName, family, qualifier,
              System.currentTimeMillis(), column));
          LOG.info("Region " + i + ": " + edit);
          log.append(htd, infos[i], new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
              System.currentTimeMillis()), edit, sequenceId, true, null);
        }
        log.sync();
        log.rollWriter();
      }
    }
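    // Close the factory so all logs are completed before splitting them.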
    wals.shutdown();
    List<Path> splits = WALSplitter.split(hbaseWALDir, logdir, oldLogDir, fs, conf, wals);
    verifySplits(splits, howmany);
  }

  /**
   * Test the new HDFS-265 sync.
   * @throws Exception
   */
  @Test
  public void Broken_testSync() throws Exception {
    TableName tableName = TableName.valueOf(currentTest.getMethodName());
    // First verify that using streams all works.
    Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
    FSDataOutputStream out = fs.create(p);
    out.write(tableName.getName());
    Method syncMethod = null;
    try {
      syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
    } catch (NoSuchMethodException e) {
      try {
        syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
      } catch (NoSuchMethodException ex) {
        fail("This version of Hadoop supports neither Syncable.sync() " +
            "nor Syncable.hflush().");
      }
    }
    syncMethod.invoke(out, new Object[]{});
    FSDataInputStream in = fs.open(p);
    assertTrue(in.available() > 0);
    byte [] buffer = new byte [1024];
    int read = in.read(buffer);
    assertEquals(tableName.getName().length, read);
    out.close();
    in.close();

    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;
    WAL.Reader reader = null;

    try {
      HRegionInfo info = new HRegionInfo(tableName,
          null, null, false);
      HTableDescriptor htd = new HTableDescriptor();
      htd.addFamily(new HColumnDescriptor(tableName.getName()));
      final WAL wal = wals.getWAL(info.getEncodedNameAsBytes());

      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
            System.currentTimeMillis()), kvs, sequenceId, true, null);
      }
      // Now call sync and try reading.  Opening a Reader before you sync just
      // gives you an EOFException.
      wal.sync();
      // Open a Reader.
      Path walPath = DefaultWALProvider.getCurrentFileName(wal);
      reader = wals.createReader(fs, walPath);
      int count = 0;
      WAL.Entry entry = new WAL.Entry();
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total, count);
      reader.close();
      // Check that opening a Reader also works on a file that has
      // already had a sync done on it.
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
        wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
            System.currentTimeMillis()), kvs, sequenceId, true, null);
      }
      wal.sync();
      reader = wals.createReader(fs, walPath);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertTrue(count >= total);
      reader.close();
      // If I sync, should see double the edits.
      wal.sync();
      reader = wals.createReader(fs, walPath);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 2, count);
      reader.close();
      // Now ensure reads work when we cross a block boundary, in particular
      // that the file reports a good length.
      final byte [] value = new byte[1025 * 1024];  // A value just over 1MB.
      for (int i = 0; i < total; i++) {
        WALEdit kvs = new WALEdit();
        kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
        wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
            System.currentTimeMillis()), kvs, sequenceId, true, null);
      }
      // Now I should have written out lots of blocks.  Sync then read.
      wal.sync();
      reader = wals.createReader(fs, walPath);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();
      // Shut down and ensure that the Reader sees the right length as well.
      wal.shutdown();
      reader = wals.createReader(fs, walPath);
      count = 0;
      while ((entry = reader.next(entry)) != null) count++;
      assertEquals(total * 3, count);
      reader.close();
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

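  /**
   * Asserts that the split produced one file per region per roll, that every
   * entry in a split file belongs to the same region, and that sequence ids
   * within a file are strictly increasing.
   */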
  private void verifySplits(final List<Path> splits, final int howmany)
  throws IOException {
    assertEquals(howmany * howmany, splits.size());
    for (int i = 0; i < splits.size(); i++) {
      LOG.info("Verifying=" + splits.get(i));
      WAL.Reader reader = wals.createReader(fs, splits.get(i));
      try {
        int count = 0;
        String previousRegion = null;
        long seqno = -1;
        WAL.Entry entry = new WAL.Entry();
        while ((entry = reader.next(entry)) != null) {
          WALKey key = entry.getKey();
          String region = Bytes.toString(key.getEncodedRegionName());
          // Assert that all edits are for the same region.
          if (previousRegion != null) {
            assertEquals(previousRegion, region);
          }
          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
          assertTrue(seqno < key.getLogSeqNum());
          seqno = key.getLogSeqNum();
          previousRegion = region;
          count++;
        }
        assertEquals(howmany, count);
      } finally {
        reader.close();
      }
    }
  }

  /*
   * We pass different values to recoverFileLease() so that different code paths are covered.
   *
   * For this test to pass, it requires:
   * 1. HDFS-200 (append support)
   * 2. HDFS-988 (SafeMode should freeze file operations
   *              [FSNamesystem.nextGenerationStampForBlock])
   * 3. HDFS-142 (on restart, maintain pendingCreates)
   */
  @Test (timeout=300000)
  public void testAppendClose() throws Exception {
    TableName tableName =
        TableName.valueOf(currentTest.getMethodName());
    HRegionInfo regioninfo = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);

    final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
    final AtomicLong sequenceId = new AtomicLong(1);
    final int total = 20;

    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor(tableName.getName()));

    for (int i = 0; i < total; i++) {
      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
      wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis()), kvs, sequenceId, true, null);
    }
    // Now call sync to send the data to the HDFS datanodes.
    wal.sync();
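    // Remember the NameNode port so the restarted cluster comes up at the same address.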
    int namenodePort = cluster.getNameNodePort();
    final Path walPath = DefaultWALProvider.getCurrentFileName(wal);

    // Stop the cluster.  (ensure restart since we're sharing MiniDFSCluster)
    try {
      DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
      dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
      TEST_UTIL.shutdownMiniDFSCluster();
      try {
        // wal.writer.close() will throw an exception,
        // but still call this since it closes the LogSyncer thread first
        wal.shutdown();
      } catch (IOException e) {
        LOG.info(e);
      }
      fs.close(); // closing FS last so DFSOutputStream can't call close
      LOG.info("STOPPED first instance of the cluster");
    } finally {
      // Restart the cluster
      while (cluster.isClusterUp()) {
        LOG.error("Waiting for cluster to go down");
        Thread.sleep(1000);
      }
      assertFalse(cluster.isClusterUp());
      cluster = null;
      for (int i = 0; i < 100; i++) {
        try {
          cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
          break;
        } catch (BindException e) {
          LOG.info("Sleeping.  BindException bringing up new cluster");
          Threads.sleep(1000);
        }
      }
      cluster.waitActive();
      fs = cluster.getFileSystem();
      LOG.info("STARTED second instance.");
    }

    // Set the lease period to one second so that the
    // namenode triggers lease recovery upon append request.
    Method setLeasePeriod = cluster.getClass()
      .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
    setLeasePeriod.setAccessible(true);
    setLeasePeriod.invoke(cluster, 1000L, 1000L);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      LOG.info(e);
    }

    // Now try recovering the log, like the HMaster would do.
    final FileSystem recoveredFs = fs;
    final Configuration rlConf = conf;

    class RecoverLogThread extends Thread {
      public Exception exception = null;
      public void run() {
        try {
          FSUtils.getInstance(fs, rlConf)
            .recoverFileLease(recoveredFs, walPath, rlConf, null);
        } catch (IOException e) {
          exception = e;
        }
      }
    }

    RecoverLogThread t = new RecoverLogThread();
    t.start();
    // Timeout after 60 sec. Without the correct patches, this would be an infinite loop.
    t.join(60 * 1000);
    if (t.isAlive()) {
      t.interrupt();
      throw new Exception("Timed out waiting for recoverFileLease()");
    }

    if (t.exception != null) {
      throw t.exception;
    }

    // Make sure you can read all the content.
    WAL.Reader reader = wals.createReader(fs, walPath);
    int count = 0;
    WAL.Entry entry = new WAL.Entry();
    while (reader.next(entry) != null) {
      count++;
      assertTrue("Should be one KeyValue per WALEdit",
          entry.getEdit().getCells().size() == 1);
    }
    assertEquals(total, count);
    reader.close();

    // Reset the lease period.
    setLeasePeriod.invoke(cluster, 60000L, 3600000L);
  }

  /**
   * Tests that we can write out an edit, close, and then read it back in again.
   * @throws IOException
   */
  @Test
  public void testEditAdd() throws IOException {
    final int COL_COUNT = 10;
    final HTableDescriptor htd =
        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
            "column"));
    final byte [] row = Bytes.toBytes("row");
    WAL.Reader reader = null;
    try {
      final AtomicLong sequenceId = new AtomicLong(1);

      // Write columns named 0, 1, 2, etc., each with a single-byte value
      // '0', '1', '2'...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
            Bytes.toBytes(Integer.toString(i)),
            timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo info = new HRegionInfo(htd.getTableName(),
          row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
      final WAL log = wals.getWAL(info.getEncodedNameAsBytes());

      final long txid = log.append(htd, info,
        new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
        cols, sequenceId, true, null);
      log.sync(txid);
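      // Simulate a memstore flush so the WAL can account for the flushed edits
      // before we shut it down.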
      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
      log.completeCacheFlush(info.getEncodedNameAsBytes());
      log.shutdown();
      Path filename = DefaultWALProvider.getCurrentFileName(log);
      // Now open a reader on the log and assert the append worked.
      reader = wals.createReader(fs, filename);
      // Above we added all columns on a single row, so we only read one
      // entry below; that's why the loop bound is 1.
      for (int i = 0; i < 1; i++) {
        WAL.Entry entry = reader.next(null);
        if (entry == null) break;
        WALKey key = entry.getKey();
        WALEdit val = entry.getEdit();
        assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
        assertTrue(htd.getTableName().equals(key.getTablename()));
        Cell cell = val.getCells().get(0);
        assertTrue(Bytes.equals(row, cell.getRow()));
        assertEquals((byte)(i + '0'), cell.getValue()[0]);
        System.out.println(key + " " + val);
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Tests appending an edit that carries multiple cells and reading them back.
   * @throws IOException
   */
  @Test
  public void testAppend() throws IOException {
    final int COL_COUNT = 10;
    final HTableDescriptor htd =
        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
            "column"));
    final byte [] row = Bytes.toBytes("row");
    WAL.Reader reader = null;
    final AtomicLong sequenceId = new AtomicLong(1);
    try {
      // Write columns named 0, 1, 2, etc., each with a single-byte value
      // '0', '1', '2'...
      long timestamp = System.currentTimeMillis();
      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
      final long txid = log.append(htd, hri,
        new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
        cols, sequenceId, true, null);
      log.sync(txid);
      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
      log.completeCacheFlush(hri.getEncodedNameAsBytes());
      log.shutdown();
      Path filename = DefaultWALProvider.getCurrentFileName(log);
      // Now open a reader on the log and assert the append worked.
      reader = wals.createReader(fs, filename);
      WAL.Entry entry = reader.next();
      assertEquals(COL_COUNT, entry.getEdit().size());
      int idx = 0;
      for (Cell val : entry.getEdit().getCells()) {
        assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
          entry.getKey().getEncodedRegionName()));
        assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
        assertTrue(Bytes.equals(row, val.getRow()));
        assertEquals((byte)(idx + '0'), val.getValue()[0]);
        System.out.println(entry.getKey() + " " + val);
        idx++;
      }
    } finally {
      if (reader != null) {
        reader.close();
      }
    }
  }

  /**
   * Test that we can visit entries before they are appended.
   * @throws Exception
   */
  @Test
  public void testVisitors() throws Exception {
    final int COL_COUNT = 10;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte [] row = Bytes.toBytes("row");
    final DumbWALActionsListener visitor = new DumbWALActionsListener();
    final AtomicLong sequenceId = new AtomicLong(1);
    long timestamp = System.currentTimeMillis();
    HTableDescriptor htd = new HTableDescriptor();
    htd.addFamily(new HColumnDescriptor("column"));

    HRegionInfo hri = new HRegionInfo(tableName,
        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
    final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
    log.registerWALActionsListener(visitor);
    for (int i = 0; i < COL_COUNT; i++) {
      WALEdit cols = new WALEdit();
      cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[]{(byte) (i + '0')}));
      log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
          System.currentTimeMillis()), cols, sequenceId, true, null);
    }
    log.sync();
    assertEquals(COL_COUNT, visitor.increments);
    log.unregisterWALActionsListener(visitor);
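    // After unregistering, further appends should not bump the listener's count.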
    WALEdit cols = new WALEdit();
    cols.add(new KeyValue(row, Bytes.toBytes("column"),
        Bytes.toBytes(Integer.toString(11)),
        timestamp, new byte[]{(byte) (11 + '0')}));
    log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
        System.currentTimeMillis()), cols, sequenceId, true, null);
    log.sync();
    assertEquals(COL_COUNT, visitor.increments);
  }

  /**
   * A loaded WAL coprocessor won't break existing WAL test cases.
   */
  @Test
  public void testWALCoprocessorLoaded() throws Exception {
    // test to see whether the coprocessor is loaded or not.
    WALCoprocessorHost host = wals.getWAL(UNSPECIFIED_REGION).getCoprocessorHost();
    Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
    assertNotNull(c);
  }

  /**
   * Tests that a WAL written in the legacy (pre-protobuf) SequenceFile format
   * can still be read back with the standard reader.
   * @throws IOException
   */
  @Test
  public void testReadLegacyLog() throws IOException {
    final int columnCount = 5;
    final int recordCount = 5;
    final TableName tableName =
        TableName.valueOf("tablename");
    final byte[] row = Bytes.toBytes("row");
    long timestamp = System.currentTimeMillis();
    Path path = new Path(dir, "tempwal");
    SequenceFileLogWriter sflw = null;
    WAL.Reader reader = null;
    try {
      HRegionInfo hri = new HRegionInfo(tableName,
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
      HTableDescriptor htd = new HTableDescriptor(tableName);
      fs.mkdirs(dir);
      // Write a log in the pre-PB format.
      sflw = new SequenceFileLogWriter();
      sflw.init(fs, path, conf, false);
      for (int i = 0; i < recordCount; ++i) {
        WALKey key = new HLogKey(
            hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
        WALEdit edit = new WALEdit();
        for (int j = 0; j < columnCount; ++j) {
          if (i == 0) {
            htd.addFamily(new HColumnDescriptor("column" + j));
          }
          String value = i + "" + j;
          edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
        }
        sflw.append(new WAL.Entry(key, edit));
      }
      sflw.sync();
      sflw.close();

      // Now read the log using standard means.
      reader = wals.createReader(fs, path);
      assertTrue(reader instanceof SequenceFileLogReader);
      for (int i = 0; i < recordCount; ++i) {
        WAL.Entry entry = reader.next();
        assertNotNull(entry);
        assertEquals(columnCount, entry.getEdit().size());
        assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
        assertEquals(tableName, entry.getKey().getTablename());
        int idx = 0;
        for (Cell val : entry.getEdit().getCells()) {
          assertTrue(Bytes.equals(row, val.getRow()));
          String value = i + "" + idx;
          assertArrayEquals(Bytes.toBytes(value), val.getValue());
          idx++;
        }
      }
      WAL.Entry entry = reader.next();
      assertNull(entry);
    } finally {
      if (sflw != null) {
        sflw.close();
      }
      if (reader != null) {
        reader.close();
      }
    }
  }

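  /** Listener that simply counts the log entries it is shown before they are written. */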
  static class DumbWALActionsListener extends WALActionsListener.Base {
    int increments = 0;

    @Override
    public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
                                         WALEdit logEdit) {
      increments++;
    }

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
      increments++;
    }
  }

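  // Placeholder region name for tests that just need some WAL rather than one
  // tied to a particular region.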
  private static final byte[] UNSPECIFIED_REGION = new byte[]{};

}