1
2
3
4
5
6
7
8
9
10
11 package org.apache.hadoop.hbase.replication;
12
13 import static org.junit.Assert.assertEquals;
14
15 import java.io.IOException;
16 import java.util.ArrayList;
17 import java.util.Collections;
18 import java.util.HashSet;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Set;
22 import java.util.UUID;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Table;
31 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
32 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
33 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
34 import org.apache.hadoop.hbase.testclassification.LargeTests;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.HFileTestUtil;
37 import org.junit.BeforeClass;
38 import org.junit.experimental.categories.Category;
39
40 @Category({ LargeTests.class })
41 public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
42
43 private static final Log LOG = LogFactory
44 .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class);
45
46 @BeforeClass
47 public static void setUpBeforeClass() throws Exception {
48 conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
49 conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
50 conf1.set("hbase.replication.source.fs.conf.provider",
51 TestSourceFSConfigurationProvider.class.getCanonicalName());
52 String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
53 if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
54 classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
55 conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
56 }
57
58 TestReplicationBase.setUpBeforeClass();
59 }
60
61 @Override
62 public void testSyncUpTool() throws Exception {
63
64
65
66
67 setupReplication();
68
69
70
71
72 Iterator<String> randomHFileRangeListIterator = null;
73 Set<String> randomHFileRanges = new HashSet<String>(16);
74 for (int i = 0; i < 16; i++) {
75 randomHFileRanges.add(UUID.randomUUID().toString());
76 }
77 List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
78 Collections.sort(randomHFileRangeList);
79 randomHFileRangeListIterator = randomHFileRangeList.iterator();
80
81
82
83
84
85 loadAndReplicateHFiles(true, randomHFileRangeListIterator);
86
87
88
89
90
91
92
93
94
95
96 mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
97
98 }
99
100 private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
101 throws Exception {
102 LOG.debug("mimicSyncUpAfterBulkLoad");
103 utility2.shutdownMiniHBaseCluster();
104
105 loadAndReplicateHFiles(false, randomHFileRangeListIterator);
106
107 int rowCount_ht1Source = utility1.countRows(ht1Source);
108 assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
109 rowCount_ht1Source);
110
111 int rowCount_ht2Source = utility1.countRows(ht2Source);
112 assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
113 rowCount_ht2Source);
114
115 utility1.shutdownMiniHBaseCluster();
116 utility2.restartHBaseCluster(1);
117
118 Thread.sleep(SLEEP_TIME);
119
120
121 int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
122 int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
123 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
124 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
125
126
127 syncUp(utility1);
128
129
130 for (int i = 0; i < NB_RETRIES; i++) {
131 syncUp(utility1);
132 rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
133 rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
134 if (i == NB_RETRIES - 1) {
135 if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
136
137 utility1.restartHBaseCluster(1);
138 rowCount_ht1Source = utility1.countRows(ht1Source);
139 LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
140 rowCount_ht2Source = utility1.countRows(ht2Source);
141 LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
142 }
143 assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
144 rowCount_ht1TargetAtPeer1);
145 assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
146 rowCount_ht2TargetAtPeer1);
147 }
148 if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
149 LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
150 break;
151 } else {
152 LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
153 + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
154 + rowCount_ht2TargetAtPeer1);
155 }
156 Thread.sleep(SLEEP_TIME);
157 }
158 }
159
160 private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
161 Iterator<String> randomHFileRangeListIterator) throws Exception {
162 LOG.debug("loadAndReplicateHFiles");
163
164
165 byte[][][] hfileRanges =
166 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
167 Bytes.toBytes(randomHFileRangeListIterator.next()) } };
168 loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
169 100);
170
171 hfileRanges =
172 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
173 Bytes.toBytes(randomHFileRangeListIterator.next()) } };
174 loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
175 hfileRanges, 3);
176
177
178 hfileRanges =
179 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
180 Bytes.toBytes(randomHFileRangeListIterator.next()) } };
181 loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
182 200);
183
184 hfileRanges =
185 new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
186 Bytes.toBytes(randomHFileRangeListIterator.next()) } };
187 loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
188 hfileRanges, 3);
189
190 if (verifyReplicationOnSlave) {
191
192 wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
193 "t1_syncup has 103 rows on source, and 100 on slave1");
194
195 wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
196 "t2_syncup has 203 rows on source, and 200 on slave1");
197 }
198 }
199
200 private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
201 Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
202 Path dir = utility1.getDataTestDirOnTestFS(testName);
203 FileSystem fs = utility1.getTestFileSystem();
204 dir = dir.makeQualified(fs);
205 Path familyDir = new Path(dir, Bytes.toString(fam));
206
207 int hfileIdx = 0;
208 for (byte[][] range : hfileRanges) {
209 byte[] from = range[0];
210 byte[] to = range[1];
211 HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
212 + hfileIdx++), fam, row, from, to, numOfRows);
213 }
214
215 final TableName tableName = source.getName();
216 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
217 String[] args = { dir.toString(), tableName.toString() };
218 loader.run(args);
219 }
220
221 private void wait(Table target, int expectedCount, String msg) throws IOException,
222 InterruptedException {
223 for (int i = 0; i < NB_RETRIES; i++) {
224 int rowCount_ht2TargetAtPeer1 = utility2.countRows(target);
225 if (i == NB_RETRIES - 1) {
226 assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1);
227 }
228 if (expectedCount == rowCount_ht2TargetAtPeer1) {
229 break;
230 }
231 Thread.sleep(SLEEP_TIME);
232 }
233 }
234 }