1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.io;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.lang.reflect.InvocationTargetException;
23 import java.lang.reflect.Method;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.fs.FSDataInputStream;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.fs.HFileSystem;
31 import org.apache.hadoop.hbase.io.FileLink;
32
33 import com.google.common.annotations.VisibleForTesting;
34
35
36
37
38
39
40 public class FSDataInputStreamWrapper {
41 private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
42 private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
43
44 private final HFileSystem hfs;
45 private final Path path;
46 private final FileLink link;
47 private final boolean doCloseStreams;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 private volatile FSDataInputStream stream = null;
70 private volatile FSDataInputStream streamNoFsChecksum = null;
71 private Object streamNoFsChecksumFirstCreateLock = new Object();
72
73
74 private boolean useHBaseChecksumConfigured;
75
76
77
78
79
80 private volatile boolean useHBaseChecksum;
81
82
83
84 private volatile int hbaseChecksumOffCount = -1;
85
86 private Boolean instanceOfCanUnbuffer = null;
87
88
89 private Method unbuffer = null;
90
91 public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
92 this(fs, null, path);
93 }
94
95 public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
96 this(fs, link, null);
97 }
98
99 private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
100 assert (path == null) != (link == null);
101 this.path = path;
102 this.link = link;
103 this.doCloseStreams = true;
104
105
106
107 this.hfs = (fs instanceof HFileSystem) ? (HFileSystem)fs : new HFileSystem(fs);
108
109
110 this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
111 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
112 }
113
114
115
116
117
118
119
120 public void prepareForBlockReader(boolean forceNoHBaseChecksum) throws IOException {
121 if (hfs == null) return;
122 assert this.stream != null && !this.useHBaseChecksumConfigured;
123 boolean useHBaseChecksum =
124 !forceNoHBaseChecksum && hfs.useHBaseChecksum() && (hfs.getNoChecksumFs() != hfs);
125
126 if (useHBaseChecksum) {
127 FileSystem fsNc = hfs.getNoChecksumFs();
128 this.streamNoFsChecksum = (link != null) ? link.open(fsNc) : fsNc.open(path);
129 this.useHBaseChecksumConfigured = this.useHBaseChecksum = useHBaseChecksum;
130
131 this.stream.close();
132 this.stream = null;
133 }
134 }
135
136
137 @VisibleForTesting
138 public FSDataInputStreamWrapper(FSDataInputStream fsdis) {
139 this(fsdis, fsdis);
140 }
141
142
143 @VisibleForTesting
144 public FSDataInputStreamWrapper(FSDataInputStream fsdis, FSDataInputStream noChecksum) {
145 doCloseStreams = false;
146 stream = fsdis;
147 streamNoFsChecksum = noChecksum;
148 path = null;
149 link = null;
150 hfs = null;
151 useHBaseChecksumConfigured = useHBaseChecksum = false;
152 }
153
154
155
156
157 public boolean shouldUseHBaseChecksum() {
158 return this.useHBaseChecksum;
159 }
160
161
162
163
164
165
166 public FSDataInputStream getStream(boolean useHBaseChecksum) {
167 return useHBaseChecksum ? this.streamNoFsChecksum : this.stream;
168 }
169
170
171
172
173
174 public FSDataInputStream fallbackToFsChecksum(int offCount) throws IOException {
175
176 boolean partOfConvoy = false;
177 if (this.stream == null) {
178 synchronized (streamNoFsChecksumFirstCreateLock) {
179 partOfConvoy = (this.stream != null);
180 if (!partOfConvoy) {
181 this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
182 }
183 }
184 }
185 if (!partOfConvoy) {
186 this.useHBaseChecksum = false;
187 this.hbaseChecksumOffCount = offCount;
188 }
189 return this.stream;
190 }
191
192
193 public void checksumOk() {
194 if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
195 && (this.hbaseChecksumOffCount-- < 0)) {
196
197 assert this.streamNoFsChecksum != null;
198 this.useHBaseChecksum = true;
199 }
200 }
201
202
203 public void close() throws IOException {
204 if (!doCloseStreams) return;
205 try {
206 if (stream != streamNoFsChecksum && streamNoFsChecksum != null) {
207 streamNoFsChecksum.close();
208 streamNoFsChecksum = null;
209 }
210 } finally {
211 if (stream != null) {
212 stream.close();
213 stream = null;
214 }
215 }
216 }
217
218 public HFileSystem getHfs() {
219 return this.hfs;
220 }
221
222
223
224
225
226
227
228
229
230
231
232
233 @SuppressWarnings({ "rawtypes" })
234 public void unbuffer() {
235 FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
236 if (stream != null) {
237 InputStream wrappedStream = stream.getWrappedStream();
238
239
240
241 final Class<? extends InputStream> streamClass = wrappedStream.getClass();
242 if (this.instanceOfCanUnbuffer == null) {
243
244 this.instanceOfCanUnbuffer = false;
245 Class<?>[] streamInterfaces = streamClass.getInterfaces();
246 for (Class c : streamInterfaces) {
247 if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
248 try {
249 this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
250 } catch (NoSuchMethodException | SecurityException e) {
251 if (isLogTraceEnabled) {
252 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
253 + " . So there may be a TCP socket connection "
254 + "left open in CLOSE_WAIT state.", e);
255 }
256 return;
257 }
258 this.instanceOfCanUnbuffer = true;
259 break;
260 }
261 }
262 }
263 if (this.instanceOfCanUnbuffer) {
264 try {
265 this.unbuffer.invoke(wrappedStream);
266 } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
267 if (isLogTraceEnabled) {
268 LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
269 + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.", e);
270 }
271 }
272 } else {
273 if (isLogTraceEnabled) {
274 LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
275 + " . So there may be a TCP socket connection "
276 + "left open in CLOSE_WAIT state. For more details check "
277 + "https://issues.apache.org/jira/browse/HBASE-9393");
278 }
279 }
280 }
281 }
282 }