/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
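/**
 * Copies the HFiles referenced by bulk load replication entries from the source cluster into a
 * local staging directory, then bulk loads them into the corresponding sink cluster tables via
 * {@link LoadIncrementalHFiles}.
 */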
@InterfaceAudience.Private
public class HFileReplicator {
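  /** Maximum number of threads in the pool that copies hfiles during replication. */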
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
      "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
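  /** Number of hfiles each copier task copies to the sink cluster at a time. */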
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
      "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Log LOG = LogFactory.getLog(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private Connection connection;
  private String hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
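  /**
   * Creates a replicator for the given map of table name to (column family, hfile paths) pairs
   * shipped from the source cluster.
   */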
  public HFileReplicator(Configuration sourceClusterConf,
      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
      Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
      Connection connection) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir");
    this.maxCopyThreads =
        this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
          REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("HFileReplicationCallable-%1$d");
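    // A ThreadPoolExecutor over an unbounded queue never grows past its core size, so the core
    // size must equal maxCopyThreads for copies to actually run in parallel; idle threads are
    // reclaimed through allowCoreThreadTimeOut below.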
    this.exec =
        new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), builder.build());
    this.exec.allowCoreThreadTimeOut(true);
    this.copiesPerThread =
        conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
          REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }
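  /**
   * Copies each table's hfiles into a staging directory on the sink file system, then bulk loads
   * them into the corresponding sink cluster table.
   */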
  public Void replicate() throws IOException {
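    // First copy all the hfiles, grouped by table, to staging directories on the sink cluster.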
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();

      LoadIncrementalHFiles loadHFiles = null;
      try {
        loadHFiles = new LoadIncrementalHFiles(conf);
      } catch (Exception e) {
        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
            + " data.", e);
        throw new IOException(e);
      }
      Configuration newConf = HBaseConfiguration.create(conf);
      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
      loadHFiles.setConf(newConf);

      TableName tableName = TableName.valueOf(tableNameString);
      Table table = this.connection.getTable(tableName);
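      // Prepare the queue of hfiles to be bulk loaded.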
      Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);

      if (queue.isEmpty()) {
        LOG.warn("Replication process did not find any files to replicate in directory "
            + stagingDir.toUri());
        return null;
      }

      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
        fsDelegationToken.acquireDelegationToken(sinkFs);
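        // Pass the staging directory as the bulk token; LoadIncrementalHFiles uses it to locate
        // the copied hfiles when loading the data.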
        loadHFiles.setBulkToken(stagingDir.toString());
        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
      } finally {
        cleanup(stagingDir.toString(), table);
      }
    }
    return null;
  }
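  /**
   * Bulk loads the queued hfiles, re-resolving region boundaries before every attempt and
   * retrying until the queue drains or maxRetries is exhausted.
   */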
  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
      Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
    int count = 0;
    Pair<byte[][], byte[][]> startEndKeys;
    while (!queue.isEmpty()) {
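      // Regions may split or move between attempts, so reload the start/end keys each iteration.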
      startEndKeys = locator.getStartEndKeys();
      if (count != 0) {
        LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with "
            + queue.size() + " files still remaining to replicate.");
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count
            + " times without completing, bailing out.");
      }
      count++;
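      // Attempt the bulk load; items that could not be loaded yet (e.g. because a region split)
      // are left in the queue for the next attempt.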
      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
    }
  }

  private void cleanup(String stagingDir, Table table) {
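    // Release the delegation token acquired on the sink file system during the bulk load.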
    fsDelegationToken.releaseDelegationToken();
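    // Remove this table's staging directory and everything under it.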
    if (stagingDir != null) {
      try {
        sinkFs.delete(new Path(stagingDir), true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
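    // The source cluster file system is closed in copyHFilesToStagingDir(), so only the sink
    // table needs closing here.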
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOG.warn("Failed to close the table.", e);
      }
    }
  }
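  /**
   * Copies the hfiles of every table from the source cluster file system into per-table staging
   * directories on the sink file system.
   * @return a map of table name to the staging directory holding that table's copied hfiles
   */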
  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<String, Path>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
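      /*
       * Path#getFileSystem returns a cached file system instance by default. If the source and
       * sink clusters share the same name service, the cache would hand back the sink cluster's
       * file system, so disable caching for this scheme to force a new instance to be created
       * from the source cluster configuration.
       */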
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName = String.format("fs.%s.impl.disable.cache", sourceScheme);
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();

      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
          .entrySet()) {
        String tableName = tableEntry.getKey();
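        // Create a randomized staging directory for this table.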
        Path stagingDir =
            createStagingDir(new Path(hbaseStagingDir), user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();
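        // Iterate over each column family and the hfile paths belonging to it.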
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();
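          // Copy the family's hfiles in chunks of copiesPerThread, one copier task per chunk.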
          List<Future<Void>> futures = new ArrayList<Future<Void>>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;

          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
                hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }
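          // Copy the remaining hfiles that did not fill a full chunk.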
          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
                hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }
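          // Wait for all copy tasks; any failure aborts the batch so the source cluster retries.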
          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                  "Failed to copy HFiles to local file system. This will be retried again "
                      + "by the source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will "
                  + "be retried again by the source cluster.", e);
            }
          }
        }
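        // Record the staging directory that now holds every copied hfile for this table.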
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }
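  /**
   * Creates a world-accessible staging directory under baseDir, named from the user's short
   * name, the table name and a random suffix.
   */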
  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
        + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
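    // mkdirs applies the file system umask to the requested permission, so set the permission
    // explicitly afterwards to guarantee full access.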
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }
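  /**
   * Copies a list of hfiles from the source cluster file system into the given staging directory
   * on the sink file system, falling back to the source's hfile archive directory when a file
   * has already been archived.
   */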
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
        throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
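          // Any exception other than FileNotFoundException propagates, failing the copy so that
          // the source cluster retries the shipment.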
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Trying to copy from hfile archive directory.", e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
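            // The hfile no longer exists anywhere in the source cluster, so there is nothing to
            // copy; log it and skip only this file.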
            LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
                + ". Hence ignoring this hfile from replication.", e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}