1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23
24 import java.security.SecureRandom;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Random;
33 import java.util.Set;
34 import java.util.concurrent.atomic.AtomicBoolean;
35
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.util.ByteStringer;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.FileUtil;
43 import org.apache.hadoop.fs.Path;
44 import org.apache.hadoop.hbase.Cell;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.HBaseTestingUtility;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.KeyValue;
50 import org.apache.hadoop.hbase.testclassification.MediumTests;
51 import org.apache.hadoop.hbase.Stoppable;
52 import org.apache.hadoop.hbase.TableName;
53 import org.apache.hadoop.hbase.client.Connection;
54 import org.apache.hadoop.hbase.client.ConnectionFactory;
55 import org.apache.hadoop.hbase.client.Get;
56 import org.apache.hadoop.hbase.client.RegionLocator;
57 import org.apache.hadoop.hbase.client.Result;
58 import org.apache.hadoop.hbase.client.ResultScanner;
59 import org.apache.hadoop.hbase.client.Scan;
60 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
62 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
63 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
64 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
65 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
66 import org.apache.hadoop.hbase.util.Bytes;
67 import org.apache.hadoop.hbase.util.FSUtils;
68 import org.apache.hadoop.hbase.util.HFileTestUtil;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.experimental.categories.Category;
74
75 @Category(MediumTests.class)
76 public class TestReplicationSink {
77 private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
78 private static final int BATCH_SIZE = 10;
79
80 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
81
82 protected static ReplicationSink SINK;
83
84 protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
85 protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");
86
87 protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
88 protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
89
90 protected static Table table1;
91 protected static Stoppable STOPPABLE = new Stoppable() {
92 final AtomicBoolean stop = new AtomicBoolean(false);
93
94 @Override
95 public boolean isStopped() {
96 return this.stop.get();
97 }
98
99 @Override
100 public void stop(String why) {
101 LOG.info("STOPPING BECAUSE: " + why);
102 this.stop.set(true);
103 }
104
105 };
106
107 protected static Table table2;
108 protected static String baseNamespaceDir;
109 protected static String hfileArchiveDir;
110 protected static String replicationClusterId;
111
112
113
114
115 @BeforeClass
116 public static void setUpBeforeClass() throws Exception {
117 TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
118 TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
119 HConstants.REPLICATION_ENABLE_DEFAULT);
120 TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
121 TestSourceFSConfigurationProvider.class.getCanonicalName());
122
123 TEST_UTIL.startMiniCluster(3);
124 SINK =
125 new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
126 table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
127 table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
128 Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
129 baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
130 hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
131 replicationClusterId = "12345";
132 }
133
134
135
136
137 @AfterClass
138 public static void tearDownAfterClass() throws Exception {
139 STOPPABLE.stop("Shutting down");
140 TEST_UTIL.shutdownMiniCluster();
141 }
142
143
144
145
146 @Before
147 public void setUp() throws Exception {
148 table1 = TEST_UTIL.deleteTableData(TABLE_NAME1);
149 table2 = TEST_UTIL.deleteTableData(TABLE_NAME2);
150 }
151
152
153
154
155
156 @Test
157 public void testBatchSink() throws Exception {
158 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE);
159 List<Cell> cells = new ArrayList<Cell>();
160 for(int i = 0; i < BATCH_SIZE; i++) {
161 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
162 }
163 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
164 replicationClusterId, baseNamespaceDir, hfileArchiveDir);
165 Scan scan = new Scan();
166 ResultScanner scanRes = table1.getScanner(scan);
167 assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
168 }
169
170
171
172
173
174 @Test
175 public void testMixedPutDelete() throws Exception {
176 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
177 List<Cell> cells = new ArrayList<Cell>();
178 for(int i = 0; i < BATCH_SIZE/2; i++) {
179 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
180 }
181 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
182 baseNamespaceDir, hfileArchiveDir);
183
184 entries = new ArrayList<WALEntry>(BATCH_SIZE);
185 cells = new ArrayList<Cell>();
186 for(int i = 0; i < BATCH_SIZE; i++) {
187 entries.add(createEntry(TABLE_NAME1, i,
188 i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
189 }
190
191 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
192 replicationClusterId, baseNamespaceDir, hfileArchiveDir);
193 Scan scan = new Scan();
194 ResultScanner scanRes = table1.getScanner(scan);
195 assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
196 }
197
198
199
200
201
202 @Test
203 public void testMixedPutTables() throws Exception {
204 List<WALEntry> entries = new ArrayList<WALEntry>(BATCH_SIZE/2);
205 List<Cell> cells = new ArrayList<Cell>();
206 for(int i = 0; i < BATCH_SIZE; i++) {
207 entries.add(createEntry( i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
208 i, KeyValue.Type.Put, cells));
209 }
210
211 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
212 replicationClusterId, baseNamespaceDir, hfileArchiveDir);
213 Scan scan = new Scan();
214 ResultScanner scanRes = table2.getScanner(scan);
215 for(Result res : scanRes) {
216 assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
217 }
218 }
219
220
221
222
223
224 @Test
225 public void testMixedDeletes() throws Exception {
226 List<WALEntry> entries = new ArrayList<WALEntry>(3);
227 List<Cell> cells = new ArrayList<Cell>();
228 for(int i = 0; i < 3; i++) {
229 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
230 }
231 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
232 replicationClusterId, baseNamespaceDir, hfileArchiveDir);
233 entries = new ArrayList<WALEntry>(3);
234 cells = new ArrayList<Cell>();
235 entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
236 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
237 entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
238
239 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
240 replicationClusterId, baseNamespaceDir, hfileArchiveDir);
241
242 Scan scan = new Scan();
243 ResultScanner scanRes = table1.getScanner(scan);
244 assertEquals(0, scanRes.next(3).length);
245 }
246
247
248
249
250
251
252 @Test
253 public void testApplyDeleteBeforePut() throws Exception {
254 List<WALEntry> entries = new ArrayList<WALEntry>(5);
255 List<Cell> cells = new ArrayList<Cell>();
256 for(int i = 0; i < 2; i++) {
257 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
258 }
259 entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
260 for(int i = 3; i < 5; i++) {
261 entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
262 }
263 SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
264 replicationClusterId, baseNamespaceDir, hfileArchiveDir);
265 Get get = new Get(Bytes.toBytes(1));
266 Result res = table1.get(get);
267 assertEquals(0, res.size());
268 }
269
270
271
272
273 @Test
274 public void testReplicateEntriesForHFiles() throws Exception {
275 Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
276 Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
277 int numRows = 10;
278
279 List<Path> p = new ArrayList<>(1);
280
281
282 Random rng = new SecureRandom();
283 Set<Integer> numbers = new HashSet<>();
284 while (numbers.size() < 50) {
285 numbers.add(rng.nextInt(1000));
286 }
287 List<Integer> numberList = new ArrayList<>(numbers);
288 Collections.sort(numberList);
289 Map<String, Long> storeFilesSize = new HashMap<String, Long>(1);
290
291
292 Configuration conf = TEST_UTIL.getConfiguration();
293 FileSystem fs = dir.getFileSystem(conf);
294 Iterator<Integer> numbersItr = numberList.iterator();
295 for (int i = 0; i < 25; i++) {
296 Path hfilePath = new Path(familyDir, "hfile_" + i);
297 HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
298 Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
299 p.add(hfilePath);
300 storeFilesSize.put(hfilePath.getName(), fs.getFileStatus(hfilePath).getLen());
301 }
302
303
304 Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
305 storeFiles.put(FAM_NAME1, p);
306 WALEdit edit = null;
307 WALProtos.BulkLoadDescriptor loadDescriptor = null;
308
309 try (Connection c = ConnectionFactory.createConnection(conf);
310 RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
311 HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
312 loadDescriptor =
313 ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
314 ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1);
315 edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
316 }
317 List<WALEntry> entries = new ArrayList<WALEntry>(1);
318
319
320 WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);
321
322
323 for (int i = 0; i < 25; i++) {
324 String pathToHfileFromNS =
325 new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
326 .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
327 .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
328 .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
329 .append("hfile_" + i).toString();
330 String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
331
332 FileUtil.copy(fs, p.get(0), fs, new Path(dst), false, conf);
333 }
334
335 entries.add(builder.build());
336 ResultScanner scanRes = null;
337 try {
338 Scan scan = new Scan();
339 scanRes = table1.getScanner(scan);
340
341 assertEquals(0, scanRes.next(numRows).length);
342
343 SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
344 replicationClusterId, baseNamespaceDir, hfileArchiveDir);
345 scanRes = table1.getScanner(scan);
346
347 assertEquals(numRows, scanRes.next(numRows).length);
348 } finally {
349 if (scanRes != null) {
350 scanRes.close();
351 }
352 }
353 }
354
355 private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
356 byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
357 byte[] rowBytes = Bytes.toBytes(row);
358
359
360 try {
361 Thread.sleep(1);
362 } catch (InterruptedException e) {
363 LOG.info("Was interrupted while sleep, meh", e);
364 }
365 final long now = System.currentTimeMillis();
366 KeyValue kv = null;
367 if(type.getCode() == KeyValue.Type.Put.getCode()) {
368 kv = new KeyValue(rowBytes, fam, fam, now,
369 KeyValue.Type.Put, Bytes.toBytes(row));
370 } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
371 kv = new KeyValue(rowBytes, fam, fam,
372 now, KeyValue.Type.DeleteColumn);
373 } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
374 kv = new KeyValue(rowBytes, fam, null,
375 now, KeyValue.Type.DeleteFamily);
376 }
377 WALEntry.Builder builder = createWALEntryBuilder(table);
378 cells.add(kv);
379
380 return builder.build();
381 }
382
383 private WALEntry.Builder createWALEntryBuilder(TableName table) {
384 WALEntry.Builder builder = WALEntry.newBuilder();
385 builder.setAssociatedCellCount(1);
386 WALKey.Builder keyBuilder = WALKey.newBuilder();
387 UUID.Builder uuidBuilder = UUID.newBuilder();
388 uuidBuilder.setLeastSigBits(HConstants.DEFAULT_CLUSTER_ID.getLeastSignificantBits());
389 uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
390 keyBuilder.setClusterId(uuidBuilder.build());
391 keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
392 keyBuilder.setWriteTime(System.currentTimeMillis());
393 keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
394 keyBuilder.setLogSequenceNumber(-1);
395 builder.setKey(keyBuilder.build());
396 return builder;
397 }
398 }