/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.backup.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * A collection of helper methods used by BackupSystemTable.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class BackupSystemTableHelper {
  private static final Log LOG = LogFactory.getLog(BackupSystemTableHelper.class);

  /**
   * hbase:backup schema:
   * 1. Backup sessions rowkey = "session:" + backupId; value = serialized BackupInfo
   * 2. Backup start code rowkey = "startcode:" + backupRoot; value = startcode
   * 3. Incremental backup set rowkey = "incrbackupset:" + backupRoot; value = [list of tables]
   * 4. Table-RS-timestamp map rowkey = "trslm:" + backupRoot + table_name;
   *    value = map[RS -> last WAL timestamp]
   * 5. RS - WAL ts map rowkey = "rslogts:" + backupRoot + server; value = last WAL timestamp
   * 6. WALs recorded rowkey = "wals:" + WAL unique file name; value = backupId and full WAL file name
   */
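
  // Example row keys (illustrative values only, not taken from a real cluster):
  //   session:backup_1474039413000                    -> serialized BackupInfo
  //   startcode:hdfs://host:8020/backup               -> "1474039413000"
  //   trslm:hdfs://host:8020/backup\x00ns1:t1         -> serialized {RS -> last WAL ts} map
  //   rslogts:hdfs://host:8020/backup\x00rs1,16020    -> last WAL timestamp for that server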

  private final static String BACKUP_INFO_PREFIX = "session:";
  private final static String START_CODE_ROW = "startcode:";
  private final static String INCR_BACKUP_SET = "incrbackupset:";
  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
  private final static String RS_LOG_TS_PREFIX = "rslogts:";
  private final static String BULK_LOAD_PREFIX = "bulk:";
  private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
  final static byte[] TBL_COL = "tbl".getBytes();
  final static byte[] FAM_COL = "fam".getBytes();
  final static byte[] PATH_COL = "path".getBytes();
  final static byte[] STATE_COL = "state".getBytes();
  // the two states a bulk loaded file can be in
  final static byte[] BL_RAW = "R".getBytes();
  final static byte[] BL_DONE = "D".getBytes();

  private final static String WALS_PREFIX = "wals:";
  private final static String SET_KEY_PREFIX = "backupset:";

  // separator between BULK_LOAD_PREFIX and the remaining row key components
  protected final static String BLK_LD_DELIM = ":";

  private final static byte[] EMPTY_VALUE = new byte[] {};

  // Safe delimiter in a string (never occurs in table names or paths)
  private final static String NULL = "\u0000";

  private BackupSystemTableHelper() {
    throw new AssertionError("Instantiating utility class...");
  }

  /**
   * Creates Put operation for a given backup context object
   * @param context backup context
   * @return put operation
   * @throws IOException exception
   */
  static Put createPutForBackupContext(BackupInfo context) throws IOException {
    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, "context".getBytes(), context.toByteArray());
    return put;
  }

  /**
   * Creates Get operation for a given backup id
   * @param backupId - backup's ID
   * @return get operation
   * @throws IOException exception
   */
  static Get createGetForBackupContext(String backupId) throws IOException {
    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    get.setMaxVersions(1);
    return get;
  }

  /**
   * Creates Delete operation for a given backup id
   * @param backupId - backup's ID
   * @return delete operation
   */
  public static Delete createDeleteForBackupInfo(String backupId) {
    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    return del;
  }

  /**
   * Converts Result to BackupInfo
   * @param res - HBase result
   * @return backup info instance
   * @throws IOException exception
   */
  static BackupInfo resultToBackupInfo(Result res) throws IOException {
    res.advance();
    Cell cell = res.current();
    return cellToBackupInfo(cell);
  }

  /**
   * Creates Get operation to retrieve start code from hbase:backup
   * @param rootPath - path to backup destination
   * @return get operation
   * @throws IOException exception
   */
  static Get createGetForStartCode(String rootPath) throws IOException {
    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.setMaxVersions(1);
    return get;
  }

  /**
   * Creates Put operation to store start code to hbase:backup
   * @param startCode - start code
   * @param rootPath - path to backup destination
   * @return put operation
   */
  static Put createPutForStartCode(String startCode, String rootPath) {
    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
    put.addColumn(BackupSystemTable.META_FAMILY, "startcode".getBytes(), startCode.getBytes());
    return put;
  }

  /**
   * Creates Get to retrieve incremental backup table set from hbase:backup
   * @param backupRoot - path to backup destination
   * @return get operation
   * @throws IOException exception
   */
  static Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
    get.addFamily(BackupSystemTable.META_FAMILY);
    get.setMaxVersions(1);
    return get;
  }

  /**
   * Creates Put to store incremental backup table set
   * @param tables tables
   * @param backupRoot - path to backup destination
   * @return put operation
   */
  static Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
    for (TableName table : tables) {
      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
        EMPTY_VALUE);
    }
    return put;
  }

  /**
   * Creates Scan operation to load backup history
   * @return scan operation
   */
  static Scan createScanForBackupHistory() {
    Scan scan = new Scan();
    byte[] startRow = BACKUP_INFO_PREFIX.getBytes();
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
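    // The stop row is the start prefix with its last byte incremented by one: the
    // smallest row that sorts after every key beginning with the prefix, which makes
    // this an exact prefix scan (the same stop-row pattern is used throughout this class)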
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
    scan.setMaxVersions(1);
    return scan;
  }

  /**
   * Converts cell to backup info instance.
   * @param current - cell
   * @return backup info instance
   * @throws IOException exception
   */
  static BackupInfo cellToBackupInfo(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    return BackupInfo.fromByteArray(data);
  }

  /**
   * Creates Put to write RS last roll log timestamp map
   * @param table - table
   * @param smap - serialized RS -> last WAL timestamp map
   * @param backupRoot - path to backup destination
   * @return put operation
   */
  static Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
      String backupRoot) {
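    // The backup root is a URI and may itself contain ':' characters, so the NULL
    // character is used as the delimiter before the table name; the parsing side is
    // getTableNameForReadLogTimestampMap() below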
    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
    put.addColumn(BackupSystemTable.META_FAMILY, "log-roll-map".getBytes(), smap);
    return put;
  }

  /**
   * Creates Scan to load table -> {RS -> ts} map of maps
   * @param backupRoot - path to backup destination
   * @return scan operation
   */
  static Scan createScanForReadLogTimestampMap(String backupRoot) {
    Scan scan = new Scan();
    byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Get table name from rowkey
   * @param cloneRow rowkey
   * @return table name
   */
  static String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
    String s = new String(cloneRow);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates Put to store RS last log result
   * @param server - server name
   * @param timestamp - log roll result (timestamp)
   * @param backupRoot - path to backup destination
   * @return put operation
   */
  static Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
      String backupRoot) {
    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
    put.addColumn(BackupSystemTable.META_FAMILY, "rs-log-ts".getBytes(),
      timestamp.toString().getBytes());
    return put;
  }

  /**
   * Creates Puts for bulk loaded files resulting from running LoadIncrementalHFiles
   */
  static List<Put> createPutForOrigBulkload(TableName table, byte[] region,
      Map<byte[], List<Path>> finalPaths) {
    List<Put> puts = new ArrayList<>();
    for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
      for (Path path : entry.getValue()) {
        String file = path.toString();
        int lastSlash = file.lastIndexOf("/");
        String filename = file.substring(lastSlash + 1);
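        // Resulting row key, e.g. "bulk:ns1:t1:<region>:<hfile name>" (the namespace
        // component is absent for tables in the default namespace, since
        // TableName.toString() omits it)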
        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
            Bytes.toString(region), BLK_LD_DELIM, filename));
        put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
        put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
        put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
        put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_DONE);
        puts.add(put);
        LOG.debug("writing done bulk path " + file + " for " + table + " " +
            Bytes.toString(region));
      }
    }
    return puts;
  }

  /**
   * Creates Puts for bulk loaded files resulting from running LoadIncrementalHFiles
   */
  static List<Put> createPutForOrigBulkload(TableName table, byte[] region,
      final byte[] family, final List<Pair<Path, Path>> pairs) {
    List<Put> puts = new ArrayList<>();
    for (Pair<Path, Path> pair : pairs) {
      Path path = pair.getSecond();
      String file = path.toString();
      int lastSlash = file.lastIndexOf("/");
      String filename = file.substring(lastSlash + 1);
      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
          Bytes.toString(region), BLK_LD_DELIM, filename));
      put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
      put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
      put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
      put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_RAW);
      puts.add(put);
      LOG.debug("writing raw bulk path " + file + " for " + table + " " +
          Bytes.toString(region));
    }
    return puts;
  }

  /**
   * Creates Deletes for bulk load rows of the given tables
   * @param lst list of table names
   * @return delete list
   */
  public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
    List<Delete> lstDels = new ArrayList<>();
    for (TableName table : lst) {
      Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
      del.addFamily(BackupSystemTable.META_FAMILY);
      lstDels.add(del);
    }
    return lstDels;
  }

  /**
   * Creates Scan operation to load bulk loaded file rows for a given table
   */
  static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
    Scan scan = new Scan();
    byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.setMaxVersions(1);
    return scan;
  }

  static String getTableNameFromOrigBulkLoadRow(String rowStr) {
    LOG.debug("bulk row string " + rowStr);
    // format is bulk : namespace : table : region : file
    String[] parts = rowStr.split(BLK_LD_DELIM);
    // tables in the default namespace have no namespace component in the row,
    // so the table name shifts one position left (see getRegionNameFromOrigBulkLoadRow)
    int idx = parts.length == 4 ? 1 : 2;
    return parts[idx];
  }

  static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
    // format is bulk : namespace : table : region : file
    String[] parts = rowStr.split(BLK_LD_DELIM);
    int idx = 3;
    if (parts.length == 4) {
      // the table is in the default namespace
      idx = 2;
    }
    LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
    return parts[idx];
  }

  /**
   * Creates Scan to query bulk loaded HFiles which have been copied by incremental backup
   * @param backupId the backup Id; it can be null when querying for all backups
   * @return the Scan object
   */
  static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
    Scan scan = new Scan();
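    // Rows written by createPutForBulkLoadedFile() have the layout
    // "bulk:" + backupId + ":" + timestamp + ":" + index, so scanning with the
    // "bulk:<backupId>:" prefix returns every file recorded for that backup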
    byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES :
        rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.setMaxVersions(1);
    return scan;
  }

  /**
   * Creates Put to record a bulk loaded file copied by incremental backup
   * @param tn - table name
   * @param fam - column family
   * @param p - path to the HFile
   * @param backupId - backup id
   * @param ts - timestamp
   * @param idx - index of the file
   * @return put operation
   */
  static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
      long ts, int idx) {
    Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM + ts + BLK_LD_DELIM + idx));
    put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
    put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
    put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
    return put;
  }

  /**
   * Creates Scan operation to load last RS log roll results
   * @param backupRoot - path to backup destination
   * @return scan operation
   */
  static Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
    Scan scan = new Scan();
    byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot);
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    scan.setMaxVersions(1);
    return scan;
  }

  /**
   * Get server's name from rowkey
   * @param row - rowkey
   * @return server's name
   */
  static String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
    String s = new String(row);
    int index = s.lastIndexOf(NULL);
    return s.substring(index + 1);
  }

  /**
   * Creates put list for list of WAL files
   * @param files list of WAL file paths
   * @param backupId backup id
   * @param backupRoot path to backup destination
   * @return put list
   * @throws IOException exception
   */
  public static List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
      String backupRoot) throws IOException {
    List<Put> puts = new ArrayList<>();
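    // One row per WAL file, keyed by the file's unique name part rather than its full
    // path, so createGetForCheckWALFile() below can find the row by name alone; the
    // full path is preserved in the "file" column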
    for (String file : files) {
      Put put = new Put(rowkey(WALS_PREFIX, BackupClientUtil.getUniqueWALFileNamePart(file)));
      put.addColumn(BackupSystemTable.META_FAMILY, "backupId".getBytes(), backupId.getBytes());
      put.addColumn(BackupSystemTable.META_FAMILY, "file".getBytes(), file.getBytes());
      put.addColumn(BackupSystemTable.META_FAMILY, "root".getBytes(), backupRoot.getBytes());
      puts.add(put);
    }
    return puts;
  }

  /**
   * Creates Scan operation to load WALs
   * TODO: support for backupRoot
   * @param backupRoot - path to backup destination (currently unused)
   * @return scan operation
   */
  public static Scan createScanForGetWALs(String backupRoot) {
    Scan scan = new Scan();
    byte[] startRow = WALS_PREFIX.getBytes();
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Creates Get operation for a given WAL file name
   * TODO: support for backup destination
   * @param file file
   * @return get operation
   * @throws IOException exception
   */
  public static Get createGetForCheckWALFile(String file) throws IOException {
    Get get = new Get(rowkey(WALS_PREFIX, BackupClientUtil.getUniqueWALFileNamePart(file)));
    // add backup root column
    get.addFamily(BackupSystemTable.META_FAMILY);
    return get;
  }

  /**
   * Creates Scan operation to load backup set list
   * @return scan operation
   */
  static Scan createScanForBackupSetList() {
    Scan scan = new Scan();
    byte[] startRow = SET_KEY_PREFIX.getBytes();
    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
    scan.setStartRow(startRow);
    scan.setStopRow(stopRow);
    scan.addFamily(BackupSystemTable.META_FAMILY);
    return scan;
  }

  /**
   * Creates Get operation to load backup set content
   * @param name - backup set's name
   * @return get operation
   */
  static Get createGetForBackupSet(String name) {
    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
    get.addFamily(BackupSystemTable.META_FAMILY);
    return get;
  }

  /**
   * Creates Delete operation to delete backup set content
   * @param name - backup set's name
   * @return delete operation
   */
  static Delete createDeleteForBackupSet(String name) {
    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
    del.addFamily(BackupSystemTable.META_FAMILY);
    return del;
  }

  /**
   * Creates Put operation to update backup set content
   * @param name - backup set's name
   * @param tables - list of tables
   * @return put operation
   */
  static Put createPutForBackupSet(String name, String[] tables) {
    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
    byte[] value = convertToByteArray(tables);
    put.addColumn(BackupSystemTable.META_FAMILY, "tables".getBytes(), value);
    return put;
  }

  private static byte[] convertToByteArray(String[] tables) {
    return StringUtils.join(tables, ",").getBytes();
  }

  /**
   * Converts cell to backup set list.
   * @param current - cell
   * @return backup set
   * @throws IOException exception
   */
  static String[] cellValueToBackupSet(Cell current) throws IOException {
    byte[] data = CellUtil.cloneValue(current);
    if (data != null && data.length > 0) {
      return new String(data).split(",");
    } else {
      return new String[0];
    }
  }

  /**
   * Converts cell key to backup set name.
   * @param current - cell
   * @return backup set name
   * @throws IOException exception
   */
  static String cellKeyToBackupSetName(Cell current) throws IOException {
    byte[] data = CellUtil.cloneRow(current);
    return new String(data).substring(SET_KEY_PREFIX.length());
  }

  static byte[] rowkey(String s, String... other) {
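    // Plain concatenation: callers pass any delimiters (BLK_LD_DELIM, NULL) explicitly
    // as components, so nothing is inserted between the parts here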
    StringBuilder sb = new StringBuilder(s);
    for (String ss : other) {
      sb.append(ss);
    }
    return sb.toString().getBytes();
  }
}
566  
567 }