1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.wal;
19
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.io.IOUtils;
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.commons.logging.impl.Log4JLogger;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FSDataInputStream;
32 import org.apache.hadoop.fs.FileStatus;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.HBaseTestingUtility;
36 import org.apache.hadoop.hbase.HColumnDescriptor;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.HRegionInfo;
39 import org.apache.hadoop.hbase.HTableDescriptor;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
44 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
45 import org.apache.hadoop.hbase.util.Bytes;
46 import org.apache.hadoop.hbase.util.FSUtils;
47 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
48 import org.apache.log4j.Level;
49 import org.junit.BeforeClass;
50 import org.junit.Rule;
51 import org.junit.Test;
52 import org.junit.experimental.categories.Category;
53 import org.junit.rules.TestName;
54
55
56 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
57 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
58 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
59 import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
60 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
61 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
62 import org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec;
63
64
65
66
67 @Category(MediumTests.class)
68 public class TestWALReaderOnSecureWAL {
69 static final Log LOG = LogFactory.getLog(TestWALReaderOnSecureWAL.class);
70 static {
71 ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
72 .getLogger().setLevel(Level.ALL);
73 };
74 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
75 final byte[] value = Bytes.toBytes("Test value");
76
77 private static final String WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
78
79 @Rule
80 public TestName currentTest = new TestName();
81
82 @BeforeClass
83 public static void setUpBeforeClass() throws Exception {
84 Configuration conf = TEST_UTIL.getConfiguration();
85 conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
86 conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
87 conf.setBoolean("hbase.hlog.split.skip.errors", true);
88 conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
89 FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
90 }
91
92 private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
93 Configuration conf = TEST_UTIL.getConfiguration();
94 String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
95 conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
96 WALCellCodec.class);
97 try {
98 TableName tableName = TableName.valueOf(tblName);
99 HTableDescriptor htd = new HTableDescriptor(tableName);
100 htd.addFamily(new HColumnDescriptor(tableName.getName()));
101 HRegionInfo regioninfo = new HRegionInfo(tableName,
102 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
103 final int total = 10;
104 final byte[] row = Bytes.toBytes("row");
105 final byte[] family = Bytes.toBytes("family");
106 FileSystem fs = TEST_UTIL.getTestFileSystem();
107 Path logDir = TEST_UTIL.getDataTestDir(tblName);
108 final AtomicLong sequenceId = new AtomicLong(1);
109
110
111 WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
112 for (int i = 0; i < total; i++) {
113 WALEdit kvs = new WALEdit();
114 kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
115 wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
116 System.currentTimeMillis()), kvs, sequenceId, true, null);
117 }
118 wal.sync();
119 final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
120 wal.shutdown();
121
122 return walPath;
123 } finally {
124
125 conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
126 }
127 }
128
129 @Test()
130 public void testWALReaderOnSecureWAL() throws Exception {
131 Configuration conf = TEST_UTIL.getConfiguration();
132 conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
133 WAL.Reader.class);
134 conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
135 WALProvider.Writer.class);
136 conf.setBoolean(WAL_ENCRYPTION, true);
137 FileSystem fs = TEST_UTIL.getTestFileSystem();
138 final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
139 Path walPath = writeWAL(wals, currentTest.getMethodName());
140
141
142 long length = fs.getFileStatus(walPath).getLen();
143 FSDataInputStream in = fs.open(walPath);
144 byte[] fileData = new byte[(int)length];
145 IOUtils.readFully(in, fileData);
146 in.close();
147 assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
148
149
150 try {
151 WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
152 assertFalse(true);
153 } catch (IOException ioe) {
154
155 }
156
157 FileStatus[] listStatus = fs.listStatus(walPath.getParent());
158 RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
159 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
160 Path rootdir = FSUtils.getRootDir(conf);
161 try {
162 WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
163 s.splitLogFile(listStatus[0], null);
164 Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
165 "corrupt");
166 assertTrue(fs.exists(file));
167
168 } catch (IOException ioe) {
169 assertTrue("WAL should have been sidelined", false);
170 }
171 wals.close();
172 }
173
174 @Test()
175 public void testSecureWALReaderOnWAL() throws Exception {
176 Configuration conf = TEST_UTIL.getConfiguration();
177 conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
178 WAL.Reader.class);
179 conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
180 WALProvider.Writer.class);
181 conf.setBoolean(WAL_ENCRYPTION, false);
182 FileSystem fs = TEST_UTIL.getTestFileSystem();
183 final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
184 Path walPath = writeWAL(wals, currentTest.getMethodName());
185
186
187 long length = fs.getFileStatus(walPath).getLen();
188 FSDataInputStream in = fs.open(walPath);
189 byte[] fileData = new byte[(int)length];
190 IOUtils.readFully(in, fileData);
191 in.close();
192 assertTrue("Cells should be plaintext", Bytes.contains(fileData, value));
193
194
195 try {
196 WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
197 reader.close();
198 } catch (IOException ioe) {
199 assertFalse(true);
200 }
201
202 FileStatus[] listStatus = fs.listStatus(walPath.getParent());
203 RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
204 RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
205 Path rootdir = FSUtils.getRootDir(conf);
206 try {
207 WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
208 s.splitLogFile(listStatus[0], null);
209 Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
210 "corrupt");
211 assertTrue(!fs.exists(file));
212 } catch (IOException ioe) {
213 assertTrue("WAL should have been processed", false);
214 }
215 wals.close();
216 }
217 }