View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.backup.impl;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  import java.util.Set;
29  import java.util.TreeMap;
30  import java.util.TreeSet;
31  
32  import org.apache.commons.lang.StringUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.CellComparator;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HBaseConfiguration;
41  import org.apache.hadoop.hbase.HColumnDescriptor;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.backup.BackupInfo;
46  import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
47  import org.apache.hadoop.hbase.backup.BackupType;
48  import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
49  import org.apache.hadoop.hbase.classification.InterfaceAudience;
50  import org.apache.hadoop.hbase.classification.InterfaceStability;
51  import org.apache.hadoop.hbase.client.Connection;
52  import org.apache.hadoop.hbase.client.Delete;
53  import org.apache.hadoop.hbase.client.Get;
54  import org.apache.hadoop.hbase.client.Put;
55  import org.apache.hadoop.hbase.client.Result;
56  import org.apache.hadoop.hbase.client.ResultScanner;
57  import org.apache.hadoop.hbase.client.Scan;
58  import org.apache.hadoop.hbase.client.Table;
59  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
60  import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
61  import org.apache.hadoop.hbase.util.Bytes;
62  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63  import org.apache.hadoop.hbase.util.Pair;
64  
65  /**
66   * This class provides 'hbase:backup' table API
67   */
68  @InterfaceAudience.Private
69  @InterfaceStability.Evolving
70  public final class BackupSystemTable implements Closeable {
71    
72    static class WALItem {
73      String backupId;
74      String walFile;
75      String backupRoot;
76      
77      WALItem(String backupId, String walFile, String backupRoot)
78      {
79        this.backupId = backupId;
80        this.walFile = walFile;
81        this.backupRoot = backupRoot;
82      }
83  
84      public String getBackupId() {
85        return backupId;
86      }
87  
88      public String getWalFile() {
89        return walFile;
90      }
91  
92      public String getBackupRoot() {
93        return backupRoot;
94      }
95      
96      public String toString() {
97        return "/"+ backupRoot + "/"+backupId + "/" + walFile;
98      }
99      
100   }
101   
  private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
  // Name of the system table ('hbase:backup') that every operation below targets.
  private final static TableName tableName = TableName.BACKUP_TABLE_NAME;
  // Stores backup sessions (contexts)
  final static byte[] SESSIONS_FAMILY = "session".getBytes();
  // Stores other meta
  final static byte[] META_FAMILY = "meta".getBytes();
  // Stores bulk-load bookkeeping rows
  final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
  // Connection to HBase cluster, shared
  // among all instances
  private final Connection connection;
112     
  /**
   * Creates a facade over the shared cluster connection.
   * @param conn shared connection, owned by the caller (it is NOT closed by this class)
   * @throws IOException declared for API compatibility; this constructor does no I/O
   */
  public BackupSystemTable(Connection conn) throws IOException {
    this.connection = conn;
  }
116 
117  
  /**
   * Closes this facade. The underlying {@link Connection} is shared among all
   * instances and owned by the caller, so there is nothing to release here.
   */
  public void close() {
     // do nothing: the shared connection must stay open for other users
  }
121 
122   /**
123    * Updates status (state) of a backup session in hbase:backup table
124    * @param context context
125    * @throws IOException exception
126    */
127   public void updateBackupInfo(BackupInfo context) throws IOException {
128 
129     if (LOG.isDebugEnabled()) {
130       LOG.debug("update backup status in hbase:backup for: " + context.getBackupId()
131         + " set status=" + context.getState());
132     }
133     try (Table table = connection.getTable(tableName)) {
134       Put put = BackupSystemTableHelper.createPutForBackupContext(context);
135       table.put(put);
136     }
137   }
138 
139   /**
140    * Deletes backup status from hbase:backup table
141    * @param backupId backup id
142    * @return true, if operation succeeded, false - otherwise 
143    * @throws IOException exception
144    */
145 
146   public void deleteBackupInfo(String backupId) throws IOException {
147 
148     if (LOG.isDebugEnabled()) {
149       LOG.debug("delete backup status in hbase:backup for " + backupId);
150     }
151     try (Table table = connection.getTable(tableName)) {
152       Delete del = BackupSystemTableHelper.createDeleteForBackupInfo(backupId);
153       table.delete(del);
154     }
155   }
156 
157   /**
158    * Reads backup status object (instance of BackupContext) from hbase:backup table
159    * @param backupId - backupId
160    * @return Current status of backup session or null
161    */
162 
163   public BackupInfo readBackupInfo(String backupId) throws IOException {
164     if (LOG.isDebugEnabled()) {
165       LOG.debug("read backup status from hbase:backup for: " + backupId);
166     }
167 
168     try (Table table = connection.getTable(tableName)) {
169       Get get = BackupSystemTableHelper.createGetForBackupContext(backupId);
170       Result res = table.get(get);
171       if(res.isEmpty()){
172         return null;
173       }
174       return BackupSystemTableHelper.resultToBackupInfo(res);
175     }
176   }
177 
178   /**
179    * Read the last backup start code (timestamp) of last successful backup. Will return null if
180    * there is no start code stored on hbase or the value is of length 0. These two cases indicate
181    * there is no successful backup completed so far.
182    * @param backupRoot root directory path to backup 
183    * @return the timestamp of last successful backup
184    * @throws IOException exception
185    */
186   public String readBackupStartCode(String backupRoot) throws IOException {
187     if (LOG.isDebugEnabled()) {
188       LOG.debug("read backup start code from hbase:backup");
189     }
190     try (Table table = connection.getTable(tableName)) {
191       Get get = BackupSystemTableHelper.createGetForStartCode(backupRoot);
192       Result res = table.get(get);
193       if (res.isEmpty()) {
194         return null;
195       }
196       Cell cell = res.listCells().get(0);
197       byte[] val = CellUtil.cloneValue(cell);
198       if (val.length == 0){
199         return null;
200       }
201       return new String(val);
202     }
203   }
204 
205   /*
206    * @param backupId the backup Id
207    * @return Map of rows to path of bulk loaded hfile
208    */
209   public Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
210     Scan scan = BackupSystemTableHelper.createScanForBulkLoadedFiles(backupId);
211     try (Table table = connection.getTable(tableName);
212         ResultScanner scanner = table.getScanner(scan)) {
213       Result res = null;
214       Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
215       while ((res = scanner.next()) != null) {
216         res.advance();
217         byte[] row = CellUtil.cloneRow(res.listCells().get(0));
218         for (Cell cell : res.listCells()) {
219           if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.PATH_COL, 0,
220               BackupSystemTableHelper.PATH_COL.length) == 0) {
221             map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
222           }
223         }
224       }
225       return map;
226     }
227   }
228 
229   public static int getIndex(TableName tbl, List<TableName> sTableList) {
230     if (sTableList == null) return 0;
231     for (int i = 0; i < sTableList.size(); i++) {
232       if (tbl.equals(sTableList.get(i))) {
233         return i;
234       }
235     }
236     return -1;
237   }
238   /*
239    * Used during restore
240    * @param backupId the backup Id
241    * @param sTableList List of tables
242    * @return array of Map of family to List of Paths
243    */
244   public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
245       throws IOException {
246     Scan scan = BackupSystemTableHelper.createScanForBulkLoadedFiles(backupId);
247     Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
248     try (Table table = connection.getTable(tableName);
249         ResultScanner scanner = table.getScanner(scan)) {
250       Result res = null;
251       while ((res = scanner.next()) != null) {
252         res.advance();
253         TableName tbl = null;
254         byte[] fam = null;
255         String path = null;
256         for (Cell cell : res.listCells()) {
257           if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.TBL_COL, 0,
258               BackupSystemTableHelper.TBL_COL.length) == 0) {
259             tbl = TableName.valueOf(CellUtil.cloneValue(cell));
260           } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.FAM_COL, 0,
261               BackupSystemTableHelper.FAM_COL.length) == 0) {
262             fam = CellUtil.cloneValue(cell);
263           } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.PATH_COL, 0,
264               BackupSystemTableHelper.PATH_COL.length) == 0) {
265             path = Bytes.toString(CellUtil.cloneValue(cell));
266           }
267         }
268         int srcIdx = getIndex(tbl, sTableList);
269         if (srcIdx == -1) {
270           // the table is not among the query
271           continue;
272         }
273         if (mapForSrc[srcIdx] == null) {
274           mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
275         }
276         List<Path> files;
277         if (!mapForSrc[srcIdx].containsKey(fam)) {
278           files = new ArrayList<Path>();
279           mapForSrc[srcIdx].put(fam, files);
280         } else {
281           files = mapForSrc[srcIdx].get(fam);
282         }
283         files.add(new Path(path));
284         LOG.debug("found bulk loaded file : " + tbl + " " +  Bytes.toString(fam) + " " + path);
285       };
286       return mapForSrc;
287     }
288   }
289 
290   /*
291    * @param map Map of row keys to path of bulk loaded hfile
292    */
293   public void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
294     try (Table table = connection.getTable(tableName)) {
295       List<Delete> dels = new ArrayList<>();
296       for (byte[] row : map.keySet()) {
297         dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
298       }
299       table.delete(dels);
300     }
301   }
302 
303   /**
304    * Write the start code (timestamp) to hbase:backup. If passed in null, then write 0 byte.
305    * @param startCode start code
306    * @param backupRoot root directory path to backup 
307    * @throws IOException exception
308    */
309   public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
310     if (LOG.isDebugEnabled()) {
311       LOG.debug("write backup start code to hbase:backup " + startCode);
312     }
313     try (Table table = connection.getTable(tableName)) {
314       Put put = BackupSystemTableHelper.createPutForStartCode(startCode.toString(), backupRoot);
315       table.put(put);
316     }
317   }
318 
319   /*
320    * For postBulkLoadHFile() hook.
321    */
322   public void writeOrigBulkLoad(TableName tabName, byte[] region,
323       Map<byte[], List<Path>> finalPaths) throws IOException {
324     if (LOG.isDebugEnabled()) {
325       LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
326           finalPaths.size() + " entries");
327     }
328     try (Table table = connection.getTable(tableName)) {
329       List<Put> puts = BackupSystemTableHelper.createPutForOrigBulkload(tabName, region,
330           finalPaths);
331       table.put(puts);
332       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
333     }
334   }
335 
336   /*
337    * For preCommitStoreFile() hook
338    */
339   public void writeOrigBulkLoad(TableName tabName, byte[] region,
340       final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
341     if (LOG.isDebugEnabled()) {
342       LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
343           pairs.size() + " entries");
344     }
345     try (Table table = connection.getTable(tableName)) {
346       List<Put> puts = BackupSystemTableHelper.createPutForOrigBulkload(tabName, region,
347           family, pairs);
348       table.put(puts);
349       LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
350     }
351   }
352 
353   public void removeOrigBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
354     try (Table table = connection.getTable(tableName)) {
355       List<Delete> lstDels = new ArrayList<>();
356       for (byte[] row : rows) {
357         Delete del = new Delete(row);
358         lstDels.add(del);
359         LOG.debug("orig deleting the row: " + Bytes.toString(row));
360       }
361       table.delete(lstDels);
362       LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
363     }
364   }
365 
366   /*
367    * The keys of the Map are table, region and column family
368    */
369   public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
370   readOrigBulkloadRows(List<TableName> tableList) throws IOException {
371     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
372     List<byte[]> rows = new ArrayList<>();
373     for (TableName tTable : tableList) {
374       Scan scan = BackupSystemTableHelper.createScanForOrigBulkLoadedFiles(tTable);
375       Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
376       try (Table table = connection.getTable(tableName);
377           ResultScanner scanner = table.getScanner(scan)) {
378         Result res = null;
379         while ((res = scanner.next()) != null) {
380           res.advance();
381           String fam = null;
382           String path = null;
383           boolean raw = false;
384           byte[] row = null;
385           String region = null;
386           for (Cell cell : res.listCells()) {
387             row = CellUtil.cloneRow(cell);
388             rows.add(row);
389             String rowStr = Bytes.toString(row);
390             region = BackupSystemTableHelper.getRegionNameFromOrigBulkLoadRow(rowStr);
391             if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.FAM_COL, 0,
392                 BackupSystemTableHelper.FAM_COL.length) == 0) {
393               fam = Bytes.toString(CellUtil.cloneValue(cell));
394             } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.PATH_COL, 0,
395                 BackupSystemTableHelper.PATH_COL.length) == 0) {
396               path = Bytes.toString(CellUtil.cloneValue(cell));
397             } else if (CellComparator.compareQualifiers(cell, BackupSystemTableHelper.STATE_COL, 0,
398                 BackupSystemTableHelper.STATE_COL.length) == 0) {
399               byte[] state = CellUtil.cloneValue(cell);
400               if (BackupSystemTableHelper.BL_RAW.equals(state)) {
401                 raw = true;
402               } else raw = false;
403             }
404           }
405           if (map.get(tTable) == null) {
406             map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
407             tblMap = map.get(tTable);
408           }
409           if (tblMap.get(region) == null) {
410             tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
411           }
412           Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
413           if (famMap.get(fam) == null) {
414             famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
415           }
416           famMap.get(fam).add(new Pair<>(path, raw));
417           LOG.debug("found orig " + raw + " " + path + " for " + fam + " of region " + region);
418         }
419       }
420     }
421     return new Pair<>(map, rows);
422   }
423 
424   /*
425    * @param sTableList List of tables
426    * @param maps array of Map of family to List of Paths
427    * @param backupId the backup Id
428    */
429   public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
430       String backupId) throws IOException {
431     try (Table table = connection.getTable(tableName)) {
432       long ts = EnvironmentEdgeManager.currentTime();
433       int cnt = 0;
434       List<Put> puts = new ArrayList<>();
435       for (int idx = 0; idx < maps.length; idx++) {
436         Map<byte[], List<Path>> map = maps[idx];
437         TableName tn = sTableList.get(idx);
438         if (map == null) continue;
439         for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
440           byte[] fam = entry.getKey();
441           List<Path> paths = entry.getValue();
442           for (Path p : paths) {
443             Put put = BackupSystemTableHelper.createPutForBulkLoadedFile(tn, fam, p.toString(),
444                 backupId, ts, cnt++);
445             puts.add(put);
446           }
447         }
448       }
449       if (!puts.isEmpty()) {
450         table.put(puts);
451       }
452     }
453   }
454 
455   /**
456    * Get the Region Servers log information after the last log roll from hbase:backup.
457    * @param backupRoot root directory path to backup 
458    * @return RS log info
459    * @throws IOException exception
460    */
461   public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
462       throws IOException {
463     if (LOG.isDebugEnabled()) {
464       LOG.debug("read region server last roll log result to hbase:backup");
465     }
466 
467     Scan scan = BackupSystemTableHelper.createScanForReadRegionServerLastLogRollResult(backupRoot);
468 
469     try (Table table = connection.getTable(tableName);
470         ResultScanner scanner = table.getScanner(scan)) {
471       Result res = null;
472       HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
473       while ((res = scanner.next()) != null) {
474         res.advance();
475         Cell cell = res.current();
476         byte[] row = CellUtil.cloneRow(cell);
477         String server =
478             BackupSystemTableHelper.getServerNameForReadRegionServerLastLogRollResult(row);
479         byte[] data = CellUtil.cloneValue(cell);
480         rsTimestampMap.put(server, Long.parseLong(new String(data)));
481       }
482       return rsTimestampMap;
483     }
484   }
485 
486   /**
487    * Writes Region Server last roll log result (timestamp) to hbase:backup table
488    * @param server - Region Server name
489    * @param timestamp - last log timestamp
490    * @param backupRoot root directory path to backup 
491    * @throws IOException exception
492    */
493   public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
494       throws IOException {
495     if (LOG.isDebugEnabled()) {
496       LOG.debug("write region server last roll log result to hbase:backup");
497     }
498     try (Table table = connection.getTable(tableName)) {
499       Put put =
500           BackupSystemTableHelper.createPutForRegionServerLastLogRollResult(server,ts,backupRoot);
501       table.put(put);
502     }
503   }
504 
505   /**
506    * Get all completed backup information (in desc order by time)
507    * @param onlyCompeleted, true, if only successfully completed sessions
508    * @return history info of BackupCompleteData
509    * @throws IOException exception
510    */
511   public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
512     if (LOG.isDebugEnabled()) {
513       LOG.debug("get backup history from hbase:backup");
514     }
515     ArrayList<BackupInfo> list ;
516     BackupState state = onlyCompleted? BackupState.COMPLETE: BackupState.ANY;
517     list = getBackupContexts(state);
518     return BackupClientUtil.sortHistoryListDesc(list);    
519   }
520 
  /**
   * Convenience overload of {@link #getBackupHistory(boolean)} that returns
   * sessions in every state (not only completed ones), newest first.
   * @return full backup history
   * @throws IOException exception
   */
  public ArrayList<BackupInfo> getBackupHistory() throws IOException {
    return getBackupHistory(false);
  }
524   
525   /**
526    * Get all backup session with a given status (in desc order by time)
527    * @param status status
528    * @return history info of backup contexts
529    * @throws IOException exception
530    */
531   public ArrayList<BackupInfo> getBackupContexts(BackupState status) throws IOException {
532     if (LOG.isDebugEnabled()) {
533       LOG.debug("get backup contexts from hbase:backup");
534     }
535 
536     Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
537     ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
538 
539     try (Table table = connection.getTable(tableName);
540         ResultScanner scanner = table.getScanner(scan)) {
541       Result res = null;
542       while ((res = scanner.next()) != null) {
543         res.advance();
544         BackupInfo context = BackupSystemTableHelper.cellToBackupInfo(res.current());
545         if (status != BackupState.ANY && context.getState() != status){
546           continue;
547         }
548         list.add(context);
549       }
550       return list;
551     }
552   }
553 
554   /**
555    * Write the current timestamps for each regionserver to hbase:backup 
556    * after a successful full or incremental backup. The saved timestamp is of the last
557    *  log file that was backed up already.
558    * @param tables tables
559    * @param newTimestamps timestamps
560    * @param backupRoot root directory path to backup 
561    * @throws IOException exception
562    */
563   public void writeRegionServerLogTimestamp(Set<TableName> tables,
564       HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
565     if (LOG.isDebugEnabled()) {
566       LOG.debug("write RS log time stamps to hbase:backup for tables ["+ 
567           StringUtils.join(tables, ",")+"]");
568     }
569     List<Put> puts = new ArrayList<Put>();
570     for (TableName table : tables) {
571       byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
572       Put put = 
573           BackupSystemTableHelper.createPutForWriteRegionServerLogTimestamp(table, 
574             smapData, backupRoot);
575       puts.add(put);
576     }
577     try (Table table = connection.getTable(tableName)) {
578       table.put(puts);
579     }
580   }
581 
582   /*
583    * Retrieve TableName's for completed backup of given type
584    * @param type backup type
585    * @return List of table names
586    */
587   public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
588     List<TableName> names = new ArrayList<>();
589     List<BackupInfo> infos = getBackupHistory(true);
590     for (BackupInfo info : infos) {
591       if (info.getType() != type) continue;
592       names.addAll(info.getTableNames());
593     }
594     return names;
595   }
596 
597   /**
598    * Read the timestamp for each region server log after the last successful backup. Each table has
599    * its own set of the timestamps. The info is stored for each table as a concatenated string of
600    * rs->timestapmp
601    * @param backupRoot root directory path to backup 
602    * @return the timestamp for each region server. key: tableName value:
603    *         RegionServer,PreviousTimeStamp
604    * @throws IOException exception
605    */
606   public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
607       throws IOException {
608     if (LOG.isDebugEnabled()) {
609       LOG.debug("read RS log ts from hbase:backup for root="+ backupRoot);
610     }
611 
612     HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
613         new HashMap<TableName, HashMap<String, Long>>();
614 
615     Scan scan = BackupSystemTableHelper.createScanForReadLogTimestampMap(backupRoot);
616     try (Table table = connection.getTable(tableName);
617         ResultScanner scanner = table.getScanner(scan)) {
618       Result res = null;
619       while ((res = scanner.next()) != null) {
620         res.advance();
621         Cell cell = res.current();
622         byte[] row = CellUtil.cloneRow(cell);
623         String tabName = BackupSystemTableHelper.getTableNameForReadLogTimestampMap(row);
624         TableName tn = TableName.valueOf(tabName);
625         byte[] data = CellUtil.cloneValue(cell);
626         if (data == null) {
627           throw new IOException("Data of last backup data from hbase:backup "
628               + "is empty. Create a backup first.");
629         }
630         if (data != null && data.length > 0) {
631           HashMap<String, Long> lastBackup =
632               fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
633           tableTimestampMap.put(tn, lastBackup);
634         }
635       }
636       return tableTimestampMap;
637     }
638   }
639 
640   private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
641       Map<String, Long> map) {
642     BackupProtos.TableServerTimestamp.Builder tstBuilder =
643         BackupProtos.TableServerTimestamp.newBuilder();
644     tstBuilder.setTable(ProtobufUtil.toProtoTableName(table));
645 
646     for(Entry<String, Long> entry: map.entrySet()) {
647       BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
648       builder.setServer(entry.getKey());
649       builder.setTimestamp(entry.getValue());
650       tstBuilder.addServerTimestamp(builder.build());
651     }
652 
653     return tstBuilder.build();
654   }
655 
656   private HashMap<String, Long> fromTableServerTimestampProto(
657       BackupProtos.TableServerTimestamp proto) {
658     HashMap<String, Long> map = new HashMap<String, Long> ();
659     List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
660     for(BackupProtos.ServerTimestamp st: list) {
661       map.put(st.getServer(), st.getTimestamp());
662     }
663     return map;
664   }
665 
666   /**
667    * Return the current tables covered by incremental backup.
668    * @param backupRoot root directory path to backup 
669    * @return set of tableNames
670    * @throws IOException exception
671    */
672   public  Set<TableName> getIncrementalBackupTableSet(String backupRoot)
673       throws IOException {
674     if (LOG.isDebugEnabled()) {
675       LOG.debug("get incr backup table set from hbase:backup");
676     }
677     TreeSet<TableName> set = new TreeSet<>();
678 
679     try (Table table = connection.getTable(tableName)) {
680       Get get = BackupSystemTableHelper.createGetForIncrBackupTableSet(backupRoot);
681       Result res = table.get(get);
682       if (res.isEmpty()) {
683         return set;
684       }
685       List<Cell> cells = res.listCells();
686       for (Cell cell : cells) {
687         // qualifier = table name - we use table names as qualifiers
688         set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
689       }
690       return set;
691     }
692   }
693 
694   /**
695    * Add tables to global incremental backup set
696    * @param tables - set of tables
697    * @param backupRoot root directory path to backup 
698    * @throws IOException exception
699    */
700   public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot) throws IOException {
701     if (LOG.isDebugEnabled()) {
702       LOG.debug("Add incremental backup table set to hbase:backup. ROOT="+backupRoot +
703         " tables ["+ StringUtils.join(tables, " ")+"]");
704       for (TableName table : tables) {
705         LOG.debug(table);
706       }
707     }
708     try (Table table = connection.getTable(tableName)) {
709       Put put = BackupSystemTableHelper.createPutForIncrBackupTableSet(tables, backupRoot);
710       table.put(put);
711     }
712   }
713 
714   /**
715    * Register WAL files as eligible for deletion
716    * @param files files
717    * @param backupId backup id
718    * @param backupRoot root directory path to backup 
719    * @throws IOException exception
720    */
721   public void addWALFiles(List<String> files, String backupId, 
722       String backupRoot) throws IOException {
723     if (LOG.isDebugEnabled()) {
724       LOG.debug("add WAL files to hbase:backup: "+backupId +" "+backupRoot+" files ["+
725      StringUtils.join(files, ",")+"]");
726       for(String f: files){
727         LOG.debug("add :"+f);
728       }
729     }
730     try (Table table = connection.getTable(tableName)) {
731       List<Put> puts = 
732           BackupSystemTableHelper.createPutsForAddWALFiles(files, backupId, backupRoot);
733       table.put(puts);
734     }
735   }
736 
  /**
   * Returns a lazy iterator over all WAL files registered under the given
   * backup root. The backing {@link Table} and {@link ResultScanner} stay open
   * until the iterator is exhausted: they are closed by {@code hasNext()} when
   * it first returns false. NOTE(review): if a caller abandons the iterator
   * before exhaustion, both resources leak — confirm all call sites drain it.
   * @param backupRoot root directory path to backup
   * @throws IOException exception
   */
  public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("get WAL files from hbase:backup");
    }
    final Table table = connection.getTable(tableName);
    Scan scan = BackupSystemTableHelper.createScanForGetWALs(backupRoot);
    final ResultScanner scanner = table.getScanner(scan);
    final Iterator<Result> it = scanner.iterator();
    return new Iterator<WALItem>() {

      @Override
      public boolean hasNext() {
        boolean next = it.hasNext();
        if (!next) {
          // close all
          try {
            scanner.close();
            table.close();
          } catch (IOException e) {
            LOG.error("Close WAL Iterator", e);
          }
        }
        return next;
      }

      @Override
      public WALItem next() {
        Result next = it.next();
        List<Cell> cells = next.listCells();
        // Cells are read by position: 0 = backupId, 1 = walFile, 2 = backupRoot.
        // NOTE(review): this assumes the scan returns exactly these three
        // columns in this qualifier order — confirm against the helper's scan.
        byte[] buf = cells.get(0).getValueArray();
        int len = cells.get(0).getValueLength();
        int offset = cells.get(0).getValueOffset();
        String backupId = new String(buf, offset, len);
        buf = cells.get(1).getValueArray();
        len = cells.get(1).getValueLength();
        offset = cells.get(1).getValueOffset();
        String walFile = new String(buf, offset, len);
        buf = cells.get(2).getValueArray();
        len = cells.get(2).getValueLength();
        offset = cells.get(2).getValueOffset();
        String backupRoot = new String(buf, offset, len);
        return new WALItem(backupId, walFile, backupRoot);
      }

      @Override
      public void remove() {
        // not implemented
        throw new RuntimeException("remove is not supported");
      }
    };

  }
794 
795   /**
796    * Check if WAL file is eligible for deletion
797    * Future: to support all backup destinations
798    * @param file file
799    * @return true, if - yes.
800    * @throws IOException exception
801    */
802   public boolean isWALFileDeletable(String file) throws IOException {
803     if (LOG.isDebugEnabled()) {
804       LOG.debug("Check if WAL file has been already backed up in hbase:backup "+ file);
805     }
806     try (Table table = connection.getTable(tableName)) {
807       Get get = BackupSystemTableHelper.createGetForCheckWALFile(file);
808       Result res = table.get(get);
809       if (res.isEmpty()){
810         return false;
811       }
812       return true;
813     }
814   }
815 
816   /**
817    * Checks if we have at least one backup session in hbase:backup This API is used by
818    * BackupLogCleaner
819    * @return true, if - at least one session exists in hbase:backup table
820    * @throws IOException exception
821    */
822   public boolean hasBackupSessions() throws IOException {
823     if (LOG.isDebugEnabled()) {
824       LOG.debug("Has backup sessions from hbase:backup");
825     }
826     boolean result = false;
827     Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
828     scan.setCaching(1);
829     try (Table table = connection.getTable(tableName);
830         ResultScanner scanner = table.getScanner(scan)) {
831       if (scanner.next() != null) {
832         result = true;
833       }
834       return result;
835     }
836   }
837   
838   /**
839    * BACKUP SETS
840    */
841   
842   /**
843    * Get backup set list
844    * @return backup set list
845    * @throws IOException
846    */
847   public List<String> listBackupSets() throws IOException {
848     if (LOG.isDebugEnabled()) {
849       LOG.debug(" Backup set list");
850     }
851     List<String> list = new ArrayList<String>();
852     Table table = null;
853     ResultScanner scanner = null;
854     try {
855       table = connection.getTable(tableName);
856       Scan scan = BackupSystemTableHelper.createScanForBackupSetList();
857       scan.setMaxVersions(1);
858       scanner = table.getScanner(scan);
859       Result res = null;
860      while ((res = scanner.next()) != null) {
861        res.advance();
862        list.add(BackupSystemTableHelper.cellKeyToBackupSetName(res.current()));
863      }
864      return list;
865    } finally {
866      if(scanner != null) {
867        scanner.close();
868      }
869      if (table != null) {
870        table.close();
871      }
872    }
873  }
874  
875  /**
876   * Get backup set description (list of tables)
877   * @param name - set's name
878   * @return list of tables in a backup set 
879   * @throws IOException
880   */
881  public List<TableName> describeBackupSet(String name) throws IOException {
882    if (LOG.isDebugEnabled()) {
883      LOG.debug(" Backup set describe: "+name);
884    }
885    Table table = null;
886    try {
887      table = connection.getTable(tableName);
888      Get get = BackupSystemTableHelper.createGetForBackupSet(name);
889      Result res = table.get(get);
890      if(res.isEmpty()) return null;
891      res.advance();
892      String[] tables = 
893          BackupSystemTableHelper.cellValueToBackupSet(res.current());
894      return toList(tables);
895    } finally {
896      if (table != null) {
897        table.close();
898      }
899    }
900  }
901  
902  private List<TableName> toList(String[] tables)
903  {
904    List<TableName> list = new ArrayList<TableName>(tables.length);
905    for(String name: tables) {
906      list.add(TableName.valueOf(name));
907    }
908    return list;
909  }
910  
911  /**
912   * Add backup set (list of tables)
913   * @param name - set name
914   * @param tables - list of tables, comma-separated
915   * @throws IOException
916   */
917  public void addToBackupSet(String name, String[] newTables) throws IOException {
918    if (LOG.isDebugEnabled()) {
919      LOG.debug("Backup set add: "+name+" tables ["+ StringUtils.join(newTables, " ")+"]");
920    }
921    Table table = null;
922    String[] union = null;
923    try {
924      table = connection.getTable(tableName);
925      Get get = BackupSystemTableHelper.createGetForBackupSet(name);
926      Result res = table.get(get);
927      if(res.isEmpty()) {
928        union = newTables;
929      } else {
930        res.advance();
931        String[] tables = 
932          BackupSystemTableHelper.cellValueToBackupSet(res.current());
933        union = merge(tables, newTables);  
934      }
935      Put put = BackupSystemTableHelper.createPutForBackupSet(name, union);
936      table.put(put);
937    } finally {
938      if (table != null) {
939        table.close();
940      }
941    }
942  }
943  
944  private String[] merge(String[] tables, String[] newTables) {
945    List<String> list = new ArrayList<String>();
946    // Add all from tables
947    for(String t: tables){
948      list.add(t);
949    }
950    for(String nt: newTables){
951      if(list.contains(nt)) continue;
952      list.add(nt);
953    }
954    String[] arr = new String[list.size()];
955    list.toArray(arr);
956    return arr;
957  }
958 
959  /**
960   * Remove tables from backup set (list of tables)
961   * @param name - set name
962   * @param tables - list of tables, comma-separated
963   * @throws IOException
964   */
965   public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
966     if (LOG.isDebugEnabled()) {
967       LOG.debug(" Backup set remove from : " + name+" tables ["+
968      StringUtils.join(toRemove, " ")+"]");
969     }
970     Table table = null;
971     String[] disjoint = null;
972     try {
973       table = connection.getTable(tableName);
974       Get get = BackupSystemTableHelper.createGetForBackupSet(name);
975       Result res = table.get(get);
976       if (res.isEmpty()) {
977         LOG.warn("Backup set '"+ name+"' not found.");
978         return;
979       } else {
980         res.advance();
981         String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current());
982         disjoint = disjoin(tables, toRemove);
983       }
984       if (disjoint.length > 0) {
985         Put put = BackupSystemTableHelper.createPutForBackupSet(name, disjoint);
986         table.put(put);
987       } else {
988         // Delete
989         //describeBackupSet(name);
990         LOG.warn("Backup set '"+ name+"' does not contain tables ["+
991         StringUtils.join(toRemove, " ")+"]");
992       }
993     } finally {
994       if (table != null) {
995         table.close();
996       }
997     }
998   }
999 
1000   private String[] disjoin(String[] tables, String[] toRemove) {
1001     List<String> list = new ArrayList<String>();
1002     // Add all from tables
1003     for (String t : tables) {
1004       list.add(t);
1005     }
1006     for (String nt : toRemove) {
1007       if (list.contains(nt)) {
1008         list.remove(nt);
1009       }
1010     }
1011     String[] arr = new String[list.size()];
1012     list.toArray(arr);
1013     return arr;
1014   }
1015 
1016  /**
1017   * Delete backup set 
1018   * @param name set's name
1019   * @throws IOException
1020   */
1021   public void deleteBackupSet(String name) throws IOException {
1022     if (LOG.isDebugEnabled()) {
1023       LOG.debug(" Backup set delete: " + name);
1024     }
1025     Table table = null;
1026     try {
1027       table = connection.getTable(tableName);
1028       Delete del = BackupSystemTableHelper.createDeleteForBackupSet(name);
1029       table.delete(del);
1030     } finally {
1031       if (table != null) {
1032         table.close();
1033       }
1034     }
1035   }
1036 
1037   /**
1038    * Get backup system table descriptor
1039    * @return descriptor
1040    */
1041   public static HTableDescriptor getSystemTableDescriptor() {
1042     HTableDescriptor tableDesc = new HTableDescriptor(tableName);
1043     HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
1044     colSessionsDesc.setMaxVersions(1);
1045     // Time to keep backup sessions (secs)
1046     Configuration config = HBaseConfiguration.create();
1047     int ttl =
1048         config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
1049     colSessionsDesc.setTimeToLive(ttl);
1050     tableDesc.addFamily(colSessionsDesc);
1051     HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
1052     //colDesc.setMaxVersions(1);
1053     tableDesc.addFamily(colMetaDesc);
1054     /*HColumnDescriptor colBulkLoadDesc = new HColumnDescriptor(BULK_LOAD_FAMILY);
1055     tableDesc.addFamily(colBulkLoadDesc); */
1056     return tableDesc;
1057   }
1058 
  /**
   * Returns the name of the backup system table as a string.
   * @return table name string
   */
  public static String getTableNameAsString() {
    return tableName.getNameAsString();
  }
1062   
  /**
   * Returns the name of the backup system table.
   * @return table name
   */
  public static TableName getTableName() {
    return tableName;
  }
1066 }