/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
 */
@InterfaceAudience.Private
public class Replication extends WALActionsListener.Base implements
  ReplicationSourceService, ReplicationSinkService {
  private static final Log LOG =
      LogFactory.getLog(Replication.class);
  private boolean replication;
  private boolean replicationForBulkLoadData;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;
  private ReplicationTracker replicationTracker;
  private Configuration conf;
  private ReplicationSink replicationSink;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;

  /**
   * Instantiate the replication management (if replication is enabled).
   * @param server Hosting server
   * @param fs handle to the filesystem
   * @param logDir directory where the wals are stored
   * @param oldLogDir directory where logs are archived
   * @throws IOException if the replication services cannot be initialized
   */
  public Replication(final Server server, final FileSystem fs,
      final Path logDir, final Path oldLogDir) throws IOException {
    initialize(server, fs, logDir, oldLogDir);
  }

  /**
   * Empty constructor
   */
  public Replication() {
  }

  public void initialize(final Server server, final FileSystem fs,
      final Path logDir, final Path oldLogDir) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.replication = isReplication(this.conf);
    this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
    if (this.replicationForBulkLoadData) {
      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
            + " is set to true.");
      }
    }
    if (replication) {
      try {
        this.replicationQueues =
            ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
        this.replicationQueues.init(this.server.getServerName().toString());
        this.replicationPeers =
            ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
        this.replicationPeers.init();
        this.replicationTracker =
            ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
              this.conf, this.server, this.server);
      } catch (ReplicationException e) {
        throw new IOException("Failed replication handler create", e);
      }
      UUID clusterId = null;
      try {
        clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
      } catch (KeeperException ke) {
        throw new IOException("Could not read cluster id", ke);
      }
      this.replicationManager =
          new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
              conf, this.server, fs, logDir, oldLogDir, clusterId);
      this.statsThreadPeriod =
          this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
      LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
      this.replicationLoad = new ReplicationLoad();
    } else {
      this.replicationManager = null;
      this.replicationQueues = null;
      this.replicationPeers = null;
      this.replicationTracker = null;
      this.replicationLoad = null;
    }
  }
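
  /*
   * Illustrative sketch (not part of the original class): initialize() above requires
   * HConstants.REPLICATION_CLUSTER_ID to be set whenever bulk load replication is enabled.
   * A hypothetical configuration satisfying that check might look like:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
   *   conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
   *   // a unique id naming this source cluster's FS client configuration directory
   *   conf.set(HConstants.REPLICATION_CLUSTER_ID, "source-cluster-1");
   *
   * "source-cluster-1" is a made-up value; the keys are the HConstants referenced in
   * initialize() above.
   */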

  /**
   * @param c Configuration to look at
   * @return True if replication is enabled.
   */
  public static boolean isReplication(final Configuration c) {
    return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
  }

  /**
   * @param c Configuration to look at
   * @return True if replication for bulk load data is enabled.
   */
  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }
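
  /*
   * Usage sketch (illustrative only): callers can gate replication-specific setup on these two
   * static checks, for example:
   *
   *   if (Replication.isReplication(conf)) {
   *     // start replication-aware services
   *   }
   *   if (Replication.isReplicationForBulkLoadDataEnabled(conf)) {
   *     // additionally track bulk loaded hfiles for replication
   *   }
   *
   * The surrounding service-start logic is hypothetical; only the two static methods above are
   * part of this class.
   */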

  /**
   * Returns an object to listen to new wal changes
   */
  public WALActionsListener getWALActionsListener() {
    return this;
  }

  /**
   * Stops replication service.
   */
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    if (this.replication) {
      this.replicationManager.join();
      if (this.replicationSink != null) {
        this.replicationSink.stopReplicationSinkServices();
      }
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carry the list of log entries down to the sink
   * @param entries list of entries to replicate
   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
   *          contain the Cells we are replicating; they are passed here on the side in this
   *          CellScanner).
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
   * @throws IOException if the entries cannot be applied by the sink
   */
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (this.replication) {
      this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
        sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
    }
  }
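
  /*
   * Illustrative sketch (not part of the original class): the WALEntry protobufs and the Cells
   * travel separately, so a hypothetical sink-side caller hands both to this service along with
   * the source cluster's id and directories, e.g.:
   *
   *   void applyBatch(Replication replication, List<WALEntry> entries, CellScanner cells)
   *       throws IOException {
   *     replication.replicateLogEntries(entries, cells, "source-cluster-1",
   *       "/hbase/data", "/hbase/archive/data");
   *   }
   *
   * The helper name, cluster id and paths above are made up for illustration; only
   * replicateLogEntries() itself is defined here.
   */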

  /**
   * If replication is enabled, start the replication manager, create the replication sink and
   * schedule the statistics thread.
   * @throws IOException if the replication manager fails to initialize
   */
  public void startReplicationService() throws IOException {
    if (this.replication) {
      try {
        this.replicationManager.init();
      } catch (ReplicationException e) {
        throw new IOException(e);
      }
      this.replicationSink = new ReplicationSink(this.conf, this.server);
      this.scheduleThreadPool.scheduleAtFixedRate(
        new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
        statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
    }
  }

  /**
   * Get the replication sources manager
   * @return the manager if replication is enabled, else returns null
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

  @Override
  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
      throws IOException {
    scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
  }

  /**
   * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
   * from compaction WAL edits, nor on families whose scope is local.
   * @param htd Descriptor used to find the scope to use
   * @param logKey Key that may get scoped according to its edits
   * @param logEdit Edits used to lookup the scopes
   * @param conf Configuration used to check whether replication for bulk load data is enabled
   * @param replicationManager Manager used to add hfile references from bulk load events
   * @throws IOException If failed to parse the WALEdit
   */
  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
      Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    byte[] family;
    boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
    for (Cell cell : logEdit.getCells()) {
      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
        if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
          scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
        } else {
          // Skip the flush/compaction/region events
          continue;
        }
      } else {
        family = CellUtil.cloneFamily(cell);
        // Unexpected for the family to be missing from the descriptor, but it tends to happen
        // in unit tests
        assert htd.getFamily(family) != null;

        if (!scopes.containsKey(family)) {
          int scope = htd.getFamily(family).getScope();
          if (scope != REPLICATION_SCOPE_LOCAL) {
            scopes.put(family, scope);
          }
        }
      }
    }
    if (!scopes.isEmpty()) {
      logKey.setScopes(scopes);
    }
  }
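
  /*
   * Illustrative sketch (not part of the original class): scopeWALEdits() only records families
   * whose replication scope is not REPLICATION_SCOPE_LOCAL, so a table has to opt in per column
   * family, for example:
   *
   *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
   *   HColumnDescriptor hcd = new HColumnDescriptor("cf");
   *   hcd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
   *   htd.addFamily(hcd);
   *
   * "example_table" and "cf" are made-up names; HColumnDescriptor is not imported in this file
   * and is shown only to illustrate how a family's scope ends up non-local.
   */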

  private static void scopeBulkLoadEdits(HTableDescriptor htd,
      ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
      TableName tableName, Cell cell) throws IOException {
    byte[] family;
    try {
      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
      for (StoreDescriptor s : bld.getStoresList()) {
        family = s.getFamilyName().toByteArray();
        if (!scopes.containsKey(family)) {
          int scope = htd.getFamily(family).getScope();
          if (scope != REPLICATION_SCOPE_LOCAL) {
            scopes.put(family, scope);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Failed to get bulk load events information from the wal file.", e);
      throw e;
    }
  }

  void addHFileRefsToQueue(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
      throws IOException {
    try {
      this.replicationManager.addHFileRefs(tableName, family, pairs);
    } catch (ReplicationException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw new IOException(e);
    }
  }

  @Override
  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
    getReplicationManager().preLogRoll(newPath);
  }

  @Override
  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
    getReplicationManager().postLogRoll(newPath);
  }

  /**
   * This method modifies the master's configuration in order to inject replication-related
   * features
   * @param conf master configuration to decorate
   */
  public static void decorateMasterConfiguration(Configuration conf) {
    if (!isReplication(conf)) {
      return;
    }
    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }
    if (isReplicationForBulkLoadDataEnabled(conf)) {
      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
      if (!plugins.contains(cleanerClass)) {
        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
      }
    }
  }
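
  /*
   * Illustrative sketch (not part of the original class): after decorateMasterConfiguration(conf)
   * runs on a configuration with replication enabled, the log cleaner plugin list gains the
   * replication-aware cleaner, e.g.:
   *
   *   Replication.decorateMasterConfiguration(conf);
   *   String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
   *   // plugins now also contains
   *   // org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner
   *
   * The key and class names are the ones used in the method above; the exact "before" value of
   * the plugin list depends on the cluster's defaults.
   */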

  /**
   * This method modifies the region server's configuration in order to inject replication-related
   * features
   * @param conf region server configuration to decorate
   */
  public static void decorateRegionServerConfiguration(Configuration conf) {
    if (isReplicationForBulkLoadDataEnabled(conf)) {
      String plugins = conf.get(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, "");
      String rsCoprocessorClass = ReplicationObserver.class.getCanonicalName();
      if (!plugins.contains(rsCoprocessorClass)) {
        conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, plugins + ","
            + rsCoprocessorClass);
      }
    }
  }

  /*
   * Statistics thread. Periodically prints the replication statistics to the log.
   */
  static class ReplicationStatisticsThread extends Thread {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsThread(final ReplicationSink replicationSink,
                            final ReplicationSourceManager replicationManager) {
      super("ReplicationStatisticsThread");
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

  private void buildReplicationLoad() {
    // get source
    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
    List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();

    for (ReplicationSourceInterface source : sources) {
      if (source instanceof ReplicationSource) {
        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
      }
    }
    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
  }
}
438 }