View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableSet;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FileSystem;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.hbase.Cell;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HBaseTestingUtility;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.KeepDeletedCells;
42  import org.apache.hadoop.hbase.KeyValue;
43  import org.apache.hadoop.hbase.KeyValueUtil;
44  import org.apache.hadoop.hbase.client.Durability;
45  import org.apache.hadoop.hbase.client.Put;
46  import org.apache.hadoop.hbase.client.Result;
47  import org.apache.hadoop.hbase.client.Scan;
48  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
49  import org.apache.hadoop.hbase.filter.Filter;
50  import org.apache.hadoop.hbase.filter.FilterList;
51  import org.apache.hadoop.hbase.filter.FilterList.Operator;
52  import org.apache.hadoop.hbase.filter.PageFilter;
53  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
54  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
55  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
56  import org.apache.hadoop.hbase.io.hfile.HFileContext;
57  import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
58  import org.apache.hadoop.hbase.testclassification.MediumTests;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.hbase.util.Pair;
61  import org.junit.Test;
62  import org.junit.experimental.categories.Category;
63  
64  import com.google.common.collect.Lists;
65  /**
66   * Test cases against ReversibleKeyValueScanner
67   */
68  @Category(MediumTests.class)
69  public class TestReversibleScanners {
70    private static final Log LOG = LogFactory.getLog(TestReversibleScanners.class);
71    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
72  
73    private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
74    private static long TS = System.currentTimeMillis();
75    private static int MAXMVCC = 7;
76    private static byte[] ROW = Bytes.toBytes("testRow");
77    private static final int ROWSIZE = 200;
78    private static byte[][] ROWS = makeN(ROW, ROWSIZE);
79    private static byte[] QUAL = Bytes.toBytes("testQual");
80    private static final int QUALSIZE = 5;
81    private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
82    private static byte[] VALUE = Bytes.toBytes("testValue");
83    private static final int VALUESIZE = 3;
84    private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
85  
86    @Test
87    public void testReversibleStoreFileScanner() throws IOException {
88      FileSystem fs = TEST_UTIL.getTestFileSystem();
89      Path hfilePath = new Path(new Path(
90          TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
91          "regionname"), "familyname");
92      CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
93      for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
94        HFileContextBuilder hcBuilder = new HFileContextBuilder();
95        hcBuilder.withBlockSize(2 * 1024);
96        hcBuilder.withDataBlockEncoding(encoding);
97        HFileContext hFileContext = hcBuilder.build();
98        StoreFile.Writer writer = new StoreFile.WriterBuilder(
99            TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
100           .withFileContext(hFileContext).build();
101       writeStoreFile(writer);
102 
103       StoreFile sf = new StoreFile(fs, writer.getPath(),
104           TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
105 
106       List<StoreFileScanner> scanners = StoreFileScanner
107           .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
108               false, Long.MAX_VALUE);
109       StoreFileScanner scanner = scanners.get(0);
110       seekTestOfReversibleKeyValueScanner(scanner);
111       for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
112         LOG.info("Setting read point to " + readPoint);
113         scanners = StoreFileScanner.getScannersForStoreFiles(
114             Collections.singletonList(sf), false, true, false, readPoint);
115         seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
116       }
117     }
118 
119   }
120 
121   @Test
122   public void testReversibleMemstoreScanner() throws IOException {
123     MemStore memstore = new DefaultMemStore();
124     writeMemstore(memstore);
125     List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
126     seekTestOfReversibleKeyValueScanner(scanners.get(0));
127     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
128       LOG.info("Setting read point to " + readPoint);
129       scanners = memstore.getScanners(readPoint);
130       seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
131     }
132 
133   }
134 
135   @Test
136   public void testReversibleKeyValueHeap() throws IOException {
137     // write data to one memstore and two store files
138     FileSystem fs = TEST_UTIL.getTestFileSystem();
139     Path hfilePath = new Path(new Path(
140         TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"),
141         "familyname");
142     CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
143     HFileContextBuilder hcBuilder = new HFileContextBuilder();
144     hcBuilder.withBlockSize(2 * 1024);
145     HFileContext hFileContext = hcBuilder.build();
146     StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
147         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
148         hfilePath).withFileContext(hFileContext).build();
149     StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
150         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
151         hfilePath).withFileContext(hFileContext).build();
152 
153     MemStore memstore = new DefaultMemStore();
154     writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
155         writer2 });
156 
157     StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
158         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
159 
160     StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
161         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
162     /**
163      * Test without MVCC
164      */
165     int startRowNum = ROWSIZE / 2;
166     ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
167         ROWS[startRowNum], MAXMVCC);
168     internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
169 
170     startRowNum = ROWSIZE - 1;
171     kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
172         HConstants.EMPTY_START_ROW, MAXMVCC);
173     internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
174 
175     /**
176      * Test with MVCC
177      */
178     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
179       LOG.info("Setting read point to " + readPoint);
180       startRowNum = ROWSIZE - 1;
181       kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
182           HConstants.EMPTY_START_ROW, readPoint);
183       for (int i = startRowNum; i >= 0; i--) {
184         if (i - 2 < 0) break;
185         i = i - 2;
186         kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
187         Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
188             i, 0, readPoint);
189         if (nextReadableNum == null) break;
190         KeyValue expecedKey = makeKV(nextReadableNum.getFirst(),
191             nextReadableNum.getSecond());
192         assertEquals(expecedKey, kvHeap.peek());
193         i = nextReadableNum.getFirst();
194         int qualNum = nextReadableNum.getSecond();
195         if (qualNum + 1 < QUALSIZE) {
196           kvHeap.backwardSeek(makeKV(i, qualNum + 1));
197           nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
198               readPoint);
199           if (nextReadableNum == null) break;
200           expecedKey = makeKV(nextReadableNum.getFirst(),
201               nextReadableNum.getSecond());
202           assertEquals(expecedKey, kvHeap.peek());
203           i = nextReadableNum.getFirst();
204           qualNum = nextReadableNum.getSecond();
205         }
206 
207         kvHeap.next();
208 
209         if (qualNum + 1 >= QUALSIZE) {
210           nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0,
211               readPoint);
212         } else {
213           nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
214               readPoint);
215         }
216         if (nextReadableNum == null) break;
217         expecedKey = makeKV(nextReadableNum.getFirst(),
218             nextReadableNum.getSecond());
219         assertEquals(expecedKey, kvHeap.peek());
220         i = nextReadableNum.getFirst();
221       }
222     }
223   }
224 
225   @Test
226   public void testReversibleStoreScanner() throws IOException {
227     // write data to one memstore and two store files
228     FileSystem fs = TEST_UTIL.getTestFileSystem();
229     Path hfilePath = new Path(new Path(
230         TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"),
231         "familyname");
232     CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
233     HFileContextBuilder hcBuilder = new HFileContextBuilder();
234     hcBuilder.withBlockSize(2 * 1024);
235     HFileContext hFileContext = hcBuilder.build();
236     StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
237         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
238         hfilePath).withFileContext(hFileContext).build();
239     StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
240         TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
241         hfilePath).withFileContext(hFileContext).build();
242 
243     MemStore memstore = new DefaultMemStore();
244     writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
245         writer2 });
246 
247     StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
248         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
249 
250     StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
251         TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
252 
253     ScanType scanType = ScanType.USER_SCAN;
254     ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE,
255         Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
256 
257     // Case 1.Test a full reversed scan
258     Scan scan = new Scan();
259     scan.setReversed(true);
260     StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2,
261         scan, scanType, scanInfo, MAXMVCC);
262     verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
263 
264     // Case 2.Test reversed scan with a specified start row
265     int startRowNum = ROWSIZE / 2;
266     byte[] startRow = ROWS[startRowNum];
267     scan.setStartRow(startRow);
268     storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
269         scanType, scanInfo, MAXMVCC);
270     verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
271         startRowNum + 1, false);
272 
273     // Case 3.Test reversed scan with a specified start row and specified
274     // qualifiers
275     assertTrue(QUALSIZE > 2);
276     scan.addColumn(FAMILYNAME, QUALS[0]);
277     scan.addColumn(FAMILYNAME, QUALS[2]);
278     storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
279         scanType, scanInfo, MAXMVCC);
280     verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
281         false);
282 
283     // Case 4.Test reversed scan with mvcc based on case 3
284     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
285       LOG.info("Setting read point to " + readPoint);
286       storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
287           scanType, scanInfo, readPoint);
288       int expectedRowCount = 0;
289       int expectedKVCount = 0;
290       for (int i = startRowNum; i >= 0; i--) {
291         int kvCount = 0;
292         if (makeMVCC(i, 0) <= readPoint) {
293           kvCount++;
294         }
295         if (makeMVCC(i, 2) <= readPoint) {
296           kvCount++;
297         }
298         if (kvCount > 0) {
299           expectedRowCount++;
300           expectedKVCount += kvCount;
301         }
302       }
303       verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount,
304           false);
305     }
306   }
307 
308   @Test
309   public void testReversibleRegionScanner() throws IOException {
310     byte[] tableName = Bytes.toBytes("testtable");
311     byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
312     Configuration conf = HBaseConfiguration.create();
313     HRegion region = TEST_UTIL.createLocalHRegion(tableName, null, null,
314         "testReversibleRegionScanner", conf, false, Durability.SYNC_WAL, null,
315         FAMILYNAME, FAMILYNAME2);
316     loadDataToRegion(region, FAMILYNAME2);
317 
318     // verify row count with forward scan
319     Scan scan = new Scan();
320     InternalScanner scanner = region.getScanner(scan);
321     verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
322 
323     // Case1:Full reversed scan
324     scan.setReversed(true);
325     scanner = region.getScanner(scan);
326     verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
327 
328     // Case2:Full reversed scan with one family
329     scan = new Scan();
330     scan.setReversed(true);
331     scan.addFamily(FAMILYNAME);
332     scanner = region.getScanner(scan);
333     verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
334 
335     // Case3:Specify qualifiers + One family
336     byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
337     for (byte[] specifiedQualifier : specifiedQualifiers)
338       scan.addColumn(FAMILYNAME, specifiedQualifier);
339     scanner = region.getScanner(scan);
340     verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
341 
342     // Case4:Specify qualifiers + Two families
343     for (byte[] specifiedQualifier : specifiedQualifiers)
344       scan.addColumn(FAMILYNAME2, specifiedQualifier);
345     scanner = region.getScanner(scan);
346     verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
347 
348     // Case5: Case4 + specify start row
349     int startRowNum = ROWSIZE * 3 / 4;
350     scan.setStartRow(ROWS[startRowNum]);
351     scanner = region.getScanner(scan);
352     verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
353         false);
354 
355     // Case6: Case4 + specify stop row
356     int stopRowNum = ROWSIZE / 4;
357     scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY);
358     scan.setStopRow(ROWS[stopRowNum]);
359     scanner = region.getScanner(scan);
360     verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
361         - stopRowNum - 1), false);
362 
363     // Case7: Case4 + specify start row + specify stop row
364     scan.setStartRow(ROWS[startRowNum]);
365     scanner = region.getScanner(scan);
366     verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
367         (startRowNum - stopRowNum), false);
368 
369     // Case8: Case7 + SingleColumnValueFilter
370     int valueNum = startRowNum % VALUESIZE;
371     Filter filter = new SingleColumnValueFilter(FAMILYNAME,
372         specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]);
373     scan.setFilter(filter);
374     scanner = region.getScanner(scan);
375     int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE
376         + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
377     verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum,
378         false);
379 
380     // Case9: Case7 + PageFilter
381     int pageSize = 10;
382     filter = new PageFilter(pageSize);
383     scan.setFilter(filter);
384     scanner = region.getScanner(scan);
385     int expectedRowNum = pageSize;
386     verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
387 
388     // Case10: Case7 + FilterList+MUST_PASS_ONE
389     SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(
390         FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
391     SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(
392         FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
393     expectedRowNum = 0;
394     for (int i = startRowNum; i > stopRowNum; i--) {
395       if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
396         expectedRowNum++;
397       }
398     }
399     filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
400     scan.setFilter(filter);
401     scanner = region.getScanner(scan);
402     verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
403 
404     // Case10: Case7 + FilterList+MUST_PASS_ALL
405     filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
406     expectedRowNum = 0;
407     scan.setFilter(filter);
408     scanner = region.getScanner(scan);
409     verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
410   }
411 
412   private StoreScanner getReversibleStoreScanner(MemStore memstore,
413       StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
414       ScanInfo scanInfo, int readPoint) throws IOException {
415     List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
416         false, readPoint);
417     NavigableSet<byte[]> columns = null;
418     for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
419         .entrySet()) {
420       // Should only one family
421       columns = entry.getValue();
422     }
423     StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
424         scanType, columns, scanners);
425     return storeScanner;
426   }
427 
428   private void verifyCountAndOrder(InternalScanner scanner,
429       int expectedKVCount, int expectedRowCount, boolean forward)
430       throws IOException {
431     List<Cell> kvList = new ArrayList<Cell>();
432     Result lastResult = null;
433     int rowCount = 0;
434     int kvCount = 0;
435     try {
436       while (scanner.next(kvList)) {
437         if (kvList.isEmpty()) continue;
438         rowCount++;
439         kvCount += kvList.size();
440         if (lastResult != null) {
441           Result curResult = Result.create(kvList);
442           assertEquals("LastResult:" + lastResult + "CurResult:" + curResult,
443               forward,
444               Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
445         }
446         lastResult = Result.create(kvList);
447         kvList.clear();
448       }
449     } finally {
450       scanner.close();
451     }
452     if (!kvList.isEmpty()) {
453       rowCount++;
454       kvCount += kvList.size();
455       kvList.clear();
456     }
457     assertEquals(expectedKVCount, kvCount);
458     assertEquals(expectedRowCount, rowCount);
459   }
460 
461   private void internalTestSeekAndNextForReversibleKeyValueHeap(
462       ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
463     // Test next and seek
464     for (int i = startRowNum; i >= 0; i--) {
465       if (i % 2 == 1 && i - 2 >= 0) {
466         i = i - 2;
467         kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
468       }
469       for (int j = 0; j < QUALSIZE; j++) {
470         if (j % 2 == 1 && (j + 1) < QUALSIZE) {
471           j = j + 1;
472           kvHeap.backwardSeek(makeKV(i, j));
473         }
474         assertEquals(makeKV(i, j), kvHeap.peek());
475         kvHeap.next();
476       }
477     }
478     assertEquals(null, kvHeap.peek());
479   }
480 
481   private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore,
482       StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint)
483       throws IOException {
484     List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow,
485         true, readPoint);
486     ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners,
487         KeyValue.COMPARATOR);
488     return kvHeap;
489   }
490 
491   private List<KeyValueScanner> getScanners(MemStore memstore, StoreFile sf1,
492       StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint)
493       throws IOException {
494     List<StoreFileScanner> fileScanners = StoreFileScanner
495         .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
496             false, readPoint);
497     List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
498     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
499         fileScanners.size() + 1);
500     scanners.addAll(fileScanners);
501     scanners.addAll(memScanners);
502 
503     if (doSeek) {
504       if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
505         for (KeyValueScanner scanner : scanners) {
506           scanner.seekToLastRow();
507         }
508       } else {
509         KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
510         for (KeyValueScanner scanner : scanners) {
511           scanner.backwardSeek(startKey);
512         }
513       }
514     }
515     return scanners;
516   }
517 
518   private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner)
519       throws IOException {
520     /**
521      * Test without MVCC
522      */
523     // Test seek to last row
524     assertTrue(scanner.seekToLastRow());
525     assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
526 
527     // Test backward seek in three cases
528     // Case1: seek in the same row in backwardSeek
529     KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
530     assertTrue(scanner.backwardSeek(seekKey));
531     assertEquals(seekKey, scanner.peek());
532 
533     // Case2: seek to the previous row in backwardSeek
534     int seekRowNum = ROWSIZE - 2;
535     assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
536     KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
537     assertEquals(expectedKey, scanner.peek());
538 
539     // Case3: unable to backward seek
540     assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
541     assertEquals(null, scanner.peek());
542 
543     // Test seek to previous row
544     seekRowNum = ROWSIZE - 4;
545     assertTrue(scanner.seekToPreviousRow(KeyValueUtil
546         .createFirstOnRow(ROWS[seekRowNum])));
547     expectedKey = makeKV(seekRowNum - 1, 0);
548     assertEquals(expectedKey, scanner.peek());
549 
550     // Test seek to previous row for the first row
551     assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
552     assertEquals(null, scanner.peek());
553 
554   }
555 
556   private void seekTestOfReversibleKeyValueScannerWithMVCC(
557       KeyValueScanner scanner, int readPoint) throws IOException {
558     /**
559      * Test with MVCC
560      */
561       // Test seek to last row
562       KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
563           ROWSIZE - 1, 0, readPoint);
564       assertEquals(expectedKey != null, scanner.seekToLastRow());
565       assertEquals(expectedKey, scanner.peek());
566 
567       // Test backward seek in two cases
568       // Case1: seek in the same row in backwardSeek
569       expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
570           QUALSIZE - 2, readPoint);
571       assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
572       assertEquals(expectedKey, scanner.peek());
573 
574       // Case2: seek to the previous row in backwardSeek
575     int seekRowNum = ROWSIZE - 3;
576     KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]);
577       expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
578           readPoint);
579       assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
580       assertEquals(expectedKey, scanner.peek());
581 
582       // Test seek to previous row
583       seekRowNum = ROWSIZE - 4;
584       expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
585           readPoint);
586       assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil
587           .createFirstOnRow(ROWS[seekRowNum])));
588       assertEquals(expectedKey, scanner.peek());
589   }
590 
591   private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
592       int startQualNum, int readPoint) {
593     Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
594         startRowNum, startQualNum, readPoint);
595     if (nextReadableNum == null)
596       return null;
597     return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
598   }
599 
600   private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(
601       int startRowNum, int startQualNum, int readPoint) {
602     Pair<Integer, Integer> nextReadableNum = null;
603     boolean findExpected = false;
604     for (int i = startRowNum; i >= 0; i--) {
605       for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
606         if (makeMVCC(i, j) <= readPoint) {
607           nextReadableNum = new Pair<Integer, Integer>(i, j);
608           findExpected = true;
609           break;
610         }
611       }
612       if (findExpected)
613         break;
614     }
615     return nextReadableNum;
616   }
617 
618   private static void loadDataToRegion(Region region, byte[] additionalFamily)
619       throws IOException {
620     for (int i = 0; i < ROWSIZE; i++) {
621       Put put = new Put(ROWS[i]);
622       for (int j = 0; j < QUALSIZE; j++) {
623         put.add(makeKV(i, j));
624         // put additional family
625         put.add(makeKV(i, j, additionalFamily));
626       }
627       region.put(put);
628       if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
629         region.flush(true);
630       }
631     }
632   }
633 
634   private static void writeMemstoreAndStoreFiles(MemStore memstore,
635       final StoreFile.Writer[] writers) throws IOException {
636     try {
637       for (int i = 0; i < ROWSIZE; i++) {
638         for (int j = 0; j < QUALSIZE; j++) {
639           if (i % 2 == 0) {
640             memstore.add(makeKV(i, j));
641           } else {
642             writers[(i + j) % writers.length].append(makeKV(i, j));
643           }
644         }
645       }
646     } finally {
647       for (int i = 0; i < writers.length; i++) {
648         writers[i].close();
649       }
650     }
651   }
652 
653   private static void writeStoreFile(final StoreFile.Writer writer)
654       throws IOException {
655     try {
656       for (int i = 0; i < ROWSIZE; i++) {
657         for (int j = 0; j < QUALSIZE; j++) {
658           writer.append(makeKV(i, j));
659         }
660       }
661     } finally {
662       writer.close();
663     }
664   }
665 
666   private static void writeMemstore(MemStore memstore) throws IOException {
667     // Add half of the keyvalues to memstore
668     for (int i = 0; i < ROWSIZE; i++) {
669       for (int j = 0; j < QUALSIZE; j++) {
670         if ((i + j) % 2 == 0) {
671           memstore.add(makeKV(i, j));
672         }
673       }
674     }
675     memstore.snapshot();
676     // Add another half of the keyvalues to snapshot
677     for (int i = 0; i < ROWSIZE; i++) {
678       for (int j = 0; j < QUALSIZE; j++) {
679         if ((i + j) % 2 == 1) {
680           memstore.add(makeKV(i, j));
681         }
682       }
683     }
684   }
685 
686   private static KeyValue makeKV(int rowNum, int cqNum) {
687     return makeKV(rowNum, cqNum, FAMILYNAME);
688   }
689 
690   private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
691     KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
692         VALUES[rowNum % VALUESIZE]);
693     kv.setSequenceId(makeMVCC(rowNum, cqNum));
694     return kv;
695   }
696 
697   private static long makeMVCC(int rowNum, int cqNum) {
698     return (rowNum + cqNum) % (MAXMVCC + 1);
699   }
700 
701   private static byte[][] makeN(byte[] base, int n) {
702     byte[][] ret = new byte[n][];
703     for (int i = 0; i < n; i++) {
704       ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
705     }
706     return ret;
707   }
708 }