package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
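
/**
 * Gateway to replication for a region server. Sets up the replication sources and sink for the
 * hosting server and, as a {@link WALActionsListener}, assigns replication scopes to WAL edits
 * as they are written.
 */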
@InterfaceAudience.Private
public class Replication extends WALActionsListener.Base implements
    ReplicationSourceService, ReplicationSinkService {
  private static final Log LOG = LogFactory.getLog(Replication.class);
  private boolean replication;
  private boolean replicationForBulkLoadData;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;
  private ReplicationTracker replicationTracker;
  private Configuration conf;
  private ReplicationSink replicationSink;

  private Server server;

  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;

  private ReplicationLoad replicationLoad;
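
  /**
   * Instantiates the replication management (if replication is enabled).
   * @param server the server hosting this replication service
   * @param fs handle to the filesystem
   * @param logDir directory containing the region server's WALs
   * @param oldLogDir directory where old WALs are archived
   * @throws IOException if the replication state cannot be initialized
   */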
  public Replication(final Server server, final FileSystem fs,
      final Path logDir, final Path oldLogDir) throws IOException {
    initialize(server, fs, logDir, oldLogDir);
  }
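
  /**
   * Empty constructor. {@link #initialize(Server, FileSystem, Path, Path)} must be called before
   * this instance is used.
   */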
  public Replication() {
  }

  public void initialize(final Server server, final FileSystem fs,
      final Path logDir, final Path oldLogDir) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.replication = isReplication(this.conf);
    this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
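    // Replicating bulk-loaded data requires a source cluster id to be configured.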
    if (this.replicationForBulkLoadData) {
      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
            + " is set to true.");
      }
    }
    if (replication) {
      try {
        this.replicationQueues =
            ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
        this.replicationQueues.init(this.server.getServerName().toString());
        this.replicationPeers =
            ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
        this.replicationPeers.init();
        this.replicationTracker =
            ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
              this.conf, this.server, this.server);
      } catch (ReplicationException e) {
        throw new IOException("Failed replication handler create", e);
      }
      UUID clusterId = null;
      try {
        clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
      } catch (KeeperException ke) {
        throw new IOException("Could not read cluster id", ke);
      }
      this.replicationManager =
          new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
              conf, this.server, fs, logDir, oldLogDir, clusterId);
      this.statsThreadPeriod =
          this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
      LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
      this.replicationLoad = new ReplicationLoad();
    } else {
      this.replicationManager = null;
      this.replicationQueues = null;
      this.replicationPeers = null;
      this.replicationTracker = null;
      this.replicationLoad = null;
    }
  }
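
  /**
   * @param c configuration to read
   * @return true if replication is enabled, i.e. {@link HConstants#REPLICATION_ENABLE_KEY} is
   *         set to true
   */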
  public static boolean isReplication(final Configuration c) {
    return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
  }
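
  /**
   * @param c configuration to read
   * @return true if replication of bulk-loaded data is enabled, i.e.
   *         {@link HConstants#REPLICATION_BULKLOAD_ENABLE_KEY} is set to true
   */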
  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }
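
  /**
   * @return the {@link WALActionsListener} that assigns replication scopes to WAL edits
   *         (this instance)
   */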
  public WALActionsListener getWALActionsListener() {
    return this;
  }
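
  /**
   * Stops the replication service.
   */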
  public void stopReplicationService() {
    join();
  }
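
  /**
   * Joins the replication source threads, stops the sink services and shuts down the statistics
   * thread pool.
   */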
  public void join() {
    if (this.replication) {
      this.replicationManager.join();
      if (this.replicationSink != null) {
        this.replicationSink.stopReplicationSinkServices();
      }
    }
    scheduleThreadPool.shutdown();
  }
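
  /**
   * Carries the list of log entries down to the sink so they are replayed on this cluster.
   * @param entries WAL entries to replicate
   * @param cells the cells the entries refer to, passed alongside in this CellScanner
   * @param replicationClusterId id identifying the source cluster
   * @param sourceBaseNamespaceDirPath path to the source cluster's base namespace directory
   * @param sourceHFileArchiveDirPath path to the source cluster's hfile archive directory
   * @throws IOException if applying the entries fails
   */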
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (this.replication) {
      this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
        sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
    }
  }
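
  /**
   * If replication is enabled, initializes the replication sources, creates the replication sink
   * and schedules the periodic statistics task.
   * @throws IOException if the replication manager fails to initialize
   */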
  public void startReplicationService() throws IOException {
    if (this.replication) {
      try {
        this.replicationManager.init();
      } catch (ReplicationException e) {
        throw new IOException(e);
      }
      this.replicationSink = new ReplicationSink(this.conf, this.server);
      this.scheduleThreadPool.scheduleAtFixedRate(
        new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
        statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    }
  }
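
  /**
   * Gets the replication sources manager.
   * @return the manager if replication is enabled, otherwise null
   */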
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

  @Override
  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
      throws IOException {
    scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
  }
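
  /**
   * Utility method used to set the correct scopes on each log key. Cells in the meta family are
   * skipped, except for bulk load markers when bulk load replication is enabled; for every other
   * cell the column family's replication scope is recorded unless it is
   * {@code REPLICATION_SCOPE_LOCAL}.
   * @param htd descriptor of the table the edit belongs to
   * @param logKey the key whose scopes are set
   * @param logEdit the edit being scoped
   * @param conf configuration used to check whether bulk load replication is enabled
   * @param replicationManager the replication source manager, passed through to bulk load scoping
   * @throws IOException if a bulk load descriptor cannot be read from the edit
   */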
  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
      Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    byte[] family;
    boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
    for (Cell cell : logEdit.getCells()) {
      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
        if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
          scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
        } else {
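          // Other meta family cells (e.g. compaction or flush markers) are not replicated.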
          continue;
        }
      } else {
        family = CellUtil.cloneFamily(cell);
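        // Every family carried by a regular cell is expected to exist in the table descriptor.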
        assert htd.getFamily(family) != null;
        if (!scopes.containsKey(family)) {
          int scope = htd.getFamily(family).getScope();
          if (scope != REPLICATION_SCOPE_LOCAL) {
            scopes.put(family, scope);
          }
        }
      }
    }
    if (!scopes.isEmpty()) {
      logKey.setScopes(scopes);
    }
  }
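
  /**
   * Records the replication scope of every column family referenced by the bulk load descriptor
   * carried in the given cell, skipping families whose scope is {@code REPLICATION_SCOPE_LOCAL}.
   */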
  private static void scopeBulkLoadEdits(HTableDescriptor htd,
      ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
      TableName tableName, Cell cell) throws IOException {
    byte[] family;
    try {
      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
      for (StoreDescriptor s : bld.getStoresList()) {
        family = s.getFamilyName().toByteArray();
        if (!scopes.containsKey(family)) {
          int scope = htd.getFamily(family).getScope();
          if (scope != REPLICATION_SCOPE_LOCAL) {
            scopes.put(family, scope);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to get bulk load events information from the wal file.", e);
      throw e;
    }
  }
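
  /**
   * Adds the given hfile references to the replication queue so the bulk-loaded files can be
   * replicated.
   * @param tableName name of the table the hfiles belong to
   * @param family column family the hfiles belong to
   * @param pairs hfile path pairs to record in the replication queue
   * @throws IOException if the references cannot be added to the queue
   */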
  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (ReplicationException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw new IOException(e);
    }
  }

  @Override
  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
    getReplicationManager().preLogRoll(newPath);
  }

  @Override
  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
    getReplicationManager().postLogRoll(newPath);
  }
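
  /**
   * Registers the replication log cleaner (and, when bulk load replication is enabled, the
   * replication hfile cleaner) in the master's cleaner plugin configuration.
   * @param conf the master configuration to decorate
   */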
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isReplication(conf)) {
      return;
    }
    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }
    if (isReplicationForBulkLoadDataEnabled(conf)) {
      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
      if (!plugins.contains(cleanerClass)) {
        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
      }
    }
  }
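
  /**
   * Registers the {@link ReplicationObserver} region server coprocessor when replication of
   * bulk-loaded data is enabled.
   * @param conf the region server configuration to decorate
   */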
  public static void decorateRegionServerConfiguration(Configuration conf) {
    if (isReplicationForBulkLoadDataEnabled(conf)) {
      String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
      String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
      if (!plugins.contains(rsCoprocessorClass)) {
        conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
          plugins + "," + rsCoprocessorClass);
      }
    }
  }
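
  /**
   * Statistics task: logs the replication source manager and sink statistics. Scheduled
   * periodically once the replication service starts.
   */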
  static class ReplicationStatisticsThread extends Thread {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsThread(final ReplicationSink replicationSink,
        final ReplicationSourceManager replicationManager) {
      super("ReplicationStatisticsThread");
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }

    buildReplicationLoad();
    return this.replicationLoad;
  }

  private void buildReplicationLoad() {
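    // Collect the metrics of every running replication source...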
    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
    List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();

    for (ReplicationSourceInterface source : sources) {
      if (source instanceof ReplicationSource) {
        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
      }
    }
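    // ...and of the sink, then build the load report from them.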
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
  }
}