1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.NavigableMap;
26
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
29 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
30 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.hbase.Cell;
34 import org.apache.hadoop.hbase.CellUtil;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.wal.WAL.Entry;
37
38
39
40
41 @InterfaceAudience.Private
42 public class ScopeWALEntryFilter implements WALEntryFilter {
43 private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
44
45 @Override
46 public Entry filter(Entry entry) {
47 NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
48 if (scopes == null || scopes.isEmpty()) {
49 return null;
50 }
51 ArrayList<Cell> cells = entry.getEdit().getCells();
52 int size = cells.size();
53 byte[] fam;
54 for (int i = size - 1; i >= 0; i--) {
55 Cell cell = cells.get(i);
56
57
58
59
60 if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
61 Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
62 if (filteredBulkLoadEntryCell != null) {
63 cells.set(i, filteredBulkLoadEntryCell);
64 } else {
65 cells.remove(i);
66 }
67 } else {
68
69
70 fam = CellUtil.cloneFamily(cell);
71 if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
72 cells.remove(i);
73 }
74 }
75 }
76 if (cells.size() < size / 2) {
77 cells.trimToSize();
78 }
79 return entry;
80 }
81
82 private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
83 byte[] fam;
84 BulkLoadDescriptor bld = null;
85 try {
86 bld = WALEdit.getBulkLoadDescriptor(cell);
87 } catch (IOException e) {
88 LOG.warn("Failed to get bulk load events information from the WAL file.", e);
89 return cell;
90 }
91 List<StoreDescriptor> storesList = bld.getStoresList();
92
93 List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
94 Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
95 boolean anyStoreRemoved = false;
96 while (copiedStoresListIterator.hasNext()) {
97 StoreDescriptor sd = copiedStoresListIterator.next();
98 fam = sd.getFamilyName().toByteArray();
99 if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
100 copiedStoresListIterator.remove();
101 anyStoreRemoved = true;
102 }
103 }
104
105 if (!anyStoreRemoved) {
106 return cell;
107 } else if (copiedStoresList.isEmpty()) {
108 return null;
109 }
110 BulkLoadDescriptor.Builder newDesc =
111 BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
112 .setEncodedRegionName(bld.getEncodedRegionName())
113 .setBulkloadSeqNum(bld.getBulkloadSeqNum());
114 newDesc.addAllStores(copiedStoresList);
115 BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
116 return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
117 cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
118 }
119 }