1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.HBaseConfiguration;
25 import org.apache.hadoop.hbase.HBaseTestingUtility;
26 import org.apache.hadoop.hbase.HColumnDescriptor;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.HTableDescriptor;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Admin;
31 import org.apache.hadoop.hbase.client.Connection;
32 import org.apache.hadoop.hbase.client.ConnectionFactory;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
37 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
38 import org.junit.AfterClass;
39 import org.junit.BeforeClass;
40
41
42
43
44
45
46
47 public class TestReplicationBase {
48
49
50
51
52
53 private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
54
55 protected static Configuration conf1 = HBaseConfiguration.create();
56 protected static Configuration conf2;
57 protected static Configuration CONF_WITH_LOCALFS;
58
59 protected static ZooKeeperWatcher zkw1;
60 protected static ZooKeeperWatcher zkw2;
61
62 protected static ReplicationAdmin admin;
63
64 protected static Table htable1;
65 protected static Table htable2;
66
67 protected static HBaseTestingUtility utility1;
68 protected static HBaseTestingUtility utility2;
69 protected static final int NB_ROWS_IN_BATCH = 100;
70 protected static final int NB_ROWS_IN_BIG_BATCH =
71 NB_ROWS_IN_BATCH * 10;
72 protected static final long SLEEP_TIME = 500;
73 protected static final int NB_RETRIES = 10;
74
75 protected static final TableName tableName = TableName.valueOf("test");
76 protected static final byte[] famName = Bytes.toBytes("f");
77 protected static final byte[] row = Bytes.toBytes("row");
78 protected static final byte[] noRepfamName = Bytes.toBytes("norep");
79
80
81
82
83 @BeforeClass
84 public static void setUpBeforeClass() throws Exception {
85 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
86
87 conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
88 conf1.setInt("replication.source.size.capacity", 10240);
89 conf1.setLong("replication.source.sleepforretries", 100);
90 conf1.setInt("hbase.regionserver.maxlogs", 10);
91 conf1.setLong("hbase.master.logcleaner.ttl", 10);
92 conf1.setInt("zookeeper.recovery.retry", 1);
93 conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
94 conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
95 conf1.setBoolean("dfs.support.append", true);
96 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
97 conf1.setInt("replication.stats.thread.period.seconds", 5);
98 conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
99 conf1.setLong("replication.sleep.before.failover", 2000);
100 conf1.setInt("replication.source.maxretriesmultiplier", 10);
101
102 utility1 = new HBaseTestingUtility(conf1);
103 utility1.startMiniZKCluster();
104 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
105
106
107 conf1 = utility1.getConfiguration();
108 zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
109 admin = new ReplicationAdmin(conf1);
110 LOG.info("Setup first Zk");
111
112
113 conf2 = HBaseConfiguration.create(conf1);
114 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
115 conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
116 conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
117 conf2.setBoolean("dfs.support.append", true);
118 conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
119
120 utility2 = new HBaseTestingUtility(conf2);
121 utility2.setZkCluster(miniZK);
122 zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
123
124 admin.addPeer("2", utility2.getClusterKey());
125
126 LOG.info("Setup second Zk");
127 CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
128 utility1.startMiniCluster(2);
129 utility2.startMiniCluster(2);
130
131 HTableDescriptor table = new HTableDescriptor(tableName);
132 HColumnDescriptor fam = new HColumnDescriptor(famName);
133 fam.setMaxVersions(3);
134 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
135 table.addFamily(fam);
136 fam = new HColumnDescriptor(noRepfamName);
137 table.addFamily(fam);
138 Connection connection1 = ConnectionFactory.createConnection(conf1);
139 Connection connection2 = ConnectionFactory.createConnection(conf2);
140 try (Admin admin1 = connection1.getAdmin()) {
141 admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
142 }
143 try (Admin admin2 = connection2.getAdmin()) {
144 admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
145 }
146 utility1.waitUntilAllRegionsAssigned(tableName);
147 utility2.waitUntilAllRegionsAssigned(tableName);
148 htable1 = connection1.getTable(tableName);
149 htable1.setWriteBufferSize(1024);
150 htable2 = connection2.getTable(tableName);
151 }
152
153
154
155
156 @AfterClass
157 public static void tearDownAfterClass() throws Exception {
158 htable2.close();
159 htable1.close();
160 admin.close();
161 utility2.shutdownMiniCluster();
162 utility1.shutdownMiniCluster();
163 }
164 }