1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.assertNotNull;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.lang.ref.SoftReference;
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.List;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.fs.FSDataInputStream;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.FilterFileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.fs.PositionedReadable;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HColumnDescriptor;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HTableDescriptor;
43 import org.apache.hadoop.hbase.KeyValue;
44 import org.apache.hadoop.hbase.MiniHBaseCluster;
45 import org.apache.hadoop.hbase.client.Table;
46 import org.apache.hadoop.hbase.testclassification.MediumTests;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.Admin;
49 import org.apache.hadoop.hbase.client.HBaseAdmin;
50 import org.apache.hadoop.hbase.fs.HFileSystem;
51 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
52 import org.apache.hadoop.hbase.io.hfile.HFileContext;
53 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
54 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.junit.Assume;
57 import org.junit.Test;
58 import org.junit.experimental.categories.Category;
59
60
61
62
63
64 @Category(MediumTests.class)
65 public class TestFSErrorsExposed {
66 private static final Log LOG = LogFactory.getLog(TestFSErrorsExposed.class);
67
68 HBaseTestingUtility util = new HBaseTestingUtility();
69
70
71
72
73
/**
 * Checks that a read fault underneath an {@link HFileScanner} reaches the caller as an
 * {@link IOException}.
 *
 * <p>The store file is written through the regular test file system and then read back
 * through a {@link FaultyFileSystem}, so faults can be switched on after one successful read.
 *
 * @throws IOException if writing or opening the store file fails unexpectedly
 */
@Test
public void testHFileScannerThrowsErrors() throws IOException {
  Path hfilePath = new Path(new Path(
      util.getDataTestDir("internalScannerExposesErrors"),
      "regionname"), "familyname");
  HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
  FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
  FileSystem fs = new HFileSystem(faultyfs);
  CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
  HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
  // Write with the non-faulty file system; only the read path goes through faultyfs.
  StoreFile.Writer writer = new StoreFile.WriterBuilder(
      util.getConfiguration(), cacheConf, hfs)
          .withOutputDir(hfilePath)
          .withFileContext(meta)
          .build();
  TestStoreFile.writeStoreFile(
      writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

  StoreFile sf = new StoreFile(fs, writer.getPath(),
      util.getConfiguration(), cacheConf, BloomType.NONE);

  StoreFile.Reader reader = sf.createReader();
  try {
    // Second argument requests positional reads, the only read path that
    // FaultyInputStream intercepts.
    HFileScanner scanner = reader.getScanner(false, true);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read before faults begin.
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      while (scanner.next()) {
        // Keep advancing until a read hits the injected fault.
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
  } finally {
    // Close even when an assertion above fails, so the reader is never leaked.
    reader.close(true);
  }
}
119
120
121
122
123
124 @Test
125 public void testStoreFileScannerThrowsErrors() throws IOException {
126 Path hfilePath = new Path(new Path(
127 util.getDataTestDir("internalScannerExposesErrors"),
128 "regionname"), "familyname");
129 HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
130 FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
131 HFileSystem fs = new HFileSystem(faultyfs);
132 CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
133 HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
134 StoreFile.Writer writer = new StoreFile.WriterBuilder(
135 util.getConfiguration(), cacheConf, hfs)
136 .withOutputDir(hfilePath)
137 .withFileContext(meta)
138 .build();
139 TestStoreFile.writeStoreFile(
140 writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
141
142 StoreFile sf = new StoreFile(fs, writer.getPath(), util.getConfiguration(),
143 cacheConf, BloomType.NONE);
144
145 List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
146 Collections.singletonList(sf), false, true, false,
147
148 0);
149 KeyValueScanner scanner = scanners.get(0);
150
151 FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
152 assertNotNull(inStream);
153
154 scanner.seek(KeyValue.LOWESTKEY);
155
156 assertNotNull(scanner.next());
157 faultyfs.startFaults();
158
159 try {
160 int scanned=0;
161 while (scanner.next() != null) {
162 scanned++;
163 }
164 fail("Scanner didn't throw after faults injected");
165 } catch (IOException ioe) {
166 LOG.info("Got expected exception", ioe);
167 assertTrue(ioe.getMessage().contains("Could not iterate"));
168 }
169 scanner.close();
170 }
171
172
173
174
175
176
177 @Test(timeout=5 * 60 * 1000)
178 public void testFullSystemBubblesFSErrors() throws Exception {
179
180
181 Assume.assumeTrue(!util.isReadShortCircuitOn());
182
183 try {
184
185 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
186
187 util.startMiniCluster(1);
188 TableName tableName = TableName.valueOf("table");
189 byte[] fam = Bytes.toBytes("fam");
190
191 Admin admin = new HBaseAdmin(util.getConfiguration());
192 HTableDescriptor desc = new HTableDescriptor(tableName);
193 desc.addFamily(new HColumnDescriptor(fam)
194 .setMaxVersions(1)
195 .setBlockCacheEnabled(false)
196 );
197 admin.createTable(desc);
198
199
200
201 try (Table table = util.getConnection().getTable(tableName)) {
202
203 util.loadTable(table, fam, false);
204 util.flush();
205 util.countRows(table);
206
207
208 util.getDFSCluster().shutdownDataNodes();
209
210 try {
211 util.countRows(table);
212 fail("Did not fail to count after removing data");
213 } catch (Exception e) {
214 LOG.info("Got expected error", e);
215 assertTrue(e.getMessage().contains("Could not seek"));
216 }
217 }
218
219
220 util.getDFSCluster().restartDataNodes();
221
222 } finally {
223 MiniHBaseCluster cluster = util.getMiniHBaseCluster();
224 if (cluster != null) cluster.killAll();
225 util.shutdownMiniCluster();
226 }
227 }
228
229 static class FaultyFileSystem extends FilterFileSystem {
230 List<SoftReference<FaultyInputStream>> inStreams =
231 new ArrayList<SoftReference<FaultyInputStream>>();
232
233 public FaultyFileSystem(FileSystem testFileSystem) {
234 super(testFileSystem);
235 }
236
237 @Override
238 public FSDataInputStream open(Path p, int bufferSize) throws IOException {
239 FSDataInputStream orig = fs.open(p, bufferSize);
240 FaultyInputStream faulty = new FaultyInputStream(orig);
241 inStreams.add(new SoftReference<FaultyInputStream>(faulty));
242 return faulty;
243 }
244
245
246
247
248 public void startFaults() {
249 for (SoftReference<FaultyInputStream> is: inStreams) {
250 is.get().startFaults();
251 }
252 }
253 }
254
255 static class FaultyInputStream extends FSDataInputStream {
256 boolean faultsStarted = false;
257
258 public FaultyInputStream(InputStream in) throws IOException {
259 super(in);
260 }
261
262 public void startFaults() {
263 faultsStarted = true;
264 }
265
266 @Override
267 public int read(long position, byte[] buffer, int offset, int length)
268 throws IOException {
269 injectFault();
270 return ((PositionedReadable)in).read(position, buffer, offset, length);
271 }
272
273 private void injectFault() throws IOException {
274 if (faultsStarted) {
275 throw new IOException("Fault injected");
276 }
277 }
278 }
279
280
281
282 }