1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.backup.impl;
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Set;
29 import java.util.TreeMap;
30 import java.util.TreeSet;
31
32 import org.apache.commons.lang.StringUtils;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.hbase.Cell;
38 import org.apache.hadoop.hbase.CellComparator;
39 import org.apache.hadoop.hbase.CellUtil;
40 import org.apache.hadoop.hbase.HBaseConfiguration;
41 import org.apache.hadoop.hbase.HColumnDescriptor;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HTableDescriptor;
44 import org.apache.hadoop.hbase.TableName;
45 import org.apache.hadoop.hbase.backup.BackupInfo;
46 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
47 import org.apache.hadoop.hbase.backup.BackupType;
48 import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
49 import org.apache.hadoop.hbase.classification.InterfaceAudience;
50 import org.apache.hadoop.hbase.classification.InterfaceStability;
51 import org.apache.hadoop.hbase.client.Connection;
52 import org.apache.hadoop.hbase.client.Delete;
53 import org.apache.hadoop.hbase.client.Get;
54 import org.apache.hadoop.hbase.client.Put;
55 import org.apache.hadoop.hbase.client.Result;
56 import org.apache.hadoop.hbase.client.ResultScanner;
57 import org.apache.hadoop.hbase.client.Scan;
58 import org.apache.hadoop.hbase.client.Table;
59 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
60 import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63 import org.apache.hadoop.hbase.util.Pair;
64
65
66
67
68 @InterfaceAudience.Private
69 @InterfaceStability.Evolving
70 public final class BackupSystemTable implements Closeable {
71
72 static class WALItem {
73 String backupId;
74 String walFile;
75 String backupRoot;
76
77 WALItem(String backupId, String walFile, String backupRoot)
78 {
79 this.backupId = backupId;
80 this.walFile = walFile;
81 this.backupRoot = backupRoot;
82 }
83
84 public String getBackupId() {
85 return backupId;
86 }
87
88 public String getWalFile() {
89 return walFile;
90 }
91
92 public String getBackupRoot() {
93 return backupRoot;
94 }
95
96 public String toString() {
97 return "/"+ backupRoot + "/"+backupId + "/" + walFile;
98 }
99
100 }
101
102 private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
103 private final static TableName tableName = TableName.BACKUP_TABLE_NAME;
104
105 final static byte[] SESSIONS_FAMILY = "session".getBytes();
106
107 final static byte[] META_FAMILY = "meta".getBytes();
108 final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
109
110
111 private final Connection connection;
112
/**
 * Creates an accessor that obtains its table handles from the given connection.
 * The connection reference is stored as-is.
 *
 * @param conn connection to use for all subsequent operations
 * @throws IOException declared for callers; nothing in this constructor throws it
 */
public BackupSystemTable(Connection conn) throws IOException {
  connection = conn;
}
116
117
118 public void close() {
119
120 }
121
122
123
124
125
126
127 public void updateBackupInfo(BackupInfo context) throws IOException {
128
129 if (LOG.isDebugEnabled()) {
130 LOG.debug("update backup status in hbase:backup for: " + context.getBackupId()
131 + " set status=" + context.getState());
132 }
133 try (Table table = connection.getTable(tableName)) {
134 Put put = BackupSystemTableHelper.createPutForBackupContext(context);
135 table.put(put);
136 }
137 }
138
139
140
141
142
143
144
145
146 public void deleteBackupInfo(String backupId) throws IOException {
147
148 if (LOG.isDebugEnabled()) {
149 LOG.debug("delete backup status in hbase:backup for " + backupId);
150 }
151 try (Table table = connection.getTable(tableName)) {
152 Delete del = BackupSystemTableHelper.createDeleteForBackupInfo(backupId);
153 table.delete(del);
154 }
155 }
156
157
158
159
160
161
162
163 public BackupInfo readBackupInfo(String backupId) throws IOException {
164 if (LOG.isDebugEnabled()) {
165 LOG.debug("read backup status from hbase:backup for: " + backupId);
166 }
167
168 try (Table table = connection.getTable(tableName)) {
169 Get get = BackupSystemTableHelper.createGetForBackupContext(backupId);
170 Result res = table.get(get);
171 if(res.isEmpty()){
172 return null;
173 }
174 return BackupSystemTableHelper.resultToBackupInfo(res);
175 }
176 }
177
178
179
180
181
182
183
184
185
186 public String readBackupStartCode(String backupRoot) throws IOException {
187 if (LOG.isDebugEnabled()) {
188 LOG.debug("read backup start code from hbase:backup");
189 }
190 try (Table table = connection.getTable(tableName)) {
191 Get get = BackupSystemTableHelper.createGetForStartCode(backupRoot);
192 Result res = table.get(get);
193 if (res.isEmpty()) {
194 return null;
195 }
196 Cell cell = res.listCells().get(0);
197 byte[] val = CellUtil.cloneValue(cell);
198 if (val.length == 0){
199 return null;
200 }
201 return new String(val);
202 }
203 }
204
205
206
207
208
209 public Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
210 Scan scan = BackupSystemTableHelper.createScanForBulkLoadedFiles(backupId);
211 try (Table table = connection.getTable(tableName);
212 ResultScanner scanner = table.getScanner(scan)) {
213 Result res = null;
214 Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
215 while ((res = scanner.next()) != null) {
216 res.advance();
217 byte[] row = CellUtil.cloneRow(res.listCells().get(0));
218 for (Cell cell : res.listCells()) {
219 if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.PATH_COL, 0,
220 BackupSystemTableHelper.PATH_COL.length) == 0) {
221 map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
222 }
223 }
224 }
225 return map;
226 }
227 }
228
229 public static int getIndex(TableName tbl, List<TableName> sTableList) {
230 if (sTableList == null) return 0;
231 for (int i = 0; i < sTableList.size(); i++) {
232 if (tbl.equals(sTableList.get(i))) {
233 return i;
234 }
235 }
236 return -1;
237 }
238
239
240
241
242
243
244 public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
245 throws IOException {
246 Scan scan = BackupSystemTableHelper.createScanForBulkLoadedFiles(backupId);
247 Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
248 try (Table table = connection.getTable(tableName);
249 ResultScanner scanner = table.getScanner(scan)) {
250 Result res = null;
251 while ((res = scanner.next()) != null) {
252 res.advance();
253 TableName tbl = null;
254 byte[] fam = null;
255 String path = null;
256 for (Cell cell : res.listCells()) {
257 if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.TBL_COL, 0,
258 BackupSystemTableHelper.TBL_COL.length) == 0) {
259 tbl = TableName.valueOf(CellUtil.cloneValue(cell));
260 } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.FAM_COL, 0,
261 BackupSystemTableHelper.FAM_COL.length) == 0) {
262 fam = CellUtil.cloneValue(cell);
263 } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.PATH_COL, 0,
264 BackupSystemTableHelper.PATH_COL.length) == 0) {
265 path = Bytes.toString(CellUtil.cloneValue(cell));
266 }
267 }
268 int srcIdx = getIndex(tbl, sTableList);
269 if (srcIdx == -1) {
270
271 continue;
272 }
273 if (mapForSrc[srcIdx] == null) {
274 mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
275 }
276 List<Path> files;
277 if (!mapForSrc[srcIdx].containsKey(fam)) {
278 files = new ArrayList<Path>();
279 mapForSrc[srcIdx].put(fam, files);
280 } else {
281 files = mapForSrc[srcIdx].get(fam);
282 }
283 files.add(new Path(path));
284 LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
285 };
286 return mapForSrc;
287 }
288 }
289
290
291
292
293 public void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
294 try (Table table = connection.getTable(tableName)) {
295 List<Delete> dels = new ArrayList<>();
296 for (byte[] row : map.keySet()) {
297 dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
298 }
299 table.delete(dels);
300 }
301 }
302
303
304
305
306
307
308
309 public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
310 if (LOG.isDebugEnabled()) {
311 LOG.debug("write backup start code to hbase:backup " + startCode);
312 }
313 try (Table table = connection.getTable(tableName)) {
314 Put put = BackupSystemTableHelper.createPutForStartCode(startCode.toString(), backupRoot);
315 table.put(put);
316 }
317 }
318
319
320
321
322 public void writeOrigBulkLoad(TableName tabName, byte[] region,
323 Map<byte[], List<Path>> finalPaths) throws IOException {
324 if (LOG.isDebugEnabled()) {
325 LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
326 finalPaths.size() + " entries");
327 }
328 try (Table table = connection.getTable(tableName)) {
329 List<Put> puts = BackupSystemTableHelper.createPutForOrigBulkload(tabName, region,
330 finalPaths);
331 table.put(puts);
332 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
333 }
334 }
335
336
337
338
339 public void writeOrigBulkLoad(TableName tabName, byte[] region,
340 final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
341 if (LOG.isDebugEnabled()) {
342 LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
343 pairs.size() + " entries");
344 }
345 try (Table table = connection.getTable(tableName)) {
346 List<Put> puts = BackupSystemTableHelper.createPutForOrigBulkload(tabName, region,
347 family, pairs);
348 table.put(puts);
349 LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
350 }
351 }
352
353 public void removeOrigBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
354 try (Table table = connection.getTable(tableName)) {
355 List<Delete> lstDels = new ArrayList<>();
356 for (byte[] row : rows) {
357 Delete del = new Delete(row);
358 lstDels.add(del);
359 LOG.debug("orig deleting the row: " + Bytes.toString(row));
360 }
361 table.delete(lstDels);
362 LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
363 }
364 }
365
366
367
368
369 public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
370 readOrigBulkloadRows(List<TableName> tableList) throws IOException {
371 Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
372 List<byte[]> rows = new ArrayList<>();
373 for (TableName tTable : tableList) {
374 Scan scan = BackupSystemTableHelper.createScanForOrigBulkLoadedFiles(tTable);
375 Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
376 try (Table table = connection.getTable(tableName);
377 ResultScanner scanner = table.getScanner(scan)) {
378 Result res = null;
379 while ((res = scanner.next()) != null) {
380 res.advance();
381 String fam = null;
382 String path = null;
383 boolean raw = false;
384 byte[] row = null;
385 String region = null;
386 for (Cell cell : res.listCells()) {
387 row = CellUtil.cloneRow(cell);
388 rows.add(row);
389 String rowStr = Bytes.toString(row);
390 region = BackupSystemTableHelper.getRegionNameFromOrigBulkLoadRow(rowStr);
391 if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.FAM_COL, 0,
392 BackupSystemTableHelper.FAM_COL.length) == 0) {
393 fam = Bytes.toString(CellUtil.cloneValue(cell));
394 } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.PATH_COL, 0,
395 BackupSystemTableHelper.PATH_COL.length) == 0) {
396 path = Bytes.toString(CellUtil.cloneValue(cell));
397 } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.STATE_COL, 0,
398 BackupSystemTableHelper.STATE_COL.length) == 0) {
399 byte[] state = CellUtil.cloneValue(cell);
400 if (BackupSystemTableHelper.BL_RAW.equals(state)) {
401 raw = true;
402 } else raw = false;
403 }
404 }
405 if (map.get(tTable) == null) {
406 map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
407 tblMap = map.get(tTable);
408 }
409 if (tblMap.get(region) == null) {
410 tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
411 }
412 Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
413 if (famMap.get(fam) == null) {
414 famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
415 }
416 famMap.get(fam).add(new Pair<>(path, raw));
417 LOG.debug("found orig " + raw + " " + path + " for " + fam + " of region " + region);
418 }
419 }
420 }
421 return new Pair<>(map, rows);
422 }
423
424
425
426
427
428
429 public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
430 String backupId) throws IOException {
431 try (Table table = connection.getTable(tableName)) {
432 long ts = EnvironmentEdgeManager.currentTime();
433 int cnt = 0;
434 List<Put> puts = new ArrayList<>();
435 for (int idx = 0; idx < maps.length; idx++) {
436 Map<byte[], List<Path>> map = maps[idx];
437 TableName tn = sTableList.get(idx);
438 if (map == null) continue;
439 for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
440 byte[] fam = entry.getKey();
441 List<Path> paths = entry.getValue();
442 for (Path p : paths) {
443 Put put = BackupSystemTableHelper.createPutForBulkLoadedFile(tn, fam, p.toString(),
444 backupId, ts, cnt++);
445 puts.add(put);
446 }
447 }
448 }
449 if (!puts.isEmpty()) {
450 table.put(puts);
451 }
452 }
453 }
454
455
456
457
458
459
460
461 public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
462 throws IOException {
463 if (LOG.isDebugEnabled()) {
464 LOG.debug("read region server last roll log result to hbase:backup");
465 }
466
467 Scan scan = BackupSystemTableHelper.createScanForReadRegionServerLastLogRollResult(backupRoot);
468
469 try (Table table = connection.getTable(tableName);
470 ResultScanner scanner = table.getScanner(scan)) {
471 Result res = null;
472 HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
473 while ((res = scanner.next()) != null) {
474 res.advance();
475 Cell cell = res.current();
476 byte[] row = CellUtil.cloneRow(cell);
477 String server =
478 BackupSystemTableHelper.getServerNameForReadRegionServerLastLogRollResult(row);
479 byte[] data = CellUtil.cloneValue(cell);
480 rsTimestampMap.put(server, Long.parseLong(new String(data)));
481 }
482 return rsTimestampMap;
483 }
484 }
485
486
487
488
489
490
491
492
493 public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
494 throws IOException {
495 if (LOG.isDebugEnabled()) {
496 LOG.debug("write region server last roll log result to hbase:backup");
497 }
498 try (Table table = connection.getTable(tableName)) {
499 Put put =
500 BackupSystemTableHelper.createPutForRegionServerLastLogRollResult(server,ts,backupRoot);
501 table.put(put);
502 }
503 }
504
505
506
507
508
509
510
511 public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
512 if (LOG.isDebugEnabled()) {
513 LOG.debug("get backup history from hbase:backup");
514 }
515 ArrayList<BackupInfo> list ;
516 BackupState state = onlyCompleted? BackupState.COMPLETE: BackupState.ANY;
517 list = getBackupContexts(state);
518 return BackupClientUtil.sortHistoryListDesc(list);
519 }
520
521 public ArrayList<BackupInfo> getBackupHistory() throws IOException {
522 return getBackupHistory(false);
523 }
524
525
526
527
528
529
530
531 public ArrayList<BackupInfo> getBackupContexts(BackupState status) throws IOException {
532 if (LOG.isDebugEnabled()) {
533 LOG.debug("get backup contexts from hbase:backup");
534 }
535
536 Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
537 ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
538
539 try (Table table = connection.getTable(tableName);
540 ResultScanner scanner = table.getScanner(scan)) {
541 Result res = null;
542 while ((res = scanner.next()) != null) {
543 res.advance();
544 BackupInfo context = BackupSystemTableHelper.cellToBackupInfo(res.current());
545 if (status != BackupState.ANY && context.getState() != status){
546 continue;
547 }
548 list.add(context);
549 }
550 return list;
551 }
552 }
553
554
555
556
557
558
559
560
561
562
563 public void writeRegionServerLogTimestamp(Set<TableName> tables,
564 HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
565 if (LOG.isDebugEnabled()) {
566 LOG.debug("write RS log time stamps to hbase:backup for tables ["+
567 StringUtils.join(tables, ",")+"]");
568 }
569 List<Put> puts = new ArrayList<Put>();
570 for (TableName table : tables) {
571 byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
572 Put put =
573 BackupSystemTableHelper.createPutForWriteRegionServerLogTimestamp(table,
574 smapData, backupRoot);
575 puts.add(put);
576 }
577 try (Table table = connection.getTable(tableName)) {
578 table.put(puts);
579 }
580 }
581
582
583
584
585
586
587 public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
588 List<TableName> names = new ArrayList<>();
589 List<BackupInfo> infos = getBackupHistory(true);
590 for (BackupInfo info : infos) {
591 if (info.getType() != type) continue;
592 names.addAll(info.getTableNames());
593 }
594 return names;
595 }
596
597
598
599
600
601
602
603
604
605
606 public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
607 throws IOException {
608 if (LOG.isDebugEnabled()) {
609 LOG.debug("read RS log ts from hbase:backup for root="+ backupRoot);
610 }
611
612 HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
613 new HashMap<TableName, HashMap<String, Long>>();
614
615 Scan scan = BackupSystemTableHelper.createScanForReadLogTimestampMap(backupRoot);
616 try (Table table = connection.getTable(tableName);
617 ResultScanner scanner = table.getScanner(scan)) {
618 Result res = null;
619 while ((res = scanner.next()) != null) {
620 res.advance();
621 Cell cell = res.current();
622 byte[] row = CellUtil.cloneRow(cell);
623 String tabName = BackupSystemTableHelper.getTableNameForReadLogTimestampMap(row);
624 TableName tn = TableName.valueOf(tabName);
625 byte[] data = CellUtil.cloneValue(cell);
626 if (data == null) {
627 throw new IOException("Data of last backup data from hbase:backup "
628 + "is empty. Create a backup first.");
629 }
630 if (data != null && data.length > 0) {
631 HashMap<String, Long> lastBackup =
632 fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
633 tableTimestampMap.put(tn, lastBackup);
634 }
635 }
636 return tableTimestampMap;
637 }
638 }
639
640 private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
641 Map<String, Long> map) {
642 BackupProtos.TableServerTimestamp.Builder tstBuilder =
643 BackupProtos.TableServerTimestamp.newBuilder();
644 tstBuilder.setTable(ProtobufUtil.toProtoTableName(table));
645
646 for(Entry<String, Long> entry: map.entrySet()) {
647 BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
648 builder.setServer(entry.getKey());
649 builder.setTimestamp(entry.getValue());
650 tstBuilder.addServerTimestamp(builder.build());
651 }
652
653 return tstBuilder.build();
654 }
655
656 private HashMap<String, Long> fromTableServerTimestampProto(
657 BackupProtos.TableServerTimestamp proto) {
658 HashMap<String, Long> map = new HashMap<String, Long> ();
659 List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
660 for(BackupProtos.ServerTimestamp st: list) {
661 map.put(st.getServer(), st.getTimestamp());
662 }
663 return map;
664 }
665
666
667
668
669
670
671
672 public Set<TableName> getIncrementalBackupTableSet(String backupRoot)
673 throws IOException {
674 if (LOG.isDebugEnabled()) {
675 LOG.debug("get incr backup table set from hbase:backup");
676 }
677 TreeSet<TableName> set = new TreeSet<>();
678
679 try (Table table = connection.getTable(tableName)) {
680 Get get = BackupSystemTableHelper.createGetForIncrBackupTableSet(backupRoot);
681 Result res = table.get(get);
682 if (res.isEmpty()) {
683 return set;
684 }
685 List<Cell> cells = res.listCells();
686 for (Cell cell : cells) {
687
688 set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
689 }
690 return set;
691 }
692 }
693
694
695
696
697
698
699
700 public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) throws IOException {
701 if (LOG.isDebugEnabled()) {
702 LOG.debug("Add incremental backup table set to hbase:backup. ROOT="+backupRoot +
703 " tables ["+ StringUtils.join(tables, " ")+"]");
704 for (TableName table : tables) {
705 LOG.debug(table);
706 }
707 }
708 try (Table table = connection.getTable(tableName)) {
709 Put put = BackupSystemTableHelper.createPutForIncrBackupTableSet(tables, backupRoot);
710 table.put(put);
711 }
712 }
713
714
715
716
717
718
719
720
721 public void addWALFiles(List<String> files, String backupId,
722 String backupRoot) throws IOException {
723 if (LOG.isDebugEnabled()) {
724 LOG.debug("add WAL files to hbase:backup: "+backupId +" "+backupRoot+" files ["+
725 StringUtils.join(files, ",")+"]");
726 for(String f: files){
727 LOG.debug("add :"+f);
728 }
729 }
730 try (Table table = connection.getTable(tableName)) {
731 List<Put> puts =
732 BackupSystemTableHelper.createPutsForAddWALFiles(files, backupId, backupRoot);
733 table.put(puts);
734 }
735 }
736
737
738
739
740
741
742 public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
743 if (LOG.isDebugEnabled()) {
744 LOG.debug("get WAL files from hbase:backup");
745 }
746 final Table table = connection.getTable(tableName);
747 Scan scan = BackupSystemTableHelper.createScanForGetWALs(backupRoot);
748 final ResultScanner scanner = table.getScanner(scan);
749 final Iterator<Result> it = scanner.iterator();
750 return new Iterator<WALItem>() {
751
752 @Override
753 public boolean hasNext() {
754 boolean next = it.hasNext();
755 if (!next) {
756
757 try {
758 scanner.close();
759 table.close();
760 } catch (IOException e) {
761 LOG.error("Close WAL Iterator", e);
762 }
763 }
764 return next;
765 }
766
767 @Override
768 public WALItem next() {
769 Result next = it.next();
770 List<Cell> cells = next.listCells();
771 byte[] buf = cells.get(0).getValueArray();
772 int len = cells.get(0).getValueLength();
773 int offset = cells.get(0).getValueOffset();
774 String backupId = new String(buf, offset, len);
775 buf = cells.get(1).getValueArray();
776 len = cells.get(1).getValueLength();
777 offset = cells.get(1).getValueOffset();
778 String walFile = new String(buf, offset, len);
779 buf = cells.get(2).getValueArray();
780 len = cells.get(2).getValueLength();
781 offset = cells.get(2).getValueOffset();
782 String backupRoot = new String(buf, offset, len);
783 return new WALItem(backupId, walFile, backupRoot);
784 }
785
786 @Override
787 public void remove() {
788
789 throw new RuntimeException("remove is not supported");
790 }
791 };
792
793 }
794
795
796
797
798
799
800
801
802 public boolean isWALFileDeletable(String file) throws IOException {
803 if (LOG.isDebugEnabled()) {
804 LOG.debug("Check if WAL file has been already backed up in hbase:backup "+ file);
805 }
806 try (Table table = connection.getTable(tableName)) {
807 Get get = BackupSystemTableHelper.createGetForCheckWALFile(file);
808 Result res = table.get(get);
809 if (res.isEmpty()){
810 return false;
811 }
812 return true;
813 }
814 }
815
816
817
818
819
820
821
822 public boolean hasBackupSessions() throws IOException {
823 if (LOG.isDebugEnabled()) {
824 LOG.debug("Has backup sessions from hbase:backup");
825 }
826 boolean result = false;
827 Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
828 scan.setCaching(1);
829 try (Table table = connection.getTable(tableName);
830 ResultScanner scanner = table.getScanner(scan)) {
831 if (scanner.next() != null) {
832 result = true;
833 }
834 return result;
835 }
836 }
837
838
839
840
841
842
843
844
845
846
847 public List<String> listBackupSets() throws IOException {
848 if (LOG.isDebugEnabled()) {
849 LOG.debug(" Backup set list");
850 }
851 List<String> list = new ArrayList<String>();
852 Table table = null;
853 ResultScanner scanner = null;
854 try {
855 table = connection.getTable(tableName);
856 Scan scan = BackupSystemTableHelper.createScanForBackupSetList();
857 scan.setMaxVersions(1);
858 scanner = table.getScanner(scan);
859 Result res = null;
860 while ((res = scanner.next()) != null) {
861 res.advance();
862 list.add(BackupSystemTableHelper.cellKeyToBackupSetName(res.current()));
863 }
864 return list;
865 } finally {
866 if(scanner != null) {
867 scanner.close();
868 }
869 if (table != null) {
870 table.close();
871 }
872 }
873 }
874
875
876
877
878
879
880
881 public List<TableName> describeBackupSet(String name) throws IOException {
882 if (LOG.isDebugEnabled()) {
883 LOG.debug(" Backup set describe: "+name);
884 }
885 Table table = null;
886 try {
887 table = connection.getTable(tableName);
888 Get get = BackupSystemTableHelper.createGetForBackupSet(name);
889 Result res = table.get(get);
890 if(res.isEmpty()) return null;
891 res.advance();
892 String[] tables =
893 BackupSystemTableHelper.cellValueToBackupSet(res.current());
894 return toList(tables);
895 } finally {
896 if (table != null) {
897 table.close();
898 }
899 }
900 }
901
902 private List<TableName> toList(String[] tables)
903 {
904 List<TableName> list = new ArrayList<TableName>(tables.length);
905 for(String name: tables) {
906 list.add(TableName.valueOf(name));
907 }
908 return list;
909 }
910
911
912
913
914
915
916
917 public void addToBackupSet(String name, String[] newTables) throws IOException {
918 if (LOG.isDebugEnabled()) {
919 LOG.debug("Backup set add: "+name+" tables ["+ StringUtils.join(newTables, " ")+"]");
920 }
921 Table table = null;
922 String[] union = null;
923 try {
924 table = connection.getTable(tableName);
925 Get get = BackupSystemTableHelper.createGetForBackupSet(name);
926 Result res = table.get(get);
927 if(res.isEmpty()) {
928 union = newTables;
929 } else {
930 res.advance();
931 String[] tables =
932 BackupSystemTableHelper.cellValueToBackupSet(res.current());
933 union = merge(tables, newTables);
934 }
935 Put put = BackupSystemTableHelper.createPutForBackupSet(name, union);
936 table.put(put);
937 } finally {
938 if (table != null) {
939 table.close();
940 }
941 }
942 }
943
944 private String[] merge(String[] tables, String[] newTables) {
945 List<String> list = new ArrayList<String>();
946
947 for(String t: tables){
948 list.add(t);
949 }
950 for(String nt: newTables){
951 if(list.contains(nt)) continue;
952 list.add(nt);
953 }
954 String[] arr = new String[list.size()];
955 list.toArray(arr);
956 return arr;
957 }
958
959
960
961
962
963
964
965 public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
966 if (LOG.isDebugEnabled()) {
967 LOG.debug(" Backup set remove from : " + name+" tables ["+
968 StringUtils.join(toRemove, " ")+"]");
969 }
970 Table table = null;
971 String[] disjoint = null;
972 try {
973 table = connection.getTable(tableName);
974 Get get = BackupSystemTableHelper.createGetForBackupSet(name);
975 Result res = table.get(get);
976 if (res.isEmpty()) {
977 LOG.warn("Backup set '"+ name+"' not found.");
978 return;
979 } else {
980 res.advance();
981 String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current());
982 disjoint = disjoin(tables, toRemove);
983 }
984 if (disjoint.length > 0) {
985 Put put = BackupSystemTableHelper.createPutForBackupSet(name, disjoint);
986 table.put(put);
987 } else {
988
989
990 LOG.warn("Backup set '"+ name+"' does not contain tables ["+
991 StringUtils.join(toRemove, " ")+"]");
992 }
993 } finally {
994 if (table != null) {
995 table.close();
996 }
997 }
998 }
999
1000 private String[] disjoin(String[] tables, String[] toRemove) {
1001 List<String> list = new ArrayList<String>();
1002
1003 for (String t : tables) {
1004 list.add(t);
1005 }
1006 for (String nt : toRemove) {
1007 if (list.contains(nt)) {
1008 list.remove(nt);
1009 }
1010 }
1011 String[] arr = new String[list.size()];
1012 list.toArray(arr);
1013 return arr;
1014 }
1015
1016
1017
1018
1019
1020
1021 public void deleteBackupSet(String name) throws IOException {
1022 if (LOG.isDebugEnabled()) {
1023 LOG.debug(" Backup set delete: " + name);
1024 }
1025 Table table = null;
1026 try {
1027 table = connection.getTable(tableName);
1028 Delete del = BackupSystemTableHelper.createDeleteForBackupSet(name);
1029 table.delete(del);
1030 } finally {
1031 if (table != null) {
1032 table.close();
1033 }
1034 }
1035 }
1036
1037
1038
1039
1040
1041 public static HTableDescriptor getSystemTableDescriptor() {
1042 HTableDescriptor tableDesc = new HTableDescriptor(tableName);
1043 HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
1044 colSessionsDesc.setMaxVersions(1);
1045
1046 Configuration config = HBaseConfiguration.create();
1047 int ttl =
1048 config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
1049 colSessionsDesc.setTimeToLive(ttl);
1050 tableDesc.addFamily(colSessionsDesc);
1051 HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
1052
1053 tableDesc.addFamily(colMetaDesc);
1054
1055
1056 return tableDesc;
1057 }
1058
1059 public static String getTableNameAsString() {
1060 return tableName.getNameAsString();
1061 }
1062
1063 public static TableName getTableName() {
1064 return tableName;
1065 }
1066 }