View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR;
24  import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR;
25  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
26  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
27  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
28  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3;
29  import static org.junit.Assert.assertArrayEquals;
30  import static org.junit.Assert.assertEquals;
31  import static org.junit.Assert.assertFalse;
32  import static org.junit.Assert.assertNotNull;
33  import static org.junit.Assert.assertNull;
34  import static org.junit.Assert.assertTrue;
35  import static org.junit.Assert.fail;
36  import static org.mockito.Matchers.isA;
37  import static org.mockito.Matchers.any;
38  import static org.mockito.Matchers.anyBoolean;
39  import static org.mockito.Matchers.anyLong;
40  import static org.mockito.Mockito.doThrow;
41  import static org.mockito.Mockito.mock;
42  import static org.mockito.Mockito.never;
43  import static org.mockito.Mockito.spy;
44  import static org.mockito.Mockito.times;
45  import static org.mockito.Mockito.verify;
46  import static org.mockito.Mockito.when;
47  
48  import java.io.IOException;
49  import java.io.InterruptedIOException;
50  import java.security.PrivilegedExceptionAction;
51  import java.util.ArrayList;
52  import java.util.Arrays;
53  import java.util.Collection;
54  import java.util.Collections;
55  import java.util.HashMap;
56  import java.util.List;
57  import java.util.Map;
58  import java.util.NavigableMap;
59  import java.util.TreeMap;
60  import java.util.UUID;
61  import java.util.concurrent.Callable;
62  import java.util.concurrent.CountDownLatch;
63  import java.util.concurrent.ExecutorService;
64  import java.util.concurrent.Executors;
65  import java.util.concurrent.Future;
66  import java.util.concurrent.TimeUnit;
67  import java.util.concurrent.atomic.AtomicBoolean;
68  import java.util.concurrent.atomic.AtomicInteger;
69  import java.util.concurrent.atomic.AtomicLong;
70  import java.util.concurrent.atomic.AtomicReference;
71  
72  import org.apache.commons.lang.RandomStringUtils;
73  import org.apache.commons.logging.Log;
74  import org.apache.commons.logging.LogFactory;
75  import org.apache.hadoop.conf.Configuration;
76  import org.apache.hadoop.fs.FSDataOutputStream;
77  import org.apache.hadoop.fs.FileStatus;
78  import org.apache.hadoop.fs.FileSystem;
79  import org.apache.hadoop.fs.Path;
80  import org.apache.hadoop.hbase.Cell;
81  import org.apache.hadoop.hbase.CellComparator;
82  import org.apache.hadoop.hbase.CellUtil;
83  import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
84  import org.apache.hadoop.hbase.DroppedSnapshotException;
85  import org.apache.hadoop.hbase.HBaseConfiguration;
86  import org.apache.hadoop.hbase.HBaseTestCase;
87  import org.apache.hadoop.hbase.HBaseTestingUtility;
88  import org.apache.hadoop.hbase.HColumnDescriptor;
89  import org.apache.hadoop.hbase.HConstants;
90  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
91  import org.apache.hadoop.hbase.HDFSBlocksDistribution;
92  import org.apache.hadoop.hbase.HRegionInfo;
93  import org.apache.hadoop.hbase.HTableDescriptor;
94  import org.apache.hadoop.hbase.KeyValue;
95  import org.apache.hadoop.hbase.MiniHBaseCluster;
96  import org.apache.hadoop.hbase.MultithreadedTestUtil;
97  import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
98  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
99  import org.apache.hadoop.hbase.NotServingRegionException;
100 import org.apache.hadoop.hbase.RegionTooBusyException;
101 import org.apache.hadoop.hbase.ServerName;
102 import org.apache.hadoop.hbase.TableName;
103 import org.apache.hadoop.hbase.Tag;
104 import org.apache.hadoop.hbase.TagType;
105 import org.apache.hadoop.hbase.Waiter;
106 import org.apache.hadoop.hbase.client.Append;
107 import org.apache.hadoop.hbase.client.Delete;
108 import org.apache.hadoop.hbase.client.Durability;
109 import org.apache.hadoop.hbase.client.Get;
110 import org.apache.hadoop.hbase.client.Increment;
111 import org.apache.hadoop.hbase.client.Mutation;
112 import org.apache.hadoop.hbase.client.Put;
113 import org.apache.hadoop.hbase.client.Result;
114 import org.apache.hadoop.hbase.client.RowMutations;
115 import org.apache.hadoop.hbase.client.Scan;
116 import org.apache.hadoop.hbase.client.Table;
117 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
118 import org.apache.hadoop.hbase.filter.BinaryComparator;
119 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
120 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
121 import org.apache.hadoop.hbase.filter.Filter;
122 import org.apache.hadoop.hbase.filter.FilterBase;
123 import org.apache.hadoop.hbase.filter.FilterList;
124 import org.apache.hadoop.hbase.filter.NullComparator;
125 import org.apache.hadoop.hbase.filter.PrefixFilter;
126 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
127 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
128 import org.apache.hadoop.hbase.io.hfile.HFile;
129 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
130 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
131 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
132 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
133 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
134 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
135 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
136 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
137 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
138 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
139 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
140 import org.apache.hadoop.hbase.regionserver.Region.RowLock;
141 import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
142 import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
143 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
144 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
145 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
146 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
147 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
148 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
149 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
150 import org.apache.hadoop.hbase.security.User;
151 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
152 import org.apache.hadoop.hbase.testclassification.MediumTests;
153 import org.apache.hadoop.hbase.util.Bytes;
154 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
155 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
156 import org.apache.hadoop.hbase.util.FSUtils;
157 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
158 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
159 import org.apache.hadoop.hbase.util.PairOfSameType;
160 import org.apache.hadoop.hbase.util.Threads;
161 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
162 import org.apache.hadoop.hbase.wal.FaultyFSLog;
163 import org.apache.hadoop.hbase.wal.WAL;
164 import org.apache.hadoop.hbase.wal.WALFactory;
165 import org.apache.hadoop.hbase.wal.WALKey;
166 import org.apache.hadoop.hbase.wal.WALProvider;
167 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
168 import org.apache.hadoop.hbase.wal.WALSplitter;
169 import org.junit.After;
170 import org.junit.Assert;
171 import org.junit.Assume;
172 import org.junit.Before;
173 import org.junit.Rule;
174 import org.junit.Test;
175 import org.junit.experimental.categories.Category;
176 import org.junit.rules.TestName;
177 import org.mockito.ArgumentCaptor;
178 import org.mockito.ArgumentMatcher;
179 import org.mockito.Mockito;
180 
181 import com.google.common.collect.ImmutableList;
182 import com.google.common.collect.Lists;
183 import com.google.common.collect.Maps;
184 import com.google.protobuf.ByteString;
185 
186 /**
187  * Basic stand-alone testing of HRegion.  No clusters!
188  *
189  * A lot of the meta information for an HRegion now lives inside other HRegions
190  * or in the HBaseMaster, so only basic testing is possible.
191  */
192 @Category(MediumTests.class)
193 @SuppressWarnings("deprecation")
194 public class TestHRegion {
195   // Do not spin up clusters in here. If you need to spin up a cluster, do it
196   // over in TestHRegionOnCluster.
197   static final Log LOG = LogFactory.getLog(TestHRegion.class);
198   @Rule public TestName name = new TestName();
199 
200   /** Set to true on Windows platforms */
201   private static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
202 
203   private static final String COLUMN_FAMILY = "MyCF";
204   private static final byte [] COLUMN_FAMILY_BYTES = Bytes.toBytes(COLUMN_FAMILY);
205 
206   HRegion region = null;
207   // Do not run unit tests in parallel (? Why not?  It don't work?  Why not?  St.Ack)
208   private static HBaseTestingUtility TEST_UTIL;
209   public static Configuration CONF ;
210   private String dir;
211   private static FileSystem FILESYSTEM;
212   private final int MAX_VERSIONS = 2;
213 
214   // Test names
215   protected byte[] tableName;
216   protected String method;
217   protected final byte[] qual1 = Bytes.toBytes("qual1");
218   protected final byte[] qual2 = Bytes.toBytes("qual2");
219   protected final byte[] qual3 = Bytes.toBytes("qual3");
220   protected final byte[] value1 = Bytes.toBytes("value1");
221   protected final byte[] value2 = Bytes.toBytes("value2");
222   protected final byte[] row = Bytes.toBytes("rowA");
223   protected final byte[] row2 = Bytes.toBytes("rowB");
224 
225   protected final MetricsAssertHelper metricsAssertHelper = CompatibilitySingletonFactory
226       .getInstance(MetricsAssertHelper.class);
227 
228   @Before
229   public void setup() throws IOException {
230     TEST_UTIL = HBaseTestingUtility.createLocalHTU();
231     FILESYSTEM = TEST_UTIL.getTestFileSystem();
232     CONF = TEST_UTIL.getConfiguration();
233     dir = TEST_UTIL.getDataTestDir("TestHRegion").toString();
234     method = name.getMethodName();
235     tableName = Bytes.toBytes(name.getMethodName());
236   }
237 
238   @After
239   public void tearDown() throws Exception {
240     EnvironmentEdgeManagerTestHelper.reset();
241     LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
242     TEST_UTIL.cleanupTestDir();
243   }
244 
245   String getName() {
246     return name.getMethodName();
247   }
248 
249   /**
250    * Test for Bug 2 of HBASE-10466.
251    * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize
252    * is smaller than a certain value, or when region close starts a flush is ongoing, the first
253    * flush is skipped and only the second flush takes place. However, two flushes are required in
254    * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
255    * in current memstore. The fix is removing all conditions except abort check so we ensure 2
256    * flushes for region close."
257    * @throws IOException
258    */
259   @Test (timeout=60000)
260   public void testCloseCarryingSnapshot() throws IOException {
261     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
262     Store store = region.getStore(COLUMN_FAMILY_BYTES);
263     // Get some random bytes.
264     byte [] value = Bytes.toBytes(name.getMethodName());
265     // Make a random put against our cf.
266     Put put = new Put(value);
267     put.add(COLUMN_FAMILY_BYTES, null, value);
268     // First put something in current memstore, which will be in snapshot after flusher.prepare()
269     region.put(put);
270     StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
271     storeFlushCtx.prepare();
272     // Second put something in current memstore
273     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
274     region.put(put);
275     // Close with something in memstore and something in the snapshot.  Make sure all is cleared.
276     region.close();
277     assertEquals(0, region.getMemstoreSize());
278     HRegion.closeHRegion(region);
279   }
280 
281 
282 
283   /*
284    * This test is for verifying memstore snapshot size is correctly updated in case of rollback
285    * See HBASE-10845
286    */
287   @Test (timeout=60000)
288   public void testMemstoreSnapshotSize() throws IOException {
289     class MyFaultyFSLog extends FaultyFSLog {
290       StoreFlushContext storeFlushCtx;
291       public MyFaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
292           throws IOException {
293         super(fs, rootDir, logName, conf);
294       }
295 
296       void setStoreFlushCtx(StoreFlushContext storeFlushCtx) {
297         this.storeFlushCtx = storeFlushCtx;
298       }
299 
300       @Override
301       public void sync(long txid) throws IOException {
302         storeFlushCtx.prepare();
303         super.sync(txid);
304       }
305     }
306 
307     FileSystem fs = FileSystem.get(CONF);
308     Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
309     MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
310     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
311         CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
312 
313     Store store = region.getStore(COLUMN_FAMILY_BYTES);
314     // Get some random bytes.
315     byte [] value = Bytes.toBytes(name.getMethodName());
316     faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
317 
318     Put put = new Put(value);
319     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
320     faultyLog.setFailureType(FaultyFSLog.FailureType.SYNC);
321 
322     boolean threwIOE = false;
323     try {
324       region.put(put);
325     } catch (IOException ioe) {
326       threwIOE = true;
327     } finally {
328       assertTrue("The regionserver should have thrown an exception", threwIOE);
329     }
330     long sz = store.getFlushableSize();
331     assertTrue("flushable size should be zero, but it is " + sz, sz == 0);
332     HRegion.closeHRegion(region);
333   }
334 
335   /**
336    * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
337    */
338   @Test
339   public void testMemstoreSizeWithFlushCanceling() throws IOException {
340     FileSystem fs = FileSystem.get(CONF);
341     Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling");
342     FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF);
343     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
344         CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
345     Store store = region.getStore(COLUMN_FAMILY_BYTES);
346     assertEquals(0, region.getMemstoreSize());
347 
348     // Put some value and make sure flush could be completed normally
349     byte [] value = Bytes.toBytes(name.getMethodName());
350     Put put = new Put(value);
351     put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
352     region.put(put);
353     long onePutSize = region.getMemstoreSize();
354     assertTrue(onePutSize > 0);
355     region.flush(true);
356     assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
357     assertEquals("flushable size should be zero", 0, store.getFlushableSize());
358 
359     // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
360     RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
361     RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
362     when(mockedCPHost.preFlush(isA(HStore.class), isA(InternalScanner.class))).thenReturn(null);
363     region.setCoprocessorHost(mockedCPHost);
364     region.put(put);
365     region.flush(true);
366     assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize());
367     assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize());
368 
369     // set normalCPHost and flush again, the snapshot will be flushed
370     region.setCoprocessorHost(normalCPHost);
371     region.flush(true);
372     assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
373     assertEquals("flushable size should be zero", 0, store.getFlushableSize());
374     HRegion.closeHRegion(region);
375   }
376 
377   @Test
378   public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
379     String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
380     FileSystem fs = FileSystem.get(CONF);
381     Path rootDir = new Path(dir + testName);
382     FSHLog hLog = new FSHLog(fs, rootDir, testName, CONF);
383     HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
384         CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
385     Store store = region.getStore(COLUMN_FAMILY_BYTES);
386     assertEquals(0, region.getMemstoreSize());
387 
388     // Put one value
389     byte [] value = Bytes.toBytes(name.getMethodName());
390     Put put = new Put(value);
391     put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
392     region.put(put);
393     long onePutSize = region.getMemstoreSize();
394     assertTrue(onePutSize > 0);
395 
396     RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
397     doThrow(new IOException())
398        .when(mockedCPHost).postBatchMutate(Mockito.<MiniBatchOperationInProgress<Mutation>>any());
399     region.setCoprocessorHost(mockedCPHost);
400 
401     put = new Put(value);
402     put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("dfg"), value);
403     try {
404       region.put(put);
405       fail("Should have failed with IOException");
406     } catch (IOException expected) {
407     }
408     assertEquals("memstoreSize should be incremented", onePutSize * 2, region.getMemstoreSize());
409     assertEquals("flushable size should be incremented", onePutSize * 2, store.getFlushableSize());
410 
411     region.setCoprocessorHost(null);
412     HBaseTestingUtility.closeRegionAndWAL(region);
413   }
414 
415   /**
416    * Test we do not lose data if we fail a flush and then close.
417    * Part of HBase-10466.  Tests the following from the issue description:
418    * "Bug 1: Wrong calculation of HRegion.memstoreSize: When a flush fails, data to be flushed is
419    * kept in each MemStore's snapshot and wait for next flush attempt to continue on it. But when
420    * the next flush succeeds, the counter of total memstore size in HRegion is always deduced by
421    * the sum of current memstore sizes instead of snapshots left from previous failed flush. This
422    * calculation is problematic that almost every time there is failed flush, HRegion.memstoreSize
423    * gets reduced by a wrong value. If region flush could not proceed for a couple cycles, the size
424    * in current memstore could be much larger than the snapshot. It's likely to drift memstoreSize
425    * much smaller than expected. In extreme case, if the error accumulates to even bigger than
426    * HRegion's memstore size limit, any further flush is skipped because flush does not do anything
427    * if memstoreSize is not larger than 0."
428    * @throws Exception
429    */
430   @Test (timeout=60000)
431   public void testFlushSizeAccounting() throws Exception {
432     final Configuration conf = HBaseConfiguration.create(CONF);
433     // Only retry once.
434     conf.setInt("hbase.hstore.flush.retries.number", 1);
435     final User user =
436       User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
437     // Inject our faulty LocalFileSystem
438     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
439     user.runAs(new PrivilegedExceptionAction<Object>() {
440       @Override
441       public Object run() throws Exception {
442         // Make sure it worked (above is sensitive to caching details in hadoop core)
443         FileSystem fs = FileSystem.get(conf);
444         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
445         FaultyFileSystem ffs = (FaultyFileSystem)fs;
446         HRegion region = null;
447         try {
448           // Initialize region
449           region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
450           long size = region.getMemstoreSize();
451           Assert.assertEquals(0, size);
452           // Put one item into memstore.  Measure the size of one item in memstore.
453           Put p1 = new Put(row);
454           p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
455           region.put(p1);
456           final long sizeOfOnePut = region.getMemstoreSize();
457           // Fail a flush which means the current memstore will hang out as memstore 'snapshot'.
458           try {
459             LOG.info("Flushing");
460             region.flush(true);
461             Assert.fail("Didn't bubble up IOE!");
462           } catch (DroppedSnapshotException dse) {
463             // What we are expecting
464             region.closing.set(false); // this is needed for the rest of the test to work
465           } catch (Exception e) {
466             // What we are expecting
467             region.closing.set(false); // this is needed for the rest of the test to work
468           }
469           // Make it so all writes succeed from here on out
470           ffs.fault.set(false);
471           // WAL is bad because of above faulty fs. Roll WAL.
472           try {
473             region.getWAL().rollWriter(true);
474           } catch (Exception e) {
475             int x = 0;
476           }
477           // Check sizes.  Should still be the one entry.
478           Assert.assertEquals(sizeOfOnePut, region.getMemstoreSize());
479           // Now add two entries so that on this next flush that fails, we can see if we
480           // subtract the right amount, the snapshot size only.
481           Put p2 = new Put(row);
482           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
483           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
484           region.put(p2);
485           Assert.assertEquals(sizeOfOnePut * 3, region.getMemstoreSize());
486           // Do a successful flush.  It will clear the snapshot only.  Thats how flushes work.
487           // If already a snapshot, we clear it else we move the memstore to be snapshot and flush
488           // it
489           region.flush(true);
490           // Make sure our memory accounting is right.
491           Assert.assertEquals(sizeOfOnePut * 2, region.getMemstoreSize());
492         } catch (Exception e) {
493           int x = 0;
494         } finally {
495           HRegion.closeHRegion(region);
496         }
497         return null;
498       }
499     });
500     FileSystem.closeAllForUGI(user.getUGI());
501   }
502 
503   @Test (timeout=60000)
504   public void testCloseWithFailingFlush() throws Exception {
505     final Configuration conf = HBaseConfiguration.create(CONF);
506     // Only retry once.
507     conf.setInt("hbase.hstore.flush.retries.number", 1);
508     final User user =
509       User.createUserForTesting(conf, this.name.getMethodName(), new String[]{"foo"});
510     // Inject our faulty LocalFileSystem
511     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
512     user.runAs(new PrivilegedExceptionAction<Object>() {
513       @Override
514       public Object run() throws Exception {
515         // Make sure it worked (above is sensitive to caching details in hadoop core)
516         FileSystem fs = FileSystem.get(conf);
517         Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
518         FaultyFileSystem ffs = (FaultyFileSystem)fs;
519         HRegion region = null;
520         try {
521           // Initialize region
522           region = initHRegion(tableName, name.getMethodName(), conf, COLUMN_FAMILY_BYTES);
523           long size = region.getMemstoreSize();
524           Assert.assertEquals(0, size);
525           // Put one item into memstore.  Measure the size of one item in memstore.
526           Put p1 = new Put(row);
527           p1.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual1, 1, (byte[])null));
528           region.put(p1);
529           // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
530           Store store = region.getStore(COLUMN_FAMILY_BYTES);
531           StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
532           storeFlushCtx.prepare();
533           // Now add two entries to the foreground memstore.
534           Put p2 = new Put(row);
535           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual2, 2, (byte[])null));
536           p2.add(new KeyValue(row, COLUMN_FAMILY_BYTES, qual3, 3, (byte[])null));
537           region.put(p2);
538           // Now try close on top of a failing flush.
539           region.close();
540           fail();
541         } catch (IOException dse) {
542           // Expected
543           LOG.info("Expected DroppedSnapshotException");
544         } finally {
545           // Make it so all writes succeed from here on out so can close clean
546           ffs.fault.set(false);
547           region.getWAL().rollWriter(true);
548           HRegion.closeHRegion(region);
549         }
550         return null;
551       }
552     });
553     FileSystem.closeAllForUGI(user.getUGI());
554   }
555 
556   @Test
557   public void testCompactionAffectedByScanners() throws Exception {
558     byte[] family = Bytes.toBytes("family");
559     this.region = initHRegion(tableName, method, CONF, family);
560 
561     Put put = new Put(Bytes.toBytes("r1"));
562     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
563     region.put(put);
564     region.flush(true);
565 
566     Scan scan = new Scan();
567     scan.setMaxVersions(3);
568     // open the first scanner
569     RegionScanner scanner1 = region.getScanner(scan);
570 
571     Delete delete = new Delete(Bytes.toBytes("r1"));
572     region.delete(delete);
573     region.flush(true);
574 
575     // open the second scanner
576     RegionScanner scanner2 = region.getScanner(scan);
577 
578     List<Cell> results = new ArrayList<Cell>();
579 
580     System.out.println("Smallest read point:" + region.getSmallestReadPoint());
581 
582     // make a major compaction
583     region.compact(true);
584 
585     // open the third scanner
586     RegionScanner scanner3 = region.getScanner(scan);
587 
588     // get data from scanner 1, 2, 3 after major compaction
589     scanner1.next(results);
590     System.out.println(results);
591     assertEquals(1, results.size());
592 
593     results.clear();
594     scanner2.next(results);
595     System.out.println(results);
596     assertEquals(0, results.size());
597 
598     results.clear();
599     scanner3.next(results);
600     System.out.println(results);
601     assertEquals(0, results.size());
602   }
603 
604   @Test
605   public void testToShowNPEOnRegionScannerReseek() throws Exception {
606     byte[] family = Bytes.toBytes("family");
607     this.region = initHRegion(tableName, method, CONF, family);
608 
609     Put put = new Put(Bytes.toBytes("r1"));
610     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
611     region.put(put);
612     put = new Put(Bytes.toBytes("r2"));
613     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
614     region.put(put);
615     region.flush(true);
616 
617     Scan scan = new Scan();
618     scan.setMaxVersions(3);
619     // open the first scanner
620     RegionScanner scanner1 = region.getScanner(scan);
621 
622     System.out.println("Smallest read point:" + region.getSmallestReadPoint());
623 
624     region.compact(true);
625 
626     scanner1.reseek(Bytes.toBytes("r2"));
627     List<Cell> results = new ArrayList<Cell>();
628     scanner1.next(results);
629     Cell keyValue = results.get(0);
630     Assert.assertTrue(Bytes.compareTo(CellUtil.cloneRow(keyValue), Bytes.toBytes("r2")) == 0);
631     scanner1.close();
632   }
633 
634   @Test
635   public void testSkipRecoveredEditsReplay() throws Exception {
636     String method = "testSkipRecoveredEditsReplay";
637     TableName tableName = TableName.valueOf(method);
638     byte[] family = Bytes.toBytes("family");
639     this.region = initHRegion(tableName, method, CONF, family);
640     final WALFactory wals = new WALFactory(CONF, null, method);
641     try {
642       Path regiondir = region.getRegionFileSystem().getRegionDir();
643       FileSystem fs = region.getRegionFileSystem().getFileSystem();
644       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
645 
646       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
647 
648       long maxSeqId = 1050;
649       long minSeqId = 1000;
650 
651       for (long i = minSeqId; i <= maxSeqId; i += 10) {
652         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
653         fs.create(recoveredEdits);
654         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
655 
656         long time = System.nanoTime();
657         WALEdit edit = new WALEdit();
658         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
659             .toBytes(i)));
660         writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
661             HConstants.DEFAULT_CLUSTER_ID), edit));
662 
663         writer.close();
664       }
665       MonitoredTask status = TaskMonitor.get().createStatus(method);
666       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
667       for (Store store : region.getStores()) {
668         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId - 1);
669       }
670       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
671       assertEquals(maxSeqId, seqId);
672       region.getMVCC().initialize(seqId);
673       Get get = new Get(row);
674       Result result = region.get(get);
675       for (long i = minSeqId; i <= maxSeqId; i += 10) {
676         List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
677         assertEquals(1, kvs.size());
678         assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
679       }
680     } finally {
681       HRegion.closeHRegion(this.region);
682       this.region = null;
683       wals.close();
684     }
685   }
686 
687   @Test
688   public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
689     String method = "testSkipRecoveredEditsReplaySomeIgnored";
690     TableName tableName = TableName.valueOf(method);
691     byte[] family = Bytes.toBytes("family");
692     this.region = initHRegion(tableName, method, CONF, family);
693     final WALFactory wals = new WALFactory(CONF, null, method);
694     try {
695       Path regiondir = region.getRegionFileSystem().getRegionDir();
696       FileSystem fs = region.getRegionFileSystem().getFileSystem();
697       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
698 
699       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
700 
701       long maxSeqId = 1050;
702       long minSeqId = 1000;
703 
704       for (long i = minSeqId; i <= maxSeqId; i += 10) {
705         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
706         fs.create(recoveredEdits);
707         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
708 
709         long time = System.nanoTime();
710         WALEdit edit = new WALEdit();
711         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
712             .toBytes(i)));
713         writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
714             HConstants.DEFAULT_CLUSTER_ID), edit));
715 
716         writer.close();
717       }
718       long recoverSeqId = 1030;
719       MonitoredTask status = TaskMonitor.get().createStatus(method);
720       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
721       for (Store store : region.getStores()) {
722         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
723       }
724       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
725       assertEquals(maxSeqId, seqId);
726       region.getMVCC().initialize(seqId);
727       Get get = new Get(row);
728       Result result = region.get(get);
729       for (long i = minSeqId; i <= maxSeqId; i += 10) {
730         List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
731         if (i < recoverSeqId) {
732           assertEquals(0, kvs.size());
733         } else {
734           assertEquals(1, kvs.size());
735           assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
736         }
737       }
738     } finally {
739       HRegion.closeHRegion(this.region);
740       this.region = null;
741       wals.close();
742     }
743   }
744 
745   @Test
746   public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
747     byte[] family = Bytes.toBytes("family");
748     this.region = initHRegion(tableName, method, CONF, family);
749     try {
750       Path regiondir = region.getRegionFileSystem().getRegionDir();
751       FileSystem fs = region.getRegionFileSystem().getFileSystem();
752 
753       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
754       for (int i = 1000; i < 1050; i += 10) {
755         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
756         FSDataOutputStream dos = fs.create(recoveredEdits);
757         dos.writeInt(i);
758         dos.close();
759       }
760       long minSeqId = 2000;
761       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", minSeqId - 1));
762       FSDataOutputStream dos = fs.create(recoveredEdits);
763       dos.close();
764 
765       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
766       for (Store store : region.getStores()) {
767         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
768       }
769       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, null);
770       assertEquals(minSeqId, seqId);
771     } finally {
772       HRegion.closeHRegion(this.region);
773       this.region = null;
774     }
775   }
776 
777   @Test
778   public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
779     String method = "testSkipRecoveredEditsReplayTheLastFileIgnored";
780     TableName tableName = TableName.valueOf(method);
781     byte[] family = Bytes.toBytes("family");
782     this.region = initHRegion(tableName, method, CONF, family);
783     final WALFactory wals = new WALFactory(CONF, null, method);
784     try {
785       Path regiondir = region.getRegionFileSystem().getRegionDir();
786       FileSystem fs = region.getRegionFileSystem().getFileSystem();
787       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
788       byte[][] columns = region.getTableDesc().getFamiliesKeys().toArray(new byte[0][]);
789 
790       assertEquals(0, region.getStoreFileList(columns).size());
791 
792       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
793 
794       long maxSeqId = 1050;
795       long minSeqId = 1000;
796 
797       for (long i = minSeqId; i <= maxSeqId; i += 10) {
798         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
799         fs.create(recoveredEdits);
800         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
801 
802         long time = System.nanoTime();
803         WALEdit edit = null;
804         if (i == maxSeqId) {
805           edit = WALEdit.createCompaction(region.getRegionInfo(),
806           CompactionDescriptor.newBuilder()
807           .setTableName(ByteString.copyFrom(tableName.getName()))
808           .setFamilyName(ByteString.copyFrom(regionName))
809           .setEncodedRegionName(ByteString.copyFrom(regionName))
810           .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
811           .setRegionName(ByteString.copyFrom(region.getRegionInfo().getRegionName()))
812           .build());
813         } else {
814           edit = new WALEdit();
815           edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
816             .toBytes(i)));
817         }
818         writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
819             HConstants.DEFAULT_CLUSTER_ID), edit));
820         writer.close();
821       }
822 
823       long recoverSeqId = 1030;
824       Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
825       MonitoredTask status = TaskMonitor.get().createStatus(method);
826       for (Store store : region.getStores()) {
827         maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
828       }
829       long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
830       assertEquals(maxSeqId, seqId);
831 
832       // assert that the files are flushed
833       assertEquals(1, region.getStoreFileList(columns).size());
834 
835     } finally {
836       HRegion.closeHRegion(this.region);
837       this.region = null;
838       wals.close();
839     }
840   }
841 
842   @Test
843   public void testRecoveredEditsReplayCompaction() throws Exception {
844     testRecoveredEditsReplayCompaction(false);
845     testRecoveredEditsReplayCompaction(true);
846   }
847   public void testRecoveredEditsReplayCompaction(boolean mismatchedRegionName) throws Exception {
848     String method = name.getMethodName();
849     TableName tableName = TableName.valueOf(method);
850     byte[] family = Bytes.toBytes("family");
851     this.region = initHRegion(tableName, method, CONF, family);
852     final WALFactory wals = new WALFactory(CONF, null, method);
853     try {
854       Path regiondir = region.getRegionFileSystem().getRegionDir();
855       FileSystem fs = region.getRegionFileSystem().getFileSystem();
856       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
857 
858       long maxSeqId = 3;
859       long minSeqId = 0;
860 
861       for (long i = minSeqId; i < maxSeqId; i++) {
862         Put put = new Put(Bytes.toBytes(i));
863         put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
864         region.put(put);
865         region.flush(true);
866       }
867 
868       // this will create a region with 3 files
869       assertEquals(3, region.getStore(family).getStorefilesCount());
870       List<Path> storeFiles = new ArrayList<Path>(3);
871       for (StoreFile sf : region.getStore(family).getStorefiles()) {
872         storeFiles.add(sf.getPath());
873       }
874 
875       // disable compaction completion
876       CONF.setBoolean("hbase.hstore.compaction.complete", false);
877       region.compactStores();
878 
879       // ensure that nothing changed
880       assertEquals(3, region.getStore(family).getStorefilesCount());
881 
882       // now find the compacted file, and manually add it to the recovered edits
883       Path tmpDir = region.getRegionFileSystem().getTempDir();
884       FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
885       String errorMsg = "Expected to find 1 file in the region temp directory "
886           + "from the compaction, could not find any";
887       assertNotNull(errorMsg, files);
888       assertEquals(errorMsg, 1, files.length);
889       // move the file inside region dir
890       Path newFile = region.getRegionFileSystem().commitStoreFile(Bytes.toString(family),
891           files[0].getPath());
892 
893       byte[] encodedNameAsBytes = this.region.getRegionInfo().getEncodedNameAsBytes();
894       byte[] fakeEncodedNameAsBytes = new byte [encodedNameAsBytes.length];
895       for (int i=0; i < encodedNameAsBytes.length; i++) {
896         // Mix the byte array to have a new encodedName
897         fakeEncodedNameAsBytes[i] = (byte) (encodedNameAsBytes[i] + 1);
898       }
899 
900       CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(this.region
901         .getRegionInfo(), mismatchedRegionName ? fakeEncodedNameAsBytes : null, family,
902             storeFiles, Lists.newArrayList(newFile),
903             region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
904 
905       WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
906           this.region.getRegionInfo(), compactionDescriptor, new AtomicLong(1));
907 
908       Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
909 
910       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
911       fs.create(recoveredEdits);
912       WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
913 
914       long time = System.nanoTime();
915 
916       writer.append(new WAL.Entry(new HLogKey(regionName, tableName, 10, time,
917           HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
918           compactionDescriptor)));
919       writer.close();
920 
921       // close the region now, and reopen again
922       region.getTableDesc();
923       region.getRegionInfo();
924       region.close();
925       try {
926         region = HRegion.openHRegion(region, null);
927       } catch (WrongRegionException wre) {
928         fail("Matching encoded region name should not have produced WrongRegionException");
929       }
930 
931       // now check whether we have only one store file, the compacted one
932       Collection<StoreFile> sfs = region.getStore(family).getStorefiles();
933       for (StoreFile sf : sfs) {
934         LOG.info(sf.getPath());
935       }
936       if (!mismatchedRegionName) {
937         assertEquals(1, region.getStore(family).getStorefilesCount());
938       }
939       files = FSUtils.listStatus(fs, tmpDir);
940       assertTrue("Expected to find 0 files inside " + tmpDir, files == null || files.length == 0);
941 
942       for (long i = minSeqId; i < maxSeqId; i++) {
943         Get get = new Get(Bytes.toBytes(i));
944         Result result = region.get(get);
945         byte[] value = result.getValue(family, Bytes.toBytes(i));
946         assertArrayEquals(Bytes.toBytes(i), value);
947       }
948     } finally {
949       HRegion.closeHRegion(this.region);
950       this.region = null;
951       wals.close();
952     }
953   }
954 
955   @Test
956   public void testFlushMarkers() throws Exception {
957     // tests that flush markers are written to WAL and handled at recovered edits
958     String method = name.getMethodName();
959     TableName tableName = TableName.valueOf(method);
960     byte[] family = Bytes.toBytes("family");
961     Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
962     final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
963     FSUtils.setRootDir(walConf, logDir);
964     final WALFactory wals = new WALFactory(walConf, null, method);
965     final WAL wal = wals.getWAL(tableName.getName());
966 
967     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
968       HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
969     try {
970       Path regiondir = region.getRegionFileSystem().getRegionDir();
971       FileSystem fs = region.getRegionFileSystem().getFileSystem();
972       byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
973 
974       long maxSeqId = 3;
975       long minSeqId = 0;
976 
977       for (long i = minSeqId; i < maxSeqId; i++) {
978         Put put = new Put(Bytes.toBytes(i));
979         put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
980         region.put(put);
981         region.flush(true);
982       }
983 
984       // this will create a region with 3 files from flush
985       assertEquals(3, region.getStore(family).getStorefilesCount());
986       List<String> storeFiles = new ArrayList<String>(3);
987       for (StoreFile sf : region.getStore(family).getStorefiles()) {
988         storeFiles.add(sf.getPath().getName());
989       }
990 
991       // now verify that the flush markers are written
992       wal.shutdown();
993       WAL.Reader reader = WALFactory.createReader(fs, DefaultWALProvider.getCurrentFileName(wal),
994         TEST_UTIL.getConfiguration());
995       try {
996         List<WAL.Entry> flushDescriptors = new ArrayList<WAL.Entry>();
997         long lastFlushSeqId = -1;
998         while (true) {
999           WAL.Entry entry = reader.next();
1000           if (entry == null) {
1001             break;
1002           }
1003           Cell cell = entry.getEdit().getCells().get(0);
1004           if (WALEdit.isMetaEditFamily(cell)) {
1005             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
1006             assertNotNull(flushDesc);
1007             assertArrayEquals(tableName.getName(), flushDesc.getTableName().toByteArray());
1008             if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1009               assertTrue(flushDesc.getFlushSequenceNumber() > lastFlushSeqId);
1010             } else if (flushDesc.getAction() == FlushAction.COMMIT_FLUSH) {
1011               assertTrue(flushDesc.getFlushSequenceNumber() == lastFlushSeqId);
1012             }
1013             lastFlushSeqId = flushDesc.getFlushSequenceNumber();
1014             assertArrayEquals(regionName, flushDesc.getEncodedRegionName().toByteArray());
1015             assertEquals(1, flushDesc.getStoreFlushesCount()); //only one store
1016             StoreFlushDescriptor storeFlushDesc = flushDesc.getStoreFlushes(0);
1017             assertArrayEquals(family, storeFlushDesc.getFamilyName().toByteArray());
1018             assertEquals("family", storeFlushDesc.getStoreHomeDir());
1019             if (flushDesc.getAction() == FlushAction.START_FLUSH) {
1020               assertEquals(0, storeFlushDesc.getFlushOutputCount());
1021             } else {
1022               assertEquals(1, storeFlushDesc.getFlushOutputCount()); //only one file from flush
1023               assertTrue(storeFiles.contains(storeFlushDesc.getFlushOutput(0)));
1024             }
1025 
1026             flushDescriptors.add(entry);
1027           }
1028         }
1029 
1030         assertEquals(3 * 2, flushDescriptors.size()); // START_FLUSH and COMMIT_FLUSH per flush
1031 
1032         // now write those markers to the recovered edits again.
1033 
1034         Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
1035 
1036         Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
1037         fs.create(recoveredEdits);
1038         WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
1039 
1040         for (WAL.Entry entry : flushDescriptors) {
1041           writer.append(entry);
1042         }
1043         writer.close();
1044       } finally {
1045         if (null != reader) {
1046           try {
1047             reader.close();
1048           } catch (IOException exception) {
1049             LOG.warn("Problem closing wal: " + exception.getMessage());
1050             LOG.debug("exception details", exception);
1051           }
1052         }
1053       }
1054 
1055 
1056       // close the region now, and reopen again
1057       region.close();
1058       region = HRegion.openHRegion(region, null);
1059 
1060       // now check whether we have can read back the data from region
1061       for (long i = minSeqId; i < maxSeqId; i++) {
1062         Get get = new Get(Bytes.toBytes(i));
1063         Result result = region.get(get);
1064         byte[] value = result.getValue(family, Bytes.toBytes(i));
1065         assertArrayEquals(Bytes.toBytes(i), value);
1066       }
1067     } finally {
1068       HRegion.closeHRegion(this.region);
1069       this.region = null;
1070       wals.close();
1071     }
1072   }
1073 
1074   class IsFlushWALMarker extends ArgumentMatcher<WALEdit> {
1075     volatile FlushAction[] actions;
1076     public IsFlushWALMarker(FlushAction... actions) {
1077       this.actions = actions;
1078     }
1079     @Override
1080     public boolean matches(Object edit) {
1081       List<Cell> cells = ((WALEdit)edit).getCells();
1082       if (cells.isEmpty()) {
1083         return false;
1084       }
1085       if (WALEdit.isMetaEditFamily(cells.get(0))) {
1086         FlushDescriptor desc = null;
1087         try {
1088           desc = WALEdit.getFlushDescriptor(cells.get(0));
1089         } catch (IOException e) {
1090           LOG.warn(e);
1091           return false;
1092         }
1093         if (desc != null) {
1094           for (FlushAction action : actions) {
1095             if (desc.getAction() == action) {
1096               return true;
1097             }
1098           }
1099         }
1100       }
1101       return false;
1102     }
1103     public IsFlushWALMarker set(FlushAction... actions) {
1104       this.actions = actions;
1105       return this;
1106     }
1107   }
1108 
1109   @Test (timeout=60000)
1110   public void testFlushMarkersWALFail() throws Exception {
1111     // test the cases where the WAL append for flush markers fail.
1112     String method = name.getMethodName();
1113     TableName tableName = TableName.valueOf(method);
1114     byte[] family = Bytes.toBytes("family");
1115 
1116     // spy an actual WAL implementation to throw exception (was not able to mock)
1117     Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + "log");
1118 
1119     final Configuration walConf = new Configuration(TEST_UTIL.getConfiguration());
1120     FSUtils.setRootDir(walConf, logDir);
1121 
1122     // Make up a WAL that we can manipulate at append time.
1123     class FailAppendFlushMarkerWAL extends FSHLog {
1124       volatile FlushAction [] flushActions = null;
1125 
1126       public FailAppendFlushMarkerWAL(FileSystem fs, Path root, String logDir, Configuration conf)
1127       throws IOException {
1128         super(fs, root, logDir, conf);
1129       }
1130 
1131       @Override
1132       protected Writer createWriterInstance(Path path) throws IOException {
1133         final Writer w = super.createWriterInstance(path);
1134         return new Writer() {
1135           @Override
1136           public void close() throws IOException {
1137             w.close();
1138           }
1139 
1140           @Override
1141           public void sync() throws IOException {
1142             w.sync();
1143           }
1144 
1145           @Override
1146           public void append(Entry entry) throws IOException {
1147             List<Cell> cells = entry.getEdit().getCells();
1148             if (WALEdit.isMetaEditFamily(cells.get(0))) {
1149                FlushDescriptor desc = WALEdit.getFlushDescriptor(cells.get(0));
1150               if (desc != null) {
1151                 for (FlushAction flushAction: flushActions) {
1152                   if (desc.getAction().equals(flushAction)) {
1153                     throw new IOException("Failed to append flush marker! " + flushAction);
1154                   }
1155                 }
1156               }
1157             }
1158             w.append(entry);
1159           }
1160 
1161           @Override
1162           public long getLength() throws IOException {
1163             return w.getLength();
1164           }
1165         };
1166       }
1167     }
1168     FailAppendFlushMarkerWAL wal =
1169       new FailAppendFlushMarkerWAL(FileSystem.get(walConf), FSUtils.getRootDir(walConf),
1170         getName(), walConf);
1171     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
1172       HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
1173     try {
1174       int i = 0;
1175       Put put = new Put(Bytes.toBytes(i));
1176       put.setDurability(Durability.SKIP_WAL); // have to skip mocked wal
1177       put.add(family, Bytes.toBytes(i), Bytes.toBytes(i));
1178       region.put(put);
1179 
1180       // 1. Test case where START_FLUSH throws exception
1181       wal.flushActions = new FlushAction [] {FlushAction.START_FLUSH};
1182 
1183       // start cache flush will throw exception
1184       try {
1185         region.flush(true);
1186         fail("This should have thrown exception");
1187       } catch (DroppedSnapshotException unexpected) {
1188         // this should not be a dropped snapshot exception. Meaning that RS will not abort
1189         throw unexpected;
1190       } catch (IOException expected) {
1191         // expected
1192       }
1193       // The WAL is hosed. It has failed an append and a sync. It has an exception stuck in it
1194       // which it will keep returning until we roll the WAL to prevent any further appends going
1195       // in or syncs succeeding on top of failed appends, a no-no.
1196       wal.rollWriter(true);
1197 
1198       // 2. Test case where START_FLUSH succeeds but COMMIT_FLUSH will throw exception
1199       wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH};
1200 
1201       try {
1202         region.flush(true);
1203         fail("This should have thrown exception");
1204       } catch (DroppedSnapshotException expected) {
1205         // we expect this exception, since we were able to write the snapshot, but failed to
1206         // write the flush marker to WAL
1207       } catch (IOException unexpected) {
1208         throw unexpected;
1209       }
1210 
1211       region.close();
1212       // Roll WAL to clean out any exceptions stuck in it. See note above where we roll WAL.
1213       wal.rollWriter(true);
1214       this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
1215         HConstants.EMPTY_END_ROW, method, CONF, false, Durability.USE_DEFAULT, wal, family);
1216       region.put(put);
1217 
1218       // 3. Test case where ABORT_FLUSH will throw exception.
1219       // Even if ABORT_FLUSH throws exception, we should not fail with IOE, but continue with
1220       // DroppedSnapshotException. Below COMMMIT_FLUSH will cause flush to abort
1221       wal.flushActions = new FlushAction [] {FlushAction.COMMIT_FLUSH, FlushAction.ABORT_FLUSH};
1222 
1223       try {
1224         region.flush(true);
1225         fail("This should have thrown exception");
1226       } catch (DroppedSnapshotException expected) {
1227         // we expect this exception, since we were able to write the snapshot, but failed to
1228         // write the flush marker to WAL
1229       } catch (IOException unexpected) {
1230         throw unexpected;
1231       }
1232 
1233     } finally {
1234       HRegion.closeHRegion(this.region);
1235       this.region = null;
1236     }
1237   }
1238 
1239   @Test
1240   public void testGetWhileRegionClose() throws IOException {
1241     TableName tableName = TableName.valueOf(name.getMethodName());
1242     Configuration hc = initSplit();
1243     int numRows = 100;
1244     byte[][] families = { fam1, fam2, fam3 };
1245 
1246     // Setting up region
1247     String method = name.getMethodName();
1248     this.region = initHRegion(tableName, method, hc, families);
1249     try {
1250       // Put data in region
1251       final int startRow = 100;
1252       putData(startRow, numRows, qual1, families);
1253       putData(startRow, numRows, qual2, families);
1254       putData(startRow, numRows, qual3, families);
1255       final AtomicBoolean done = new AtomicBoolean(false);
1256       final AtomicInteger gets = new AtomicInteger(0);
1257       GetTillDoneOrException[] threads = new GetTillDoneOrException[10];
1258       try {
1259         // Set ten threads running concurrently getting from the region.
1260         for (int i = 0; i < threads.length / 2; i++) {
1261           threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1262           threads[i].setDaemon(true);
1263           threads[i].start();
1264         }
1265         // Artificially make the condition by setting closing flag explicitly.
1266         // I can't make the issue happen with a call to region.close().
1267         this.region.closing.set(true);
1268         for (int i = threads.length / 2; i < threads.length; i++) {
1269           threads[i] = new GetTillDoneOrException(i, Bytes.toBytes("" + startRow), done, gets);
1270           threads[i].setDaemon(true);
1271           threads[i].start();
1272         }
1273       } finally {
1274         if (this.region != null) {
1275           HRegion.closeHRegion(this.region);
1276         }
1277       }
1278       done.set(true);
1279       for (GetTillDoneOrException t : threads) {
1280         try {
1281           t.join();
1282         } catch (InterruptedException e) {
1283           e.printStackTrace();
1284         }
1285         if (t.e != null) {
1286           LOG.info("Exception=" + t.e);
1287           assertFalse("Found a NPE in " + t.getName(), t.e instanceof NullPointerException);
1288         }
1289       }
1290     } finally {
1291       HRegion.closeHRegion(this.region);
1292       this.region = null;
1293     }
1294   }
1295 
1296   /*
1297    * Thread that does get on single row until 'done' flag is flipped. If an
1298    * exception causes us to fail, it records it.
1299    */
1300   class GetTillDoneOrException extends Thread {
1301     private final Get g;
1302     private final AtomicBoolean done;
1303     private final AtomicInteger count;
1304     private Exception e;
1305 
1306     GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) {
1307       super("getter." + i);
1308       this.g = new Get(r);
1309       this.done = d;
1310       this.count = c;
1311     }
1312 
1313     @Override
1314     public void run() {
1315       while (!this.done.get()) {
1316         try {
1317           assertTrue(region.get(g).size() > 0);
1318           this.count.incrementAndGet();
1319         } catch (Exception e) {
1320           this.e = e;
1321           break;
1322         }
1323       }
1324     }
1325   }
1326 
1327   /*
1328    * An involved filter test. Has multiple column families and deletes in mix.
1329    */
1330   @Test
1331   public void testWeirdCacheBehaviour() throws Exception {
1332     byte[] TABLE = Bytes.toBytes("testWeirdCacheBehaviour");
1333     byte[][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"),
1334         Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") };
1335     this.region = initHRegion(TABLE, getName(), CONF, FAMILIES);
1336     try {
1337       String value = "this is the value";
1338       String value2 = "this is some other value";
1339       String keyPrefix1 = "prefix1";
1340       String keyPrefix2 = "prefix2";
1341       String keyPrefix3 = "prefix3";
1342       putRows(this.region, 3, value, keyPrefix1);
1343       putRows(this.region, 3, value, keyPrefix2);
1344       putRows(this.region, 3, value, keyPrefix3);
1345       putRows(this.region, 3, value2, keyPrefix1);
1346       putRows(this.region, 3, value2, keyPrefix2);
1347       putRows(this.region, 3, value2, keyPrefix3);
1348       System.out.println("Checking values for key: " + keyPrefix1);
1349       assertEquals("Got back incorrect number of rows from scan", 3,
1350           getNumberOfRows(keyPrefix1, value2, this.region));
1351       System.out.println("Checking values for key: " + keyPrefix2);
1352       assertEquals("Got back incorrect number of rows from scan", 3,
1353           getNumberOfRows(keyPrefix2, value2, this.region));
1354       System.out.println("Checking values for key: " + keyPrefix3);
1355       assertEquals("Got back incorrect number of rows from scan", 3,
1356           getNumberOfRows(keyPrefix3, value2, this.region));
1357       deleteColumns(this.region, value2, keyPrefix1);
1358       deleteColumns(this.region, value2, keyPrefix2);
1359       deleteColumns(this.region, value2, keyPrefix3);
1360       System.out.println("Starting important checks.....");
1361       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix1, 0,
1362           getNumberOfRows(keyPrefix1, value2, this.region));
1363       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix2, 0,
1364           getNumberOfRows(keyPrefix2, value2, this.region));
1365       assertEquals("Got back incorrect number of rows from scan: " + keyPrefix3, 0,
1366           getNumberOfRows(keyPrefix3, value2, this.region));
1367     } finally {
1368       HRegion.closeHRegion(this.region);
1369       this.region = null;
1370     }
1371   }
1372 
1373   @Test
1374   public void testAppendWithReadOnlyTable() throws Exception {
1375     byte[] TABLE = Bytes.toBytes("readOnlyTable");
1376     this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
1377     boolean exceptionCaught = false;
1378     Append append = new Append(Bytes.toBytes("somerow"));
1379     append.setDurability(Durability.SKIP_WAL);
1380     append.add(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"),
1381         Bytes.toBytes("somevalue"));
1382     try {
1383       region.append(append);
1384     } catch (IOException e) {
1385       exceptionCaught = true;
1386     } finally {
1387       HRegion.closeHRegion(this.region);
1388       this.region = null;
1389     }
1390     assertTrue(exceptionCaught == true);
1391   }
1392 
1393   @Test
1394   public void testIncrWithReadOnlyTable() throws Exception {
1395     byte[] TABLE = Bytes.toBytes("readOnlyTable");
1396     this.region = initHRegion(TABLE, getName(), CONF, true, Bytes.toBytes("somefamily"));
1397     boolean exceptionCaught = false;
1398     Increment inc = new Increment(Bytes.toBytes("somerow"));
1399     inc.setDurability(Durability.SKIP_WAL);
1400     inc.addColumn(Bytes.toBytes("somefamily"), Bytes.toBytes("somequalifier"), 1L);
1401     try {
1402       region.increment(inc);
1403     } catch (IOException e) {
1404       exceptionCaught = true;
1405     } finally {
1406       HRegion.closeHRegion(this.region);
1407       this.region = null;
1408     }
1409     assertTrue(exceptionCaught == true);
1410   }
1411 
1412   private void deleteColumns(HRegion r, String value, String keyPrefix) throws IOException {
1413     InternalScanner scanner = buildScanner(keyPrefix, value, r);
1414     int count = 0;
1415     boolean more = false;
1416     List<Cell> results = new ArrayList<Cell>();
1417     do {
1418       more = scanner.next(results);
1419       if (results != null && !results.isEmpty())
1420         count++;
1421       else
1422         break;
1423       Delete delete = new Delete(CellUtil.cloneRow(results.get(0)));
1424       delete.deleteColumn(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"));
1425       r.delete(delete);
1426       results.clear();
1427     } while (more);
1428     assertEquals("Did not perform correct number of deletes", 3, count);
1429   }
1430 
1431   private int getNumberOfRows(String keyPrefix, String value, HRegion r) throws Exception {
1432     InternalScanner resultScanner = buildScanner(keyPrefix, value, r);
1433     int numberOfResults = 0;
1434     List<Cell> results = new ArrayList<Cell>();
1435     boolean more = false;
1436     do {
1437       more = resultScanner.next(results);
1438       if (results != null && !results.isEmpty())
1439         numberOfResults++;
1440       else
1441         break;
1442       for (Cell kv : results) {
1443         System.out.println("kv=" + kv.toString() + ", " + Bytes.toString(CellUtil.cloneValue(kv)));
1444       }
1445       results.clear();
1446     } while (more);
1447     return numberOfResults;
1448   }
1449 
1450   private InternalScanner buildScanner(String keyPrefix, String value, HRegion r)
1451       throws IOException {
1452     // Defaults FilterList.Operator.MUST_PASS_ALL.
1453     FilterList allFilters = new FilterList();
1454     allFilters.addFilter(new PrefixFilter(Bytes.toBytes(keyPrefix)));
1455     // Only return rows where this column value exists in the row.
1456     SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("trans-tags"),
1457         Bytes.toBytes("qual2"), CompareOp.EQUAL, Bytes.toBytes(value));
1458     filter.setFilterIfMissing(true);
1459     allFilters.addFilter(filter);
1460     Scan scan = new Scan();
1461     scan.addFamily(Bytes.toBytes("trans-blob"));
1462     scan.addFamily(Bytes.toBytes("trans-type"));
1463     scan.addFamily(Bytes.toBytes("trans-date"));
1464     scan.addFamily(Bytes.toBytes("trans-tags"));
1465     scan.addFamily(Bytes.toBytes("trans-group"));
1466     scan.setFilter(allFilters);
1467     return r.getScanner(scan);
1468   }
1469 
1470   private void putRows(HRegion r, int numRows, String value, String key) throws IOException {
1471     for (int i = 0; i < numRows; i++) {
1472       String row = key + "_" + i/* UUID.randomUUID().toString() */;
1473       System.out.println(String.format("Saving row: %s, with value %s", row, value));
1474       Put put = new Put(Bytes.toBytes(row));
1475       put.setDurability(Durability.SKIP_WAL);
1476       put.add(Bytes.toBytes("trans-blob"), null, Bytes.toBytes("value for blob"));
1477       put.add(Bytes.toBytes("trans-type"), null, Bytes.toBytes("statement"));
1478       put.add(Bytes.toBytes("trans-date"), null, Bytes.toBytes("20090921010101999"));
1479       put.add(Bytes.toBytes("trans-tags"), Bytes.toBytes("qual2"), Bytes.toBytes(value));
1480       put.add(Bytes.toBytes("trans-group"), null, Bytes.toBytes("adhocTransactionGroupId"));
1481       r.put(put);
1482     }
1483   }
1484 
1485   @Test
1486   public void testFamilyWithAndWithoutColon() throws Exception {
1487     byte[] b = Bytes.toBytes(getName());
1488     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1489     this.region = initHRegion(b, getName(), CONF, cf);
1490     try {
1491       Put p = new Put(b);
1492       byte[] cfwithcolon = Bytes.toBytes(COLUMN_FAMILY + ":");
1493       p.add(cfwithcolon, cfwithcolon, cfwithcolon);
1494       boolean exception = false;
1495       try {
1496         this.region.put(p);
1497       } catch (NoSuchColumnFamilyException e) {
1498         exception = true;
1499       }
1500       assertTrue(exception);
1501     } finally {
1502       HRegion.closeHRegion(this.region);
1503       this.region = null;
1504     }
1505   }
1506 
1507   @Test
1508   public void testBatchPut_whileNoRowLocksHeld() throws IOException {
1509     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1510     byte[] qual = Bytes.toBytes("qual");
1511     byte[] val = Bytes.toBytes("val");
1512     this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
1513     MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1514     try {
1515       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1516       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1517 
1518       LOG.info("First a batch put with all valid puts");
1519       final Put[] puts = new Put[10];
1520       for (int i = 0; i < 10; i++) {
1521         puts[i] = new Put(Bytes.toBytes("row_" + i));
1522         puts[i].add(cf, qual, val);
1523       }
1524 
1525       OperationStatus[] codes = this.region.batchMutate(puts);
1526       assertEquals(10, codes.length);
1527       for (int i = 0; i < 10; i++) {
1528         assertEquals(OperationStatusCode.SUCCESS, codes[i].getOperationStatusCode());
1529       }
1530       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 1, source);
1531 
1532       LOG.info("Next a batch put with one invalid family");
1533       puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
1534       codes = this.region.batchMutate(puts);
1535       assertEquals(10, codes.length);
1536       for (int i = 0; i < 10; i++) {
1537         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1538             codes[i].getOperationStatusCode());
1539       }
1540 
1541       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs + 2, source);
1542     } finally {
1543       HRegion.closeHRegion(this.region);
1544       this.region = null;
1545     }
1546   }
1547 
1548   @Test
1549   public void testBatchPut_whileMultipleRowLocksHeld() throws Exception {
1550     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1551     byte[] qual = Bytes.toBytes("qual");
1552     byte[] val = Bytes.toBytes("val");
1553     this.region = initHRegion(Bytes.toBytes(getName()), getName(), CONF, cf);
1554     MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1555     try {
1556       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1557       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1558 
1559       final Put[] puts = new Put[10];
1560       for (int i = 0; i < 10; i++) {
1561         puts[i] = new Put(Bytes.toBytes("row_" + i));
1562         puts[i].add(cf, qual, val);
1563       }
1564       puts[5].add(Bytes.toBytes("BAD_CF"), qual, val);
1565 
1566       LOG.info("batchPut will have to break into four batches to avoid row locks");
1567       RowLock rowLock1 = region.getRowLock(Bytes.toBytes("row_2"));
1568       RowLock rowLock2 = region.getRowLock(Bytes.toBytes("row_4"));
1569       RowLock rowLock3 = region.getRowLock(Bytes.toBytes("row_6"));
1570 
1571       MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(CONF);
1572       final AtomicReference<OperationStatus[]> retFromThread = new AtomicReference<OperationStatus[]>();
1573       TestThread putter = new TestThread(ctx) {
1574         @Override
1575         public void doWork() throws IOException {
1576           retFromThread.set(region.batchMutate(puts));
1577         }
1578       };
1579       LOG.info("...starting put thread while holding locks");
1580       ctx.addThread(putter);
1581       ctx.startThreads();
1582 
1583       LOG.info("...waiting for put thread to sync 1st time");
1584       waitForCounter(source, "syncTimeNumOps", syncs + 1);
1585 
1586       // Now attempt to close the region from another thread.  Prior to HBASE-12565
1587       // this would cause the in-progress batchMutate operation to to fail with
1588       // exception because it use to release and re-acquire the close-guard lock
1589       // between batches.  Caller then didn't get status indicating which writes succeeded.
1590       // We now expect this thread to block until the batchMutate call finishes.
1591       Thread regionCloseThread = new Thread() {
1592         @Override
1593         public void run() {
1594           try {
1595             HRegion.closeHRegion(region);
1596           } catch (IOException e) {
1597             throw new RuntimeException(e);
1598           }
1599         }
1600       };
1601       regionCloseThread.start();
1602 
1603       LOG.info("...releasing row lock 1, which should let put thread continue");
1604       rowLock1.release();
1605 
1606       LOG.info("...waiting for put thread to sync 2nd time");
1607       waitForCounter(source, "syncTimeNumOps", syncs + 2);
1608 
1609       LOG.info("...releasing row lock 2, which should let put thread continue");
1610       rowLock2.release();
1611 
1612       LOG.info("...waiting for put thread to sync 3rd time");
1613       waitForCounter(source, "syncTimeNumOps", syncs + 3);
1614 
1615       LOG.info("...releasing row lock 3, which should let put thread continue");
1616       rowLock3.release();
1617 
1618       LOG.info("...waiting for put thread to sync 4th time");
1619       waitForCounter(source, "syncTimeNumOps", syncs + 4);
1620 
1621       LOG.info("...joining on put thread");
1622       ctx.stop();
1623       regionCloseThread.join();
1624 
1625       OperationStatus[] codes = retFromThread.get();
1626       for (int i = 0; i < codes.length; i++) {
1627         assertEquals((i == 5) ? OperationStatusCode.BAD_FAMILY : OperationStatusCode.SUCCESS,
1628             codes[i].getOperationStatusCode());
1629       }
1630     } finally {
1631       HRegion.closeHRegion(this.region);
1632       this.region = null;
1633     }
1634   }
1635 
1636   private void waitForCounter(MetricsWALSource source, String metricName, long expectedCount)
1637       throws InterruptedException {
1638     long startWait = System.currentTimeMillis();
1639     long currentCount;
1640     while ((currentCount = metricsAssertHelper.getCounter(metricName, source)) < expectedCount) {
1641       Thread.sleep(100);
1642       if (System.currentTimeMillis() - startWait > 10000) {
1643         fail(String.format("Timed out waiting for '%s' >= '%s', currentCount=%s", metricName,
1644           expectedCount, currentCount));
1645       }
1646     }
1647   }
1648 
1649   @Test
1650   public void testBatchPutWithTsSlop() throws Exception {
1651     byte[] b = Bytes.toBytes(getName());
1652     byte[] cf = Bytes.toBytes(COLUMN_FAMILY);
1653     byte[] qual = Bytes.toBytes("qual");
1654     byte[] val = Bytes.toBytes("val");
1655 
1656     // add data with a timestamp that is too recent for range. Ensure assert
1657     CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
1658     this.region = initHRegion(b, getName(), CONF, cf);
1659 
1660     try {
1661       MetricsWALSource source = CompatibilitySingletonFactory.getInstance(MetricsWALSource.class);
1662       long syncs = metricsAssertHelper.getCounter("syncTimeNumOps", source);
1663       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1664 
1665       final Put[] puts = new Put[10];
1666       for (int i = 0; i < 10; i++) {
1667         puts[i] = new Put(Bytes.toBytes("row_" + i), Long.MAX_VALUE - 100);
1668         puts[i].add(cf, qual, val);
1669       }
1670 
1671       OperationStatus[] codes = this.region.batchMutate(puts);
1672       assertEquals(10, codes.length);
1673       for (int i = 0; i < 10; i++) {
1674         assertEquals(OperationStatusCode.SANITY_CHECK_FAILURE, codes[i].getOperationStatusCode());
1675       }
1676       metricsAssertHelper.assertCounter("syncTimeNumOps", syncs, source);
1677 
1678     } finally {
1679       HRegion.closeHRegion(this.region);
1680       this.region = null;
1681     }
1682 
1683   }
1684 
1685   // ////////////////////////////////////////////////////////////////////////////
1686   // checkAndMutate tests
1687   // ////////////////////////////////////////////////////////////////////////////
1688   @Test
1689   public void testCheckAndMutate_WithEmptyRowValue() throws IOException {
1690     byte[] row1 = Bytes.toBytes("row1");
1691     byte[] fam1 = Bytes.toBytes("fam1");
1692     byte[] qf1 = Bytes.toBytes("qualifier");
1693     byte[] emptyVal = new byte[] {};
1694     byte[] val1 = Bytes.toBytes("value1");
1695     byte[] val2 = Bytes.toBytes("value2");
1696 
1697     // Setting up region
1698     String method = this.getName();
1699     this.region = initHRegion(tableName, method, CONF, fam1);
1700     try {
1701       // Putting empty data in key
1702       Put put = new Put(row1);
1703       put.add(fam1, qf1, emptyVal);
1704 
1705       // checkAndPut with empty value
1706       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1707           emptyVal), put, true);
1708       assertTrue(res);
1709 
1710       // Putting data in key
1711       put = new Put(row1);
1712       put.add(fam1, qf1, val1);
1713 
1714       // checkAndPut with correct value
1715       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1716           put, true);
1717       assertTrue(res);
1718 
1719       // not empty anymore
1720       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1721           put, true);
1722       assertFalse(res);
1723 
1724       Delete delete = new Delete(row1);
1725       delete.deleteColumn(fam1, qf1);
1726       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1727           delete, true);
1728       assertFalse(res);
1729 
1730       put = new Put(row1);
1731       put.add(fam1, qf1, val2);
1732       // checkAndPut with correct value
1733       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
1734           put, true);
1735       assertTrue(res);
1736 
1737       // checkAndDelete with correct value
1738       delete = new Delete(row1);
1739       delete.deleteColumn(fam1, qf1);
1740       delete.deleteColumn(fam1, qf1);
1741       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
1742           delete, true);
1743       assertTrue(res);
1744 
1745       delete = new Delete(row1);
1746       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
1747           delete, true);
1748       assertTrue(res);
1749 
1750       // checkAndPut looking for a null value
1751       put = new Put(row1);
1752       put.add(fam1, qf1, val1);
1753 
1754       res = region
1755           .checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new NullComparator(), put, true);
1756       assertTrue(res);
1757     } finally {
1758       HRegion.closeHRegion(this.region);
1759       this.region = null;
1760     }
1761   }
1762 
1763   @Test
1764   public void testCheckAndMutate_WithWrongValue() throws IOException {
1765     byte[] row1 = Bytes.toBytes("row1");
1766     byte[] fam1 = Bytes.toBytes("fam1");
1767     byte[] qf1 = Bytes.toBytes("qualifier");
1768     byte[] val1 = Bytes.toBytes("value1");
1769     byte[] val2 = Bytes.toBytes("value2");
1770 
1771     // Setting up region
1772     String method = this.getName();
1773     this.region = initHRegion(tableName, method, CONF, fam1);
1774     try {
1775       // Putting data in key
1776       Put put = new Put(row1);
1777       put.add(fam1, qf1, val1);
1778       region.put(put);
1779 
1780       // checkAndPut with wrong value
1781       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1782           val2), put, true);
1783       assertEquals(false, res);
1784 
1785       // checkAndDelete with wrong value
1786       Delete delete = new Delete(row1);
1787       delete.deleteFamily(fam1);
1788       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val2),
1789           put, true);
1790       assertEquals(false, res);
1791     } finally {
1792       HRegion.closeHRegion(this.region);
1793       this.region = null;
1794     }
1795   }
1796 
1797   @Test
1798   public void testCheckAndMutate_WithCorrectValue() throws IOException {
1799     byte[] row1 = Bytes.toBytes("row1");
1800     byte[] fam1 = Bytes.toBytes("fam1");
1801     byte[] qf1 = Bytes.toBytes("qualifier");
1802     byte[] val1 = Bytes.toBytes("value1");
1803 
1804     // Setting up region
1805     String method = this.getName();
1806     this.region = initHRegion(tableName, method, CONF, fam1);
1807     try {
1808       // Putting data in key
1809       Put put = new Put(row1);
1810       put.add(fam1, qf1, val1);
1811       region.put(put);
1812 
1813       // checkAndPut with correct value
1814       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1815           val1), put, true);
1816       assertEquals(true, res);
1817 
1818       // checkAndDelete with correct value
1819       Delete delete = new Delete(row1);
1820       delete.deleteColumn(fam1, qf1);
1821       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
1822           delete, true);
1823       assertEquals(true, res);
1824     } finally {
1825       HRegion.closeHRegion(this.region);
1826       this.region = null;
1827     }
1828   }
1829 
1830   @Test
1831   public void testCheckAndMutate_WithNonEqualCompareOp() throws IOException {
1832     byte[] row1 = Bytes.toBytes("row1");
1833     byte[] fam1 = Bytes.toBytes("fam1");
1834     byte[] qf1 = Bytes.toBytes("qualifier");
1835     byte[] val1 = Bytes.toBytes("value1");
1836     byte[] val2 = Bytes.toBytes("value2");
1837     byte[] val3 = Bytes.toBytes("value3");
1838     byte[] val4 = Bytes.toBytes("value4");
1839 
1840     // Setting up region
1841     String method = this.getName();
1842     this.region = initHRegion(tableName, method, CONF, fam1);
1843     try {
1844       // Putting val3 in key
1845       Put put = new Put(row1);
1846       put.add(fam1, qf1, val3);
1847       region.put(put);
1848 
1849       // Test CompareOp.LESS: original = val3, compare with val3, fail
1850       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1851           new BinaryComparator(val3), put, true);
1852       assertEquals(false, res);
1853 
1854       // Test CompareOp.LESS: original = val3, compare with val4, fail
1855       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1856           new BinaryComparator(val4), put, true);
1857       assertEquals(false, res);
1858 
1859       // Test CompareOp.LESS: original = val3, compare with val2,
1860       // succeed (now value = val2)
1861       put = new Put(row1);
1862       put.add(fam1, qf1, val2);
1863       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS,
1864           new BinaryComparator(val2), put, true);
1865       assertEquals(true, res);
1866 
1867       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val3, fail
1868       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1869           new BinaryComparator(val3), put, true);
1870       assertEquals(false, res);
1871 
1872       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val2,
1873       // succeed (value still = val2)
1874       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1875           new BinaryComparator(val2), put, true);
1876       assertEquals(true, res);
1877 
1878       // Test CompareOp.LESS_OR_EQUAL: original = val2, compare with val1,
1879       // succeed (now value = val3)
1880       put = new Put(row1);
1881       put.add(fam1, qf1, val3);
1882       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.LESS_OR_EQUAL,
1883           new BinaryComparator(val1), put, true);
1884       assertEquals(true, res);
1885 
1886       // Test CompareOp.GREATER: original = val3, compare with val3, fail
1887       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1888           new BinaryComparator(val3), put, true);
1889       assertEquals(false, res);
1890 
1891       // Test CompareOp.GREATER: original = val3, compare with val2, fail
1892       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1893           new BinaryComparator(val2), put, true);
1894       assertEquals(false, res);
1895 
1896       // Test CompareOp.GREATER: original = val3, compare with val4,
1897       // succeed (now value = val2)
1898       put = new Put(row1);
1899       put.add(fam1, qf1, val2);
1900       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER,
1901           new BinaryComparator(val4), put, true);
1902       assertEquals(true, res);
1903 
1904       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val1, fail
1905       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1906           new BinaryComparator(val1), put, true);
1907       assertEquals(false, res);
1908 
1909       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val2,
1910       // succeed (value still = val2)
1911       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1912           new BinaryComparator(val2), put, true);
1913       assertEquals(true, res);
1914 
1915       // Test CompareOp.GREATER_OR_EQUAL: original = val2, compare with val3, succeed
1916       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.GREATER_OR_EQUAL,
1917           new BinaryComparator(val3), put, true);
1918       assertEquals(true, res);
1919     } finally {
1920       HRegion.closeHRegion(this.region);
1921       this.region = null;
1922     }
1923   }
1924 
1925   @Test
1926   public void testCheckAndPut_ThatPutWasWritten() throws IOException {
1927     byte[] row1 = Bytes.toBytes("row1");
1928     byte[] fam1 = Bytes.toBytes("fam1");
1929     byte[] fam2 = Bytes.toBytes("fam2");
1930     byte[] qf1 = Bytes.toBytes("qualifier");
1931     byte[] val1 = Bytes.toBytes("value1");
1932     byte[] val2 = Bytes.toBytes("value2");
1933 
1934     byte[][] families = { fam1, fam2 };
1935 
1936     // Setting up region
1937     String method = this.getName();
1938     this.region = initHRegion(tableName, method, CONF, families);
1939     try {
1940       // Putting data in the key to check
1941       Put put = new Put(row1);
1942       put.add(fam1, qf1, val1);
1943       region.put(put);
1944 
1945       // Creating put to add
1946       long ts = System.currentTimeMillis();
1947       KeyValue kv = new KeyValue(row1, fam2, qf1, ts, KeyValue.Type.Put, val2);
1948       put = new Put(row1);
1949       put.add(kv);
1950 
1951       // checkAndPut with wrong value
1952       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
1953           val1), put, true);
1954       assertEquals(true, res);
1955 
1956       Get get = new Get(row1);
1957       get.addColumn(fam2, qf1);
1958       Cell[] actual = region.get(get).rawCells();
1959 
1960       Cell[] expected = { kv };
1961 
1962       assertEquals(expected.length, actual.length);
1963       for (int i = 0; i < actual.length; i++) {
1964         assertEquals(expected[i], actual[i]);
1965       }
1966     } finally {
1967       HRegion.closeHRegion(this.region);
1968       this.region = null;
1969     }
1970   }
1971 
1972   @Test
1973   public void testCheckAndPut_wrongRowInPut() throws IOException {
1974     TableName tableName = TableName.valueOf(name.getMethodName());
1975     this.region = initHRegion(tableName, this.getName(), CONF, COLUMNS);
1976     try {
1977       Put put = new Put(row2);
1978       put.add(fam1, qual1, value1);
1979       try {
1980         region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
1981             new BinaryComparator(value2), put, false);
1982         fail();
1983       } catch (org.apache.hadoop.hbase.DoNotRetryIOException expected) {
1984         // expected exception.
1985       }
1986     } finally {
1987       HRegion.closeHRegion(this.region);
1988       this.region = null;
1989     }
1990   }
1991 
1992   @Test
1993   public void testCheckAndDelete_ThatDeleteWasWritten() throws IOException {
1994     byte[] row1 = Bytes.toBytes("row1");
1995     byte[] fam1 = Bytes.toBytes("fam1");
1996     byte[] fam2 = Bytes.toBytes("fam2");
1997     byte[] qf1 = Bytes.toBytes("qualifier1");
1998     byte[] qf2 = Bytes.toBytes("qualifier2");
1999     byte[] qf3 = Bytes.toBytes("qualifier3");
2000     byte[] val1 = Bytes.toBytes("value1");
2001     byte[] val2 = Bytes.toBytes("value2");
2002     byte[] val3 = Bytes.toBytes("value3");
2003     byte[] emptyVal = new byte[] {};
2004 
2005     byte[][] families = { fam1, fam2 };
2006 
2007     // Setting up region
2008     String method = this.getName();
2009     this.region = initHRegion(tableName, method, CONF, families);
2010     try {
2011       // Put content
2012       Put put = new Put(row1);
2013       put.add(fam1, qf1, val1);
2014       region.put(put);
2015       Threads.sleep(2);
2016 
2017       put = new Put(row1);
2018       put.add(fam1, qf1, val2);
2019       put.add(fam2, qf1, val3);
2020       put.add(fam2, qf2, val2);
2021       put.add(fam2, qf3, val1);
2022       put.add(fam1, qf3, val1);
2023       region.put(put);
2024 
2025       // Multi-column delete
2026       Delete delete = new Delete(row1);
2027       delete.deleteColumn(fam1, qf1);
2028       delete.deleteColumn(fam2, qf1);
2029       delete.deleteColumn(fam1, qf3);
2030       boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(
2031           val2), delete, true);
2032       assertEquals(true, res);
2033 
2034       Get get = new Get(row1);
2035       get.addColumn(fam1, qf1);
2036       get.addColumn(fam1, qf3);
2037       get.addColumn(fam2, qf2);
2038       Result r = region.get(get);
2039       assertEquals(2, r.size());
2040       assertArrayEquals(val1, r.getValue(fam1, qf1));
2041       assertArrayEquals(val2, r.getValue(fam2, qf2));
2042 
2043       // Family delete
2044       delete = new Delete(row1);
2045       delete.deleteFamily(fam2);
2046       res = region.checkAndMutate(row1, fam2, qf1, CompareOp.EQUAL, new BinaryComparator(emptyVal),
2047           delete, true);
2048       assertEquals(true, res);
2049 
2050       get = new Get(row1);
2051       r = region.get(get);
2052       assertEquals(1, r.size());
2053       assertArrayEquals(val1, r.getValue(fam1, qf1));
2054 
2055       // Row delete
2056       delete = new Delete(row1);
2057       res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL, new BinaryComparator(val1),
2058           delete, true);
2059       assertEquals(true, res);
2060       get = new Get(row1);
2061       r = region.get(get);
2062       assertEquals(0, r.size());
2063     } finally {
2064       HRegion.closeHRegion(this.region);
2065       this.region = null;
2066     }
2067   }
2068 
2069   // ////////////////////////////////////////////////////////////////////////////
2070   // Delete tests
2071   // ////////////////////////////////////////////////////////////////////////////
2072   @Test
2073   public void testDelete_multiDeleteColumn() throws IOException {
2074     byte[] row1 = Bytes.toBytes("row1");
2075     byte[] fam1 = Bytes.toBytes("fam1");
2076     byte[] qual = Bytes.toBytes("qualifier");
2077     byte[] value = Bytes.toBytes("value");
2078 
2079     Put put = new Put(row1);
2080     put.add(fam1, qual, 1, value);
2081     put.add(fam1, qual, 2, value);
2082 
2083     String method = this.getName();
2084     this.region = initHRegion(tableName, method, CONF, fam1);
2085     try {
2086       region.put(put);
2087 
2088       // We do support deleting more than 1 'latest' version
2089       Delete delete = new Delete(row1);
2090       delete.deleteColumn(fam1, qual);
2091       delete.deleteColumn(fam1, qual);
2092       region.delete(delete);
2093 
2094       Get get = new Get(row1);
2095       get.addFamily(fam1);
2096       Result r = region.get(get);
2097       assertEquals(0, r.size());
2098     } finally {
2099       HRegion.closeHRegion(this.region);
2100       this.region = null;
2101     }
2102   }
2103 
2104   @Test
2105   public void testDelete_CheckFamily() throws IOException {
2106     byte[] row1 = Bytes.toBytes("row1");
2107     byte[] fam1 = Bytes.toBytes("fam1");
2108     byte[] fam2 = Bytes.toBytes("fam2");
2109     byte[] fam3 = Bytes.toBytes("fam3");
2110     byte[] fam4 = Bytes.toBytes("fam4");
2111 
2112     // Setting up region
2113     String method = this.getName();
2114     this.region = initHRegion(tableName, method, CONF, fam1, fam2, fam3);
2115     try {
2116       List<Cell> kvs = new ArrayList<Cell>();
2117       kvs.add(new KeyValue(row1, fam4, null, null));
2118 
2119       // testing existing family
2120       byte[] family = fam2;
2121       try {
2122         NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
2123             Bytes.BYTES_COMPARATOR);
2124         deleteMap.put(family, kvs);
2125         region.delete(deleteMap, Durability.SYNC_WAL);
2126       } catch (Exception e) {
2127         assertTrue("Family " + new String(family) + " does not exist", false);
2128       }
2129 
2130       // testing non existing family
2131       boolean ok = false;
2132       family = fam4;
2133       try {
2134         NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
2135             Bytes.BYTES_COMPARATOR);
2136         deleteMap.put(family, kvs);
2137         region.delete(deleteMap, Durability.SYNC_WAL);
2138       } catch (Exception e) {
2139         ok = true;
2140       }
2141       assertEquals("Family " + new String(family) + " does exist", true, ok);
2142     } finally {
2143       HRegion.closeHRegion(this.region);
2144       this.region = null;
2145     }
2146   }
2147 
2148   @Test
2149   public void testDelete_mixed() throws IOException, InterruptedException {
2150     byte[] fam = Bytes.toBytes("info");
2151     byte[][] families = { fam };
2152     String method = this.getName();
2153     this.region = initHRegion(tableName, method, CONF, families);
2154     try {
2155       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2156 
2157       byte[] row = Bytes.toBytes("table_name");
2158       // column names
2159       byte[] serverinfo = Bytes.toBytes("serverinfo");
2160       byte[] splitA = Bytes.toBytes("splitA");
2161       byte[] splitB = Bytes.toBytes("splitB");
2162 
2163       // add some data:
2164       Put put = new Put(row);
2165       put.add(fam, splitA, Bytes.toBytes("reference_A"));
2166       region.put(put);
2167 
2168       put = new Put(row);
2169       put.add(fam, splitB, Bytes.toBytes("reference_B"));
2170       region.put(put);
2171 
2172       put = new Put(row);
2173       put.add(fam, serverinfo, Bytes.toBytes("ip_address"));
2174       region.put(put);
2175 
2176       // ok now delete a split:
2177       Delete delete = new Delete(row);
2178       delete.deleteColumns(fam, splitA);
2179       region.delete(delete);
2180 
2181       // assert some things:
2182       Get get = new Get(row).addColumn(fam, serverinfo);
2183       Result result = region.get(get);
2184       assertEquals(1, result.size());
2185 
2186       get = new Get(row).addColumn(fam, splitA);
2187       result = region.get(get);
2188       assertEquals(0, result.size());
2189 
2190       get = new Get(row).addColumn(fam, splitB);
2191       result = region.get(get);
2192       assertEquals(1, result.size());
2193 
2194       // Assert that after a delete, I can put.
2195       put = new Put(row);
2196       put.add(fam, splitA, Bytes.toBytes("reference_A"));
2197       region.put(put);
2198       get = new Get(row);
2199       result = region.get(get);
2200       assertEquals(3, result.size());
2201 
2202       // Now delete all... then test I can add stuff back
2203       delete = new Delete(row);
2204       region.delete(delete);
2205       assertEquals(0, region.get(get).size());
2206 
2207       region.put(new Put(row).add(fam, splitA, Bytes.toBytes("reference_A")));
2208       result = region.get(get);
2209       assertEquals(1, result.size());
2210     } finally {
2211       HRegion.closeHRegion(this.region);
2212       this.region = null;
2213     }
2214   }
2215 
2216   @Test
2217   public void testDeleteRowWithFutureTs() throws IOException {
2218     byte[] fam = Bytes.toBytes("info");
2219     byte[][] families = { fam };
2220     String method = this.getName();
2221     this.region = initHRegion(tableName, method, CONF, families);
2222     try {
2223       byte[] row = Bytes.toBytes("table_name");
2224       // column names
2225       byte[] serverinfo = Bytes.toBytes("serverinfo");
2226 
2227       // add data in the far future
2228       Put put = new Put(row);
2229       put.add(fam, serverinfo, HConstants.LATEST_TIMESTAMP - 5, Bytes.toBytes("value"));
2230       region.put(put);
2231 
2232       // now delete something in the present
2233       Delete delete = new Delete(row);
2234       region.delete(delete);
2235 
2236       // make sure we still see our data
2237       Get get = new Get(row).addColumn(fam, serverinfo);
2238       Result result = region.get(get);
2239       assertEquals(1, result.size());
2240 
2241       // delete the future row
2242       delete = new Delete(row, HConstants.LATEST_TIMESTAMP - 3);
2243       region.delete(delete);
2244 
2245       // make sure it is gone
2246       get = new Get(row).addColumn(fam, serverinfo);
2247       result = region.get(get);
2248       assertEquals(0, result.size());
2249     } finally {
2250       HRegion.closeHRegion(this.region);
2251       this.region = null;
2252     }
2253   }
2254 
2255   /**
2256    * Tests that the special LATEST_TIMESTAMP option for puts gets replaced by
2257    * the actual timestamp
2258    */
2259   @Test
2260   public void testPutWithLatestTS() throws IOException {
2261     byte[] fam = Bytes.toBytes("info");
2262     byte[][] families = { fam };
2263     String method = this.getName();
2264     this.region = initHRegion(tableName, method, CONF, families);
2265     try {
2266       byte[] row = Bytes.toBytes("row1");
2267       // column names
2268       byte[] qual = Bytes.toBytes("qual");
2269 
2270       // add data with LATEST_TIMESTAMP, put without WAL
2271       Put put = new Put(row);
2272       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2273       region.put(put);
2274 
2275       // Make sure it shows up with an actual timestamp
2276       Get get = new Get(row).addColumn(fam, qual);
2277       Result result = region.get(get);
2278       assertEquals(1, result.size());
2279       Cell kv = result.rawCells()[0];
2280       LOG.info("Got: " + kv);
2281       assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2282           kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2283 
2284       // Check same with WAL enabled (historically these took different
2285       // code paths, so check both)
2286       row = Bytes.toBytes("row2");
2287       put = new Put(row);
2288       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, Bytes.toBytes("value"));
2289       region.put(put);
2290 
2291       // Make sure it shows up with an actual timestamp
2292       get = new Get(row).addColumn(fam, qual);
2293       result = region.get(get);
2294       assertEquals(1, result.size());
2295       kv = result.rawCells()[0];
2296       LOG.info("Got: " + kv);
2297       assertTrue("LATEST_TIMESTAMP was not replaced with real timestamp",
2298           kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);
2299     } finally {
2300       HRegion.closeHRegion(this.region);
2301       this.region = null;
2302     }
2303 
2304   }
2305 
2306   /**
2307    * Tests that there is server-side filtering for invalid timestamp upper
2308    * bound. Note that the timestamp lower bound is automatically handled for us
2309    * by the TTL field.
2310    */
2311   @Test
2312   public void testPutWithTsSlop() throws IOException {
2313     byte[] fam = Bytes.toBytes("info");
2314     byte[][] families = { fam };
2315     String method = this.getName();
2316 
2317     // add data with a timestamp that is too recent for range. Ensure assert
2318     CONF.setInt("hbase.hregion.keyvalue.timestamp.slop.millisecs", 1000);
2319     this.region = initHRegion(tableName, method, CONF, families);
2320     boolean caughtExcep = false;
2321     try {
2322       try {
2323         // no TS specified == use latest. should not error
2324         region.put(new Put(row).add(fam, Bytes.toBytes("qual"), Bytes.toBytes("value")));
2325         // TS out of range. should error
2326         region.put(new Put(row).add(fam, Bytes.toBytes("qual"), System.currentTimeMillis() + 2000,
2327             Bytes.toBytes("value")));
2328         fail("Expected IOE for TS out of configured timerange");
2329       } catch (FailedSanityCheckException ioe) {
2330         LOG.debug("Received expected exception", ioe);
2331         caughtExcep = true;
2332       }
2333       assertTrue("Should catch FailedSanityCheckException", caughtExcep);
2334     } finally {
2335       HRegion.closeHRegion(this.region);
2336       this.region = null;
2337     }
2338   }
2339 
2340   @Test
2341   public void testScanner_DeleteOneFamilyNotAnother() throws IOException {
2342     byte[] fam1 = Bytes.toBytes("columnA");
2343     byte[] fam2 = Bytes.toBytes("columnB");
2344     this.region = initHRegion(tableName, getName(), CONF, fam1, fam2);
2345     try {
2346       byte[] rowA = Bytes.toBytes("rowA");
2347       byte[] rowB = Bytes.toBytes("rowB");
2348 
2349       byte[] value = Bytes.toBytes("value");
2350 
2351       Delete delete = new Delete(rowA);
2352       delete.deleteFamily(fam1);
2353 
2354       region.delete(delete);
2355 
2356       // now create data.
2357       Put put = new Put(rowA);
2358       put.add(fam2, null, value);
2359       region.put(put);
2360 
2361       put = new Put(rowB);
2362       put.add(fam1, null, value);
2363       put.add(fam2, null, value);
2364       region.put(put);
2365 
2366       Scan scan = new Scan();
2367       scan.addFamily(fam1).addFamily(fam2);
2368       InternalScanner s = region.getScanner(scan);
2369       List<Cell> results = new ArrayList<Cell>();
2370       s.next(results);
2371       assertTrue(CellUtil.matchingRow(results.get(0), rowA));
2372 
2373       results.clear();
2374       s.next(results);
2375       assertTrue(CellUtil.matchingRow(results.get(0), rowB));
2376     } finally {
2377       HRegion.closeHRegion(this.region);
2378       this.region = null;
2379     }
2380   }
2381 
2382   @Test
2383   public void testDeleteColumns_PostInsert() throws IOException, InterruptedException {
2384     Delete delete = new Delete(row);
2385     delete.deleteColumns(fam1, qual1);
2386     doTestDelete_AndPostInsert(delete);
2387   }
2388 
2389   @Test
2390   public void testDeleteFamily_PostInsert() throws IOException, InterruptedException {
2391     Delete delete = new Delete(row);
2392     delete.deleteFamily(fam1);
2393     doTestDelete_AndPostInsert(delete);
2394   }
2395 
2396   public void doTestDelete_AndPostInsert(Delete delete) throws IOException, InterruptedException {
2397     TableName tableName = TableName.valueOf(name.getMethodName());
2398     this.region = initHRegion(tableName, getName(), CONF, fam1);
2399     try {
2400       EnvironmentEdgeManagerTestHelper.injectEdge(new IncrementingEnvironmentEdge());
2401       Put put = new Put(row);
2402       put.add(fam1, qual1, value1);
2403       region.put(put);
2404 
2405       // now delete the value:
2406       region.delete(delete);
2407 
2408       // ok put data:
2409       put = new Put(row);
2410       put.add(fam1, qual1, value2);
2411       region.put(put);
2412 
2413       // ok get:
2414       Get get = new Get(row);
2415       get.addColumn(fam1, qual1);
2416 
2417       Result r = region.get(get);
2418       assertEquals(1, r.size());
2419       assertArrayEquals(value2, r.getValue(fam1, qual1));
2420 
2421       // next:
2422       Scan scan = new Scan(row);
2423       scan.addColumn(fam1, qual1);
2424       InternalScanner s = region.getScanner(scan);
2425 
2426       List<Cell> results = new ArrayList<Cell>();
2427       assertEquals(false, s.next(results));
2428       assertEquals(1, results.size());
2429       Cell kv = results.get(0);
2430 
2431       assertArrayEquals(value2, CellUtil.cloneValue(kv));
2432       assertArrayEquals(fam1, CellUtil.cloneFamily(kv));
2433       assertArrayEquals(qual1, CellUtil.cloneQualifier(kv));
2434       assertArrayEquals(row, CellUtil.cloneRow(kv));
2435     } finally {
2436       HRegion.closeHRegion(this.region);
2437       this.region = null;
2438     }
2439   }
2440 
2441   @Test
2442   public void testDelete_CheckTimestampUpdated() throws IOException {
2443     TableName tableName = TableName.valueOf(name.getMethodName());
2444     byte[] row1 = Bytes.toBytes("row1");
2445     byte[] col1 = Bytes.toBytes("col1");
2446     byte[] col2 = Bytes.toBytes("col2");
2447     byte[] col3 = Bytes.toBytes("col3");
2448 
2449     // Setting up region
2450     String method = this.getName();
2451     this.region = initHRegion(tableName, method, CONF, fam1);
2452     try {
2453       // Building checkerList
2454       List<Cell> kvs = new ArrayList<Cell>();
2455       kvs.add(new KeyValue(row1, fam1, col1, null));
2456       kvs.add(new KeyValue(row1, fam1, col2, null));
2457       kvs.add(new KeyValue(row1, fam1, col3, null));
2458 
2459       NavigableMap<byte[], List<Cell>> deleteMap = new TreeMap<byte[], List<Cell>>(
2460           Bytes.BYTES_COMPARATOR);
2461       deleteMap.put(fam1, kvs);
2462       region.delete(deleteMap, Durability.SYNC_WAL);
2463 
2464       // extract the key values out the memstore:
2465       // This is kinda hacky, but better than nothing...
2466       long now = System.currentTimeMillis();
2467       DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore;
2468       Cell firstCell = memstore.cellSet.first();
2469       assertTrue(firstCell.getTimestamp() <= now);
2470       now = firstCell.getTimestamp();
2471       for (Cell cell : memstore.cellSet) {
2472         assertTrue(cell.getTimestamp() <= now);
2473         now = cell.getTimestamp();
2474       }
2475     } finally {
2476       HRegion.closeHRegion(this.region);
2477       this.region = null;
2478     }
2479   }
2480 
2481   // ////////////////////////////////////////////////////////////////////////////
2482   // Get tests
2483   // ////////////////////////////////////////////////////////////////////////////
2484   @Test
2485   public void testGet_FamilyChecker() throws IOException {
2486     byte[] row1 = Bytes.toBytes("row1");
2487     byte[] fam1 = Bytes.toBytes("fam1");
2488     byte[] fam2 = Bytes.toBytes("False");
2489     byte[] col1 = Bytes.toBytes("col1");
2490 
2491     // Setting up region
2492     String method = this.getName();
2493     this.region = initHRegion(tableName, method, CONF, fam1);
2494     try {
2495       Get get = new Get(row1);
2496       get.addColumn(fam2, col1);
2497 
2498       // Test
2499       try {
2500         region.get(get);
2501       } catch (org.apache.hadoop.hbase.DoNotRetryIOException e) {
2502         assertFalse(false);
2503         return;
2504       }
2505       assertFalse(true);
2506     } finally {
2507       HRegion.closeHRegion(this.region);
2508       this.region = null;
2509     }
2510   }
2511 
2512   @Test
2513   public void testGet_Basic() throws IOException {
2514     byte[] row1 = Bytes.toBytes("row1");
2515     byte[] fam1 = Bytes.toBytes("fam1");
2516     byte[] col1 = Bytes.toBytes("col1");
2517     byte[] col2 = Bytes.toBytes("col2");
2518     byte[] col3 = Bytes.toBytes("col3");
2519     byte[] col4 = Bytes.toBytes("col4");
2520     byte[] col5 = Bytes.toBytes("col5");
2521 
2522     // Setting up region
2523     String method = this.getName();
2524     this.region = initHRegion(tableName, method, CONF, fam1);
2525     try {
2526       // Add to memstore
2527       Put put = new Put(row1);
2528       put.add(fam1, col1, null);
2529       put.add(fam1, col2, null);
2530       put.add(fam1, col3, null);
2531       put.add(fam1, col4, null);
2532       put.add(fam1, col5, null);
2533       region.put(put);
2534 
2535       Get get = new Get(row1);
2536       get.addColumn(fam1, col2);
2537       get.addColumn(fam1, col4);
2538       // Expected result
2539       KeyValue kv1 = new KeyValue(row1, fam1, col2);
2540       KeyValue kv2 = new KeyValue(row1, fam1, col4);
2541       KeyValue[] expected = { kv1, kv2 };
2542 
2543       // Test
2544       Result res = region.get(get);
2545       assertEquals(expected.length, res.size());
2546       for (int i = 0; i < res.size(); i++) {
2547         assertTrue(CellUtil.matchingRow(expected[i], res.rawCells()[i]));
2548         assertTrue(CellUtil.matchingFamily(expected[i], res.rawCells()[i]));
2549         assertTrue(CellUtil.matchingQualifier(expected[i], res.rawCells()[i]));
2550       }
2551 
2552       // Test using a filter on a Get
2553       Get g = new Get(row1);
2554       final int count = 2;
2555       g.setFilter(new ColumnCountGetFilter(count));
2556       res = region.get(g);
2557       assertEquals(count, res.size());
2558     } finally {
2559       HRegion.closeHRegion(this.region);
2560       this.region = null;
2561     }
2562   }
2563 
2564   @Test
2565   public void testGet_Empty() throws IOException {
2566     byte[] row = Bytes.toBytes("row");
2567     byte[] fam = Bytes.toBytes("fam");
2568 
2569     String method = this.getName();
2570     this.region = initHRegion(tableName, method, CONF, fam);
2571     try {
2572       Get get = new Get(row);
2573       get.addFamily(fam);
2574       Result r = region.get(get);
2575 
2576       assertTrue(r.isEmpty());
2577     } finally {
2578       HRegion.closeHRegion(this.region);
2579       this.region = null;
2580     }
2581   }
2582 
2583   // ////////////////////////////////////////////////////////////////////////////
2584   // Merge test
2585   // ////////////////////////////////////////////////////////////////////////////
2586   @Test
2587   public void testMerge() throws IOException {
2588     byte[][] families = { fam1, fam2, fam3 };
2589     Configuration hc = initSplit();
2590     // Setting up region
2591     String method = this.getName();
2592     this.region = initHRegion(tableName, method, hc, families);
2593     try {
2594       LOG.info("" + HBaseTestCase.addContent(region, fam3));
2595       region.flush(true);
2596       region.compactStores();
2597       byte[] splitRow = region.checkSplit();
2598       assertNotNull(splitRow);
2599       LOG.info("SplitRow: " + Bytes.toString(splitRow));
2600       HRegion[] subregions = splitRegion(region, splitRow);
2601       try {
2602         // Need to open the regions.
2603         for (int i = 0; i < subregions.length; i++) {
2604           HRegion.openHRegion(subregions[i], null);
2605           subregions[i].compactStores();
2606         }
2607         Path oldRegionPath = region.getRegionFileSystem().getRegionDir();
2608         Path oldRegion1 = subregions[0].getRegionFileSystem().getRegionDir();
2609         Path oldRegion2 = subregions[1].getRegionFileSystem().getRegionDir();
2610         long startTime = System.currentTimeMillis();
2611         region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
2612         LOG.info("Merge regions elapsed time: "
2613             + ((System.currentTimeMillis() - startTime) / 1000.0));
2614         FILESYSTEM.delete(oldRegion1, true);
2615         FILESYSTEM.delete(oldRegion2, true);
2616         FILESYSTEM.delete(oldRegionPath, true);
2617         LOG.info("splitAndMerge completed.");
2618       } finally {
2619         for (int i = 0; i < subregions.length; i++) {
2620           try {
2621             HRegion.closeHRegion(subregions[i]);
2622           } catch (IOException e) {
2623             // Ignore.
2624           }
2625         }
2626       }
2627     } finally {
2628       HRegion.closeHRegion(this.region);
2629       this.region = null;
2630     }
2631   }
2632 
2633   /**
2634    * @param parent
2635    *          Region to split.
2636    * @param midkey
2637    *          Key to split around.
2638    * @return The Regions we created.
2639    * @throws IOException
2640    */
2641   HRegion[] splitRegion(final HRegion parent, final byte[] midkey) throws IOException {
2642     PairOfSameType<Region> result = null;
2643     SplitTransactionImpl st = new SplitTransactionImpl(parent, midkey);
2644     // If prepare does not return true, for some reason -- logged inside in
2645     // the prepare call -- we are not ready to split just now. Just return.
2646     if (!st.prepare()) {
2647       parent.clearSplit();
2648       return null;
2649     }
2650     try {
2651       result = st.execute(null, null);
2652     } catch (IOException ioe) {
2653       try {
2654         LOG.info("Running rollback of failed split of " +
2655           parent.getRegionInfo().getRegionNameAsString() + "; " + ioe.getMessage());
2656         st.rollback(null, null);
2657         LOG.info("Successful rollback of failed split of " +
2658           parent.getRegionInfo().getRegionNameAsString());
2659         return null;
2660       } catch (RuntimeException e) {
2661         // If failed rollback, kill this server to avoid having a hole in table.
2662         LOG.info("Failed rollback of failed split of " +
2663           parent.getRegionInfo().getRegionNameAsString() + " -- aborting server", e);
2664       }
2665     }
2666     finally {
2667       parent.clearSplit();
2668     }
2669     return new HRegion[] { (HRegion)result.getFirst(), (HRegion)result.getSecond() };
2670   }
2671 
2672   // ////////////////////////////////////////////////////////////////////////////
2673   // Scanner tests
2674   // ////////////////////////////////////////////////////////////////////////////
2675   @Test
2676   public void testGetScanner_WithOkFamilies() throws IOException {
2677     byte[] fam1 = Bytes.toBytes("fam1");
2678     byte[] fam2 = Bytes.toBytes("fam2");
2679 
2680     byte[][] families = { fam1, fam2 };
2681 
2682     // Setting up region
2683     String method = this.getName();
2684     this.region = initHRegion(tableName, method, CONF, families);
2685     try {
2686       Scan scan = new Scan();
2687       scan.addFamily(fam1);
2688       scan.addFamily(fam2);
2689       try {
2690         region.getScanner(scan);
2691       } catch (Exception e) {
2692         assertTrue("Families could not be found in Region", false);
2693       }
2694     } finally {
2695       HRegion.closeHRegion(this.region);
2696       this.region = null;
2697     }
2698   }
2699 
2700   @Test
2701   public void testGetScanner_WithNotOkFamilies() throws IOException {
2702     byte[] fam1 = Bytes.toBytes("fam1");
2703     byte[] fam2 = Bytes.toBytes("fam2");
2704 
2705     byte[][] families = { fam1 };
2706 
2707     // Setting up region
2708     String method = this.getName();
2709     this.region = initHRegion(tableName, method, CONF, families);
2710     try {
2711       Scan scan = new Scan();
2712       scan.addFamily(fam2);
2713       boolean ok = false;
2714       try {
2715         region.getScanner(scan);
2716       } catch (Exception e) {
2717         ok = true;
2718       }
2719       assertTrue("Families could not be found in Region", ok);
2720     } finally {
2721       HRegion.closeHRegion(this.region);
2722       this.region = null;
2723     }
2724   }
2725 
2726   @Test
2727   public void testGetScanner_WithNoFamilies() throws IOException {
2728     byte[] row1 = Bytes.toBytes("row1");
2729     byte[] fam1 = Bytes.toBytes("fam1");
2730     byte[] fam2 = Bytes.toBytes("fam2");
2731     byte[] fam3 = Bytes.toBytes("fam3");
2732     byte[] fam4 = Bytes.toBytes("fam4");
2733 
2734     byte[][] families = { fam1, fam2, fam3, fam4 };
2735 
2736     // Setting up region
2737     String method = this.getName();
2738     this.region = initHRegion(tableName, method, CONF, families);
2739     try {
2740 
2741       // Putting data in Region
2742       Put put = new Put(row1);
2743       put.add(fam1, null, null);
2744       put.add(fam2, null, null);
2745       put.add(fam3, null, null);
2746       put.add(fam4, null, null);
2747       region.put(put);
2748 
2749       Scan scan = null;
2750       HRegion.RegionScannerImpl is = null;
2751 
2752       // Testing to see how many scanners that is produced by getScanner,
2753       // starting
2754       // with known number, 2 - current = 1
2755       scan = new Scan();
2756       scan.addFamily(fam2);
2757       scan.addFamily(fam4);
2758       is = (RegionScannerImpl) region.getScanner(scan);
2759       assertEquals(1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
2760 
2761       scan = new Scan();
2762       is = (RegionScannerImpl) region.getScanner(scan);
2763       assertEquals(families.length - 1, ((RegionScannerImpl) is).storeHeap.getHeap().size());
2764     } finally {
2765       HRegion.closeHRegion(this.region);
2766       this.region = null;
2767     }
2768   }
2769 
2770   /**
2771    * This method tests https://issues.apache.org/jira/browse/HBASE-2516.
2772    *
2773    * @throws IOException
2774    */
2775   @Test
2776   public void testGetScanner_WithRegionClosed() throws IOException {
2777     byte[] fam1 = Bytes.toBytes("fam1");
2778     byte[] fam2 = Bytes.toBytes("fam2");
2779 
2780     byte[][] families = { fam1, fam2 };
2781 
2782     // Setting up region
2783     String method = this.getName();
2784     try {
2785       this.region = initHRegion(tableName, method, CONF, families);
2786     } catch (IOException e) {
2787       e.printStackTrace();
2788       fail("Got IOException during initHRegion, " + e.getMessage());
2789     }
2790     try {
2791       region.closed.set(true);
2792       try {
2793         region.getScanner(null);
2794         fail("Expected to get an exception during getScanner on a region that is closed");
2795       } catch (NotServingRegionException e) {
2796         // this is the correct exception that is expected
2797       } catch (IOException e) {
2798         fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: "
2799             + e.getMessage());
2800       }
2801     } finally {
2802       HRegion.closeHRegion(this.region);
2803       this.region = null;
2804     }
2805   }
2806 
2807   @Test
2808   public void testRegionScanner_Next() throws IOException {
2809     byte[] row1 = Bytes.toBytes("row1");
2810     byte[] row2 = Bytes.toBytes("row2");
2811     byte[] fam1 = Bytes.toBytes("fam1");
2812     byte[] fam2 = Bytes.toBytes("fam2");
2813     byte[] fam3 = Bytes.toBytes("fam3");
2814     byte[] fam4 = Bytes.toBytes("fam4");
2815 
2816     byte[][] families = { fam1, fam2, fam3, fam4 };
2817     long ts = System.currentTimeMillis();
2818 
2819     // Setting up region
2820     String method = this.getName();
2821     this.region = initHRegion(tableName, method, CONF, families);
2822     try {
2823       // Putting data in Region
2824       Put put = null;
2825       put = new Put(row1);
2826       put.add(fam1, (byte[]) null, ts, null);
2827       put.add(fam2, (byte[]) null, ts, null);
2828       put.add(fam3, (byte[]) null, ts, null);
2829       put.add(fam4, (byte[]) null, ts, null);
2830       region.put(put);
2831 
2832       put = new Put(row2);
2833       put.add(fam1, (byte[]) null, ts, null);
2834       put.add(fam2, (byte[]) null, ts, null);
2835       put.add(fam3, (byte[]) null, ts, null);
2836       put.add(fam4, (byte[]) null, ts, null);
2837       region.put(put);
2838 
2839       Scan scan = new Scan();
2840       scan.addFamily(fam2);
2841       scan.addFamily(fam4);
2842       InternalScanner is = region.getScanner(scan);
2843 
2844       List<Cell> res = null;
2845 
2846       // Result 1
2847       List<Cell> expected1 = new ArrayList<Cell>();
2848       expected1.add(new KeyValue(row1, fam2, null, ts, KeyValue.Type.Put, null));
2849       expected1.add(new KeyValue(row1, fam4, null, ts, KeyValue.Type.Put, null));
2850 
2851       res = new ArrayList<Cell>();
2852       is.next(res);
2853       for (int i = 0; i < res.size(); i++) {
2854         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected1.get(i), res.get(i)));
2855       }
2856 
2857       // Result 2
2858       List<Cell> expected2 = new ArrayList<Cell>();
2859       expected2.add(new KeyValue(row2, fam2, null, ts, KeyValue.Type.Put, null));
2860       expected2.add(new KeyValue(row2, fam4, null, ts, KeyValue.Type.Put, null));
2861 
2862       res = new ArrayList<Cell>();
2863       is.next(res);
2864       for (int i = 0; i < res.size(); i++) {
2865         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected2.get(i), res.get(i)));
2866       }
2867     } finally {
2868       HRegion.closeHRegion(this.region);
2869       this.region = null;
2870     }
2871   }
2872 
2873   @Test
2874   public void testScanner_ExplicitColumns_FromMemStore_EnforceVersions() throws IOException {
2875     byte[] row1 = Bytes.toBytes("row1");
2876     byte[] qf1 = Bytes.toBytes("qualifier1");
2877     byte[] qf2 = Bytes.toBytes("qualifier2");
2878     byte[] fam1 = Bytes.toBytes("fam1");
2879     byte[][] families = { fam1 };
2880 
2881     long ts1 = System.currentTimeMillis();
2882     long ts2 = ts1 + 1;
2883     long ts3 = ts1 + 2;
2884 
2885     // Setting up region
2886     String method = this.getName();
2887     this.region = initHRegion(tableName, method, CONF, families);
2888     try {
2889       // Putting data in Region
2890       Put put = null;
2891       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2892       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2893       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2894 
2895       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2896       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2897       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2898 
2899       put = new Put(row1);
2900       put.add(kv13);
2901       put.add(kv12);
2902       put.add(kv11);
2903       put.add(kv23);
2904       put.add(kv22);
2905       put.add(kv21);
2906       region.put(put);
2907 
2908       // Expected
2909       List<Cell> expected = new ArrayList<Cell>();
2910       expected.add(kv13);
2911       expected.add(kv12);
2912 
2913       Scan scan = new Scan(row1);
2914       scan.addColumn(fam1, qf1);
2915       scan.setMaxVersions(MAX_VERSIONS);
2916       List<Cell> actual = new ArrayList<Cell>();
2917       InternalScanner scanner = region.getScanner(scan);
2918 
2919       boolean hasNext = scanner.next(actual);
2920       assertEquals(false, hasNext);
2921 
2922       // Verify result
2923       for (int i = 0; i < expected.size(); i++) {
2924         assertEquals(expected.get(i), actual.get(i));
2925       }
2926     } finally {
2927       HRegion.closeHRegion(this.region);
2928       this.region = null;
2929     }
2930   }
2931 
2932   @Test
2933   public void testScanner_ExplicitColumns_FromFilesOnly_EnforceVersions() throws IOException {
2934     byte[] row1 = Bytes.toBytes("row1");
2935     byte[] qf1 = Bytes.toBytes("qualifier1");
2936     byte[] qf2 = Bytes.toBytes("qualifier2");
2937     byte[] fam1 = Bytes.toBytes("fam1");
2938     byte[][] families = { fam1 };
2939 
2940     long ts1 = 1; // System.currentTimeMillis();
2941     long ts2 = ts1 + 1;
2942     long ts3 = ts1 + 2;
2943 
2944     // Setting up region
2945     String method = this.getName();
2946     this.region = initHRegion(tableName, method, CONF, families);
2947     try {
2948       // Putting data in Region
2949       Put put = null;
2950       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
2951       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
2952       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
2953 
2954       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
2955       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
2956       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
2957 
2958       put = new Put(row1);
2959       put.add(kv13);
2960       put.add(kv12);
2961       put.add(kv11);
2962       put.add(kv23);
2963       put.add(kv22);
2964       put.add(kv21);
2965       region.put(put);
2966       region.flush(true);
2967 
2968       // Expected
2969       List<Cell> expected = new ArrayList<Cell>();
2970       expected.add(kv13);
2971       expected.add(kv12);
2972       expected.add(kv23);
2973       expected.add(kv22);
2974 
2975       Scan scan = new Scan(row1);
2976       scan.addColumn(fam1, qf1);
2977       scan.addColumn(fam1, qf2);
2978       scan.setMaxVersions(MAX_VERSIONS);
2979       List<Cell> actual = new ArrayList<Cell>();
2980       InternalScanner scanner = region.getScanner(scan);
2981 
2982       boolean hasNext = scanner.next(actual);
2983       assertEquals(false, hasNext);
2984 
2985       // Verify result
2986       for (int i = 0; i < expected.size(); i++) {
2987         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
2988       }
2989     } finally {
2990       HRegion.closeHRegion(this.region);
2991       this.region = null;
2992     }
2993   }
2994 
2995   @Test
2996   public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException {
2997     byte[] row1 = Bytes.toBytes("row1");
2998     byte[] fam1 = Bytes.toBytes("fam1");
2999     byte[][] families = { fam1 };
3000     byte[] qf1 = Bytes.toBytes("qualifier1");
3001     byte[] qf2 = Bytes.toBytes("qualifier2");
3002 
3003     long ts1 = 1;
3004     long ts2 = ts1 + 1;
3005     long ts3 = ts1 + 2;
3006     long ts4 = ts1 + 3;
3007 
3008     // Setting up region
3009     String method = this.getName();
3010     this.region = initHRegion(tableName, method, CONF, families);
3011     try {
3012       // Putting data in Region
3013       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3014       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3015       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3016       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3017 
3018       KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3019       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3020       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3021       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3022 
3023       Put put = null;
3024       put = new Put(row1);
3025       put.add(kv14);
3026       put.add(kv24);
3027       region.put(put);
3028       region.flush(true);
3029 
3030       put = new Put(row1);
3031       put.add(kv23);
3032       put.add(kv13);
3033       region.put(put);
3034       region.flush(true);
3035 
3036       put = new Put(row1);
3037       put.add(kv22);
3038       put.add(kv12);
3039       region.put(put);
3040       region.flush(true);
3041 
3042       put = new Put(row1);
3043       put.add(kv21);
3044       put.add(kv11);
3045       region.put(put);
3046 
3047       // Expected
3048       List<Cell> expected = new ArrayList<Cell>();
3049       expected.add(kv14);
3050       expected.add(kv13);
3051       expected.add(kv12);
3052       expected.add(kv24);
3053       expected.add(kv23);
3054       expected.add(kv22);
3055 
3056       Scan scan = new Scan(row1);
3057       scan.addColumn(fam1, qf1);
3058       scan.addColumn(fam1, qf2);
3059       int versions = 3;
3060       scan.setMaxVersions(versions);
3061       List<Cell> actual = new ArrayList<Cell>();
3062       InternalScanner scanner = region.getScanner(scan);
3063 
3064       boolean hasNext = scanner.next(actual);
3065       assertEquals(false, hasNext);
3066 
3067       // Verify result
3068       for (int i = 0; i < expected.size(); i++) {
3069         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3070       }
3071     } finally {
3072       HRegion.closeHRegion(this.region);
3073       this.region = null;
3074     }
3075   }
3076 
3077   @Test
3078   public void testScanner_Wildcard_FromMemStore_EnforceVersions() throws IOException {
3079     byte[] row1 = Bytes.toBytes("row1");
3080     byte[] qf1 = Bytes.toBytes("qualifier1");
3081     byte[] qf2 = Bytes.toBytes("qualifier2");
3082     byte[] fam1 = Bytes.toBytes("fam1");
3083     byte[][] families = { fam1 };
3084 
3085     long ts1 = System.currentTimeMillis();
3086     long ts2 = ts1 + 1;
3087     long ts3 = ts1 + 2;
3088 
3089     // Setting up region
3090     String method = this.getName();
3091     this.region = initHRegion(tableName, method, CONF, families);
3092     try {
3093       // Putting data in Region
3094       Put put = null;
3095       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3096       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3097       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3098 
3099       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3100       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3101       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3102 
3103       put = new Put(row1);
3104       put.add(kv13);
3105       put.add(kv12);
3106       put.add(kv11);
3107       put.add(kv23);
3108       put.add(kv22);
3109       put.add(kv21);
3110       region.put(put);
3111 
3112       // Expected
3113       List<Cell> expected = new ArrayList<Cell>();
3114       expected.add(kv13);
3115       expected.add(kv12);
3116       expected.add(kv23);
3117       expected.add(kv22);
3118 
3119       Scan scan = new Scan(row1);
3120       scan.addFamily(fam1);
3121       scan.setMaxVersions(MAX_VERSIONS);
3122       List<Cell> actual = new ArrayList<Cell>();
3123       InternalScanner scanner = region.getScanner(scan);
3124 
3125       boolean hasNext = scanner.next(actual);
3126       assertEquals(false, hasNext);
3127 
3128       // Verify result
3129       for (int i = 0; i < expected.size(); i++) {
3130         assertEquals(expected.get(i), actual.get(i));
3131       }
3132     } finally {
3133       HRegion.closeHRegion(this.region);
3134       this.region = null;
3135     }
3136   }
3137 
3138   @Test
3139   public void testScanner_Wildcard_FromFilesOnly_EnforceVersions() throws IOException {
3140     byte[] row1 = Bytes.toBytes("row1");
3141     byte[] qf1 = Bytes.toBytes("qualifier1");
3142     byte[] qf2 = Bytes.toBytes("qualifier2");
3143     byte[] fam1 = Bytes.toBytes("fam1");
3144 
3145     long ts1 = 1; // System.currentTimeMillis();
3146     long ts2 = ts1 + 1;
3147     long ts3 = ts1 + 2;
3148 
3149     // Setting up region
3150     String method = this.getName();
3151     this.region = initHRegion(tableName, method, CONF, fam1);
3152     try {
3153       // Putting data in Region
3154       Put put = null;
3155       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3156       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3157       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3158 
3159       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3160       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3161       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3162 
3163       put = new Put(row1);
3164       put.add(kv13);
3165       put.add(kv12);
3166       put.add(kv11);
3167       put.add(kv23);
3168       put.add(kv22);
3169       put.add(kv21);
3170       region.put(put);
3171       region.flush(true);
3172 
3173       // Expected
3174       List<Cell> expected = new ArrayList<Cell>();
3175       expected.add(kv13);
3176       expected.add(kv12);
3177       expected.add(kv23);
3178       expected.add(kv22);
3179 
3180       Scan scan = new Scan(row1);
3181       scan.addFamily(fam1);
3182       scan.setMaxVersions(MAX_VERSIONS);
3183       List<Cell> actual = new ArrayList<Cell>();
3184       InternalScanner scanner = region.getScanner(scan);
3185 
3186       boolean hasNext = scanner.next(actual);
3187       assertEquals(false, hasNext);
3188 
3189       // Verify result
3190       for (int i = 0; i < expected.size(); i++) {
3191         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3192       }
3193     } finally {
3194       HRegion.closeHRegion(this.region);
3195       this.region = null;
3196     }
3197   }
3198 
3199   @Test
3200   public void testScanner_StopRow1542() throws IOException {
3201     byte[] family = Bytes.toBytes("testFamily");
3202     this.region = initHRegion(tableName, getName(), CONF, family);
3203     try {
3204       byte[] row1 = Bytes.toBytes("row111");
3205       byte[] row2 = Bytes.toBytes("row222");
3206       byte[] row3 = Bytes.toBytes("row333");
3207       byte[] row4 = Bytes.toBytes("row444");
3208       byte[] row5 = Bytes.toBytes("row555");
3209 
3210       byte[] col1 = Bytes.toBytes("Pub111");
3211       byte[] col2 = Bytes.toBytes("Pub222");
3212 
3213       Put put = new Put(row1);
3214       put.add(family, col1, Bytes.toBytes(10L));
3215       region.put(put);
3216 
3217       put = new Put(row2);
3218       put.add(family, col1, Bytes.toBytes(15L));
3219       region.put(put);
3220 
3221       put = new Put(row3);
3222       put.add(family, col2, Bytes.toBytes(20L));
3223       region.put(put);
3224 
3225       put = new Put(row4);
3226       put.add(family, col2, Bytes.toBytes(30L));
3227       region.put(put);
3228 
3229       put = new Put(row5);
3230       put.add(family, col1, Bytes.toBytes(40L));
3231       region.put(put);
3232 
3233       Scan scan = new Scan(row3, row4);
3234       scan.setMaxVersions();
3235       scan.addColumn(family, col1);
3236       InternalScanner s = region.getScanner(scan);
3237 
3238       List<Cell> results = new ArrayList<Cell>();
3239       assertEquals(false, s.next(results));
3240       assertEquals(0, results.size());
3241     } finally {
3242       HRegion.closeHRegion(this.region);
3243       this.region = null;
3244     }
3245   }
3246 
3247   @Test
3248   public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException {
3249     byte[] row1 = Bytes.toBytes("row1");
3250     byte[] fam1 = Bytes.toBytes("fam1");
3251     byte[] qf1 = Bytes.toBytes("qualifier1");
3252     byte[] qf2 = Bytes.toBytes("quateslifier2");
3253 
3254     long ts1 = 1;
3255     long ts2 = ts1 + 1;
3256     long ts3 = ts1 + 2;
3257     long ts4 = ts1 + 3;
3258 
3259     // Setting up region
3260     String method = this.getName();
3261     this.region = initHRegion(tableName, method, CONF, fam1);
3262     try {
3263       // Putting data in Region
3264       KeyValue kv14 = new KeyValue(row1, fam1, qf1, ts4, KeyValue.Type.Put, null);
3265       KeyValue kv13 = new KeyValue(row1, fam1, qf1, ts3, KeyValue.Type.Put, null);
3266       KeyValue kv12 = new KeyValue(row1, fam1, qf1, ts2, KeyValue.Type.Put, null);
3267       KeyValue kv11 = new KeyValue(row1, fam1, qf1, ts1, KeyValue.Type.Put, null);
3268 
3269       KeyValue kv24 = new KeyValue(row1, fam1, qf2, ts4, KeyValue.Type.Put, null);
3270       KeyValue kv23 = new KeyValue(row1, fam1, qf2, ts3, KeyValue.Type.Put, null);
3271       KeyValue kv22 = new KeyValue(row1, fam1, qf2, ts2, KeyValue.Type.Put, null);
3272       KeyValue kv21 = new KeyValue(row1, fam1, qf2, ts1, KeyValue.Type.Put, null);
3273 
3274       Put put = null;
3275       put = new Put(row1);
3276       put.add(kv14);
3277       put.add(kv24);
3278       region.put(put);
3279       region.flush(true);
3280 
3281       put = new Put(row1);
3282       put.add(kv23);
3283       put.add(kv13);
3284       region.put(put);
3285       region.flush(true);
3286 
3287       put = new Put(row1);
3288       put.add(kv22);
3289       put.add(kv12);
3290       region.put(put);
3291       region.flush(true);
3292 
3293       put = new Put(row1);
3294       put.add(kv21);
3295       put.add(kv11);
3296       region.put(put);
3297 
3298       // Expected
3299       List<KeyValue> expected = new ArrayList<KeyValue>();
3300       expected.add(kv14);
3301       expected.add(kv13);
3302       expected.add(kv12);
3303       expected.add(kv24);
3304       expected.add(kv23);
3305       expected.add(kv22);
3306 
3307       Scan scan = new Scan(row1);
3308       int versions = 3;
3309       scan.setMaxVersions(versions);
3310       List<Cell> actual = new ArrayList<Cell>();
3311       InternalScanner scanner = region.getScanner(scan);
3312 
3313       boolean hasNext = scanner.next(actual);
3314       assertEquals(false, hasNext);
3315 
3316       // Verify result
3317       for (int i = 0; i < expected.size(); i++) {
3318         assertTrue(CellComparator.equalsIgnoreMvccVersion(expected.get(i), actual.get(i)));
3319       }
3320     } finally {
3321       HRegion.closeHRegion(this.region);
3322       this.region = null;
3323     }
3324   }
3325 
3326   /**
3327    * Added for HBASE-5416
3328    *
3329    * Here we test scan optimization when only subset of CFs are used in filter
3330    * conditions.
3331    */
3332   @Test
3333   public void testScanner_JoinedScanners() throws IOException {
3334     byte[] cf_essential = Bytes.toBytes("essential");
3335     byte[] cf_joined = Bytes.toBytes("joined");
3336     byte[] cf_alpha = Bytes.toBytes("alpha");
3337     this.region = initHRegion(tableName, getName(), CONF, cf_essential, cf_joined, cf_alpha);
3338     try {
3339       byte[] row1 = Bytes.toBytes("row1");
3340       byte[] row2 = Bytes.toBytes("row2");
3341       byte[] row3 = Bytes.toBytes("row3");
3342 
3343       byte[] col_normal = Bytes.toBytes("d");
3344       byte[] col_alpha = Bytes.toBytes("a");
3345 
3346       byte[] filtered_val = Bytes.toBytes(3);
3347 
3348       Put put = new Put(row1);
3349       put.add(cf_essential, col_normal, Bytes.toBytes(1));
3350       put.add(cf_joined, col_alpha, Bytes.toBytes(1));
3351       region.put(put);
3352 
3353       put = new Put(row2);
3354       put.add(cf_essential, col_alpha, Bytes.toBytes(2));
3355       put.add(cf_joined, col_normal, Bytes.toBytes(2));
3356       put.add(cf_alpha, col_alpha, Bytes.toBytes(2));
3357       region.put(put);
3358 
3359       put = new Put(row3);
3360       put.add(cf_essential, col_normal, filtered_val);
3361       put.add(cf_joined, col_normal, filtered_val);
3362       region.put(put);
3363 
3364       // Check two things:
3365       // 1. result list contains expected values
3366       // 2. result list is sorted properly
3367 
3368       Scan scan = new Scan();
3369       Filter filter = new SingleColumnValueExcludeFilter(cf_essential, col_normal,
3370           CompareOp.NOT_EQUAL, filtered_val);
3371       scan.setFilter(filter);
3372       scan.setLoadColumnFamiliesOnDemand(true);
3373       InternalScanner s = region.getScanner(scan);
3374 
3375       List<Cell> results = new ArrayList<Cell>();
3376       assertTrue(s.next(results));
3377       assertEquals(results.size(), 1);
3378       results.clear();
3379 
3380       assertTrue(s.next(results));
3381       assertEquals(results.size(), 3);
3382       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(0), cf_alpha));
3383       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(1), cf_essential));
3384       assertTrue("orderCheck", CellUtil.matchingFamily(results.get(2), cf_joined));
3385       results.clear();
3386 
3387       assertFalse(s.next(results));
3388       assertEquals(results.size(), 0);
3389     } finally {
3390       HRegion.closeHRegion(this.region);
3391       this.region = null;
3392     }
3393   }
3394 
3395   /**
3396    * HBASE-5416
3397    *
3398    * Test case when scan limits amount of KVs returned on each next() call.
3399    */
3400   @Test
3401   public void testScanner_JoinedScannersWithLimits() throws IOException {
3402     final byte[] cf_first = Bytes.toBytes("first");
3403     final byte[] cf_second = Bytes.toBytes("second");
3404 
3405     this.region = initHRegion(tableName, getName(), CONF, cf_first, cf_second);
3406     try {
3407       final byte[] col_a = Bytes.toBytes("a");
3408       final byte[] col_b = Bytes.toBytes("b");
3409 
3410       Put put;
3411 
3412       for (int i = 0; i < 10; i++) {
3413         put = new Put(Bytes.toBytes("r" + Integer.toString(i)));
3414         put.add(cf_first, col_a, Bytes.toBytes(i));
3415         if (i < 5) {
3416           put.add(cf_first, col_b, Bytes.toBytes(i));
3417           put.add(cf_second, col_a, Bytes.toBytes(i));
3418           put.add(cf_second, col_b, Bytes.toBytes(i));
3419         }
3420         region.put(put);
3421       }
3422 
3423       Scan scan = new Scan();
3424       scan.setLoadColumnFamiliesOnDemand(true);
3425       Filter bogusFilter = new FilterBase() {
3426         @Override
3427         public ReturnCode filterKeyValue(Cell ignored) throws IOException {
3428           return ReturnCode.INCLUDE;
3429         }
3430         @Override
3431         public boolean isFamilyEssential(byte[] name) {
3432           return Bytes.equals(name, cf_first);
3433         }
3434       };
3435 
3436       scan.setFilter(bogusFilter);
3437       InternalScanner s = region.getScanner(scan);
3438 
3439       // Our data looks like this:
3440       // r0: first:a, first:b, second:a, second:b
3441       // r1: first:a, first:b, second:a, second:b
3442       // r2: first:a, first:b, second:a, second:b
3443       // r3: first:a, first:b, second:a, second:b
3444       // r4: first:a, first:b, second:a, second:b
3445       // r5: first:a
3446       // r6: first:a
3447       // r7: first:a
3448       // r8: first:a
3449       // r9: first:a
3450 
3451       // But due to next's limit set to 3, we should get this:
3452       // r0: first:a, first:b, second:a
3453       // r0: second:b
3454       // r1: first:a, first:b, second:a
3455       // r1: second:b
3456       // r2: first:a, first:b, second:a
3457       // r2: second:b
3458       // r3: first:a, first:b, second:a
3459       // r3: second:b
3460       // r4: first:a, first:b, second:a
3461       // r4: second:b
3462       // r5: first:a
3463       // r6: first:a
3464       // r7: first:a
3465       // r8: first:a
3466       // r9: first:a
3467 
3468       List<Cell> results = new ArrayList<Cell>();
3469       int index = 0;
3470       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(3).build();
3471       while (true) {
3472         boolean more = s.next(results, scannerContext);
3473         if ((index >> 1) < 5) {
3474           if (index % 2 == 0)
3475             assertEquals(results.size(), 3);
3476           else
3477             assertEquals(results.size(), 1);
3478         } else
3479           assertEquals(results.size(), 1);
3480         results.clear();
3481         index++;
3482         if (!more)
3483           break;
3484       }
3485     } finally {
3486       HRegion.closeHRegion(this.region);
3487       this.region = null;
3488     }
3489   }
3490 
3491   /**
3492    * Write an HFile block full with Cells whose qualifier that are identical between
3493    * 0 and Short.MAX_VALUE. See HBASE-13329.
3494    * @throws Exception
3495    */
3496   @Test
3497   public void testLongQualifier() throws Exception {
3498     String method = name.getMethodName();
3499     TableName tableName = TableName.valueOf(method);
3500     byte[] family = Bytes.toBytes("family");
3501     this.region = initHRegion(tableName, method, CONF, family);
3502     byte[] q = new byte[Short.MAX_VALUE+2];
3503     Arrays.fill(q, 0, q.length-1, (byte)42);
3504     for (byte i=0; i<10; i++) {
3505       Put p = new Put(Bytes.toBytes("row"));
3506       // qualifiers that differ past Short.MAX_VALUE
3507       q[q.length-1]=i;
3508       p.addColumn(family, q, q);
3509       region.put(p);
3510     }
3511     region.flush(false);
3512     HRegion.closeHRegion(this.region);
3513     this.region = null;
3514   }
3515   // ////////////////////////////////////////////////////////////////////////////
3516   // Split test
3517   // ////////////////////////////////////////////////////////////////////////////
3518   /**
3519    * Splits twice and verifies getting from each of the split regions.
3520    *
3521    * @throws Exception
3522    */
3523   @Test
3524   public void testBasicSplit() throws Exception {
3525     byte[][] families = { fam1, fam2, fam3 };
3526 
3527     Configuration hc = initSplit();
3528     // Setting up region
3529     String method = this.getName();
3530     this.region = initHRegion(tableName, method, hc, families);
3531 
3532     try {
3533       LOG.info("" + HBaseTestCase.addContent(region, fam3));
3534       region.flush(true);
3535       region.compactStores();
3536       byte[] splitRow = region.checkSplit();
3537       assertNotNull(splitRow);
3538       LOG.info("SplitRow: " + Bytes.toString(splitRow));
3539       HRegion[] regions = splitRegion(region, splitRow);
3540       try {
3541         // Need to open the regions.
3542         // TODO: Add an 'open' to HRegion... don't do open by constructing
3543         // instance.
3544         for (int i = 0; i < regions.length; i++) {
3545           regions[i] = HRegion.openHRegion(regions[i], null);
3546         }
3547         // Assert can get rows out of new regions. Should be able to get first
3548         // row from first region and the midkey from second region.
3549         assertGet(regions[0], fam3, Bytes.toBytes(START_KEY));
3550         assertGet(regions[1], fam3, splitRow);
3551         // Test I can get scanner and that it starts at right place.
3552         assertScan(regions[0], fam3, Bytes.toBytes(START_KEY));
3553         assertScan(regions[1], fam3, splitRow);
3554         // Now prove can't split regions that have references.
3555         for (int i = 0; i < regions.length; i++) {
3556           // Add so much data to this region, we create a store file that is >
3557           // than one of our unsplitable references. it will.
3558           for (int j = 0; j < 2; j++) {
3559             HBaseTestCase.addContent(regions[i], fam3);
3560           }
3561           HBaseTestCase.addContent(regions[i], fam2);
3562           HBaseTestCase.addContent(regions[i], fam1);
3563           regions[i].flush(true);
3564         }
3565 
3566         byte[][] midkeys = new byte[regions.length][];
3567         // To make regions splitable force compaction.
3568         for (int i = 0; i < regions.length; i++) {
3569           regions[i].compactStores();
3570           midkeys[i] = regions[i].checkSplit();
3571         }
3572 
3573         TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
3574         // Split these two daughter regions so then I'll have 4 regions. Will
3575         // split because added data above.
3576         for (int i = 0; i < regions.length; i++) {
3577           HRegion[] rs = null;
3578           if (midkeys[i] != null) {
3579             rs = splitRegion(regions[i], midkeys[i]);
3580             for (int j = 0; j < rs.length; j++) {
3581               sortedMap.put(Bytes.toString(rs[j].getRegionInfo().getRegionName()),
3582                 HRegion.openHRegion(rs[j], null));
3583             }
3584           }
3585         }
3586         LOG.info("Made 4 regions");
3587         // The splits should have been even. Test I can get some arbitrary row
3588         // out of each.
3589         int interval = (LAST_CHAR - FIRST_CHAR) / 3;
3590         byte[] b = Bytes.toBytes(START_KEY);
3591         for (HRegion r : sortedMap.values()) {
3592           assertGet(r, fam3, b);
3593           b[0] += interval;
3594         }
3595       } finally {
3596         for (int i = 0; i < regions.length; i++) {
3597           try {
3598             regions[i].close();
3599           } catch (IOException e) {
3600             // Ignore.
3601           }
3602         }
3603       }
3604     } finally {
3605       HRegion.closeHRegion(this.region);
3606       this.region = null;
3607     }
3608   }
3609 
3610   @Test
3611   public void testSplitRegion() throws IOException {
3612     byte[] qualifier = Bytes.toBytes("qualifier");
3613     Configuration hc = initSplit();
3614     int numRows = 10;
3615     byte[][] families = { fam1, fam3 };
3616 
3617     // Setting up region
3618     String method = this.getName();
3619     this.region = initHRegion(tableName, method, hc, families);
3620 
3621     // Put data in region
3622     int startRow = 100;
3623     putData(startRow, numRows, qualifier, families);
3624     int splitRow = startRow + numRows;
3625     putData(splitRow, numRows, qualifier, families);
3626     region.flush(true);
3627 
3628     HRegion[] regions = null;
3629     try {
3630       regions = splitRegion(region, Bytes.toBytes("" + splitRow));
3631       // Opening the regions returned.
3632       for (int i = 0; i < regions.length; i++) {
3633         regions[i] = HRegion.openHRegion(regions[i], null);
3634       }
3635       // Verifying that the region has been split
3636       assertEquals(2, regions.length);
3637 
3638       // Verifying that all data is still there and that data is in the right
3639       // place
3640       verifyData(regions[0], startRow, numRows, qualifier, families);
3641       verifyData(regions[1], splitRow, numRows, qualifier, families);
3642 
3643     } finally {
3644       HRegion.closeHRegion(this.region);
3645       this.region = null;
3646     }
3647   }
3648 
3649   @Test
3650   public void testClearForceSplit() throws IOException {
3651     byte[] qualifier = Bytes.toBytes("qualifier");
3652     Configuration hc = initSplit();
3653     int numRows = 10;
3654     byte[][] families = { fam1, fam3 };
3655 
3656     // Setting up region
3657     String method = this.getName();
3658     this.region = initHRegion(tableName, method, hc, families);
3659 
3660     // Put data in region
3661     int startRow = 100;
3662     putData(startRow, numRows, qualifier, families);
3663     int splitRow = startRow + numRows;
3664     byte[] splitRowBytes = Bytes.toBytes("" + splitRow);
3665     putData(splitRow, numRows, qualifier, families);
3666     region.flush(true);
3667 
3668     HRegion[] regions = null;
3669     try {
3670       // Set force split
3671       region.forceSplit(splitRowBytes);
3672       assertTrue(region.shouldForceSplit());
3673       // Split point should be the force split row
3674       assertTrue(Bytes.equals(splitRowBytes, region.checkSplit()));
3675 
3676       // Add a store that has references.
3677       HStore storeMock = Mockito.mock(HStore.class);
3678       when(storeMock.hasReferences()).thenReturn(true);
3679       when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf"));
3680       when(storeMock.close()).thenReturn(ImmutableList.<StoreFile>of());
3681       when(storeMock.getColumnFamilyName()).thenReturn("cf");
3682       region.stores.put(Bytes.toBytes(storeMock.getColumnFamilyName()), storeMock);
3683       assertTrue(region.hasReferences());
3684 
3685       // Will not split since the store has references.
3686       regions = splitRegion(region, splitRowBytes);
3687       assertNull(regions);
3688 
3689       // Region force split should be cleared after the split try.
3690       assertFalse(region.shouldForceSplit());
3691 
3692       // Remove the store that has references.
3693       region.stores.remove(Bytes.toBytes(storeMock.getColumnFamilyName()));
3694       assertFalse(region.hasReferences());
3695 
3696       // Now we can split.
3697       regions = splitRegion(region, splitRowBytes);
3698 
3699       // Opening the regions returned.
3700       for (int i = 0; i < regions.length; i++) {
3701         regions[i] = HRegion.openHRegion(regions[i], null);
3702       }
3703       // Verifying that the region has been split
3704       assertEquals(2, regions.length);
3705 
3706       // Verifying that all data is still there and that data is in the right
3707       // place
3708       verifyData(regions[0], startRow, numRows, qualifier, families);
3709       verifyData(regions[1], splitRow, numRows, qualifier, families);
3710 
3711     } finally {
3712       HRegion.closeHRegion(this.region);
3713       this.region = null;
3714     }
3715   }
3716 
3717   /**
3718    * Flushes the cache in a thread while scanning. The tests verify that the
3719    * scan is coherent - e.g. the returned results are always of the same or
3720    * later update as the previous results.
3721    *
3722    * @throws IOException
3723    *           scan / compact
3724    * @throws InterruptedException
3725    *           thread join
3726    */
3727   @Test
3728   public void testFlushCacheWhileScanning() throws IOException, InterruptedException {
3729     byte[] family = Bytes.toBytes("family");
3730     int numRows = 1000;
3731     int flushAndScanInterval = 10;
3732     int compactInterval = 10 * flushAndScanInterval;
3733 
3734     String method = "testFlushCacheWhileScanning";
3735     this.region = initHRegion(tableName, method, CONF, family);
3736     try {
3737       FlushThread flushThread = new FlushThread();
3738       flushThread.start();
3739 
3740       Scan scan = new Scan();
3741       scan.addFamily(family);
3742       scan.setFilter(new SingleColumnValueFilter(family, qual1, CompareOp.EQUAL,
3743           new BinaryComparator(Bytes.toBytes(5L))));
3744 
3745       int expectedCount = 0;
3746       List<Cell> res = new ArrayList<Cell>();
3747 
3748       boolean toggle = true;
3749       for (long i = 0; i < numRows; i++) {
3750         Put put = new Put(Bytes.toBytes(i));
3751         put.setDurability(Durability.SKIP_WAL);
3752         put.add(family, qual1, Bytes.toBytes(i % 10));
3753         region.put(put);
3754 
3755         if (i != 0 && i % compactInterval == 0) {
3756           // System.out.println("iteration = " + i);
3757           region.compact(true);
3758         }
3759 
3760         if (i % 10 == 5L) {
3761           expectedCount++;
3762         }
3763 
3764         if (i != 0 && i % flushAndScanInterval == 0) {
3765           res.clear();
3766           InternalScanner scanner = region.getScanner(scan);
3767           if (toggle) {
3768             flushThread.flush();
3769           }
3770           while (scanner.next(res))
3771             ;
3772           if (!toggle) {
3773             flushThread.flush();
3774           }
3775           assertEquals("i=" + i, expectedCount, res.size());
3776           toggle = !toggle;
3777         }
3778       }
3779 
3780       flushThread.done();
3781       flushThread.join();
3782       flushThread.checkNoError();
3783     } finally {
3784       HRegion.closeHRegion(this.region);
3785       this.region = null;
3786     }
3787   }
3788 
3789   protected class FlushThread extends Thread {
3790     private volatile boolean done;
3791     private Throwable error = null;
3792 
3793     public void done() {
3794       done = true;
3795       synchronized (this) {
3796         interrupt();
3797       }
3798     }
3799 
3800     public void checkNoError() {
3801       if (error != null) {
3802         assertNull(error);
3803       }
3804     }
3805 
3806     @Override
3807     public void run() {
3808       done = false;
3809       while (!done) {
3810         synchronized (this) {
3811           try {
3812             wait();
3813           } catch (InterruptedException ignored) {
3814             if (done) {
3815               break;
3816             }
3817           }
3818         }
3819         try {
3820           region.flush(true);
3821         } catch (IOException e) {
3822           if (!done) {
3823             LOG.error("Error while flusing cache", e);
3824             error = e;
3825           }
3826           break;
3827         }
3828       }
3829 
3830     }
3831 
3832     public void flush() {
3833       synchronized (this) {
3834         notify();
3835       }
3836 
3837     }
3838   }
3839 
3840   /**
3841    * Writes very wide records and scans for the latest every time.. Flushes and
3842    * compacts the region every now and then to keep things realistic.
3843    *
3844    * @throws IOException
3845    *           by flush / scan / compaction
3846    * @throws InterruptedException
3847    *           when joining threads
3848    */
3849   @Test
3850   public void testWritesWhileScanning() throws IOException, InterruptedException {
3851     int testCount = 100;
3852     int numRows = 1;
3853     int numFamilies = 10;
3854     int numQualifiers = 100;
3855     int flushInterval = 7;
3856     int compactInterval = 5 * flushInterval;
3857     byte[][] families = new byte[numFamilies][];
3858     for (int i = 0; i < numFamilies; i++) {
3859       families[i] = Bytes.toBytes("family" + i);
3860     }
3861     byte[][] qualifiers = new byte[numQualifiers][];
3862     for (int i = 0; i < numQualifiers; i++) {
3863       qualifiers[i] = Bytes.toBytes("qual" + i);
3864     }
3865 
3866     String method = "testWritesWhileScanning";
3867     this.region = initHRegion(tableName, method, CONF, families);
3868     try {
3869       PutThread putThread = new PutThread(numRows, families, qualifiers);
3870       putThread.start();
3871       putThread.waitForFirstPut();
3872 
3873       FlushThread flushThread = new FlushThread();
3874       flushThread.start();
3875 
3876       Scan scan = new Scan(Bytes.toBytes("row0"), Bytes.toBytes("row1"));
3877 
3878       int expectedCount = numFamilies * numQualifiers;
3879       List<Cell> res = new ArrayList<Cell>();
3880 
3881       long prevTimestamp = 0L;
3882       for (int i = 0; i < testCount; i++) {
3883 
3884         if (i != 0 && i % compactInterval == 0) {
3885           region.compact(true);
3886         }
3887 
3888         if (i != 0 && i % flushInterval == 0) {
3889           flushThread.flush();
3890         }
3891 
3892         boolean previousEmpty = res.isEmpty();
3893         res.clear();
3894         InternalScanner scanner = region.getScanner(scan);
3895         while (scanner.next(res))
3896           ;
3897         if (!res.isEmpty() || !previousEmpty || i > compactInterval) {
3898           assertEquals("i=" + i, expectedCount, res.size());
3899           long timestamp = res.get(0).getTimestamp();
3900           assertTrue("Timestamps were broke: " + timestamp + " prev: " + prevTimestamp,
3901               timestamp >= prevTimestamp);
3902           prevTimestamp = timestamp;
3903         }
3904       }
3905 
3906       putThread.done();
3907 
3908       region.flush(true);
3909 
3910       putThread.join();
3911       putThread.checkNoError();
3912 
3913       flushThread.done();
3914       flushThread.join();
3915       flushThread.checkNoError();
3916     } finally {
3917       try {
3918         HRegion.closeHRegion(this.region);
3919       } catch (DroppedSnapshotException dse) {
3920         // We could get this on way out because we interrupt the background flusher and it could
3921         // fail anywhere causing a DSE over in the background flusher... only it is not properly
3922         // dealt with so could still be memory hanging out when we get to here -- memory we can't
3923         // flush because the accounting is 'off' since original DSE.
3924       }
3925       this.region = null;
3926     }
3927   }
3928 
3929   protected class PutThread extends Thread {
3930     private volatile boolean done;
3931     private volatile int numPutsFinished = 0;
3932 
3933     private Throwable error = null;
3934     private int numRows;
3935     private byte[][] families;
3936     private byte[][] qualifiers;
3937 
3938     private PutThread(int numRows, byte[][] families, byte[][] qualifiers) {
3939       this.numRows = numRows;
3940       this.families = families;
3941       this.qualifiers = qualifiers;
3942     }
3943 
3944     /**
3945      * Block until this thread has put at least one row.
3946      */
3947     public void waitForFirstPut() throws InterruptedException {
3948       // wait until put thread actually puts some data
3949       while (numPutsFinished == 0) {
3950         checkNoError();
3951         Thread.sleep(50);
3952       }
3953     }
3954 
3955     public void done() {
3956       done = true;
3957       synchronized (this) {
3958         interrupt();
3959       }
3960     }
3961 
3962     public void checkNoError() {
3963       if (error != null) {
3964         assertNull(error);
3965       }
3966     }
3967 
3968     @Override
3969     public void run() {
3970       done = false;
3971       while (!done) {
3972         try {
3973           for (int r = 0; r < numRows; r++) {
3974             byte[] row = Bytes.toBytes("row" + r);
3975             Put put = new Put(row);
3976             put.setDurability(Durability.SKIP_WAL);
3977             byte[] value = Bytes.toBytes(String.valueOf(numPutsFinished));
3978             for (byte[] family : families) {
3979               for (byte[] qualifier : qualifiers) {
3980                 put.add(family, qualifier, (long) numPutsFinished, value);
3981               }
3982             }
3983             region.put(put);
3984             numPutsFinished++;
3985             if (numPutsFinished > 0 && numPutsFinished % 47 == 0) {
3986               System.out.println("put iteration = " + numPutsFinished);
3987               Delete delete = new Delete(row, (long) numPutsFinished - 30);
3988               region.delete(delete);
3989             }
3990             numPutsFinished++;
3991           }
3992         } catch (InterruptedIOException e) {
3993           // This is fine. It means we are done, or didn't get the lock on time
3994         } catch (IOException e) {
3995           LOG.error("error while putting records", e);
3996           error = e;
3997           break;
3998         }
3999       }
4000 
4001     }
4002 
4003   }
4004 
4005   /**
4006    * Writes very wide records and gets the latest row every time.. Flushes and
4007    * compacts the region aggressivly to catch issues.
4008    *
4009    * @throws IOException
4010    *           by flush / scan / compaction
4011    * @throws InterruptedException
4012    *           when joining threads
4013    */
4014   @Test
4015   public void testWritesWhileGetting() throws Exception {
4016     int testCount = 100;
4017     int numRows = 1;
4018     int numFamilies = 10;
4019     int numQualifiers = 100;
4020     int compactInterval = 100;
4021     byte[][] families = new byte[numFamilies][];
4022     for (int i = 0; i < numFamilies; i++) {
4023       families[i] = Bytes.toBytes("family" + i);
4024     }
4025     byte[][] qualifiers = new byte[numQualifiers][];
4026     for (int i = 0; i < numQualifiers; i++) {
4027       qualifiers[i] = Bytes.toBytes("qual" + i);
4028     }
4029 
4030 
4031     String method = "testWritesWhileGetting";
4032     // This test flushes constantly and can cause many files to be created,
4033     // possibly
4034     // extending over the ulimit. Make sure compactions are aggressive in
4035     // reducing
4036     // the number of HFiles created.
4037     Configuration conf = HBaseConfiguration.create(CONF);
4038     conf.setInt("hbase.hstore.compaction.min", 1);
4039     conf.setInt("hbase.hstore.compaction.max", 1000);
4040     this.region = initHRegion(tableName, method, conf, families);
4041     PutThread putThread = null;
4042     MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf);
4043     try {
4044       putThread = new PutThread(numRows, families, qualifiers);
4045       putThread.start();
4046       putThread.waitForFirstPut();
4047 
4048       // Add a thread that flushes as fast as possible
4049       ctx.addThread(new RepeatingTestThread(ctx) {
4050         private int flushesSinceCompact = 0;
4051         private final int maxFlushesSinceCompact = 20;
4052 
4053         @Override
4054         public void doAnAction() throws Exception {
4055           if (region.flush(true).isCompactionNeeded()) {
4056             ++flushesSinceCompact;
4057           }
4058           // Compact regularly to avoid creating too many files and exceeding
4059           // the ulimit.
4060           if (flushesSinceCompact == maxFlushesSinceCompact) {
4061             region.compact(false);
4062             flushesSinceCompact = 0;
4063           }
4064         }
4065       });
4066       ctx.startThreads();
4067 
4068       Get get = new Get(Bytes.toBytes("row0"));
4069       Result result = null;
4070 
4071       int expectedCount = numFamilies * numQualifiers;
4072 
4073       long prevTimestamp = 0L;
4074       for (int i = 0; i < testCount; i++) {
4075 
4076         boolean previousEmpty = result == null || result.isEmpty();
4077         result = region.get(get);
4078         if (!result.isEmpty() || !previousEmpty || i > compactInterval) {
4079           assertEquals("i=" + i, expectedCount, result.size());
4080           // TODO this was removed, now what dangit?!
4081           // search looking for the qualifier in question?
4082           long timestamp = 0;
4083           for (Cell kv : result.rawCells()) {
4084             if (CellUtil.matchingFamily(kv, families[0])
4085                 && CellUtil.matchingQualifier(kv, qualifiers[0])) {
4086               timestamp = kv.getTimestamp();
4087             }
4088           }
4089           assertTrue(timestamp >= prevTimestamp);
4090           prevTimestamp = timestamp;
4091           Cell previousKV = null;
4092 
4093           for (Cell kv : result.rawCells()) {
4094             byte[] thisValue = CellUtil.cloneValue(kv);
4095             if (previousKV != null) {
4096               if (Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue) != 0) {
4097                 LOG.warn("These two KV should have the same value." + " Previous KV:" + previousKV
4098                     + "(memStoreTS:" + previousKV.getMvccVersion() + ")" + ", New KV: " + kv
4099                     + "(memStoreTS:" + kv.getMvccVersion() + ")");
4100                 assertEquals(0, Bytes.compareTo(CellUtil.cloneValue(previousKV), thisValue));
4101               }
4102             }
4103             previousKV = kv;
4104           }
4105         }
4106       }
4107     } finally {
4108       if (putThread != null)
4109         putThread.done();
4110 
4111       region.flush(true);
4112 
4113       if (putThread != null) {
4114         putThread.join();
4115         putThread.checkNoError();
4116       }
4117 
4118       ctx.stop();
4119       HRegion.closeHRegion(this.region);
4120       this.region = null;
4121     }
4122   }
4123 
4124   @Test
4125   public void testHolesInMeta() throws Exception {
4126     byte[] family = Bytes.toBytes("family");
4127     this.region = initHRegion(tableName, Bytes.toBytes("x"), Bytes.toBytes("z"), method, CONF,
4128         false, family);
4129     try {
4130       byte[] rowNotServed = Bytes.toBytes("a");
4131       Get g = new Get(rowNotServed);
4132       try {
4133         region.get(g);
4134         fail();
4135       } catch (WrongRegionException x) {
4136         // OK
4137       }
4138       byte[] row = Bytes.toBytes("y");
4139       g = new Get(row);
4140       region.get(g);
4141     } finally {
4142       HRegion.closeHRegion(this.region);
4143       this.region = null;
4144     }
4145   }
4146 
4147   @Test
4148   public void testIndexesScanWithOneDeletedRow() throws IOException {
4149     byte[] family = Bytes.toBytes("family");
4150 
4151     // Setting up region
4152     String method = "testIndexesScanWithOneDeletedRow";
4153     this.region = initHRegion(tableName, method, CONF, family);
4154     try {
4155       Put put = new Put(Bytes.toBytes(1L));
4156       put.add(family, qual1, 1L, Bytes.toBytes(1L));
4157       region.put(put);
4158 
4159       region.flush(true);
4160 
4161       Delete delete = new Delete(Bytes.toBytes(1L), 1L);
4162       region.delete(delete);
4163 
4164       put = new Put(Bytes.toBytes(2L));
4165       put.add(family, qual1, 2L, Bytes.toBytes(2L));
4166       region.put(put);
4167 
4168       Scan idxScan = new Scan();
4169       idxScan.addFamily(family);
4170       idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, Arrays.<Filter> asList(
4171           new SingleColumnValueFilter(family, qual1, CompareOp.GREATER_OR_EQUAL,
4172               new BinaryComparator(Bytes.toBytes(0L))), new SingleColumnValueFilter(family, qual1,
4173               CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(3L))))));
4174       InternalScanner scanner = region.getScanner(idxScan);
4175       List<Cell> res = new ArrayList<Cell>();
4176 
4177       while (scanner.next(res))
4178         ;
4179       assertEquals(1L, res.size());
4180     } finally {
4181       HRegion.closeHRegion(this.region);
4182       this.region = null;
4183     }
4184   }
4185 
4186   // ////////////////////////////////////////////////////////////////////////////
4187   // Bloom filter test
4188   // ////////////////////////////////////////////////////////////////////////////
4189   @Test
4190   public void testBloomFilterSize() throws IOException {
4191     byte[] fam1 = Bytes.toBytes("fam1");
4192     byte[] qf1 = Bytes.toBytes("col");
4193     byte[] val1 = Bytes.toBytes("value1");
4194     // Create Table
4195     HColumnDescriptor hcd = new HColumnDescriptor(fam1).setMaxVersions(Integer.MAX_VALUE)
4196         .setBloomFilterType(BloomType.ROWCOL);
4197 
4198     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
4199     htd.addFamily(hcd);
4200     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4201     this.region = TEST_UTIL.createLocalHRegion(info, htd);
4202     try {
4203       int num_unique_rows = 10;
4204       int duplicate_multiplier = 2;
4205       int num_storefiles = 4;
4206 
4207       int version = 0;
4208       for (int f = 0; f < num_storefiles; f++) {
4209         for (int i = 0; i < duplicate_multiplier; i++) {
4210           for (int j = 0; j < num_unique_rows; j++) {
4211             Put put = new Put(Bytes.toBytes("row" + j));
4212             put.setDurability(Durability.SKIP_WAL);
4213             put.add(fam1, qf1, version++, val1);
4214             region.put(put);
4215           }
4216         }
4217         region.flush(true);
4218       }
4219       // before compaction
4220       HStore store = (HStore) region.getStore(fam1);
4221       Collection<StoreFile> storeFiles = store.getStorefiles();
4222       for (StoreFile storefile : storeFiles) {
4223         StoreFile.Reader reader = storefile.getReader();
4224         reader.loadFileInfo();
4225         reader.loadBloomfilter();
4226         assertEquals(num_unique_rows * duplicate_multiplier, reader.getEntries());
4227         assertEquals(num_unique_rows, reader.getFilterEntries());
4228       }
4229 
4230       region.compact(true);
4231 
4232       // after compaction
4233       storeFiles = store.getStorefiles();
4234       for (StoreFile storefile : storeFiles) {
4235         StoreFile.Reader reader = storefile.getReader();
4236         reader.loadFileInfo();
4237         reader.loadBloomfilter();
4238         assertEquals(num_unique_rows * duplicate_multiplier * num_storefiles, reader.getEntries());
4239         assertEquals(num_unique_rows, reader.getFilterEntries());
4240       }
4241     } finally {
4242       HRegion.closeHRegion(this.region);
4243       this.region = null;
4244     }
4245   }
4246 
4247   @Test
4248   public void testAllColumnsWithBloomFilter() throws IOException {
4249     byte[] TABLE = Bytes.toBytes("testAllColumnsWithBloomFilter");
4250     byte[] FAMILY = Bytes.toBytes("family");
4251 
4252     // Create table
4253     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY).setMaxVersions(Integer.MAX_VALUE)
4254         .setBloomFilterType(BloomType.ROWCOL);
4255     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
4256     htd.addFamily(hcd);
4257     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4258     this.region = TEST_UTIL.createLocalHRegion(info, htd);
4259     try {
4260       // For row:0, col:0: insert versions 1 through 5.
4261       byte row[] = Bytes.toBytes("row:" + 0);
4262       byte column[] = Bytes.toBytes("column:" + 0);
4263       Put put = new Put(row);
4264       put.setDurability(Durability.SKIP_WAL);
4265       for (long idx = 1; idx <= 4; idx++) {
4266         put.add(FAMILY, column, idx, Bytes.toBytes("value-version-" + idx));
4267       }
4268       region.put(put);
4269 
4270       // Flush
4271       region.flush(true);
4272 
4273       // Get rows
4274       Get get = new Get(row);
4275       get.setMaxVersions();
4276       Cell[] kvs = region.get(get).rawCells();
4277 
4278       // Check if rows are correct
4279       assertEquals(4, kvs.length);
4280       checkOneCell(kvs[0], FAMILY, 0, 0, 4);
4281       checkOneCell(kvs[1], FAMILY, 0, 0, 3);
4282       checkOneCell(kvs[2], FAMILY, 0, 0, 2);
4283       checkOneCell(kvs[3], FAMILY, 0, 0, 1);
4284     } finally {
4285       HRegion.closeHRegion(this.region);
4286       this.region = null;
4287     }
4288   }
4289 
4290   /**
4291    * Testcase to cover bug-fix for HBASE-2823 Ensures correct delete when
4292    * issuing delete row on columns with bloom filter set to row+col
4293    * (BloomType.ROWCOL)
4294    */
4295   @Test
4296   public void testDeleteRowWithBloomFilter() throws IOException {
4297     byte[] familyName = Bytes.toBytes("familyName");
4298 
4299     // Create Table
4300     HColumnDescriptor hcd = new HColumnDescriptor(familyName).setMaxVersions(Integer.MAX_VALUE)
4301         .setBloomFilterType(BloomType.ROWCOL);
4302 
4303     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
4304     htd.addFamily(hcd);
4305     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
4306     this.region = TEST_UTIL.createLocalHRegion(info, htd);
4307     try {
4308       // Insert some data
4309       byte row[] = Bytes.toBytes("row1");
4310       byte col[] = Bytes.toBytes("col1");
4311 
4312       Put put = new Put(row);
4313       put.add(familyName, col, 1, Bytes.toBytes("SomeRandomValue"));
4314       region.put(put);
4315       region.flush(true);
4316 
4317       Delete del = new Delete(row);
4318       region.delete(del);
4319       region.flush(true);
4320 
4321       // Get remaining rows (should have none)
4322       Get get = new Get(row);
4323       get.addColumn(familyName, col);
4324 
4325       Cell[] keyValues = region.get(get).rawCells();
4326       assertTrue(keyValues.length == 0);
4327     } finally {
4328       HRegion.closeHRegion(this.region);
4329       this.region = null;
4330     }
4331   }
4332 
4333   @Test
4334   public void testgetHDFSBlocksDistribution() throws Exception {
4335     HBaseTestingUtility htu = new HBaseTestingUtility();
4336     // Why do we set the block size in this test?  If we set it smaller than the kvs, then we'll
4337     // break up the file in to more pieces that can be distributed across the three nodes and we
4338     // won't be able to have the condition this test asserts; that at least one node has
4339     // a copy of all replicas -- if small block size, then blocks are spread evenly across the
4340     // the three nodes.  hfilev3 with tags seems to put us over the block size.  St.Ack.
4341     // final int DEFAULT_BLOCK_SIZE = 1024;
4342     // htu.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
4343     htu.getConfiguration().setInt("dfs.replication", 2);
4344 
4345     // set up a cluster with 3 nodes
4346     MiniHBaseCluster cluster = null;
4347     String dataNodeHosts[] = new String[] { "host1", "host2", "host3" };
4348     int regionServersCount = 3;
4349 
4350     try {
4351       cluster = htu.startMiniCluster(1, regionServersCount, dataNodeHosts);
4352       byte[][] families = { fam1, fam2 };
4353       Table ht = htu.createTable(Bytes.toBytes(this.getName()), families);
4354 
4355       // Setting up region
4356       byte row[] = Bytes.toBytes("row1");
4357       byte col[] = Bytes.toBytes("col1");
4358 
4359       Put put = new Put(row);
4360       put.add(fam1, col, 1, Bytes.toBytes("test1"));
4361       put.add(fam2, col, 1, Bytes.toBytes("test2"));
4362       ht.put(put);
4363 
4364       HRegion firstRegion = htu.getHBaseCluster().getRegions(TableName.valueOf(this.getName()))
4365           .get(0);
4366       firstRegion.flush(true);
4367       HDFSBlocksDistribution blocksDistribution1 = firstRegion.getHDFSBlocksDistribution();
4368 
4369       // Given the default replication factor is 2 and we have 2 HFiles,
4370       // we will have total of 4 replica of blocks on 3 datanodes; thus there
4371       // must be at least one host that have replica for 2 HFiles. That host's
4372       // weight will be equal to the unique block weight.
4373       long uniqueBlocksWeight1 = blocksDistribution1.getUniqueBlocksTotalWeight();
4374       StringBuilder sb = new StringBuilder();
4375       for (String host: blocksDistribution1.getTopHosts()) {
4376         if (sb.length() > 0) sb.append(", ");
4377         sb.append(host);
4378         sb.append("=");
4379         sb.append(blocksDistribution1.getWeight(host));
4380       }
4381 
4382       String topHost = blocksDistribution1.getTopHosts().get(0);
4383       long topHostWeight = blocksDistribution1.getWeight(topHost);
4384       String msg = "uniqueBlocksWeight=" + uniqueBlocksWeight1 + ", topHostWeight=" +
4385         topHostWeight + ", topHost=" + topHost + "; " + sb.toString();
4386       LOG.info(msg);
4387       assertTrue(msg, uniqueBlocksWeight1 == topHostWeight);
4388 
4389       // use the static method to compute the value, it should be the same.
4390       // static method is used by load balancer or other components
4391       HDFSBlocksDistribution blocksDistribution2 = HRegion.computeHDFSBlocksDistribution(
4392           htu.getConfiguration(), firstRegion.getTableDesc(), firstRegion.getRegionInfo());
4393       long uniqueBlocksWeight2 = blocksDistribution2.getUniqueBlocksTotalWeight();
4394 
4395       assertTrue(uniqueBlocksWeight1 == uniqueBlocksWeight2);
4396 
4397       ht.close();
4398     } finally {
4399       if (cluster != null) {
4400         htu.shutdownMiniCluster();
4401       }
4402     }
4403   }
4404 
4405   /**
4406    * Testcase to check state of region initialization task set to ABORTED or not
4407    * if any exceptions during initialization
4408    *
4409    * @throws Exception
4410    */
4411   @Test
4412   public void testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization() throws Exception {
4413     TableName tableName = TableName.valueOf(name.getMethodName());
4414     HRegionInfo info = null;
4415     try {
4416       FileSystem fs = Mockito.mock(FileSystem.class);
4417       Mockito.when(fs.exists((Path) Mockito.anyObject())).thenThrow(new IOException());
4418       HTableDescriptor htd = new HTableDescriptor(tableName);
4419       htd.addFamily(new HColumnDescriptor("cf"));
4420       info = new HRegionInfo(htd.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
4421           HConstants.EMPTY_BYTE_ARRAY, false);
4422       Path path = new Path(dir + "testStatusSettingToAbortIfAnyExceptionDuringRegionInitilization");
4423       region = HRegion.newHRegion(path, null, fs, CONF, info, htd, null);
4424       // region initialization throws IOException and set task state to ABORTED.
4425       region.initialize();
4426       fail("Region initialization should fail due to IOException");
4427     } catch (IOException io) {
4428       List<MonitoredTask> tasks = TaskMonitor.get().getTasks();
4429       for (MonitoredTask monitoredTask : tasks) {
4430         if (!(monitoredTask instanceof MonitoredRPCHandler)
4431             && monitoredTask.getDescription().contains(region.toString())) {
4432           assertTrue("Region state should be ABORTED.",
4433               monitoredTask.getState().equals(MonitoredTask.State.ABORTED));
4434           break;
4435         }
4436       }
4437     } finally {
4438       HRegion.closeHRegion(region);
4439     }
4440   }
4441 
4442   /**
4443    * Verifies that the .regioninfo file is written on region creation and that
4444    * is recreated if missing during region opening.
4445    */
4446   @Test
4447   public void testRegionInfoFileCreation() throws IOException {
4448     Path rootDir = new Path(dir + "testRegionInfoFileCreation");
4449 
4450     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testtb"));
4451     htd.addFamily(new HColumnDescriptor("cf"));
4452 
4453     HRegionInfo hri = new HRegionInfo(htd.getTableName());
4454 
4455     // Create a region and skip the initialization (like CreateTableHandler)
4456     HRegion region = HRegion.createHRegion(hri, rootDir, CONF, htd, null, false, true);
4457 //    HRegion region = TEST_UTIL.createLocalHRegion(hri, htd);
4458     Path regionDir = region.getRegionFileSystem().getRegionDir();
4459     FileSystem fs = region.getRegionFileSystem().getFileSystem();
4460     HRegion.closeHRegion(region);
4461 
4462     Path regionInfoFile = new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE);
4463 
4464     // Verify that the .regioninfo file is present
4465     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4466         fs.exists(regionInfoFile));
4467 
4468     // Try to open the region
4469     region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4470     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4471     HRegion.closeHRegion(region);
4472 
4473     // Verify that the .regioninfo file is still there
4474     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4475         fs.exists(regionInfoFile));
4476 
4477     // Remove the .regioninfo file and verify is recreated on region open
4478     fs.delete(regionInfoFile, true);
4479     assertFalse(HRegionFileSystem.REGION_INFO_FILE + " should be removed from the region dir",
4480         fs.exists(regionInfoFile));
4481 
4482     region = HRegion.openHRegion(rootDir, hri, htd, null, CONF);
4483 //    region = TEST_UTIL.openHRegion(hri, htd);
4484     assertEquals(regionDir, region.getRegionFileSystem().getRegionDir());
4485     HRegion.closeHRegion(region);
4486 
4487     // Verify that the .regioninfo file is still there
4488     assertTrue(HRegionFileSystem.REGION_INFO_FILE + " should be present in the region dir",
4489         fs.exists(new Path(regionDir, HRegionFileSystem.REGION_INFO_FILE)));
4490   }
4491 
4492   /**
4493    * TestCase for increment
4494    */
4495   private static class Incrementer implements Runnable {
4496     private HRegion region;
4497     private final static byte[] incRow = Bytes.toBytes("incRow");
4498     private final static byte[] family = Bytes.toBytes("family");
4499     private final static byte[] qualifier = Bytes.toBytes("qualifier");
4500     private final static long ONE = 1l;
4501     private int incCounter;
4502 
4503     public Incrementer(HRegion region, int incCounter) {
4504       this.region = region;
4505       this.incCounter = incCounter;
4506     }
4507 
4508     @Override
4509     public void run() {
4510       int count = 0;
4511       while (count < incCounter) {
4512         Increment inc = new Increment(incRow);
4513         inc.addColumn(family, qualifier, ONE);
4514         count++;
4515         try {
4516           region.increment(inc);
4517         } catch (IOException e) {
4518           e.printStackTrace();
4519           break;
4520         }
4521       }
4522     }
4523   }
4524 
4525   /**
4526    * Test case to check increment function with memstore flushing
4527    * @throws Exception
4528    */
4529   @Test
4530   public void testParallelIncrementWithMemStoreFlush() throws Exception {
4531     byte[] family = Incrementer.family;
4532     this.region = initHRegion(tableName, method, CONF, family);
4533     final HRegion region = this.region;
4534     final AtomicBoolean incrementDone = new AtomicBoolean(false);
4535     Runnable flusher = new Runnable() {
4536       @Override
4537       public void run() {
4538         while (!incrementDone.get()) {
4539           try {
4540             region.flush(true);
4541           } catch (Exception e) {
4542             e.printStackTrace();
4543           }
4544         }
4545       }
4546     };
4547 
4548     // after all increment finished, the row will increment to 20*100 = 2000
4549     int threadNum = 20;
4550     int incCounter = 100;
4551     long expected = threadNum * incCounter;
4552     Thread[] incrementers = new Thread[threadNum];
4553     Thread flushThread = new Thread(flusher);
4554     for (int i = 0; i < threadNum; i++) {
4555       incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
4556       incrementers[i].start();
4557     }
4558     flushThread.start();
4559     for (int i = 0; i < threadNum; i++) {
4560       incrementers[i].join();
4561     }
4562 
4563     incrementDone.set(true);
4564     flushThread.join();
4565 
4566     Get get = new Get(Incrementer.incRow);
4567     get.addColumn(Incrementer.family, Incrementer.qualifier);
4568     get.setMaxVersions(1);
4569     Result res = this.region.get(get);
4570     List<Cell> kvs = res.getColumnCells(Incrementer.family, Incrementer.qualifier);
4571 
4572     // we just got the latest version
4573     assertEquals(kvs.size(), 1);
4574     Cell kv = kvs.get(0);
4575     assertEquals(expected, Bytes.toLong(kv.getValueArray(), kv.getValueOffset()));
4576     this.region = null;
4577   }
4578 
4579   /**
4580    * TestCase for append
4581    */
4582   private static class Appender implements Runnable {
4583     private HRegion region;
4584     private final static byte[] appendRow = Bytes.toBytes("appendRow");
4585     private final static byte[] family = Bytes.toBytes("family");
4586     private final static byte[] qualifier = Bytes.toBytes("qualifier");
4587     private final static byte[] CHAR = Bytes.toBytes("a");
4588     private int appendCounter;
4589 
4590     public Appender(HRegion region, int appendCounter) {
4591       this.region = region;
4592       this.appendCounter = appendCounter;
4593     }
4594 
4595     @Override
4596     public void run() {
4597       int count = 0;
4598       while (count < appendCounter) {
4599         Append app = new Append(appendRow);
4600         app.add(family, qualifier, CHAR);
4601         count++;
4602         try {
4603           region.append(app);
4604         } catch (IOException e) {
4605           e.printStackTrace();
4606           break;
4607         }
4608       }
4609     }
4610   }
4611 
4612   /**
4613    * Test case to check append function with memstore flushing
4614    * @throws Exception
4615    */
4616   @Test
4617   public void testParallelAppendWithMemStoreFlush() throws Exception {
4618     byte[] family = Appender.family;
4619     this.region = initHRegion(tableName, method, CONF, family);
4620     final HRegion region = this.region;
4621     final AtomicBoolean appendDone = new AtomicBoolean(false);
4622     Runnable flusher = new Runnable() {
4623       @Override
4624       public void run() {
4625         while (!appendDone.get()) {
4626           try {
4627             region.flush(true);
4628           } catch (Exception e) {
4629             e.printStackTrace();
4630           }
4631         }
4632       }
4633     };
4634 
4635     // after all append finished, the value will append to threadNum *
4636     // appendCounter Appender.CHAR
4637     int threadNum = 20;
4638     int appendCounter = 100;
4639     byte[] expected = new byte[threadNum * appendCounter];
4640     for (int i = 0; i < threadNum * appendCounter; i++) {
4641       System.arraycopy(Appender.CHAR, 0, expected, i, 1);
4642     }
4643     Thread[] appenders = new Thread[threadNum];
4644     Thread flushThread = new Thread(flusher);
4645     for (int i = 0; i < threadNum; i++) {
4646       appenders[i] = new Thread(new Appender(this.region, appendCounter));
4647       appenders[i].start();
4648     }
4649     flushThread.start();
4650     for (int i = 0; i < threadNum; i++) {
4651       appenders[i].join();
4652     }
4653 
4654     appendDone.set(true);
4655     flushThread.join();
4656 
4657     Get get = new Get(Appender.appendRow);
4658     get.addColumn(Appender.family, Appender.qualifier);
4659     get.setMaxVersions(1);
4660     Result res = this.region.get(get);
4661     List<Cell> kvs = res.getColumnCells(Appender.family, Appender.qualifier);
4662 
4663     // we just got the latest version
4664     assertEquals(kvs.size(), 1);
4665     Cell kv = kvs.get(0);
4666     byte[] appendResult = new byte[kv.getValueLength()];
4667     System.arraycopy(kv.getValueArray(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
4668     assertArrayEquals(expected, appendResult);
4669     this.region = null;
4670   }
4671 
4672   /**
4673    * Test case to check put function with memstore flushing for same row, same ts
4674    * @throws Exception
4675    */
4676   @Test
4677   public void testPutWithMemStoreFlush() throws Exception {
4678     byte[] family = Bytes.toBytes("family");
4679     ;
4680     byte[] qualifier = Bytes.toBytes("qualifier");
4681     byte[] row = Bytes.toBytes("putRow");
4682     byte[] value = null;
4683     this.region = initHRegion(tableName, method, CONF, family);
4684     Put put = null;
4685     Get get = null;
4686     List<Cell> kvs = null;
4687     Result res = null;
4688 
4689     put = new Put(row);
4690     value = Bytes.toBytes("value0");
4691     put.add(family, qualifier, 1234567l, value);
4692     region.put(put);
4693     get = new Get(row);
4694     get.addColumn(family, qualifier);
4695     get.setMaxVersions();
4696     res = this.region.get(get);
4697     kvs = res.getColumnCells(family, qualifier);
4698     assertEquals(1, kvs.size());
4699     assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4700 
4701     region.flush(true);
4702     get = new Get(row);
4703     get.addColumn(family, qualifier);
4704     get.setMaxVersions();
4705     res = this.region.get(get);
4706     kvs = res.getColumnCells(family, qualifier);
4707     assertEquals(1, kvs.size());
4708     assertArrayEquals(Bytes.toBytes("value0"), CellUtil.cloneValue(kvs.get(0)));
4709 
4710     put = new Put(row);
4711     value = Bytes.toBytes("value1");
4712     put.add(family, qualifier, 1234567l, value);
4713     region.put(put);
4714     get = new Get(row);
4715     get.addColumn(family, qualifier);
4716     get.setMaxVersions();
4717     res = this.region.get(get);
4718     kvs = res.getColumnCells(family, qualifier);
4719     assertEquals(1, kvs.size());
4720     assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4721 
4722     region.flush(true);
4723     get = new Get(row);
4724     get.addColumn(family, qualifier);
4725     get.setMaxVersions();
4726     res = this.region.get(get);
4727     kvs = res.getColumnCells(family, qualifier);
4728     assertEquals(1, kvs.size());
4729     assertArrayEquals(Bytes.toBytes("value1"), CellUtil.cloneValue(kvs.get(0)));
4730   }
4731 
4732   @Test
4733   public void testDurability() throws Exception {
4734     String method = "testDurability";
4735     // there are 5 x 5 cases:
4736     // table durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT) x mutation
4737     // durability(SYNC,FSYNC,ASYC,SKIP,USE_DEFAULT)
4738 
4739     // expected cases for append and sync wal
4740     durabilityTest(method, Durability.SYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4741     durabilityTest(method, Durability.SYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4742     durabilityTest(method, Durability.SYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4743 
4744     durabilityTest(method, Durability.FSYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4745     durabilityTest(method, Durability.FSYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4746     durabilityTest(method, Durability.FSYNC_WAL, Durability.USE_DEFAULT, 0, true, true, false);
4747 
4748     durabilityTest(method, Durability.ASYNC_WAL, Durability.SYNC_WAL, 0, true, true, false);
4749     durabilityTest(method, Durability.ASYNC_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4750 
4751     durabilityTest(method, Durability.SKIP_WAL, Durability.SYNC_WAL, 0, true, true, false);
4752     durabilityTest(method, Durability.SKIP_WAL, Durability.FSYNC_WAL, 0, true, true, false);
4753 
4754     durabilityTest(method, Durability.USE_DEFAULT, Durability.SYNC_WAL, 0, true, true, false);
4755     durabilityTest(method, Durability.USE_DEFAULT, Durability.FSYNC_WAL, 0, true, true, false);
4756     durabilityTest(method, Durability.USE_DEFAULT, Durability.USE_DEFAULT, 0, true, true, false);
4757 
4758     // expected cases for async wal
4759     durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4760     durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4761     durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4762     durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 0, true, false, false);
4763     durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 0, true, false, false);
4764     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4765 
4766     durabilityTest(method, Durability.SYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4767     durabilityTest(method, Durability.FSYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4768     durabilityTest(method, Durability.ASYNC_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4769     durabilityTest(method, Durability.SKIP_WAL, Durability.ASYNC_WAL, 5000, true, false, true);
4770     durabilityTest(method, Durability.USE_DEFAULT, Durability.ASYNC_WAL, 5000, true, false, true);
4771     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);
4772 
4773     // expect skip wal cases
4774     durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
4775     durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
4776     durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
4777     durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false);
4778     durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false);
4779     durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false);
4780 
4781   }
4782 
4783   @SuppressWarnings("unchecked")
4784   private void durabilityTest(String method, Durability tableDurability,
4785       Durability mutationDurability, long timeout, boolean expectAppend, final boolean expectSync,
4786       final boolean expectSyncFromLogSyncer) throws Exception {
4787     Configuration conf = HBaseConfiguration.create(CONF);
4788     method = method + "_" + tableDurability.name() + "_" + mutationDurability.name();
4789     TableName tableName = TableName.valueOf(method);
4790     byte[] family = Bytes.toBytes("family");
4791     Path logDir = new Path(new Path(dir + method), "log");
4792     final Configuration walConf = new Configuration(conf);
4793     FSUtils.setRootDir(walConf, logDir);
4794     final WALFactory wals = new WALFactory(walConf, null, UUID.randomUUID().toString());
4795     final WAL wal = spy(wals.getWAL(tableName.getName()));
4796     this.region = initHRegion(tableName.getName(), HConstants.EMPTY_START_ROW,
4797         HConstants.EMPTY_END_ROW, method, conf, false, tableDurability, wal,
4798         new byte[][] { family });
4799 
4800     Put put = new Put(Bytes.toBytes("r1"));
4801     put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
4802     put.setDurability(mutationDurability);
4803     region.put(put);
4804 
4805     //verify append called or not
4806     verify(wal, expectAppend ? times(1) : never())
4807       .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
4808           (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<Cell>)any());
4809 
4810     // verify sync called or not
4811     if (expectSync || expectSyncFromLogSyncer) {
4812       TEST_UTIL.waitFor(timeout, new Waiter.Predicate<Exception>() {
4813         @Override
4814         public boolean evaluate() throws Exception {
4815           try {
4816             if (expectSync) {
4817               verify(wal, times(1)).sync(anyLong()); // Hregion calls this one
4818             } else if (expectSyncFromLogSyncer) {
4819               verify(wal, times(1)).sync(); // wal syncer calls this one
4820             }
4821           } catch (Throwable ignore) {
4822           }
4823           return true;
4824         }
4825       });
4826     } else {
4827       //verify(wal, never()).sync(anyLong());
4828       verify(wal, never()).sync();
4829     }
4830 
4831     HRegion.closeHRegion(this.region);
4832     this.region = null;
4833   }
4834 
4835   @Test
4836   public void testRegionReplicaSecondary() throws IOException {
4837     // create a primary region, load some data and flush
4838     // create a secondary region, and do a get against that
4839     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
4840     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4841 
4842     byte[][] families = new byte[][] {
4843         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4844     };
4845     byte[] cq = Bytes.toBytes("cq");
4846     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
4847     for (byte[] family : families) {
4848       htd.addFamily(new HColumnDescriptor(family));
4849     }
4850 
4851     long time = System.currentTimeMillis();
4852     HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4853       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4854       false, time, 0);
4855     HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4856       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4857       false, time, 1);
4858 
4859     HRegion primaryRegion = null, secondaryRegion = null;
4860 
4861     try {
4862       primaryRegion = HRegion.createHRegion(primaryHri,
4863         rootDir, TEST_UTIL.getConfiguration(), htd);
4864 
4865       // load some data
4866       putData(primaryRegion, 0, 1000, cq, families);
4867 
4868       // flush region
4869       primaryRegion.flush(true);
4870 
4871       // open secondary region
4872       secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4873 
4874       verifyData(secondaryRegion, 0, 1000, cq, families);
4875     } finally {
4876       if (primaryRegion != null) {
4877         HRegion.closeHRegion(primaryRegion);
4878       }
4879       if (secondaryRegion != null) {
4880         HRegion.closeHRegion(secondaryRegion);
4881       }
4882     }
4883   }
4884 
4885   @Test
4886   public void testRegionReplicaSecondaryIsReadOnly() throws IOException {
4887     // create a primary region, load some data and flush
4888     // create a secondary region, and do a put against that
4889     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
4890     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4891 
4892     byte[][] families = new byte[][] {
4893         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4894     };
4895     byte[] cq = Bytes.toBytes("cq");
4896     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
4897     for (byte[] family : families) {
4898       htd.addFamily(new HColumnDescriptor(family));
4899     }
4900 
4901     long time = System.currentTimeMillis();
4902     HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4903       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4904       false, time, 0);
4905     HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4906       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4907       false, time, 1);
4908 
4909     HRegion primaryRegion = null, secondaryRegion = null;
4910 
4911     try {
4912       primaryRegion = HRegion.createHRegion(primaryHri,
4913         rootDir, TEST_UTIL.getConfiguration(), htd);
4914 
4915       // load some data
4916       putData(primaryRegion, 0, 1000, cq, families);
4917 
4918       // flush region
4919       primaryRegion.flush(true);
4920 
4921       // open secondary region
4922       secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4923 
4924       try {
4925         putData(secondaryRegion, 0, 1000, cq, families);
4926         fail("Should have thrown exception");
4927       } catch (IOException ex) {
4928         // expected
4929       }
4930     } finally {
4931       if (primaryRegion != null) {
4932         HRegion.closeHRegion(primaryRegion);
4933       }
4934       if (secondaryRegion != null) {
4935         HRegion.closeHRegion(secondaryRegion);
4936       }
4937     }
4938   }
4939 
4940   static WALFactory createWALFactory(Configuration conf, Path rootDir) throws IOException {
4941     Configuration confForWAL = new Configuration(conf);
4942     confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
4943     return new WALFactory(confForWAL,
4944         Collections.<WALActionsListener>singletonList(new MetricsWAL()),
4945         "hregion-" + RandomStringUtils.randomNumeric(8));
4946   }
4947 
4948   @Test
4949   public void testCompactionFromPrimary() throws IOException {
4950     // The test is flaky in Windows Jenkins run.
4951     // Disable for Windows env to reduce noise.
4952     Assume.assumeTrue(!WINDOWS);
4953 
4954     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
4955     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
4956 
4957     byte[][] families = new byte[][] {
4958         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
4959     };
4960     byte[] cq = Bytes.toBytes("cq");
4961     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
4962     for (byte[] family : families) {
4963       htd.addFamily(new HColumnDescriptor(family));
4964     }
4965 
4966     long time = System.currentTimeMillis();
4967     HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
4968       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4969       false, time, 0);
4970     HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
4971       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
4972       false, time, 1);
4973 
4974     HRegion primaryRegion = null, secondaryRegion = null;
4975 
4976     try {
4977       primaryRegion = HRegion.createHRegion(primaryHri,
4978         rootDir, TEST_UTIL.getConfiguration(), htd);
4979 
4980       // load some data
4981       putData(primaryRegion, 0, 1000, cq, families);
4982 
4983       // flush region
4984       primaryRegion.flush(true);
4985 
4986       // open secondary region
4987       secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
4988 
4989       // move the file of the primary region to the archive, simulating a compaction
4990       Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
4991       primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
4992       Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]);
4993       Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
4994 
4995       verifyData(secondaryRegion, 0, 1000, cq, families);
4996     } finally {
4997       if (primaryRegion != null) {
4998         HRegion.closeHRegion(primaryRegion);
4999       }
5000       if (secondaryRegion != null) {
5001         HRegion.closeHRegion(secondaryRegion);
5002       }
5003     }
5004   }
5005 
5006   private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
5007     putData(this.region, startRow, numRows, qf, families);
5008   }
5009 
5010   private void putData(HRegion region,
5011       int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
5012     putData(region, Durability.SKIP_WAL, startRow, numRows, qf, families);
5013   }
5014 
5015   static void putData(HRegion region, Durability durability,
5016       int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
5017     for (int i = startRow; i < startRow + numRows; i++) {
5018       Put put = new Put(Bytes.toBytes("" + i));
5019       put.setDurability(durability);
5020       for (byte[] family : families) {
5021         put.add(family, qf, null);
5022       }
5023       region.put(put);
5024     }
5025   }
5026 
5027   static void verifyData(HRegion newReg, int startRow, int numRows, byte[] qf, byte[]... families)
5028       throws IOException {
5029     for (int i = startRow; i < startRow + numRows; i++) {
5030       byte[] row = Bytes.toBytes("" + i);
5031       Get get = new Get(row);
5032       for (byte[] family : families) {
5033         get.addColumn(family, qf);
5034       }
5035       Result result = newReg.get(get);
5036       Cell[] raw = result.rawCells();
5037       assertEquals(families.length, result.size());
5038       for (int j = 0; j < families.length; j++) {
5039         assertTrue(CellUtil.matchingRow(raw[j], row));
5040         assertTrue(CellUtil.matchingFamily(raw[j], families[j]));
5041         assertTrue(CellUtil.matchingQualifier(raw[j], qf));
5042       }
5043     }
5044   }
5045 
5046   static void assertGet(final HRegion r, final byte[] family, final byte[] k) throws IOException {
5047     // Now I have k, get values out and assert they are as expected.
5048     Get get = new Get(k).addFamily(family).setMaxVersions();
5049     Cell[] results = r.get(get).rawCells();
5050     for (int j = 0; j < results.length; j++) {
5051       byte[] tmp = CellUtil.cloneValue(results[j]);
5052       // Row should be equal to value every time.
5053       assertTrue(Bytes.equals(k, tmp));
5054     }
5055   }
5056 
5057   /*
5058    * Assert first value in the passed region is <code>firstValue</code>.
5059    *
5060    * @param r
5061    *
5062    * @param fs
5063    *
5064    * @param firstValue
5065    *
5066    * @throws IOException
5067    */
5068   private void assertScan(final HRegion r, final byte[] fs, final byte[] firstValue)
5069       throws IOException {
5070     byte[][] families = { fs };
5071     Scan scan = new Scan();
5072     for (int i = 0; i < families.length; i++)
5073       scan.addFamily(families[i]);
5074     InternalScanner s = r.getScanner(scan);
5075     try {
5076       List<Cell> curVals = new ArrayList<Cell>();
5077       boolean first = true;
5078       OUTER_LOOP: while (s.next(curVals)) {
5079         for (Cell kv : curVals) {
5080           byte[] val = CellUtil.cloneValue(kv);
5081           byte[] curval = val;
5082           if (first) {
5083             first = false;
5084             assertTrue(Bytes.compareTo(curval, firstValue) == 0);
5085           } else {
5086             // Not asserting anything. Might as well break.
5087             break OUTER_LOOP;
5088           }
5089         }
5090       }
5091     } finally {
5092       s.close();
5093     }
5094   }
5095 
5096   /**
5097    * Test that we get the expected flush results back
5098    * @throws IOException
5099    */
5100   @Test
5101   public void testFlushResult() throws IOException {
5102     String method = name.getMethodName();
5103     byte[] tableName = Bytes.toBytes(method);
5104     byte[] family = Bytes.toBytes("family");
5105 
5106     this.region = initHRegion(tableName, method, family);
5107 
5108     // empty memstore, flush doesn't run
5109     HRegion.FlushResult fr = region.flush(true);
5110     assertFalse(fr.isFlushSucceeded());
5111     assertFalse(fr.isCompactionNeeded());
5112 
5113     // Flush enough files to get up to the threshold, doesn't need compactions
5114     for (int i = 0; i < 2; i++) {
5115       Put put = new Put(tableName).add(family, family, tableName);
5116       region.put(put);
5117       fr = region.flush(true);
5118       assertTrue(fr.isFlushSucceeded());
5119       assertFalse(fr.isCompactionNeeded());
5120     }
5121 
5122     // Two flushes after the threshold, compactions are needed
5123     for (int i = 0; i < 2; i++) {
5124       Put put = new Put(tableName).add(family, family, tableName);
5125       region.put(put);
5126       fr = region.flush(true);
5127       assertTrue(fr.isFlushSucceeded());
5128       assertTrue(fr.isCompactionNeeded());
5129     }
5130   }
5131 
5132   private Configuration initSplit() {
5133     // Always compact if there is more than one store file.
5134     CONF.setInt("hbase.hstore.compactionThreshold", 2);
5135 
5136     // Make lease timeout longer, lease checks less frequent
5137     CONF.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
5138 
5139     CONF.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 10 * 1000);
5140 
5141     // Increase the amount of time between client retries
5142     CONF.setLong("hbase.client.pause", 15 * 1000);
5143 
5144     // This size should make it so we always split using the addContent
5145     // below. After adding all data, the first region is 1.3M
5146     CONF.setLong(HConstants.HREGION_MAX_FILESIZE, 1024 * 128);
5147     return CONF;
5148   }
5149 
5150   /**
5151    * @param tableName
5152    * @param callingMethod
5153    * @param conf
5154    * @param families
5155    * @throws IOException
5156    * @return A region on which you must call
5157    *         {@link HRegion#closeHRegion(HRegion)} when done.
5158    */
5159   public static HRegion initHRegion(TableName tableName, String callingMethod, Configuration conf,
5160       byte[]... families) throws IOException {
5161     return initHRegion(tableName.getName(), null, null, callingMethod, conf, false, families);
5162   }
5163 
5164   /**
5165    * @param tableName
5166    * @param callingMethod
5167    * @param conf
5168    * @param families
5169    * @throws IOException
5170    * @return A region on which you must call
5171    *         {@link HRegion#closeHRegion(HRegion)} when done.
5172    */
5173   public static HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
5174       byte[]... families) throws IOException {
5175     return initHRegion(tableName, null, null, callingMethod, conf, false, families);
5176   }
5177 
5178   /**
5179    * @param tableName
5180    * @param callingMethod
5181    * @param conf
5182    * @param isReadOnly
5183    * @param families
5184    * @throws IOException
5185    * @return A region on which you must call
5186    *         {@link HRegion#closeHRegion(HRegion)} when done.
5187    */
5188   public static HRegion initHRegion(byte[] tableName, String callingMethod, Configuration conf,
5189       boolean isReadOnly, byte[]... families) throws IOException {
5190     return initHRegion(tableName, null, null, callingMethod, conf, isReadOnly, families);
5191   }
5192 
5193   public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
5194       String callingMethod, Configuration conf, boolean isReadOnly, byte[]... families)
5195       throws IOException {
5196     return initHRegion(tableName, startKey, stopKey, callingMethod, conf, isReadOnly,
5197         Durability.SYNC_WAL, null, families);
5198   }
5199 
5200   /**
5201    * @param tableName
5202    * @param startKey
5203    * @param stopKey
5204    * @param callingMethod
5205    * @param conf
5206    * @param isReadOnly
5207    * @param families
5208    * @throws IOException
5209    * @return A region on which you must call
5210    *         {@link HRegion#closeHRegion(HRegion)} when done.
5211    */
5212   public static HRegion initHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
5213       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
5214       WAL wal, byte[]... families) throws IOException {
5215     return TEST_UTIL.createLocalHRegion(tableName, startKey, stopKey, callingMethod, conf,
5216         isReadOnly, durability, wal, families);
5217   }
5218 
5219   /**
5220    * Assert that the passed in Cell has expected contents for the specified row,
5221    * column & timestamp.
5222    */
5223   private void checkOneCell(Cell kv, byte[] cf, int rowIdx, int colIdx, long ts) {
5224     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
5225     assertEquals("Row mismatch which checking: " + ctx, "row:" + rowIdx,
5226         Bytes.toString(CellUtil.cloneRow(kv)));
5227     assertEquals("ColumnFamily mismatch while checking: " + ctx, Bytes.toString(cf),
5228         Bytes.toString(CellUtil.cloneFamily(kv)));
5229     assertEquals("Column qualifier mismatch while checking: " + ctx, "column:" + colIdx,
5230         Bytes.toString(CellUtil.cloneQualifier(kv)));
5231     assertEquals("Timestamp mismatch while checking: " + ctx, ts, kv.getTimestamp());
5232     assertEquals("Value mismatch while checking: " + ctx, "value-version-" + ts,
5233         Bytes.toString(CellUtil.cloneValue(kv)));
5234   }
5235 
5236   @Test (timeout=60000)
5237   public void testReverseScanner_FromMemStore_SingleCF_Normal()
5238       throws IOException {
5239     byte[] rowC = Bytes.toBytes("rowC");
5240     byte[] rowA = Bytes.toBytes("rowA");
5241     byte[] rowB = Bytes.toBytes("rowB");
5242     byte[] cf = Bytes.toBytes("CF");
5243     byte[][] families = { cf };
5244     byte[] col = Bytes.toBytes("C");
5245     long ts = 1;
5246     String method = this.getName();
5247     this.region = initHRegion(tableName, method, families);
5248     try {
5249       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5250       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5251           null);
5252       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5253       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5254       Put put = null;
5255       put = new Put(rowC);
5256       put.add(kv1);
5257       put.add(kv11);
5258       region.put(put);
5259       put = new Put(rowA);
5260       put.add(kv2);
5261       region.put(put);
5262       put = new Put(rowB);
5263       put.add(kv3);
5264       region.put(put);
5265 
5266       Scan scan = new Scan(rowC);
5267       scan.setMaxVersions(5);
5268       scan.setReversed(true);
5269       InternalScanner scanner = region.getScanner(scan);
5270       List<Cell> currRow = new ArrayList<Cell>();
5271       boolean hasNext = scanner.next(currRow);
5272       assertEquals(2, currRow.size());
5273       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5274       assertTrue(hasNext);
5275       currRow.clear();
5276       hasNext = scanner.next(currRow);
5277       assertEquals(1, currRow.size());
5278       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5279       assertTrue(hasNext);
5280       currRow.clear();
5281       hasNext = scanner.next(currRow);
5282       assertEquals(1, currRow.size());
5283       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
5284       assertFalse(hasNext);
5285       scanner.close();
5286     } finally {
5287       HRegion.closeHRegion(this.region);
5288       this.region = null;
5289     }
5290   }
5291 
5292   @Test (timeout=60000)
5293   public void testReverseScanner_FromMemStore_SingleCF_LargerKey()
5294       throws IOException {
5295     byte[] rowC = Bytes.toBytes("rowC");
5296     byte[] rowA = Bytes.toBytes("rowA");
5297     byte[] rowB = Bytes.toBytes("rowB");
5298     byte[] rowD = Bytes.toBytes("rowD");
5299     byte[] cf = Bytes.toBytes("CF");
5300     byte[][] families = { cf };
5301     byte[] col = Bytes.toBytes("C");
5302     long ts = 1;
5303     String method = this.getName();
5304     this.region = initHRegion(tableName, method, families);
5305     try {
5306       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5307       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5308           null);
5309       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5310       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5311       Put put = null;
5312       put = new Put(rowC);
5313       put.add(kv1);
5314       put.add(kv11);
5315       region.put(put);
5316       put = new Put(rowA);
5317       put.add(kv2);
5318       region.put(put);
5319       put = new Put(rowB);
5320       put.add(kv3);
5321       region.put(put);
5322 
5323       Scan scan = new Scan(rowD);
5324       List<Cell> currRow = new ArrayList<Cell>();
5325       scan.setReversed(true);
5326       scan.setMaxVersions(5);
5327       InternalScanner scanner = region.getScanner(scan);
5328       boolean hasNext = scanner.next(currRow);
5329       assertEquals(2, currRow.size());
5330       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5331       assertTrue(hasNext);
5332       currRow.clear();
5333       hasNext = scanner.next(currRow);
5334       assertEquals(1, currRow.size());
5335       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5336       assertTrue(hasNext);
5337       currRow.clear();
5338       hasNext = scanner.next(currRow);
5339       assertEquals(1, currRow.size());
5340       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
5341       assertFalse(hasNext);
5342       scanner.close();
5343     } finally {
5344       HRegion.closeHRegion(this.region);
5345       this.region = null;
5346     }
5347   }
5348 
5349   @Test (timeout=60000)
5350   public void testReverseScanner_FromMemStore_SingleCF_FullScan()
5351       throws IOException {
5352     byte[] rowC = Bytes.toBytes("rowC");
5353     byte[] rowA = Bytes.toBytes("rowA");
5354     byte[] rowB = Bytes.toBytes("rowB");
5355     byte[] cf = Bytes.toBytes("CF");
5356     byte[][] families = { cf };
5357     byte[] col = Bytes.toBytes("C");
5358     long ts = 1;
5359     String method = this.getName();
5360     this.region = initHRegion(tableName, method, families);
5361     try {
5362       KeyValue kv1 = new KeyValue(rowC, cf, col, ts, KeyValue.Type.Put, null);
5363       KeyValue kv11 = new KeyValue(rowC, cf, col, ts + 1, KeyValue.Type.Put,
5364           null);
5365       KeyValue kv2 = new KeyValue(rowA, cf, col, ts, KeyValue.Type.Put, null);
5366       KeyValue kv3 = new KeyValue(rowB, cf, col, ts, KeyValue.Type.Put, null);
5367       Put put = null;
5368       put = new Put(rowC);
5369       put.add(kv1);
5370       put.add(kv11);
5371       region.put(put);
5372       put = new Put(rowA);
5373       put.add(kv2);
5374       region.put(put);
5375       put = new Put(rowB);
5376       put.add(kv3);
5377       region.put(put);
5378       Scan scan = new Scan();
5379       List<Cell> currRow = new ArrayList<Cell>();
5380       scan.setReversed(true);
5381       InternalScanner scanner = region.getScanner(scan);
5382       boolean hasNext = scanner.next(currRow);
5383       assertEquals(1, currRow.size());
5384       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5385       assertTrue(hasNext);
5386       currRow.clear();
5387       hasNext = scanner.next(currRow);
5388       assertEquals(1, currRow.size());
5389       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5390       assertTrue(hasNext);
5391       currRow.clear();
5392       hasNext = scanner.next(currRow);
5393       assertEquals(1, currRow.size());
5394       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowA));
5395       assertFalse(hasNext);
5396       scanner.close();
5397     } finally {
5398       HRegion.closeHRegion(this.region);
5399       this.region = null;
5400     }
5401   }
5402 
5403   @Test (timeout=60000)
5404   public void testReverseScanner_moreRowsMayExistAfter() throws IOException {
5405     // case for "INCLUDE_AND_SEEK_NEXT_ROW & SEEK_NEXT_ROW" endless loop
5406     byte[] rowA = Bytes.toBytes("rowA");
5407     byte[] rowB = Bytes.toBytes("rowB");
5408     byte[] rowC = Bytes.toBytes("rowC");
5409     byte[] rowD = Bytes.toBytes("rowD");
5410     byte[] rowE = Bytes.toBytes("rowE");
5411     byte[] cf = Bytes.toBytes("CF");
5412     byte[][] families = { cf };
5413     byte[] col1 = Bytes.toBytes("col1");
5414     byte[] col2 = Bytes.toBytes("col2");
5415     long ts = 1;
5416     String method = this.getName();
5417     this.region = initHRegion(tableName, method, families);
5418     try {
5419       KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5420       KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5421       KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5422       KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5423       KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5424       KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5425       Put put = null;
5426       put = new Put(rowA);
5427       put.add(kv1);
5428       region.put(put);
5429       put = new Put(rowB);
5430       put.add(kv2);
5431       region.put(put);
5432       put = new Put(rowC);
5433       put.add(kv3);
5434       region.put(put);
5435       put = new Put(rowD);
5436       put.add(kv4_1);
5437       region.put(put);
5438       put = new Put(rowD);
5439       put.add(kv4_2);
5440       region.put(put);
5441       put = new Put(rowE);
5442       put.add(kv5);
5443       region.put(put);
5444       region.flush(true);
5445       Scan scan = new Scan(rowD, rowA);
5446       scan.addColumn(families[0], col1);
5447       scan.setReversed(true);
5448       List<Cell> currRow = new ArrayList<Cell>();
5449       InternalScanner scanner = region.getScanner(scan);
5450       boolean hasNext = scanner.next(currRow);
5451       assertEquals(1, currRow.size());
5452       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5453       assertTrue(hasNext);
5454       currRow.clear();
5455       hasNext = scanner.next(currRow);
5456       assertEquals(1, currRow.size());
5457       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5458       assertTrue(hasNext);
5459       currRow.clear();
5460       hasNext = scanner.next(currRow);
5461       assertEquals(1, currRow.size());
5462       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5463       assertFalse(hasNext);
5464       scanner.close();
5465 
5466       scan = new Scan(rowD, rowA);
5467       scan.addColumn(families[0], col2);
5468       scan.setReversed(true);
5469       currRow.clear();
5470       scanner = region.getScanner(scan);
5471       hasNext = scanner.next(currRow);
5472       assertEquals(1, currRow.size());
5473       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5474       scanner.close();
5475     } finally {
5476       HRegion.closeHRegion(this.region);
5477       this.region = null;
5478     }
5479   }
5480 
5481   @Test (timeout=60000)
5482   public void testReverseScanner_smaller_blocksize() throws IOException {
5483     // case to ensure no conflict with HFile index optimization
5484     byte[] rowA = Bytes.toBytes("rowA");
5485     byte[] rowB = Bytes.toBytes("rowB");
5486     byte[] rowC = Bytes.toBytes("rowC");
5487     byte[] rowD = Bytes.toBytes("rowD");
5488     byte[] rowE = Bytes.toBytes("rowE");
5489     byte[] cf = Bytes.toBytes("CF");
5490     byte[][] families = { cf };
5491     byte[] col1 = Bytes.toBytes("col1");
5492     byte[] col2 = Bytes.toBytes("col2");
5493     long ts = 1;
5494     String method = this.getName();
5495     HBaseConfiguration config = new HBaseConfiguration();
5496     config.setInt("test.block.size", 1);
5497     this.region = initHRegion(tableName, method, config, families);
5498     try {
5499       KeyValue kv1 = new KeyValue(rowA, cf, col1, ts, KeyValue.Type.Put, null);
5500       KeyValue kv2 = new KeyValue(rowB, cf, col1, ts, KeyValue.Type.Put, null);
5501       KeyValue kv3 = new KeyValue(rowC, cf, col1, ts, KeyValue.Type.Put, null);
5502       KeyValue kv4_1 = new KeyValue(rowD, cf, col1, ts, KeyValue.Type.Put, null);
5503       KeyValue kv4_2 = new KeyValue(rowD, cf, col2, ts, KeyValue.Type.Put, null);
5504       KeyValue kv5 = new KeyValue(rowE, cf, col1, ts, KeyValue.Type.Put, null);
5505       Put put = null;
5506       put = new Put(rowA);
5507       put.add(kv1);
5508       region.put(put);
5509       put = new Put(rowB);
5510       put.add(kv2);
5511       region.put(put);
5512       put = new Put(rowC);
5513       put.add(kv3);
5514       region.put(put);
5515       put = new Put(rowD);
5516       put.add(kv4_1);
5517       region.put(put);
5518       put = new Put(rowD);
5519       put.add(kv4_2);
5520       region.put(put);
5521       put = new Put(rowE);
5522       put.add(kv5);
5523       region.put(put);
5524       region.flush(true);
5525       Scan scan = new Scan(rowD, rowA);
5526       scan.addColumn(families[0], col1);
5527       scan.setReversed(true);
5528       List<Cell> currRow = new ArrayList<Cell>();
5529       InternalScanner scanner = region.getScanner(scan);
5530       boolean hasNext = scanner.next(currRow);
5531       assertEquals(1, currRow.size());
5532       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5533       assertTrue(hasNext);
5534       currRow.clear();
5535       hasNext = scanner.next(currRow);
5536       assertEquals(1, currRow.size());
5537       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowC));
5538       assertTrue(hasNext);
5539       currRow.clear();
5540       hasNext = scanner.next(currRow);
5541       assertEquals(1, currRow.size());
5542       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowB));
5543       assertFalse(hasNext);
5544       scanner.close();
5545 
5546       scan = new Scan(rowD, rowA);
5547       scan.addColumn(families[0], col2);
5548       scan.setReversed(true);
5549       currRow.clear();
5550       scanner = region.getScanner(scan);
5551       hasNext = scanner.next(currRow);
5552       assertEquals(1, currRow.size());
5553       assertTrue(Bytes.equals(currRow.get(0).getRow(), rowD));
5554       scanner.close();
5555     } finally {
5556       HRegion.closeHRegion(this.region);
5557       this.region = null;
5558     }
5559   }
5560 
5561   @Test (timeout=60000)
5562   public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs1()
5563       throws IOException {
5564     byte[] row0 = Bytes.toBytes("row0"); // 1 kv
5565     byte[] row1 = Bytes.toBytes("row1"); // 2 kv
5566     byte[] row2 = Bytes.toBytes("row2"); // 4 kv
5567     byte[] row3 = Bytes.toBytes("row3"); // 2 kv
5568     byte[] row4 = Bytes.toBytes("row4"); // 5 kv
5569     byte[] row5 = Bytes.toBytes("row5"); // 2 kv
5570     byte[] cf1 = Bytes.toBytes("CF1");
5571     byte[] cf2 = Bytes.toBytes("CF2");
5572     byte[] cf3 = Bytes.toBytes("CF3");
5573     byte[][] families = { cf1, cf2, cf3 };
5574     byte[] col = Bytes.toBytes("C");
5575     long ts = 1;
5576     String method = this.getName();
5577     HBaseConfiguration conf = new HBaseConfiguration();
5578     // disable compactions in this test.
5579     conf.setInt("hbase.hstore.compactionThreshold", 10000);
5580     this.region = initHRegion(tableName, method, conf, families);
5581     try {
5582       // kv naming style: kv(row number) totalKvCountInThisRow seq no
5583       KeyValue kv0_1_1 = new KeyValue(row0, cf1, col, ts, KeyValue.Type.Put,
5584           null);
5585       KeyValue kv1_2_1 = new KeyValue(row1, cf2, col, ts, KeyValue.Type.Put,
5586           null);
5587       KeyValue kv1_2_2 = new KeyValue(row1, cf1, col, ts + 1,
5588           KeyValue.Type.Put, null);
5589       KeyValue kv2_4_1 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put,
5590           null);
5591       KeyValue kv2_4_2 = new KeyValue(row2, cf1, col, ts, KeyValue.Type.Put,
5592           null);
5593       KeyValue kv2_4_3 = new KeyValue(row2, cf3, col, ts, KeyValue.Type.Put,
5594           null);
5595       KeyValue kv2_4_4 = new KeyValue(row2, cf1, col, ts + 4,
5596           KeyValue.Type.Put, null);
5597       KeyValue kv3_2_1 = new KeyValue(row3, cf2, col, ts, KeyValue.Type.Put,
5598           null);
5599       KeyValue kv3_2_2 = new KeyValue(row3, cf1, col, ts + 4,
5600           KeyValue.Type.Put, null);
5601       KeyValue kv4_5_1 = new KeyValue(row4, cf1, col, ts, KeyValue.Type.Put,
5602           null);
5603       KeyValue kv4_5_2 = new KeyValue(row4, cf3, col, ts, KeyValue.Type.Put,
5604           null);
5605       KeyValue kv4_5_3 = new KeyValue(row4, cf3, col, ts + 5,
5606           KeyValue.Type.Put, null);
5607       KeyValue kv4_5_4 = new KeyValue(row4, cf2, col, ts, KeyValue.Type.Put,
5608           null);
5609       KeyValue kv4_5_5 = new KeyValue(row4, cf1, col, ts + 3,
5610           KeyValue.Type.Put, null);
5611       KeyValue kv5_2_1 = new KeyValue(row5, cf2, col, ts, KeyValue.Type.Put,
5612           null);
5613       KeyValue kv5_2_2 = new KeyValue(row5, cf3, col, ts, KeyValue.Type.Put,
5614           null);
5615       // hfiles(cf1/cf2) :"row1"(1 kv) / "row2"(1 kv) / "row4"(2 kv)
5616       Put put = null;
5617       put = new Put(row1);
5618       put.add(kv1_2_1);
5619       region.put(put);
5620       put = new Put(row2);
5621       put.add(kv2_4_1);
5622       region.put(put);
5623       put = new Put(row4);
5624       put.add(kv4_5_4);
5625       put.add(kv4_5_5);
5626       region.put(put);
5627       region.flush(true);
5628       // hfiles(cf1/cf3) : "row1" (1 kvs) / "row2" (1 kv) / "row4" (2 kv)
5629       put = new Put(row4);
5630       put.add(kv4_5_1);
5631       put.add(kv4_5_3);
5632       region.put(put);
5633       put = new Put(row1);
5634       put.add(kv1_2_2);
5635       region.put(put);
5636       put = new Put(row2);
5637       put.add(kv2_4_4);
5638       region.put(put);
5639       region.flush(true);
5640       // hfiles(cf1/cf3) : "row2"(2 kv) / "row3"(1 kvs) / "row4" (1 kv)
5641       put = new Put(row4);
5642       put.add(kv4_5_2);
5643       region.put(put);
5644       put = new Put(row2);
5645       put.add(kv2_4_2);
5646       put.add(kv2_4_3);
5647       region.put(put);
5648       put = new Put(row3);
5649       put.add(kv3_2_2);
5650       region.put(put);
5651       region.flush(true);
5652       // memstore(cf1/cf2/cf3) : "row0" (1 kvs) / "row3" ( 1 kv) / "row5" (max)
5653       // ( 2 kv)
5654       put = new Put(row0);
5655       put.add(kv0_1_1);
5656       region.put(put);
5657       put = new Put(row3);
5658       put.add(kv3_2_1);
5659       region.put(put);
5660       put = new Put(row5);
5661       put.add(kv5_2_1);
5662       put.add(kv5_2_2);
5663       region.put(put);
5664       // scan range = ["row4", min), skip the max "row5"
5665       Scan scan = new Scan(row4);
5666       scan.setMaxVersions(5);
5667       scan.setBatch(3);
5668       scan.setReversed(true);
5669       InternalScanner scanner = region.getScanner(scan);
5670       List<Cell> currRow = new ArrayList<Cell>();
5671       boolean hasNext = false;
5672       // 1. scan out "row4" (5 kvs), "row5" can't be scanned out since not
5673       // included in scan range
5674       // "row4" takes 2 next() calls since batch=3
5675       hasNext = scanner.next(currRow);
5676       assertEquals(3, currRow.size());
5677       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
5678       assertTrue(hasNext);
5679       currRow.clear();
5680       hasNext = scanner.next(currRow);
5681       assertEquals(2, currRow.size());
5682       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
5683       assertTrue(hasNext);
5684       // 2. scan out "row3" (2 kv)
5685       currRow.clear();
5686       hasNext = scanner.next(currRow);
5687       assertEquals(2, currRow.size());
5688       assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
5689       assertTrue(hasNext);
5690       // 3. scan out "row2" (4 kvs)
5691       // "row2" takes 2 next() calls since batch=3
5692       currRow.clear();
5693       hasNext = scanner.next(currRow);
5694       assertEquals(3, currRow.size());
5695       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
5696       assertTrue(hasNext);
5697       currRow.clear();
5698       hasNext = scanner.next(currRow);
5699       assertEquals(1, currRow.size());
5700       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
5701       assertTrue(hasNext);
5702       // 4. scan out "row1" (2 kv)
5703       currRow.clear();
5704       hasNext = scanner.next(currRow);
5705       assertEquals(2, currRow.size());
5706       assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
5707       assertTrue(hasNext);
5708       // 5. scan out "row0" (1 kv)
5709       currRow.clear();
5710       hasNext = scanner.next(currRow);
5711       assertEquals(1, currRow.size());
5712       assertTrue(Bytes.equals(currRow.get(0).getRow(), row0));
5713       assertFalse(hasNext);
5714 
5715       scanner.close();
5716     } finally {
5717       HRegion.closeHRegion(this.region);
5718       this.region = null;
5719     }
5720   }
5721 
5722   @Test (timeout=60000)
5723   public void testReverseScanner_FromMemStoreAndHFiles_MultiCFs2()
5724       throws IOException {
5725     byte[] row1 = Bytes.toBytes("row1");
5726     byte[] row2 = Bytes.toBytes("row2");
5727     byte[] row3 = Bytes.toBytes("row3");
5728     byte[] row4 = Bytes.toBytes("row4");
5729     byte[] cf1 = Bytes.toBytes("CF1");
5730     byte[] cf2 = Bytes.toBytes("CF2");
5731     byte[] cf3 = Bytes.toBytes("CF3");
5732     byte[] cf4 = Bytes.toBytes("CF4");
5733     byte[][] families = { cf1, cf2, cf3, cf4 };
5734     byte[] col = Bytes.toBytes("C");
5735     long ts = 1;
5736     String method = this.getName();
5737     HBaseConfiguration conf = new HBaseConfiguration();
5738     // disable compactions in this test.
5739     conf.setInt("hbase.hstore.compactionThreshold", 10000);
5740     this.region = initHRegion(tableName, method, conf, families);
5741     try {
5742       KeyValue kv1 = new KeyValue(row1, cf1, col, ts, KeyValue.Type.Put, null);
5743       KeyValue kv2 = new KeyValue(row2, cf2, col, ts, KeyValue.Type.Put, null);
5744       KeyValue kv3 = new KeyValue(row3, cf3, col, ts, KeyValue.Type.Put, null);
5745       KeyValue kv4 = new KeyValue(row4, cf4, col, ts, KeyValue.Type.Put, null);
5746       // storefile1
5747       Put put = new Put(row1);
5748       put.add(kv1);
5749       region.put(put);
5750       region.flush(true);
5751       // storefile2
5752       put = new Put(row2);
5753       put.add(kv2);
5754       region.put(put);
5755       region.flush(true);
5756       // storefile3
5757       put = new Put(row3);
5758       put.add(kv3);
5759       region.put(put);
5760       region.flush(true);
5761       // memstore
5762       put = new Put(row4);
5763       put.add(kv4);
5764       region.put(put);
5765       // scan range = ["row4", min)
5766       Scan scan = new Scan(row4);
5767       scan.setReversed(true);
5768       scan.setBatch(10);
5769       InternalScanner scanner = region.getScanner(scan);
5770       List<Cell> currRow = new ArrayList<Cell>();
5771       boolean hasNext = scanner.next(currRow);
5772       assertEquals(1, currRow.size());
5773       assertTrue(Bytes.equals(currRow.get(0).getRow(), row4));
5774       assertTrue(hasNext);
5775       currRow.clear();
5776       hasNext = scanner.next(currRow);
5777       assertEquals(1, currRow.size());
5778       assertTrue(Bytes.equals(currRow.get(0).getRow(), row3));
5779       assertTrue(hasNext);
5780       currRow.clear();
5781       hasNext = scanner.next(currRow);
5782       assertEquals(1, currRow.size());
5783       assertTrue(Bytes.equals(currRow.get(0).getRow(), row2));
5784       assertTrue(hasNext);
5785       currRow.clear();
5786       hasNext = scanner.next(currRow);
5787       assertEquals(1, currRow.size());
5788       assertTrue(Bytes.equals(currRow.get(0).getRow(), row1));
5789       assertFalse(hasNext);
5790     } finally {
5791       HRegion.closeHRegion(this.region);
5792       this.region = null;
5793     }
5794   }
5795 
5796   /**
5797    * Test for HBASE-14497: Reverse Scan threw StackOverflow caused by readPt checking
5798    */
5799   @Test (timeout = 60000)
5800   public void testReverseScanner_StackOverflow() throws IOException {
5801     byte[] cf1 = Bytes.toBytes("CF1");
5802     byte[][] families = {cf1};
5803     byte[] col = Bytes.toBytes("C");
5804     String method = this.getName();
5805     HBaseConfiguration conf = new HBaseConfiguration();
5806     this.region = initHRegion(tableName, method, conf, families);
5807     try {
5808       // setup with one storefile and one memstore, to create scanner and get an earlier readPt
5809       Put put = new Put(Bytes.toBytes("19998"));
5810       put.add(cf1, col, Bytes.toBytes("val"));
5811       region.put(put);
5812       region.flushcache(true, true);
5813       Put put2 = new Put(Bytes.toBytes("19997"));
5814       put2.add(cf1, col, Bytes.toBytes("val"));
5815       region.put(put2);
5816 
5817       Scan scan = new Scan(Bytes.toBytes("19998"));
5818       scan.setReversed(true);
5819       InternalScanner scanner = region.getScanner(scan);
5820 
5821       // create one storefile contains many rows will be skipped
5822       // to check StoreFileScanner.seekToPreviousRow
5823       for (int i = 10000; i < 20000; i++) {
5824         Put p = new Put(Bytes.toBytes("" + i));
5825         p.add(cf1, col, Bytes.toBytes("" + i));
5826         region.put(p);
5827       }
5828       region.flushcache(true, true);
5829 
5830       // create one memstore contains many rows will be skipped
5831       // to check MemStoreScanner.seekToPreviousRow
5832       for (int i = 10000; i < 20000; i++) {
5833         Put p = new Put(Bytes.toBytes("" + i));
5834         p.add(cf1, col, Bytes.toBytes("" + i));
5835         region.put(p);
5836       }
5837 
5838       List<Cell> currRow = new ArrayList<>();
5839       boolean hasNext;
5840       do {
5841         hasNext = scanner.next(currRow);
5842       } while (hasNext);
5843       assertEquals(2, currRow.size());
5844       assertArrayEquals(Bytes.toBytes("19998"), currRow.get(0).getRow());
5845       assertArrayEquals(Bytes.toBytes("19997"), currRow.get(1).getRow());
5846     } finally {
5847       HBaseTestingUtility.closeRegionAndWAL(this.region);
5848       this.region = null;
5849     }
5850   }
5851 
5852   @Test (timeout=60000)
5853   public void testSplitRegionWithReverseScan() throws IOException {
5854     byte [] tableName = Bytes.toBytes("testSplitRegionWithReverseScan");
5855     byte [] qualifier = Bytes.toBytes("qualifier");
5856     Configuration hc = initSplit();
5857     int numRows = 3;
5858     byte [][] families = {fam1};
5859 
5860     //Setting up region
5861     String method = this.getName();
5862     this.region = initHRegion(tableName, method, hc, families);
5863 
5864     //Put data in region
5865     int startRow = 100;
5866     putData(startRow, numRows, qualifier, families);
5867     int splitRow = startRow + numRows;
5868     putData(splitRow, numRows, qualifier, families);
5869     region.flush(true);
5870 
5871     HRegion [] regions = null;
5872     try {
5873       regions = splitRegion(region, Bytes.toBytes("" + splitRow));
5874       //Opening the regions returned.
5875       for (int i = 0; i < regions.length; i++) {
5876         regions[i] = HRegion.openHRegion(regions[i], null);
5877       }
5878       //Verifying that the region has been split
5879       assertEquals(2, regions.length);
5880 
5881       //Verifying that all data is still there and that data is in the right
5882       //place
5883       verifyData(regions[0], startRow, numRows, qualifier, families);
5884       verifyData(regions[1], splitRow, numRows, qualifier, families);
5885 
5886       //fire the reverse scan1:  top range, and larger than the last row
5887       Scan scan = new Scan(Bytes.toBytes(String.valueOf(startRow + 10 * numRows)));
5888       scan.setReversed(true);
5889       InternalScanner scanner = regions[1].getScanner(scan);
5890       List<Cell> currRow = new ArrayList<Cell>();
5891       boolean more = false;
5892       int verify = startRow + 2 * numRows - 1;
5893       do {
5894         more = scanner.next(currRow);
5895         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5896         verify--;
5897         currRow.clear();
5898       } while(more);
5899       assertEquals(verify, startRow + numRows - 1);
5900       scanner.close();
5901       //fire the reverse scan2:  top range, and equals to the last row
5902       scan = new Scan(Bytes.toBytes(String.valueOf(startRow + 2 * numRows - 1)));
5903       scan.setReversed(true);
5904       scanner = regions[1].getScanner(scan);
5905       verify = startRow + 2 * numRows - 1;
5906       do {
5907         more = scanner.next(currRow);
5908         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5909         verify--;
5910         currRow.clear();
5911       } while(more);
5912       assertEquals(verify, startRow + numRows - 1);
5913       scanner.close();
5914       //fire the reverse scan3:  bottom range, and larger than the last row
5915       scan = new Scan(Bytes.toBytes(String.valueOf(startRow + numRows)));
5916       scan.setReversed(true);
5917       scanner = regions[0].getScanner(scan);
5918       verify = startRow + numRows - 1;
5919       do {
5920         more = scanner.next(currRow);
5921         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5922         verify--;
5923         currRow.clear();
5924       } while(more);
5925       assertEquals(verify, 99);
5926       scanner.close();
5927       //fire the reverse scan4:  bottom range, and equals to the last row
5928       scan = new Scan(Bytes.toBytes(String.valueOf(startRow + numRows - 1)));
5929       scan.setReversed(true);
5930       scanner = regions[0].getScanner(scan);
5931       verify = startRow + numRows - 1;
5932       do {
5933         more = scanner.next(currRow);
5934         assertEquals(Bytes.toString(currRow.get(0).getRow()), verify + "");
5935         verify--;
5936         currRow.clear();
5937       } while(more);
5938       assertEquals(verify, startRow - 1);
5939       scanner.close();
5940     } finally {
5941       HRegion.closeHRegion(this.region);
5942       this.region = null;
5943     }
5944   }
5945 
5946   @Test
5947   public void testWriteRequestsCounter() throws IOException {
5948     byte[] fam = Bytes.toBytes("info");
5949     byte[][] families = { fam };
5950     this.region = initHRegion(tableName, method, CONF, families);
5951 
5952     Assert.assertEquals(0L, region.getWriteRequestsCount());
5953 
5954     Put put = new Put(row);
5955     put.add(fam, fam, fam);
5956 
5957     Assert.assertEquals(0L, region.getWriteRequestsCount());
5958     region.put(put);
5959     Assert.assertEquals(1L, region.getWriteRequestsCount());
5960     region.put(put);
5961     Assert.assertEquals(2L, region.getWriteRequestsCount());
5962     region.put(put);
5963     Assert.assertEquals(3L, region.getWriteRequestsCount());
5964 
5965     region.delete(new Delete(row));
5966     Assert.assertEquals(4L, region.getWriteRequestsCount());
5967 
5968     HRegion.closeHRegion(this.region);
5969     this.region = null;
5970   }
5971 
5972   @Test
5973   @SuppressWarnings("unchecked")
5974   public void testOpenRegionWrittenToWAL() throws Exception {
5975     final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWAL", 100, 42);
5976     final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
5977 
5978     HTableDescriptor htd
5979         = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWAL"));
5980     htd.addFamily(new HColumnDescriptor(fam1));
5981     htd.addFamily(new HColumnDescriptor(fam2));
5982 
5983     HRegionInfo hri = new HRegionInfo(htd.getTableName(),
5984       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
5985 
5986     // open the region w/o rss and wal and flush some files
5987     HRegion region =
5988          HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
5989             .getConfiguration(), htd);
5990     assertNotNull(region);
5991 
5992     // create a file in fam1 for the region before opening in OpenRegionHandler
5993     region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
5994     region.flush(true);
5995     region.close();
5996 
5997     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
5998 
5999     // capture append() calls
6000     WAL wal = mock(WAL.class);
6001     when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
6002 
6003     try {
6004       region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6005         TEST_UTIL.getConfiguration(), rss, null);
6006 
6007       verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
6008         , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
6009 
6010       WALEdit edit = editCaptor.getValue();
6011       assertNotNull(edit);
6012       assertNotNull(edit.getCells());
6013       assertEquals(1, edit.getCells().size());
6014       RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6015       assertNotNull(desc);
6016 
6017       LOG.info("RegionEventDescriptor from WAL: " + desc);
6018 
6019       assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
6020       assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
6021       assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6022         hri.getEncodedNameAsBytes()));
6023       assertTrue(desc.getLogSequenceNumber() > 0);
6024       assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6025       assertEquals(2, desc.getStoresCount());
6026 
6027       StoreDescriptor store = desc.getStores(0);
6028       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6029       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6030       assertEquals(1, store.getStoreFileCount()); // 1store file
6031       assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
6032 
6033       store = desc.getStores(1);
6034       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6035       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6036       assertEquals(0, store.getStoreFileCount()); // no store files
6037 
6038     } finally {
6039       HRegion.closeHRegion(region);
6040     }
6041   }
6042 
6043   // Helper for test testOpenRegionWrittenToWALForLogReplay
6044   static class HRegionWithSeqId extends HRegion {
6045     public HRegionWithSeqId(final Path tableDir, final WAL wal, final FileSystem fs,
6046         final Configuration confParam, final HRegionInfo regionInfo,
6047         final HTableDescriptor htd, final RegionServerServices rsServices) {
6048       super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
6049     }
6050     @Override
6051     protected long getNextSequenceId(WAL wal) throws IOException {
6052       return 42;
6053     }
6054   }
6055 
6056   @Test
6057   public void testFlushedFileWithNoTags() throws Exception {
6058     String method = "testFlushedFileWithNoTags";
6059     HTableDescriptor htd = new HTableDescriptor(tableName);
6060     htd.addFamily(new HColumnDescriptor(fam1));
6061     region = initHRegion(Bytes.toBytes(method), method, TEST_UTIL.getConfiguration(), fam1);
6062     Put put = new Put(Bytes.toBytes("a-b-0-0"));
6063     put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
6064     region.put(put);
6065     region.flush(true);
6066     Store store = region.getStore(fam1);
6067     Collection<StoreFile> storefiles = store.getStorefiles();
6068     for (StoreFile sf : storefiles) {
6069       assertFalse("Tags should not be present "
6070           ,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
6071     }
6072   }
6073   @Test
6074   @SuppressWarnings("unchecked")
6075   public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
6076     // similar to the above test but with distributed log replay
6077     final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay",
6078       100, 42);
6079     final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6080 
6081     HTableDescriptor htd
6082         = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWALForLogReplay"));
6083     htd.addFamily(new HColumnDescriptor(fam1));
6084     htd.addFamily(new HColumnDescriptor(fam2));
6085 
6086     HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6087       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6088 
6089     // open the region w/o rss and wal and flush some files
6090     HRegion region = HRegion.createHRegion(hri, TEST_UTIL.getDataTestDir(), TEST_UTIL
6091              .getConfiguration(), htd);
6092     assertNotNull(region);
6093 
6094     // create a file in fam1 for the region before opening in OpenRegionHandler
6095     region.put(new Put(Bytes.toBytes("a")).add(fam1, fam1, fam1));
6096     region.flush(true);
6097     HRegion.closeHRegion(region);
6098 
6099     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6100 
6101     // capture append() calls
6102     WAL wal = mock(WAL.class);
6103     when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
6104 
6105     // add the region to recovering regions
6106     HashMap<String, Region> recoveringRegions = Maps.newHashMap();
6107     recoveringRegions.put(region.getRegionInfo().getEncodedName(), null);
6108     when(rss.getRecoveringRegions()).thenReturn(recoveringRegions);
6109 
6110     try {
6111       Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
6112       conf.set(HConstants.REGION_IMPL, HRegionWithSeqId.class.getName());
6113       region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6114         conf, rss, null);
6115 
6116       // verify that we have not appended region open event to WAL because this region is still
6117       // recovering
6118       verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
6119         , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
6120 
6121       // not put the region out of recovering state
6122       new FinishRegionRecoveringHandler(rss, region.getRegionInfo().getEncodedName(), "/foo")
6123         .prepare().process();
6124 
6125       // now we should have put the entry
6126       verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
6127         , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
6128 
6129       WALEdit edit = editCaptor.getValue();
6130       assertNotNull(edit);
6131       assertNotNull(edit.getCells());
6132       assertEquals(1, edit.getCells().size());
6133       RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6134       assertNotNull(desc);
6135 
6136       LOG.info("RegionEventDescriptor from WAL: " + desc);
6137 
6138       assertEquals(RegionEventDescriptor.EventType.REGION_OPEN, desc.getEventType());
6139       assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
6140       assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6141         hri.getEncodedNameAsBytes()));
6142       assertTrue(desc.getLogSequenceNumber() > 0);
6143       assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6144       assertEquals(2, desc.getStoresCount());
6145 
6146       StoreDescriptor store = desc.getStores(0);
6147       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6148       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6149       assertEquals(1, store.getStoreFileCount()); // 1store file
6150       assertFalse(store.getStoreFile(0).contains("/")); // ensure path is relative
6151 
6152       store = desc.getStores(1);
6153       assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6154       assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6155       assertEquals(0, store.getStoreFileCount()); // no store files
6156 
6157     } finally {
6158       HRegion.closeHRegion(region);
6159     }
6160   }
6161 
6162   @Test
6163   @SuppressWarnings("unchecked")
6164   public void testCloseRegionWrittenToWAL() throws Exception {
6165 
6166     Path rootDir = new Path(dir + name.getMethodName());
6167     FSUtils.setRootDir(TEST_UTIL.getConfiguration(), rootDir);
6168 
6169     final ServerName serverName = ServerName.valueOf("testCloseRegionWrittenToWAL", 100, 42);
6170     final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
6171 
6172     HTableDescriptor htd
6173     = new HTableDescriptor(TableName.valueOf("testOpenRegionWrittenToWAL"));
6174     htd.addFamily(new HColumnDescriptor(fam1));
6175     htd.addFamily(new HColumnDescriptor(fam2));
6176 
6177     HRegionInfo hri = new HRegionInfo(htd.getTableName(),
6178       HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
6179 
6180     ArgumentCaptor<WALEdit> editCaptor = ArgumentCaptor.forClass(WALEdit.class);
6181 
6182     // capture append() calls
6183     WAL wal = mock(WAL.class);
6184     when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
6185 
6186     // create and then open a region first so that it can be closed later
6187     region = HRegion.createHRegion(hri, rootDir, TEST_UTIL.getConfiguration(), htd, rss.getWAL(hri));
6188     region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
6189       TEST_UTIL.getConfiguration(), rss, null);
6190 
6191     // close the region
6192     region.close(false);
6193 
6194     // 2 times, one for region open, the other close region
6195     verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
6196       editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
6197 
6198     WALEdit edit = editCaptor.getAllValues().get(1);
6199     assertNotNull(edit);
6200     assertNotNull(edit.getCells());
6201     assertEquals(1, edit.getCells().size());
6202     RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
6203     assertNotNull(desc);
6204 
6205     LOG.info("RegionEventDescriptor from WAL: " + desc);
6206 
6207     assertEquals(RegionEventDescriptor.EventType.REGION_CLOSE, desc.getEventType());
6208     assertTrue(Bytes.equals(desc.getTableName().toByteArray(), htd.getName()));
6209     assertTrue(Bytes.equals(desc.getEncodedRegionName().toByteArray(),
6210       hri.getEncodedNameAsBytes()));
6211     assertTrue(desc.getLogSequenceNumber() > 0);
6212     assertEquals(serverName, ProtobufUtil.toServerName(desc.getServer()));
6213     assertEquals(2, desc.getStoresCount());
6214 
6215     StoreDescriptor store = desc.getStores(0);
6216     assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam1));
6217     assertEquals(store.getStoreHomeDir(), Bytes.toString(fam1));
6218     assertEquals(0, store.getStoreFileCount()); // no store files
6219 
6220     store = desc.getStores(1);
6221     assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), fam2));
6222     assertEquals(store.getStoreHomeDir(), Bytes.toString(fam2));
6223     assertEquals(0, store.getStoreFileCount()); // no store files
6224   }
6225 
6226   /**
6227    * Test RegionTooBusyException thrown when region is busy
6228    */
6229   @Test (timeout=24000)
6230   public void testRegionTooBusy() throws IOException {
6231     String method = "testRegionTooBusy";
6232     byte[] tableName = Bytes.toBytes(method);
6233     byte[] family = Bytes.toBytes("family");
6234     long defaultBusyWaitDuration = CONF.getLong("hbase.busy.wait.duration",
6235       HRegion.DEFAULT_BUSY_WAIT_DURATION);
6236     CONF.setLong("hbase.busy.wait.duration", 1000);
6237     region = initHRegion(tableName, method, CONF, family);
6238     final AtomicBoolean stopped = new AtomicBoolean(true);
6239     Thread t = new Thread(new Runnable() {
6240       @Override
6241       public void run() {
6242         try {
6243           region.lock.writeLock().lock();
6244           stopped.set(false);
6245           while (!stopped.get()) {
6246             Thread.sleep(100);
6247           }
6248         } catch (InterruptedException ie) {
6249         } finally {
6250           region.lock.writeLock().unlock();
6251         }
6252       }
6253     });
6254     t.start();
6255     Get get = new Get(row);
6256     try {
6257       while (stopped.get()) {
6258         Thread.sleep(100);
6259       }
6260       region.get(get);
6261       fail("Should throw RegionTooBusyException");
6262     } catch (InterruptedException ie) {
6263       fail("test interrupted");
6264     } catch (RegionTooBusyException e) {
6265       // Good, expected
6266     } finally {
6267       stopped.set(true);
6268       try {
6269         t.join();
6270       } catch (Throwable e) {
6271       }
6272 
6273       HRegion.closeHRegion(region);
6274       region = null;
6275       CONF.setLong("hbase.busy.wait.duration", defaultBusyWaitDuration);
6276     }
6277   }
6278 
6279   @Test
6280   public void testCellTTLs() throws IOException {
6281     IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
6282     EnvironmentEdgeManager.injectEdge(edge);
6283 
6284     final byte[] row = Bytes.toBytes("testRow");
6285     final byte[] q1 = Bytes.toBytes("q1");
6286     final byte[] q2 = Bytes.toBytes("q2");
6287     final byte[] q3 = Bytes.toBytes("q3");
6288     final byte[] q4 = Bytes.toBytes("q4");
6289 
6290     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
6291     HColumnDescriptor hcd = new HColumnDescriptor(fam1);
6292     hcd.setTimeToLive(10); // 10 seconds
6293     htd.addFamily(hcd);
6294 
6295     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
6296     conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
6297 
6298     HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
6299         HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
6300       TEST_UTIL.getDataTestDir(), conf, htd);
6301     assertNotNull(region);
6302     try {
6303       long now = EnvironmentEdgeManager.currentTime();
6304       // Add a cell that will expire in 5 seconds via cell TTL
6305       region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
6306         HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
6307           // TTL tags specify ts in milliseconds
6308           new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
6309       // Add a cell that will expire after 10 seconds via family setting
6310       region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
6311       // Add a cell that will expire in 15 seconds via cell TTL
6312       region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
6313         HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
6314           // TTL tags specify ts in milliseconds
6315           new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
6316       // Add a cell that will expire in 20 seconds via family setting
6317       region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
6318 
6319       // Flush so we are sure store scanning gets this right
6320       region.flush(true);
6321 
6322       // A query at time T+0 should return all cells
6323       Result r = region.get(new Get(row));
6324       assertNotNull(r.getValue(fam1, q1));
6325       assertNotNull(r.getValue(fam1, q2));
6326       assertNotNull(r.getValue(fam1, q3));
6327       assertNotNull(r.getValue(fam1, q4));
6328 
6329       // Increment time to T+5 seconds
6330       edge.incrementTime(5000);
6331 
6332       r = region.get(new Get(row));
6333       assertNull(r.getValue(fam1, q1));
6334       assertNotNull(r.getValue(fam1, q2));
6335       assertNotNull(r.getValue(fam1, q3));
6336       assertNotNull(r.getValue(fam1, q4));
6337 
6338       // Increment time to T+10 seconds
6339       edge.incrementTime(5000);
6340 
6341       r = region.get(new Get(row));
6342       assertNull(r.getValue(fam1, q1));
6343       assertNull(r.getValue(fam1, q2));
6344       assertNotNull(r.getValue(fam1, q3));
6345       assertNotNull(r.getValue(fam1, q4));
6346 
6347       // Increment time to T+15 seconds
6348       edge.incrementTime(5000);
6349 
6350       r = region.get(new Get(row));
6351       assertNull(r.getValue(fam1, q1));
6352       assertNull(r.getValue(fam1, q2));
6353       assertNull(r.getValue(fam1, q3));
6354       assertNotNull(r.getValue(fam1, q4));
6355 
6356       // Increment time to T+20 seconds
6357       edge.incrementTime(10000);
6358 
6359       r = region.get(new Get(row));
6360       assertNull(r.getValue(fam1, q1));
6361       assertNull(r.getValue(fam1, q2));
6362       assertNull(r.getValue(fam1, q3));
6363       assertNull(r.getValue(fam1, q4));
6364 
6365       // Fun with disappearing increments
6366 
6367       // Start at 1
6368       region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
6369       r = region.get(new Get(row));
6370       byte[] val = r.getValue(fam1, q1);
6371       assertNotNull(val);
6372       assertEquals(Bytes.toLong(val), 1L);
6373 
6374       // Increment with a TTL of 5 seconds
6375       Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
6376       incr.setTTL(5000);
6377       region.increment(incr); // 2
6378 
6379       // New value should be 2
6380       r = region.get(new Get(row));
6381       val = r.getValue(fam1, q1);
6382       assertNotNull(val);
6383       assertEquals(Bytes.toLong(val), 2L);
6384 
6385       // Increment time to T+25 seconds
6386       edge.incrementTime(5000);
6387 
6388       // Value should be back to 1
6389       r = region.get(new Get(row));
6390       val = r.getValue(fam1, q1);
6391       assertNotNull(val);
6392       assertEquals(Bytes.toLong(val), 1L);
6393 
6394       // Increment time to T+30 seconds
6395       edge.incrementTime(5000);
6396 
6397       // Original value written at T+20 should be gone now via family TTL
6398       r = region.get(new Get(row));
6399       assertNull(r.getValue(fam1, q1));
6400 
6401     } finally {
6402       HRegion.closeHRegion(region);
6403     }
6404   }
6405 
6406   @Test
6407   public void testIncrementTimestampsAreMonotonic() throws IOException {
6408     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6409     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6410     EnvironmentEdgeManager.injectEdge(edge);
6411 
6412     edge.setValue(10);
6413     Increment inc = new Increment(row);
6414     inc.setDurability(Durability.SKIP_WAL);
6415     inc.addColumn(fam1, qual1, 1L);
6416     region.increment(inc);
6417 
6418     Result result = region.get(new Get(row));
6419     Cell c = result.getColumnLatestCell(fam1, qual1);
6420     assertNotNull(c);
6421     assertEquals(c.getTimestamp(), 10L);
6422 
6423     edge.setValue(1); // clock goes back
6424     region.increment(inc);
6425     result = region.get(new Get(row));
6426     c = result.getColumnLatestCell(fam1, qual1);
6427     assertEquals(c.getTimestamp(), 10L);
6428     assertEquals(Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()), 2L);
6429   }
6430 
6431   @Test
6432   public void testAppendTimestampsAreMonotonic() throws IOException {
6433     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6434     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6435     EnvironmentEdgeManager.injectEdge(edge);
6436 
6437     edge.setValue(10);
6438     Append a = new Append(row);
6439     a.setDurability(Durability.SKIP_WAL);
6440     a.add(fam1, qual1, qual1);
6441     region.append(a);
6442 
6443     Result result = region.get(new Get(row));
6444     Cell c = result.getColumnLatestCell(fam1, qual1);
6445     assertNotNull(c);
6446     assertEquals(c.getTimestamp(), 10L);
6447 
6448     edge.setValue(1); // clock goes back
6449     region.append(a);
6450     result = region.get(new Get(row));
6451     c = result.getColumnLatestCell(fam1, qual1);
6452     assertEquals(c.getTimestamp(), 10L);
6453 
6454     byte[] expected = new byte[qual1.length*2];
6455     System.arraycopy(qual1, 0, expected, 0, qual1.length);
6456     System.arraycopy(qual1, 0, expected, qual1.length, qual1.length);
6457 
6458     assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6459       expected, 0, expected.length));
6460   }
6461 
6462   @Test
6463   public void testCheckAndMutateTimestampsAreMonotonic() throws IOException {
6464     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6465     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6466     EnvironmentEdgeManager.injectEdge(edge);
6467 
6468     edge.setValue(10);
6469     Put p = new Put(row);
6470     p.setDurability(Durability.SKIP_WAL);
6471     p.addColumn(fam1, qual1, qual1);
6472     region.put(p);
6473 
6474     Result result = region.get(new Get(row));
6475     Cell c = result.getColumnLatestCell(fam1, qual1);
6476     assertNotNull(c);
6477     assertEquals(c.getTimestamp(), 10L);
6478 
6479     edge.setValue(1); // clock goes back
6480     p = new Put(row);
6481     p.setDurability(Durability.SKIP_WAL);
6482     p.addColumn(fam1, qual1, qual2);
6483     region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1), p, false);
6484     result = region.get(new Get(row));
6485     c = result.getColumnLatestCell(fam1, qual1);
6486     assertEquals(c.getTimestamp(), 10L);
6487 
6488     assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6489       qual2, 0, qual2.length));
6490   }
6491 
6492   @Test(timeout = 60000)
6493   public void testBatchMutateWithWrongRegionException() throws Exception {
6494     final byte[] a = Bytes.toBytes("a");
6495     final byte[] b = Bytes.toBytes("b");
6496     final byte[] c = Bytes.toBytes("c"); // exclusive
6497 
6498     int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
6499     CONF.setInt("hbase.rowlock.wait.duration", 1000);
6500     final HRegion region = initHRegion(tableName, a, c, name.getMethodName(), CONF, false, fam1);
6501 
6502     Mutation[] mutations = new Mutation[] {
6503         new Put(a).addImmutable(fam1, null, null),
6504         new Put(c).addImmutable(fam1, null, null), // this is outside the region boundary
6505         new Put(b).addImmutable(fam1, null, null),
6506     };
6507 
6508     OperationStatus[] status = region.batchMutate(mutations);
6509     assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6510     assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SANITY_CHECK_FAILURE);
6511     assertEquals(status[2].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6512 
6513 
6514     // test with a row lock held for a long time
6515     final CountDownLatch obtainedRowLock = new CountDownLatch(1);
6516     ExecutorService exec = Executors.newFixedThreadPool(2);
6517     Future<Void> f1 = exec.submit(new Callable<Void>() {
6518       @Override
6519       public Void call() throws Exception {
6520         LOG.info("Acquiring row lock");
6521         RowLock rl = region.getRowLock(b);
6522         obtainedRowLock.countDown();
6523         LOG.info("Waiting for 5 seconds before releasing lock");
6524         Threads.sleep(5000);
6525         LOG.info("Releasing row lock");
6526         rl.release();
6527         return null;
6528       }
6529     });
6530     obtainedRowLock.await(30, TimeUnit.SECONDS);
6531 
6532     Future<Void> f2 = exec.submit(new Callable<Void>() {
6533       @Override
6534       public Void call() throws Exception {
6535         Mutation[] mutations = new Mutation[] {
6536             new Put(a).addImmutable(fam1, null, null),
6537             new Put(b).addImmutable(fam1, null, null),
6538         };
6539 
6540         // this will wait for the row lock, and it will eventually succeed
6541         OperationStatus[] status = region.batchMutate(mutations);
6542         assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6543         assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SUCCESS);
6544         return null;
6545       }
6546     });
6547 
6548     f1.get();
6549     f2.get();
6550 
6551     CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
6552   }
6553 
6554   @Test
6555   public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException {
6556     HRegion region = initHRegion(tableName, name.getMethodName(), CONF, fam1);
6557     ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
6558     EnvironmentEdgeManager.injectEdge(edge);
6559 
6560     edge.setValue(10);
6561     Put p = new Put(row);
6562     p.setDurability(Durability.SKIP_WAL);
6563     p.addColumn(fam1, qual1, qual1);
6564     region.put(p);
6565 
6566     Result result = region.get(new Get(row));
6567     Cell c = result.getColumnLatestCell(fam1, qual1);
6568     assertNotNull(c);
6569     assertEquals(c.getTimestamp(), 10L);
6570 
6571     edge.setValue(1); // clock goes back
6572     p = new Put(row);
6573     p.setDurability(Durability.SKIP_WAL);
6574     p.addColumn(fam1, qual1, qual2);
6575     RowMutations rm = new RowMutations(row);
6576     rm.add(p);
6577     region.checkAndRowMutate(row, fam1, qual1, CompareOp.EQUAL, new BinaryComparator(qual1),
6578       rm, false);
6579     result = region.get(new Get(row));
6580     c = result.getColumnLatestCell(fam1, qual1);
6581     assertEquals(c.getTimestamp(), 10L);
6582 
6583     assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(),
6584       qual2, 0, qual2.length));
6585   }
6586 
6587   static HRegion initHRegion(byte[] tableName, String callingMethod,
6588       byte[]... families) throws IOException {
6589     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
6590         families);
6591   }
6592 
6593   /**
6594    * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
6595    * @throws IOException if IO error occurred during test
6596    */
6597   @Test(timeout = 60000)
6598   public void testWritesWhileRollWriter() throws IOException {
6599     int testCount = 10;
6600     int numRows = 1024;
6601     int numFamilies = 2;
6602     int numQualifiers = 2;
6603     final byte[][] families = new byte[numFamilies][];
6604     for (int i = 0; i < numFamilies; i++) {
6605       families[i] = Bytes.toBytes("family" + i);
6606     }
6607     final byte[][] qualifiers = new byte[numQualifiers][];
6608     for (int i = 0; i < numQualifiers; i++) {
6609       qualifiers[i] = Bytes.toBytes("qual" + i);
6610     }
6611 
6612     String method = "testWritesWhileRollWriter";
6613     CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
6614     this.region = initHRegion(tableName, method, CONF, families);
6615     try {
6616       List<Thread> threads = new ArrayList<Thread>();
6617       for (int i = 0; i < numRows; i++) {
6618         final int count = i;
6619         Thread t = new Thread(new Runnable() {
6620 
6621           @Override
6622           public void run() {
6623             byte[] row = Bytes.toBytes("row" + count);
6624             Put put = new Put(row);
6625             put.setDurability(Durability.SYNC_WAL);
6626             byte[] value = Bytes.toBytes(String.valueOf(count));
6627             for (byte[] family : families) {
6628               for (byte[] qualifier : qualifiers) {
6629                 put.addColumn(family, qualifier, (long) count, value);
6630               }
6631             }
6632             try {
6633               region.put(put);
6634             } catch (IOException e) {
6635               throw new RuntimeException(e);
6636             }
6637           }
6638         });
6639         threads.add(t);
6640       }
6641       for (Thread t : threads) {
6642         t.start();
6643       }
6644 
6645       for (int i = 0; i < testCount; i++) {
6646         region.getWAL().rollWriter();
6647         Thread.yield();
6648       }
6649     } finally {
6650       try {
6651         HBaseTestingUtility.closeRegionAndWAL(this.region);
6652         CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
6653       } catch (DroppedSnapshotException dse) {
6654         // We could get this on way out because we interrupt the background flusher and it could
6655         // fail anywhere causing a DSE over in the background flusher... only it is not properly
6656         // dealt with so could still be memory hanging out when we get to here -- memory we can't
6657         // flush because the accounting is 'off' since original DSE.
6658       }
6659       this.region = null;
6660     }
6661   }
6662 }