/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

import com.google.common.collect.Multimap;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
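
/**
 * Test cases for the atomic load error handling of the bulk load functionality: bulk loads
 * that race with region splits, or that hit injected failures, must either recover or fail
 * with a clear exception.
 */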
@Category(LargeTests.class)
public class TestLoadIncrementalHFilesSplitRecovery {
  final static Log LOG = LogFactory.getLog(TestLoadIncrementalHFilesSplitRecovery.class);

  static HBaseTestingUtility util;

  static boolean useSecure = false;

  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];

  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }
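
  /**
   * Creates one HFile per column family under {@code dir}; each file holds
   * ROWCOUNT rows whose cell values encode {@code value}.
   */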
  public static void buildHFiles(FileSystem fs, Path dir, int value)
      throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }
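
  /**
   * Creates a table with the given name and the specified number of column
   * families if the table does not already exist.
   */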
  private void setupTable(final Connection connection, TableName table, int cfs)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(htd);
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
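
  /**
   * Creates a table with the given name, the specified number of column families,
   * and the given split keys if the table does not already exist.
   */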
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
      throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      util.createTable(htd, SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
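
  /**
   * Builds a bulk load directory for the given table, with one HFile per
   * column family carrying the given value.
   */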
  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }
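
  /**
   * Populates the table with known values via a bulk load.
   */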
  private void populateTable(final Connection connection, TableName table, int value)
      throws Exception {
    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
    Path bulk1 = buildBulkFiles(table, value);
    try (Table t = connection.getTable(table)) {
      lih.doBulkLoad(bulk1, (HTable) t);
    }
  }
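
  /**
   * Splits the known table in half (hard coded for this test suite's row keys).
   */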
  private void forceSplit(TableName table) {
    try {
      // Issue the split request directly against the region server hosting the table.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (HRegionInfo hri :
          ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          // Split in the middle of the known row range.
          ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

      // Wait until the split completes and both daughter regions are online.
      int regions;
      do {
        regions = 0;
        for (HRegionInfo hri :
            ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtility();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }
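
  /**
   * Checks that all columns have the expected value and that there is the
   * expected number of rows.
   */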
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    Table t = null;
    try {
      t = new HTable(util.getConfiguration(), table);
      Scan s = new Scan();
      ResultScanner sr = t.getScanner(s);
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    } finally {
      if (t != null) t.close();
    }
  }
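
  /**
   * Test that shows that an exception thrown from the region server side will
   * result in an exception on the LoadIncrementalHFiles client.
   */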
  @Test(expected = IOException.class, timeout = 120000)
  public void testBulkLoadPhaseFailure() throws Exception {
    TableName table = TableName.valueOf("bulkLoadPhaseFailure");
    final AtomicInteger attemptedCalls = new AtomicInteger();
    final AtomicInteger failedCalls = new AtomicInteger();
    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    try (Connection connection = ConnectionFactory.createConnection(this.util.getConfiguration())) {
      setupTable(connection, table, 10);
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
            boolean copyFile) throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On the first attempt, swap in a connection whose bulk load calls always fail.
            Connection errConn = null;
            try {
              errConn = getMockedConnection(util.getConfiguration());
            } catch (Exception e) {
              LOG.fatal("mocking cruft, should never happen", e);
              throw new RuntimeException("mocking cruft, should never happen");
            }
            failedCalls.incrementAndGet();
            return super.tryAtomicRegionLoad((HConnection) errConn, tableName, first, lqis,
                copyFile);
          }

          return super.tryAtomicRegionLoad((HConnection) conn, tableName, first, lqis, copyFile);
        }
      };
      try {
        // Create HFiles for the column families and attempt the bulk load.
        Path dir = buildBulkFiles(table, 1);
        try (Table t = connection.getTable(table)) {
          lih.doBulkLoad(dir, (HTable) t);
        }
      } finally {
        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      }
      fail("doBulkLoad should have thrown an exception");
    }
  }

  @SuppressWarnings("deprecation")
  private HConnection getMockedConnection(final Configuration conf)
      throws IOException, ServiceException {
    HConnection c = Mockito.mock(HConnection.class);
    Mockito.when(c.getConfiguration()).thenReturn(conf);
    Mockito.doNothing().when(c).close();
    // Return a fixed location for every region lookup.
    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
        ServerName.valueOf("example.org", 1234, 0));
    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
        (byte[]) Mockito.any(), Mockito.anyBoolean())).
      thenReturn(loc);
    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
      thenReturn(loc);
    // Make every bulkLoadHFile RPC fail with an injected error.
    ClientProtos.ClientService.BlockingInterface hri =
        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
    Mockito.when(hri.bulkLoadHFile((RpcController) Mockito.any(),
        (BulkLoadHFileRequest) Mockito.any())).
      thenThrow(new ServiceException(new IOException("injecting bulk load error")));
    Mockito.when(c.getClient(Mockito.any(ServerName.class))).
      thenReturn(hri);
    return c;
  }
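
  /**
   * This test exercises the path where there is a split after initial validation but
   * before the atomic bulk load call. We cannot use presplitting to test this path,
   * so we actually inject a split just before the atomic region load.
   */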
  @Test(timeout = 120000)
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf("splitWhileBulkloadPhase");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(table, ROWCOUNT, 1);

      // Now cause trouble: a split injected during the bulk load phase should be
      // recoverable, with the files re-split and loaded into the daughter regions.
      final AtomicInteger attemptedCalls = new AtomicInteger();
      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
        @Override
        protected void bulkLoadPhase(final Table htable, final Connection conn,
            ExecutorService pool, Deque<LoadQueueItem> queue,
            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
            Map<LoadQueueItem, ByteBuffer> item2RegionMap)
            throws IOException {
          int i = attemptedCalls.incrementAndGet();
          if (i == 1) {
            // On the first attempt, force a split.
            forceSplit(table);
          }
          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
        }
      };

      // Create HFiles for the column families and bulk load them.
      try (Table t = connection.getTable(table)) {
        Path bulk = buildBulkFiles(table, 2);
        lih2.doBulkLoad(bulk, (HTable) t);
      }

      // Check that data was loaded. The three expected attempts are: 1) failure because
      // of the split, 2) load of the split's top half, 3) load of the split's bottom half.
      assertEquals(3, attemptedCalls.get());
      assertExpectedTable(table, ROWCOUNT, 2);
    }
  }
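
  /**
   * This test splits a table and attempts to bulk load. The bulk import files
   * should be split before being atomically imported.
   */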
  @Test(timeout = 120000)
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf("groupOrSplitPresplit");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);
      populateTable(connection, table, 1);
      assertExpectedTable(connection, table, ROWCOUNT, 1);
      forceSplit(table);

      final AtomicInteger countedLqis = new AtomicInteger();
      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table htable,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
              startEndKeys);
          if (lqis != null && lqis.getFirst() != null) {
            countedLqis.addAndGet(lqis.getFirst().size());
          }
          return lqis;
        }
      };

      // Create HFiles for the column families; each of the 10 files must be split in
      // two to straddle the presplit regions, hence 20 queue items are expected.
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(bulk, (HTable) t);
      }
      assertExpectedTable(connection, table, ROWCOUNT, 2);
      assertEquals(20, countedLqis.get());
    }
  }
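
  /**
   * This test creates a table with many small regions. The bulk load files would be
   * split multiple times before all of them can be loaded successfully, and the
   * temporary split directory must be cleaned up afterwards.
   */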
  @Test(timeout = 120000)
  public void testSplitTmpFileCleanUp() throws Exception {
    final TableName table = TableName.valueOf("splitTmpFileCleanUp");
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
        Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050") };
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());

      // Create HFiles and bulk load them.
      Path bulk = buildBulkFiles(table, 2);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(bulk, (HTable) t);
      }

      // Family path.
      Path tmpPath = new Path(bulk, family(0));
      // TMP_DIR under the family path.
      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
      // The HFiles have been split, so TMP_DIR exists...
      assertTrue(fs.exists(tmpPath));
      // ...but it should have been emptied after the load.
      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
          FSUtils.listStatus(fs, tmpPath));
      assertExpectedTable(connection, table, ROWCOUNT, 2);
    }
  }
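
  /**
   * This simulates a remote exception which should cause LoadIncrementalHFiles to
   * exit with an exception.
   */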
  @Test(expected = IOException.class, timeout = 120000)
  public void testGroupOrSplitFailure() throws Exception {
    TableName table = TableName.valueOf("groupOrSplitFailure");
    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
      setupTable(connection, table, 10);

      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
          util.getConfiguration()) {
        int i = 0;

        @Override
        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
            final LoadQueueItem item, final Table table,
            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
          i++;
          // Fail the fifth groupOrSplit call.
          if (i == 5) {
            throw new IOException("failure");
          }
          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
        }
      };

      // Create HFiles for the column families and attempt the bulk load.
      Path dir = buildBulkFiles(table, 1);
      try (Table t = connection.getTable(table)) {
        lih.doBulkLoad(dir, (HTable) t);
      }
    }

    fail("doBulkLoad should have thrown an exception");
  }

  @Test(timeout = 120000)
  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
    TableName tableName = TableName.valueOf("testGroupOrSplitWhenRegionHoleExistsInMeta");
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };

    // Share a connection and table across both bulk load attempts.
    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
    Table table = connection.getTable(tableName);

    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
    Path dir = buildBulkFiles(tableName, 2);

    final AtomicInteger countedLqis = new AtomicInteger();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
          final LoadQueueItem item, final Table htable,
          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
        Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
            startEndKeys);
        if (lqis != null && lqis.getFirst() != null) {
          countedLqis.addAndGet(lqis.getFirst().size());
        }
        return lqis;
      }
    };

    // Bulk load while there is no region hole in hbase:meta.
    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
    }
    // Check that all the data was loaded.
    this.assertExpectedTable(tableName, ROWCOUNT, 2);

    dir = buildBulkFiles(tableName, 3);

    // Mess it up by leaving a hole in hbase:meta: delete the region whose start key is empty.
    List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
        connection, tableName);
    for (HRegionInfo regionInfo : regionInfos) {
      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
        MetaTableAccessor.deleteRegion(connection, regionInfo);
        break;
      }
    }

    // The hole should make the load fail with an IOException.
    try {
      loader.doBulkLoad(dir, (HTable) table);
    } catch (Exception e) {
      LOG.error("exception=", e);
      assertTrue("IOException expected", e instanceof IOException);
    }

    table.close();

    // Make sure the region that still exists can be found.
    regionInfos = MetaTableAccessor.getTableRegions(util.getZooKeeperWatcher(),
        connection, tableName);
    assertTrue(regionInfos.size() >= 1);

    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
    connection.close();
  }
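
  /**
   * Checks that all columns have the expected value and that there is the expected
   * number of rows, reading through the given connection.
   */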
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
      throws IOException {
    HTableDescriptor[] htds = util.getHBaseAdmin().listTables(table.getNameAsString());
    assertEquals(1, htds.length);
    Table t = null;
    try {
      t = connection.getTable(table);
      Scan s = new Scan();
      ResultScanner sr = t.getScanner(s);
      int i = 0;
      for (Result r : sr) {
        i++;
        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
          for (byte[] val : nm.values()) {
            assertTrue(Bytes.equals(val, value(value)));
          }
        }
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    } finally {
      if (t != null) t.close();
    }
  }
}