View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client.replication;
20  
21  import java.io.Closeable;
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.Set;
32  import java.util.TreeMap;
33  
34  import org.apache.commons.lang.StringUtils;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Abortable;
41  import org.apache.hadoop.hbase.HColumnDescriptor;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.TableNotFoundException;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.hbase.client.Admin;
49  import org.apache.hadoop.hbase.client.HBaseAdmin;
50  import org.apache.hadoop.hbase.client.Connection;
51  import org.apache.hadoop.hbase.client.ConnectionFactory;
52  import org.apache.hadoop.hbase.client.RegionLocator;
53  import org.apache.hadoop.hbase.replication.ReplicationException;
54  import org.apache.hadoop.hbase.replication.ReplicationFactory;
55  import org.apache.hadoop.hbase.replication.ReplicationPeer;
56  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
57  import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
58  import org.apache.hadoop.hbase.replication.ReplicationPeers;
59  import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
62  import org.apache.zookeeper.KeeperException;
63  import org.apache.zookeeper.data.Stat;
64  
65  import com.google.common.annotations.VisibleForTesting;
66  import com.google.common.collect.Lists;
67  
68  import com.google.common.annotations.VisibleForTesting;
69  import com.google.common.collect.Lists;
70  
71  /**
72   * <p>
73   * This class provides the administrative interface to HBase cluster
74   * replication. In order to use it, the cluster and the client using
75   * ReplicationAdmin must be configured with <code>hbase.replication</code>
76   * set to true.
77   * </p>
78   * <p>
79   * Adding a new peer results in creating new outbound connections from every
80   * region server to a subset of region servers on the slave cluster. Each
81   * new stream of replication will start replicating from the beginning of the
82   * current WAL, meaning that edits from that past will be replicated.
83   * </p>
84   * <p>
85   * Removing a peer is a destructive and irreversible operation that stops
86   * all the replication streams for the given cluster and deletes the metadata
87   * used to keep track of the replication state.
88   * </p>
89   * <p>
90   * To see which commands are available in the shell, type
91   * <code>replication</code>.
92   * </p>
93   */
94  @InterfaceAudience.Public
95  @InterfaceStability.Evolving
96  public class ReplicationAdmin implements Closeable {
97    private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
98  
99    public static final String TNAME = "tableName";
100   public static final String CFNAME = "columnFamlyName";
101 
102   // only Global for now, can add other type
103   // such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
104   public static final String REPLICATIONTYPE = "replicationType";
105   public static final String REPLICATIONGLOBAL = Integer
106       .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
107 
108   private final Connection connection;
109   // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
110   // be moved to hbase-server. Resolve it in HBASE-11392.
111   private final ReplicationQueuesClient replicationQueuesClient;
112   private final ReplicationPeers replicationPeers;
113   /**
114    * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
115    * on {@link #close()}.
116    */
117   private final ZooKeeperWatcher zkw;
118 
119   /**
120    * Constructor that creates a connection to the local ZooKeeper ensemble.
121    * @param conf Configuration to use
122    * @throws IOException if an internal replication error occurs
123    * @throws RuntimeException if replication isn't enabled.
124    */
125   public ReplicationAdmin(Configuration conf) throws IOException {
126     if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
127         HConstants.REPLICATION_ENABLE_DEFAULT)) {
128       throw new RuntimeException("hbase.replication isn't true, please " +
129           "enable it in order to use replication");
130     }
131     this.connection = ConnectionFactory.createConnection(conf);
132     try {
133       zkw = createZooKeeperWatcher();
134       try {
135         this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
136         this.replicationPeers.init();
137         this.replicationQueuesClient =
138             ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
139         this.replicationQueuesClient.init();
140       } catch (Exception exception) {
141         if (zkw != null) {
142           zkw.close();
143         }
144         throw exception;
145       }
146     } catch (Exception exception) {
147       if (connection != null) {
148         connection.close();
149       }
150       if (exception instanceof IOException) {
151         throw (IOException) exception;
152       } else if (exception instanceof RuntimeException) {
153         throw (RuntimeException) exception;
154       } else {
155         throw new IOException("Error initializing the replication admin client.", exception);
156       }
157     }
158   }
159 
160   private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
161     // This Abortable doesn't 'abort'... it just logs.
162     return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
163       @Override
164       public void abort(String why, Throwable e) {
165         LOG.error(why, e);
166         // We used to call system.exit here but this script can be embedded by other programs that
167         // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
168       }
169 
170       @Override
171       public boolean isAborted() {
172         return false;
173       }
174     });
175   }
176 
177   /**
178    * Add a new peer cluster to replicate to.
179    * @param id a short name that identifies the cluster
180    * @param clusterKey the concatenation of the slave cluster's
181    * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
182    * @throws IllegalStateException if there's already one slave since
183    * multi-slave isn't supported yet.
184    * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
185    */
186   @Deprecated
187   public void addPeer(String id, String clusterKey) throws ReplicationException {
188     this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
189   }
190 
191   @Deprecated
192   public void addPeer(String id, String clusterKey, String tableCFs)
193     throws ReplicationException {
194     this.replicationPeers.addPeer(id,
195       new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
196   }
197 
198   /**
199    * Add a new remote slave cluster for replication.
200    * @param id a short name that identifies the cluster
201    * @param peerConfig configuration for the replication slave cluster
202    * @param tableCfs the table and column-family list which will be replicated for this peer.
203    * A map from tableName to column family names. An empty collection can be passed
204    * to indicate replicating all column families. Pass null for replicating all table and column
205    * families
206    */
207   public void addPeer(String id, ReplicationPeerConfig peerConfig,
208       Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
209     this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
210   }
211 
212   public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
213     if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
214       return null;
215     }
216 
217     Map<TableName, List<String>> tableCFsMap = null;
218     // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
219     // parse out (table, cf-list) pairs from tableCFsConfig
220     // format: "table1:cf1,cf2;table2:cfA,cfB"
221     String[] tables = tableCFsConfig.split(";");
222     for (String tab : tables) {
223       // 1 ignore empty table config
224       tab = tab.trim();
225       if (tab.length() == 0) {
226         continue;
227       }
228       // 2 split to "table" and "cf1,cf2"
229       //   for each table: "table:cf1,cf2" or "table"
230       String[] pair = tab.split(":");
231       String tabName = pair[0].trim();
232       if (pair.length > 2 || tabName.length() == 0) {
233         LOG.error("ignore invalid tableCFs setting: " + tab);
234         continue;
235       }
236 
237       // 3 parse "cf1,cf2" part to List<cf>
238       List<String> cfs = null;
239       if (pair.length == 2) {
240         String[] cfsList = pair[1].split(",");
241         for (String cf : cfsList) {
242           String cfName = cf.trim();
243           if (cfName.length() > 0) {
244             if (cfs == null) {
245               cfs = new ArrayList<String>();
246             }
247             cfs.add(cfName);
248           }
249         }
250       }
251 
252       // 4 put <table, List<cf>> to map
253       if (tableCFsMap == null) {
254         tableCFsMap = new HashMap<TableName, List<String>>();
255       }
256       tableCFsMap.put(TableName.valueOf(tabName), cfs);
257     }
258     return tableCFsMap;
259   }
260 
261   @VisibleForTesting
262   static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
263     String tableCfsStr = null;
264     if (tableCfs != null) {
265       // Format: table1:cf1,cf2;table2:cfA,cfB;table3
266       StringBuilder builder = new StringBuilder();
267       for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
268         if (builder.length() > 0) {
269           builder.append(";");
270         }
271         builder.append(entry.getKey());
272         if (entry.getValue() != null && !entry.getValue().isEmpty()) {
273           builder.append(":");
274           builder.append(StringUtils.join(entry.getValue(), ","));
275         }
276       }
277       tableCfsStr = builder.toString();
278     }
279     return tableCfsStr;
280   }
281 
282   /**
283    * Removes a peer cluster and stops the replication to it.
284    * @param id a short name that identifies the cluster
285    */
286   public void removePeer(String id) throws ReplicationException {
287     this.replicationPeers.removePeer(id);
288   }
289 
290   /**
291    * Restart the replication stream to the specified peer.
292    * @param id a short name that identifies the cluster
293    */
294   public void enablePeer(String id) throws ReplicationException {
295     this.replicationPeers.enablePeer(id);
296   }
297 
298   /**
299    * Stop the replication stream to the specified peer.
300    * @param id a short name that identifies the cluster
301    */
302   public void disablePeer(String id) throws ReplicationException {
303     this.replicationPeers.disablePeer(id);
304   }
305 
306   /**
307    * Get the number of slave clusters the local cluster has.
308    * @return number of slave clusters
309    */
310   public int getPeersCount() {
311     return this.replicationPeers.getAllPeerIds().size();
312   }
313 
314   /**
315    * Map of this cluster's peers for display.
316    * @return A map of peer ids to peer cluster keys
317    * @deprecated use {@link #listPeerConfigs()}
318    */
319   @Deprecated
320   public Map<String, String> listPeers() {
321     Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
322     Map<String, String> ret = new HashMap<String, String>(peers.size());
323 
324     for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
325       ret.put(entry.getKey(), entry.getValue().getClusterKey());
326     }
327     return ret;
328   }
329 
330   public Map<String, ReplicationPeerConfig> listPeerConfigs() {
331     return this.replicationPeers.getAllPeerConfigs();
332   }
333 
334   public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
335     return this.replicationPeers.getReplicationPeerConfig(id);
336   }
337 
338   /**
339    * Get the replicable table-cf config of the specified peer.
340    * @param id a short name that identifies the cluster
341    */
342   public String getPeerTableCFs(String id) throws ReplicationException {
343     return this.replicationPeers.getPeerTableCFsConfig(id);
344   }
345 
346   /**
347    * Set the replicable table-cf config of the specified peer
348    * @param id a short name that identifies the cluster
349    * @deprecated use {@link #setPeerTableCFs(String, Map)}
350    */
351   @Deprecated
352   public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
353     this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
354   }
355 
356   /**
357    * Append the replicable table-cf config of the specified peer
358    * @param id a short that identifies the cluster
359    * @param tableCfs table-cfs config str
360    * @throws KeeperException
361    */
362   public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
363     appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
364   }
365 
366   /**
367    * Append the replicable table-cf config of the specified peer
368    * @param id a short that identifies the cluster
369    * @param tableCfs A map from tableName to column family names
370    * @throws KeeperException
371    */
372   public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
373       throws ReplicationException {
374     if (tableCfs == null) {
375       throw new ReplicationException("tableCfs is null");
376     }
377     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
378     if (preTableCfs == null) {
379       setPeerTableCFs(id, tableCfs);
380       return;
381     }
382 
383     for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
384       TableName table = entry.getKey();
385       Collection<String> appendCfs = entry.getValue();
386       if (preTableCfs.containsKey(table)) {
387         List<String> cfs = preTableCfs.get(table);
388         if (cfs == null || appendCfs == null) {
389           preTableCfs.put(table, null);
390         } else {
391           Set<String> cfSet = new HashSet<String>(cfs);
392           cfSet.addAll(appendCfs);
393           preTableCfs.put(table, Lists.newArrayList(cfSet));
394         }
395       } else {
396         if (appendCfs == null || appendCfs.isEmpty()) {
397           preTableCfs.put(table, null);
398         } else {
399           preTableCfs.put(table, Lists.newArrayList(appendCfs));
400         }
401       }
402     }
403     setPeerTableCFs(id, preTableCfs);
404   }
405 
406   /**
407    * Remove some table-cfs from table-cfs config of the specified peer
408    * @param id a short name that identifies the cluster
409    * @param tableCf table-cfs config str
410    * @throws ReplicationException
411    */
412   public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
413     removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
414   }
415 
416   /**
417    * Remove some table-cfs from config of the specified peer
418    * @param id a short name that identifies the cluster
419    * @param tableCfs A map from tableName to column family names
420    * @throws ReplicationException
421    */
422   public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
423       throws ReplicationException {
424     if (tableCfs == null) {
425       throw new ReplicationException("tableCfs is null");
426     }
427 
428     Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
429     if (preTableCfs == null) {
430       throw new ReplicationException("Table-Cfs for peer" + id + " is null");
431     }
432     for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
433       TableName table = entry.getKey();
434       Collection<String> removeCfs = entry.getValue();
435       if (preTableCfs.containsKey(table)) {
436         List<String> cfs = preTableCfs.get(table);
437         if (cfs == null && removeCfs == null) {
438           preTableCfs.remove(table);
439         } else if (cfs != null && removeCfs != null) {
440           Set<String> cfSet = new HashSet<String>(cfs);
441           cfSet.removeAll(removeCfs);
442           if (cfSet.isEmpty()) {
443             preTableCfs.remove(table);
444           } else {
445             preTableCfs.put(table, Lists.newArrayList(cfSet));
446           }
447         } else if (cfs == null && removeCfs != null) {
448           throw new ReplicationException("Cannot remove cf of table: " + table
449               + " which doesn't specify cfs from table-cfs config in peer: " + id);
450         } else if (cfs != null && removeCfs == null) {
451           throw new ReplicationException("Cannot remove table: " + table
452               + " which has specified cfs from table-cfs config in peer: " + id);
453         }
454       } else {
455         throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
456       }
457     }
458     setPeerTableCFs(id, preTableCfs);
459   }
460 
461   /**
462    * Set the replicable table-cf config of the specified peer
463    * @param id a short name that identifies the cluster
464    * @param tableCfs the table and column-family list which will be replicated for this peer.
465    * A map from tableName to column family names. An empty collection can be passed
466    * to indicate replicating all column families. Pass null for replicating all table and column
467    * families
468    */
469   public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
470       throws ReplicationException {
471     this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
472   }
473 
474   /**
475    * Get the state of the specified peer cluster
476    * @param id String format of the Short name that identifies the peer,
477    * an IllegalArgumentException is thrown if it doesn't exist
478    * @return true if replication is enabled to that peer, false if it isn't
479    */
480   public boolean getPeerState(String id) throws ReplicationException {
481     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
482   }
483 
484   @Override
485   public void close() throws IOException {
486     if (this.zkw != null) {
487       this.zkw.close();
488     }
489     if (this.connection != null) {
490       this.connection.close();
491     }
492   }
493 
494 
495   /**
496    * Find all column families that are replicated from this cluster
497    * @return the full list of the replicated column families of this cluster as:
498    *        tableName, family name, replicationType
499    *
500    * Currently replicationType is Global. In the future, more replication
501    * types may be extended here. For example
502    *  1) the replication may only apply to selected peers instead of all peers
503    *  2) the replicationType may indicate the host Cluster servers as Slave
504    *     for the table:columnFam.
505    */
506   public List<HashMap<String, String>> listReplicated() throws IOException {
507     List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
508 
509     Admin admin = connection.getAdmin();
510     HTableDescriptor[] tables;
511     try {
512       tables = admin.listTables();
513     } finally {
514       if (admin!= null) admin.close();
515     }
516 
517     for (HTableDescriptor table : tables) {
518       HColumnDescriptor[] columns = table.getColumnFamilies();
519       String tableName = table.getNameAsString();
520       for (HColumnDescriptor column : columns) {
521         if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
522           // At this moment, the columfam is replicated to all peers
523           HashMap<String, String> replicationEntry = new HashMap<String, String>();
524           replicationEntry.put(TNAME, tableName);
525           replicationEntry.put(CFNAME, column.getNameAsString());
526           replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
527           replicationColFams.add(replicationEntry);
528         }
529       }
530     }
531 
532     return replicationColFams;
533   }
534 
535   /**
536    * Enable a table's replication switch.
537    * @param tableName name of the table
538    * @throws IOException if a remote or network exception occurs
539    */
540   public void enableTableRep(final TableName tableName) throws IOException {
541     if (tableName == null) {
542       throw new IllegalArgumentException("Table name cannot be null");
543     }
544     try (Admin admin = this.connection.getAdmin()) {
545       if (!admin.tableExists(tableName)) {
546         throw new TableNotFoundException("Table '" + tableName.getNameAsString()
547             + "' does not exists.");
548       }
549     }
550     byte[][] splits = getTableSplitRowKeys(tableName);
551     checkAndSyncTableDescToPeers(tableName, splits);
552     setTableRep(tableName, true);
553   }
554 
555   /**
556    * Disable a table's replication switch.
557    * @param tableName name of the table
558    * @throws IOException if a remote or network exception occurs
559    */
560   public void disableTableRep(final TableName tableName) throws IOException {
561     if (tableName == null) {
562       throw new IllegalArgumentException("Table name is null");
563     }
564     try (Admin admin = this.connection.getAdmin()) {
565       if (!admin.tableExists(tableName)) {
566         throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
567             + "' does not exists.");
568       }
569     }
570     setTableRep(tableName, false);
571   }
572 
573   /**
574    * Get the split row keys of table
575    * @param tableName table name
576    * @return array of split row keys
577    * @throws IOException
578    */
579   private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
580     try (RegionLocator locator = connection.getRegionLocator(tableName);) {
581       byte[][] startKeys = locator.getStartKeys();
582       if (startKeys.length == 1) {
583         return null;
584       }
585       byte[][] splits = new byte[startKeys.length - 1][];
586       for (int i = 1; i < startKeys.length; i++) {
587         splits[i - 1] = startKeys[i];
588       }
589       return splits;
590     }
591   }
592 
593   /**
594    * Connect to peer and check the table descriptor on peer:
595    * <ol>
596    * <li>Create the same table on peer when not exist.</li>
597    * <li>Throw an exception if the table already has replication enabled on any of the column
598    * families.</li>
599    * <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
600    * </ol>
601    * @param tableName name of the table to sync to the peer
602    * @param splits table split keys
603    * @throws IOException
604    */
605   private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
606       throws IOException {
607     List<ReplicationPeer> repPeers = listValidReplicationPeers();
608     if (repPeers == null || repPeers.size() <= 0) {
609       throw new IllegalArgumentException("Found no peer cluster for replication.");
610     }
611     for (ReplicationPeer repPeer : repPeers) {
612       Configuration peerConf = repPeer.getConfiguration();
613       HTableDescriptor localHtd = null;
614       try (Connection conn = ConnectionFactory.createConnection(peerConf);
615           Admin admin = this.connection.getAdmin();
616           Admin repHBaseAdmin = conn.getAdmin()) {
617         localHtd = admin.getTableDescriptor(tableName);
618         HTableDescriptor peerHtd = null;
619         if (!repHBaseAdmin.tableExists(tableName)) {
620           repHBaseAdmin.createTable(localHtd, splits);
621         } else {
622           peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
623           if (peerHtd == null) {
624             throw new IllegalArgumentException("Failed to get table descriptor for table "
625                 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
626           }
627           if (!compareForReplication(peerHtd, localHtd)) {
628             throw new IllegalArgumentException("Table " + tableName.getNameAsString()
629                 + " exists in peer cluster " + repPeer.getId()
630                 + ", but the table descriptors are not same when comapred with source cluster."
631                 + " Thus can not enable the table's replication switch.");
632           }
633         }
634       }
635     }
636   }
637 
638   @VisibleForTesting
639   List<ReplicationPeer> listValidReplicationPeers() {
640     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
641     if (peers == null || peers.size() <= 0) {
642       return null;
643     }
644     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
645     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
646       String peerId = peerEntry.getKey();
647       Stat s = null;
648       try {
649         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
650         Configuration peerConf = pair.getSecond();
651         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
652         s =
653             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
654               null);
655         if (null == s) {
656           LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
657           continue;
658         }
659         validPeers.add(peer);
660       } catch (ReplicationException e) {
661         LOG.warn("Failed to get valid replication peers. "
662             + "Error connecting to peer cluster with peerId=" + peerId);
663         LOG.debug("Failure details to get valid replication peers.", e);
664         continue;
665       } catch (KeeperException e) {
666         LOG.warn("Failed to get valid replication peers. KeeperException code="
667             + e.code().intValue());
668         LOG.debug("Failure details to get valid replication peers.", e);
669         continue;
670       } catch (InterruptedException e) {
671         LOG.warn("Failed to get valid replication peers due to InterruptedException.");
672         LOG.debug("Failure details to get valid replication peers.", e);
673         continue;
674       }
675     }
676     return validPeers;
677   }
678 
679   /**
680    * Set the table's replication switch if the table's replication switch is already not set.
681    * @param tableName name of the table
682    * @param enableRep is replication switch enable or disable
683    * @throws IOException if a remote or network exception occurs
684    */
685   private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
686     Admin admin = null;
687     try {
688       admin = this.connection.getAdmin();
689       HTableDescriptor htd = admin.getTableDescriptor(tableName);
690       ReplicationState currentReplicationState = getTableReplicationState(htd);
691       if (enableRep && currentReplicationState != ReplicationState.ENABLED
692           || !enableRep && currentReplicationState != ReplicationState.DISABLED) {
693         boolean isOnlineSchemaUpdateEnabled =
694             this.connection.getConfiguration()
695                 .getBoolean("hbase.online.schema.update.enable", true);
696         if (!isOnlineSchemaUpdateEnabled) {
697           admin.disableTable(tableName);
698         }
699         for (HColumnDescriptor hcd : htd.getFamilies()) {
700           hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL
701               : HConstants.REPLICATION_SCOPE_LOCAL);
702         }
703         admin.modifyTable(tableName, htd);
704         if (!isOnlineSchemaUpdateEnabled) {
705           admin.enableTable(tableName);
706         }
707       }
708     } finally {
709       if (admin != null) {
710         try {
711           admin.close();
712         } catch (IOException e) {
713           LOG.warn("Failed to close admin connection.");
714           LOG.debug("Details on failure to close admin connection.", e);
715         }
716       }
717     }
718   }
719 
720   /**
721    * This enum indicates the current state of the replication for a given table.
722    */
723   private enum ReplicationState {
724     ENABLED, // all column families enabled
725     MIXED, // some column families enabled, some disabled
726     DISABLED // all column families disabled
727   }
728 
729   /**
730    * @param htd table descriptor details for the table to check
731    * @return ReplicationState the current state of the table.
732    */
733   private ReplicationState getTableReplicationState(HTableDescriptor htd) {
734     boolean hasEnabled = false;
735     boolean hasDisabled = false;
736 
737     for (HColumnDescriptor hcd : htd.getFamilies()) {
738       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
739         hasDisabled = true;
740       } else {
741         hasEnabled = true;
742       }
743     }
744 
745     if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
746     if (hasEnabled) return ReplicationState.ENABLED;
747     return ReplicationState.DISABLED;
748   }
749 
750   /**
751    * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method
752    * ensures that the name of table and column-families should match.
753    * @param peerHtd descriptor on peer cluster
754    * @param localHtd - The HTableDescriptor of table from source cluster.
755    * @return true If the name of table and column families match and REPLICATION_SCOPE copied
756    *         successfully. false If there is any mismatch in the names.
757    */
758   private boolean copyReplicationScope(final HTableDescriptor peerHtd,
759       final HTableDescriptor localHtd) {
760     // Copy the REPLICATION_SCOPE only when table names and the names of
761     // Column-Families are same.
762     int result = peerHtd.getTableName().compareTo(localHtd.getTableName());
763 
764     if (result == 0) {
765       Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator();
766       Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator();
767 
768       while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) {
769         HColumnDescriptor remoteHCD = remoteHCDIter.next();
770         HColumnDescriptor localHCD = localHCDIter.next();
771 
772         String remoteHCDName = remoteHCD.getNameAsString();
773         String localHCDName = localHCD.getNameAsString();
774 
775         if (remoteHCDName.equals(localHCDName)) {
776           remoteHCD.setScope(localHCD.getScope());
777         } else {
778           result = -1;
779           break;
780         }
781       }
782       if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) {
783         return false;
784       }
785     }
786 
787     return result == 0;
788   }
789 
790   /**
791    * Compare the contents of the descriptor with another one passed as a parameter for replication
792    * purpose. The REPLICATION_SCOPE field is ignored during comparison.
793    * @param peerHtd descriptor on peer cluster
794    * @param localHtd descriptor on source cluster which needs to be replicated.
795    * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE).
796    * @see java.lang.Object#equals(java.lang.Object)
797    */
798   private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) {
799     if (peerHtd == localHtd) {
800       return true;
801     }
802     if (peerHtd == null) {
803       return false;
804     }
805     boolean result = false;
806 
807     // Create a copy of peer HTD as we need to change its replication
808     // scope to match with the local HTD.
809     HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd);
810 
811     result = copyReplicationScope(peerHtdCopy, localHtd);
812 
813     // If copy was successful, compare the two tables now.
814     if (result) {
815       result = (peerHtdCopy.compareTo(localHtd) == 0);
816     }
817 
818     return result;
819   }
820 }