/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * <p/>
 * This replication process currently waits for the edits to be applied
 * before the method can return. This means that the replication of edits
 * is synchronized (after reading from WALs in ReplicationSource) and that a
 * single region server cannot receive edits from two sources at the same time.
 * <p/>
 * This class uses the native HBase client in order to replicate entries.
 * <p/>
 *
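 * A minimal usage sketch (illustrative only; in a live cluster the sink is
 * constructed by the region server's replication service and fed batches of
 * entries received over RPC):
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf, stopper);
 * sink.replicateEntries(entries, cellScanner, replicationClusterId,
 *     sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
 * </pre>
 *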
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;

  /**
   * Create a sink for replication.
   *
   * @param conf    conf object
   * @param stopper the stoppable that tells this sink to stop
   * @throws IOException thrown when HDFS goes bad or a bad file name is given
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();

    String className =
        conf.get("hbase.replication.source.fs.conf.provider",
          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
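    // Instantiate the configured provider reflectively so deployments can plug in
    // their own SourceFSConfigurationProvider via
    // "hbase.replication.source.fs.conf.provider"; the default resolves source
    // cluster FS client configs from the replication configuration directory.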
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lower the number of retries and the operation timeout.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type saving on a conversion from pb to pojo.
   * @param entries the WAL entries to apply to the local cluster
   * @param cells scanner over the cells that back the entries, consumed in entry order
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) return;
    if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id; we only want to flushCommits once
      // per table and cluster id per invocation of this method.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
          new TreeMap<TableName, Map<List<UUID>, List<Row>>>();

      // Map of table name to the list of (family, hfile paths) pairs, relative to the
      // table's namespace
      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;

      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        Cell previousCell = null;
        Mutation m = null;
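        // The cells for this entry do not travel inside the protobuf message; they
        // arrive out-of-band in the shared CellScanner, and getAssociatedCellCount()
        // tells us how many of them belong to this entry.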
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            if (bulkLoadHFileMap == null) {
              bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
            }
            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              m =
                  CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength());
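              // Stamp the mutation with the source cluster ids so these edits are not
              // replicated back to the cluster they originated from (cycle prevention).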
              List<UUID> clusterIds = new ArrayList<UUID>();
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              m.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, m);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) m).addDeleteMarker(cell);
            } else {
              ((Put) m).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values());
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
        LOG.debug("Started replicating bulk loaded data.");
        HFileReplicator hFileReplicator =
            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection());
        hFileReplicator.replicate();
        LOG.debug("Finished replicating bulk loaded data.");
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      Cell cell) throws IOException {
    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
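      // Running count of replicated hfiles; reported via MetricsSink in
      // replicateEntries() once the whole batch has been applied.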
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);

        String tableName = table.getNameWithNamespaceInclAsString();
        if (bulkLoadHFileMap.containsKey(tableName)) {
          List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
          boolean foundFamily = false;
          for (int i = 0; i < familyHFilePathsList.size(); i++) {
            Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<String>();
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<byte[], List<String>>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<String>();
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair =
        new Pair<byte[], List<String>>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList =
        new ArrayList<Pair<byte[], List<String>>>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
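    // Resulting relative layout mirrors the source cluster's namespace directory:
    // <namespace>/<table>/<encoded-region-name>/<family>/<store-file>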
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * @param previousCell the previously processed cell, or null if this is the first cell
   * @param cell the cell currently being processed
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRow(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a two-level map of key to (a list of) values.
   * TODO: Make a general utility method
   * @param map the outer map to add to
   * @param key1 the outer key
   * @param key2 the inner key
   * @param value the value to append to the list stored under key1 and key2
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<K2, List<V>>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connection if one was opened.
   * It is called when the regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given batches of changes to the given table, reusing the shared connection.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException if applying a batch fails
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
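      // Each inner list holds the rows for one cluster-id grouping (see rowMap in
      // replicateEntries); submit each as a single multi-row batch against the table.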
      for (List<Row> rows : allRows) {
        table.batch(rows);
      }
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age in ms
   * of the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}