1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.fs;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.lang.reflect.Field;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.net.BindException;
27 import java.net.ServerSocket;
28 import java.util.List;
29 import java.util.concurrent.CountDownLatch;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.commons.logging.impl.Log4JLogger;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.BlockLocation;
36 import org.apache.hadoop.fs.FSDataInputStream;
37 import org.apache.hadoop.fs.FSDataOutputStream;
38 import org.apache.hadoop.fs.FileStatus;
39 import org.apache.hadoop.fs.FileSystem;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.ipc.RemoteException;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.testclassification.LargeTests;
45 import org.apache.hadoop.hbase.MiniHBaseCluster;
46 import org.apache.hadoop.hbase.TableName;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.Table;
49 import org.apache.hadoop.hbase.regionserver.HRegion;
50 import org.apache.hadoop.hbase.regionserver.HRegionServer;
51 import org.apache.hadoop.hbase.regionserver.Region;
52 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
53 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
54 import org.apache.hadoop.hbase.util.FSUtils;
55 import org.apache.hadoop.hdfs.DFSClient;
56 import org.apache.hadoop.hdfs.DistributedFileSystem;
57 import org.apache.hadoop.hdfs.MiniDFSCluster;
58 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
59 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
60 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
61 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
62 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
63 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
64 import org.apache.hadoop.hdfs.server.datanode.DataNode;
65 import org.apache.log4j.Level;
66 import org.junit.After;
67 import org.junit.Assert;
68 import org.junit.Before;
69 import org.junit.Ignore;
70 import org.junit.Test;
71 import org.junit.experimental.categories.Category;
72
73
74
75
76 @Category(LargeTests.class)
77 public class TestBlockReorder {
78 private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);
79
80 static {
81 ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
82 ((Log4JLogger) HFileSystem.LOG).getLogger().setLevel(Level.ALL);
83 }
84
85 private Configuration conf;
86 private MiniDFSCluster cluster;
87 private HBaseTestingUtility htu;
88 private DistributedFileSystem dfs;
89 private static final String host1 = "host1";
90 private static final String host2 = "host2";
91 private static final String host3 = "host3";
92 private static Path rootDir;
93 private static Path walRootDir;
94
95 @Before
96 public void setUp() throws Exception {
97 htu = new HBaseTestingUtility();
98 htu.getConfiguration().setInt("dfs.blocksize", 1024);
99 htu.getConfiguration().setBoolean("dfs.support.append", true);
100 htu.getConfiguration().setInt("dfs.replication", 3);
101 htu.startMiniDFSCluster(3,
102 new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});
103
104 conf = htu.getConfiguration();
105 cluster = htu.getDFSCluster();
106 dfs = (DistributedFileSystem) FileSystem.get(conf);
107 rootDir = htu.createRootDir();
108 walRootDir = htu.createWALRootDir();
109 }
110
111 @After
112 public void tearDownAfterClass() throws Exception {
113 dfs.delete(rootDir, true);
114 dfs.delete(walRootDir, true);
115 htu.shutdownMiniCluster();
116 }
117
118
119
120
121 @Test
122 @Ignore
123 public void testBlockLocationReorder() throws Exception {
124 Path p = new Path("hello");
125
126 Assert.assertTrue((short) cluster.getDataNodes().size() > 1);
127 final int repCount = 2;
128
129
130 FSDataOutputStream fop = dfs.create(p, (short) repCount);
131 final double toWrite = 875.5613;
132 fop.writeDouble(toWrite);
133 fop.close();
134
135
136 long start = System.currentTimeMillis();
137 FSDataInputStream fin = dfs.open(p);
138 Assert.assertTrue(toWrite == fin.readDouble());
139 long end = System.currentTimeMillis();
140 LOG.info("readtime= " + (end - start));
141 fin.close();
142 Assert.assertTrue((end - start) < 30 * 1000);
143
144
145
146 FileStatus f = dfs.getFileStatus(p);
147 BlockLocation[] lbs;
148 do {
149 lbs = dfs.getFileBlockLocations(f, 0, 1);
150 } while (lbs.length != 1 && lbs[0].getLength() != repCount);
151 final String name = lbs[0].getNames()[0];
152 Assert.assertTrue(name.indexOf(':') > 0);
153 String portS = name.substring(name.indexOf(':') + 1);
154 final int port = Integer.parseInt(portS);
155 LOG.info("port= " + port);
156 int ipcPort = -1;
157
158
159
160 boolean ok = false;
161 final String lookup = lbs[0].getHosts()[0];
162 StringBuilder sb = new StringBuilder();
163 for (DataNode dn : cluster.getDataNodes()) {
164 final String dnName = getHostName(dn);
165 sb.append(dnName).append(' ');
166 if (lookup.equals(dnName)) {
167 ok = true;
168 LOG.info("killing datanode " + name + " / " + lookup);
169 ipcPort = dn.ipcServer.getListenerAddress().getPort();
170 dn.shutdown();
171 LOG.info("killed datanode " + name + " / " + lookup);
172 break;
173 }
174 }
175 Assert.assertTrue(
176 "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
177 LOG.info("ipc port= " + ipcPort);
178
179
180 Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
181 new HFileSystem.ReorderBlocks() {
182 @Override
183 public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
184 for (LocatedBlock lb : lbs.getLocatedBlocks()) {
185 if (lb.getLocations().length > 1) {
186 DatanodeInfo[] infos = lb.getLocations();
187 if (infos[0].getHostName().equals(lookup)) {
188 LOG.info("HFileSystem bad host, inverting");
189 DatanodeInfo tmp = infos[0];
190 infos[0] = infos[1];
191 infos[1] = tmp;
192 }
193 }
194 }
195 }
196 }));
197
198
199 final int retries = 10;
200 ServerSocket ss = null;
201 ServerSocket ssI;
202 try {
203 ss = new ServerSocket(port);
204 ssI = new ServerSocket(ipcPort);
205 } catch (BindException be) {
206 LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
207 ", this means that the datanode has not closed the socket or" +
208 " someone else took it. It may happen, skipping this test for this time.", be);
209 if (ss != null) {
210 ss.close();
211 }
212 return;
213 }
214
215
216
217 for (int i = 0; i < retries; i++) {
218 start = System.currentTimeMillis();
219
220 fin = dfs.open(p);
221 Assert.assertTrue(toWrite == fin.readDouble());
222 fin.close();
223 end = System.currentTimeMillis();
224 LOG.info("HFileSystem readtime= " + (end - start));
225 Assert.assertFalse("We took too much time to read", (end - start) > 60000);
226 }
227
228 ss.close();
229 ssI.close();
230 }
231
232
233
234
235 private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
236 Method m;
237 try {
238 m = DataNode.class.getMethod("getDisplayName");
239 } catch (NoSuchMethodException e) {
240 try {
241 m = DataNode.class.getMethod("getHostName");
242 } catch (NoSuchMethodException e1) {
243 throw new RuntimeException(e1);
244 }
245 }
246
247 String res = (String) m.invoke(dn);
248 if (res.contains(":")) {
249 return res.split(":")[0];
250 } else {
251 return res;
252 }
253 }
254
255
256
257
258 @Test()
259 @Ignore
260 public void testHBaseCluster() throws Exception {
261 byte[] sb = "sb".getBytes();
262 htu.startMiniZKCluster();
263
264 MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
265 hbm.waitForActiveAndReadyMaster();
266 hbm.getRegionServer(0).waitForServerOnline();
267 HRegionServer targetRs = hbm.getRegionServer(0);
268
269
270
271 String host4 = targetRs.getServerName().getHostname();
272 LOG.info("Starting a new datanode with the name=" + host4);
273 cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
274 cluster.waitClusterUp();
275
276 final int repCount = 3;
277
278
279 conf = targetRs.getConfiguration();
280 HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
281 Table h = htu.createTable(TableName.valueOf("table"), sb);
282
283
284
285
286
287
288
289 String walDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
290 "/" + targetRs.getServerName().toString()).toUri().getPath();
291
292 DistributedFileSystem mdfs = (DistributedFileSystem)
293 hbm.getMaster().getMasterFileSystem().getFileSystem();
294
295
296 int nbTest = 0;
297 while (nbTest < 10) {
298 final List<Region> regions = targetRs.getOnlineRegions(h.getName());
299 final CountDownLatch latch = new CountDownLatch(regions.size());
300
301 final WALActionsListener listener = new WALActionsListener.Base() {
302 @Override
303 public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
304 latch.countDown();
305 }
306 };
307 for (Region region : regions) {
308 ((HRegion)region).getWAL().registerWALActionsListener(listener);
309 }
310
311 htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());
312
313
314 try {
315 latch.await();
316 } catch (InterruptedException exception) {
317 LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
318 "tests fail, it's probably because we should still be waiting.");
319 Thread.currentThread().interrupt();
320 }
321 for (Region region : regions) {
322 ((HRegion)region).getWAL().unregisterWALActionsListener(listener);
323 }
324
325
326 Thread.sleep(100);
327
328
329 Put p = new Put(sb);
330 p.add(sb, sb, sb);
331 h.put(p);
332
333 DirectoryListing dl = dfs.getClient().listPaths(walDir, HdfsFileStatus.EMPTY_NAME);
334 HdfsFileStatus[] hfs = dl.getPartialListing();
335
336
337 Assert.assertTrue(hfs.length >= 1);
338 for (HdfsFileStatus hf : hfs) {
339
340 try {
341 LOG.info("Log file found: " + hf.getLocalName() + " in " + walDir);
342 String logFile = walDir + "/" + hf.getLocalName();
343 FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
344
345 LOG.info("Checking log file: " + logFile);
346
347
348
349
350 BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
351 if (bls.length > 0) {
352 BlockLocation bl = bls[0];
353
354 LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
355 for (int i = 0; i < bl.getHosts().length - 1; i++) {
356 LOG.info(bl.getHosts()[i] + " " + logFile);
357 Assert.assertNotSame(bl.getHosts()[i], host4);
358 }
359 String last = bl.getHosts()[bl.getHosts().length - 1];
360 LOG.info(last + " " + logFile);
361 if (host4.equals(last)) {
362 nbTest++;
363 LOG.info(logFile + " is on the new datanode and is ok");
364 if (bl.getHosts().length == 3) {
365
366
367 testFromDFS(dfs, logFile, repCount, host4);
368
369
370 testFromDFS(mdfs, logFile, repCount, host4);
371 }
372 }
373 }
374 } catch (FileNotFoundException exception) {
375 LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
376 "archived out from under us so we'll ignore and retry. If this test hangs " +
377 "indefinitely you should treat this failure as a symptom.", exception);
378 } catch (RemoteException exception) {
379 if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
380 LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
381 "archived out from under us so we'll ignore and retry. If this test hangs " +
382 "indefinitely you should treat this failure as a symptom.", exception);
383 } else {
384 throw exception;
385 }
386 }
387 }
388 }
389 }
390
391 private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
392 throws Exception {
393
394 for (int i = 0; i < 10; i++) {
395 LocatedBlocks l;
396
397 final long max = System.currentTimeMillis() + 10000;
398 boolean done;
399 do {
400 Assert.assertTrue("Can't get enouth replica.", System.currentTimeMillis() < max);
401 l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
402 Assert.assertNotNull("Can't get block locations for " + src, l);
403 Assert.assertNotNull(l.getLocatedBlocks());
404 Assert.assertTrue(l.getLocatedBlocks().size() > 0);
405
406 done = true;
407 for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
408 done = (l.get(y).getLocations().length == repCount);
409 }
410 } while (!done);
411
412 for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
413 Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
414 }
415 }
416 }
417
418 private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
419 Field nf = DFSClient.class.getDeclaredField("namenode");
420 nf.setAccessible(true);
421 return (ClientProtocol) nf.get(dfsc);
422 }
423
424
425
426
427 @Test
428 public void testBlockLocation() throws Exception {
429
430 htu.startMiniZKCluster();
431 MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
432 conf = hbm.getConfiguration();
433
434
435
436 final String fileName = "/helloWorld";
437 Path p = new Path(fileName);
438
439 final int repCount = 3;
440 Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);
441
442
443 FSDataOutputStream fop = dfs.create(p, (short) repCount);
444 final double toWrite = 875.5613;
445 fop.writeDouble(toWrite);
446 fop.close();
447
448 for (int i=0; i<10; i++){
449
450 LocatedBlocks l;
451 final long max = System.currentTimeMillis() + 10000;
452 do {
453 l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
454 Assert.assertNotNull(l.getLocatedBlocks());
455 Assert.assertEquals(l.getLocatedBlocks().size(), 1);
456 Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
457 System.currentTimeMillis() < max);
458 } while (l.get(0).getLocations().length != repCount);
459
460
461 Object originalList[] = l.getLocatedBlocks().toArray();
462 HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
463 lrb.reorderBlocks(conf, l, fileName);
464 Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
465
466
467 Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
468 Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
469 String pseudoLogFile = conf.get(HFileSystem.HBASE_WAL_DIR) + "/" +
470 HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
471
472
473 Assert.assertNotNull("log= " + pseudoLogFile,
474 DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));
475
476
477 lrb.reorderBlocks(conf, l, pseudoLogFile);
478 Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
479
480
481 lrb.reorderBlocks(conf, l, pseudoLogFile);
482 Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
483 }
484 }
485
486 }