View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.NavigableMap;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
29  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
30  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.hbase.Cell;
34  import org.apache.hadoop.hbase.CellUtil;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.wal.WAL.Entry;
37  
38  /**
39   * Keeps KVs that are scoped other than local
40   */
41  @InterfaceAudience.Private
42  public class ScopeWALEntryFilter implements WALEntryFilter {
43    private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
44  
45    @Override
46    public Entry filter(Entry entry) {
47      NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
48      if (scopes == null || scopes.isEmpty()) {
49        return null;
50      }
51      ArrayList<Cell> cells = entry.getEdit().getCells();
52      int size = cells.size();
53      byte[] fam;
54      for (int i = size - 1; i >= 0; i--) {
55        Cell cell = cells.get(i);
56        // If a bulk load entry has a scope then that means user has enabled replication for
57        // bulk load hfiles.
58        // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so
59        // cannot refactor into one now, can revisit and see if any way to unify them.
60        if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
61          Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
62          if (filteredBulkLoadEntryCell != null) {
63            cells.set(i, filteredBulkLoadEntryCell);
64          } else {
65            cells.remove(i);
66          }
67        } else {
68          // The scope will be null or empty if
69          // there's nothing to replicate in that WALEdit
70          fam = CellUtil.cloneFamily(cell);
71          if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
72            cells.remove(i);
73          }
74        }
75      }
76      if (cells.size() < size / 2) {
77        cells.trimToSize();
78      }
79      return entry;
80    }
81  
82    private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
83      byte[] fam;
84      BulkLoadDescriptor bld = null;
85      try {
86        bld = WALEdit.getBulkLoadDescriptor(cell);
87      } catch (IOException e) {
88        LOG.warn("Failed to get bulk load events information from the WAL file.", e);
89        return cell;
90      }
91      List<StoreDescriptor> storesList = bld.getStoresList();
92      // Copy the StoreDescriptor list and update it as storesList is a unmodifiableList
93      List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
94      Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
95      boolean anyStoreRemoved = false;
96      while (copiedStoresListIterator.hasNext()) {
97        StoreDescriptor sd = copiedStoresListIterator.next();
98        fam = sd.getFamilyName().toByteArray();
99        if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
100         copiedStoresListIterator.remove();
101         anyStoreRemoved = true;
102       }
103     }
104 
105     if (!anyStoreRemoved) {
106       return cell;
107     } else if (copiedStoresList.isEmpty()) {
108       return null;
109     }
110     BulkLoadDescriptor.Builder newDesc =
111         BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
112             .setEncodedRegionName(bld.getEncodedRegionName())
113             .setBulkloadSeqNum(bld.getBulkloadSeqNum());
114     newDesc.addAllStores(copiedStoresList);
115     BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
116     return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
117       cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
118   }
119 }