
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static java.lang.String.format;
22  
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.io.InterruptedIOException;
26  import java.nio.ByteBuffer;
27  import java.util.ArrayList;
28  import java.util.Arrays;
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.Deque;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.LinkedList;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Map.Entry;
39  import java.util.Set;
40  import java.util.TreeMap;
41  import java.util.UUID;
42  import java.util.concurrent.Callable;
43  import java.util.concurrent.ExecutionException;
44  import java.util.concurrent.ExecutorService;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.LinkedBlockingQueue;
47  import java.util.concurrent.ThreadPoolExecutor;
48  import java.util.concurrent.TimeUnit;
49  
50  import org.apache.commons.lang.mutable.MutableInt;
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  import org.apache.hadoop.conf.Configuration;
54  import org.apache.hadoop.conf.Configured;
55  import org.apache.hadoop.fs.FileStatus;
56  import org.apache.hadoop.fs.FileSystem;
57  import org.apache.hadoop.fs.Path;
58  import org.apache.hadoop.fs.permission.FsPermission;
59  import org.apache.hadoop.hbase.HBaseConfiguration;
60  import org.apache.hadoop.hbase.HColumnDescriptor;
61  import org.apache.hadoop.hbase.HConstants;
62  import org.apache.hadoop.hbase.HTableDescriptor;
63  import org.apache.hadoop.hbase.KeyValue;
64  import org.apache.hadoop.hbase.KeyValueUtil;
65  import org.apache.hadoop.hbase.TableName;
66  import org.apache.hadoop.hbase.TableNotFoundException;
67  import org.apache.hadoop.hbase.backup.BackupType;
68  import org.apache.hadoop.hbase.backup.impl.BackupManager;
69  import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
70  import org.apache.hadoop.hbase.classification.InterfaceAudience;
71  import org.apache.hadoop.hbase.classification.InterfaceStability;
72  import org.apache.hadoop.hbase.client.Admin;
73  import org.apache.hadoop.hbase.client.ClusterConnection;
74  import org.apache.hadoop.hbase.client.Connection;
75  import org.apache.hadoop.hbase.client.ConnectionFactory;
76  import org.apache.hadoop.hbase.client.HBaseAdmin;
77  import org.apache.hadoop.hbase.client.HConnection;
78  import org.apache.hadoop.hbase.client.HTable;
79  import org.apache.hadoop.hbase.client.RegionLocator;
80  import org.apache.hadoop.hbase.client.RegionServerCallable;
81  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
82  import org.apache.hadoop.hbase.client.Table;
83  import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
84  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
85  import org.apache.hadoop.hbase.io.HFileLink;
86  import org.apache.hadoop.hbase.io.HalfStoreFileReader;
87  import org.apache.hadoop.hbase.io.Reference;
88  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
89  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
90  import org.apache.hadoop.hbase.io.hfile.HFile;
91  import org.apache.hadoop.hbase.io.hfile.HFileContext;
92  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
93  import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
94  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
95  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
96  import org.apache.hadoop.hbase.regionserver.BloomType;
97  import org.apache.hadoop.hbase.regionserver.HStore;
98  import org.apache.hadoop.hbase.regionserver.StoreFile;
99  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
100 import org.apache.hadoop.hbase.security.UserProvider;
101 import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
102 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
103 import org.apache.hadoop.hbase.util.Bytes;
104 import org.apache.hadoop.hbase.util.FSHDFSUtils;
105 import org.apache.hadoop.hbase.util.Pair;
106 import org.apache.hadoop.util.Tool;
107 import org.apache.hadoop.util.ToolRunner;
108 
109 import com.google.common.collect.HashMultimap;
110 import com.google.common.collect.Multimap;
111 import com.google.common.collect.Multimaps;
112 import com.google.common.util.concurrent.ThreadFactoryBuilder;
113 
114 /**
115  * Tool to load the output of HFileOutputFormat into an existing table.
116  * @see #usage()
117  */
118 @InterfaceAudience.Public
119 @InterfaceStability.Stable
120 public class LoadIncrementalHFiles extends Configured implements Tool {
121   private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
122   private Admin hbAdmin;
123 
124   public static final String NAME = "completebulkload";
125   public static final String MAX_FILES_PER_REGION_PER_FAMILY
126     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
127   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
128   public final static String CREATE_TABLE_CONF_KEY = "create.table";
129   public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
130   public final static String ALWAYS_COPY_FILES = "always.copy.files";
131 
132   // We use a '.' prefix which is ignored when walking directory trees
133   // above. It is not a valid family name.
134   final static String TMP_DIR = ".tmp";
135 
136   private int maxFilesPerRegionPerFamily;
137   private boolean assignSeqIds;
138   private Set<String> unmatchedFamilies = new HashSet<String>();
139 
140   // Source filesystem
141   private FileSystem fs;
142   // Source delegation token
143   private FsDelegationToken fsDelegationToken;
144   private String bulkToken;
145   private UserProvider userProvider;
146   private int nrThreads;
147 
148   private LoadIncrementalHFiles() {}
149 
150   private Map<LoadQueueItem, ByteBuffer> retValue = null;
151 
152   public LoadIncrementalHFiles(Configuration conf) throws Exception {
153     super(conf);
154     initialize();
155   }
156 
157   private void initialize() throws IOException {
158     if (hbAdmin == null) {
159       // make a copy, just to be sure we're not overriding someone else's config
160       setConf(HBaseConfiguration.create(getConf()));
161       Configuration conf = getConf();
162       // disable blockcache for tool invocation, see HBASE-10500
163       conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
164       this.hbAdmin = new HBaseAdmin(conf);
165       this.userProvider = UserProvider.instantiate(conf);
166       this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
167       assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
168       maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
169       nrThreads = conf.getInt("hbase.loadincremental.threads.max",
170         Runtime.getRuntime().availableProcessors());
171     }
172   }
173 
174   private void usage() {
175     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
176         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
177         + "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
178         + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
179         + "\n");
180   }
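  // A minimal sketch of invoking this tool programmatically, equivalent to running the
  // "completebulkload" command described above. The output path and table name are
  // placeholders, not values from this file.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // The target table must already exist when create.table is set to "no".
  //   conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
  //   int exitCode = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
  //       new String[] { "/path/to/hfileoutputformat-output", "tablename" });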
181 
182   private static interface BulkHFileVisitor<TFamily> {
183     TFamily bulkFamily(final byte[] familyName)
184       throws IOException;
185     void bulkHFile(final TFamily family, final FileStatus hfileStatus)
186       throws IOException;
187   }
188 
189   /**
190    * Iterate over the bulkDir hfiles.
191    * Skips references, HFileLinks, files starting with "_", and invalid hfiles.
192    */
193   private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
194     final BulkHFileVisitor<TFamily> visitor) throws IOException {
195     if (!fs.exists(bulkDir)) {
196       throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
197     }
198 
199     FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
200     if (familyDirStatuses == null) {
201       throw new FileNotFoundException("No families found in " + bulkDir);
202     }
203 
204     for (FileStatus familyStat : familyDirStatuses) {
205       if (!familyStat.isDirectory()) {
206         LOG.warn("Skipping non-directory " + familyStat.getPath());
207         continue;
208       }
209       Path familyDir = familyStat.getPath();
210       byte[] familyName = familyDir.getName().getBytes();
211       // Skip invalid family
212       try {
213         HColumnDescriptor.isLegalFamilyName(familyName);
214       }
215       catch (IllegalArgumentException e) {
216         LOG.warn("Skipping invalid " + familyStat.getPath());
217         continue;
218       }
219       TFamily family = visitor.bulkFamily(familyName);
220 
221       FileStatus[] hfileStatuses = fs.listStatus(familyDir);
222       for (FileStatus hfileStatus : hfileStatuses) {
223         if (!fs.isFile(hfileStatus.getPath())) {
224           LOG.warn("Skipping non-file " + hfileStatus);
225           continue;
226         }
227 
228         Path hfile = hfileStatus.getPath();
229         // Skip "_", reference, HFileLink
230         String fileName = hfile.getName();
231         if (fileName.startsWith("_")) {
232           continue;
233         }
234         if (StoreFileInfo.isReference(fileName)) {
235           LOG.warn("Skipping reference " + fileName);
236           continue;
237         }
238         if (HFileLink.isHFileLink(fileName)) {
239           LOG.warn("Skipping HFileLink " + fileName);
240           continue;
241         }
242 
243         // Validate HFile Format
244         try {
245           if (!HFile.isHFileFormat(fs, hfile)) {
246             LOG.warn("The file " + hfile + " does not seem to be an HFile; skipping");
247             continue;
248           }
249         } catch (FileNotFoundException e) {
250           LOG.warn("the file " + hfile + " was removed");
251           continue;
252         }
253 
254         visitor.bulkHFile(family, hfileStatus);
255       }
256     }
257   }
258 
259   /**
260    * Represents an HFile waiting to be loaded. A queue is used
261    * in this class in order to support the case where a region has
262    * split during the process of the load. When this happens,
263    * the HFile is split into two physical parts across the new
264    * region boundary, and each part is added back into the queue.
265    * The import process finishes when the queue is empty.
266    */
267   public static class LoadQueueItem {
268     final byte[] family;
269     final Path hfilePath;
270 
271     public LoadQueueItem(byte[] family, Path hfilePath) {
272       this.family = family;
273       this.hfilePath = hfilePath;
274     }
275 
276     @Override
277     public String toString() {
278       return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
279     }
280 
281     public byte[] getFamily() {
282       return family;
283     }
284 
285     public Path getFilePath() {
286       return hfilePath;
287     }
288   }
289 
290   /*
291    * Populate the Queue with given HFiles
292    */
293   private void populateLoadQueue(final Deque<LoadQueueItem> ret,
294       Map<byte[], List<Path>> map) throws IOException {
295     for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
296       for (Path p : entry.getValue()) {
297         ret.add(new LoadQueueItem(entry.getKey(), p));
298       }
299     }
300   }
301 
302   /**
303    * Walk the given directory for all HFiles, and return a Queue
304    * containing all such files.
305    */
306   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
307       final boolean validateHFile) throws IOException {
308     fs = hfofDir.getFileSystem(getConf());
309     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
310       @Override
311       public byte[] bulkFamily(final byte[] familyName) {
312         return familyName;
313       }
314       @Override
315       public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
316         long length = hfile.getLen();
317         if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
318             HConstants.DEFAULT_MAX_FILE_SIZE)) {
319           LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " of size " +
320               length + " bytes; this can be problematic as it may lead to oversplitting.");
321         }
322         ret.add(new LoadQueueItem(family, hfile.getPath()));
323       }
324     });
325   }
326 
327   /**
328    * Perform a bulk load of the given directory into the given
329    * pre-existing table.  This method is not threadsafe.
330    *
331    * @param hfofDir the directory that was provided as the output path
332    * of a job using HFileOutputFormat
333    * @param table the table to load into
334    * @throws TableNotFoundException if table does not yet exist
335    */
336   @SuppressWarnings("deprecation")
337   public void doBulkLoad(Path hfofDir, final HTable table)
338     throws TableNotFoundException, IOException
339   {
340     Admin admin = null;
341     Table t = table;
342     Connection conn = table.getConnection();
343     boolean closeConnWhenFinished = false;
344     try {
345       if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
346         LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
347         // can only use unmanaged connections from here on out.
348         conn = ConnectionFactory.createConnection(table.getConfiguration());
349         t = conn.getTable(table.getName());
350         closeConnWhenFinished = true;
351         if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
352           throw new RuntimeException("Failed to create unmanaged connection.");
353         }
354         admin = conn.getAdmin();
355       } else {
356         admin = conn.getAdmin();
357       }
358       try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
359         doBulkLoad(hfofDir, admin, t, rl);
360       }
361     } finally {
362       if (admin != null) admin.close();
363       if (closeConnWhenFinished) {
364         t.close();
365         conn.close();
366       }
367     }
368   }
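  // A hedged usage sketch for the Connection-based overloads below; the table name and bulk
  // output path are placeholders. This avoids handing the method a managed HTable directly.
  //
  //   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  //   try (Connection conn = ConnectionFactory.createConnection(conf);
  //       Admin admin = conn.getAdmin();
  //       Table table = conn.getTable(TableName.valueOf("mytable"));
  //       RegionLocator locator = conn.getRegionLocator(table.getName())) {
  //     loader.doBulkLoad(new Path("/path/to/hfileoutputformat-output"), admin, table, locator);
  //   }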
369 
370   void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
371       SecureBulkLoadClient secureClient) throws IOException {
372     fsDelegationToken.releaseDelegationToken();
373     if (bulkToken != null && secureClient != null) {
374       secureClient.cleanupBulkLoad(bulkToken);
375     }
376     if (pool != null) {
377       pool.shutdown();
378     }
379     if (!queue.isEmpty()) {
380       StringBuilder err = new StringBuilder();
381       err.append("-------------------------------------------------\n");
382       err.append("Bulk load aborted with some files not yet loaded:\n");
383       err.append("-------------------------------------------------\n");
384       for (LoadQueueItem q : queue) {
385         err.append("  ").append(q.hfilePath).append('\n');
386       }
387       LOG.error(err);
388     }
389   }
390   /**
391    * Perform a bulk load of the given directory into the given
392    * pre-existing table.  This method is not threadsafe.
393    *
394    * @param hfofDir the directory that was provided as the output path
395    * of a job using HFileOutputFormat
396    * @param admin the Admin
397    * @param table the table to load into
398    * @param regionLocator region locator
399    * @throws TableNotFoundException if table does not yet exist
400    */
401   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
402       RegionLocator regionLocator) throws TableNotFoundException, IOException {
403     doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
404   }
405 
406   /**
407    * Perform a bulk load of the given directory into the given
408    * pre-existing table.  This method is not threadsafe.
409    *
410    * @param map map of family to List of hfiles
411    * @param admin the Admin
412    * @param table the table to load into
413    * @param regionLocator region locator
414    * @param silence true to ignore unmatched column families
415    * @param copyFile always copy hfiles if true
416    * @throws TableNotFoundException if table does not yet exist
417    */
418   public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
419       Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
420           throws TableNotFoundException, IOException {
421     if (!admin.isTableAvailable(regionLocator.getName())) {
422       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
423     }
424     // LQI queue does not need to be threadsafe -- all operations on this queue
425     // happen in this thread
426     Deque<LoadQueueItem> queue = new LinkedList<>();
427     ExecutorService pool = null;
428     SecureBulkLoadClient secureClient = null;
429     try {
430       prepareHFileQueue(map, table, queue, silence);
431       if (queue.isEmpty()) {
432         LOG.warn("Bulk load operation did not get any files to load");
433         return;
434       }
435       pool = createExecutorService();
436       for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
437         for (Path p : entry.getValue()) {
438           fs = p.getFileSystem(table.getConfiguration());
439           break;
440         }
441       }
442       secureClient = new SecureBulkLoadClient(table);
443       retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
444     } finally {
445       cleanup(admin, queue, pool, secureClient);
446     }
447   }
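  // A hedged sketch of feeding the map-based overload above; the family name and hfile paths
  // are placeholders. The map keys are column family names as bytes.
  //
  //   Map<byte[], List<Path>> family2Files = new HashMap<>();
  //   family2Files.put(Bytes.toBytes("cf"),
  //       Arrays.asList(new Path("/bulk/cf/hfile1"), new Path("/bulk/cf/hfile2")));
  //   loader.doBulkLoad(family2Files, admin, table, locator, false, false);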
448 
449   /**
450    * Perform a bulk load of the given directory into the given
451    * pre-existing table.  This method is not threadsafe.
452    *
453    * @param hfofDir the directory that was provided as the output path
454    *   of a job using HFileOutputFormat
455    * @param admin the Admin
456    * @param table the table to load into
457    * @param regionLocator region locator
458    * @param silence true to ignore unmatched column families
459    * @param copyFile always copy hfiles if true
460    * @throws TableNotFoundException if table does not yet exist
461    */
462   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
463       RegionLocator regionLocator, boolean silence, boolean copyFile)
464           throws TableNotFoundException, IOException {
465     if (!admin.isTableAvailable(regionLocator.getName())) {
466       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
467     }
468 
469     /*
470      * Checking hfile format is a time-consuming operation, we should have an option to skip
471      * this step when bulkloading millions of HFiles. See HBASE-13985.
472      */
473     boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
474     if (!validateHFile) {
475       LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
476           "are not correct. If you fail to read data from your table after using this " +
477           "option, consider removing the files and bulkload again without this option. " +
478           "See HBASE-13985");
479     }
480     // LQI queue does not need to be threadsafe -- all operations on this queue
481     // happen in this thread
482     Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
483     ExecutorService pool = null;
484     SecureBulkLoadClient secureClient = null;
485     try {
486       prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
487       if (queue.isEmpty()) {
488         LOG.warn("Bulk load operation did not find any files to load in " +
489             "directory " + hfofDir.toUri() + ".  Does it contain files in " +
490             "subdirectories that correspond to column family names?");
491         return;
492       }
493       pool = createExecutorService();
494       secureClient = new SecureBulkLoadClient(table);
495       retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
496     } finally {
497       cleanup(admin, queue, pool, secureClient);
498     }
499   }
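  // Hedged note: the validation toggle read above is a plain configuration property, so a
  // caller loading a very large number of hfiles could disable it (at their own risk) with:
  //
  //   conf.setBoolean("hbase.loadincremental.validate.hfile", false);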
500 
501   Map<LoadQueueItem, ByteBuffer> performBulkLoad(final Admin admin, Table table,
502       RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
503       SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
504     int count = 0;
505 
506     //If using secure bulk load, get source delegation token, and
507     //prepare staging directory and token
508     // fs is the source filesystem
509     fsDelegationToken.acquireDelegationToken(fs);
510     if(isSecureBulkLoadEndpointAvailable()) {
511       bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName());
512     }
513     Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
514 
515     Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
516     // Assumes that region splits can happen while this occurs.
517     while (!queue.isEmpty()) {
518       // need to reload split keys each iteration.
519       final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
520       if (count != 0) {
521         LOG.info("Split occured while grouping HFiles, retry attempt " +
522             + count + " with " + queue.size() + " files remaining to group or split");
523       }
524 
525       int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
526       if (maxRetries != 0 && count >= maxRetries) {
527         throw new IOException("Retry attempted " + count +
528             " times without completing, bailing out");
529       }
530       count++;
531 
532       // Using ByteBuffer for byte[] equality semantics
533       pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
534       Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
535 
536       if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
537         // Error is logged inside checkHFilesCountPerRegionPerFamily.
538         throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
539             + " hfiles to one family of one region");
540       }
541 
542       bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
543           item2RegionMap);
544 
545       // NOTE: The next iteration's split / group could happen in parallel to
546       // atomic bulkloads assuming that there are splits and no merges, and
547       // that we can atomically pull out the groups we want to retry.
548     }
549 
550     if (queue != null && !queue.isEmpty()) {
551       throw new RuntimeException("Bulk load aborted with some files not yet loaded. "
552           + "Please check log for more details.");
553     }
554     return item2RegionMap;
555   }
556 
557   /**
558    * Prepares a collection of {@link LoadQueueItem} from the list of source hfiles contained in
559    * the passed directory and validates whether the prepared queue has all the valid table column
560    * families in it.
561    * @param hfofDir directory containing the hfiles to be loaded into the table
562    * @param table table to which hfiles should be loaded
563    * @param queue queue which needs to be loaded into the table
564    * @param validateHFile if true, hfiles will be validated for their format
565    * @throws IOException If any I/O or network error occurred
566    */
567   public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue,
568       boolean validateHFile) throws IOException {
569     prepareHFileQueue(hfofDir, table, queue, validateHFile, false);
570   }
571 
572   /**
573    * Prepares a collection of {@link LoadQueueItem} from the list of source hfiles contained in
574    * the passed directory and validates whether the prepared queue has all the valid table column
575    * families in it.
576    * @param hfilesDir directory containing the hfiles to be loaded into the table
577    * @param table table to which hfiles should be loaded
578    * @param queue queue which needs to be loaded into the table
579    * @param validateHFile if true, hfiles will be validated for their format
580    * @param silence true to ignore unmatched column families
581    * @throws IOException If any I/O or network error occurred
582    */
583   public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
584       boolean validateHFile, boolean silence) throws IOException {
585     discoverLoadQueue(queue, hfilesDir, validateHFile);
586     validateFamiliesInHFiles(table, queue, silence);
587   }
588 
589   /**
590    * Prepares a collection of {@link LoadQueueItem} from the passed map of family to hfile paths
591    * and validates whether the prepared queue has all the valid table column
592    * families in it.
593    * @param map map of family to List of hfiles
594    * @param table table to which hfiles should be loaded
595    * @param queue queue which needs to be loaded into the table
596    * @param silence true to ignore unmatched column families
597    * @throws IOException If any I/O or network error occurred
598    */
599   public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
600       Deque<LoadQueueItem> queue, boolean silence) throws IOException {
601     populateLoadQueue(queue, map);
602     validateFamiliesInHFiles(table, queue, silence);
603   }
604 
605   // Initialize a thread pool
606   private ExecutorService createExecutorService() {
607     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
608     builder.setNameFormat("LoadIncrementalHFiles-%1$d");
609     ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
610         new LinkedBlockingQueue<Runnable>(), builder.build());
611     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
612     return pool;
613   }
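  // Hedged note: the pool size above comes from "hbase.loadincremental.threads.max"
  // (defaulting to the number of available processors), so parallelism can be tuned with:
  //
  //   conf.setInt("hbase.loadincremental.threads.max", 16);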
614 
615   /**
616    * Checks whether there is any invalid family name in HFiles to be bulk loaded.
617    */
618   private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
619       throws IOException {
620     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
621     List<String> familyNames = new ArrayList<String>(families.size());
622     for (HColumnDescriptor family : families) {
623       familyNames.add(family.getNameAsString());
624     }
625     List<String> unmatchedFamilies = new ArrayList<String>();
626     Iterator<LoadQueueItem> queueIter = queue.iterator();
627     while (queueIter.hasNext()) {
628       LoadQueueItem lqi = queueIter.next();
629       String familyNameInHFile = Bytes.toString(lqi.family);
630       if (!familyNames.contains(familyNameInHFile)) {
631         unmatchedFamilies.add(familyNameInHFile);
632       }
633     }
634     if (unmatchedFamilies.size() > 0) {
635       String msg =
636           "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
637               + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
638               + familyNames;
639       LOG.error(msg);
640       if (!silence) throw new IOException(msg);
641     }
642   }
643 
644   /**
645    * Used by the replication sink to load the hfiles from the source cluster. It does the following:
646    * 1. {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}
647    * 2. {@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque,
648    * Multimap)}
649    * @param table Table to which these hfiles should be loaded
650    * @param conn Connection to use
651    * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
652    * @param startEndKeys starting and ending row keys of the region
653    */
654   public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
655       Pair<byte[][], byte[][]> startEndKeys) throws IOException {
656     loadHFileQueue(table, conn, queue, startEndKeys, false);
657   }
658 
659   /**
660    * Used by the replication sink to load the hfiles from the source cluster. It does the following:
661    * <ol>
662    * <li>{@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
663    * <li>{@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}
664    * </li>
665    * </ol>
666    * @param table Table to which these hfiles should be loaded
667    * @param conn Connection to use
668    * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
669    * @param startEndKeys starting and ending row keys of the region
670    */
671   public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
672       Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
673     ExecutorService pool = null;
674     try {
675       pool = createExecutorService();
676       Multimap<ByteBuffer, LoadQueueItem> regionGroups =
677           groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
678       bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
679     } finally {
680       if (pool != null) {
681         pool.shutdown();
682       }
683     }
684   }
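  // A hedged sketch of driving the queue-based API directly, as the replication sink does;
  // the loader, table, connection and path below are placeholders.
  //
  //   Deque<LoadQueueItem> queue = new LinkedList<>();
  //   loader.prepareHFileQueue(new Path("/path/to/hfileoutputformat-output"), table, queue, true);
  //   try (RegionLocator locator = conn.getRegionLocator(table.getName())) {
  //     loader.loadHFileQueue(table, conn, queue, locator.getStartEndKeys());
  //   }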
685 
686   /**
687    * This takes the LQI's grouped by likely regions and attempts to bulk load
688    * them.  Any failures are re-queued for another pass with the
689    * groupOrSplitPhase.
690    */
691   protected void bulkLoadPhase(final Table table, final Connection conn,
692       ExecutorService pool, Deque<LoadQueueItem> queue,
693       final Multimap<ByteBuffer, LoadQueueItem> regionGroups, final boolean copyFile,
694       Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
695     // atomically bulk load the groups.
696     Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<Future<List<LoadQueueItem>>>();
697     for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
698       final byte[] first = e.getKey().array();
699       final Collection<LoadQueueItem> lqis =  e.getValue();
700 
701       final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
702         @Override
703         public List<LoadQueueItem> call() throws Exception {
704           List<LoadQueueItem> toRetry =
705               tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
706           return toRetry;
707         }
708       };
709       if (item2RegionMap != null) {
710         for (LoadQueueItem lqi : lqis) {
711           item2RegionMap.put(lqi, e.getKey());
712         }
713       }
714       loadingFutures.add(pool.submit(call));
715     }
716 
717     // get all the results.
718     for (Future<List<LoadQueueItem>> future : loadingFutures) {
719       try {
720         List<LoadQueueItem> toRetry = future.get();
721 
722         if (item2RegionMap != null) {
723           for (LoadQueueItem lqi : toRetry) {
724             item2RegionMap.remove(lqi);
725           }
726         }
727         // LQIs that are requeued to be regrouped.
728         queue.addAll(toRetry);
729 
730       } catch (ExecutionException e1) {
731         Throwable t = e1.getCause();
732         if (t instanceof IOException) {
733           // At this point something unrecoverable has happened.
734           // TODO Implement bulk load recovery
735           throw new IOException("BulkLoad encountered an unrecoverable problem", t);
736         }
737         LOG.error("Unexpected execution exception during bulk load", e1);
738         throw new IllegalStateException(t);
739       } catch (InterruptedException e1) {
740         LOG.error("Unexpected interrupted exception during bulk load", e1);
741         throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
742       }
743     }
744   }
745 
746   private boolean checkHFilesCountPerRegionPerFamily(
747       final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
748     for (Entry<ByteBuffer,
749         ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
750       final Collection<LoadQueueItem> lqis =  e.getValue();
751       HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
752       for (LoadQueueItem lqi: lqis) {
753         MutableInt count = filesMap.get(lqi.family);
754         if (count == null) {
755           count = new MutableInt();
756           filesMap.put(lqi.family, count);
757         }
758         count.increment();
759         if (count.intValue() > maxFilesPerRegionPerFamily) {
760           LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
761             + " hfiles to family " + Bytes.toStringBinary(lqi.family)
762             + " of region with start key "
763             + Bytes.toStringBinary(e.getKey()));
764           return false;
765         }
766       }
767     }
768     return true;
769   }
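  // Hedged note: if a legitimate load exceeds the default of 32 hfiles per family per region,
  // the limit checked above can be raised in the configuration before constructing the tool:
  //
  //   conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 128);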
770 
771   /**
772    * @param table the table to load into
773    * @param pool the ExecutorService
774    * @param queue the queue for LoadQueueItem
775    * @param startEndKeys start and end keys
776    * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
777    * bulk load region targets and a Set of missing hfiles.
778    */
779   private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
780       final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
781       final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
782     // <region start key, LQI> need synchronized only within this scope of this
783     // phase because of the puts that happen in futures.
784     Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
785     final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
786     Set<String> missingHFiles = new HashSet<>();
787     Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = new Pair<>(regionGroups,
788         missingHFiles);
789 
790     // drain LQIs and figure out bulk load groups
791     Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
792     while (!queue.isEmpty()) {
793       final LoadQueueItem item = queue.remove();
794 
795       final Callable<Pair<List<LoadQueueItem>, String>> call =
796           new Callable<Pair<List<LoadQueueItem>, String>>() {
797         @Override
798         public Pair<List<LoadQueueItem>, String> call() throws Exception {
799           Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table,
800               startEndKeys);
801           return splits;
802         }
803       };
804       splittingFutures.add(pool.submit(call));
805     }
806     // get all the results.  All grouping and splitting must finish before
807     // we can attempt the atomic loads.
808     for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
809       try {
810         Pair<List<LoadQueueItem>, String> splits = lqis.get();
811         if (splits != null) {
812           if (splits.getFirst() != null) {
813             queue.addAll(splits.getFirst());
814           } else {
815             missingHFiles.add(splits.getSecond());
816           }
817         }
818       } catch (ExecutionException e1) {
819         Throwable t = e1.getCause();
820         if (t instanceof IOException) {
821           LOG.error("IOException during splitting", e1);
822           throw (IOException)t; // would have been thrown if not parallelized,
823         }
824         LOG.error("Unexpected execution exception during splitting", e1);
825         throw new IllegalStateException(t);
826       } catch (InterruptedException e1) {
827         LOG.error("Unexpected interrupted exception during splitting", e1);
828         throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
829       }
830     }
831     return pair;
832   }
833 
834   // unique file name for the table
835   private String getUniqueName() {
836     return UUID.randomUUID().toString().replaceAll("-", "");
837   }
838 
839   protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
840       final Table table, byte[] startKey,
841       byte[] splitKey) throws IOException {
842     final Path hfilePath = item.hfilePath;
843 
844     Path tmpDir = item.hfilePath.getParent();
845     if (!tmpDir.getName().equals(TMP_DIR)) {
846       tmpDir = new Path(tmpDir, TMP_DIR);
847     }
848     LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
849     "region. Splitting...");
850 
851     String uniqueName = getUniqueName();
852     HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
853 
854     Path botOut = new Path(tmpDir, uniqueName + ".bottom");
855     Path topOut = new Path(tmpDir, uniqueName + ".top");
856     splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
857 
858     FileSystem fs = tmpDir.getFileSystem(getConf());
859     fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
860     fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
861     fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
862 
863     // Add these back at the *front* of the queue, so there's a lower
864     // chance that the region will just split again before we get there.
865     List<LoadQueueItem> lqis = new ArrayList<LoadQueueItem>(2);
866     lqis.add(new LoadQueueItem(item.family, botOut));
867     lqis.add(new LoadQueueItem(item.family, topOut));
868 
869     // If the current item is already the result of previous splits,
870     // we don't need it anymore. Clean up to save space.
871     // It is not part of the original input files.
872     try {
873       tmpDir = item.hfilePath.getParent();
874       if (tmpDir.getName().equals(TMP_DIR)) {
875         fs.delete(item.hfilePath, false);
876       }
877     } catch (IOException e) {
878       LOG.warn("Unable to delete temporary split file " + item.hfilePath);
879     }
880     LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
881     return lqis;
882   }
883 
884   /**
885    * Attempts to assign the given load queue item to its target region group.
886    * If the hfile boundary no longer fits into a region, this physically splits
887    * the hfile such that the new bottom half will fit, and returns the list of
888    * LQIs corresponding to the resulting hfiles.
889    *
890    * Protected for testing.
891    * @throws IOException
892    */
893   protected Pair<List<LoadQueueItem>, String> groupOrSplit(
894       Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
895       final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
896     final Path hfilePath = item.hfilePath;
897     // fs is the source filesystem
898     if (fs == null) {
899       fs = hfilePath.getFileSystem(getConf());
900     }
901     HFile.Reader hfr = null;
902     try {
903       hfr = HFile.createReader(fs, hfilePath,
904           new CacheConfig(getConf()), getConf());
905     } catch (FileNotFoundException fnfe) {
906       LOG.debug("encountered", fnfe);
907       return new Pair<>(null, hfilePath.getName());
908     }
909     final byte[] first, last;
910     try {
911       hfr.loadFileInfo();
912       first = hfr.getFirstRowKey();
913       last = hfr.getLastRowKey();
914     }  finally {
915       hfr.close();
916     }
917 
918     LOG.info("Trying to load hfile=" + hfilePath +
919         " first=" + Bytes.toStringBinary(first) +
920         " last="  + Bytes.toStringBinary(last));
921     if (first == null || last == null) {
922       assert first == null && last == null;
923       // TODO what if this is due to a bad HFile?
924       LOG.info("hfile " + hfilePath + " has no entries, skipping");
925       return null;
926     }
927     if (Bytes.compareTo(first, last) > 0) {
928       throw new IllegalArgumentException(
929       "Invalid range: " + Bytes.toStringBinary(first) +
930       " > " + Bytes.toStringBinary(last));
931     }
932     int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
933         Bytes.BYTES_COMPARATOR);
934     if (idx < 0) {
935       // not on boundary, returns -(insertion index).  Calculate region it
936       // would be in.
937       idx = -(idx + 1) - 1;
938     }
939     final int indexForCallable = idx;
940 
941     /**
942      * We consider there to be a region hole under the following conditions: 1) if idx < 0, the first
943      * region info is lost; 2) if the endkey of a region is not equal to the startkey of the next
944      * region; 3) if the endkey of the last region is not empty.
945      */
946     if (indexForCallable < 0) {
947       throw new IOException("The first region info for table "
948           + table.getName()
949           + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
950     } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
951         && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
952       throw new IOException("The last region info for table "
953           + table.getName()
954           + " cann't be found in hbase:meta.Please use hbck tool to fix it first.");
955     } else if (indexForCallable + 1 < startEndKeys.getFirst().length
956         && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
957           startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
958       throw new IOException("The endkey of one region for table "
959           + table.getName()
960           + " is not equal to the startkey of the next region in hbase:meta."
961           + "Please use hbck tool to fix it first.");
962     }
963 
964     boolean lastKeyInRange =
965       Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
966       Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
967     if (!lastKeyInRange) {
968       List<LoadQueueItem> lqis = splitStoreFile(item, table,
969           startEndKeys.getFirst()[indexForCallable],
970           startEndKeys.getSecond()[indexForCallable]);
971       return new Pair<>(lqis, null);
972     }
973 
974     // group regions.
975     regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
976     return null;
977   }
978 
979   /**
980    * @deprecated As of release 0.96
981    *             (<a href="https://issues.apache.org/jira/browse/HBASE-9508">HBASE-9508</a>).
982    *             This will be removed in HBase 2.0.0.
983    *             Use {@link #tryAtomicRegionLoad(Connection, TableName, byte[], Collection)}.
984    */
985   @Deprecated
986   protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
987       final byte [] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
988   throws IOException {
989     return tryAtomicRegionLoad(conn, TableName.valueOf(tableName), first, lqis, false);
990   }
991 
992   /**
993    * Attempts to do an atomic load of many hfiles into a region.  If it fails,
994    * it returns a list of hfiles that need to be retried.  If it is successful
995    * it will return an empty list.
996    *
997    * NOTE: To maintain row atomicity guarantees, region server callable should
998    * succeed atomically or fail atomically.
999    *
1000    * Protected for testing.
1001    *
1002    * @return empty list if success, list of items to retry on recoverable
1003    * failure
1004    */
1005   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
1006       final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
1007       final boolean copyFile) throws IOException {
1008     final List<Pair<byte[], String>> famPaths =
1009       new ArrayList<Pair<byte[], String>>(lqis.size());
1010     for (LoadQueueItem lqi : lqis) {
1011       if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
1012         famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
1013       }
1014     }
1015 
1016     final RegionServerCallable<byte[]> svrCallable =
1017         new RegionServerCallable<byte[]>(conn, tableName, first) {
1018       @Override
1019       public byte[] call(int callTimeout) throws Exception {
1020         SecureBulkLoadClient secureClient = null;
1021         boolean success = false;
1022 
1023         try {
1024           LOG.debug("Going to connect to server " + getLocation() + " for row "
1025               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
1026           byte[] regionName = getLocation().getRegionInfo().getRegionName();
1027           if (!isSecureBulkLoadEndpointAvailable()) {
1028             success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
1029           } else {
1030             try (Table table = conn.getTable(getTableName())) {
1031               secureClient = new SecureBulkLoadClient(table);
1032               success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(),
1033                 bulkToken, getLocation().getRegionInfo().getStartKey(), copyFile);
1034             }
1035           }
1036           return success ? regionName : null;
1037         } finally {
1038           //Best effort copying of files that might not have been imported
1039           //from the staging directory back to original location
1040           //in user directory
1041           if(secureClient != null && !success) {
1042             FileSystem targetFs = FileSystem.get(getConf());
1043             // fs is the source filesystem
1044             if(fs == null) {
1045               fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
1046             }
1047             // Check to see if the source and target filesystems are the same
1048             // If they are the same filesystem, we will try move the files back
1049             // because previously we moved them to the staging directory.
1050             if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
1051               for(Pair<byte[], String> el : famPaths) {
1052                 Path hfileStagingPath = null;
1053                 Path hfileOrigPath = new Path(el.getSecond());
1054                 try {
1055                   hfileStagingPath= new Path(secureClient.getStagingPath(bulkToken, el.getFirst()),
1056                     hfileOrigPath.getName());
1057                   if(targetFs.rename(hfileStagingPath, hfileOrigPath)) {
1058                     LOG.debug("Moved back file " + hfileOrigPath + " from " +
1059                         hfileStagingPath);
1060                   } else if(targetFs.exists(hfileStagingPath)){
1061                     LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
1062                         hfileStagingPath);
1063                   }
1064                 } catch(Exception ex) {
1065                   LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
1066                       hfileStagingPath, ex);
1067                 }
1068               }
1069             }
1070           }
1071         }
1072       }
1073     };
1074 
1075     try {
1076       List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
1077       Configuration conf = getConf();
1078       byte[] region = RpcRetryingCallerFactory.instantiate(conf,
1079           null).<byte[]> newCaller()
1080           .callWithRetries(svrCallable, Integer.MAX_VALUE);
1081       if (region == null) {
1082         LOG.warn("Attempt to bulk load region containing "
1083             + Bytes.toStringBinary(first) + " into table "
1084             + tableName  + " with files " + lqis
1085             + " failed.  This is recoverable and they will be retried.");
1086         toRetry.addAll(lqis); // return lqi's to retry
1087       }
1088       // success
1089       return toRetry;
1090     } catch (IOException e) {
1091       LOG.error("Encountered unrecoverable error from region server, additional details: "
1092           + svrCallable.getExceptionMessageAdditionalDetail(), e);
1093       throw e;
1094     }
1095   }
1096 
1097   private boolean isSecureBulkLoadEndpointAvailable() {
1098     String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
1099     return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
1100   }
1101 
1102   /**
1103    * Split a storefile into a top and bottom half, maintaining
1104    * the metadata, recreating bloom filters, etc.
1105    */
1106   static void splitStoreFile(
1107       Configuration conf, Path inFile,
1108       HColumnDescriptor familyDesc, byte[] splitKey,
1109       Path bottomOut, Path topOut) throws IOException
1110   {
1111     // Open reader with no block cache, and not in-memory
1112     Reference topReference = Reference.createTopReference(splitKey);
1113     Reference bottomReference = Reference.createBottomReference(splitKey);
1114 
1115     copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
1116     copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
1117   }
1118 
1119   /**
1120    * Copy half of an HFile into a new HFile.
1121    */
1122   private static void copyHFileHalf(
1123       Configuration conf, Path inFile, Path outFile, Reference reference,
1124       HColumnDescriptor familyDescriptor)
1125   throws IOException {
1126     FileSystem fs = inFile.getFileSystem(conf);
1127     CacheConfig cacheConf = new CacheConfig(conf);
1128     HalfStoreFileReader halfReader = null;
1129     StoreFile.Writer halfWriter = null;
1130     try {
1131       halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, conf);
1132       Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
1133 
1134       int blocksize = familyDescriptor.getBlocksize();
1135       Algorithm compression = familyDescriptor.getCompression();
1136       BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
1137       HFileContext hFileContext = new HFileContextBuilder()
1138                                   .withCompression(compression)
1139                                   .withChecksumType(HStore.getChecksumType(conf))
1140                                   .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
1141                                   .withBlockSize(blocksize)
1142                                   .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
1143                                   .withIncludesTags(true)
1144                                   .build();
1145       halfWriter = new StoreFile.WriterBuilder(conf, cacheConf,
1146           fs)
1147               .withFilePath(outFile)
1148               .withBloomType(bloomFilterType)
1149               .withFileContext(hFileContext)
1150               .build();
1151       HFileScanner scanner = halfReader.getScanner(false, false, false);
1152       scanner.seekTo();
1153       do {
1154         KeyValue kv = KeyValueUtil.ensureKeyValue(scanner.getKeyValue());
1155         halfWriter.append(kv);
1156       } while (scanner.next());
1157 
1158       for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
1159         if (shouldCopyHFileMetaKey(entry.getKey())) {
1160           halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
1161         }
1162       }
1163     } finally {
1164       if (halfWriter != null) halfWriter.close();
1165       if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
1166     }
1167   }
1168 
1169   private static boolean shouldCopyHFileMetaKey(byte[] key) {
1170     // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
1171     if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
1172       return false;
1173     }
1174 
1175     return !HFile.isReservedFileInfoKey(key);
1176   }
1177 
1178   private boolean doesTableExist(TableName tableName) throws IOException {
1179     return hbAdmin.tableExists(tableName);
1180   }
1181 
1182   /*
1183    * Infers region boundaries for a new table.
1184    * Parameter:
1185    *   bdryMap is a map from keys to an integer in {+1, -1}
1186    *     If a key is a start key of a file, then it maps to +1
1187    *     If a key is an end key of a file, then it maps to -1
1188    * Algo:
1189    * 1) Poll the keys in order:
1190    *    a) Keep adding the mapped values of these keys (runningSum)
1191    *    b) Each time runningSum reaches 0, add the start key from when the runningSum started to the boundary list.
1192    * 2) Return the boundary list.
1193    */
1194   public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
1195     ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
1196     int runningValue = 0;
1197     byte[] currStartKey = null;
1198     boolean firstBoundary = true;
1199 
1200     for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
1201       if (runningValue == 0) currStartKey = item.getKey();
1202       runningValue += item.getValue();
1203       if (runningValue == 0) {
1204         if (!firstBoundary) keysArray.add(currStartKey);
1205         firstBoundary = false;
1206       }
1207     }
1208 
1209     return keysArray.toArray(new byte[0][0]);
1210   }
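  // A small worked example of the boundary inference above, with hypothetical keys. Two
  // overlapping hfiles covering [a, c] and [b, d] give the map {a:+1, b:+1, c:-1, d:-1}; the
  // running sum over the sorted keys is 1, 2, 1, 0, it only returns to zero at d, the first
  // boundary is skipped, and no split keys are produced (the files share one region). Two
  // non-overlapping hfiles [a, b] and [c, d] give {a:+1, b:-1, c:+1, d:-1}; the sum returns to
  // zero at b and at d, the first boundary (started at a) is skipped, and "c" becomes a split key.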
1211 
1212   /*
1213    * If the table is created for the first time, then "completebulkload" reads the files twice.
1214    * More modifications are necessary if we want to avoid doing that.
1215    */
1216   private void createTable(TableName tableName, String dirPath) throws IOException {
1217     final Path hfofDir = new Path(dirPath);
1218     final FileSystem fs = hfofDir.getFileSystem(getConf());
1219 
1220     // Add column families
1221     // Build a set of keys
1222     final HTableDescriptor htd = new HTableDescriptor(tableName);
1223     final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
1224     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
1225       @Override
1226       public HColumnDescriptor bulkFamily(final byte[] familyName) {
1227         HColumnDescriptor hcd = new HColumnDescriptor(familyName);
1228         htd.addFamily(hcd);
1229         return hcd;
1230       }
1231       @Override
1232       public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
1233           throws IOException {
1234         Path hfile = hfileStatus.getPath();
1235         HFile.Reader reader = HFile.createReader(fs, hfile,
1236             new CacheConfig(getConf()), getConf());
1237         try {
1238           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
1239             hcd.setCompressionType(reader.getFileContext().getCompression());
1240             LOG.info("Setting compression " + hcd.getCompressionType().name() +
1241                      " for family " + hcd.toString());
1242           }
1243           reader.loadFileInfo();
1244           byte[] first = reader.getFirstRowKey();
1245           byte[] last  = reader.getLastRowKey();
1246 
1247           LOG.info("Trying to figure out region boundaries hfile=" + hfile +
1248             " first=" + Bytes.toStringBinary(first) +
1249             " last="  + Bytes.toStringBinary(last));
1250 
1251           // To eventually infer start key-end key boundaries
1252           Integer value = map.containsKey(first)? map.get(first):0;
1253           map.put(first, value+1);
1254 
1255           value = map.containsKey(last)? map.get(last):0;
1256           map.put(last, value-1);
1257         } finally {
1258           reader.close();
1259         }
1260       }
1261     });
1262 
1263     byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
1264     this.hbAdmin.createTable(htd, keys);
1265 
1266     LOG.info("Table " + tableName + " is available!");
1267   }
1268 
1269   public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
1270       TableName tableName) throws IOException {
1271     initialize();
1272     try (Connection connection = ConnectionFactory.createConnection(getConf());
1273         Admin admin = connection.getAdmin()) {
1274 
1275       boolean tableExists = this.doesTableExist(tableName);
1276       if (!tableExists) {
1277         if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))){
1278           this.createTable(tableName, dirPath);
1279         } else {
1280           String errorMsg = format("Table '%s' does not exist.", tableName);
1281           LOG.error(errorMsg);
1282           throw new TableNotFoundException(errorMsg);
1283         }
1284       }
1285 
1286       Path hfofDir = null;
1287       if (dirPath != null) {
1288         hfofDir = new Path(dirPath);
1289       }
1290 
1291       try (HTable table = (HTable) connection.getTable(tableName);
1292           RegionLocator locator = connection.getRegionLocator(tableName)) {
1293         boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
1294         boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
1295         if (dirPath != null) {
1296           doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
1297         } else {
1298           doBulkLoad(map, admin, table, locator, silence, copyFiles);
1299         }
1300         return retValue;
1301       }
1302     }
1303   }
1304 
1305   @Override
1306   public int run(String[] args) throws Exception {
1307     if (args.length < 2) {
1308       usage();
1309       return -1;
1310     }
1311 
1312     String dirPath = args[0];
1313     TableName tableName = TableName.valueOf(args[1]);
1314     Map<LoadQueueItem, ByteBuffer> loaded = run(dirPath, null, tableName);
1315     if (loaded == null || !loaded.isEmpty()) return 0;
1316     return -1;
1317   }
1318 
1319   public static void main(String[] args) throws Exception {
1320     Configuration conf = HBaseConfiguration.create();
1321     int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
1322     System.exit(ret);
1323   }
1324 
1325   /**
1326    * Called from the replication sink, where it manages the bulkToken (staging directory) by itself.
1327    * This is used only when {@link SecureBulkLoadEndpoint} is configured in the
1328    * hbase.coprocessor.region.classes property. This directory is a temporary directory where all
1329    * files are initially copied/moved from the user-given directory, have the required file
1330    * permissions set, and are then finally loaded into the table. Set this only if you want to
1331    * manage the staging directory yourself; otherwise this tool will handle it by itself.
1332    * @param stagingDir staging directory path
1333    */
1334   public void setBulkToken(String stagingDir) {
1335     this.bulkToken = stagingDir;
1336   }
1337 
1338 }