View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import static org.junit.Assert.assertArrayEquals;
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertNull;
25  import static org.junit.Assert.assertTrue;
26  
27  import java.io.IOException;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.Cell;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HColumnDescriptor;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.testclassification.MediumTests;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
44  import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.wal.WAL;
47  import org.apache.hadoop.hbase.wal.WALFactory;
48  import org.apache.hadoop.hbase.wal.WALKey;
49  import org.apache.hadoop.hbase.wal.WALProvider;
50  import org.junit.After;
51  import org.junit.AfterClass;
52  import org.junit.Before;
53  import org.junit.BeforeClass;
54  import org.junit.Rule;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  import org.junit.rules.TestName;
58  
59  /**
60   * WAL tests that can be reused across providers.
61   */
62  @Category(MediumTests.class)
63  public class TestProtobufLog {
64    protected static final Log LOG = LogFactory.getLog(TestProtobufLog.class);
65  
66    protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67  
68    protected FileSystem fs;
69    protected Path dir;
70    protected WALFactory wals;
71  
72    @Rule
73    public final TestName currentTest = new TestName();
74  
75    @Before
76    public void setUp() throws Exception {
77      fs = TEST_UTIL.getDFSCluster().getFileSystem();
78      dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
79      wals = new WALFactory(TEST_UTIL.getConfiguration(), null, currentTest.getMethodName());
80    }
81  
82    @After
83    public void tearDown() throws Exception {
84      wals.close();
85      FileStatus[] entries = fs.listStatus(new Path("/"));
86      for (FileStatus dir : entries) {
87        fs.delete(dir.getPath(), true);
88      }
89    }
90  
91    @BeforeClass
92    public static void setUpBeforeClass() throws Exception {
93      // Make block sizes small.
94      TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
95      // needed for testAppendClose()
96      TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
97      TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
98      // quicker heartbeat interval for faster DN death notification
99      TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
100     TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
101     TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
102 
103     // faster failover with cluster.shutdown();fs.close() idiom
104     TEST_UTIL.getConfiguration()
105         .setInt("hbase.ipc.client.connect.max.retries", 1);
106     TEST_UTIL.getConfiguration().setInt(
107         "dfs.client.block.recovery.retries", 1);
108     TEST_UTIL.getConfiguration().setInt(
109       "hbase.ipc.client.connection.maxidletime", 500);
110     TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
111         SampleRegionWALObserver.class.getName());
112     TEST_UTIL.startMiniDFSCluster(3);
113   }
114 
115   @AfterClass
116   public static void tearDownAfterClass() throws Exception {
117     TEST_UTIL.shutdownMiniCluster();
118   }
119 
120   /**
121    * Reads the WAL with and without WALTrailer.
122    * @throws IOException
123    */
124   @Test
125   public void testWALTrailer() throws IOException {
126     // read With trailer.
127     doRead(true);
128     // read without trailer
129     doRead(false);
130   }
131 
132   /**
133    * Appends entries in the WAL and reads it.
134    * @param withTrailer If 'withTrailer' is true, it calls a close on the WALwriter before reading
135    *          so that a trailer is appended to the WAL. Otherwise, it starts reading after the sync
136    *          call. This means that reader is not aware of the trailer. In this scenario, if the
137    *          reader tries to read the trailer in its next() call, it returns false from
138    *          ProtoBufLogReader.
139    * @throws IOException
140    */
141   private void doRead(boolean withTrailer) throws IOException {
142     final int columnCount = 5;
143     final int recordCount = 5;
144     final TableName tableName =
145         TableName.valueOf("tablename");
146     final byte[] row = Bytes.toBytes("row");
147     long timestamp = System.currentTimeMillis();
148     Path path = new Path(dir, "tempwal");
149     // delete the log if already exists, for test only
150     fs.delete(path, true);
151     WALProvider.Writer writer = null;
152     ProtobufLogReader reader = null;
153     try {
154       HRegionInfo hri = new HRegionInfo(tableName,
155           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
156       HTableDescriptor htd = new HTableDescriptor(tableName);
157       fs.mkdirs(dir);
158       // Write log in pb format.
159       writer = wals.createWALWriter(fs, path);
160       for (int i = 0; i < recordCount; ++i) {
161         WALKey key = new WALKey(
162             hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
163         WALEdit edit = new WALEdit();
164         for (int j = 0; j < columnCount; ++j) {
165           if (i == 0) {
166             htd.addFamily(new HColumnDescriptor("column" + j));
167           }
168           String value = i + "" + j;
169           edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
170         }
171         writer.append(new WAL.Entry(key, edit));
172       }
173       writer.sync();
174       if (withTrailer) writer.close();
175 
176       // Now read the log using standard means.
177       reader = (ProtobufLogReader) wals.createReader(fs, path);
178       if (withTrailer) {
179         assertNotNull(reader.trailer);
180       } else {
181         assertNull(reader.trailer);
182       }
183       for (int i = 0; i < recordCount; ++i) {
184         WAL.Entry entry = reader.next();
185         assertNotNull(entry);
186         assertEquals(columnCount, entry.getEdit().size());
187         assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
188         assertEquals(tableName, entry.getKey().getTablename());
189         int idx = 0;
190         for (Cell val : entry.getEdit().getCells()) {
191           assertTrue(Bytes.equals(row, val.getRow()));
192           String value = i + "" + idx;
193           assertArrayEquals(Bytes.toBytes(value), val.getValue());
194           idx++;
195         }
196       }
197       WAL.Entry entry = reader.next();
198       assertNull(entry);
199     } finally {
200       if (writer != null) {
201         writer.close();
202       }
203       if (reader != null) {
204         reader.close();
205       }
206     }
207   }
208 }