/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;

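/**
 * Tests the replication sync-up tool when the source data was written through bulk loads rather
 * than normal puts. Bulk load replication is enabled in {@link #setUpBeforeClass()}, and
 * {@link #testSyncUpTool()} is overridden to load hfiles into the source tables instead of
 * issuing puts.
 */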
@Category({ LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {

  private static final Log LOG =
      LogFactory.getLog(TestReplicationSyncUpToolWithBulkLoadedData.class);

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Enable replication of bulk loaded data on the source cluster and give it a cluster id.
    conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf1.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
    // Register the secure bulk load endpoint coprocessor if it is not already configured.
    String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
      classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
      conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
    }

    TestReplicationBase.setUpBeforeClass();
  }

  @Override
  public void testSyncUpTool() throws Exception {
    /**
     * Set up replication from the master cluster to one slave.
     * Tables: t1_syncup and t2_syncup.
     * Column families: 'cf1' is replicated, 'norep' is not.
     */
    setupReplication();

    /**
     * Prepare 16 random hfile range boundaries, required for creating the hfiles below.
     */
    Set<String> randomHFileRanges = new HashSet<>(16);
    for (int i = 0; i < 16; i++) {
      randomHFileRanges.add(UUID.randomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();

    /**
     * On the master: load 100 rows into cf1 and 3 rows into norep of t1_syncup, and 200 rows into
     * cf1 and 3 rows into norep of t2_syncup, then verify the cf1 rows are correctly replicated
     * to the slave.
     */
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    /**
     * Verify that bulk loads performed while the slave is down are replayed by the sync-up tool:
     * Step 1: stop HBase on the slave.
     * Step 2: on the master, load another 100 rows into cf1 and 3 rows into norep of t1_syncup,
     *         and another 200 rows into cf1 and 3 rows into norep of t2_syncup.
     * Step 3: stop HBase on the master, restart HBase on the slave.
     * Step 4: verify the slave still has only the rows loaded before the outage
     *         (t1_syncup: 100 rows in cf1, t2_syncup: 200 rows in cf1).
     * Step 5: run the sync-up tool on the master.
     * Step 6: verify the bulk-loaded rows show up on the slave and the 'norep' rows do not
     *         (t1_syncup: 200 rows in cf1, t2_syncup: 400 rows in cf1).
     */
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }

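  /**
   * Shuts down the slave cluster, bulk loads a second batch of rows on the source and checks the
   * source row counts, then stops the source, restarts the slave, and verifies the slave still
   * has only the first batch. Finally runs the sync-up tool and retries until the slave catches
   * up (200 rows in t1_syncup, 400 rows in t2_syncup) or the retries are exhausted.
   */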
  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
      throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    utility2.shutdownMiniHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCount_ht1Source = utility1.countRows(ht1Source);
    assertEquals("t1_syncup should have 206 rows on source after another 103 rows are bulk loaded",
      206, rowCount_ht1Source);

    int rowCount_ht2Source = utility1.countRows(ht2Source);
    assertEquals("t2_syncup should have 406 rows on source after another 203 rows are bulk loaded",
      406, rowCount_ht2Source);

    utility1.shutdownMiniHBaseCluster();
    utility2.restartHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);

    // Run sync up tool
    syncUp(utility1);

    // After sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(utility1);
      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
          // syncUp still failed. Let's look at the source in case anything is wrong there.
          utility1.restartHBaseCluster(1);
          rowCount_ht1Source = utility1.countRows(ht1Source);
          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
          rowCount_ht2Source = utility1.countRows(ht2Source);
          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
        }
        assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
          rowCount_ht1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
          rowCount_ht2TargetAtPeer1);
      }
      if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i
            + ", with rowCount_ht1TargetAtPeer1 = " + rowCount_ht1TargetAtPeer1
            + " and rowCount_ht2TargetAtPeer1 = " + rowCount_ht2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }

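  /**
   * Bulk loads 100 rows into 'cf1' and 3 rows into 'norep' of t1_syncup, and 200 rows into 'cf1'
   * and 3 rows into 'norep' of t2_syncup. When verifyReplicationOnSlave is true, waits until only
   * the 'cf1' rows have been replicated to the slave.
   */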
  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
      Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Load an hfile with 100 rows into cf1 and an hfile with 3 rows into norep of t1_syncup.
    byte[][][] hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
      100);

    hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
      hfileRanges, 3);

    // Load an hfile with 200 rows into cf1 and an hfile with 3 rows into norep of t2_syncup.
    hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
      200);

    hfileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // Ensure replication of the 'cf1' rows completed; the 3 'norep' rows stay on the source.
      wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }

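  /**
   * Creates one hfile per key range under a family directory on the test filesystem, each holding
   * numOfRows rows, then bulk loads them into the given source table with LoadIncrementalHFiles.
   */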
  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = utility1.getDataTestDirOnTestFS(testName);
    FileSystem fs = utility1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(utility1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);
  }

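  /**
   * Polls the target table until it contains expectedCount rows, sleeping SLEEP_TIME between
   * attempts and asserting the count with the given message on the last retry.
   */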
  private void wait(Table target, int expectedCount, String msg)
      throws IOException, InterruptedException {
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = utility2.countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}