/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

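/**
 * Unit tests for {@link ReplicationSink}: batches of replicated WAL entries
 * (puts, deletes, and a bulk load event) are applied to local tables and the
 * resulting table contents are verified.
 */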
@Category(MediumTests.class)
public class TestReplicationSink {
  private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
  private static final int BATCH_SIZE = 10;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  protected static ReplicationSink SINK;

  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");

  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");

  protected static Table table1;
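  // Handed to the sink at construction time; flipped in tearDownAfterClass() to stop it.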
  protected static Stoppable STOPPABLE = new Stoppable() {
    final AtomicBoolean stop = new AtomicBoolean(false);

    @Override
    public boolean isStopped() {
      return this.stop.get();
    }

    @Override
    public void stop(String why) {
      LOG.info("STOPPING BECAUSE: " + why);
      this.stop.set(true);
    }
  };

  protected static Table table2;
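  // Source cluster id and root directories passed to replicateEntries() by every test.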
  protected static String baseNamespaceDir;
  protected static String hfileArchiveDir;
  protected static String replicationClusterId;

  /**
   * Starts a mini cluster, creates the replication sink and both target
   * tables, and resolves the directories handed to replicateEntries().
   * @throws Exception on failure
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT);
    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());

    TEST_UTIL.startMiniCluster(3);
    SINK =
      new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
    replicationClusterId = "12345";
  }

  /**
   * Stops the sink and shuts the mini cluster down.
   * @throws Exception on failure
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    STOPPABLE.stop("Shutting down");
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Truncates both target tables before each test.
   * @throws Exception on failure
   */
  @Before
  public void setUp() throws Exception {
    table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
    table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
  }

  /**
   * Insert a whole batch of entries
   * @throws Exception
   */
  @Test
  public void testBatchSink() throws Exception {
    List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<Cell>();
    for(int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a mix of puts and deletes
   * @throws Exception
   */
  @Test
  public void testMixedPutDelete() throws Exception {
    List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
    List<Cell> cells = new ArrayList<Cell>();
    for(int i = 0; i < BATCH_SIZE/2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
      baseNamespaceDir, hfileArchiveDir);

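    // Second batch: re-put the odd rows and delete the even ones, so half survive.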
    entries = new ArrayList<WALEntry>(BATCH_SIZE);
    cells = new ArrayList<Cell>();
    for(int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry(TABLE_NAME1, i,
          i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert into two different tables
   * @throws Exception
   */
  @Test
  public void testMixedPutTables() throws Exception {
    List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE);
    List<Cell> cells = new ArrayList<Cell>();
    for(int i = 0; i < BATCH_SIZE; i++) {
      entries.add(createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
              i, KeyValue.Type.Put, cells));
    }

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
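    // Even-numbered rows were routed to table2; nothing else should appear there.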
    Scan scan = new Scan();
    ResultScanner scanRes = table2.getScanner(scan);
    for(Result res : scanRes) {
      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
    }
  }

  /**
   * Insert then do different types of deletes
   * @throws Exception
   */
  @Test
  public void testMixedDeletes() throws Exception {
    List<WALEntry> entries = new ArrayList<WALEntry>(3);
    List<Cell> cells = new ArrayList<Cell>();
    for(int i = 0; i < 3; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
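    // Remove all three rows, exercising both column and family deletes.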
    entries = new ArrayList<WALEntry>(3);
    cells = new ArrayList<Cell>();
    entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));

    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);

    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(0, scanRes.next(3).length);
  }

  /**
   * Puts are buffered in the sink while deletes are not, so a delete in the
   * middle of a batch could be applied ahead of the buffered Put that creates
   * the row it targets. Verify ordering is preserved and the row stays deleted.
   * @throws Exception
   */
  @Test
  public void testApplyDeleteBeforePut() throws Exception {
    List<WALEntry> entries = new ArrayList<WALEntry>(5);
    List<Cell> cells = new ArrayList<Cell>();
    for(int i = 0; i < 2; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
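    // Delete row 1 in the same batch, right after the Put that created it.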
    entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
    for(int i = 3; i < 5; i++) {
      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
    }
    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Get get = new Get(Bytes.toBytes(1));
    Result res = table1.get(get);
    assertEquals(0, res.size());
  }

  /**
   * Test replicateEntries with a bulk load entry for 25 HFiles
   */
  @Test
  public void testReplicateEntriesForHFiles() throws Exception {
    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
    int numRows = 10;

    List<Path> p = new ArrayList<>(25);

    // 1. Generate 25 hfile ranges: 50 distinct boundaries give 25 [start, end) pairs
    Random rng = new SecureRandom();
    Set<Integer> numbers = new HashSet<>();
    while (numbers.size() < 50) {
      numbers.add(rng.nextInt(1000));
    }
    List<Integer> numberList = new ArrayList<>(numbers);
    Collections.sort(numberList);
    Map<String, Long> storeFilesSize = new HashMap<String, Long>(25);

    // 2. Create 25 hfiles, recording each file's size for the bulk load descriptor
    Configuration conf = TEST_UTIL.getConfiguration();
    FileSystem fs = dir.getFileSystem(conf);
    Iterator<Integer> numbersItr = numberList.iterator();
    for (int i = 0; i < 25; i++) {
      Path hfilePath = new Path(familyDir, "hfile_" + i);
      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
      p.add(hfilePath);
      storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
    }

    // 3. Create a BulkLoadDescriptor and a WALEdit
    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
    storeFiles.put(FAM_NAME1, p);
    WALEdit edit = null;
    WALProtos.BulkLoadDescriptor loadDescriptor = null;

    try (Connection c = ConnectionFactory.createConnection(conf);
        RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
      loadDescriptor =
          ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
            ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
      edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
    }
    List<WALEntry> entries = new ArrayList<WALEntry>(1);

    // 4. Create a WALEntryBuilder
    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);

    // 5. Copy each hfile to the path where the sink expects it in reality
    for (int i = 0; i < 25; i++) {
      String pathToHfileFromNS =
          new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
              .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
              .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
              .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
              .append("hfile_" + i).toString();
      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;

      // Copy the i-th hfile, not always the first, so each staged file matches
      // the name and size recorded in the bulk load descriptor.
      FileUtil.copy(fs, p.get(i), fs, new Path(dst), false, conf);
    }

    entries.add(builder.build());
    ResultScanner scanRes = null;
    try {
      Scan scan = new Scan();
      scanRes = table1.getScanner(scan);
      // 6. Assert no existing data in table
      assertEquals(0, scanRes.next(numRows).length);
      // 7. Replicate the bulk loaded entry
      SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      scanRes = table1.getScanner(scan);
      // 8. Assert data is replicated
      assertEquals(numRows, scanRes.next(numRows).length);
    } finally {
      if (scanRes != null) {
        scanRes.close();
      }
    }
  }

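  /**
   * Creates a WALEntry for the given table and row and appends the matching
   * Put or Delete cell to {@code cells}; the sink receives cells through a
   * separate CellScanner, matched to entries by their associated cell count.
   */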
  private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
    byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
    byte[] rowBytes = Bytes.toBytes(row);
    // Sleep a millisecond so two consecutive entries for the same row never
    // share a timestamp
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      LOG.info("Interrupted while sleeping", e);
    }
    final long now = System.currentTimeMillis();
    KeyValue kv = null;
    if (type.getCode() == KeyValue.Type.Put.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now,
          KeyValue.Type.Put, Bytes.toBytes(row));
    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam,
          now, KeyValue.Type.DeleteColumn);
    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
      kv = new KeyValue(rowBytes, fam, null,
          now, KeyValue.Type.DeleteFamily);
    }
    WALEntry.Builder builder = createWALEntryBuilder(table);
    cells.add(kv);

    return builder.build();
  }

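  /**
   * Creates a WALEntry builder whose key carries the default cluster id, the
   * given table name, the current write time, and an associated cell count of
   * one.
   */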
  private WALEntry.Builder createWALEntryBuilder(TableName table) {
    WALEntry.Builder builder = WALEntry.newBuilder();
    builder.setAssociatedCellCount(1);
    WALKey.Builder keyBuilder = WALKey.newBuilder();
    UUID.Builder uuidBuilder = UUID.newBuilder();
    uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
    uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
    keyBuilder.setClusterId(uuidBuilder.build());
    keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
    keyBuilder.setWriteTime(System.currentTimeMillis());
    keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
    keyBuilder.setLogSequenceNumber(-1);
    builder.setKey(keyBuilder.build());
    return builder;
  }
}
398 }