/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;

/**
 * Tests of HRegion methods for replaying flush, compaction, region open, etc. events for
 * secondary region replicas.
 */
@Category(MediumTests.class)
public class TestHRegionReplayEvents {

  static final Log LOG = LogFactory.getLog(TestHRegionReplayEvents.class);
  @Rule public TestName name = new TestName();

  private static HBaseTestingUtility TEST_UTIL;

  public static Configuration CONF;
  private String dir;
  private static FileSystem FILESYSTEM;

  private byte[][] families = new byte[][] {
      Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")};

  // Test names
  protected byte[] tableName;
  protected String method;
  protected final byte[] row = Bytes.toBytes("rowA");
  protected final byte[] row2 = Bytes.toBytes("rowB");
  protected byte[] cq = Bytes.toBytes("cq");

  // per test fields
  private Path rootDir;
  private HTableDescriptor htd;
  private long time;
  private RegionServerServices rss;
  private HRegionInfo primaryHri, secondaryHri;
  private HRegion primaryRegion, secondaryRegion;
  private WALFactory wals;
  private WAL walPrimary, walSecondary;
  private WAL.Reader reader;

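  /**
   * Creates a primary (replicaId 0) and a secondary (replicaId 1) HRegionInfo over the same
   * key range, each opened as an HRegion with its own WAL, backed by a mocked
   * RegionServerServices.
   */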
  @Before
  public void setup() throws IOException {
    TEST_UTIL = HBaseTestingUtility.createLocalHTU();
    FILESYSTEM = TEST_UTIL.getTestFileSystem();
    CONF = TEST_UTIL.getConfiguration();
    dir = TEST_UTIL.getDataTestDir("TestHRegionReplayEvents").toString();
    method = name.getMethodName();
    tableName = Bytes.toBytes(name.getMethodName());
    rootDir = new Path(dir + method);
    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());

    htd = new HTableDescriptor(TableName.valueOf(method));
    for (byte[] family : families) {
      htd.addFamily(new HColumnDescriptor(family));
    }

    time = System.currentTimeMillis();

    primaryHri = new HRegionInfo(htd.getTableName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      false, time, 0);
    secondaryHri = new HRegionInfo(htd.getTableName(),
      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
      false, time, 1);

    wals = TestHRegion.createWALFactory(CONF, rootDir);
    walPrimary = wals.getWAL(primaryHri.getEncodedNameAsBytes());
    walSecondary = wals.getWAL(secondaryHri.getEncodedNameAsBytes());

    rss = mock(RegionServerServices.class);
    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
    when(rss.getConfiguration()).thenReturn(CONF);
    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());

    primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
    primaryRegion.close();

    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);

    reader = null;
  }

  @After
  public void tearDown() throws Exception {
    if (reader != null) {
      reader.close();
    }

    if (primaryRegion != null) {
      HRegion.closeHRegion(primaryRegion);
    }
    if (secondaryRegion != null) {
      HRegion.closeHRegion(secondaryRegion);
    }

    EnvironmentEdgeManagerTestHelper.reset();
    LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
    TEST_UTIL.cleanupTestDir();
  }

  String getName() {
    return name.getMethodName();
  }

  // Some of the test cases are as follows:
  // 1. replay flush start marker again
  // 2. replay flush with smaller seqId than what is there in memstore snapshot
  // 3. replay flush with larger seqId than what is there in memstore snapshot
  // 4. replay flush commit without flush prepare. non droppable memstore
  // 5. replay flush commit without flush prepare. droppable memstore
  // 6. replay open region event
  // 7. replay open region event after flush start
  // 8. replay flush from an earlier seqId (test ignoring seqIds)
  // 9. start flush does not prevent region from closing.

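  // Note on helpers: putData/verifyData are static helpers pulled in via the TestHRegion
  // static import above; putDataByReplay and
  // putDataWithFlushes(region, flushInterval, numRows, numRowsAfterFlush) are defined later in
  // this class. Based on how they are used here, putDataWithFlushes is assumed to write
  // numRows rows in batches of flushInterval rows, flushing the primary after each batch, and
  // then write numRowsAfterFlush more rows that stay unflushed.
  //
  // Most tests below follow the same replay-loop shape (sketch):
  //
  //   reader = createWALReaderForPrimary();
  //   WAL.Entry entry;
  //   while ((entry = reader.next()) != null) {
  //     FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
  //     if (flush != null) {
  //       // replay, hold on to, or ignore the marker depending on the scenario
  //     } else {
  //       replayEdit(secondaryRegion, entry);
  //     }
  //   }
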
  @Test
  public void testRegionReplicaSecondaryCannotFlush() throws IOException {
    // load some data and flush; ensure that the secondary replica will not execute the flush

    // load some data to secondary by replaying
    putDataByReplay(secondaryRegion, 0, 1000, cq, families);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // flush region
    FlushResultImpl flush = (FlushResultImpl)secondaryRegion.flush(true);
    assertEquals(flush.result, FlushResultImpl.Result.CANNOT_FLUSH);

    verifyData(secondaryRegion, 0, 1000, cq, families);

    // close the region, and verify that it has not flushed
    Map<byte[], List<StoreFile>> files = secondaryRegion.close(false);
    // assert that there are no files (since no flush happened)
    for (List<StoreFile> f : files.values()) {
      assertTrue(f.isEmpty());
    }
  }

  /**
   * Tests a case where we replay only a flush start marker, then the region is closed. This region
   * should not block indefinitely.
   */
  @Test (timeout = 60000)
  public void testOnlyReplayingFlushStartDoesNotHoldUpRegionClose() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- NOT Replaying flush commit in secondary");
        }
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    assertTrue(rss.getRegionServerAccounting().getGlobalMemstoreSize() > 0);
    // now close the region, which should not hang because of the uncommitted flush
    secondaryRegion.close();

    // verify that the memstore size is back to what it was
    assertEquals(0, rss.getRegionServerAccounting().getGlobalMemstoreSize());
  }

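  /**
   * Replays a single WAL entry into the given region (meta edits are skipped) and returns the
   * row index encoded in the entry's row key.
   */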
  static int replayEdit(HRegion region, WAL.Entry entry) throws IOException {
    if (WALEdit.isMetaEditFamily(entry.getEdit().getCells().get(0))) {
      return 0; // handled elsewhere
    }
    Put put = new Put(entry.getEdit().getCells().get(0).getRow());
    for (Cell cell : entry.getEdit().getCells()) put.add(cell);
    put.setDurability(Durability.SKIP_WAL);
    MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
    region.batchReplay(new MutationReplay[] {mutation},
      entry.getKey().getLogSeqNum());
    return Integer.parseInt(Bytes.toString(put.getRow()));
  }

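  /**
   * Opens a reader on the primary region's current WAL file so that its entries can be replayed
   * into the secondary replica.
   */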
  WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
    return wals.createReader(TEST_UTIL.getTestFileSystem(),
      DefaultWALProvider.getCurrentFileName(walPrimary),
      TEST_UTIL.getConfiguration());
  }

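  /**
   * Replays edits, flush markers and a compaction marker from the primary's WAL into the
   * secondary, verifying memstore sizes and store file counts as each marker is applied.
   */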
  @Test
  public void testReplayFlushesAndCompactions() throws IOException {
    // initiate a secondary region with some data.

    // load some data to primary and flush. 3 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 300, 100);

    // compaction from primary
    LOG.info("-- Compacting primary, only 1 store");
    primaryRegion.compactStore(Bytes.toBytes("cf1"),
      NoLimitCompactionThroughputController.INSTANCE);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    int lastReplayed = 0;
    int expectedStoreFileCount = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      CompactionDescriptor compactionDesc
        = WALEdit.getCompaction(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        verifyData(secondaryRegion, 0, lastReplayed, cq, families);
        Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize();
        long regionMemstoreSize = secondaryRegion.getMemstoreSize();
        long storeFlushableSize = store.getFlushableSize();
        long storeSize = store.getSize();
        long storeSizeUncompressed = store.getStoreSizeUncompressed();
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(flushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, flushDesc.getFlushSequenceNumber());

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);

        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);

          // assert that the flush files are picked
          expectedStoreFileCount++;
          for (Store s : secondaryRegion.getStores()) {
            assertEquals(expectedStoreFileCount, s.getStorefilesCount());
          }
          long newFlushableSize = store.getFlushableSize();
          assertTrue(storeFlushableSize > newFlushableSize);

          // assert that the region memstore is smaller now
          long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
          assertTrue(regionMemstoreSize > newRegionMemstoreSize);

          // assert that the store sizes are bigger
          assertTrue(store.getSize() > storeSize);
          assertTrue(store.getStoreSizeUncompressed() > storeSizeUncompressed);
          assertEquals(store.getSize(), store.getStorefilesSize());
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else if (compactionDesc != null) {
        secondaryRegion.replayWALCompactionMarker(compactionDesc, true, false, Long.MAX_VALUE);

        // assert that the compaction is applied
        for (Store store : secondaryRegion.getStores()) {
          if (store.getColumnFamilyName().equals("cf1")) {
            assertEquals(1, store.getStorefilesCount());
          } else {
            assertEquals(expectedStoreFileCount, store.getStorefilesCount());
          }
        }
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    assertEquals(400 - 1, lastReplayed);
    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, 400, cq, families);

    LOG.info("-- Verifying edits from primary. Ensuring that files are not deleted");
    verifyData(primaryRegion, 0, lastReplayed, cq, families);
    for (Store store : primaryRegion.getStores()) {
      if (store.getColumnFamilyName().equals("cf1")) {
        assertEquals(1, store.getStorefilesCount());
      } else {
        assertEquals(expectedStoreFileCount, store.getStorefilesCount());
      }
    }
  }

  /**
   * Tests cases where we prepare a flush with some seqId and we receive other flush start markers
   * equal to, greater than, or less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushStartMarkers() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");

    FlushDescriptor startFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        // first verify that everything is replayed and visible before flush event replay
        Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
        long storeMemstoreSize = store.getMemStoreSize();
        long regionMemstoreSize = secondaryRegion.getMemstoreSize();
        long storeFlushableSize = store.getFlushableSize();

        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          startFlushDesc = flushDesc;
          LOG.info("-- Replaying flush start in secondary");
          PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
          assertNull(result.result);
          assertEquals(result.flushOpSeqId, startFlushDesc.getFlushSequenceNumber());
          assertTrue(regionMemstoreSize > 0);
          assertTrue(storeFlushableSize > 0);

          // assert that the store memstore is smaller now
          long newStoreMemstoreSize = store.getMemStoreSize();
          LOG.info("Memstore size reduced by: "
              + StringUtils.humanReadableInt(newStoreMemstoreSize - storeMemstoreSize));
          assertTrue(storeMemstoreSize > newStoreMemstoreSize);
          verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);

        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)

    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 1: replay the same flush start marker again
    LOG.info("-- Replaying same flush start in secondary again");
    PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 2: replay a flush start marker with a smaller seqId
    FlushDescriptor startFlushDescSmallerSeqId
      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() - 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescSmallerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescSmallerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // Test case 3: replay a flush start marker with a larger seqId
    FlushDescriptor startFlushDescLargerSeqId
      = clone(startFlushDesc, startFlushDesc.getFlushSequenceNumber() + 50);
    LOG.info("-- Replaying same flush start in secondary again " + startFlushDescLargerSeqId);
    result = secondaryRegion.replayWALFlushStartMarker(startFlushDescLargerSeqId);
    assertNull(result); // this should return null. Ignoring the flush start marker
    // assert that we still have prepared flush with the previous setup.
    assertNotNull(secondaryRegion.getPrepareFlushResult());
    assertEquals(secondaryRegion.getPrepareFlushResult().flushOpSeqId,
      startFlushDesc.getFlushSequenceNumber());
    assertTrue(secondaryRegion.getMemstoreSize() > 0); // memstore is not empty
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * less than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerSmallerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 2 flushes and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 200, 100);
    int numRows = 300;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // don't replay the first flush start marker, hold on to it, replay the second one
          if (startFlushDesc == null) {
            startFlushDesc = flushDesc;
          } else {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          if (commitFlushDesc == null) {
            commitFlushDesc = flushDesc; // hold on to the first flush commit marker
          }
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-200) in memstore snapshot
    // and some more data in memstores (rows 200-300)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker smaller than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() < startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped

    // assert that the region memstore is same as before
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertEquals(regionMemstoreSize, newRegionMemstoreSize);

    assertNotNull(secondaryRegion.getPrepareFlushResult()); // not dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we prepare a flush with some seqId and we receive a flush commit marker
   * larger than the previous flush start marker.
   */
  @Test
  public void testReplayFlushCommitMarkerLargerThanFlushStartMarker() throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor startFlushDesc = null;
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          if (startFlushDesc == null) {
            LOG.info("-- Replaying flush start in secondary");
            startFlushDesc = flushDesc;
            PrepareFlushResult result = secondaryRegion.replayWALFlushStartMarker(startFlushDesc);
            assertNull(result.result);
          }
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          // do not replay any flush commit yet
          // hold on to the flush commit marker but simulate a larger
          // flush commit seqId
          commitFlushDesc =
              FlushDescriptor.newBuilder(flushDesc)
              .setFlushSequenceNumber(flushDesc.getFlushSequenceNumber() + 50)
              .build();
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker larger than what we have prepared
    LOG.info("Testing replaying flush COMMIT " + commitFlushDesc + " on top of flush START "
        + startFlushDesc);
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > startFlushDesc.getFlushSequenceNumber());

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped

    // assert that the region memstore is smaller than before, but not empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize > 0);
    assertTrue(regionMemstoreSize > newRegionMemstoreSize);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should be dropped after the flush commit replay since they should be in
   * flushed files.
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(true);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   * The memstore edits should not be dropped after the flush commit replay since not every edit
   * will be in flushed files (based on seqId).
   */
  @Test
  public void testReplayFlushCommitMarkerWithoutFlushStartMarkerNonDroppableMemstore()
      throws IOException {
    testReplayFlushCommitMarkerWithoutFlushStartMarker(false);
  }

  /**
   * Tests the case where we receive a flush commit before receiving any flush prepare markers.
   */
  public void testReplayFlushCommitMarkerWithoutFlushStartMarker(boolean droppableMemstore)
      throws IOException {
    // load some data to primary and flush. 1 flush and some more unflushed data.
    // write more data after flush depending on whether the memstore should be droppable
    putDataWithFlushes(primaryRegion, 100, 100, droppableMemstore ? 0 : 100);
    int numRows = droppableMemstore ? 100 : 200;

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();

    LOG.info("-- Replaying edits and flush events in secondary");
    FlushDescriptor commitFlushDesc = null;

    int lastReplayed = 0;
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          // do not replay flush start marker
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          commitFlushDesc = flushDesc; // hold on to the flush commit marker
        }
        // after replay verify that everything is still visible
        verifyData(secondaryRegion, 0, lastReplayed + 1, cq, families);
      } else {
        lastReplayed = replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, all the data (rows 0-100, plus rows 100-200 in the non-droppable case)
    // sits in the memstore, with no snapshot prepared
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();

    // Test case 1: replay a flush commit marker without start flush marker
    assertNull(secondaryRegion.getPrepareFlushResult());
    assertTrue(commitFlushDesc.getFlushSequenceNumber() > 0);

    // ensure all files are visible in secondary
    for (Store store : secondaryRegion.getStores()) {
      assertTrue(store.getMaxSequenceId() <= secondaryRegion.getSequenceId().get());
    }

    LOG.info("-- Replaying flush commit in secondary " + commitFlushDesc);
    secondaryRegion.replayWALFlushCommitMarker(commitFlushDesc);

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    if (droppableMemstore) {
      assertTrue(newFlushableSize == 0); // assert that the memstore is dropped
    } else {
      assertTrue(newFlushableSize > 0); // assert that the memstore is not dropped
    }

    // assert that the region memstore is dropped if droppable, and unchanged otherwise
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    if (droppableMemstore) {
      assertTrue(0 == newRegionMemstoreSize);
    } else {
      assertTrue(regionMemstoreSize == newRegionMemstoreSize);
    }

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

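  /** Returns a copy of the given flush descriptor with its flush sequence number replaced. */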
  private FlushDescriptor clone(FlushDescriptor flush, long flushSeqId) {
    return FlushDescriptor.newBuilder(flush)
        .setFlushSequenceNumber(flushSeqId)
        .build();
  }

  /**
   * Tests replaying region open markers from primary region. Checks whether the files are
   * picked up.
   */
  @Test
  public void testReplayRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flush events
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        // don't replay edits
      }
    }

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // replay the first region open event.
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(0));

    // replay the close event as well
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(1));

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    long regionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(regionMemstoreSize == 0);

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount++;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newFlushableSize = store.getFlushableSize();
    assertTrue(newFlushableSize == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests the case where we replay a region open event after a flush start but before receiving
   * flush commit.
   */
  @Test
  public void testReplayRegionOpenEventAfterFlushStart() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 100);
    int numRows = 200;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // only replay flush start
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
        }
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // at this point, there should be some data (rows 0-100) in the memstore snapshot
    // and some more data in memstores (rows 100-200)
    verifyData(secondaryRegion, 0, numRows, cq, families);

    // we should have 1 open, 1 close and 1 open event
    assertEquals(3, regionEvents.size());

    // no store files in the region
    int expectedStoreFileCount = 0;
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }

    // now replay the region open event that should contain new file locations
    LOG.info("Testing replaying region open event " + regionEvents.get(2));
    secondaryRegion.replayWALRegionEventMarker(regionEvents.get(2));

    // assert that the flush files are picked
    expectedStoreFileCount = 2; // two flushes happened
    for (Store s : secondaryRegion.getStores()) {
      assertEquals(expectedStoreFileCount, s.getStorefilesCount());
    }
    Store store = secondaryRegion.getStore(Bytes.toBytes("cf1"));
    long newSnapshotSize = store.getSnapshotSize();
    assertTrue(newSnapshotSize == 0);

    // assert that the region memstore is empty
    long newRegionMemstoreSize = secondaryRegion.getMemstoreSize();
    assertTrue(newRegionMemstoreSize == 0);

    assertNull(secondaryRegion.getPrepareFlushResult()); // prepare snapshot should be dropped if any

    LOG.info("-- Verifying edits from secondary");
    verifyData(secondaryRegion, 0, numRows, cq, families);

    LOG.info("-- Verifying edits from primary.");
    verifyData(primaryRegion, 0, numRows, cq, families);
  }

  /**
   * Tests that edits coming in for replay are skipped if they have a smaller seqId than the seqId
   * of the last replayed region open event.
   */
  @Test
  public void testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent() throws IOException {
    putDataWithFlushes(primaryRegion, 100, 100, 0);
    int numRows = 100;

    // close the region and open again.
    primaryRegion.close();
    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);

    // now replay the edits and the flush marker
    reader = createWALReaderForPrimary();
    List<RegionEventDescriptor> regionEvents = Lists.newArrayList();
    List<WAL.Entry> edits = Lists.newArrayList();

    LOG.info("-- Replaying edits and region events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      RegionEventDescriptor regionEventDesc
        = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));

      if (flushDesc != null) {
        // don't replay flushes
      } else if (regionEventDesc != null) {
        regionEvents.add(regionEventDesc);
      } else {
        edits.add(entry);
      }
    }

    // replay the region open of the first open, but with the seqId of the second open
    // this way none of the flush files will be picked up.
    secondaryRegion.replayWALRegionEventMarker(
      RegionEventDescriptor.newBuilder(regionEvents.get(0)).setLogSequenceNumber(
        regionEvents.get(2).getLogSequenceNumber()).build());

    // replay the edits from before the region close. If replay correctly skips them, the
    // verification below will fail; if they are not skipped, the verification passes and
    // the test fails.
    for (WAL.Entry entry : edits) {
      replayEdit(secondaryRegion, entry);
    }

    boolean expectedFail = false;
    try {
      verifyData(secondaryRegion, 0, numRows, cq, families);
    } catch (AssertionError e) {
      expectedFail = true; // expected
    }
    if (!expectedFail) {
      fail("Should have failed this verification");
    }
  }

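  /**
   * Verifies that replaying flush start and commit markers advances the secondary's mvcc read
   * point to the flush sequence number.
   */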
  @Test
  public void testReplayFlushSeqIds() throws IOException {
    // load some data to primary and flush
    int start = 0;
    LOG.info("-- Writing some data to primary from " + start + " to " + (start + 100));
    putData(primaryRegion, Durability.SYNC_WAL, start, 100, cq, families);
    LOG.info("-- Flushing primary, creating 3 files for 3 stores");
    primaryRegion.flush(true);

    // now replay the flush marker
    reader = createWALReaderForPrimary();

    long flushSeqId = -1;
    LOG.info("-- Replaying flush events in secondary");
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flushDesc
        = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flushDesc != null) {
        if (flushDesc.getAction() == FlushAction.START_FLUSH) {
          LOG.info("-- Replaying flush start in secondary");
          secondaryRegion.replayWALFlushStartMarker(flushDesc);
          flushSeqId = flushDesc.getFlushSequenceNumber();
        } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
          LOG.info("-- Replaying flush commit in secondary");
          secondaryRegion.replayWALFlushCommitMarker(flushDesc);
          assertEquals(flushSeqId, flushDesc.getFlushSequenceNumber());
        }
      }
      // else do not replay
    }

    // TODO: what to do with this?
    // assert that the newly picked up flush file is visible
    long readPoint = secondaryRegion.getMVCC().memstoreReadPoint();
    assertEquals(flushSeqId, readPoint);

    // after replay verify that everything is still visible
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

  @Test
  public void testSeqIdsFromReplay() throws IOException {
    // test the case where seqIds coming from replayed WALEdits are persisted with their
    // original values and are made visible through the mvcc read point upon replay
    String method = name.getMethodName();
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");

    HRegion region = initHRegion(tableName, method, family);
    try {
      // replay an entry that is bigger than current read point
      long readPoint = region.getMVCC().memstoreReadPoint();
      long origSeqId = readPoint + 100;

      Put put = new Put(row).add(family, row, row);
      put.setDurability(Durability.SKIP_WAL); // we replay with skip wal
      replay(region, put, origSeqId);

      // read point should have advanced to this seqId
      assertGet(region, family, row);

      // region seqId should have advanced at least to this seqId
      assertEquals(origSeqId, region.getSequenceId().get());

      // replay an entry that is smaller than current read point
      // caution: adding an entry below current read point might cause partial dirty reads. Normal
      // replay does not allow reads while replay is going on.
      put = new Put(row2).add(family, row2, row2);
      put.setDurability(Durability.SKIP_WAL);
      replay(region, put, origSeqId - 50);

      assertGet(region, family, row2);
    } finally {
      region.close();
    }
  }

  /**
   * Tests that a region opened in secondary mode would not write region open / close
   * events to its WAL.
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  @Test
  public void testSecondaryRegionDoesNotWriteRegionEventsToWAL() throws IOException {
    secondaryRegion.close();
    walSecondary = spy(walSecondary);

    // test for region open and close
    secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
    verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
      (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());

    // test for replay prepare flush
    putDataByReplay(secondaryRegion, 0, 10, cq, families);
    secondaryRegion.replayWALFlushStartMarker(FlushDescriptor.newBuilder()
      .setFlushSequenceNumber(10)
      .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
      .setAction(FlushAction.START_FLUSH)
      .setEncodedRegionName(
        ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
      .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
      .build());

    verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
      (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());

    secondaryRegion.close();
    verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
      (WALKey)any(), (WALEdit)any(), (AtomicLong)any(), anyBoolean(), (List<Cell>) any());
  }

  /**
   * Tests the reads enabled flag for the region. When unset, all reads should be rejected.
   */
  @Test
  public void testRegionReadsEnabledFlag() throws IOException {
    putDataByReplay(secondaryRegion, 0, 100, cq, families);

    verifyData(secondaryRegion, 0, 100, cq, families);

    // now disable reads
    secondaryRegion.setReadsEnabled(false);
    try {
      verifyData(secondaryRegion, 0, 100, cq, families);
      fail("Should have failed with IOException");
    } catch (IOException ex) {
      // expected
    }

    // verify that we can still replay data
    putDataByReplay(secondaryRegion, 100, 100, cq, families);

    // now enable reads again
    secondaryRegion.setReadsEnabled(true);
    verifyData(secondaryRegion, 0, 200, cq, families);
  }

  /**
   * Tests the case where a request for flush cache is sent to the region, but region cannot flush.
   * It should write the flush request marker instead.
   */
  @Test
  public void testWriteFlushRequestMarker() throws IOException {
    // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
    FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false);
    assertNotNull(result);
    assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
    assertFalse(result.wroteFlushWalMarker);

    // request flush again, but this time with writeFlushRequestWalMarker = true
    result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true);
    assertNotNull(result);
    assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
    assertTrue(result.wroteFlushWalMarker);

    List<FlushDescriptor> flushes = Lists.newArrayList();
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        flushes.add(flush);
      }
    }

    assertEquals(1, flushes.size());
    assertNotNull(flushes.get(0));
    assertEquals(FlushDescriptor.FlushAction.CANNOT_FLUSH, flushes.get(0).getAction());
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying a CANNOT_FLUSH
   * flush marker entry should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRequestRestoresReadsEnabledState() throws IOException {
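    // disableReads is assumed to be a helper defined later in this class that clears the
    // region's readsEnabled flag (via setReadsEnabled(false)), simulating a fresh secondary
    // that has not yet seen a flush or region open marker.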
    disableReads(secondaryRegion);

    // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
    // triggered flush restores readsEnabled
    primaryRegion.flushcache(true, true);
    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      }
    }

    // now reads should be enabled
    secondaryRegion.get(new Get(Bytes.toBytes(0)));
  }

  /**
   * Test the case where the secondary region replica is not in reads enabled state because it is
   * waiting for a flush or region open marker from primary region. Replaying flush start and commit
   * entries should restore the reads enabled status in the region and allow the reads
   * to continue.
   */
  @Test
  public void testReplayingFlushRestoresReadsEnabledState() throws IOException {
    // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
    // from triggered flush restores readsEnabled
    disableReads(secondaryRegion);

    // put some data in primary
    putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
    primaryRegion.flush(true);

    reader = createWALReaderForPrimary();
    while (true) {
      WAL.Entry entry = reader.next();
      if (entry == null) {
        break;
      }
      FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
      if (flush != null) {
        secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
      } else {
        replayEdit(secondaryRegion, entry);
      }
    }

    // now reads should be enabled
    verifyData(secondaryRegion, 0, 100, cq, families);
  }

1264   /**
1265    * Test the case where the secondary region replica is not in reads enabled state because it is
1266    * waiting for a flush or region open marker from primary region. Replaying flush start and commit
1267    * entries should restore the reads enabled status in the region and allow the reads
1268    * to continue.
1269    */
1270   @Test
1271   public void testReplayingFlushWithEmptyMemstoreRestoresReadsEnabledState() throws IOException {
1272     // Test case 2: Test that replaying FLUSH_START and FLUSH_COMMIT markers assuming these came
1273     // from triggered flush restores readsEnabled
1274     disableReads(secondaryRegion);
1275 
1276     // put some data in primary
1277     putData(primaryRegion, Durability.SYNC_WAL, 0, 100, cq, families);
1278     primaryRegion.flush(true);
1279 
1280     reader = createWALReaderForPrimary();
1281     while (true) {
1282       WAL.Entry entry = reader.next();
1283       if (entry == null) {
1284         break;
1285       }
1286       FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1287       if (flush != null) {
1288         secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
1289       }
1290     }
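
         // note that, unlike the previous test, the edits themselves were never replayed; the
         // data below is expected to become visible because replaying COMMIT_FLUSH picks up the
         // flush output files written by the primary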
1291 
1292     // now reads should be enabled
1293     verifyData(secondaryRegion, 0, 100, cq, families);
1294   }
1295 
1296   /**
1297    * Test the case where the secondary region replica is not in reads enabled state because it is
1298    * waiting for a flush or region open marker from the primary region. Replaying a region open
1299    * event entry from the primary should restore the reads enabled status in the region and allow
1300    * reads to continue.
1301    */
1302   @Test
1303   public void testReplayingRegionOpenEventRestoresReadsEnabledState() throws IOException {
1304     // Test case 3: Test that replaying region open event markers restores readsEnabled
1305     disableReads(secondaryRegion);
1306 
1307     primaryRegion.close();
1308     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
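         // re-opening the primary appends a REGION_OPEN event marker to the WAL; that marker is
         // what the loop below replays into the secondary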
1309 
1310     reader = createWALReaderForPrimary();
1311     while (true) {
1312       WAL.Entry entry = reader.next();
1313       if (entry == null) {
1314         break;
1315       }
1316 
1317       RegionEventDescriptor regionEventDesc
1318         = WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
1319 
1320       if (regionEventDesc != null) {
1321         secondaryRegion.replayWALRegionEventMarker(regionEventDesc);
1322       }
1323     }
1324 
1325     // now reads should be enabled
1326     secondaryRegion.get(new Get(Bytes.toBytes(0)));
1327   }
1328 
1329   @Test
1330   public void testRefreshStoreFiles() throws IOException {
1331     assertEquals(0, primaryRegion.getStoreFileList(families).size());
1332     assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1333 
1334     // Test case 1: refresh with an empty region
1335     secondaryRegion.refreshStoreFiles();
1336     assertEquals(0, secondaryRegion.getStoreFileList(families).size());
1337 
1338     // do one flush
1339     putDataWithFlushes(primaryRegion, 100, 100, 0);
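         // a flush creates one new store file per column family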
1340     int numRows = 100;
1341 
1342     // refresh the store file list, and ensure that the files are picked up.
1343     secondaryRegion.refreshStoreFiles();
1344     assertPathListsEqual(primaryRegion.getStoreFileList(families),
1345       secondaryRegion.getStoreFileList(families));
1346     assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1347 
1348     LOG.info("-- Verifying edits from secondary");
1349     verifyData(secondaryRegion, 0, numRows, cq, families);
1350 
1351     // Test case 2: do 3 more flushes; each adds one more file per store
1352     putDataWithFlushes(primaryRegion, 100, 300, 0);
1353     numRows = 300;
1354 
1355     // refresh the store file list, and ensure that the files are picked up.
1356     secondaryRegion.refreshStoreFiles();
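         // expect 4 files per store now: 1 from the initial flush plus 3 from the flushes above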
1357     assertPathListsEqual(primaryRegion.getStoreFileList(families),
1358       secondaryRegion.getStoreFileList(families));
1359     assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size());
1360 
1361     LOG.info("-- Verifying edits from secondary");
1362     verifyData(secondaryRegion, 0, numRows, cq, families);
1363 
1364     if (FSUtils.WINDOWS) {
1365       // compaction cannot move files while they are open in secondary on windows. Skip remaining.
1366       return;
1367     }
1368 
1369     // Test case 3: compact primary files
1370     primaryRegion.compactStores();
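         // compaction rewrites each store back to a single file; refreshing should also drop the
         // compacted-away files from the secondary's view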
1371     secondaryRegion.refreshStoreFiles();
1372     assertPathListsEqual(primaryRegion.getStoreFileList(families),
1373       secondaryRegion.getStoreFileList(families));
1374     assertEquals(families.length, secondaryRegion.getStoreFileList(families).size());
1375 
1376     LOG.info("-- Verifying edits from secondary");
1377     verifyData(secondaryRegion, 0, numRows, cq, families);
1378 
1379     LOG.info("-- Replaying edits in secondary");
1380 
1381     // Test case 4: replay some edits, ensure that memstore is dropped.
1382     assertEquals(0, secondaryRegion.getMemstoreSize());
1383     putDataWithFlushes(primaryRegion, 400, 400, 0);
1384     numRows = 400;
1385 
1386     reader = createWALReaderForPrimary();
1387     while (true) {
1388       WAL.Entry entry = reader.next();
1389       if (entry == null) {
1390         break;
1391       }
1392       FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
1393       if (flush != null) {
1394         // do not replay flush markers; let the replayed edits accumulate in the memstore
1395       } else {
1396         replayEdit(secondaryRegion, entry);
1397       }
1398     }
1399 
1400     assertTrue(secondaryRegion.getMemstoreSize() > 0);
1401 
1402     secondaryRegion.refreshStoreFiles();
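         // the refreshed store files cover the edits replayed above, so the secondary is
         // expected to drop its memstore contents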
1403 
1404     assertEquals(0, secondaryRegion.getMemstoreSize());
1405 
1406     LOG.info("-- Verifying edits from primary");
1407     verifyData(primaryRegion, 0, numRows, cq, families);
1408     LOG.info("-- Verifying edits from secondary");
1409     verifyData(secondaryRegion, 0, numRows, cq, families);
1410   }
1411 
1412   /**
1413    * Paths can be qualified or not; this compares them after stripping the scheme and authority.
1414    */
1415   private void assertPathListsEqual(List<String> list1, List<String> list2) {
1416     List<Path> l1 = new ArrayList<>(list1.size());
1417     for (String path : list1) {
1418       l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1419     }
1420     List<Path> l2 = new ArrayList<>(list2.size());
1421     for (String path : list2) {
1422       l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path)));
1423     }
1424     assertEquals(l1, l2);
1425   }
1426 
1427   private void disableReads(HRegion region) {
1428     region.setReadsEnabled(false);
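         // reads stay disabled until a flush or region open marker is replayed from the primary
         // (see the tests above)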
1429     try {
1430       verifyData(region, 0, 1, cq, families);
1431       fail("Should have failed with IOException");
1432     } catch(IOException ex) {
1433       // expected
1434     }
1435   }
1436 
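       /** Applies a put to the region through the replay path, as a secondary replica would,
        * using the given replay sequence id. */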
1437   private void replay(HRegion region, Put put, long replaySeqId) throws IOException {
1438     put.setDurability(Durability.SKIP_WAL);
1439     MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
1440     region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
1441   }
1442 
1443   /**
1444    * Tests replaying bulk load event markers from the primary region. Checks whether the files are picked up.
1445    */
1446   @Test
1447   public void testReplayBulkLoadEvent() throws IOException {
1448     LOG.info("testReplayBulkLoadEvent starts");
1449     putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
1450 
1451     // close the region and open again.
1452     primaryRegion.close();
1453     primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
1454 
1455     // bulk load a file into primary region
1456     Random random = new Random();
1457     byte[] randomValues = new byte[20];
1458     random.nextBytes(randomValues);
1459     Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
1460 
1461     List<Pair<byte[], String>> familyPaths = new ArrayList<>();
1462     int expectedLoadFileCount = 0;
1463     for (byte[] family : families) {
1464       familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(testPath, family,
1465         randomValues)));
1466       expectedLoadFileCount++;
1467     }
1468     primaryRegion.bulkLoadHFiles(familyPaths, false, null);
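         // the bulk load appends a BULK_LOAD event marker to the WAL describing the loaded files
         // per store; the loop below scans for that marker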
1469 
1470     // now replay the edits and the bulk load marker
1471     reader = createWALReaderForPrimary();
1472 
1473     LOG.info("-- Replaying edits and region events in secondary");
1474     BulkLoadDescriptor bulkloadEvent = null;
1475     while (true) {
1476       WAL.Entry entry = reader.next();
1477       if (entry == null) {
1478         break;
1479       }
1480       bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
1481       if (bulkloadEvent != null) {
1482         break;
1483       }
1484     }
1485 
1486     // we should have 1 bulk load event
1487     assertNotNull(bulkloadEvent);
1488     assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
1489 
1490     // replay the bulk load event
1491     secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
1492 
1493 
1494     List<String> storeFileName = new ArrayList<>();
1495     for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
1496       storeFileName.addAll(storeDesc.getStoreFileList());
1497     }
1498     // assert that the bulk loaded files are picked up by the secondary's stores
1499     for (Store s : secondaryRegion.getStores()) {
1500       for (StoreFile sf : s.getStorefiles()) {
1501         storeFileName.remove(sf.getPath().getName());
1502       }
1503     }
1504     assertTrue("Found some store file isn't loaded:" + storeFileName, storeFileName.isEmpty());
1505 
1506     LOG.info("-- Verifying edits from secondary");
1507     for (byte[] family : families) {
1508       assertGet(secondaryRegion, family, randomValues);
1509     }
1510   }
1511 
1512   @Test
1513   public void testReplayingFlushCommitWithFileAlreadyDeleted() throws IOException {
1514     // tests replaying flush commit marker, but the flush file has already been compacted
1515     // from primary and also deleted from the archive directory
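         // nothing to assert here beyond the replay completing without an exception (e.g. a
         // FileNotFoundException); the same applies to the three *WithFileAlreadyDeleted tests below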
1516     secondaryRegion.replayWALFlushCommitMarker(FlushDescriptor.newBuilder().
1517       setFlushSequenceNumber(Long.MAX_VALUE)
1518       .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1519       .setAction(FlushAction.COMMIT_FLUSH)
1520       .setEncodedRegionName(
1521         ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1522       .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1523       .addStoreFlushes(StoreFlushDescriptor.newBuilder()
1524         .setFamilyName(ByteString.copyFrom(families[0]))
1525         .setStoreHomeDir("/store_home_dir")
1526         .addFlushOutput("/foo/baz/bar")
1527         .build())
1528       .build());
1529   }
1530 
1531   @Test
1532   public void testReplayingCompactionWithFileAlreadyDeleted() throws IOException {
1533     // tests replaying compaction marker, but the compaction output file has already been compacted
1534     // from primary and also deleted from the archive directory
1535     secondaryRegion.replayWALCompactionMarker(CompactionDescriptor.newBuilder()
1536       .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1537       .setEncodedRegionName(
1538         ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1539       .setFamilyName(ByteString.copyFrom(families[0]))
1540       .addCompactionInput("/foo")
1541       .addCompactionOutput("/bar")
1542       .setStoreHomeDir("/store_home_dir")
1543       .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1544       .build()
1545       , true, true, Long.MAX_VALUE);
1546   }
1547 
1548   @Test
1549   public void testReplayingRegionOpenEventWithFileAlreadyDeleted() throws IOException {
1550     // tests replaying region open event marker, but the region files have already been compacted
1551     // from primary and also deleted from the archive directory
1552     secondaryRegion.replayWALRegionEventMarker(RegionEventDescriptor.newBuilder()
1553       .setTableName(ByteString.copyFrom(primaryRegion.getTableDesc().getTableName().getName()))
1554       .setEncodedRegionName(
1555         ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1556       .setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
1557       .setEventType(EventType.REGION_OPEN)
1558       .setServer(ProtobufUtil.toServerName(ServerName.valueOf("foo", 1, 1)))
1559       .setLogSequenceNumber(Long.MAX_VALUE)
1560       .addStores(StoreDescriptor.newBuilder()
1561         .setFamilyName(ByteString.copyFrom(families[0]))
1562         .setStoreHomeDir("/store_home_dir")
1563         .addStoreFile("/foo")
1564         .build())
1565       .build());
1566   }
1567 
1568   @Test
1569   public void testReplayingBulkLoadEventWithFileAlreadyDeleted() throws IOException {
1570     // tests replaying bulk load event marker, but the bulk load files have already been compacted
1571     // from primary and also deleted from the archive directory
1572     secondaryRegion.replayWALBulkLoadEventMarker(BulkLoadDescriptor.newBuilder()
1573       .setTableName(ProtobufUtil.toProtoTableName(primaryRegion.getTableDesc().getTableName()))
1574       .setEncodedRegionName(
1575         ByteString.copyFrom(primaryRegion.getRegionInfo().getEncodedNameAsBytes()))
1576       .setBulkloadSeqNum(Long.MAX_VALUE)
1577       .addStores(StoreDescriptor.newBuilder()
1578         .setFamilyName(ByteString.copyFrom(families[0]))
1579         .setStoreHomeDir("/store_home_dir")
1580         .addStoreFile("/foo")
1581         .build())
1582       .build());
1583   }
1584 
1585   private String createHFileForFamilies(Path testPath, byte[] family,
1586       byte[] valueBytes) throws IOException {
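         // writes a minimal single-cell HFile whose row, qualifier and value are all valueBytes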
1587     HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
1588     // TODO We need a way to do this without creating files
1589     Path testFile = new Path(testPath, UUID.randomUUID().toString());
1590     FSDataOutputStream out = TEST_UTIL.getTestFileSystem().create(testFile);
1591     try {
1592       hFileFactory.withOutputStream(out);
1593       hFileFactory.withFileContext(new HFileContext());
1594       HFile.Writer writer = hFileFactory.create();
1595       try {
1596         writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
1597           KeyValue.Type.Put.getCode(), valueBytes)));
1598       } finally {
1599         writer.close();
1600       }
1601     } finally {
1602       out.close();
1603     }
1604     return testFile.toString();
1605   }
1606 
1607   /** Puts a total of numRows + numRowsAfterFlush records, indexed with numeric row keys. Flushes
1608    * every flushInterval records, then puts numRowsAfterFlush more rows without a trailing flush.
1609    * For example, (100, 300, 0) writes rows [0, 300) in three 100-row batches, flushing after each.
1610    * @throws IOException */
1611   private void putDataWithFlushes(HRegion region, int flushInterval,
1612       int numRows, int numRowsAfterFlush) throws IOException {
1613     int start = 0;
1614     for (; start < numRows; start += flushInterval) {
1615       LOG.info("-- Writing some data to primary from " +  start + " to " + (start+flushInterval));
1616       putData(region, Durability.SYNC_WAL, start, flushInterval, cq, families);
1617       LOG.info("-- Flushing primary, creating 3 files for 3 stores");
1618       region.flush(true);
1619     }
1620     LOG.info("-- Writing some more data to primary, not flushing");
1621     putData(region, Durability.SYNC_WAL, start, numRowsAfterFlush, cq, families);
1622   }
1623 
1624   private void putDataByReplay(HRegion region,
1625       int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
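         // replays numRows puts with monotonically increasing replay sequence ids (i + 1)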
1626     for (int i = startRow; i < startRow + numRows; i++) {
1627       Put put = new Put(Bytes.toBytes("" + i));
1628       put.setDurability(Durability.SKIP_WAL);
1629       for (byte[] family : families) {
1630         put.add(family, qf, EnvironmentEdgeManager.currentTime(), null);
1631       }
1632       replay(region, put, i+1);
1633     }
1634   }
1635 
1636   private static HRegion initHRegion(byte[] tableName,
1637       String callingMethod, byte[]... families) throws IOException {
1638     return initHRegion(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
1639       callingMethod, TEST_UTIL.getConfiguration(), false, Durability.SYNC_WAL, null, families);
1640   }
1641 
1642   private static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
1643       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
1644       WAL wal, byte[]... families) throws IOException {
1645     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
1646       isReadOnly, durability, wal, families);
1647   }
1648 }