/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

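/**
 * Unit tests for HFile block checksums: simulated checksum verification
 * failures with fallback to HDFS checksums, checksum chunk layout, and
 * checksum type naming.
 */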
@Category(SmallTests.class)
public class TestChecksum {
  private static final Log LOG = LogFactory.getLog(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

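  /**
   * Introduce checksum failures and check that we can still read the data.
   */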
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
            ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        // Write two data blocks, each holding 1234 ints (4936 bytes of data).
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Verify that HBase checksums are in use.
        assertTrue(hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures:
        // FSReaderImplTest reports every block's checksum as invalid.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .build();
        HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());

        ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader();
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(
                bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we encountered an HBase checksum verification failure,
        // but that the data was still read successfully via HDFS checksums.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);

        // A single HBase checksum failure causes the reader to switch off
        // HBase checksum verification for the next
        // CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD read requests; verify that
        // no further failures are recorded in that window.
        for (int i = 0; i <
            HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, -1, pread, false);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }

        // The next read should have HBase checksum verification re-enabled;
        // verify this by asserting that a checksum failure was recorded again.
        b = hbr.readBlockData(0, -1, -1, pread, false);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the above read encountered a checksum failure, verification
        // is switched off again for the following read.
        b = hbr.readBlockData(0, -1, -1, pread, false);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now use a completely new reader with HBase checksums disabled;
        // this should not produce any checksum failures.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertFalse(newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
        b = hbr.readBlockData(0, -1, -1, pread, false);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
            b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());

        bb = b.getBufferWithoutHeader();
        in = new DataInputStream(new ByteArrayInputStream(
            bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that no HBase checksum verification failures occurred and
        // that the data still validates correctly.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

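  /**
   * Test different values of bytesPerChecksum.
   */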
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
            algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

        // Write one block whose payload is at least six times the checksum
        // chunk size, so that the block requires multiple checksum chunks.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
            dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
            bytesPerChecksum);
        LOG.info("testChecksumChunks: pread=" + pread +
            ", bytesPerChecksum=" + bytesPerChecksum +
            ", fileSize=" + totalSize +
            ", dataSize=" + dataSize +
            ", expectedChunks=" + expectedChunks);

        // Verify that HBase checksums are in use.
        assertTrue(hfs.useHBaseChecksum());

        // Read data back from the file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
            .withCompression(algo)
            .withIncludesMvcc(true)
            .withIncludesTags(useTags)
            .withHBaseCheckSum(true)
            .withBytesPerCheckSum(bytesPerChecksum)
            .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(
            new FSDataInputStreamWrapper(is, nochecksum), totalSize, hfs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // Verify that the block contains the expected number of checksum chunks.
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
            expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // Assert that no HBase checksum verification failures occurred.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }
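  /**
   * Test that a checksum type round-trips through both its name and its
   * string representation.
   */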
  @Test
  public void testChecksumAlgorithm() throws IOException {
    ChecksumType type = ChecksumType.CRC32;
    assertEquals(ChecksumType.nameToType(type.getName()), type);
    assertEquals(ChecksumType.valueOf(type.toString()), type);
  }

  private void validateData(DataInputStream in) throws IOException {
    // Check that the 1234 ints written by the writer are read back intact.
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }
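  /**
   * A test reader whose checksum validation always fails, simulating on-disk
   * corruption so that reads must fall back to HDFS checksums.
   */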
  private static class FSReaderImplTest extends HFileBlock.FSReaderImpl {
    public FSReaderImplTest(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
        Path path, HFileContext meta) throws IOException {
      super(istream, fileSize, (HFileSystem) fs, path, meta);
    }

    @Override
    protected boolean validateBlockChecksum(HFileBlock block,
        byte[] data, int hdrSize) throws IOException {
      // Unconditionally report the checksum as invalid.
      return false;
    }
  }
}