1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.fs;
22
23 import java.io.Closeable;
24 import java.io.IOException;
25 import java.lang.reflect.Field;
26 import java.lang.reflect.InvocationHandler;
27 import java.lang.reflect.InvocationTargetException;
28 import java.lang.reflect.Method;
29 import java.lang.reflect.Modifier;
30 import java.lang.reflect.Proxy;
31 import java.lang.reflect.UndeclaredThrowableException;
32 import java.net.URI;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FSDataOutputStream;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.FilterFileSystem;
40 import org.apache.hadoop.fs.LocalFileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.ServerName;
43 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
44 import org.apache.hadoop.hdfs.DFSClient;
45 import org.apache.hadoop.hdfs.DistributedFileSystem;
46 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
47 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
48 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
49 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
50 import org.apache.hadoop.ipc.RPC;
51 import org.apache.hadoop.util.Progressable;
52 import org.apache.hadoop.util.ReflectionUtils;
53
54
55
56
57
58
59 public class HFileSystem extends FilterFileSystem {
60 public static final Log LOG = LogFactory.getLog(HFileSystem.class);
61
62
63 public static final String HBASE_WAL_DIR = "hbase.wal.dir";
64
65 private final FileSystem noChecksumFs;
66 private final boolean useHBaseChecksum;
67
68
69
70
71
72
73
74
75 public HFileSystem(Configuration conf, boolean useHBaseChecksum)
76 throws IOException {
77
78
79
80
81 this.fs = FileSystem.get(conf);
82 this.useHBaseChecksum = useHBaseChecksum;
83
84 fs.initialize(getDefaultUri(conf), conf);
85
86
87 if (fs instanceof LocalFileSystem) {
88 fs.setWriteChecksum(false);
89 fs.setVerifyChecksum(false);
90 }
91
92 addLocationsOrderInterceptor(conf);
93
94
95
96
97
98
99
100
101
102 if (useHBaseChecksum && !(fs instanceof LocalFileSystem)) {
103 conf = new Configuration(conf);
104 conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
105 this.noChecksumFs = newInstanceFileSystem(conf);
106 this.noChecksumFs.setVerifyChecksum(false);
107 } else {
108 this.noChecksumFs = fs;
109 }
110 }
111
112
113
114
115
116
117
/**
 * Wraps an already created file system. The same object serves as both the
 * wrapped file system and the "no checksum" file system, and HBase-level
 * checksums are reported as disabled.
 *
 * @param fs the file system to wrap
 */
public HFileSystem(FileSystem fs) {
  this.useHBaseChecksum = false;
  this.noChecksumFs = fs;
  this.fs = fs;
}
123
124
125
126
127
128
129
130
131 public FileSystem getNoChecksumFs() {
132 return noChecksumFs;
133 }
134
135
136
137
138
139 public FileSystem getBackingFs() throws IOException {
140 return fs;
141 }
142
143
144
145
146
147
148 public boolean useHBaseChecksum() {
149 return useHBaseChecksum;
150 }
151
152
153
154
155 @Override
156 public void close() throws IOException {
157 super.close();
158 if (this.noChecksumFs != fs) {
159 this.noChecksumFs.close();
160 }
161 }
162
163
164
165
166
167
168
169
170
171 private static FileSystem newInstanceFileSystem(Configuration conf)
172 throws IOException {
173 URI uri = FileSystem.getDefaultUri(conf);
174 FileSystem fs = null;
175 Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
176 if (clazz != null) {
177
178 fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
179 fs.initialize(uri, conf);
180 } else {
181
182
183
184 Configuration clone = new Configuration(conf);
185 clone.setBoolean("fs." + uri.getScheme() + ".impl.disable.cache", true);
186 fs = FileSystem.get(uri, clone);
187 }
188 if (fs == null) {
189 throw new IOException("No FileSystem for scheme: " + uri.getScheme());
190 }
191
192 return fs;
193 }
194
195 public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
196 return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
197 }
198
199
200
201
202
203
204
205
206
207 static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
208 if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) {
209 LOG.debug("addLocationsOrderInterceptor configured to false");
210 return false;
211 }
212
213 FileSystem fs;
214 try {
215 fs = FileSystem.get(conf);
216 } catch (IOException e) {
217 LOG.warn("Can't get the file system from the conf.", e);
218 return false;
219 }
220
221 if (!(fs instanceof DistributedFileSystem)) {
222 LOG.debug("The file system is not a DistributedFileSystem. " +
223 "Skipping on block location reordering");
224 return false;
225 }
226
227 DistributedFileSystem dfs = (DistributedFileSystem) fs;
228 DFSClient dfsc = dfs.getClient();
229 if (dfsc == null) {
230 LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
231 "block reordering interceptor. Continuing, but this is unexpected."
232 );
233 return false;
234 }
235
236 try {
237 Field nf = DFSClient.class.getDeclaredField("namenode");
238 nf.setAccessible(true);
239 Field modifiersField = Field.class.getDeclaredField("modifiers");
240 modifiersField.setAccessible(true);
241 modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);
242
243 ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
244 if (namenode == null) {
245 LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
246 " reordering interceptor. Continuing, but this is unexpected."
247 );
248 return false;
249 }
250
251 ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
252 nf.set(dfsc, cp1);
253 LOG.info("Added intercepting call to namenode#getBlockLocations so can do block reordering" +
254 " using class " + lrb.getClass());
255 } catch (NoSuchFieldException e) {
256 LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
257 return false;
258 } catch (IllegalAccessException e) {
259 LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
260 return false;
261 }
262
263 return true;
264 }
265
266 private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
267 final ReorderBlocks lrb, final Configuration conf) {
268 return (ClientProtocol) Proxy.newProxyInstance
269 (cp.getClass().getClassLoader(),
270 new Class[]{ClientProtocol.class, Closeable.class},
271 new InvocationHandler() {
272 public Object invoke(Object proxy, Method method,
273 Object[] args) throws Throwable {
274 try {
275 if ((args == null || args.length == 0)
276 && "close".equals(method.getName())) {
277 RPC.stopProxy(cp);
278 return null;
279 } else {
280 Object res = method.invoke(cp, args);
281 if (res != null && args != null && args.length == 3
282 && "getBlockLocations".equals(method.getName())
283 && res instanceof LocatedBlocks
284 && args[0] instanceof String
285 && args[0] != null) {
286 lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
287 }
288 return res;
289 }
290 } catch (InvocationTargetException ite) {
291
292
293 Throwable cause = ite.getCause();
294 if (cause == null){
295 throw new RuntimeException(
296 "Proxy invocation failed and getCause is null", ite);
297 }
298 if (cause instanceof UndeclaredThrowableException) {
299 Throwable causeCause = cause.getCause();
300 if (causeCause == null) {
301 throw new RuntimeException("UndeclaredThrowableException had null cause!");
302 }
303 cause = cause.getCause();
304 }
305 throw cause;
306 }
307 }
308 });
309 }
310
311
312
313
314 interface ReorderBlocks {
315
316
317
318
319
320
321
322 void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
323 }
324
325
326
327
328
329
330 static class ReorderWALBlocks implements ReorderBlocks {
331 public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
332 throws IOException {
333
334 ServerName sn = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, src);
335 if (sn == null) {
336
337 return;
338 }
339
340
341 String hostName = sn.getHostname();
342 if (LOG.isTraceEnabled()) {
343 LOG.trace(src +
344 " is an WAL file, so reordering blocks, last hostname will be:" + hostName);
345 }
346
347
348 for (LocatedBlock lb : lbs.getLocatedBlocks()) {
349 DatanodeInfo[] dnis = lb.getLocations();
350 if (dnis != null && dnis.length > 1) {
351 boolean found = false;
352 for (int i = 0; i < dnis.length - 1 && !found; i++) {
353 if (hostName.equals(dnis[i].getHostName())) {
354
355 DatanodeInfo toLast = dnis[i];
356 System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
357 dnis[dnis.length - 1] = toLast;
358 found = true;
359 }
360 }
361 }
362 }
363 }
364 }
365
366
367
368
369
370
371
372 static public FileSystem get(Configuration conf) throws IOException {
373 return new HFileSystem(conf, true);
374 }
375
376
377
378
379 static public FileSystem getLocalFs(Configuration conf) throws IOException {
380 return new HFileSystem(FileSystem.getLocal(conf));
381 }
382
383
384
385
386
387
388 @SuppressWarnings("deprecation")
389 public FSDataOutputStream createNonRecursive(Path f,
390 boolean overwrite,
391 int bufferSize, short replication, long blockSize,
392 Progressable progress) throws IOException {
393 return fs.createNonRecursive(f, overwrite, bufferSize, replication,
394 blockSize, progress);
395 }
396 }