/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import java.io.IOException;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
33 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
34 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
35 import org.apache.hadoop.hbase.regionserver.HRegionServer;
36 import org.apache.hadoop.hbase.util.Pair;
37
38
39
40
41
42 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
43 public class ReplicationObserver extends BaseRegionObserver {
44 private static final Log LOG = LogFactory.getLog(ReplicationObserver.class);
45
46 @Override
47 public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
48 final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
49 RegionCoprocessorEnvironment env = ctx.getEnvironment();
50 Configuration c = env.getConfiguration();
51 if (pairs == null
52 || pairs.isEmpty()
53 || !c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
54 HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
55 LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded "
56 + "data replication.");
57 return;
58 }
59 HRegionServer rs = (HRegionServer) env.getRegionServerServices();
60 Replication rep = (Replication) rs.getReplicationSourceService();
61 rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
62 }
63 }