View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
22  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
23  import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
24  import static org.junit.Assert.assertEquals;
25  import static org.junit.Assert.assertFalse;
26  import static org.junit.Assert.assertNotNull;
27  import static org.junit.Assert.assertTrue;
28  import static org.junit.Assert.fail;
29  
30  import java.io.IOException;
31  import java.util.ArrayList;
32  import java.util.List;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseTestCase;
39  import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
40  import org.apache.hadoop.hbase.HBaseTestCase.ScannerIncommon;
41  import org.apache.hadoop.hbase.HBaseTestingUtility;
42  import org.apache.hadoop.hbase.HColumnDescriptor;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.TableName;
47  import org.apache.hadoop.hbase.UnknownScannerException;
48  import org.apache.hadoop.hbase.client.Delete;
49  import org.apache.hadoop.hbase.client.Get;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.Result;
52  import org.apache.hadoop.hbase.client.Scan;
53  import org.apache.hadoop.hbase.filter.Filter;
54  import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
55  import org.apache.hadoop.hbase.filter.PrefixFilter;
56  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
57  import org.apache.hadoop.hbase.testclassification.SmallTests;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.junit.Rule;
60  import org.junit.Test;
61  import org.junit.experimental.categories.Category;
62  import org.junit.rules.TestName;
63  
64  
65  /**
66   * Test of a long-lived scanner validating as we go.
67   */
68  @Category(SmallTests.class)
69  public class TestScanner {
70    @Rule public TestName name = new TestName();
71    private final Log LOG = LogFactory.getLog(this.getClass());
72    private final static HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
73  
74    private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
75    private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
76    private static final byte [][] EXPLICIT_COLS = {
77      HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
78        // TODO ryan
79        //HConstants.STARTCODE_QUALIFIER
80    };
81  
82    static final HTableDescriptor TESTTABLEDESC =
83      new HTableDescriptor(TableName.valueOf("testscanner"));
84    static {
85      TESTTABLEDESC.addFamily(
86          new HColumnDescriptor(HConstants.CATALOG_FAMILY)
87              // Ten is an arbitrary number.  Keep versions to help debugging.
88              .setMaxVersions(10)
89              .setBlockCacheEnabled(false)
90              .setBlocksize(8 * 1024)
91      );
92    }
93    /** HRegionInfo for root region */
94    public static final HRegionInfo REGION_INFO =
95      new HRegionInfo(TESTTABLEDESC.getTableName(), HConstants.EMPTY_BYTE_ARRAY,
96      HConstants.EMPTY_BYTE_ARRAY);
97  
98    private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
99  
100   private static final long START_CODE = Long.MAX_VALUE;
101 
102   private HRegion r;
103   private HRegionIncommon region;
104 
105   private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
106   final private byte[] col1, col2;
107 
108   public TestScanner() {
109     super();
110 
111     firstRowBytes = START_KEY_BYTES;
112     secondRowBytes = START_KEY_BYTES.clone();
113     // Increment the least significant character so we get to next row.
114     secondRowBytes[START_KEY_BYTES.length - 1]++;
115     thirdRowBytes = START_KEY_BYTES.clone();
116     thirdRowBytes[START_KEY_BYTES.length - 1] += 2;
117     col1 = Bytes.toBytes("column1");
118     col2 = Bytes.toBytes("column2");
119   }
120 
121   /**
122    * Test basic stop row filter works.
123    * @throws Exception
124    */
125   @Test
126   public void testStopRow() throws Exception {
127     byte [] startrow = Bytes.toBytes("bbb");
128     byte [] stoprow = Bytes.toBytes("ccc");
129     try {
130       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
131       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
132       List<Cell> results = new ArrayList<Cell>();
133       // Do simple test of getting one row only first.
134       Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
135       scan.addFamily(HConstants.CATALOG_FAMILY);
136 
137       InternalScanner s = r.getScanner(scan);
138       int count = 0;
139       while (s.next(results)) {
140         count++;
141       }
142       s.close();
143       assertEquals(0, count);
144       // Now do something a bit more imvolved.
145       scan = new Scan(startrow, stoprow);
146       scan.addFamily(HConstants.CATALOG_FAMILY);
147 
148       s = r.getScanner(scan);
149       count = 0;
150       Cell kv = null;
151       results = new ArrayList<Cell>();
152       for (boolean first = true; s.next(results);) {
153         kv = results.get(0);
154         if (first) {
155           assertTrue(CellUtil.matchingRow(kv,  startrow));
156           first = false;
157         }
158         count++;
159       }
160       assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, CellUtil.cloneRow(kv)) > 0);
161       // We got something back.
162       assertTrue(count > 10);
163       s.close();
164     } finally {
165       HRegion.closeHRegion(this.r);
166     }
167   }
168 
169   void rowPrefixFilter(Scan scan) throws IOException {
170     List<Cell> results = new ArrayList<Cell>();
171     scan.addFamily(HConstants.CATALOG_FAMILY);
172     InternalScanner s = r.getScanner(scan);
173     boolean hasMore = true;
174     while (hasMore) {
175       hasMore = s.next(results);
176       for (Cell kv : results) {
177         assertEquals((byte)'a', CellUtil.cloneRow(kv)[0]);
178         assertEquals((byte)'b', CellUtil.cloneRow(kv)[1]);
179       }
180       results.clear();
181     }
182     s.close();
183   }
184 
185   void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
186     List<Cell> results = new ArrayList<Cell>();
187     scan.addFamily(HConstants.CATALOG_FAMILY);
188     InternalScanner s = r.getScanner(scan);
189     boolean hasMore = true;
190     while (hasMore) {
191       hasMore = s.next(results);
192       for (Cell kv : results) {
193         assertTrue(Bytes.compareTo(CellUtil.cloneRow(kv), stopRow) <= 0);
194       }
195       results.clear();
196     }
197     s.close();
198   }
199 
200   @Test
201   public void testFilters() throws IOException {
202     try {
203       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
204       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
205       byte [] prefix = Bytes.toBytes("ab");
206       Filter newFilter = new PrefixFilter(prefix);
207       Scan scan = new Scan();
208       scan.setFilter(newFilter);
209       rowPrefixFilter(scan);
210 
211       byte[] stopRow = Bytes.toBytes("bbc");
212       newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
213       scan = new Scan();
214       scan.setFilter(newFilter);
215       rowInclusiveStopFilter(scan, stopRow);
216 
217     } finally {
218       HRegion.closeHRegion(this.r);
219     }
220   }
221 
222   /**
223    * Test that closing a scanner while a client is using it doesn't throw
224    * NPEs but instead a UnknownScannerException. HBASE-2503
225    * @throws Exception
226    */
227   @Test
228   public void testRaceBetweenClientAndTimeout() throws Exception {
229     try {
230       this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
231       HBaseTestCase.addContent(this.r, HConstants.CATALOG_FAMILY);
232       Scan scan = new Scan();
233       InternalScanner s = r.getScanner(scan);
234       List<Cell> results = new ArrayList<Cell>();
235       try {
236         s.next(results);
237         s.close();
238         s.next(results);
239         fail("We don't want anything more, we should be failing");
240       } catch (UnknownScannerException ex) {
241         // ok!
242         return;
243       }
244     } finally {
245       HRegion.closeHRegion(this.r);
246     }
247   }
248 
249   /** The test!
250    * @throws IOException
251    */
252   @Test
253   public void testScanner() throws IOException {
254     try {
255       r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
256       region = new HRegionIncommon(r);
257 
258       // Write information to the meta table
259 
260       Put put = new Put(ROW_KEY, System.currentTimeMillis());
261 
262       put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
263           REGION_INFO.toByteArray());
264       region.put(put);
265 
266       // What we just committed is in the memstore. Verify that we can get
267       // it back both with scanning and get
268 
269       scan(false, null);
270       getRegionInfo();
271 
272       // Close and re-open
273 
274       ((HRegion)r).close();
275       r = HRegion.openHRegion(r, null);
276       region = new HRegionIncommon(r);
277 
278       // Verify we can get the data back now that it is on disk.
279 
280       scan(false, null);
281       getRegionInfo();
282 
283       // Store some new information
284 
285       String address = HConstants.LOCALHOST_IP + ":" + HBaseTestingUtility.randomFreePort();
286 
287       put = new Put(ROW_KEY, System.currentTimeMillis());
288       put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
289           Bytes.toBytes(address));
290 
291 //      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
292 
293       region.put(put);
294 
295       // Validate that we can still get the HRegionInfo, even though it is in
296       // an older row on disk and there is a newer row in the memstore
297 
298       scan(true, address.toString());
299       getRegionInfo();
300 
301       // flush cache
302 
303       region.flushcache();
304 
305       // Validate again
306 
307       scan(true, address.toString());
308       getRegionInfo();
309 
310       // Close and reopen
311 
312       ((HRegion)r).close();
313       r = HRegion.openHRegion(r,null);
314       region = new HRegionIncommon(r);
315 
316       // Validate again
317 
318       scan(true, address.toString());
319       getRegionInfo();
320 
321       // Now update the information again
322 
323       address = "bar.foo.com:4321";
324 
325       put = new Put(ROW_KEY, System.currentTimeMillis());
326 
327       put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
328           Bytes.toBytes(address));
329       region.put(put);
330 
331       // Validate again
332 
333       scan(true, address.toString());
334       getRegionInfo();
335 
336       // flush cache
337 
338       region.flushcache();
339 
340       // Validate again
341 
342       scan(true, address.toString());
343       getRegionInfo();
344 
345       // Close and reopen
346 
347       ((HRegion)r).close();
348       r = HRegion.openHRegion(r,null);
349       region = new HRegionIncommon(r);
350 
351       // Validate again
352 
353       scan(true, address.toString());
354       getRegionInfo();
355 
356     } finally {
357       // clean up
358       HRegion.closeHRegion(r);
359     }
360   }
361 
362   /** Compare the HRegionInfo we read from HBase to what we stored */
363   private void validateRegionInfo(byte [] regionBytes) throws IOException {
364     HRegionInfo info = HRegionInfo.parseFromOrNull(regionBytes);
365 
366     assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
367     assertEquals(0, info.getStartKey().length);
368     assertEquals(0, info.getEndKey().length);
369     assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
370     //assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
371   }
372 
373   /** Use a scanner to get the region info and then validate the results */
374   private void scan(boolean validateStartcode, String serverName)
375   throws IOException {
376     InternalScanner scanner = null;
377     Scan scan = null;
378     List<Cell> results = new ArrayList<Cell>();
379     byte [][][] scanColumns = {
380         COLS,
381         EXPLICIT_COLS
382     };
383 
384     for(int i = 0; i < scanColumns.length; i++) {
385       try {
386         scan = new Scan(FIRST_ROW);
387         for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
388           scan.addColumn(COLS[0],  EXPLICIT_COLS[ii]);
389         }
390         scanner = r.getScanner(scan);
391         while (scanner.next(results)) {
392           assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
393               HConstants.REGIONINFO_QUALIFIER));
394           byte [] val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
395               HConstants.REGIONINFO_QUALIFIER));
396           validateRegionInfo(val);
397           if(validateStartcode) {
398 //            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
399 //                HConstants.STARTCODE_QUALIFIER));
400 //            val = getColumn(results, HConstants.CATALOG_FAMILY,
401 //                HConstants.STARTCODE_QUALIFIER).getValue();
402             assertNotNull(val);
403             assertFalse(val.length == 0);
404             long startCode = Bytes.toLong(val);
405             assertEquals(START_CODE, startCode);
406           }
407 
408           if(serverName != null) {
409             assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
410                 HConstants.SERVER_QUALIFIER));
411             val = CellUtil.cloneValue(getColumn(results, HConstants.CATALOG_FAMILY,
412                 HConstants.SERVER_QUALIFIER));
413             assertNotNull(val);
414             assertFalse(val.length == 0);
415             String server = Bytes.toString(val);
416             assertEquals(0, server.compareTo(serverName));
417           }
418         }
419       } finally {
420         InternalScanner s = scanner;
421         scanner = null;
422         if(s != null) {
423           s.close();
424         }
425       }
426     }
427   }
428 
429   private boolean hasColumn(final List<Cell> kvs, final byte [] family,
430       final byte [] qualifier) {
431     for (Cell kv: kvs) {
432       if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
433         return true;
434       }
435     }
436     return false;
437   }
438 
439   private Cell getColumn(final List<Cell> kvs, final byte [] family,
440       final byte [] qualifier) {
441     for (Cell kv: kvs) {
442       if (CellUtil.matchingFamily(kv, family) && CellUtil.matchingQualifier(kv, qualifier)) {
443         return kv;
444       }
445     }
446     return null;
447   }
448 
449 
450   /** Use get to retrieve the HRegionInfo and validate it */
451   private void getRegionInfo() throws IOException {
452     Get get = new Get(ROW_KEY);
453     get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
454     Result result = region.get(get);
455     byte [] bytes = result.value();
456     validateRegionInfo(bytes);
457   }
458 
459   /**
460    * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
461    * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
462    * HBase-910.
463    * @throws Exception
464    */
465   @Test
466   public void testScanAndSyncFlush() throws Exception {
467     this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
468     HRegionIncommon hri = new HRegionIncommon(r);
469     try {
470         LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
471             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
472       int count = count(hri, -1, false);
473       assertEquals(count, count(hri, 100, false)); // do a sync flush.
474     } catch (Exception e) {
475       LOG.error("Failed", e);
476       throw e;
477     } finally {
478       HRegion.closeHRegion(this.r);
479     }
480   }
481 
482   /**
483    * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
484    * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
485    *
486    * @throws Exception
487    */
488   @Test
489   public void testScanAndRealConcurrentFlush() throws Exception {
490     this.r = TEST_UTIL.createLocalHRegion(TESTTABLEDESC, null, null);
491     HRegionIncommon hri = new HRegionIncommon(r);
492     try {
493         LOG.info("Added: " + HBaseTestCase.addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
494             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
495       int count = count(hri, -1, false);
496       assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
497     } catch (Exception e) {
498       LOG.error("Failed", e);
499       throw e;
500     } finally {
501       HRegion.closeHRegion(this.r);
502     }
503   }
504 
505   /**
506    * Make sure scanner returns correct result when we run a major compaction
507    * with deletes.
508    *
509    * @throws Exception
510    */
511   @Test
512   @SuppressWarnings("deprecation")
513   public void testScanAndConcurrentMajorCompact() throws Exception {
514     HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name.getMethodName());
515     this.r = TEST_UTIL.createLocalHRegion(htd, null, null);
516     HRegionIncommon hri = new HRegionIncommon(r);
517 
518     try {
519       HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
520           firstRowBytes, secondRowBytes);
521       HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
522           firstRowBytes, secondRowBytes);
523 
524       Delete dc = new Delete(firstRowBytes);
525       /* delete column1 of firstRow */
526       dc.deleteColumns(fam1, col1);
527       r.delete(dc);
528       r.flush(true);
529 
530       HBaseTestCase.addContent(hri, Bytes.toString(fam1), Bytes.toString(col1),
531           secondRowBytes, thirdRowBytes);
532       HBaseTestCase.addContent(hri, Bytes.toString(fam2), Bytes.toString(col1),
533           secondRowBytes, thirdRowBytes);
534       r.flush(true);
535 
536       InternalScanner s = r.getScanner(new Scan());
537       // run a major compact, column1 of firstRow will be cleaned.
538       r.compact(true);
539 
540       List<Cell> results = new ArrayList<Cell>();
541       s.next(results);
542 
543       // make sure returns column2 of firstRow
544       assertTrue("result is not correct, keyValues : " + results,
545           results.size() == 1);
546       assertTrue(CellUtil.matchingRow(results.get(0), firstRowBytes)); 
547       assertTrue(CellUtil.matchingFamily(results.get(0), fam2));
548 
549       results = new ArrayList<Cell>();
550       s.next(results);
551 
552       // get secondRow
553       assertTrue(results.size() == 2);
554       assertTrue(CellUtil.matchingRow(results.get(0), secondRowBytes));
555       assertTrue(CellUtil.matchingFamily(results.get(0), fam1));
556       assertTrue(CellUtil.matchingFamily(results.get(1), fam2));
557     } finally {
558       HRegion.closeHRegion(this.r);
559     }
560   }
561 
562 
563   /*
564    * @param hri Region
565    * @param flushIndex At what row we start the flush.
566    * @param concurrent if the flush should be concurrent or sync.
567    * @return Count of rows found.
568    * @throws IOException
569    */
570   private int count(final HRegionIncommon hri, final int flushIndex,
571                     boolean concurrent)
572   throws IOException {
573     LOG.info("Taking out counting scan");
574     ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
575         HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
576     List<Cell> values = new ArrayList<Cell>();
577     int count = 0;
578     boolean justFlushed = false;
579     while (s.next(values)) {
580       if (justFlushed) {
581         LOG.info("after next() just after next flush");
582         justFlushed=false;
583       }
584       count++;
585       if (flushIndex == count) {
586         LOG.info("Starting flush at flush index " + flushIndex);
587         Thread t = new Thread() {
588           public void run() {
589             try {
590               hri.flushcache();
591               LOG.info("Finishing flush");
592             } catch (IOException e) {
593               LOG.info("Failed flush cache");
594             }
595           }
596         };
597         if (concurrent) {
598           t.start(); // concurrently flush.
599         } else {
600           t.run(); // sync flush
601         }
602         LOG.info("Continuing on after kicking off background flush");
603         justFlushed = true;
604       }
605     }
606     s.close();
607     LOG.info("Found " + count + " items");
608     return count;
609   }
610 
611 }