/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import com.google.common.collect.Multimap;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
 */
@Category(LargeTests.class)
public class TestLoadIncrementalHFilesSplitRecovery {
  final static Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);

  static HBaseTestingUtility util;
  // used by secure subclass
  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }

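  /**
   * Writes one HFile per column family under the given directory, each holding
   * ROWCOUNT rows whose cells carry the encoding of {@code value}.
   */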
  public static void buildHFiles(FileSystem fs, Path dir, int value)
      throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }

  /**
   * Creates a table with given table name and specified number of column
   * families if the table does not already exist.
   */
  private void setupTable(final Connection connection, TableName table, int cfs)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(htd);
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given table name, the specified number of column
   * families, and the given split keys, if the table does not already exist.
   */
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] splitKeys)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      util.createTable(htd, splitKeys);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }

  /**
   * Populate table with known values.
   */
  private void populateTable(final Connection connection, TableName table, int value)
      throws Exception {
    // create HFiles for different column families
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
    Path bulk1 = buildBulkFiles(table, value);
    try (Table t = connection.getTable(table)) {
      lih.doBulkLoad(bulk1, (HTable) t);
    }
  }

  /**
   * Split the known table in half.  (this is hard coded for this test suite)
   */
  private void forceSplit(TableName table) {
    try {
      // Go through the region server directly; the split call we need isn't
      // exposed through the public client API.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (HRegionInfo hri :
          ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          // splitRegion doesn't work if startkey/endkey are null
          ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

      // verify that the split completed.
      int regions;
      do {
        regions = 0;
        for (HRegionInfo hri :
            ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtility();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows.
   * @throws IOException
   */
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    Table t = null;
    try {
      t = new HTable(util.getConfiguration(), table);
      Scan s = new Scan();
      ResultScanner sr = t.getScanner(s);
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    } finally {
      if (t != null) t.close();
    }
  }

  /**
   * Test that shows that an exception thrown from the RS side will result in an
   * exception on the LIHFile client.
   */
  @Test(expected = IOException.class, timeout = 120000)
  public void testBulkLoadPhaseFailure() throws Exception {
    TableName table = TableName.valueOf("bulkLoadPhaseFailure");
    final AtomicInteger attemptedCalls = new AtomicInteger();
    final AtomicInteger failedCalls = new AtomicInteger();
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
      setupTable(connection, table, 10);
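      // Route the first load attempt through a mocked connection whose
      // bulkLoadHFile RPC always fails; later attempts use the real connection.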
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
            boolean copyFile) throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            Connection errConn = null;
            try {
              errConn = getMockedConnection(util.getConfiguration());
            } catch (Exception e) {
              LOG.fatal("mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
            return super.tryAtomicRegionLoad((HConnection) errConn, tableName, first, lqis, copyFile);
          }

          return super.tryAtomicRegionLoad((HConnection) conn, tableName, first, lqis, copyFile);
        }
      };
      try {
        // create HFiles for different column families
        Path dir = buildBulkFiles(table, 1);
        try (Table t = connection.getTable(table)) {
          lih.doBulkLoad(dir, (HTable) t);
        }
      } finally {
        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      }
      fail("doBulkLoad should have thrown an exception");
    }
  }

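  // Returns a mocked HConnection that resolves every region lookup to a fixed
  // fake location and whose client stub throws on every bulkLoadHFile call, so
  // any load attempt routed through it fails with an IOException.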
  @SuppressWarnings("deprecation")
  private HConnection getMockedConnection(final Configuration conf)
      throws IOException, ServiceException {
    HConnection c = Mockito.mock(HConnection.class);
    Mockito.when(c.getConfiguration()).thenReturn(conf);
    Mockito.doNothing().when(c).close();
    // Make it so we return a particular location when asked.
    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
        ServerName.valueOf("example.org", 1234, 0));
    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
        (byte[]) Mockito.any(), Mockito.anyBoolean())).
      thenReturn(loc);
    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
      thenReturn(loc);
    ClientProtos.ClientService.BlockingInterface hri =
      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
    Mockito.when(hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())).
      thenThrow(new ServiceException(new IOException("injecting bulk load error")));
    Mockito.when(c.getClient(Mockito.any(ServerName.class))).
      thenReturn(hri);
    return c;
  }

  /**
   * This test exercises the path where there is a split after initial
   * validation but before the atomic bulk load call. We cannot use presplitting
   * to test this path, so we actually inject a split just before the atomic
   * region load.
   */
  @Test(timeout = 120000)
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf("splitWhileBulkloadPhase");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(table, ROWCOUNT, 1);

      // Now let's cause trouble. This will occur after the validation checks
      // and cause the bulk files to fail when we attempt to atomically import
      // them. This is recoverable.
      final AtomicInteger attemptedCalls = new AtomicInteger();
      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected void bulkLoadPhase(final Table htable, final Connection conn,
            ExecutorService pool, Deque<LoadQueueItem> queue,
            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
            Map<LoadQueueItem, ByteBuffer> item2RegionMap)
                throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On the first attempt, force a split.
            forceSplit(table);
          }
          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        }
      };

      // create HFiles for different column families
      try (Table t = connection.getTable(table)) {
        Path bulk = buildBulkFiles(table, 2);
        lih2.doBulkLoad(bulk, (HTable) t);
      }

      // Check that the data was loaded. The three expected attempts are:
      // 1) failure because a split is needed, 2) load of the split top half,
      // 3) load of the split bottom half.
      assertEquals(3, attemptedCalls.get());
      assertExpectedTable(table, ROWCOUNT, 2);
    }
  }

  /**
   * This test splits a table and attempts to bulk load.  The bulk import files
   * should be split before atomically importing.
   */
  @Test(timeout = 120000)
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf("groupOrSplitPresplit");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(connection, table, ROWCOUNT, 1);
      forceSplit(table);

      final AtomicInteger countedLqis = new AtomicInteger();
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
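        // Count every load queue item the loader groups or splits so the test
        // can assert how many items were processed in total.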
        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table htable,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
              startEndKeys);
          if (lqis != null && lqis.getFirst() != null) {
            countedLqis.addAndGet(lqis.getFirst().size());
          }
          return lqis;
        }
      };

      // create HFiles for different column families
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(bulk, (HTable) t);
      }
      assertExpectedTable(connection, table, ROWCOUNT, 2);
      assertEquals(20, countedLqis.get());
    }
  }

  /**
   * This test creates a table with many small regions.  The bulk load files
   * are split multiple times before all of them can be loaded successfully.
   */
  @Test(timeout = 120000)
  public void testSplitTmpFileCleanUp() throws Exception {
    final TableName table = TableName.valueOf("splitTmpFileCleanUp");
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
        Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050") };
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());

      // create HFiles
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(bulk, (HTable) t);
      }
      // family path
      Path tmpPath = new Path(bulk, family(0));
      // TMP_DIR under the family path
      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
      // the HFiles have been split, so TMP_DIR exists
      assertTrue(fs.exists(tmpPath));
      // TMP_DIR should have been cleaned up
      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
        FSUtils.listStatus(fs, tmpPath));
      assertExpectedTable(connection, table, ROWCOUNT, 2);
    }
  }

  /**
   * This simulates a remote exception, which should cause LIHF to exit with an
   * exception.
   */
  @Test(expected = IOException.class, timeout = 120000)
  public void testGroupOrSplitFailure() throws Exception {
    TableName table = TableName.valueOf("groupOrSplitFailure");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
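        // Fail the fifth groupOrSplit call to simulate a remote failure
        // partway through the load.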
        int i = 0;

        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table table,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          i++;

          if (i == 5) {
            throw new IOException("failure");
          }
          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
        }
      };

      // create HFiles for different column families
      Path dir = buildBulkFiles(table, 1);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(dir, (HTable) t);
      }
    }

    fail("doBulkLoad should have thrown an exception");
  }

  @Test(timeout = 120000)
  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
    TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta");
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
    // Share the connection. The reverse scan used for table lookup finds the
    // first region, not just any region, and this test removes the first region
    // from hbase:meta below, so we rely on the Connection having cached that
    // first region's location.
    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
    Table table = connection.getTable(tableName);

    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
    Path dir = buildBulkFiles(tableName, 2);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {

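      // Count grouped/split load queue items, as in testGroupOrSplitPresplit.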
      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          final LoadQueueItem item, final Table htable,
          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
            startEndKeys);
        if (lqis != null && lqis.getFirst() != null) {
          countedLqis.addAndGet(lqis.getFirst().size());
        }
        return lqis;
      }
    };

    // do the bulk load while there is no region hole in hbase:meta.
    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
    }
    // check that all the data was loaded into the table.
    this.assertExpectedTable(tableName, ROWCOUNT, 2);

    dir = buildBulkFiles(tableName, 3);

    // Mess it up by leaving a hole in hbase:meta
    List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
      connection, tableName);
    for (HRegionInfo regionInfo : regionInfos) {
      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
        MetaTableAccessor.deleteRegion(connection, regionInfo);
        break;
      }
    }

    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
      assertTrue("IOException expected", e instanceof IOException);
    }

    table.close();

    // Make sure at least the one region that still exists can be found.
    regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
      connection, tableName);
    assertTrue(regionInfos.size() >= 1);

    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
    connection.close();
  }

  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows.
   * @throws IOException
   */
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
      throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    Table t = null;
    try {
      t = connection.getTable(table);
      Scan s = new Scan();
      ResultScanner sr = t.getScanner(s);
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    } finally {
      if (t != null) t.close();
    }
  }
}