1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
33 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
34 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
35 import org.apache.hadoop.hbase.wal.WAL.Entry;
36 import org.apache.hadoop.hbase.util.Bytes;
37
38 public class TableCfWALEntryFilter implements WALEntryFilter {
39
40 private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
41 private final ReplicationPeer peer;
42
43 public TableCfWALEntryFilter(ReplicationPeer peer) {
44 this.peer = peer;
45 }
46
47 @Override
48 public Entry filter(Entry entry) {
49 TableName tabName = entry.getKey().getTablename();
50 ArrayList<Cell> cells = entry.getEdit().getCells();
51 Map<TableName, List<String>> tableCFs = null;
52
53 try {
54 tableCFs = this.peer.getTableCFs();
55 } catch (IllegalArgumentException e) {
56 LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
57 ", degenerate as if it's not configured by keeping tableCFs==null");
58 }
59 int size = cells.size();
60
61
62
63 if (tableCFs == null) {
64 return entry;
65 }
66
67
68 if (!tableCFs.containsKey(tabName)) {
69 return null;
70 } else {
71 List<String> cfs = tableCFs.get(tabName);
72 for (int i = size - 1; i >= 0; i--) {
73 Cell cell = cells.get(i);
74
75
76
77 if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
78 Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
79 if (filteredBulkLoadEntryCell != null) {
80 cells.set(i, filteredBulkLoadEntryCell);
81 } else {
82 cells.remove(i);
83 }
84 } else {
85
86
87 if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
88 cell.getFamilyOffset(), cell.getFamilyLength()))) {
89 cells.remove(i);
90 }
91 }
92 }
93 }
94 if (cells.size() < size/2) {
95 cells.trimToSize();
96 }
97 return entry;
98 }
99
100 private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
101 byte[] fam;
102 BulkLoadDescriptor bld = null;
103 try {
104 bld = WALEdit.getBulkLoadDescriptor(cell);
105 } catch (IOException e) {
106 LOG.warn("Failed to get bulk load events information from the WAL file.", e);
107 return cell;
108 }
109 List<StoreDescriptor> storesList = bld.getStoresList();
110
111 List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
112 Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
113 boolean anyStoreRemoved = false;
114 while (copiedStoresListIterator.hasNext()) {
115 StoreDescriptor sd = copiedStoresListIterator.next();
116 fam = sd.getFamilyName().toByteArray();
117 if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
118 copiedStoresListIterator.remove();
119 anyStoreRemoved = true;
120 }
121 }
122
123 if (!anyStoreRemoved) {
124 return cell;
125 } else if (copiedStoresList.isEmpty()) {
126 return null;
127 }
128 BulkLoadDescriptor.Builder newDesc =
129 BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
130 .setEncodedRegionName(bld.getEncodedRegionName())
131 .setBulkloadSeqNum(bld.getBulkloadSeqNum());
132 newDesc.addAllStores(copiedStoresList);
133 BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
134 return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
135 cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
136 }
137 }