1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.*;
22 import static org.mockito.Matchers.any;
23 import static org.mockito.Matchers.anyBoolean;
24 import static org.mockito.Mockito.mock;
25 import static org.mockito.Mockito.spy;
26 import static org.mockito.Mockito.times;
27 import static org.mockito.Mockito.verify;
28 import static org.mockito.Mockito.when;
29 import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
30
31 import java.io.FileNotFoundException;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Random;
37 import java.util.UUID;
38 import java.util.concurrent.atomic.AtomicLong;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.hbase.Cell;
47 import org.apache.hadoop.hbase.CellUtil;
48 import org.apache.hadoop.hbase.HBaseTestingUtility;
49 import org.apache.hadoop.hbase.HColumnDescriptor;
50 import org.apache.hadoop.hbase.HConstants;
51 import org.apache.hadoop.hbase.HRegionInfo;
52 import org.apache.hadoop.hbase.HTableDescriptor;
53 import org.apache.hadoop.hbase.KeyValue;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.TableName;
56 import org.apache.hadoop.hbase.client.Durability;
57 import org.apache.hadoop.hbase.client.Get;
58 import org.apache.hadoop.hbase.client.Put;
59 import org.apache.hadoop.hbase.io.hfile.HFile;
60 import org.apache.hadoop.hbase.io.hfile.HFileContext;
61 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
63 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
64 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
65 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
66 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
67 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
68 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
69 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
70 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
71 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
72 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
73 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
74 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
75 import org.apache.hadoop.hbase.testclassification.MediumTests;
76 import org.apache.hadoop.hbase.util.Bytes;
77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
79 import org.apache.hadoop.hbase.util.FSUtils;
80 import org.apache.hadoop.hbase.util.Pair;
81 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
82 import org.apache.hadoop.hbase.wal.WAL;
83 import org.apache.hadoop.hbase.wal.WALFactory;
84 import org.apache.hadoop.hbase.wal.WALKey;
85 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
86 import org.apache.hadoop.util.StringUtils;
87 import org.junit.After;
88 import org.junit.Before;
89 import org.junit.Rule;
90 import org.junit.Test;
91 import org.junit.experimental.categories.Category;
92 import org.junit.rules.TestName;
93
94 import com.google.common.collect.Lists;
95 import com.google.protobuf.ByteString;
96
97
98
99
100
101 @Category(MediumTests.class)
102 public class TestHRegionReplayEvents {
103
104 static final Log LOG = LogFactory.getLog(TestHRegion.class);
105 @Rule public TestName name = new TestName();
106
107 private static HBaseTestingUtility TEST_UTIL;
108
109 public static Configuration CONF ;
110 private String dir;
111 private static FileSystem FILESYSTEM;
112
113 private byte[][] families = new byte[][] {
114 Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};
115
116
117 protected byte[] tableName;
118 protected String method;
119 protected final byte[] row = Bytes.toBytes("rowA");
120 protected final byte[] row2 = Bytes.toBytes("rowB");
121 protected byte[] cq = Bytes.toBytes("cq");
122
123
124 private Path rootDir;
125 private HTableDescriptor htd;
126 private long time;
127 private RegionServerServices rss;
128 private HRegionInfo primaryHri, secondaryHri;
129 private HRegion primaryRegion, secondaryRegion;
130 private WALFactory wals;
131 private WAL walPrimary, walSecondary;
132 private WAL.Reader reader;
133
134 @Before
135 public void setup() throws IOException {
136 TEST_UTIL = HBaseTestingUtility.createLocalHTU();
137 FILESYSTEM = TEST_UTIL.getTestFileSystem();
138 CONF = TEST_UTIL.getConfiguration();
139 dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
140 method = name.getMethodName();
141 tableName = Bytes.toBytes(name.getMethodName());
142 rootDir = new Path(dir + method);
143 TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
144 method = name.getMethodName();
145
146 htd = new HTableDescriptor(TableName.valueOf(method));
147 for (byte[] family : families) {
148 htd.addFamily(new HColumnDescriptor(family));
149 }
150
151 time = System.currentTimeMillis();
152
153 primaryHri = new HRegionInfo(htd.getTableName(),
154 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
155 false, time, 0);
156 secondaryHri = new HRegionInfo(htd.getTableName(),
157 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
158 false, time, 1);
159
160 wals = TestHRegion.createWALFactory(CONF, rootDir);
161 walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes());
162 walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes());
163
164 rss = mock(RegionServerServices.class);
165 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
166 when(rss.getConfiguration()).thenReturn(CONF);
167 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
168
169 primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
170 primaryRegion.close();
171
172 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
173 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
174
175 reader = null;
176 }
177
178 @After
179 public void tearDown() throws Exception {
180 if (reader != null) {
181 reader.close();
182 }
183
184 if (primaryRegion != null) {
185 HRegion.closeHRegion(primaryRegion);
186 }
187 if (secondaryRegion != null) {
188 HRegion.closeHRegion(secondaryRegion);
189 }
190
191 EnvironmentEdgeManagerTestHelper.reset();
192 LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
193 TEST_UTIL.cleanupTestDir();
194 }
195
196 String getName() {
197 return name.getMethodName();
198 }
199
200
201
202
203
204
205
206
207
208
209
210
211 @Test
212 public void testRegionReplicaSecondaryCannotFlush() throws IOException {
213
214
215
216 putDataByReplay(secondaryRegion, 0, 1000, cq, families);
217
218 verifyData(secondaryRegion, 0, 1000, cq, families);
219
220
221 FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true);
222 assertEquals(flush.result, FlushResultImpl.Result.CANNOT_FLUSH);
223
224 verifyData(secondaryRegion, 0, 1000, cq, families);
225
226
227 Map<byte[], List<StoreFile>> files = secondaryRegion.close(false);
228
229 for (List<StoreFile> f : files.values()) {
230 assertTrue(f.isEmpty());
231 }
232 }
233
234
235
236
237
238 @Test (timeout = 60000)
239 public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
240
241 int start = 0;
242 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
243 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
244 LOG.info("-- Flushing primary, creating 3 files for 3 stores");
245 primaryRegion.flush(true);
246
247
248 reader = createWALReaderForPrimary();
249
250 LOG.info("-- Replaying edits and flush events in secondary");
251 while (true) {
252 WAL.Entry entry = reader.next();
253 if (entry == null) {
254 break;
255 }
256 FlushDescriptor flushDesc
257 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
258 if (flushDesc != null) {
259 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
260 LOG.info("-- Replaying flush start in secondary");
261 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
262 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
263 LOG.info("-- NOT Replaying flush commit in secondary");
264 }
265 } else {
266 replayEdit(secondaryRegion, entry);
267 }
268 }
269
270 assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0);
271
272 secondaryRegion.close();
273
274
275 assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize());
276 }
277
278 static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
279 if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
280 return 0;
281 }
282 Put put = new Put(entry.getEdit().getCells().get(0).getRow());
283 for (Cell cell : entry.getEdit().getCells()) put.add(cell);
284 put.setDurability(Durability.SKIP_WAL);
285 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
286 region.batchReplay(new MutationReplay[] {mutation},
287 entry.getKey().getLogSeqNum());
288 return Integer.parseInt(Bytes.toString(put.getRow()));
289 }
290
291 WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
292 return wals.createReader(TEST_UTIL.getTestFileSystem(),
293 DefaultWALProvider.getCurrentFileName(walPrimary),
294 TEST_UTIL.getConfiguration());
295 }
296
297 @Test
298 public void testReplayFlushesAndCompactions() throws IOException {
299
300
301
302 putDataWithFlushes(primaryRegion, 100, 300, 100);
303
304
305 LOG.info("-- Compacting primary, only 1 store");
306 primaryRegion.compactStore(Bytes.toBytes("cf1"),
307 NoLimitCompactionThroughputController.INSTANCE);
308
309
310 reader = createWALReaderForPrimary();
311
312 LOG.info("-- Replaying edits and flush events in secondary");
313 int lastReplayed = 0;
314 int expectedStoreFileCount = 0;
315 while (true) {
316 WAL.Entry entry = reader.next();
317 if (entry == null) {
318 break;
319 }
320 FlushDescriptor flushDesc
321 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
322 CompactionDescriptor compactionDesc
323 = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
324 if (flushDesc != null) {
325
326 verifyData(secondaryRegion, 0, lastReplayed, cq, families);
327 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
328 long storeMemstoreSize = store.getMemStoreSize();
329 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
330 long storeFlushableSize = store.getFlushableSize();
331 long storeSize = store.getSize();
332 long storeSizeUncompressed = store.getStoreSizeUncompressed();
333 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
334 LOG.info("-- Replaying flush start in secondary");
335 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
336 assertNull(result.result);
337 assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());
338
339
340 long newStoreMemstoreSize = store.getMemStoreSize();
341 LOG.info("Memstore size reduced by:"
342 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
343 assertTrue(storeMemstoreSize > newStoreMemstoreSize);
344
345 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
346 LOG.info("-- Replaying flush commit in secondary");
347 secondaryRegion.replayWALFlushCommitMarker(flushDesc);
348
349
350 expectedStoreFileCount++;
351 for (Store s : secondaryRegion.getStores()) {
352 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
353 }
354 long newFlushableSize = store.getFlushableSize();
355 assertTrue(storeFlushableSize > newFlushableSize);
356
357
358 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
359 assertTrue(regionMemstoreSize > newRegionMemstoreSize);
360
361
362 assertTrue(store.getSize() > storeSize);
363 assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
364 assertEquals(store.getSize(), store.getStorefilesSize());
365 }
366
367 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
368 } else if (compactionDesc != null) {
369 secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);
370
371
372 for (Store store : secondaryRegion.getStores()) {
373 if (store.getColumnFamilyName().equals("cf1")) {
374 assertEquals(1, store.getStorefilesCount());
375 } else {
376 assertEquals(expectedStoreFileCount, store.getStorefilesCount());
377 }
378 }
379 } else {
380 lastReplayed = replayEdit(secondaryRegion, entry);;
381 }
382 }
383
384 assertEquals(400-1, lastReplayed);
385 LOG.info("-- Verifying edits from secondary");
386 verifyData(secondaryRegion, 0, 400, cq, families);
387
388 LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
389 verifyData(primaryRegion, 0, lastReplayed, cq, families);
390 for (Store store : primaryRegion.getStores()) {
391 if (store.getColumnFamilyName().equals("cf1")) {
392 assertEquals(1, store.getStorefilesCount());
393 } else {
394 assertEquals(expectedStoreFileCount, store.getStorefilesCount());
395 }
396 }
397 }
398
399
400
401
402
403 @Test
404 public void testReplayFlushStartMarkers() throws IOException {
405
406 putDataWithFlushes(primaryRegion, 100, 100, 100);
407 int numRows = 200;
408
409
410 reader = createWALReaderForPrimary();
411
412 LOG.info("-- Replaying edits and flush events in secondary");
413
414 FlushDescriptor startFlushDesc = null;
415
416 int lastReplayed = 0;
417 while (true) {
418 WAL.Entry entry = reader.next();
419 if (entry == null) {
420 break;
421 }
422 FlushDescriptor flushDesc
423 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
424 if (flushDesc != null) {
425
426 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
427 long storeMemstoreSize = store.getMemStoreSize();
428 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
429 long storeFlushableSize = store.getFlushableSize();
430
431 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
432 startFlushDesc = flushDesc;
433 LOG.info("-- Replaying flush start in secondary");
434 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
435 assertNull(result.result);
436 assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
437 assertTrue(regionMemstoreSize > 0);
438 assertTrue(storeFlushableSize > 0);
439
440
441 long newStoreMemstoreSize = store.getMemStoreSize();
442 LOG.info("Memstore size reduced by:"
443 + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
444 assertTrue(storeMemstoreSize > newStoreMemstoreSize);
445 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
446
447 }
448
449 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
450 } else {
451 lastReplayed = replayEdit(secondaryRegion, entry);
452 }
453 }
454
455
456
457
458 verifyData(secondaryRegion, 0, numRows, cq, families);
459
460
461 LOG.info("-- Replaying same flush start in secondary again");
462 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
463 assertNull(result);
464
465 assertNotNull(secondaryRegion.getPrepareFlushResult());
466 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
467 startFlushDesc.getFlushSequenceNumber());
468 assertTrue(secondaryRegion.getMemstoreSize() > 0);
469 verifyData(secondaryRegion, 0, numRows, cq, families);
470
471
472 FlushDescriptor startFlushDescSmallerSeqId
473 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
474 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
475 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
476 assertNull(result);
477
478 assertNotNull(secondaryRegion.getPrepareFlushResult());
479 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
480 startFlushDesc.getFlushSequenceNumber());
481 assertTrue(secondaryRegion.getMemstoreSize() > 0);
482 verifyData(secondaryRegion, 0, numRows, cq, families);
483
484
485 FlushDescriptor startFlushDescLargerSeqId
486 = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
487 LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
488 result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
489 assertNull(result);
490
491 assertNotNull(secondaryRegion.getPrepareFlushResult());
492 assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
493 startFlushDesc.getFlushSequenceNumber());
494 assertTrue(secondaryRegion.getMemstoreSize() > 0);
495 verifyData(secondaryRegion, 0, numRows, cq, families);
496
497 LOG.info("-- Verifying edits from secondary");
498 verifyData(secondaryRegion, 0, numRows, cq, families);
499
500 LOG.info("-- Verifying edits from primary.");
501 verifyData(primaryRegion, 0, numRows, cq, families);
502 }
503
504
505
506
507
508 @Test
509 public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
510
511 putDataWithFlushes(primaryRegion, 100, 200, 100);
512 int numRows = 300;
513
514
515 reader = createWALReaderForPrimary();
516
517 LOG.info("-- Replaying edits and flush events in secondary");
518 FlushDescriptor startFlushDesc = null;
519 FlushDescriptor commitFlushDesc = null;
520
521 int lastReplayed = 0;
522 while (true) {
523 System.out.println(lastReplayed);
524 WAL.Entry entry = reader.next();
525 if (entry == null) {
526 break;
527 }
528 FlushDescriptor flushDesc
529 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
530 if (flushDesc != null) {
531 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
532
533 if (startFlushDesc == null) {
534 startFlushDesc = flushDesc;
535 } else {
536 LOG.info("-- Replaying flush start in secondary");
537 startFlushDesc = flushDesc;
538 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
539 assertNull(result.result);
540 }
541 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
542
543 if (commitFlushDesc == null) {
544 commitFlushDesc = flushDesc;
545 }
546 }
547
548 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
549 } else {
550 lastReplayed = replayEdit(secondaryRegion, entry);
551 }
552 }
553
554
555
556 verifyData(secondaryRegion, 0, numRows, cq, families);
557
558
559 int expectedStoreFileCount = 0;
560 for (Store s : secondaryRegion.getStores()) {
561 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
562 }
563 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
564
565
566 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
567 + startFlushDesc);
568 assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());
569
570 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
571 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
572
573
574 expectedStoreFileCount++;
575 for (Store s : secondaryRegion.getStores()) {
576 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
577 }
578 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
579 long newFlushableSize = store.getFlushableSize();
580 assertTrue(newFlushableSize > 0);
581
582
583 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
584 assertEquals(regionMemstoreSize, newRegionMemstoreSize);
585
586 assertNotNull(secondaryRegion.getPrepareFlushResult());
587
588 LOG.info("-- Verifying edits from secondary");
589 verifyData(secondaryRegion, 0, numRows, cq, families);
590
591 LOG.info("-- Verifying edits from primary.");
592 verifyData(primaryRegion, 0, numRows, cq, families);
593 }
594
595
596
597
598
599 @Test
600 public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
601
602 putDataWithFlushes(primaryRegion, 100, 100, 100);
603 int numRows = 200;
604
605
606 reader = createWALReaderForPrimary();
607
608 LOG.info("-- Replaying edits and flush events in secondary");
609 FlushDescriptor startFlushDesc = null;
610 FlushDescriptor commitFlushDesc = null;
611
612 int lastReplayed = 0;
613 while (true) {
614 WAL.Entry entry = reader.next();
615 if (entry == null) {
616 break;
617 }
618 FlushDescriptor flushDesc
619 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
620 if (flushDesc != null) {
621 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
622 if (startFlushDesc == null) {
623 LOG.info("-- Replaying flush start in secondary");
624 startFlushDesc = flushDesc;
625 PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
626 assertNull(result.result);
627 }
628 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
629
630
631
632 commitFlushDesc =
633 FlushDescriptor.newBuilder(flushDesc)
634 .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
635 .build();
636 }
637
638 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
639 } else {
640 lastReplayed = replayEdit(secondaryRegion, entry);
641 }
642 }
643
644
645
646 verifyData(secondaryRegion, 0, numRows, cq, families);
647
648
649 int expectedStoreFileCount = 0;
650 for (Store s : secondaryRegion.getStores()) {
651 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
652 }
653 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
654
655
656 LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START"
657 + startFlushDesc);
658 assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());
659
660 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
661 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
662
663
664 expectedStoreFileCount++;
665 for (Store s : secondaryRegion.getStores()) {
666 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
667 }
668 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
669 long newFlushableSize = store.getFlushableSize();
670 assertTrue(newFlushableSize > 0);
671
672
673 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
674 assertTrue(newRegionMemstoreSize > 0);
675 assertTrue(regionMemstoreSize > newRegionMemstoreSize);
676
677 assertNull(secondaryRegion.getPrepareFlushResult());
678
679 LOG.info("-- Verifying edits from secondary");
680 verifyData(secondaryRegion, 0, numRows, cq, families);
681
682 LOG.info("-- Verifying edits from primary.");
683 verifyData(primaryRegion, 0, numRows, cq, families);
684 }
685
686
687
688
689
690
691 @Test
692 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
693 throws IOException {
694 testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
695 }
696
697
698
699
700
701
702 @Test
703 public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
704 throws IOException {
705 testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
706 }
707
708
709
710
711 public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
712 throws IOException {
713
714
715 putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
716 int numRows = droppableMemstore ? 100 : 200;
717
718
719 reader = createWALReaderForPrimary();
720
721 LOG.info("-- Replaying edits and flush events in secondary");
722 FlushDescriptor commitFlushDesc = null;
723
724 int lastReplayed = 0;
725 while (true) {
726 WAL.Entry entry = reader.next();
727 if (entry == null) {
728 break;
729 }
730 FlushDescriptor flushDesc
731 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
732 if (flushDesc != null) {
733 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
734
735 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
736 commitFlushDesc = flushDesc;
737 }
738
739 verifyData(secondaryRegion, 0, lastReplayed+1, cq, families);
740 } else {
741 lastReplayed = replayEdit(secondaryRegion, entry);
742 }
743 }
744
745
746
747 verifyData(secondaryRegion, 0, numRows, cq, families);
748
749
750 int expectedStoreFileCount = 0;
751 for (Store s : secondaryRegion.getStores()) {
752 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
753 }
754 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
755
756
757 assertNull(secondaryRegion.getPrepareFlushResult());
758 assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);
759
760
761 for (Store store : secondaryRegion.getStores()) {
762 assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId().get());
763 }
764
765 LOG.info("-- Replaying flush commit in secondary" + commitFlushDesc);
766 secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);
767
768
769 expectedStoreFileCount++;
770 for (Store s : secondaryRegion.getStores()) {
771 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
772 }
773 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
774 long newFlushableSize = store.getFlushableSize();
775 if (droppableMemstore) {
776 assertTrue(newFlushableSize == 0);
777 } else {
778 assertTrue(newFlushableSize > 0);
779 }
780
781
782 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
783 if (droppableMemstore) {
784 assertTrue(0 == newRegionMemstoreSize);
785 } else {
786 assertTrue(regionMemstoreSize == newRegionMemstoreSize);
787 }
788
789 LOG.info("-- Verifying edits from secondary");
790 verifyData(secondaryRegion, 0, numRows, cq, families);
791
792 LOG.info("-- Verifying edits from primary.");
793 verifyData(primaryRegion, 0, numRows, cq, families);
794 }
795
796 private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
797 return FlushDescriptor.newBuilder(flush)
798 .setFlushSequenceNumber(flushSeqId)
799 .build();
800 }
801
802
803
804
805 @Test
806 public void testReplayRegionOpenEvent() throws IOException {
807 putDataWithFlushes(primaryRegion, 100, 0, 100);
808 int numRows = 100;
809
810
811 primaryRegion.close();
812 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
813
814
815 reader = createWALReaderForPrimary();
816 List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
817
818 LOG.info("-- Replaying edits and region events in secondary");
819 while (true) {
820 WAL.Entry entry = reader.next();
821 if (entry == null) {
822 break;
823 }
824 FlushDescriptor flushDesc
825 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
826 RegionEventDescriptor regionEventDesc
827 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
828
829 if (flushDesc != null) {
830
831 } else if (regionEventDesc != null) {
832 regionEvents.add(regionEventDesc);
833 } else {
834
835 }
836 }
837
838
839 assertEquals(3, regionEvents.size());
840
841
842 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));
843
844
845 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));
846
847
848 int expectedStoreFileCount = 0;
849 for (Store s : secondaryRegion.getStores()) {
850 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
851 }
852 long regionMemstoreSize = secondaryRegion.getMemstoreSize();
853 assertTrue(regionMemstoreSize == 0);
854
855
856 LOG.info("Testing replaying region open event " + regionEvents.get(2));
857 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
858
859
860 expectedStoreFileCount++;
861 for (Store s : secondaryRegion.getStores()) {
862 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
863 }
864 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
865 long newFlushableSize = store.getFlushableSize();
866 assertTrue(newFlushableSize == 0);
867
868
869 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
870 assertTrue(newRegionMemstoreSize == 0);
871
872 assertNull(secondaryRegion.getPrepareFlushResult());
873
874 LOG.info("-- Verifying edits from secondary");
875 verifyData(secondaryRegion, 0, numRows, cq, families);
876
877 LOG.info("-- Verifying edits from primary.");
878 verifyData(primaryRegion, 0, numRows, cq, families);
879 }
880
881
882
883
884
885 @Test
886 public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
887 putDataWithFlushes(primaryRegion, 100, 100, 100);
888 int numRows = 200;
889
890
891 primaryRegion.close();
892 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
893
894
895 reader = createWALReaderForPrimary();
896 List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
897
898 LOG.info("-- Replaying edits and region events in secondary");
899 while (true) {
900 WAL.Entry entry = reader.next();
901 if (entry == null) {
902 break;
903 }
904 FlushDescriptor flushDesc
905 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
906 RegionEventDescriptor regionEventDesc
907 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
908
909 if (flushDesc != null) {
910
911 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
912 secondaryRegion.replayWALFlushStartMarker(flushDesc);
913 }
914 } else if (regionEventDesc != null) {
915 regionEvents.add(regionEventDesc);
916 } else {
917 replayEdit(secondaryRegion, entry);
918 }
919 }
920
921
922
923 verifyData(secondaryRegion, 0, numRows, cq, families);
924
925
926 assertEquals(3, regionEvents.size());
927
928
929 int expectedStoreFileCount = 0;
930 for (Store s : secondaryRegion.getStores()) {
931 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
932 }
933
934
935 LOG.info("Testing replaying region open event " + regionEvents.get(2));
936 secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));
937
938
939 expectedStoreFileCount = 2;
940 for (Store s : secondaryRegion.getStores()) {
941 assertEquals(expectedStoreFileCount, s.getStorefilesCount());
942 }
943 Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
944 long newSnapshotSize = store.getSnapshotSize();
945 assertTrue(newSnapshotSize == 0);
946
947
948 long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
949 assertTrue(newRegionMemstoreSize == 0);
950
951 assertNull(secondaryRegion.getPrepareFlushResult());
952
953 LOG.info("-- Verifying edits from secondary");
954 verifyData(secondaryRegion, 0, numRows, cq, families);
955
956 LOG.info("-- Verifying edits from primary.");
957 verifyData(primaryRegion, 0, numRows, cq, families);
958 }
959
960
961
962
963
964 @Test
965 public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
966 putDataWithFlushes(primaryRegion, 100, 100, 0);
967 int numRows = 100;
968
969
970 primaryRegion.close();
971 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
972
973
974 reader = createWALReaderForPrimary();
975 List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
976 List<WAL.Entry> edits = Lists.newArrayList();
977
978 LOG.info("-- Replaying edits and region events in secondary");
979 while (true) {
980 WAL.Entry entry = reader.next();
981 if (entry == null) {
982 break;
983 }
984 FlushDescriptor flushDesc
985 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
986 RegionEventDescriptor regionEventDesc
987 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
988
989 if (flushDesc != null) {
990
991 } else if (regionEventDesc != null) {
992 regionEvents.add(regionEventDesc);
993 } else {
994 edits.add(entry);
995 }
996 }
997
998
999
1000 secondaryRegion.replayWALRegionEventMarker(
1001 RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
1002 regionEvents.get(2).getLogSequenceNumber()).build());
1003
1004
1005
1006
1007 for (WAL.Entry entry: edits) {
1008 replayEdit(secondaryRegion, entry);
1009 }
1010
1011 boolean expectedFail = false;
1012 try {
1013 verifyData(secondaryRegion, 0, numRows, cq, families);
1014 } catch (AssertionError e) {
1015 expectedFail = true;
1016 }
1017 if (!expectedFail) {
1018 fail("Should have failed this verification");
1019 }
1020 }
1021
1022 @Test
1023 public void testReplayFlushSeqIds() throws IOException {
1024
1025 int start = 0;
1026 LOG.info("-- Writing some data to primary from " + start + " to " + (start+100));
1027 putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
1028 LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1029 primaryRegion.flush(true);
1030
1031
1032 reader = createWALReaderForPrimary();
1033
1034 long flushSeqId = -1;
1035 LOG.info("-- Replaying flush events in secondary");
1036 while (true) {
1037 WAL.Entry entry = reader.next();
1038 if (entry == null) {
1039 break;
1040 }
1041 FlushDescriptor flushDesc
1042 = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1043 if (flushDesc != null) {
1044 if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1045 LOG.info("-- Replaying flush start in secondary");
1046 secondaryRegion.replayWALFlushStartMarker(flushDesc);
1047 flushSeqId = flushDesc.getFlushSequenceNumber();
1048 } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1049 LOG.info("-- Replaying flush commit in secondary");
1050 secondaryRegion.replayWALFlushCommitMarker(flushDesc);
1051 assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
1052 }
1053 }
1054
1055 }
1056
1057
1058
1059 long readPoint = secondaryRegion.getMVCC().memstoreReadPoint();
1060 assertEquals(flushSeqId, readPoint);
1061
1062
1063 verifyData(secondaryRegion, 0, 100, cq, families);
1064 }
1065
1066 @Test
1067 public void testSeqIdsFromReplay() throws IOException {
1068
1069
1070 String method = name.getMethodName();
1071 byte[] tableName = Bytes.toBytes(method);
1072 byte[] family = Bytes.toBytes("family");
1073
1074 HRegion region = initHRegion(tableName, method, family);
1075 try {
1076
1077 long readPoint = region.getMVCC().memstoreReadPoint();
1078 long origSeqId = readPoint + 100;
1079
1080 Put put = new Put(row).add(family, row, row);
1081 put.setDurability(Durability.SKIP_WAL);
1082 replay(region, put, origSeqId);
1083
1084
1085 assertGet(region, family, row);
1086
1087
1088 assertEquals(origSeqId, region.getSequenceId().get());
1089
1090
1091
1092
1093 put = new Put(row2).add(family, row2, row2);
1094 put.setDurability(Durability.SKIP_WAL);
1095 replay(region, put, origSeqId - 50);
1096
1097 assertGet(region, family, row2);
1098 } finally {
1099 region.close();
1100 }
1101 }
1102
1103
1104
1105
1106
1107
1108 @SuppressWarnings("unchecked")
1109 @Test
1110 public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
1111 secondaryRegion.close();
1112 walSecondary = spy(walSecondary);
1113
1114
1115 secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
1116 verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1117 (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
1118
1119
1120 putDataByReplay(secondaryRegion, 0, 10, cq, families);
1121 secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder().
1122 setFlushSequenceNumber(10)
1123 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1124 .setAction(FlushAction.START_FLUSH)
1125 .setEncodedRegionName(
1126 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1127 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1128 .build());
1129
1130 verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1131 (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
1132
1133 secondaryRegion.close();
1134 verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
1135 (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
1136 }
1137
1138
1139
1140
1141 @Test
1142 public void testRegionReadsEnabledFlag() throws IOException {
1143
1144 putDataByReplay(secondaryRegion, 0, 100, cq, families);
1145
1146 verifyData(secondaryRegion, 0, 100, cq, families);
1147
1148
1149 secondaryRegion.setReadsEnabled(false);
1150 try {
1151 verifyData(secondaryRegion, 0, 100, cq, families);
1152 fail("Should have failed with IOException");
1153 } catch(IOException ex) {
1154
1155 }
1156
1157
1158 putDataByReplay(secondaryRegion, 100, 100, cq, families);
1159
1160
1161 secondaryRegion.setReadsEnabled(true);
1162 verifyData(secondaryRegion, 0, 200, cq, families);
1163 }
1164
1165
1166
1167
1168
1169 @Test
1170 public void testWriteFlushRequestMarker() throws IOException {
1171
1172 FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false);
1173 assertNotNull(result);
1174 assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
1175 assertFalse(result.wroteFlushWalMarker);
1176
1177
1178 result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true);
1179 assertNotNull(result);
1180 assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
1181 assertTrue(result.wroteFlushWalMarker);
1182
1183 List<FlushDescriptor> flushes = Lists.newArrayList();
1184 reader = createWALReaderForPrimary();
1185 while (true) {
1186 WAL.Entry entry = reader.next();
1187 if (entry == null) {
1188 break;
1189 }
1190 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1191 if (flush != null) {
1192 flushes.add(flush);
1193 }
1194 }
1195
1196 assertEquals(1, flushes.size());
1197 assertNotNull(flushes.get(0));
1198 assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
1199 }
1200
1201
1202
1203
1204
1205
1206
1207 @Test
1208 public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
1209 disableReads(secondaryRegion);
1210
1211
1212
1213 primaryRegion.flushcache(true, true);
1214 reader = createWALReaderForPrimary();
1215 while (true) {
1216 WAL.Entry entry = reader.next();
1217 if (entry == null) {
1218 break;
1219 }
1220 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1221 if (flush != null) {
1222 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1223 }
1224 }
1225
1226
1227 secondaryRegion.get(new Get(Bytes.toBytes(0)));
1228 }
1229
1230
1231
1232
1233
1234
1235
1236 @Test
1237 public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
1238
1239
1240 disableReads(secondaryRegion);
1241
1242
1243 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1244 primaryRegion.flush(true);
1245
1246 reader = createWALReaderForPrimary();
1247 while (true) {
1248 WAL.Entry entry = reader.next();
1249 if (entry == null) {
1250 break;
1251 }
1252 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1253 if (flush != null) {
1254 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1255 } else {
1256 replayEdit(secondaryRegion, entry);
1257 }
1258 }
1259
1260
1261 verifyData(secondaryRegion, 0, 100, cq, families);
1262 }
1263
1264
1265
1266
1267
1268
1269
1270 @Test
1271 public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1272
1273
1274 disableReads(secondaryRegion);
1275
1276
1277 putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1278 primaryRegion.flush(true);
1279
1280 reader = createWALReaderForPrimary();
1281 while (true) {
1282 WAL.Entry entry = reader.next();
1283 if (entry == null) {
1284 break;
1285 }
1286 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1287 if (flush != null) {
1288 secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1289 }
1290 }
1291
1292
1293 verifyData(secondaryRegion, 0, 100, cq, families);
1294 }
1295
1296
1297
1298
1299
1300
1301
1302 @Test
1303 public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1304
1305 disableReads(secondaryRegion);
1306
1307 primaryRegion.close();
1308 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1309
1310 reader = createWALReaderForPrimary();
1311 while (true) {
1312 WAL.Entry entry = reader.next();
1313 if (entry == null) {
1314 break;
1315 }
1316
1317 RegionEventDescriptor regionEventDesc
1318 = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1319
1320 if (regionEventDesc != null) {
1321 secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1322 }
1323 }
1324
1325
1326 secondaryRegion.get(new Get(Bytes.toBytes(0)));
1327 }
1328
1329 @Test
1330 public void testRefreshStoreFiles() throws IOException {
1331 assertEquals(0, primaryRegion.getStoreFileList(families).size());
1332 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1333
1334
1335 secondaryRegion.refreshStoreFiles();
1336 assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1337
1338
1339 putDataWithFlushes(primaryRegion, 100, 100, 0);
1340 int numRows = 100;
1341
1342
1343 secondaryRegion.refreshStoreFiles();
1344 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1345 secondaryRegion.getStoreFileList(families));
1346 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1347
1348 LOG.info("-- Verifying edits from secondary");
1349 verifyData(secondaryRegion, 0, numRows, cq, families);
1350
1351
1352 putDataWithFlushes(primaryRegion, 100, 300, 0);
1353 numRows = 300;
1354
1355
1356 secondaryRegion.refreshStoreFiles();
1357 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1358 secondaryRegion.getStoreFileList(families));
1359 assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1360
1361 LOG.info("-- Verifying edits from secondary");
1362 verifyData(secondaryRegion, 0, numRows, cq, families);
1363
1364 if (FSUtils.WINDOWS) {
1365
1366 return;
1367 }
1368
1369
1370 primaryRegion.compactStores();
1371 secondaryRegion.refreshStoreFiles();
1372 assertPathListsEqual(primaryRegion.getStoreFileList(families),
1373 secondaryRegion.getStoreFileList(families));
1374 assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1375
1376 LOG.info("-- Verifying edits from secondary");
1377 verifyData(secondaryRegion, 0, numRows, cq, families);
1378
1379 LOG.info("-- Replaying edits in secondary");
1380
1381
1382 assertTrue(secondaryRegion.getMemstoreSize() == 0);
1383 putDataWithFlushes(primaryRegion, 400, 400, 0);
1384 numRows = 400;
1385
1386 reader = createWALReaderForPrimary();
1387 while (true) {
1388 WAL.Entry entry = reader.next();
1389 if (entry == null) {
1390 break;
1391 }
1392 FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1393 if (flush != null) {
1394
1395 } else {
1396 replayEdit(secondaryRegion, entry);
1397 }
1398 }
1399
1400 assertTrue(secondaryRegion.getMemstoreSize() > 0);
1401
1402 secondaryRegion.refreshStoreFiles();
1403
1404 assertTrue(secondaryRegion.getMemstoreSize() == 0);
1405
1406 LOG.info("-- Verifying edits from primary");
1407 verifyData(primaryRegion, 0, numRows, cq, families);
1408 LOG.info("-- Verifying edits from secondary");
1409 verifyData(secondaryRegion, 0, numRows, cq, families);
1410 }
1411
1412
1413
1414
1415 private void assertPathListsEqual(List<String> list1, List<String> list2) {
1416 List<Path> l1 = new ArrayList<>(list1.size());
1417 for (String path : list1) {
1418 l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1419 }
1420 List<Path> l2 = new ArrayList<>(list2.size());
1421 for (String path : list2) {
1422 l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1423 }
1424 assertEquals(l1, l2);
1425 }
1426
1427 private void disableReads(HRegion region) {
1428 region.setReadsEnabled(false);
1429 try {
1430 verifyData(region, 0, 1, cq, families);
1431 fail("Should have failed with IOException");
1432 } catch(IOException ex) {
1433
1434 }
1435 }
1436
1437 private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1438 put.setDurability(Durability.SKIP_WAL);
1439 MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1440 region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
1441 }
1442
1443
1444
1445
1446 @Test
1447 public void testReplayBulkLoadEvent() throws IOException {
1448 LOG.info("testReplayBulkLoadEvent starts");
1449 putDataWithFlushes(primaryRegion, 100, 0, 100);
1450
1451
1452 primaryRegion.close();
1453 primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1454
1455
1456 Random random = new Random();
1457 byte[] randomValues = new byte[20];
1458 random.nextBytes(randomValues);
1459 Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1460
1461 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1462 int expectedLoadFileCount = 0;
1463 for (byte[] family : families) {
1464 familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(testPath, family,
1465 randomValues)));
1466 expectedLoadFileCount++;
1467 }
1468 primaryRegion.bulkLoadHFiles(familyPaths, false, null);
1469
1470
1471 reader = createWALReaderForPrimary();
1472
1473 LOG.info("-- Replaying edits and region events in secondary");
1474 BulkLoadDescriptor bulkloadEvent = null;
1475 while (true) {
1476 WAL.Entry entry = reader.next();
1477 if (entry == null) {
1478 break;
1479 }
1480 bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1481 if (bulkloadEvent != null) {
1482 break;
1483 }
1484 }
1485
1486
1487 assertTrue(bulkloadEvent != null);
1488 assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1489
1490
1491 secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1492
1493
1494 List<String> storeFileName = new ArrayList<String>();
1495 for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1496 storeFileName.addAll(storeDesc.getStoreFileList());
1497 }
1498
1499 for (Store s : secondaryRegion.getStores()) {
1500 for (StoreFile sf : s.getStorefiles()) {
1501 storeFileName.remove(sf.getPath().getName());
1502 }
1503 }
1504 assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
1505
1506 LOG.info("-- Verifying edits from secondary");
1507 for (byte[] family : families) {
1508 assertGet(secondaryRegion, family, randomValues);
1509 }
1510 }
1511
1512 @Test
1513 public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1514
1515
1516 secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder().
1517 setFlushSequenceNumber(Long.MAX_VALUE)
1518 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1519 .setAction(FlushAction.COMMIT_FLUSH)
1520 .setEncodedRegionName(
1521 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1522 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1523 .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1524 .setFamilyName(ByteString.copyFrom(families[0]))
1525 .setStoreHomeDir("/store_home_dir")
1526 .addFlushOutput("/foo/baz/bar")
1527 .build())
1528 .build());
1529 }
1530
1531 @Test
1532 public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1533
1534
1535 secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
1536 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1537 .setEncodedRegionName(
1538 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1539 .setFamilyName(ByteString.copyFrom(families[0]))
1540 .addCompactionInput("/foo")
1541 .addCompactionOutput("/bar")
1542 .setStoreHomeDir("/store_home_dir")
1543 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1544 .build()
1545 , true, true, Long.MAX_VALUE);
1546 }
1547
1548 @Test
1549 public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1550
1551
1552 secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1553 .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1554 .setEncodedRegionName(
1555 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1556 .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1557 .setEventType(EventType.REGION_OPEN)
1558 .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1559 .setLogSequenceNumber(Long.MAX_VALUE)
1560 .addStores(StoreDescriptor.newBuilder()
1561 .setFamilyName(ByteString.copyFrom(families[0]))
1562 .setStoreHomeDir("/store_home_dir")
1563 .addStoreFile("/foo")
1564 .build())
1565 .build());
1566 }
1567
1568 @Test
1569 public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1570
1571
1572 secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1573 .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
1574 .setEncodedRegionName(
1575 ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1576 .setBulkloadSeqNum(Long.MAX_VALUE)
1577 .addStores(StoreDescriptor.newBuilder()
1578 .setFamilyName(ByteString.copyFrom(families[0]))
1579 .setStoreHomeDir("/store_home_dir")
1580 .addStoreFile("/foo")
1581 .build())
1582 .build());
1583 }
1584
1585 private String createHFileForFamilies(Path testPath, byte[] family,
1586 byte[] valueBytes) throws IOException {
1587 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1588
1589 Path testFile = new Path(testPath, UUID.randomUUID().toString());
1590 FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1591 try {
1592 hFileFactory.withOutputStream(out);
1593 hFileFactory.withFileContext(new HFileContext());
1594 HFile.Writer writer = hFileFactory.create();
1595 try {
1596 writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0l,
1597 KeyValue.Type.Put.getCode(), valueBytes)));
1598 } finally {
1599 writer.close();
1600 }
1601 } finally {
1602 out.close();
1603 }
1604 return testFile.toString();
1605 }
1606
1607
1608
1609
1610
1611 private void putDataWithFlushes(HRegion region, int flushInterval,
1612 int numRows, int numRowsAfterFlush) throws IOException {
1613 int start = 0;
1614 for (; start < numRows; start += flushInterval) {
1615 LOG.info("-- Writing some data to primary from " + start + " to " + (start+flushInterval));
1616 putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1617 LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1618 region.flush(true);
1619 }
1620 LOG.info("-- Writing some more data to primary, not flushing");
1621 putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1622 }
1623
1624 private void putDataByReplay(HRegion region,
1625 int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
1626 for (int i = startRow; i < startRow + numRows; i++) {
1627 Put put = new Put(Bytes.toBytes("" + i));
1628 put.setDurability(Durability.SKIP_WAL);
1629 for (byte[] family : families) {
1630 put.add(family, qf, EnvironmentEdgeManager.currentTime(), null);
1631 }
1632 replay(region, put, i+1);
1633 }
1634 }
1635
1636 private static HRegion initHRegion(byte[] tableName,
1637 String callingMethod, byte[]... families) throws IOException {
1638 return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
1639 callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
1640 }
1641
1642 private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1643 String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1644 WAL wal, byte[]... families) throws IOException {
1645 return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
1646 isReadOnly, durability, wal, families);
1647 }
1648 }