/*
 * Copyright The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile;

import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.GZ;
import static org.apache.hadoop.hbase.io.compress.Compression.Algorithm.NONE;
import static org.junit.Assert.assertEquals;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

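/**
 * Verifies HBase-level checksums on HFile blocks: recovery from injected
 * checksum verification failures (falling back to HDFS checksums), the number
 * of checksum chunks produced for various bytesPerChecksum settings, and
 * checksum type lookup by name.
 */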
@Category(SmallTests.class)
public class TestChecksum {
  private static final Log LOG = LogFactory.getLog(TestChecksum.class);

  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      NONE, GZ };

  static final int[] BYTES_PER_CHECKSUM = {
      50, 500, 688, 16 * 1024, (16 * 1024 + 980), 64 * 1024 };

  private static final HBaseTestingUtility TEST_UTIL =
    new HBaseTestingUtility();
  private FileSystem fs;
  private HFileSystem hfs;

  @Before
  public void setUp() throws Exception {
    fs = HFileSystem.get(TEST_UTIL.getConfiguration());
    hfs = (HFileSystem) fs;
  }

  /**
   * Introduce checksum failures and check that we can still read
   * the data.
   */
  @Test
  public void testChecksumCorruption() throws IOException {
    testChecksumCorruptionInternals(false);
    testChecksumCorruptionInternals(true);
  }

  protected void testChecksumCorruptionInternals(boolean useTags) throws IOException {
    for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
      for (boolean pread : new boolean[] { false, true }) {
        LOG.info("testChecksumCorruption: Compression algorithm: " + algo +
                   ", pread=" + pread);
        Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_" + algo);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
        long totalSize = 0;
        for (int blockId = 0; blockId < 2; ++blockId) {
          DataOutputStream dos = hbw.startWriting(BlockType.DATA);
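          // Each block holds 1234 ints (4 bytes each = 4936 bytes of
          // uncompressed data), matching the size assertions below.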
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
          }
          hbw.writeHeaderAndData(os);
          totalSize += hbw.getOnDiskSizeWithHeader();
        }
        os.close();

        // Verify that hbase checksums are in use.
        assertEquals(true, hfs.useHBaseChecksum());

        // Do a read that purposely introduces checksum verification failures.
        FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
        meta = new HFileContextBuilder()
              .withCompression(algo)
              .withIncludesMvcc(true)
              .withIncludesTags(useTags)
              .withHBaseCheckSum(true)
              .build();
        HFileBlock.FSReader hbr = new FSReaderImplTest(is, totalSize, fs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
        b.sanityCheck();
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // Read data back from the hfile, excluding header and checksums.
        ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader();
        DataInputStream in = new DataInputStream(
                               new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we encountered hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());
        validateData(in);

        // A single hbase checksum failure causes the reader to switch off
        // hbase checksum verification for the next
        // CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD read requests.
        // Verify that this is correct.
        for (int i = 0;
             i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
          b = hbr.readBlockData(0, -1, -1, pread, false);
          assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        }
        // The next read should have hbase checksum verification re-enabled;
        // we verify this by asserting that there was an hbase checksum failure.
        b = hbr.readBlockData(0, -1, -1, pread, false);
        assertEquals(1, HFile.getAndResetChecksumFailuresCount());

        // Since the above encountered a checksum failure, we switch
        // back to not checking hbase checksums.
        b = hbr.readBlockData(0, -1, -1, pread, false);
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        is.close();

        // Now, use a completely new reader. Switch off hbase checksums in
        // the configuration. In this case, we should not detect
        // any retries within hbase.
        HFileSystem newfs = new HFileSystem(TEST_UTIL.getConfiguration(), false);
        assertEquals(false, newfs.useHBaseChecksum());
        is = new FSDataInputStreamWrapper(newfs, path);
        hbr = new FSReaderImplTest(is, totalSize, newfs, path, meta);
        b = hbr.readBlockData(0, -1, -1, pread, false);
        is.close();
        b.sanityCheck();
        b = b.unpack(meta, hbr);
        assertEquals(4936, b.getUncompressedSizeWithoutHeader());
        assertEquals(algo == GZ ? 2173 : 4936,
                     b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
        // Read data back from the hfile, excluding header and checksums.
        bb = b.getBufferWithoutHeader();
        in = new DataInputStream(new ByteArrayInputStream(
                                 bb.array(), bb.arrayOffset(), bb.limit()));

        // Assert that we did not encounter hbase checksum verification failures
        // but still used hdfs checksums and read data successfully.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
        validateData(in);
      }
    }
  }

  /**
   * Test different values of bytesPerChecksum.
   */
  @Test
  public void testChecksumChunks() throws IOException {
    testChecksumInternals(false);
    testChecksumInternals(true);
  }

  protected void testChecksumInternals(boolean useTags) throws IOException {
    Compression.Algorithm algo = NONE;
    for (boolean pread : new boolean[] { false, true }) {
      for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
        Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
                             algo + bytesPerChecksum);
        FSDataOutputStream os = fs.create(path);
        HFileContext meta = new HFileContextBuilder()
                            .withCompression(algo)
                            .withIncludesMvcc(true)
                            .withIncludesTags(useTags)
                            .withHBaseCheckSum(true)
                            .withBytesPerCheckSum(bytesPerChecksum)
                            .build();
        HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);

        // Write one block. The block has data that is at least 6 times
        // more than the checksum chunk size.
        long dataSize = 0;
        DataOutputStream dos = hbw.startWriting(BlockType.DATA);
        while (dataSize < 6 * bytesPerChecksum) {
          for (int i = 0; i < 1234; ++i) {
            dos.writeInt(i);
            dataSize += 4;
          }
        }
        hbw.writeHeaderAndData(os);
        long totalSize = hbw.getOnDiskSizeWithHeader();
        os.close();

        long expectedChunks = ChecksumUtil.numChunks(
                               dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
                               bytesPerChecksum);
        LOG.info("testChecksumChunks: pread=" + pread +
                   ", bytesPerChecksum=" + bytesPerChecksum +
                   ", fileSize=" + totalSize +
                   ", dataSize=" + dataSize +
                   ", expectedChunks=" + expectedChunks);

        // Verify that hbase checksums are in use.
        assertEquals(true, hfs.useHBaseChecksum());

        // Read data back from file.
        FSDataInputStream is = fs.open(path);
        FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
        meta = new HFileContextBuilder()
               .withCompression(algo)
               .withIncludesMvcc(true)
               .withIncludesTags(useTags)
               .withHBaseCheckSum(true)
               .withBytesPerCheckSum(bytesPerChecksum)
               .build();
        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(
            is, nochecksum), totalSize, hfs, path, meta);
        HFileBlock b = hbr.readBlockData(0, -1, -1, pread, false);
        is.close();
        b.sanityCheck();
        assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());

        // Verify that we have the expected number of checksum chunks:
        // the on-disk size is the header, plus the data, plus one checksum
        // of CHECKSUM_SIZE bytes per chunk.
        assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
                     expectedChunks * HFileBlock.CHECKSUM_SIZE);

        // Assert that we did not encounter hbase checksum verification failures.
        assertEquals(0, HFile.getAndResetChecksumFailuresCount());
      }
    }
  }

  /**
   * Test to ensure that there is at least one valid checksum implementation.
   */
  @Test
  public void testChecksumAlgorithm() throws IOException {
    ChecksumType type = ChecksumType.CRC32;
    assertEquals(ChecksumType.nameToType(type.getName()), type);
    assertEquals(ChecksumType.valueOf(type.toString()), type);
  }

  private void validateData(DataInputStream in) throws IOException {
    // Validate that the data read back matches what was written.
    for (int i = 0; i < 1234; i++) {
      int val = in.readInt();
      assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val);
    }
  }

  /**
   * A class that introduces hbase-checksum failures while
   * reading data from hfiles. This should trigger the hdfs level
   * checksum validations.
   */
  private static class FSReaderImplTest extends HFileBlock.FSReaderImpl {
    public FSReaderImplTest(FSDataInputStreamWrapper istream, long fileSize, FileSystem fs,
        Path path, HFileContext meta) throws IOException {
      super(istream, fileSize, (HFileSystem) fs, path, meta);
    }

    @Override
    protected boolean validateBlockChecksum(HFileBlock block,
        byte[] data, int hdrSize) throws IOException {
      return false;  // checksum validation failure
    }
  }
}