1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.fs.FileStatus;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.Cell;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HColumnDescriptor;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.HRegionInfo;
39 import org.apache.hadoop.hbase.HTableDescriptor;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
44 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.wal.WAL;
47 import org.apache.hadoop.hbase.wal.WALFactory;
48 import org.apache.hadoop.hbase.wal.WALKey;
49 import org.apache.hadoop.hbase.wal.WALProvider;
50 import org.junit.After;
51 import org.junit.AfterClass;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Rule;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57 import org.junit.rules.TestName;
58
59
60
61
62 @Category(MediumTests.class)
63 public class TestProtobufLog {
64 protected static final Log LOG = LogFactory.getLog(TestProtobufLog.class);
65
66 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67
68 protected FileSystem fs;
69 protected Path dir;
70 protected WALFactory wals;
71
72 @Rule
73 public final TestName currentTest = new TestName();
74
75 @Before
76 public void setUp() throws Exception {
77 fs = TEST_UTIL.getDFSCluster().getFileSystem();
78 dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
79 wals = new WALFactory(TEST_UTIL.getConfiguration(), null, currentTest.getMethodName());
80 }
81
82 @After
83 public void tearDown() throws Exception {
84 wals.close();
85 FileStatus[] entries = fs.listStatus(new Path("/"));
86 for (FileStatus dir : entries) {
87 fs.delete(dir.getPath(), true);
88 }
89 }
90
91 @BeforeClass
92 public static void setUpBeforeClass() throws Exception {
93
94 TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
95
96 TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
97 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
98
99 TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
100 TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
101 TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
102
103
104 TEST_UTIL.getConfiguration()
105 .setInt("hbase.ipc.client.connect.max.retries", 1);
106 TEST_UTIL.getConfiguration().setInt(
107 "dfs.client.block.recovery.retries", 1);
108 TEST_UTIL.getConfiguration().setInt(
109 "hbase.ipc.client.connection.maxidletime", 500);
110 TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
111 SampleRegionWALObserver.class.getName());
112 TEST_UTIL.startMiniDFSCluster(3);
113 }
114
115 @AfterClass
116 public static void tearDownAfterClass() throws Exception {
117 TEST_UTIL.shutdownMiniCluster();
118 }
119
120
121
122
123
124 @Test
125 public void testWALTrailer() throws IOException {
126
127 doRead(true);
128
129 doRead(false);
130 }
131
132
133
134
135
136
137
138
139
140
141 private void doRead(boolean withTrailer) throws IOException {
142 final int columnCount = 5;
143 final int recordCount = 5;
144 final TableName tableName =
145 TableName.valueOf("tablename");
146 final byte[] row = Bytes.toBytes("row");
147 long timestamp = System.currentTimeMillis();
148 Path path = new Path(dir, "tempwal");
149
150 fs.delete(path, true);
151 WALProvider.Writer writer = null;
152 ProtobufLogReader reader = null;
153 try {
154 HRegionInfo hri = new HRegionInfo(tableName,
155 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
156 HTableDescriptor htd = new HTableDescriptor(tableName);
157 fs.mkdirs(dir);
158
159 writer = wals.createWALWriter(fs, path);
160 for (int i = 0; i < recordCount; ++i) {
161 WALKey key = new WALKey(
162 hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
163 WALEdit edit = new WALEdit();
164 for (int j = 0; j < columnCount; ++j) {
165 if (i == 0) {
166 htd.addFamily(new HColumnDescriptor("column" + j));
167 }
168 String value = i + "" + j;
169 edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
170 }
171 writer.append(new WAL.Entry(key, edit));
172 }
173 writer.sync();
174 if (withTrailer) writer.close();
175
176
177 reader = (ProtobufLogReader) wals.createReader(fs, path);
178 if (withTrailer) {
179 assertNotNull(reader.trailer);
180 } else {
181 assertNull(reader.trailer);
182 }
183 for (int i = 0; i < recordCount; ++i) {
184 WAL.Entry entry = reader.next();
185 assertNotNull(entry);
186 assertEquals(columnCount, entry.getEdit().size());
187 assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
188 assertEquals(tableName, entry.getKey().getTablename());
189 int idx = 0;
190 for (Cell val : entry.getEdit().getCells()) {
191 assertTrue(Bytes.equals(row, val.getRow()));
192 String value = i + "" + idx;
193 assertArrayEquals(Bytes.toBytes(value), val.getValue());
194 idx++;
195 }
196 }
197 WAL.Entry entry = reader.next();
198 assertNull(entry);
199 } finally {
200 if (writer != null) {
201 writer.close();
202 }
203 if (reader != null) {
204 reader.close();
205 }
206 }
207 }
208 }