/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for the HDFS fix from HBASE-6435.
 */
@Category(LargeTests.class)
public class TestBlockReorder {
  private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);

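  // Bump DFSClient and HFileSystem logging to ALL so that block location reordering can be
  // traced in the test logs.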
  static {
    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
    ((Log4JLogger) HFileSystem.LOG).getLogger().setLevel(Level.ALL);
  }

  private Configuration conf;
  private MiniDFSCluster cluster;
  private HBaseTestingUtility htu;
  private DistributedFileSystem dfs;
  private static final String host1 = "host1";
  private static final String host2 = "host2";
  private static final String host3 = "host3";
  private static Path rootDir;
  private static Path walRootDir;

  @Before
  public void setUp() throws Exception {
    htu = new HBaseTestingUtility();
    htu.getConfiguration().setInt("dfs.blocksize", 1024); // For the test with multiple blocks
    htu.getConfiguration().setBoolean("dfs.support.append", true);
    htu.getConfiguration().setInt("dfs.replication", 3);
    htu.startMiniDFSCluster(3,
        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});

    conf = htu.getConfiguration();
    cluster = htu.getDFSCluster();
    dfs = (DistributedFileSystem) FileSystem.get(conf);
    rootDir = htu.createRootDir();
    walRootDir = htu.createWALRootDir();
  }

  @After
  public void tearDownAfterClass() throws Exception {
    dfs.delete(rootDir, true);
    dfs.delete(walRootDir, true);
    htu.shutdownMiniCluster();
  }

  /**
   * Test that we can add a hook, and that this hook works when we try to read the file in HDFS.
   */
  @Test
  @Ignore
  public void testBlockLocationReorder() throws Exception {
    Path p = new Path("hello");

    Assert.assertTrue((short) cluster.getDataNodes().size() > 1);
    final int repCount = 2;

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    // Let's check we can read it when everybody's there
    long start = System.currentTimeMillis();
    FSDataInputStream fin = dfs.open(p);
    Assert.assertTrue(toWrite == fin.readDouble());
    long end = System.currentTimeMillis();
    LOG.info("readtime= " + (end - start));
    fin.close();
    Assert.assertTrue((end - start) < 30 * 1000);

    // Let's kill the first location. Note that the first location returned will actually change.
    // The first thing to do is to get the location, then the port.
    FileStatus f = dfs.getFileStatus(p);
    BlockLocation[] lbs;
    do {
      lbs = dfs.getFileBlockLocations(f, 0, 1);
    } while (lbs.length != 1 && lbs[0].getLength() != repCount);
    final String name = lbs[0].getNames()[0];
    Assert.assertTrue(name.indexOf(':') > 0);
    String portS = name.substring(name.indexOf(':') + 1);
    final int port = Integer.parseInt(portS);
    LOG.info("port= " + port);
    int ipcPort = -1;

    // Let's find the DN to kill. cluster.getDataNodes(int) is not on the same port, so we need
    // to iterate ourselves.
    boolean ok = false;
    final String lookup = lbs[0].getHosts()[0];
    StringBuilder sb = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      final String dnName = getHostName(dn);
      sb.append(dnName).append(' ');
      if (lookup.equals(dnName)) {
        ok = true;
        LOG.info("killing datanode " + name + " / " + lookup);
        ipcPort = dn.ipcServer.getListenerAddress().getPort();
        dn.shutdown();
        LOG.info("killed datanode " + name + " / " + lookup);
        break;
      }
    }
    Assert.assertTrue(
        "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
    LOG.info("ipc port= " + ipcPort);

    // Add the hook, with an implementation checking that we don't use the port we've just killed.
    Assert.assertTrue(HFileSystem.addLocationsOrderInterceptor(conf,
        new HFileSystem.ReorderBlocks() {
          @Override
          public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
            for (LocatedBlock lb : lbs.getLocatedBlocks()) {
              if (lb.getLocations().length > 1) {
                DatanodeInfo[] infos = lb.getLocations();
                if (infos[0].getHostName().equals(lookup)) {
                  LOG.info("HFileSystem bad host, inverting");
                  DatanodeInfo tmp = infos[0];
                  infos[0] = infos[1];
                  infos[1] = tmp;
                }
              }
            }
          }
        }));

    final int retries = 10;
    ServerSocket ss = null;
    ServerSocket ssI;
    try {
      ss = new ServerSocket(port); // We're taking the port to have a timeout issue later.
      ssI = new ServerSocket(ipcPort);
    } catch (BindException be) {
      LOG.warn("Got bind exception trying to set up socket on " + port + " or " + ipcPort +
          ", this means that the datanode has not closed the socket or" +
          " someone else took it. It may happen, skipping this test for this time.", be);
      if (ss != null) {
        ss.close();
      }
      return;
    }

    // Now it will fail with a timeout; unfortunately it does not always connect to the same box,
    // so we try 'retries' times. With the reorder it will never last more than a few milliseconds.
    for (int i = 0; i < retries; i++) {
      start = System.currentTimeMillis();

      fin = dfs.open(p);
      Assert.assertTrue(toWrite == fin.readDouble());
      fin.close();
      end = System.currentTimeMillis();
      LOG.info("HFileSystem readtime= " + (end - start));
      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
    }

    ss.close();
    ssI.close();
  }

  /**
   * Gets the hostname, using getHostName (Hadoop 1) or getDisplayName (Hadoop 2).
   */
  private String getHostName(DataNode dn) throws InvocationTargetException, IllegalAccessException {
    Method m;
    try {
      m = DataNode.class.getMethod("getDisplayName");
    } catch (NoSuchMethodException e) {
      try {
        m = DataNode.class.getMethod("getHostName");
      } catch (NoSuchMethodException e1) {
        throw new RuntimeException(e1);
      }
    }

    String res = (String) m.invoke(dn);
    if (res.contains(":")) {
      return res.split(":")[0];
    } else {
      return res;
    }
  }

  /**
   * Test that the hook works within HBase, including when there are multiple blocks.
   */
  @Test
  @Ignore
  public void testHBaseCluster() throws Exception {
    byte[] sb = "sb".getBytes();
    htu.startMiniZKCluster();

    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
    hbm.waitForActiveAndReadyMaster();
    hbm.getRegionServer(0).waitForServerOnline();
    HRegionServer targetRs = hbm.getRegionServer(0);

    // We want to have a datanode with the same name as the region server, so
    // we're going to get the region server name, and start a new datanode with this name.
    String host4 = targetRs.getServerName().getHostname();
    LOG.info("Starting a new datanode with the name=" + host4);
    cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
    cluster.waitClusterUp();

    final int repCount = 3;

    // We use the regionserver file system & conf as we expect it to have the hook.
    conf = targetRs.getConfiguration();
    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
    Table h = htu.createTable(TableName.valueOf("table"), sb);

    // Now, we have 4 datanodes and a replication count of 3, so we don't know whether the
    // datanode with the same name as the region server will be used. We can't really stop an
    // existing datanode, as that would expose us to nasty HDFS bugs/issues, so we're going to
    // try multiple times.

    // Now we need to find the log file, its locations, and look at it.

    String walDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
            "/" + targetRs.getServerName().toString()).toUri().getPath();

    DistributedFileSystem mdfs = (DistributedFileSystem)
        hbm.getMaster().getMasterFileSystem().getFileSystem();

    int nbTest = 0;
    while (nbTest < 10) {
      final List<Region> regions = targetRs.getOnlineRegions(h.getName());
      final CountDownLatch latch = new CountDownLatch(regions.size());
      // listen for successful log rolls
      final WALActionsListener listener = new WALActionsListener.Base() {
            @Override
            public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
              latch.countDown();
            }
          };
      for (Region region : regions) {
        ((HRegion)region).getWAL().registerWALActionsListener(listener);
      }

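      // Trigger a WAL roll on the target region server; the listener registered above counts the
      // latch down when the roll is observed.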
      htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());

      // wait
      try {
        latch.await();
      } catch (InterruptedException exception) {
        LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
            "tests fail, it's probably because we should still be waiting.");
        Thread.currentThread().interrupt();
      }
      for (Region region : regions) {
        ((HRegion)region).getWAL().unregisterWALActionsListener(listener);
      }

      // We need a sleep as the namenode is informed asynchronously
      Thread.sleep(100);

      // insert one put to ensure a minimal size
      Put p = new Put(sb);
      p.add(sb, sb, sb);
      h.put(p);

      DirectoryListing dl = dfs.getClient().listPaths(walDir, HdfsFileStatus.EMPTY_NAME);
      HdfsFileStatus[] hfs = dl.getPartialListing();

      // As we wrote a put, we should have at least one log file.
      Assert.assertTrue(hfs.length >= 1);
      for (HdfsFileStatus hf : hfs) {
        // Because this is a live cluster, log files might get archived while we're processing
        try {
          LOG.info("Log file found: " + hf.getLocalName() + " in " + walDir);
          String logFile = walDir + "/" + hf.getLocalName();
          FileStatus fsLog = rfs.getFileStatus(new Path(logFile));

          LOG.info("Checking log file: " + logFile);
          // Now check that the hook is up and running.
          // We can't call getBlockLocations directly, as it's not available in HFileSystem.
          // We try multiple times to be sure, as the order is random.

          BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
          if (bls.length > 0) {
            BlockLocation bl = bls[0];

            LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
            for (int i = 0; i < bl.getHosts().length - 1; i++) {
              LOG.info(bl.getHosts()[i] + "    " + logFile);
              Assert.assertNotEquals(host4, bl.getHosts()[i]);
            }
            String last = bl.getHosts()[bl.getHosts().length - 1];
            LOG.info(last + "    " + logFile);
            if (host4.equals(last)) {
              nbTest++;
              LOG.info(logFile + " is on the new datanode and is ok");
              if (bl.getHosts().length == 3) {
                // We can test this case from the file system as well
                // Checking the underlying file system. Multiple times as the order is random
                testFromDFS(dfs, logFile, repCount, host4);

                // now from the master
                testFromDFS(mdfs, logFile, repCount, host4);
              }
            }
          }
        } catch (FileNotFoundException exception) {
          LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
              "archived out from under us so we'll ignore and retry. If this test hangs " +
              "indefinitely you should treat this failure as a symptom.", exception);
        } catch (RemoteException exception) {
          if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
            LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
                "archived out from under us so we'll ignore and retry. If this test hangs " +
                "indefinitely you should treat this failure as a symptom.", exception);
          } else {
            throw exception;
          }
        }
      }
    }
  }

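  /**
   * Checks, directly against the namenode, that every block of the given file has the expected
   * replication count and that the given host is always returned as the last location, i.e. that
   * the reorder pushed it to the end of the list.
   */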
  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
      throws Exception {
    // Multiple times as the order is random
    for (int i = 0; i < 10; i++) {
      LocatedBlocks l;
      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
      final long max = System.currentTimeMillis() + 10000;
      boolean done;
      do {
        Assert.assertTrue("Can't get enough replicas.", System.currentTimeMillis() < max);
        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
        Assert.assertNotNull("Can't get block locations for " + src, l);
        Assert.assertNotNull(l.getLocatedBlocks());
        Assert.assertTrue(l.getLocatedBlocks().size() > 0);

        done = true;
        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
          done = (l.get(y).getLocations().length == repCount);
        }
      } while (!done);

      for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
      }
    }
  }

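  /**
   * Returns the namenode ClientProtocol of the given client; DFSClient keeps it in a non-public
   * field, so we read it through reflection.
   */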
  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
    Field nf = DFSClient.class.getDeclaredField("namenode");
    nf.setAccessible(true);
    return (ClientProtocol) nf.get(dfsc);
  }

  /**
   * Test that the reorder algorithm works as we expect.
   */
  @Test
  public void testBlockLocation() throws Exception {
    // We need to start HBase to get HConstants.HBASE_DIR set in conf
    htu.startMiniZKCluster();
    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
    conf = hbm.getConfiguration();

    // The "/" is mandatory; without it we get a NullPointerException on the namenode.
    final String fileName = "/helloWorld";
    Path p = new Path(fileName);

    final int repCount = 3;
    Assert.assertTrue((short) cluster.getDataNodes().size() >= repCount);

    // Let's write the file
    FSDataOutputStream fop = dfs.create(p, (short) repCount);
    final double toWrite = 875.5613;
    fop.writeDouble(toWrite);
    fop.close();

    for (int i = 0; i < 10; i++) {
      // The interceptor is not set in this test, so we get the raw list at this point
      LocatedBlocks l;
      final long max = System.currentTimeMillis() + 10000;
      do {
        l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
        Assert.assertNotNull(l.getLocatedBlocks());
        Assert.assertEquals(1, l.getLocatedBlocks().size());
        Assert.assertTrue("Expecting " + repCount + ", got " + l.get(0).getLocations().length,
            System.currentTimeMillis() < max);
      } while (l.get(0).getLocations().length != repCount);

      // Should be filtered out: the name is different, so the order won't change.
      Object[] originalList = l.getLocatedBlocks().toArray();
      HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
      lrb.reorderBlocks(conf, l, fileName);
      Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());

      // Should be reordered, as we pretend to be a WAL file name with a compliant format.
      Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
      Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
      String pseudoLogFile = conf.get(HFileSystem.HBASE_WAL_DIR) + "/" +
          HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";

      // Check that it will be possible to extract a ServerName from our construction
      Assert.assertNotNull("log= " + pseudoLogFile,
          DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));

      // And check we're doing the right reorder.
      lrb.reorderBlocks(conf, l, pseudoLogFile);
      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());

      // Check again, it should remain the same.
      lrb.reorderBlocks(conf, l, pseudoLogFile);
      Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
    }
  }

}