/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.master.BackupController;
import org.apache.hadoop.hbase.backup.master.BackupLogCleaner;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Pair;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Handles backup requests: creates backup info records in the backup system table
 * to keep track of backup sessions, and dispatches backup requests.
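 *
 * <p>A minimal usage sketch; the backup id, type, table list and target root
 * directory are assumed to be supplied by the calling request handler:
 *
 * <pre>{@code
 * try (BackupManager manager = new BackupManager(conf)) {
 *   manager.initialize();
 *   BackupInfo info = manager.createBackupInfo(backupId, BackupType.FULL,
 *       tables, targetRootDir, workers, bandwidth);
 * }
 * }</pre>
 */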
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BackupManager implements Closeable {
  private static final Log LOG = LogFactory.getLog(BackupManager.class);

  private Configuration conf = null;
  private BackupInfo backupInfo = null;

  private ExecutorService pool = null;

  private boolean backupComplete = false;

  private BackupSystemTable systemTable;

  private final Connection conn;

  /**
   * Obtains the backup manager and verifies that the backup feature is enabled.
   * @param conf the current configuration
   * @throws IOException if the connection or the backup system table cannot be created
   */
  public BackupManager(Configuration conf) throws IOException {
    if (!conf.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) {
      throw new BackupException("HBase backup is not enabled. Check your "
          + HConstants.BACKUP_ENABLE_KEY + " setting.");
    }
    this.conf = conf;
    this.conn = ConnectionFactory.createConnection(conf);
    this.systemTable = new BackupSystemTable(conn);
  }

  /**
   * Returns the backup info for the ongoing backup session.
   */
  protected BackupInfo getBackupContext() {
    return backupInfo;
  }

  /**
   * Modifies the master's configuration in order to inject backup-related features.
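   *
   * <p>Illustration (assuming the procedure and coprocessor keys were previously
   * unset): after decoration, {@code hbase.procedure.master.classes} names
   * {@link LogRollMasterProcedureManager}, {@code hbase.coprocessor.master.classes}
   * names {@link BackupController}, and the master log cleaner plug-ins gain
   * {@link BackupLogCleaner}.
   * @param conf configuration to decorate
   */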
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }
    // Add the backup log cleaner plug-in so that WAL files still needed by
    // backups are not deleted by the master.
    String plugins = conf.get(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = BackupLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }

    // Register the log roll master procedure manager.
    String classes = conf.get("hbase.procedure.master.classes");
    String masterProcedureClass = LogRollMasterProcedureManager.class.getName();
    if (classes == null) {
      conf.set("hbase.procedure.master.classes", masterProcedureClass);
    } else if (!classes.contains(masterProcedureClass)) {
      conf.set("hbase.procedure.master.classes", classes + "," + masterProcedureClass);
    }

    // Set the master observer (backup controller).
    classes = conf.get("hbase.coprocessor.master.classes");
    String observerClass = BackupController.class.getName();
    if (classes == null) {
      conf.set("hbase.coprocessor.master.classes", observerClass);
    } else if (!classes.contains(observerClass)) {
      conf.set("hbase.coprocessor.master.classes", classes + "," + observerClass);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Added log cleaner: " + cleanerClass);
      LOG.debug("Added master procedure manager: " + masterProcedureClass);
      LOG.debug("Added master observer: " + observerClass);
    }
  }

  /**
   * Modifies the region server's configuration in order to inject backup-related features.
   * @param conf configuration to decorate
   */
  public static void decorateRSConfiguration(Configuration conf) {
    if (!isBackupEnabled(conf)) {
      return;
    }
    // Register the log roll region server procedure manager.
    String classes = conf.get("hbase.procedure.regionserver.classes");
    String regionProcedureClass = LogRollRegionServerProcedureManager.class.getName();
    if (classes == null) {
      conf.set("hbase.procedure.regionserver.classes", regionProcedureClass);
    } else if (!classes.contains(regionProcedureClass)) {
      conf.set("hbase.procedure.regionserver.classes", classes + "," + regionProcedureClass);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added region procedure manager: " + regionProcedureClass);
    }
  }

  /**
   * Returns whether the backup feature is enabled in the given configuration.
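   *
   * <p>To enable the backup feature explicitly, e.g. in a test setup:
   * <pre>{@code
   * conf.setBoolean(HConstants.BACKUP_ENABLE_KEY, true);
   * }</pre>
   * @param conf configuration to check
   * @return true if the backup feature is enabled
   */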
  public static boolean isBackupEnabled(Configuration conf) {
    return conf.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT);
  }

  /**
   * Returns the current configuration.
   */
  Configuration getConf() {
    return conf;
  }

  /**
   * Stops all backup work and releases the resources held by this manager.
   */
  @Override
  public void close() {
    // Shut down any ongoing backup handlers immediately.
    if (this.pool != null) {
      this.pool.shutdownNow();
    }
    if (systemTable != null) {
      try {
        systemTable.close();
      } catch (Exception e) {
        LOG.error(e);
      }
    }
    if (conn != null) {
      try {
        conn.close();
      } catch (IOException e) {
        LOG.error(e);
      }
    }
  }

  /**
   * Creates a backup info record based on the input backup request.
   * @param backupId backup id
   * @param type backup type
   * @param tableList table list
   * @param targetRootDir root directory of the backup destination
   * @param workers number of parallel workers
   * @param bandwidth bandwidth per worker in MB per second
   * @return the new BackupInfo
   * @throws BackupException if the request is malformed
   */
  public BackupInfo createBackupInfo(String backupId, BackupType type,
      List<TableName> tableList, String targetRootDir, int workers, long bandwidth)
      throws BackupException {
    if (targetRootDir == null) {
      throw new BackupException("Wrong backup request parameter: target backup root directory");
    }

    if (type == BackupType.FULL && (tableList == null || tableList.isEmpty())) {
      // If the table list is null or empty for a full backup, back up all tables:
      // fill the table list with all user tables from the cluster. If no table is
      // available, reject the request.
      HTableDescriptor[] htds = null;
      try (Admin hbadmin = conn.getAdmin()) {
        htds = hbadmin.listTables();
      } catch (Exception e) {
        throw new BackupException(e);
      }

      if (htds == null) {
        throw new BackupException("No table exists for full backup of all tables.");
      } else {
        tableList = new ArrayList<>();
        for (HTableDescriptor hTableDescriptor : htds) {
          tableList.add(hTableDescriptor.getTableName());
        }
        LOG.info("Full backup all the tables available in the cluster: " + tableList);
      }
    }

    backupInfo = new BackupInfo(backupId, type,
        tableList.toArray(new TableName[tableList.size()]),
        targetRootDir);
    backupInfo.setBandwidth(bandwidth);
    backupInfo.setWorkers(workers);
    return backupInfo;
  }

  /**
   * Checks whether a backup session is already running. Currently only one ongoing
   * backup session is allowed.
   * @return the backup id of the ongoing backup session, or null if there is none
   * @throws IOException if the backup system table cannot be read
   */
  private String getOngoingBackupId() throws IOException {
    ArrayList<BackupInfo> sessions = systemTable.getBackupContexts(BackupState.RUNNING);
    if (sessions.size() == 0) {
      return null;
    }
    return sessions.get(0).getBackupId();
  }

  /**
   * Starts the backup session: fails if another backup is ongoing, then creates
   * the thread pool used by the backup handlers.
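   *
   * <p>The handler pool size is read from {@code hbase.backup.threads.max}
   * (default 1); it can be raised for more concurrent backup handlers, e.g.:
   * <pre>{@code
   * conf.setInt("hbase.backup.threads.max", 4);
   * }</pre>
   * @throws IOException if another backup session is already running
   */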
  public void initialize() throws IOException {
    String ongoingBackupId = this.getOngoingBackupId();
    if (ongoingBackupId != null) {
      LOG.info("There is an ongoing backup " + ongoingBackupId
          + ". Cannot launch a new backup until the ongoing one finishes.");
      throw new BackupException("There is an ongoing backup.");
    }

    // Initialize the thread pool for the backup handlers.
    int nrThreads = this.conf.getInt("hbase.backup.threads.max", 1);
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("BackupHandler-%1$d");
    this.pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(), builder.build());
    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
  }

  public void setBackupInfo(BackupInfo backupInfo) {
    this.backupInfo = backupInfo;
  }

  /**
   * Gets the direct ancestors of the backup that is currently being created.
   * @param backupCtx the backup info of the current backup
   * @return the direct ancestor backup images
   * @throws BackupException exception
   * @throws IOException exception
   */
  public ArrayList<BackupImage> getAncestors(BackupInfo backupCtx) throws IOException,
      BackupException {
    LOG.debug("Getting the direct ancestors of the current backup "
        + backupCtx.getBackupId());

    ArrayList<BackupImage> ancestors = new ArrayList<BackupImage>();

    // Full backups do not depend on any other image.
    if (backupCtx.getType() == BackupType.FULL) {
      LOG.debug("Current backup is a full backup, no direct ancestor for it.");
      return ancestors;
    }

    // Traverse the history of completed backups and collect images until the
    // accumulated set covers the current table set.
    ArrayList<BackupInfo> allHistoryList = getBackupHistory(true);
    for (BackupInfo backup : allHistoryList) {
      BackupImage image =
          new BackupImage(backup.getBackupId(), backup.getType(),
              backup.getTargetRootDir(), backup.getTableNames(),
              backup.getStartTs(), backup.getEndTs());

      if (backup.getType().equals(BackupType.FULL)) {
        // Add the full backup image unless it is already covered by the
        // collected ancestors.
        if (!BackupManifest.canCoverImage(ancestors, image)) {
          ancestors.add(image);
        }
      } else {
        // For an incremental backup: if the collected ancestors already cover
        // this image, the backup boundary has been reached; otherwise load the
        // image from its manifest and add it as a dependency.
        if (BackupManifest.canCoverImage(ancestors, image)) {
          LOG.debug("Met the backup boundary of the current table set. "
              + "The root full backup images for the current backup scope:");
          for (BackupImage image1 : ancestors) {
            LOG.debug("  BackupId: " + image1.getBackupId() + ", Backup directory: "
                + image1.getRootDir());
          }
        } else {
          Path logBackupPath =
              HBackupFileSystem.getLogBackupPath(backup.getTargetRootDir(),
                backup.getBackupId());
          LOG.debug("Current backup has an incremental backup ancestor, "
              + "touching its image manifest in " + logBackupPath.toString()
              + " to construct the dependency.");
          BackupManifest lastIncrImgManifest = new BackupManifest(conf, logBackupPath);
          BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage();
          ancestors.add(lastIncrImage);

          LOG.debug("Last dependent incremental backup image information:");
          LOG.debug("  Token: " + lastIncrImage.getBackupId());
          LOG.debug("  Backup directory: " + lastIncrImage.getRootDir());
        }
      }
    }
    LOG.debug("Got " + ancestors.size() + " ancestors for the current backup.");
    return ancestors;
  }

  /**
   * Gets the direct ancestors of this backup for one table involved.
   * @param backupContext the backup info of the current backup
   * @param table the table
   * @return the direct ancestor images that contain the table
   * @throws BackupException exception
   * @throws IOException exception
   */
  public ArrayList<BackupImage> getAncestors(BackupInfo backupContext, TableName table)
      throws BackupException, IOException {
    ArrayList<BackupImage> ancestors = getAncestors(backupContext);
    ArrayList<BackupImage> tableAncestors = new ArrayList<BackupImage>();
    for (BackupImage image : ancestors) {
      if (image.hasTable(table)) {
        tableAncestors.add(image);
        if (image.getType() == BackupType.FULL) {
          break;
        }
      }
    }
    return tableAncestors;
  }

  /*
   * Backup system table operations
   */

  /**
   * Updates the backup info record in the backup system table.
   * @param context the updated backup info
   * @throws IOException exception
   */
  public void updateBackupInfo(BackupInfo context) throws IOException {
    systemTable.updateBackupInfo(context);
  }

  /**
   * Reads the backup start code (the start timestamp of the last successful backup)
   * from the backup system table.
   * @return the start code, or null if none has been recorded
   * @throws IOException exception
   */
  public String readBackupStartCode() throws IOException {
    return systemTable.readBackupStartCode(backupInfo.getTargetRootDir());
  }

  /**
   * Writes the backup start code (timestamp) to the backup system table.
   * @param startCode start code
   * @throws IOException exception
   */
  public void writeBackupStartCode(Long startCode) throws IOException {
    systemTable.writeBackupStartCode(startCode, backupInfo.getTargetRootDir());
  }

  /**
   * Reads the rows recorded for bulk loaded files, keyed by table name, from the
   * backup system table, together with their row keys.
   */
  public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
      readOrigBulkloadRows(List<TableName> tableList) throws IOException {
    return systemTable.readOrigBulkloadRows(tableList);
  }

  /**
   * Removes the given bulk load records from the backup system table.
   */
  public void removeOrigBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
    systemTable.removeOrigBulkLoadedRows(lst, rows);
  }

  /**
   * Records the bulk loaded files for the given tables under the current backup id.
   */
  public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
      throws IOException {
    systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
  }

  /**
   * Reads the region server last log roll results from the backup system table.
   * @return the map from region server to its last log roll result (timestamp)
   * @throws IOException exception
   */
  public HashMap<String, Long> readRegionServerLastLogRollResult() throws IOException {
    return systemTable.readRegionServerLastLogRollResult(backupInfo.getTargetRootDir());
  }

  /**
   * Gets the backup history from the backup system table.
   * @return the list of backup info records
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory() throws IOException {
    return systemTable.getBackupHistory();
  }

  public ArrayList<BackupInfo> getBackupHistory(boolean completed) throws IOException {
    return systemTable.getBackupHistory(completed);
  }

  /**
   * Writes the current log timestamps for each region server to the backup system
   * table after a successful full or incremental backup.
   * @param tables tables involved in the backup
   * @param newTimestamps log roll timestamps keyed by region server
   * @throws IOException exception
   */
  public void writeRegionServerLogTimestamp(Set<TableName> tables,
      HashMap<String, Long> newTimestamps) throws IOException {
    systemTable.writeRegionServerLogTimestamp(tables, newTimestamps,
        backupInfo.getTargetRootDir());
  }

  /**
   * Reads the log timestamp map, keyed by table and then by region server, from the
   * backup system table.
   * @return the log timestamp map
   * @throws IOException exception
   */
  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap() throws IOException {
    return systemTable.readLogTimestampMap(backupInfo.getTargetRootDir());
  }

  /**
   * Returns the current set of tables covered by incremental backup.
   * @return the incremental backup table set
   * @throws IOException exception
   */
  public Set<TableName> getIncrementalBackupTableSet() throws IOException {
    return systemTable.getIncrementalBackupTableSet(backupInfo.getTargetRootDir());
  }

  /**
   * Adds tables to the incremental backup table set.
   * @param tables tables to add
   * @throws IOException exception
   */
  public void addIncrementalBackupTableSet(Set<TableName> tables) throws IOException {
    systemTable.addIncrementalBackupTableSet(tables, backupInfo.getTargetRootDir());
  }

  /**
   * Records the given WAL files in the backup system table for the current
   * backup session.
   * @param files list of WAL file paths
   * @throws IOException exception
   */
  public void recordWALFiles(List<String> files) throws IOException {
    systemTable.addWALFiles(files,
        backupInfo.getBackupId(), backupInfo.getTargetRootDir());
  }

  /**
   * Gets an iterator over the WAL files recorded in the backup system table.
   * @return the WAL files iterator
   * @throws IOException exception
   */
  public Iterator<BackupSystemTable.WALItem> getWALFilesFromBackupSystem() throws IOException {
    return systemTable.getWALFilesIterator(backupInfo.getTargetRootDir());
  }

  public Connection getConnection() {
    return conn;
  }
}