1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.NavigableSet;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.hbase.Cell;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HBaseTestingUtility;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.KeepDeletedCells;
42 import org.apache.hadoop.hbase.KeyValue;
43 import org.apache.hadoop.hbase.KeyValueUtil;
44 import org.apache.hadoop.hbase.client.Durability;
45 import org.apache.hadoop.hbase.client.Put;
46 import org.apache.hadoop.hbase.client.Result;
47 import org.apache.hadoop.hbase.client.Scan;
48 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
49 import org.apache.hadoop.hbase.filter.Filter;
50 import org.apache.hadoop.hbase.filter.FilterList;
51 import org.apache.hadoop.hbase.filter.FilterList.Operator;
52 import org.apache.hadoop.hbase.filter.PageFilter;
53 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
54 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
55 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
56 import org.apache.hadoop.hbase.io.hfile.HFileContext;
57 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
58 import org.apache.hadoop.hbase.testclassification.MediumTests;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.util.Pair;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63
64 import com.google.common.collect.Lists;
65
66
67
68 @Category(MediumTests.class)
69 public class TestReversibleScanners {
70 private static final Log LOG = LogFactory.getLog(TestReversibleScanners.class);
71 HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
72
73 private static byte[] FAMILYNAME = Bytes.toBytes("testCf");
74 private static long TS = System.currentTimeMillis();
75 private static int MAXMVCC = 7;
76 private static byte[] ROW = Bytes.toBytes("testRow");
77 private static final int ROWSIZE = 200;
78 private static byte[][] ROWS = makeN(ROW, ROWSIZE);
79 private static byte[] QUAL = Bytes.toBytes("testQual");
80 private static final int QUALSIZE = 5;
81 private static byte[][] QUALS = makeN(QUAL, QUALSIZE);
82 private static byte[] VALUE = Bytes.toBytes("testValue");
83 private static final int VALUESIZE = 3;
84 private static byte[][] VALUES = makeN(VALUE, VALUESIZE);
85
86 @Test
87 public void testReversibleStoreFileScanner() throws IOException {
88 FileSystem fs = TEST_UTIL.getTestFileSystem();
89 Path hfilePath = new Path(new Path(
90 TEST_UTIL.getDataTestDir("testReversibleStoreFileScanner"),
91 "regionname"), "familyname");
92 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
93 for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
94 HFileContextBuilder hcBuilder = new HFileContextBuilder();
95 hcBuilder.withBlockSize(2 * 1024);
96 hcBuilder.withDataBlockEncoding(encoding);
97 HFileContext hFileContext = hcBuilder.build();
98 StoreFile.Writer writer = new StoreFile.WriterBuilder(
99 TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(hfilePath)
100 .withFileContext(hFileContext).build();
101 writeStoreFile(writer);
102
103 StoreFile sf = new StoreFile(fs, writer.getPath(),
104 TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
105
106 List<StoreFileScanner> scanners = StoreFileScanner
107 .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
108 false, Long.MAX_VALUE);
109 StoreFileScanner scanner = scanners.get(0);
110 seekTestOfReversibleKeyValueScanner(scanner);
111 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
112 LOG.info("Setting read point to " + readPoint);
113 scanners = StoreFileScanner.getScannersForStoreFiles(
114 Collections.singletonList(sf), false, true, false, readPoint);
115 seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
116 }
117 }
118
119 }
120
121 @Test
122 public void testReversibleMemstoreScanner() throws IOException {
123 MemStore memstore = new DefaultMemStore();
124 writeMemstore(memstore);
125 List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
126 seekTestOfReversibleKeyValueScanner(scanners.get(0));
127 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
128 LOG.info("Setting read point to " + readPoint);
129 scanners = memstore.getScanners(readPoint);
130 seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
131 }
132
133 }
134
135 @Test
136 public void testReversibleKeyValueHeap() throws IOException {
137
138 FileSystem fs = TEST_UTIL.getTestFileSystem();
139 Path hfilePath = new Path(new Path(
140 TEST_UTIL.getDataTestDir("testReversibleKeyValueHeap"), "regionname"),
141 "familyname");
142 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
143 HFileContextBuilder hcBuilder = new HFileContextBuilder();
144 hcBuilder.withBlockSize(2 * 1024);
145 HFileContext hFileContext = hcBuilder.build();
146 StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
147 TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
148 hfilePath).withFileContext(hFileContext).build();
149 StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
150 TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
151 hfilePath).withFileContext(hFileContext).build();
152
153 MemStore memstore = new DefaultMemStore();
154 writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
155 writer2 });
156
157 StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
158 TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
159
160 StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
161 TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
162
163
164
165 int startRowNum = ROWSIZE / 2;
166 ReversedKeyValueHeap kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
167 ROWS[startRowNum], MAXMVCC);
168 internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
169
170 startRowNum = ROWSIZE - 1;
171 kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
172 HConstants.EMPTY_START_ROW, MAXMVCC);
173 internalTestSeekAndNextForReversibleKeyValueHeap(kvHeap, startRowNum);
174
175
176
177
178 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
179 LOG.info("Setting read point to " + readPoint);
180 startRowNum = ROWSIZE - 1;
181 kvHeap = getReversibleKeyValueHeap(memstore, sf1, sf2,
182 HConstants.EMPTY_START_ROW, readPoint);
183 for (int i = startRowNum; i >= 0; i--) {
184 if (i - 2 < 0) break;
185 i = i - 2;
186 kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
187 Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
188 i, 0, readPoint);
189 if (nextReadableNum == null) break;
190 KeyValue expecedKey = makeKV(nextReadableNum.getFirst(),
191 nextReadableNum.getSecond());
192 assertEquals(expecedKey, kvHeap.peek());
193 i = nextReadableNum.getFirst();
194 int qualNum = nextReadableNum.getSecond();
195 if (qualNum + 1 < QUALSIZE) {
196 kvHeap.backwardSeek(makeKV(i, qualNum + 1));
197 nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
198 readPoint);
199 if (nextReadableNum == null) break;
200 expecedKey = makeKV(nextReadableNum.getFirst(),
201 nextReadableNum.getSecond());
202 assertEquals(expecedKey, kvHeap.peek());
203 i = nextReadableNum.getFirst();
204 qualNum = nextReadableNum.getSecond();
205 }
206
207 kvHeap.next();
208
209 if (qualNum + 1 >= QUALSIZE) {
210 nextReadableNum = getNextReadableNumWithBackwardScan(i - 1, 0,
211 readPoint);
212 } else {
213 nextReadableNum = getNextReadableNumWithBackwardScan(i, qualNum + 1,
214 readPoint);
215 }
216 if (nextReadableNum == null) break;
217 expecedKey = makeKV(nextReadableNum.getFirst(),
218 nextReadableNum.getSecond());
219 assertEquals(expecedKey, kvHeap.peek());
220 i = nextReadableNum.getFirst();
221 }
222 }
223 }
224
225 @Test
226 public void testReversibleStoreScanner() throws IOException {
227
228 FileSystem fs = TEST_UTIL.getTestFileSystem();
229 Path hfilePath = new Path(new Path(
230 TEST_UTIL.getDataTestDir("testReversibleStoreScanner"), "regionname"),
231 "familyname");
232 CacheConfig cacheConf = new CacheConfig(TEST_UTIL.getConfiguration());
233 HFileContextBuilder hcBuilder = new HFileContextBuilder();
234 hcBuilder.withBlockSize(2 * 1024);
235 HFileContext hFileContext = hcBuilder.build();
236 StoreFile.Writer writer1 = new StoreFile.WriterBuilder(
237 TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
238 hfilePath).withFileContext(hFileContext).build();
239 StoreFile.Writer writer2 = new StoreFile.WriterBuilder(
240 TEST_UTIL.getConfiguration(), cacheConf, fs).withOutputDir(
241 hfilePath).withFileContext(hFileContext).build();
242
243 MemStore memstore = new DefaultMemStore();
244 writeMemstoreAndStoreFiles(memstore, new StoreFile.Writer[] { writer1,
245 writer2 });
246
247 StoreFile sf1 = new StoreFile(fs, writer1.getPath(),
248 TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
249
250 StoreFile sf2 = new StoreFile(fs, writer2.getPath(),
251 TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
252
253 ScanType scanType = ScanType.USER_SCAN;
254 ScanInfo scanInfo = new ScanInfo(FAMILYNAME, 0, Integer.MAX_VALUE,
255 Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
256
257
258 Scan scan = new Scan();
259 scan.setReversed(true);
260 StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2,
261 scan, scanType, scanInfo, MAXMVCC);
262 verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
263
264
265 int startRowNum = ROWSIZE / 2;
266 byte[] startRow = ROWS[startRowNum];
267 scan.setStartRow(startRow);
268 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
269 scanType, scanInfo, MAXMVCC);
270 verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
271 startRowNum + 1, false);
272
273
274
275 assertTrue(QUALSIZE > 2);
276 scan.addColumn(FAMILYNAME, QUALS[0]);
277 scan.addColumn(FAMILYNAME, QUALS[2]);
278 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
279 scanType, scanInfo, MAXMVCC);
280 verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
281 false);
282
283
284 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
285 LOG.info("Setting read point to " + readPoint);
286 storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
287 scanType, scanInfo, readPoint);
288 int expectedRowCount = 0;
289 int expectedKVCount = 0;
290 for (int i = startRowNum; i >= 0; i--) {
291 int kvCount = 0;
292 if (makeMVCC(i, 0) <= readPoint) {
293 kvCount++;
294 }
295 if (makeMVCC(i, 2) <= readPoint) {
296 kvCount++;
297 }
298 if (kvCount > 0) {
299 expectedRowCount++;
300 expectedKVCount += kvCount;
301 }
302 }
303 verifyCountAndOrder(storeScanner, expectedKVCount, expectedRowCount,
304 false);
305 }
306 }
307
308 @Test
309 public void testReversibleRegionScanner() throws IOException {
310 byte[] tableName = Bytes.toBytes("testtable");
311 byte[] FAMILYNAME2 = Bytes.toBytes("testCf2");
312 Configuration conf = HBaseConfiguration.create();
313 HRegion region = TEST_UTIL.createLocalHRegion(tableName, null, null,
314 "testReversibleRegionScanner", conf, false, Durability.SYNC_WAL, null,
315 FAMILYNAME, FAMILYNAME2);
316 loadDataToRegion(region, FAMILYNAME2);
317
318
319 Scan scan = new Scan();
320 InternalScanner scanner = region.getScanner(scan);
321 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, true);
322
323
324 scan.setReversed(true);
325 scanner = region.getScanner(scan);
326 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE * 2, ROWSIZE, false);
327
328
329 scan = new Scan();
330 scan.setReversed(true);
331 scan.addFamily(FAMILYNAME);
332 scanner = region.getScanner(scan);
333 verifyCountAndOrder(scanner, ROWSIZE * QUALSIZE, ROWSIZE, false);
334
335
336 byte[][] specifiedQualifiers = { QUALS[1], QUALS[2] };
337 for (byte[] specifiedQualifier : specifiedQualifiers)
338 scan.addColumn(FAMILYNAME, specifiedQualifier);
339 scanner = region.getScanner(scan);
340 verifyCountAndOrder(scanner, ROWSIZE * 2, ROWSIZE, false);
341
342
343 for (byte[] specifiedQualifier : specifiedQualifiers)
344 scan.addColumn(FAMILYNAME2, specifiedQualifier);
345 scanner = region.getScanner(scan);
346 verifyCountAndOrder(scanner, ROWSIZE * 2 * 2, ROWSIZE, false);
347
348
349 int startRowNum = ROWSIZE * 3 / 4;
350 scan.setStartRow(ROWS[startRowNum]);
351 scanner = region.getScanner(scan);
352 verifyCountAndOrder(scanner, (startRowNum + 1) * 2 * 2, (startRowNum + 1),
353 false);
354
355
356 int stopRowNum = ROWSIZE / 4;
357 scan.setStartRow(HConstants.EMPTY_BYTE_ARRAY);
358 scan.setStopRow(ROWS[stopRowNum]);
359 scanner = region.getScanner(scan);
360 verifyCountAndOrder(scanner, (ROWSIZE - stopRowNum - 1) * 2 * 2, (ROWSIZE
361 - stopRowNum - 1), false);
362
363
364 scan.setStartRow(ROWS[startRowNum]);
365 scanner = region.getScanner(scan);
366 verifyCountAndOrder(scanner, (startRowNum - stopRowNum) * 2 * 2,
367 (startRowNum - stopRowNum), false);
368
369
370 int valueNum = startRowNum % VALUESIZE;
371 Filter filter = new SingleColumnValueFilter(FAMILYNAME,
372 specifiedQualifiers[0], CompareOp.EQUAL, VALUES[valueNum]);
373 scan.setFilter(filter);
374 scanner = region.getScanner(scan);
375 int unfilteredRowNum = (startRowNum - stopRowNum) / VALUESIZE
376 + (stopRowNum / VALUESIZE == valueNum ? 0 : 1);
377 verifyCountAndOrder(scanner, unfilteredRowNum * 2 * 2, unfilteredRowNum,
378 false);
379
380
381 int pageSize = 10;
382 filter = new PageFilter(pageSize);
383 scan.setFilter(filter);
384 scanner = region.getScanner(scan);
385 int expectedRowNum = pageSize;
386 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
387
388
389 SingleColumnValueFilter scvFilter1 = new SingleColumnValueFilter(
390 FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[0]);
391 SingleColumnValueFilter scvFilter2 = new SingleColumnValueFilter(
392 FAMILYNAME, specifiedQualifiers[0], CompareOp.EQUAL, VALUES[1]);
393 expectedRowNum = 0;
394 for (int i = startRowNum; i > stopRowNum; i--) {
395 if (i % VALUESIZE == 0 || i % VALUESIZE == 1) {
396 expectedRowNum++;
397 }
398 }
399 filter = new FilterList(Operator.MUST_PASS_ONE, scvFilter1, scvFilter2);
400 scan.setFilter(filter);
401 scanner = region.getScanner(scan);
402 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
403
404
405 filter = new FilterList(Operator.MUST_PASS_ALL, scvFilter1, scvFilter2);
406 expectedRowNum = 0;
407 scan.setFilter(filter);
408 scanner = region.getScanner(scan);
409 verifyCountAndOrder(scanner, expectedRowNum * 2 * 2, expectedRowNum, false);
410 }
411
412 private StoreScanner getReversibleStoreScanner(MemStore memstore,
413 StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
414 ScanInfo scanInfo, int readPoint) throws IOException {
415 List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
416 false, readPoint);
417 NavigableSet<byte[]> columns = null;
418 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
419 .entrySet()) {
420
421 columns = entry.getValue();
422 }
423 StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
424 scanType, columns, scanners);
425 return storeScanner;
426 }
427
428 private void verifyCountAndOrder(InternalScanner scanner,
429 int expectedKVCount, int expectedRowCount, boolean forward)
430 throws IOException {
431 List<Cell> kvList = new ArrayList<Cell>();
432 Result lastResult = null;
433 int rowCount = 0;
434 int kvCount = 0;
435 try {
436 while (scanner.next(kvList)) {
437 if (kvList.isEmpty()) continue;
438 rowCount++;
439 kvCount += kvList.size();
440 if (lastResult != null) {
441 Result curResult = Result.create(kvList);
442 assertEquals("LastResult:" + lastResult + "CurResult:" + curResult,
443 forward,
444 Bytes.compareTo(curResult.getRow(), lastResult.getRow()) > 0);
445 }
446 lastResult = Result.create(kvList);
447 kvList.clear();
448 }
449 } finally {
450 scanner.close();
451 }
452 if (!kvList.isEmpty()) {
453 rowCount++;
454 kvCount += kvList.size();
455 kvList.clear();
456 }
457 assertEquals(expectedKVCount, kvCount);
458 assertEquals(expectedRowCount, rowCount);
459 }
460
461 private void internalTestSeekAndNextForReversibleKeyValueHeap(
462 ReversedKeyValueHeap kvHeap, int startRowNum) throws IOException {
463
464 for (int i = startRowNum; i >= 0; i--) {
465 if (i % 2 == 1 && i - 2 >= 0) {
466 i = i - 2;
467 kvHeap.seekToPreviousRow(KeyValueUtil.createFirstOnRow(ROWS[i + 1]));
468 }
469 for (int j = 0; j < QUALSIZE; j++) {
470 if (j % 2 == 1 && (j + 1) < QUALSIZE) {
471 j = j + 1;
472 kvHeap.backwardSeek(makeKV(i, j));
473 }
474 assertEquals(makeKV(i, j), kvHeap.peek());
475 kvHeap.next();
476 }
477 }
478 assertEquals(null, kvHeap.peek());
479 }
480
481 private ReversedKeyValueHeap getReversibleKeyValueHeap(MemStore memstore,
482 StoreFile sf1, StoreFile sf2, byte[] startRow, int readPoint)
483 throws IOException {
484 List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, startRow,
485 true, readPoint);
486 ReversedKeyValueHeap kvHeap = new ReversedKeyValueHeap(scanners,
487 KeyValue.COMPARATOR);
488 return kvHeap;
489 }
490
491 private List<KeyValueScanner> getScanners(MemStore memstore, StoreFile sf1,
492 StoreFile sf2, byte[] startRow, boolean doSeek, int readPoint)
493 throws IOException {
494 List<StoreFileScanner> fileScanners = StoreFileScanner
495 .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
496 false, readPoint);
497 List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
498 List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
499 fileScanners.size() + 1);
500 scanners.addAll(fileScanners);
501 scanners.addAll(memScanners);
502
503 if (doSeek) {
504 if (Bytes.equals(HConstants.EMPTY_START_ROW, startRow)) {
505 for (KeyValueScanner scanner : scanners) {
506 scanner.seekToLastRow();
507 }
508 } else {
509 KeyValue startKey = KeyValueUtil.createFirstOnRow(startRow);
510 for (KeyValueScanner scanner : scanners) {
511 scanner.backwardSeek(startKey);
512 }
513 }
514 }
515 return scanners;
516 }
517
518 private void seekTestOfReversibleKeyValueScanner(KeyValueScanner scanner)
519 throws IOException {
520
521
522
523
524 assertTrue(scanner.seekToLastRow());
525 assertEquals(makeKV(ROWSIZE - 1, 0), scanner.peek());
526
527
528
529 KeyValue seekKey = makeKV(ROWSIZE - 2, QUALSIZE - 2);
530 assertTrue(scanner.backwardSeek(seekKey));
531 assertEquals(seekKey, scanner.peek());
532
533
534 int seekRowNum = ROWSIZE - 2;
535 assertTrue(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[seekRowNum])));
536 KeyValue expectedKey = makeKV(seekRowNum - 1, 0);
537 assertEquals(expectedKey, scanner.peek());
538
539
540 assertFalse(scanner.backwardSeek(KeyValueUtil.createLastOnRow(ROWS[0])));
541 assertEquals(null, scanner.peek());
542
543
544 seekRowNum = ROWSIZE - 4;
545 assertTrue(scanner.seekToPreviousRow(KeyValueUtil
546 .createFirstOnRow(ROWS[seekRowNum])));
547 expectedKey = makeKV(seekRowNum - 1, 0);
548 assertEquals(expectedKey, scanner.peek());
549
550
551 assertFalse(scanner.seekToPreviousRow(makeKV(0, 0)));
552 assertEquals(null, scanner.peek());
553
554 }
555
556 private void seekTestOfReversibleKeyValueScannerWithMVCC(
557 KeyValueScanner scanner, int readPoint) throws IOException {
558
559
560
561
562 KeyValue expectedKey = getNextReadableKeyValueWithBackwardScan(
563 ROWSIZE - 1, 0, readPoint);
564 assertEquals(expectedKey != null, scanner.seekToLastRow());
565 assertEquals(expectedKey, scanner.peek());
566
567
568
569 expectedKey = getNextReadableKeyValueWithBackwardScan(ROWSIZE - 2,
570 QUALSIZE - 2, readPoint);
571 assertEquals(expectedKey != null, scanner.backwardSeek(expectedKey));
572 assertEquals(expectedKey, scanner.peek());
573
574
575 int seekRowNum = ROWSIZE - 3;
576 KeyValue seekKey = KeyValueUtil.createLastOnRow(ROWS[seekRowNum]);
577 expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
578 readPoint);
579 assertEquals(expectedKey != null, scanner.backwardSeek(seekKey));
580 assertEquals(expectedKey, scanner.peek());
581
582
583 seekRowNum = ROWSIZE - 4;
584 expectedKey = getNextReadableKeyValueWithBackwardScan(seekRowNum - 1, 0,
585 readPoint);
586 assertEquals(expectedKey != null, scanner.seekToPreviousRow(KeyValueUtil
587 .createFirstOnRow(ROWS[seekRowNum])));
588 assertEquals(expectedKey, scanner.peek());
589 }
590
591 private KeyValue getNextReadableKeyValueWithBackwardScan(int startRowNum,
592 int startQualNum, int readPoint) {
593 Pair<Integer, Integer> nextReadableNum = getNextReadableNumWithBackwardScan(
594 startRowNum, startQualNum, readPoint);
595 if (nextReadableNum == null)
596 return null;
597 return makeKV(nextReadableNum.getFirst(), nextReadableNum.getSecond());
598 }
599
600 private Pair<Integer, Integer> getNextReadableNumWithBackwardScan(
601 int startRowNum, int startQualNum, int readPoint) {
602 Pair<Integer, Integer> nextReadableNum = null;
603 boolean findExpected = false;
604 for (int i = startRowNum; i >= 0; i--) {
605 for (int j = (i == startRowNum ? startQualNum : 0); j < QUALSIZE; j++) {
606 if (makeMVCC(i, j) <= readPoint) {
607 nextReadableNum = new Pair<Integer, Integer>(i, j);
608 findExpected = true;
609 break;
610 }
611 }
612 if (findExpected)
613 break;
614 }
615 return nextReadableNum;
616 }
617
618 private static void loadDataToRegion(Region region, byte[] additionalFamily)
619 throws IOException {
620 for (int i = 0; i < ROWSIZE; i++) {
621 Put put = new Put(ROWS[i]);
622 for (int j = 0; j < QUALSIZE; j++) {
623 put.add(makeKV(i, j));
624
625 put.add(makeKV(i, j, additionalFamily));
626 }
627 region.put(put);
628 if (i == ROWSIZE / 3 || i == ROWSIZE * 2 / 3) {
629 region.flush(true);
630 }
631 }
632 }
633
634 private static void writeMemstoreAndStoreFiles(MemStore memstore,
635 final StoreFile.Writer[] writers) throws IOException {
636 try {
637 for (int i = 0; i < ROWSIZE; i++) {
638 for (int j = 0; j < QUALSIZE; j++) {
639 if (i % 2 == 0) {
640 memstore.add(makeKV(i, j));
641 } else {
642 writers[(i + j) % writers.length].append(makeKV(i, j));
643 }
644 }
645 }
646 } finally {
647 for (int i = 0; i < writers.length; i++) {
648 writers[i].close();
649 }
650 }
651 }
652
653 private static void writeStoreFile(final StoreFile.Writer writer)
654 throws IOException {
655 try {
656 for (int i = 0; i < ROWSIZE; i++) {
657 for (int j = 0; j < QUALSIZE; j++) {
658 writer.append(makeKV(i, j));
659 }
660 }
661 } finally {
662 writer.close();
663 }
664 }
665
666 private static void writeMemstore(MemStore memstore) throws IOException {
667
668 for (int i = 0; i < ROWSIZE; i++) {
669 for (int j = 0; j < QUALSIZE; j++) {
670 if ((i + j) % 2 == 0) {
671 memstore.add(makeKV(i, j));
672 }
673 }
674 }
675 memstore.snapshot();
676
677 for (int i = 0; i < ROWSIZE; i++) {
678 for (int j = 0; j < QUALSIZE; j++) {
679 if ((i + j) % 2 == 1) {
680 memstore.add(makeKV(i, j));
681 }
682 }
683 }
684 }
685
686 private static KeyValue makeKV(int rowNum, int cqNum) {
687 return makeKV(rowNum, cqNum, FAMILYNAME);
688 }
689
690 private static KeyValue makeKV(int rowNum, int cqNum, byte[] familyName) {
691 KeyValue kv = new KeyValue(ROWS[rowNum], familyName, QUALS[cqNum], TS,
692 VALUES[rowNum % VALUESIZE]);
693 kv.setSequenceId(makeMVCC(rowNum, cqNum));
694 return kv;
695 }
696
697 private static long makeMVCC(int rowNum, int cqNum) {
698 return (rowNum + cqNum) % (MAXMVCC + 1);
699 }
700
701 private static byte[][] makeN(byte[] base, int n) {
702 byte[][] ret = new byte[n][];
703 for (int i = 0; i < n; i++) {
704 ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
705 }
706 return ret;
707 }
708 }