1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.TreeMap;
30 import java.util.UUID;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.apache.commons.lang.StringUtils;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.CellScanner;
40 import org.apache.hadoop.hbase.CellUtil;
41 import org.apache.hadoop.hbase.HBaseConfiguration;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.Stoppable;
44 import org.apache.hadoop.hbase.TableName;
45 import org.apache.hadoop.hbase.classification.InterfaceAudience;
46 import org.apache.hadoop.hbase.client.Connection;
47 import org.apache.hadoop.hbase.client.ConnectionFactory;
48 import org.apache.hadoop.hbase.client.Delete;
49 import org.apache.hadoop.hbase.client.Mutation;
50 import org.apache.hadoop.hbase.client.Put;
51 import org.apache.hadoop.hbase.client.Row;
52 import org.apache.hadoop.hbase.client.Table;
53 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
54 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
55 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
56 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
57 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
58 import org.apache.hadoop.hbase.util.Bytes;
59 import org.apache.hadoop.hbase.util.Pair;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.Private
76 public class ReplicationSink {
77
78 private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
79 private final Configuration conf;
80
81
82 private volatile Connection sharedHtableCon;
83 private final MetricsSink metrics;
84 private final AtomicLong totalReplicatedEdits = new AtomicLong();
85 private final Object sharedHtableConLock = new Object();
86
87 private long hfilesReplicated = 0;
88 private SourceFSConfigurationProvider provider;
89
90
91
92
93
94
95
96
97 public ReplicationSink(Configuration conf, Stoppable stopper)
98 throws IOException {
99 this.conf = HBaseConfiguration.create(conf);
100 decorateConf();
101 this.metrics = new MetricsSink();
102
103 String className =
104 conf.get("hbase.replication.source.fs.conf.provider",
105 DefaultSourceFSConfigurationProvider.class.getCanonicalName());
106 try {
107 @SuppressWarnings("rawtypes")
108 Class c = Class.forName(className);
109 this.provider = (SourceFSConfigurationProvider) c.newInstance();
110 } catch (Exception e) {
111 throw new IllegalArgumentException("Configured source fs configuration provider class "
112 + className + " throws error.", e);
113 }
114 }
115
116
117
118
119
120 private void decorateConf() {
121 this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
122 this.conf.getInt("replication.sink.client.retries.number", 4));
123 this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
124 this.conf.getInt("replication.sink.client.ops.timeout", 10000));
125 String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
126 if (StringUtils.isNotEmpty(replicationCodec)) {
127 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
128 }
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143 public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
144 String replicationClusterId, String sourceBaseNamespaceDirPath,
145 String sourceHFileArchiveDirPath) throws IOException {
146 if (entries.isEmpty()) return;
147 if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
148
149
150 try {
151 long totalReplicated = 0;
152
153
154 Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
155 new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
156
157
158 Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
159
160 for (WALEntry entry : entries) {
161 TableName table =
162 TableName.valueOf(entry.getKey().getTableName().toByteArray());
163 Cell previousCell = null;
164 Mutation m = null;
165 int count = entry.getAssociatedCellCount();
166 for (int i = 0; i < count; i++) {
167
168 if (!cells.advance()) {
169 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
170 }
171 Cell cell = cells.current();
172
173 if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
174 if (bulkLoadHFileMap == null) {
175 bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
176 }
177 buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
178 } else {
179
180 if (isNewRowOrType(previousCell, cell)) {
181
182 m =
183 CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
184 cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
185 cell.getRowLength());
186 List<UUID> clusterIds = new ArrayList<UUID>();
187 for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
188 clusterIds.add(toUUID(clusterId));
189 }
190 m.setClusterIds(clusterIds);
191 addToHashMultiMap(rowMap, table, clusterIds, m);
192 }
193 if (CellUtil.isDelete(cell)) {
194 ((Delete) m).addDeleteMarker(cell);
195 } else {
196 ((Put) m).add(cell);
197 }
198 previousCell = cell;
199 }
200 }
201 totalReplicated++;
202 }
203
204
205 if (!rowMap.isEmpty()) {
206 LOG.debug("Started replicating mutations.");
207 for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
208 batch(entry.getKey(), entry.getValue().values());
209 }
210 LOG.debug("Finished replicating mutations.");
211 }
212
213 if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
214 LOG.debug("Started replicating bulk loaded data.");
215 HFileReplicator hFileReplicator =
216 new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
217 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
218 getConnection());
219 hFileReplicator.replicate();
220 LOG.debug("Finished replicating bulk loaded data.");
221 }
222
223 int size = entries.size();
224 this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
225 this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
226 this.totalReplicatedEdits.addAndGet(totalReplicated);
227 } catch (IOException ex) {
228 LOG.error("Unable to accept edit because:", ex);
229 throw ex;
230 }
231 }
232
233 private void buildBulkLoadHFileMap(
234 final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
235 Cell cell) throws IOException {
236 BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
237 List<StoreDescriptor> storesList = bld.getStoresList();
238 int storesSize = storesList.size();
239 for (int j = 0; j < storesSize; j++) {
240 StoreDescriptor storeDescriptor = storesList.get(j);
241 List<String> storeFileList = storeDescriptor.getStoreFileList();
242 int storeFilesSize = storeFileList.size();
243 hfilesReplicated += storeFilesSize;
244 for (int k = 0; k < storeFilesSize; k++) {
245 byte[] family = storeDescriptor.getFamilyName().toByteArray();
246
247
248 String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
249
250 String tableName = table.getNameWithNamespaceInclAsString();
251 if (bulkLoadHFileMap.containsKey(tableName)) {
252 List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
253 boolean foundFamily = false;
254 for (int i = 0; i < familyHFilePathsList.size(); i++) {
255 Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
256 if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
257
258 familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
259 foundFamily = true;
260 break;
261 }
262 }
263 if (!foundFamily) {
264
265 addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
266 }
267 } else {
268
269 addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
270 }
271 }
272 }
273 }
274
275 private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
276 List<Pair<byte[], List<String>>> familyHFilePathsList) {
277 List<String> hfilePaths = new ArrayList<String>();
278 hfilePaths.add(pathToHfileFromNS);
279 familyHFilePathsList.add(new Pair<byte[], List<String>>(family, hfilePaths));
280 }
281
282 private void addNewTableEntryInMap(
283 final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
284 String pathToHfileFromNS, String tableName) {
285 List<String> hfilePaths = new ArrayList<String>();
286 hfilePaths.add(pathToHfileFromNS);
287 Pair<byte[], List<String>> newFamilyHFilePathsPair =
288 new Pair<byte[], List<String>>(family, hfilePaths);
289 List<Pair<byte[], List<String>>> newFamilyHFilePathsList =
290 new ArrayList<Pair<byte[], List<String>>>();
291 newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
292 bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
293 }
294
295 private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
296 byte[] family) {
297 return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
298 .append(table.getQualifierAsString()).append(Path.SEPARATOR)
299 .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
300 .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
301 }
302
303
304
305
306
307
308 private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
309 return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
310 !CellUtil.matchingRow(previousCell, cell);
311 }
312
313 private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
314 return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
315 }
316
317
318
319
320
321
322
323
324
325
326 private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
327 Map<K2,List<V>> innerMap = map.get(key1);
328 if (innerMap == null) {
329 innerMap = new HashMap<K2, List<V>>();
330 map.put(key1, innerMap);
331 }
332 List<V> values = innerMap.get(key2);
333 if (values == null) {
334 values = new ArrayList<V>();
335 innerMap.put(key2, values);
336 }
337 values.add(value);
338 return values;
339 }
340
341
342
343
344 public void stopReplicationSinkServices() {
345 try {
346 if (this.sharedHtableCon != null) {
347 synchronized (sharedHtableConLock) {
348 if (this.sharedHtableCon != null) {
349 this.sharedHtableCon.close();
350 this.sharedHtableCon = null;
351 }
352 }
353 }
354 } catch (IOException e) {
355 LOG.warn("IOException while closing the connection", e);
356 }
357 }
358
359
360
361
362
363
364
365
366 protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
367 if (allRows.isEmpty()) {
368 return;
369 }
370 Table table = null;
371 try {
372 Connection connection = getConnection();
373 table = connection.getTable(tableName);
374 for (List<Row> rows : allRows) {
375 table.batch(rows);
376 }
377 } catch (InterruptedException ix) {
378 throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
379 } finally {
380 if (table != null) {
381 table.close();
382 }
383 }
384 }
385
386 private Connection getConnection() throws IOException {
387
388 Connection connection = sharedHtableCon;
389 if (connection == null) {
390 synchronized (sharedHtableConLock) {
391 connection = sharedHtableCon;
392 if (connection == null) {
393 connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
394 }
395 }
396 }
397 return connection;
398 }
399
400
401
402
403
404
405 public String getStats() {
406 return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
407 "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
408 ", total replicated edits: " + this.totalReplicatedEdits;
409 }
410
411
412
413
414
415 public MetricsSink getSinkMetrics() {
416 return this.metrics;
417 }
418 }