View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.Cell;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
33  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
34  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
35  import org.apache.hadoop.hbase.wal.WAL.Entry;
36  import org.apache.hadoop.hbase.util.Bytes;
37  
38  public class TableCfWALEntryFilter implements WALEntryFilter {
39  
40    private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
41    private final ReplicationPeer peer;
42  
43    public TableCfWALEntryFilter(ReplicationPeer peer) {
44      this.peer = peer;
45    }
46  
47    @Override
48    public Entry filter(Entry entry) {
49      TableName tabName = entry.getKey().getTablename();
50      ArrayList<Cell> cells = entry.getEdit().getCells();
51      Map<TableName, List<String>> tableCFs = null;
52  
53      try {
54        tableCFs = this.peer.getTableCFs();
55      } catch (IllegalArgumentException e) {
56        LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
57            ", degenerate as if it's not configured by keeping tableCFs==null");
58      }
59      int size = cells.size();
60      
61      // If null means user has explicitly not configured any table CFs so all the tables data are
62      // applicable for replication
63      if (tableCFs == null) {
64        return entry;
65      }
66      // return null(prevent replicating) if logKey's table isn't in this peer's
67      // replicable table list
68      if (!tableCFs.containsKey(tabName)) {
69        return null;
70      } else {
71        List<String> cfs = tableCFs.get(tabName);
72        for (int i = size - 1; i >= 0; i--) {
73          Cell cell = cells.get(i);
74          // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so
75          // cannot refactor into one now, can revisit and see if any way to unify them.
76          // Filter bulk load entries separately
77          if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
78            Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
79            if (filteredBulkLoadEntryCell != null) {
80              cells.set(i, filteredBulkLoadEntryCell);
81            } else {
82              cells.remove(i);
83            }
84          } else {
85            // ignore(remove) kv if its cf isn't in the replicable cf list
86            // (empty cfs means all cfs of this table are replicable)
87            if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
88              cell.getFamilyOffset(), cell.getFamilyLength()))) {
89              cells.remove(i);
90            }
91          }
92        }
93      }
94      if (cells.size() < size/2) {
95        cells.trimToSize();
96      }
97      return entry;
98    }
99  
100   private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
101     byte[] fam;
102     BulkLoadDescriptor bld = null;
103     try {
104       bld = WALEdit.getBulkLoadDescriptor(cell);
105     } catch (IOException e) {
106       LOG.warn("Failed to get bulk load events information from the WAL file.", e);
107       return cell;
108     }
109     List<StoreDescriptor> storesList = bld.getStoresList();
110     // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
111     List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
112     Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
113     boolean anyStoreRemoved = false;
114     while (copiedStoresListIterator.hasNext()) {
115       StoreDescriptor sd = copiedStoresListIterator.next();
116       fam = sd.getFamilyName().toByteArray();
117       if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
118         copiedStoresListIterator.remove();
119         anyStoreRemoved = true;
120       }
121     }
122 
123     if (!anyStoreRemoved) {
124       return cell;
125     } else if (copiedStoresList.isEmpty()) {
126       return null;
127     }
128     BulkLoadDescriptor.Builder newDesc =
129         BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
130             .setEncodedRegionName(bld.getEncodedRegionName())
131             .setBulkloadSeqNum(bld.getBulkloadSeqNum());
132     newDesc.addAllStores(copiedStoresList);
133     BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
134     return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
135       cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
136   }
137 }