/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

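/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 */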
@InterfaceAudience.Public
@InterfaceStability.Stable
public class LoadIncrementalHFiles extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
  private Admin hbAdmin;

  public static final String NAME = "completebulkload";
  public static final String MAX_FILES_PER_REGION_PER_FAMILY
      = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
  public final static String CREATE_TABLE_CONF_KEY = "create.table";
  public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
  public final static String ALWAYS_COPY_FILES = "always.copy.files";

  final static String TMP_DIR = ".tmp";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;
  private Set<String> unmatchedFamilies = new HashSet<String>();

  // Source filesystem
  private FileSystem fs;
  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private String bulkToken;
  private UserProvider userProvider;
  private int nrThreads;

  private LoadIncrementalHFiles() {}

  private Map<LoadQueueItem, ByteBuffer> retValue = null;

  public LoadIncrementalHFiles(Configuration conf) throws Exception {
    super(conf);
    initialize();
  }

  private void initialize() throws IOException {
    if (hbAdmin == null) {
      // make a copy, just to be sure we're not overriding someone else's config
      setConf(HBaseConfiguration.create(getConf()));
      Configuration conf = getConf();
      // disable block cache for this tool invocation
      conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
      this.hbAdmin = new HBaseAdmin(conf);
      this.userProvider = UserProvider.instantiate(conf);
      this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
      assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
      maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
      nrThreads = conf.getInt("hbase.loadincremental.threads.max",
          Runtime.getRuntime().availableProcessors());
    }
  }

  private void usage() {
    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
        + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
        + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
        + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
        + "\n");
  }

  private static interface BulkHFileVisitor<TFamily> {
    TFamily bulkFamily(final byte[] familyName)
        throws IOException;
    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
        throws IOException;
  }

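  /**
   * Iterate over the HFiles under the given bulk directory, calling the visitor once per
   * family directory and once per HFile. Files starting with "_", references, HFileLinks and
   * (when validateHFile is true) files that are not in HFile format are skipped.
   */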
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor, final boolean validateHFile) throws IOException {
    if (!fs.exists(bulkDir)) {
      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + bulkDir);
    }

    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = familyDir.getName().getBytes();
      // Skip directories that are not legal column family names
      try {
        HColumnDescriptor.isLegalFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference and HFileLink files
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile format if requested
        try {
          if (validateHFile && !HFile.isHFileFormat(fs, hfile)) {
            LOG.warn("The file " + hfile + " doesn't seem to be an hfile. Skipping");
            continue;
          }
        } catch (FileNotFoundException e) {
          LOG.warn("The file " + hfile + " was removed");
          continue;
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

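  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class in order to
   * support the case where a region has split during the process of the load. When this
   * happens, the HFile is split into two physical parts across the new region boundary,
   * and each part is added back to the queue. The two parts are then loaded.
   */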
  public static class LoadQueueItem {
    final byte[] family;
    final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }

    @Override
    public String toString() {
      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
    }

    public byte[] getFamily() {
      return family;
    }

    public Path getFilePath() {
      return hfilePath;
    }
  }

  private void populateLoadQueue(final Deque<LoadQueueItem> ret,
      Map<byte[], List<Path>> map) throws IOException {
    for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
      for (Path p : entry.getValue()) {
        ret.add(new LoadQueueItem(entry.getKey(), p));
      }
    }
  }

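  /**
   * Walk the given directory for all HFiles, and add a queue item for each file found.
   */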
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    fs = hfofDir.getFileSystem(getConf());
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }
      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
            HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
              " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

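  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method
   * is not threadsafe.
   *
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */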
  @SuppressWarnings("deprecation")
  public void doBulkLoad(Path hfofDir, final HTable table)
      throws TableNotFoundException, IOException {
    Admin admin = null;
    Table t = table;
    Connection conn = table.getConnection();
    boolean closeConnWhenFinished = false;
    try {
      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
        LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
        // can only load with an unmanaged connection from here on out
        conn = ConnectionFactory.createConnection(table.getConfiguration());
        t = conn.getTable(table.getName());
        closeConnWhenFinished = true;
        if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
          throw new RuntimeException("Failed to create unmanaged connection.");
        }
        admin = conn.getAdmin();
      } else {
        admin = conn.getAdmin();
      }
      try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
        doBulkLoad(hfofDir, admin, t, rl);
      }
    } finally {
      if (admin != null) admin.close();
      if (closeConnWhenFinished) {
        t.close();
        conn.close();
      }
    }
  }

  void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.hfilePath).append('\n');
      }
      LOG.error(err);
    }
  }

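  /**
   * Perform a bulk load of the given directory into the given pre-existing table, without
   * ignoring unmatched column families and without forcing file copies.
   */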
  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException {
    doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
  }

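  /**
   * Perform a bulk load of the given map of family to HFile paths into the given pre-existing
   * table. This method is not threadsafe.
   *
   * @param map map of family to List of hfiles
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */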
  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new LinkedList<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(map, table, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return;
      }
      pool = createExecutorService();
      for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
        for (Path p : entry.getValue()) {
          fs = p.getFileSystem(table.getConfiguration());
          break;
        }
      }
      secureClient = new SecureBulkLoadClient(table);
      retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

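  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method
   * is not threadsafe.
   *
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */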
  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    /*
     * Checking hfile format is a time-consuming operation, so there is an option to skip this
     * step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
          "are not correct. If you fail to read data from your table after using this " +
          "option, consider removing the files and bulkload again without this option. " +
          "See HBASE-13985");
    }

    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not find any files to load in " +
            "directory " + hfofDir.toUri() + ". Does it contain files in " +
            "subdirectories that correspond to column family names?");
        return;
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table);
      retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

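  /**
   * Group the queued HFiles by target region, splitting any HFile that spans a region boundary,
   * and then bulk load each group, retrying until the queue is drained or the configured
   * maximum number of attempts is reached.
   */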
  Map<LoadQueueItem, ByteBuffer> performBulkLoad(final Admin admin, Table table,
      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
    int count = 0;

    // If using secure bulk load, get source delegation token, and
    // prepare staging directory and token
    // fs is the source filesystem
    fsDelegationToken.acquireDelegationToken(fs);
    if (isSecureBulkLoadEndpointAvailable()) {
      bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
    }
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " +
            count + " with " + queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count +
            " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
            + " hfiles to one family of one region");
      }

      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
          item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    if (queue != null && !queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. "
          + "Please check log for more details.");
    }
    return item2RegionMap;
  }

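  /**
   * Prepare a collection of {@link LoadQueueItem} from the source hfiles contained in the
   * passed directory and validate whether the prepared queue contains only valid table column
   * families.
   *
   * @param hfofDir source directory of hfiles
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true hfiles are validated for their format
   * @throws IOException if an I/O or network error occurs
   */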
  public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile) throws IOException {
    prepareHFileQueue(hfofDir, table, queue, validateHFile, false);
  }

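  /**
   * Same as {@link #prepareHFileQueue(Path, Table, Deque, boolean)}, with an additional option
   * to suppress (rather than fail on) unmatched column families.
   *
   * @param silence true to ignore unmatched column families
   */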
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile, boolean silence) throws IOException {
    discoverLoadQueue(queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(table, queue, silence);
  }

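  /**
   * Prepare a collection of {@link LoadQueueItem} from a map of family to hfile paths and
   * validate whether the prepared queue contains only valid table column families.
   *
   * @param map map of family to List of hfiles
   * @param silence true to ignore unmatched column families
   */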
  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(table, queue, silence);
  }

  private ExecutorService createExecutorService() {
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), builder.build());
    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
    return pool;
  }

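  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */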
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
    Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
    List<String> familyNames = new ArrayList<String>(families.size());
    for (HColumnDescriptor family : families) {
      familyNames.add(family.getNameAsString());
    }
    List<String> unmatchedFamilies = new ArrayList<String>();
    Iterator<LoadQueueItem> queueIter = queue.iterator();
    while (queueIter.hasNext()) {
      LoadQueueItem lqi = queueIter.next();
      String familyNameInHFile = Bytes.toString(lqi.family);
      if (!familyNames.contains(familyNameInHFile)) {
        unmatchedFamilies.add(familyNameInHFile);
      }
    }
    if (unmatchedFamilies.size() > 0) {
      String msg =
          "Unmatched family names found: the HFiles to be bulkloaded reference the families "
              + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
              + familyNames;
      LOG.error(msg);
      if (!silence) throw new IOException(msg);
    }
  }

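  /**
   * Used by the replication sink to load hfiles from a source cluster. Groups the HFiles by
   * region ({@link #groupOrSplitPhase}) and then loads them ({@link #bulkLoadPhase}).
   *
   * @param table table to which hfiles should be loaded
   * @param conn connection to use
   * @param queue {@link LoadQueueItem} queue of hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the regions
   */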
  public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    loadHFileQueue(table, conn, queue, startEndKeys, false);
  }

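  /**
   * Same as {@link #loadHFileQueue(Table, Connection, Deque, Pair)}, with an additional flag
   * that forces the hfiles to be copied (rather than moved) into the staging directory.
   *
   * @param copyFile always copy hfiles if true
   */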
  public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
    ExecutorService pool = null;
    try {
      pool = createExecutorService();
      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
          groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }

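  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   */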
  protected void bulkLoadPhase(final Table table, final Connection conn,
      ExecutorService pool, Deque<LoadQueueItem> queue,
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups, final boolean copyFile,
      Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
        .entrySet()) {
      final byte[] first = e.getKey().array();
      final Collection<LoadQueueItem> lqis = e.getValue();

      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
          return toRetry;
        }
      };
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, e.getKey());
        }
      }
      loadingFutures.add(pool.submit(call));
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Entry<ByteBuffer,
        ? extends Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      final Collection<LoadQueueItem> lqis = e.getValue();
      HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
      for (LoadQueueItem lqi : lqis) {
        MutableInt count = filesMap.get(lqi.family);
        if (count == null) {
          count = new MutableInt();
          filesMap.put(lqi.family, count);
        }
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
              + " hfiles to family " + Bytes.toStringBinary(lqi.family)
              + " of region with start key "
              + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

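  /**
   * @param table the table to load into
   * @param pool the ExecutorService
   * @param queue the queue for LoadQueueItem
   * @param startEndKeys start and end keys of the regions
   * @return A map that groups LQI by likely bulk load region targets and a set of missing hfiles.
   */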
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> needs synchronization only within the scope of this
    // phase, because of the puts that happen in parallel.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = new Pair<>(regionGroups,
        missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
          new Callable<Pair<List<LoadQueueItem>, String>>() {
            @Override
            public Pair<List<LoadQueueItem>, String> call() throws Exception {
              Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table,
                  startEndKeys);
              return splits;
            }
          };
      splittingFutures.add(pool.submit(call));
    }

    // get all the results. All grouping and splitting must finish before
    // the atomic loads are attempted.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t;
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
      final Table table, byte[] startKey,
      byte[] splitKey) throws IOException {
    final Path hfilePath = item.hfilePath;

    Path tmpDir = item.hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }
    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
        "region. Splitting...");

    String uniqueName = getUniqueName();
    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // The two halves are re-queued and loaded in place of the original hfile.
    List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
    lqis.add(new LoadQueueItem(item.family, botOut));
    lqis.add(new LoadQueueItem(item.family, topOut));

    // If the current item is already the result of a previous split, it is not part of the
    // original input files and is no longer needed, so clean it up to save space.
    try {
      tmpDir = item.hfilePath.getParent();
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(item.hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + item.hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

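  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit and returns the list of LQI's corresponding to the resulting hfiles.
   *
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */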
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    final Path hfilePath = item.hfilePath;
    // fs is the source filesystem
    if (fs == null) {
      fs = hfilePath.getFileSystem(getConf());
    }
    HFile.Reader hfr = null;
    try {
      hfr = HFile.createReader(fs, hfilePath,
          new CacheConfig(getConf()), getConf());
    } catch (FileNotFoundException fnfe) {
      LOG.debug("HFile " + hfilePath + " no longer exists; reporting it as missing", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }
    final byte[] first, last;
    try {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } finally {
      hfr.close();
    }

    LOG.info("Trying to load hfile=" + hfilePath +
        " first=" + Bytes.toStringBinary(first) +
        " last=" + Bytes.toStringBinary(last));
    if (first == null || last == null) {
      assert first == null && last == null;
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first, last) > 0) {
      throw new IllegalArgumentException(
          "Invalid range: " + Bytes.toStringBinary(first) +
          " > " + Bytes.toStringBinary(last));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
        Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion point).  Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    final int indexForCallable = idx;

    /*
     * There is a region hole in the following cases: 1) idx < 0, i.e. the first region info is
     * lost; 2) the endkey of a region is not equal to the startkey of the next region; 3) the
     * endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table "
          + table.getName()
          + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
        && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table "
          + table.getName()
          + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length
        && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table "
          + table.getName()
          + " is not equal to the startkey of the next region in hbase:meta. "
          + "Please use hbck tool to fix it first.");
    }

    boolean lastKeyInRange =
        Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable],
          startEndKeys.getSecond()[indexForCallable]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

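  /**
   * @deprecated Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection,
   *             boolean)} instead.
   */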
  @Deprecated
  protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
      final byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
      throws IOException {
    return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis, false);
  }

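  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list
   * of hfiles that need to be retried. If it is successful it will return an empty list.
   *
   * NOTE: To maintain row atomicity guarantees, the region server callable should succeed
   * atomically and fail atomically.
   *
   * Protected for testing.
   *
   * @return empty list if success, list of items to retry on recoverable failure
   */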
  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
      final boolean copyFile) throws IOException {
    final List<Pair<byte[], String>> famPaths =
        new ArrayList<Pair<byte[], String>>(lqis.size());
    for (LoadQueueItem lqi : lqis) {
      if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
      }
    }

    final RegionServerCallable<byte[]> svrCallable =
        new RegionServerCallable<byte[]>(conn, tableName, first) {
          @Override
          public byte[] call(int callTimeout) throws Exception {
            SecureBulkLoadClient secureClient = null;
            boolean success = false;

            try {
              LOG.debug("Going to connect to server " + getLocation() + " for row "
                  + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
              byte[] regionName = getLocation().getRegionInfo().getRegionName();
              if (!isSecureBulkLoadEndpointAvailable()) {
                success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
              } else {
                try (Table table = conn.getTable(getTableName())) {
                  secureClient = new SecureBulkLoadClient(table);
                  success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
                      bulkToken, getLocation().getRegionInfo().getStartKey(), copyFile);
                }
              }
              return success ? regionName : null;
            } finally {
              // Best-effort move of files that might not have been imported
              // from the staging directory back to their original location
              // in the user directory
              if (secureClient != null && !success) {
                FileSystem targetFs = FileSystem.get(getConf());
                // fs is the source filesystem
                if (fs == null) {
                  fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
                }
                // Check whether the source and target filesystems are the same.
                // If they are, try to move the files back, because they were
                // previously moved into the staging directory.
                if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
                  for (Pair<byte[], String> el : famPaths) {
                    Path hfileStagingPath = null;
                    Path hfileOrigPath = new Path(el.getSecond());
                    try {
                      hfileStagingPath = new Path(
                          secureClient.getStagingPath(bulkToken, el.getFirst()),
                          hfileOrigPath.getName());
                      if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                        LOG.debug("Moved back file " + hfileOrigPath + " from " +
                            hfileStagingPath);
                      } else if (targetFs.exists(hfileStagingPath)) {
                        LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                            hfileStagingPath);
                      }
                    } catch (Exception ex) {
                      LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
                          hfileStagingPath, ex);
                    }
                  }
                }
              }
            }
          }
        };

    try {
      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf,
          null).<byte[]> newCaller()
          .callWithRetries(svrCallable, Integer.MAX_VALUE);
      if (region == null) {
        LOG.warn("Attempt to bulk load region containing "
            + Bytes.toStringBinary(first) + " into table "
            + tableName + " with files " + lqis
            + " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis);
      }
      // return empty list if success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: "
          + svrCallable.getExceptionMessageAdditionalDetail(), e);
      throw e;
    }
  }

  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

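  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */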
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException {
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

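  /**
   * Copy half of an HFile into a new HFile, using the given {@link Reference} to select which
   * half to copy.
   */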
  private static void copyHFileHalf(
      Configuration conf, Path inFile, Path outFile, Reference reference,
      HColumnDescriptor familyDescriptor)
      throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = new CacheConfig(conf);
    HalfStoreFileReader halfReader = null;
    StoreFile.Writer halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompression();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder()
          .withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
          .withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
          .withIncludesTags(true)
          .build();
      halfWriter = new StoreFile.WriterBuilder(conf, cacheConf, fs)
          .withFilePath(outFile)
          .withBloomType(bloomFilterType)
          .withFileContext(hFileContext)
          .build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        KeyValue kv = KeyValueUtil.ensureKeyValue(scanner.getKeyValue());
        halfWriter.append(kv);
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfWriter != null) halfWriter.close();
      if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // Skip the DATA_BLOCK_ENCODING entry so the new half's file info stays consistent
    // with how its data blocks were actually written.
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean doesTableExist(TableName tableName) throws IOException {
    return hbAdmin.tableExists(tableName);
  }

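  /**
   * Infers region boundaries for a new table.
   *
   * Parameter: bdryMap is a map from keys to an integer belonging to {+1, -1}; if a key is a
   * start key of an hfile it maps to +1, if it is an end key it maps to -1.
   *
   * Algorithm:
   * 1) Poll the keys in order:
   *    a) Keep adding the mapped values to a running sum.
   *    b) Each time the running sum reaches 0, add the start key from when the running sum
   *       started to the boundary list.
   * 2) Return the boundary list.
   */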
  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) currStartKey = item.getKey();
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) keysArray.add(currStartKey);
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][0]);
  }

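  /**
   * Creates the target table, inferring column families and region boundaries from the HFiles
   * in the given directory. Note that when the table is created this way, "completebulkload"
   * ends up reading the files twice: once here and once during the actual load.
   */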
  private void createTable(TableName tableName, String dirPath) throws IOException {
    final Path hfofDir = new Path(dirPath);
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Build the table descriptor (adding families as they are discovered) and a map of
    // start/end keys used to infer region boundaries
    final HTableDescriptor htd = new HTableDescriptor(tableName);
    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
      @Override
      public HColumnDescriptor bulkFamily(final byte[] familyName) {
        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
        htd.addFamily(hcd);
        return hcd;
      }
      @Override
      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        HFile.Reader reader = HFile.createReader(fs, hfile,
            new CacheConfig(getConf()), getConf());
        try {
          if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
            hcd.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + hcd.getCompressionType().name() +
                " for family " + hcd.toString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey();
          byte[] last = reader.getLastRowKey();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
              " first=" + Bytes.toStringBinary(first) +
              " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        } finally {
          reader.close();
        }
      }
    }, true);

    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
    this.hbAdmin.createTable(htd, keys);

    LOG.info("Table " + tableName + " is available!!");
  }
1268
1269 public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
1270 TableName tableName) throws IOException {
1271 initialize();
1272 try (Connection connection = ConnectionFactory.createConnection(getConf());
1273 Admin admin = connection.getAdmin()) {
1274
1275 boolean tableExists = this.doesTableExist(tableName);
1276 if (!tableExists) {
1277 if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))){
1278 this.createTable(tableName, dirPath);
1279 } else {
1280 String errorMsg = format("Table '%s' does not exist.", tableName);
1281 LOG.error(errorMsg);
1282 throw new TableNotFoundException(errorMsg);
1283 }
1284 }
1285
1286 Path hfofDir = null;
1287 if (dirPath != null) {
1288 hfofDir = new Path(dirPath);
1289 }
1290
1291 try (HTable table = (HTable) connection.getTable(tableName);
1292 RegionLocator locator = connection.getRegionLocator(tableName)) {
1293 boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
1294 boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
1295 if (dirPath != null) {
1296 doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
1297 } else {
1298 doBulkLoad(map, admin, table, locator, silence, copyFiles);
1299 }
1300 return retValue;
1301 }
1302 }
1303 }
1304
1305 @Override
1306 public int run(String[] args) throws Exception {
1307 if (args.length < 2) {
1308 usage();
1309 return -1;
1310 }
1311
1312 String dirPath = args[0];
1313 TableName tableName = TableName.valueOf(args[1]);
1314 Map<LoadQueueItem, ByteBuffer> loaded = run(dirPath, null, tableName);
1315 if (loaded == null || !loaded.isEmpty()) return 0;
1316 return -1;
1317 }
1318
1319 public static void main(String[] args) throws Exception {
1320 Configuration conf = HBaseConfiguration.create();
1321 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
1322 System.exit(ret);
1323 }
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
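  /**
   * Called from the replication sink, which manages the bulkToken (staging directory) by itself.
   * This is used only when a SecureBulkLoadEndpoint is configured via the
   * hbase.coprocessor.region.classes property. The staging directory is a temporary directory
   * into which files are first copied or moved, given the required permissions, and from which
   * they are finally loaded into the table. Set this only if you want to manage the staging
   * directory yourself; otherwise this tool handles it.
   *
   * @param stagingDir staging directory path
   */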
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }

}