1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client.replication;
20
21 import java.io.Closeable;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.Set;
32 import java.util.TreeMap;
33
34 import org.apache.commons.lang.StringUtils;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.Abortable;
41 import org.apache.hadoop.hbase.HColumnDescriptor;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.HTableDescriptor;
44 import org.apache.hadoop.hbase.TableName;
45 import org.apache.hadoop.hbase.TableNotFoundException;
46 import org.apache.hadoop.hbase.classification.InterfaceAudience;
47 import org.apache.hadoop.hbase.classification.InterfaceStability;
48 import org.apache.hadoop.hbase.client.Admin;
49 import org.apache.hadoop.hbase.client.HBaseAdmin;
50 import org.apache.hadoop.hbase.client.Connection;
51 import org.apache.hadoop.hbase.client.ConnectionFactory;
52 import org.apache.hadoop.hbase.client.RegionLocator;
53 import org.apache.hadoop.hbase.replication.ReplicationException;
54 import org.apache.hadoop.hbase.replication.ReplicationFactory;
55 import org.apache.hadoop.hbase.replication.ReplicationPeer;
56 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
57 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
58 import org.apache.hadoop.hbase.replication.ReplicationPeers;
59 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
60 import org.apache.hadoop.hbase.util.Pair;
61 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
62 import org.apache.zookeeper.KeeperException;
63 import org.apache.zookeeper.data.Stat;
64
65 import com.google.common.annotations.VisibleForTesting;
66 import com.google.common.collect.Lists;
67
68 import com.google.common.annotations.VisibleForTesting;
69 import com.google.common.collect.Lists;
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @InterfaceAudience.Public
95 @InterfaceStability.Evolving
96 public class ReplicationAdmin implements Closeable {
97 private static final Log LOG = LogFactory.getLog(ReplicationAdmin.class);
98
99 public static final String TNAME = "tableName";
100 public static final String CFNAME = "columnFamlyName";
101
102
103
104 public static final String REPLICATIONTYPE = "replicationType";
105 public static final String REPLICATIONGLOBAL = Integer
106 .toString(HConstants.REPLICATION_SCOPE_GLOBAL);
107
108 private final Connection connection;
109
110
111 private final ReplicationQueuesClient replicationQueuesClient;
112 private final ReplicationPeers replicationPeers;
113
114
115
116
117 private final ZooKeeperWatcher zkw;
118
119
120
121
122
123
124
125 public ReplicationAdmin(Configuration conf) throws IOException {
126 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
127 HConstants.REPLICATION_ENABLE_DEFAULT)) {
128 throw new RuntimeException("hbase.replication isn't true, please " +
129 "enable it in order to use replication");
130 }
131 this.connection = ConnectionFactory.createConnection(conf);
132 try {
133 zkw = createZooKeeperWatcher();
134 try {
135 this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.connection);
136 this.replicationPeers.init();
137 this.replicationQueuesClient =
138 ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
139 this.replicationQueuesClient.init();
140 } catch (Exception exception) {
141 if (zkw != null) {
142 zkw.close();
143 }
144 throw exception;
145 }
146 } catch (Exception exception) {
147 if (connection != null) {
148 connection.close();
149 }
150 if (exception instanceof IOException) {
151 throw (IOException) exception;
152 } else if (exception instanceof RuntimeException) {
153 throw (RuntimeException) exception;
154 } else {
155 throw new IOException("Error initializing the replication admin client.", exception);
156 }
157 }
158 }
159
160 private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
161
162 return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
163 @Override
164 public void abort(String why, Throwable e) {
165 LOG.error(why, e);
166
167
168 }
169
170 @Override
171 public boolean isAborted() {
172 return false;
173 }
174 });
175 }
176
177
178
179
180
181
182
183
184
185
186 @Deprecated
187 public void addPeer(String id, String clusterKey) throws ReplicationException {
188 this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
189 }
190
191 @Deprecated
192 public void addPeer(String id, String clusterKey, String tableCFs)
193 throws ReplicationException {
194 this.replicationPeers.addPeer(id,
195 new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
196 }
197
198
199
200
201
202
203
204
205
206
207 public void addPeer(String id, ReplicationPeerConfig peerConfig,
208 Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
209 this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
210 }
211
212 public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
213 if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
214 return null;
215 }
216
217 Map<TableName, List<String>> tableCFsMap = null;
218
219
220
221 String[] tables = tableCFsConfig.split(";");
222 for (String tab : tables) {
223
224 tab = tab.trim();
225 if (tab.length() == 0) {
226 continue;
227 }
228
229
230 String[] pair = tab.split(":");
231 String tabName = pair[0].trim();
232 if (pair.length > 2 || tabName.length() == 0) {
233 LOG.error("ignore invalid tableCFs setting: " + tab);
234 continue;
235 }
236
237
238 List<String> cfs = null;
239 if (pair.length == 2) {
240 String[] cfsList = pair[1].split(",");
241 for (String cf : cfsList) {
242 String cfName = cf.trim();
243 if (cfName.length() > 0) {
244 if (cfs == null) {
245 cfs = new ArrayList<String>();
246 }
247 cfs.add(cfName);
248 }
249 }
250 }
251
252
253 if (tableCFsMap == null) {
254 tableCFsMap = new HashMap<TableName, List<String>>();
255 }
256 tableCFsMap.put(TableName.valueOf(tabName), cfs);
257 }
258 return tableCFsMap;
259 }
260
261 @VisibleForTesting
262 static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
263 String tableCfsStr = null;
264 if (tableCfs != null) {
265
266 StringBuilder builder = new StringBuilder();
267 for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
268 if (builder.length() > 0) {
269 builder.append(";");
270 }
271 builder.append(entry.getKey());
272 if (entry.getValue() != null && !entry.getValue().isEmpty()) {
273 builder.append(":");
274 builder.append(StringUtils.join(entry.getValue(), ","));
275 }
276 }
277 tableCfsStr = builder.toString();
278 }
279 return tableCfsStr;
280 }
281
282
283
284
285
286 public void removePeer(String id) throws ReplicationException {
287 this.replicationPeers.removePeer(id);
288 }
289
290
291
292
293
294 public void enablePeer(String id) throws ReplicationException {
295 this.replicationPeers.enablePeer(id);
296 }
297
298
299
300
301
302 public void disablePeer(String id) throws ReplicationException {
303 this.replicationPeers.disablePeer(id);
304 }
305
306
307
308
309
310 public int getPeersCount() {
311 return this.replicationPeers.getAllPeerIds().size();
312 }
313
314
315
316
317
318
319 @Deprecated
320 public Map<String, String> listPeers() {
321 Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
322 Map<String, String> ret = new HashMap<String, String>(peers.size());
323
324 for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
325 ret.put(entry.getKey(), entry.getValue().getClusterKey());
326 }
327 return ret;
328 }
329
330 public Map<String, ReplicationPeerConfig> listPeerConfigs() {
331 return this.replicationPeers.getAllPeerConfigs();
332 }
333
334 public ReplicationPeerConfig getPeerConfig(String id) throws ReplicationException {
335 return this.replicationPeers.getReplicationPeerConfig(id);
336 }
337
338
339
340
341
342 public String getPeerTableCFs(String id) throws ReplicationException {
343 return this.replicationPeers.getPeerTableCFsConfig(id);
344 }
345
346
347
348
349
350
351 @Deprecated
352 public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
353 this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
354 }
355
356
357
358
359
360
361
362 public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
363 appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
364 }
365
366
367
368
369
370
371
372 public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
373 throws ReplicationException {
374 if (tableCfs == null) {
375 throw new ReplicationException("tableCfs is null");
376 }
377 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
378 if (preTableCfs == null) {
379 setPeerTableCFs(id, tableCfs);
380 return;
381 }
382
383 for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
384 TableName table = entry.getKey();
385 Collection<String> appendCfs = entry.getValue();
386 if (preTableCfs.containsKey(table)) {
387 List<String> cfs = preTableCfs.get(table);
388 if (cfs == null || appendCfs == null) {
389 preTableCfs.put(table, null);
390 } else {
391 Set<String> cfSet = new HashSet<String>(cfs);
392 cfSet.addAll(appendCfs);
393 preTableCfs.put(table, Lists.newArrayList(cfSet));
394 }
395 } else {
396 if (appendCfs == null || appendCfs.isEmpty()) {
397 preTableCfs.put(table, null);
398 } else {
399 preTableCfs.put(table, Lists.newArrayList(appendCfs));
400 }
401 }
402 }
403 setPeerTableCFs(id, preTableCfs);
404 }
405
406
407
408
409
410
411
412 public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
413 removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
414 }
415
416
417
418
419
420
421
422 public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
423 throws ReplicationException {
424 if (tableCfs == null) {
425 throw new ReplicationException("tableCfs is null");
426 }
427
428 Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
429 if (preTableCfs == null) {
430 throw new ReplicationException("Table-Cfs for peer" + id + " is null");
431 }
432 for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
433 TableName table = entry.getKey();
434 Collection<String> removeCfs = entry.getValue();
435 if (preTableCfs.containsKey(table)) {
436 List<String> cfs = preTableCfs.get(table);
437 if (cfs == null && removeCfs == null) {
438 preTableCfs.remove(table);
439 } else if (cfs != null && removeCfs != null) {
440 Set<String> cfSet = new HashSet<String>(cfs);
441 cfSet.removeAll(removeCfs);
442 if (cfSet.isEmpty()) {
443 preTableCfs.remove(table);
444 } else {
445 preTableCfs.put(table, Lists.newArrayList(cfSet));
446 }
447 } else if (cfs == null && removeCfs != null) {
448 throw new ReplicationException("Cannot remove cf of table: " + table
449 + " which doesn't specify cfs from table-cfs config in peer: " + id);
450 } else if (cfs != null && removeCfs == null) {
451 throw new ReplicationException("Cannot remove table: " + table
452 + " which has specified cfs from table-cfs config in peer: " + id);
453 }
454 } else {
455 throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
456 }
457 }
458 setPeerTableCFs(id, preTableCfs);
459 }
460
461
462
463
464
465
466
467
468
469 public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
470 throws ReplicationException {
471 this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
472 }
473
474
475
476
477
478
479
480 public boolean getPeerState(String id) throws ReplicationException {
481 return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
482 }
483
484 @Override
485 public void close() throws IOException {
486 if (this.zkw != null) {
487 this.zkw.close();
488 }
489 if (this.connection != null) {
490 this.connection.close();
491 }
492 }
493
494
495
496
497
498
499
500
501
502
503
504
505
506 public List<HashMap<String, String>> listReplicated() throws IOException {
507 List<HashMap<String, String>> replicationColFams = new ArrayList<HashMap<String, String>>();
508
509 Admin admin = connection.getAdmin();
510 HTableDescriptor[] tables;
511 try {
512 tables = admin.listTables();
513 } finally {
514 if (admin!= null) admin.close();
515 }
516
517 for (HTableDescriptor table : tables) {
518 HColumnDescriptor[] columns = table.getColumnFamilies();
519 String tableName = table.getNameAsString();
520 for (HColumnDescriptor column : columns) {
521 if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
522
523 HashMap<String, String> replicationEntry = new HashMap<String, String>();
524 replicationEntry.put(TNAME, tableName);
525 replicationEntry.put(CFNAME, column.getNameAsString());
526 replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
527 replicationColFams.add(replicationEntry);
528 }
529 }
530 }
531
532 return replicationColFams;
533 }
534
535
536
537
538
539
540 public void enableTableRep(final TableName tableName) throws IOException {
541 if (tableName == null) {
542 throw new IllegalArgumentException("Table name cannot be null");
543 }
544 try (Admin admin = this.connection.getAdmin()) {
545 if (!admin.tableExists(tableName)) {
546 throw new TableNotFoundException("Table '" + tableName.getNameAsString()
547 + "' does not exists.");
548 }
549 }
550 byte[][] splits = getTableSplitRowKeys(tableName);
551 checkAndSyncTableDescToPeers(tableName, splits);
552 setTableRep(tableName, true);
553 }
554
555
556
557
558
559
560 public void disableTableRep(final TableName tableName) throws IOException {
561 if (tableName == null) {
562 throw new IllegalArgumentException("Table name is null");
563 }
564 try (Admin admin = this.connection.getAdmin()) {
565 if (!admin.tableExists(tableName)) {
566 throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
567 + "' does not exists.");
568 }
569 }
570 setTableRep(tableName, false);
571 }
572
573
574
575
576
577
578
579 private byte[][] getTableSplitRowKeys(TableName tableName) throws IOException {
580 try (RegionLocator locator = connection.getRegionLocator(tableName);) {
581 byte[][] startKeys = locator.getStartKeys();
582 if (startKeys.length == 1) {
583 return null;
584 }
585 byte[][] splits = new byte[startKeys.length - 1][];
586 for (int i = 1; i < startKeys.length; i++) {
587 splits[i - 1] = startKeys[i];
588 }
589 return splits;
590 }
591 }
592
593
594
595
596
597
598
599
600
601
602
603
604
605 private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
606 throws IOException {
607 List<ReplicationPeer> repPeers = listValidReplicationPeers();
608 if (repPeers == null || repPeers.size() <= 0) {
609 throw new IllegalArgumentException("Found no peer cluster for replication.");
610 }
611 for (ReplicationPeer repPeer : repPeers) {
612 Configuration peerConf = repPeer.getConfiguration();
613 HTableDescriptor localHtd = null;
614 try (Connection conn = ConnectionFactory.createConnection(peerConf);
615 Admin admin = this.connection.getAdmin();
616 Admin repHBaseAdmin = conn.getAdmin()) {
617 localHtd = admin.getTableDescriptor(tableName);
618 HTableDescriptor peerHtd = null;
619 if (!repHBaseAdmin.tableExists(tableName)) {
620 repHBaseAdmin.createTable(localHtd, splits);
621 } else {
622 peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
623 if (peerHtd == null) {
624 throw new IllegalArgumentException("Failed to get table descriptor for table "
625 + tableName.getNameAsString() + " from peer cluster " + repPeer.getId());
626 }
627 if (!compareForReplication(peerHtd, localHtd)) {
628 throw new IllegalArgumentException("Table " + tableName.getNameAsString()
629 + " exists in peer cluster " + repPeer.getId()
630 + ", but the table descriptors are not same when comapred with source cluster."
631 + " Thus can not enable the table's replication switch.");
632 }
633 }
634 }
635 }
636 }
637
638 @VisibleForTesting
639 List<ReplicationPeer> listValidReplicationPeers() {
640 Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
641 if (peers == null || peers.size() <= 0) {
642 return null;
643 }
644 List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
645 for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
646 String peerId = peerEntry.getKey();
647 Stat s = null;
648 try {
649 Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
650 Configuration peerConf = pair.getSecond();
651 ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
652 s =
653 zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
654 null);
655 if (null == s) {
656 LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
657 continue;
658 }
659 validPeers.add(peer);
660 } catch (ReplicationException e) {
661 LOG.warn("Failed to get valid replication peers. "
662 + "Error connecting to peer cluster with peerId=" + peerId);
663 LOG.debug("Failure details to get valid replication peers.", e);
664 continue;
665 } catch (KeeperException e) {
666 LOG.warn("Failed to get valid replication peers. KeeperException code="
667 + e.code().intValue());
668 LOG.debug("Failure details to get valid replication peers.", e);
669 continue;
670 } catch (InterruptedException e) {
671 LOG.warn("Failed to get valid replication peers due to InterruptedException.");
672 LOG.debug("Failure details to get valid replication peers.", e);
673 continue;
674 }
675 }
676 return validPeers;
677 }
678
679
680
681
682
683
684
685 private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
686 Admin admin = null;
687 try {
688 admin = this.connection.getAdmin();
689 HTableDescriptor htd = admin.getTableDescriptor(tableName);
690 ReplicationState currentReplicationState = getTableReplicationState(htd);
691 if (enableRep && currentReplicationState != ReplicationState.ENABLED
692 || !enableRep && currentReplicationState != ReplicationState.DISABLED) {
693 boolean isOnlineSchemaUpdateEnabled =
694 this.connection.getConfiguration()
695 .getBoolean("hbase.online.schema.update.enable", true);
696 if (!isOnlineSchemaUpdateEnabled) {
697 admin.disableTable(tableName);
698 }
699 for (HColumnDescriptor hcd : htd.getFamilies()) {
700 hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL
701 : HConstants.REPLICATION_SCOPE_LOCAL);
702 }
703 admin.modifyTable(tableName, htd);
704 if (!isOnlineSchemaUpdateEnabled) {
705 admin.enableTable(tableName);
706 }
707 }
708 } finally {
709 if (admin != null) {
710 try {
711 admin.close();
712 } catch (IOException e) {
713 LOG.warn("Failed to close admin connection.");
714 LOG.debug("Details on failure to close admin connection.", e);
715 }
716 }
717 }
718 }
719
720
721
722
723 private enum ReplicationState {
724 ENABLED,
725 MIXED,
726 DISABLED
727 }
728
729
730
731
732
733 private ReplicationState getTableReplicationState(HTableDescriptor htd) {
734 boolean hasEnabled = false;
735 boolean hasDisabled = false;
736
737 for (HColumnDescriptor hcd : htd.getFamilies()) {
738 if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
739 hasDisabled = true;
740 } else {
741 hasEnabled = true;
742 }
743 }
744
745 if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
746 if (hasEnabled) return ReplicationState.ENABLED;
747 return ReplicationState.DISABLED;
748 }
749
750
751
752
753
754
755
756
757
758 private boolean copyReplicationScope(final HTableDescriptor peerHtd,
759 final HTableDescriptor localHtd) {
760
761
762 int result = peerHtd.getTableName().compareTo(localHtd.getTableName());
763
764 if (result == 0) {
765 Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator();
766 Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator();
767
768 while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) {
769 HColumnDescriptor remoteHCD = remoteHCDIter.next();
770 HColumnDescriptor localHCD = localHCDIter.next();
771
772 String remoteHCDName = remoteHCD.getNameAsString();
773 String localHCDName = localHCD.getNameAsString();
774
775 if (remoteHCDName.equals(localHCDName)) {
776 remoteHCD.setScope(localHCD.getScope());
777 } else {
778 result = -1;
779 break;
780 }
781 }
782 if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) {
783 return false;
784 }
785 }
786
787 return result == 0;
788 }
789
790
791
792
793
794
795
796
797
798 private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) {
799 if (peerHtd == localHtd) {
800 return true;
801 }
802 if (peerHtd == null) {
803 return false;
804 }
805 boolean result = false;
806
807
808
809 HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd);
810
811 result = copyReplicationScope(peerHtdCopy, localHtd);
812
813
814 if (result) {
815 result = (peerHtdCopy.compareTo(localHtd) == 0);
816 }
817
818 return result;
819 }
820 }