View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.client;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertNotNull;
23  import static org.junit.Assert.assertTrue;
24  import static org.junit.Assert.fail;
25  import static org.mockito.Matchers.any;
26  import static org.mockito.Matchers.anyBoolean;
27  import static org.mockito.Matchers.anyInt;
28  import static org.mockito.Mockito.when;
29  
30  import java.io.IOException;
31  import java.util.Iterator;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellScanner;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.KeyValue.Type;
40  import org.apache.hadoop.hbase.RegionLocations;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
43  import org.apache.hadoop.hbase.testclassification.SmallTests;
44  import org.junit.After;
45  import org.junit.Before;
46  import org.junit.Test;
47  import org.junit.experimental.categories.Category;
48  import org.mockito.InOrder;
49  import org.mockito.Mockito;
50  import org.mockito.invocation.InvocationOnMock;
51  import org.mockito.stubbing.Answer;
52  
53  /**
54   * Test the ClientScanner.
55   */
56  @Category(SmallTests.class)
57  public class TestClientScanner {
58  
59    Scan scan;
60    ExecutorService pool;
61    Configuration conf;
62  
63    ClusterConnection clusterConn;
64    RpcRetryingCallerFactory rpcFactory;
65    RpcControllerFactory controllerFactory;
66  
67    @Before
68    @SuppressWarnings("deprecation")
69    public void setup() throws IOException {
70      clusterConn = Mockito.mock(ClusterConnection.class);
71      rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
72      controllerFactory = Mockito.mock(RpcControllerFactory.class);
73      pool = Executors.newSingleThreadExecutor();
74      scan = new Scan();
75      conf = new Configuration();
76      Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
77    }
78  
79    @After
80    public void teardown() {
81      if (null != pool) {
82        pool.shutdownNow();
83      }
84    }
85  
86    private static class MockClientScanner extends ClientScanner {
87  
88      private boolean rpcFinished = false;
89      private boolean rpcFinishedFired = false;
90  
91      public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
92          ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
93          RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
94          throws IOException {
95        super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
96            primaryOperationTimeout);
97      }
98  
99      @Override
100     protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
101       if (!rpcFinished) {
102         return super.nextScanner(nbRows, done);
103       }
104 
105       // Enforce that we don't short-circuit more than once
106       if (rpcFinishedFired) {
107         throw new RuntimeException("Expected nextScanner to only be called once after " +
108             " short-circuit was triggered.");
109       }
110       rpcFinishedFired = true;
111       return false;
112     }
113 
114     @Override
115     protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
116         int nbRows) {
117       scan.setStartRow(localStartKey);
118       ScannerCallable s =
119           new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
120               this.rpcControllerFactory);
121       s.setCaching(nbRows);
122       ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(getTable(), getConnection(),
123        s, pool, primaryOperationTimeout, scan,
124        getRetries(), scannerTimeout, caching, conf, caller);
125       return sr;
126     }
127 
128     public void setRpcFinished(boolean rpcFinished) {
129       this.rpcFinished = rpcFinished;
130     }
131   }
132 
133   @Test
134   @SuppressWarnings("unchecked")
135   public void testNoResultsHint() throws IOException {
136     final Result[] results = new Result[1];
137     KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
138         Type.Maximum);
139     results[0] = Result.create(new Cell[] {kv1});
140 
141     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
142 
143     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
144     Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
145       Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
146         private int count = 0;
147         @Override
148         public Result[] answer(InvocationOnMock invocation) throws Throwable {
149             ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
150                 ScannerCallableWithReplicas.class);
151           switch (count) {
152             case 0: // initialize
153             case 2: // close
154               count++;
155               return null;
156             case 1:
157               count++;
158               callable.setHasMoreResultsContext(false);
159               return results;
160             default:
161               throw new RuntimeException("Expected only 2 invocations");
162           }
163         }
164     });
165 
166     // Set a much larger cache and buffer size than we'll provide
167     scan.setCaching(100);
168     scan.setMaxResultSize(1000*1000);
169 
170     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
171         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
172 
173       scanner.setRpcFinished(true);
174 
175       InOrder inOrder = Mockito.inOrder(caller);
176 
177       scanner.loadCache();
178 
179       // One more call due to initializeScannerInConstruction()
180       inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
181           Mockito.any(RetryingCallable.class), Mockito.anyInt());
182 
183       assertEquals(1, scanner.cache.size());
184       Result r = scanner.cache.poll();
185       assertNotNull(r);
186       CellScanner cs = r.cellScanner();
187       assertTrue(cs.advance());
188       assertEquals(kv1, cs.current());
189       assertFalse(cs.advance());
190     }
191   }
192 
193   @Test
194   @SuppressWarnings("unchecked")
195   public void testSizeLimit() throws IOException {
196     final Result[] results = new Result[1];
197     KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
198         Type.Maximum);
199     results[0] = Result.create(new Cell[] {kv1});
200 
201     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
202 
203     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
204     Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
205       Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
206         private int count = 0;
207         @Override
208         public Result[] answer(InvocationOnMock invocation) throws Throwable {
209           ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
210               ScannerCallableWithReplicas.class);
211           switch (count) {
212             case 0: // initialize
213             case 2: // close
214               count++;
215               return null;
216             case 1:
217               count++;
218               callable.setHasMoreResultsContext(true);
219               callable.setServerHasMoreResults(false);
220               return results;
221             default:
222               throw new RuntimeException("Expected only 2 invocations");
223           }
224         }
225     });
226 
227     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
228 
229     // Set a much larger cache
230     scan.setCaching(100);
231     // The single key-value will exit the loop
232     scan.setMaxResultSize(1);
233 
234     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
235         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
236 
237       // Due to initializeScannerInConstruction()
238       Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
239           Mockito.anyInt());
240 
241       InOrder inOrder = Mockito.inOrder(caller);
242 
243       scanner.loadCache();
244 
245       inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
246           Mockito.any(RetryingCallable.class), Mockito.anyInt());
247 
248       assertEquals(1, scanner.cache.size());
249       Result r = scanner.cache.poll();
250       assertNotNull(r);
251       CellScanner cs = r.cellScanner();
252       assertTrue(cs.advance());
253       assertEquals(kv1, cs.current());
254       assertFalse(cs.advance());
255     }
256   }
257 
258   @Test
259   @SuppressWarnings("unchecked")
260   public void testCacheLimit() throws IOException {
261     KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
262         Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
263         Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
264         Type.Maximum);
265     final Result[] results = new Result[] {Result.create(new Cell[] {kv1}),
266         Result.create(new Cell[] {kv2}), Result.create(new Cell[] {kv3})};
267 
268     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
269 
270     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
271     Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
272       Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
273         private int count = 0;
274         @Override
275         public Result[] answer(InvocationOnMock invocation) throws Throwable {
276           ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
277               ScannerCallableWithReplicas.class);
278           switch (count) {
279             case 0: // initialize
280             case 2: // close
281               count++;
282               return null;
283             case 1:
284               count++;
285               callable.setHasMoreResultsContext(true);
286               callable.setServerHasMoreResults(false);
287               return results;
288             default:
289               throw new RuntimeException("Expected only 2 invocations");
290           }
291         }
292     });
293 
294     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
295 
296     // Set a small cache
297     scan.setCaching(1);
298     // Set a very large size
299     scan.setMaxResultSize(1000*1000);
300 
301     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
302         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
303 
304       // Due to initializeScannerInConstruction()
305       Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
306           Mockito.anyInt());
307 
308       InOrder inOrder = Mockito.inOrder(caller);
309 
310       scanner.loadCache();
311 
312       // Ensures that possiblyNextScanner isn't called at the end which would trigger
313       // another call to callWithoutRetries
314       inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
315           Mockito.any(RetryingCallable.class), Mockito.anyInt());
316 
317       assertEquals(3, scanner.cache.size());
318       Result r = scanner.cache.poll();
319       assertNotNull(r);
320       CellScanner cs = r.cellScanner();
321       assertTrue(cs.advance());
322       assertEquals(kv1, cs.current());
323       assertFalse(cs.advance());
324 
325       r = scanner.cache.poll();
326       assertNotNull(r);
327       cs = r.cellScanner();
328       assertTrue(cs.advance());
329       assertEquals(kv2, cs.current());
330       assertFalse(cs.advance());
331 
332       r = scanner.cache.poll();
333       assertNotNull(r);
334       cs = r.cellScanner();
335       assertTrue(cs.advance());
336       assertEquals(kv3, cs.current());
337       assertFalse(cs.advance());
338     }
339   }
340 
341   @Test
342   @SuppressWarnings("unchecked")
343   public void testNoMoreResults() throws IOException {
344     final Result[] results = new Result[1];
345     KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
346         Type.Maximum);
347     results[0] = Result.create(new Cell[] {kv1});
348 
349     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
350 
351     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
352     Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
353       Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
354         private int count = 0;
355         @Override
356         public Result[] answer(InvocationOnMock invocation) throws Throwable {
357           ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
358               ScannerCallableWithReplicas.class);
359           switch (count) {
360             case 0: // initialize
361             case 2: // close
362               count++;
363               return null;
364             case 1:
365               count++;
366               callable.setHasMoreResultsContext(true);
367               callable.setServerHasMoreResults(false);
368               return results;
369             default:
370               throw new RuntimeException("Expected only 2 invocations");
371           }
372         }
373     });
374 
375     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
376 
377     // Set a much larger cache and buffer size than we'll provide
378     scan.setCaching(100);
379     scan.setMaxResultSize(1000*1000);
380 
381     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
382         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
383 
384       // Due to initializeScannerInConstruction()
385       Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
386           Mockito.anyInt());
387 
388       scanner.setRpcFinished(true);
389 
390       InOrder inOrder = Mockito.inOrder(caller);
391 
392       scanner.loadCache();
393 
394       inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
395           Mockito.any(RetryingCallable.class), Mockito.anyInt());
396 
397       assertEquals(1, scanner.cache.size());
398       Result r = scanner.cache.poll();
399       assertNotNull(r);
400       CellScanner cs = r.cellScanner();
401       assertTrue(cs.advance());
402       assertEquals(kv1, cs.current());
403       assertFalse(cs.advance());
404     }
405   }
406 
407   @Test
408   @SuppressWarnings("unchecked")
409   public void testMoreResults() throws IOException {
410     final Result[] results1 = new Result[1];
411     KeyValue kv1 = new KeyValue("row".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
412         Type.Maximum);
413     results1[0] = Result.create(new Cell[] {kv1});
414 
415     final Result[] results2 = new Result[1];
416     KeyValue kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
417         Type.Maximum);
418     results2[0] = Result.create(new Cell[] {kv2});
419 
420 
421     RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
422 
423     Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
424     Mockito.when(caller.callWithoutRetries(Mockito.any(RetryingCallable.class),
425         Mockito.anyInt())).thenAnswer(new Answer<Result[]>() {
426           private int count = 0;
427           @Override
428           public Result[] answer(InvocationOnMock invocation) throws Throwable {
429             ScannerCallableWithReplicas callable = invocation.getArgumentAt(0,
430                 ScannerCallableWithReplicas.class);
431             switch (count) {
432               case 0: // initialize
433               case 3: // close
434                 count++;
435                 return null;
436               case 1:
437                 count++;
438                 callable.setHasMoreResultsContext(true);
439                 callable.setServerHasMoreResults(true);
440                 return results1;
441               case 2:
442                 count++;
443                 // The server reports back false WRT more results
444                 callable.setHasMoreResultsContext(true);
445                 callable.setServerHasMoreResults(false);
446                 return results2;
447               default:
448                 throw new RuntimeException("Expected only 2 invocations");
449             }
450           }
451       });
452 
453     // Set a much larger cache and buffer size than we'll provide
454     scan.setCaching(100);
455     scan.setMaxResultSize(1000*1000);
456 
457     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
458         clusterConn, rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
459 
460       // Due to initializeScannerInConstruction()
461       Mockito.verify(caller).callWithoutRetries(Mockito.any(RetryingCallable.class),
462           Mockito.anyInt());
463 
464       InOrder inOrder = Mockito.inOrder(caller);
465 
466       scanner.loadCache();
467 
468       inOrder.verify(caller, Mockito.times(2)).callWithoutRetries(
469           Mockito.any(RetryingCallable.class), Mockito.anyInt());
470 
471       assertEquals(1, scanner.cache.size());
472       Result r = scanner.cache.poll();
473       assertNotNull(r);
474       CellScanner cs = r.cellScanner();
475       assertTrue(cs.advance());
476       assertEquals(kv1, cs.current());
477       assertFalse(cs.advance());
478 
479       scanner.setRpcFinished(true);
480 
481       inOrder = Mockito.inOrder(caller);
482 
483       scanner.loadCache();
484 
485       inOrder.verify(caller, Mockito.times(3)).callWithoutRetries(
486           Mockito.any(RetryingCallable.class), Mockito.anyInt());
487 
488       r = scanner.cache.poll();
489       assertNotNull(r);
490       cs = r.cellScanner();
491       assertTrue(cs.advance());
492       assertEquals(kv2, cs.current());
493       assertFalse(cs.advance());
494     }
495   }
496 
497   /**
498    * Tests the case where all replicas of a region throw an exception. It should not cause a hang
499    * but the exception should propagate to the client
500    */
501   @Test (timeout = 30000)
502   public void testExceptionsFromReplicasArePropagated() throws IOException {
503     scan.setConsistency(Consistency.TIMELINE);
504 
505     // Mock a caller which calls the callable for ScannerCallableWithReplicas,
506     // but throws an exception for the actual scanner calls via callWithRetries.
507     rpcFactory = new MockRpcRetryingCallerFactory(conf);
508     conf.set(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
509       MockRpcRetryingCallerFactory.class.getName());
510 
511     // mock 3 replica locations
512     when(clusterConn.locateRegion((TableName)any(), (byte[])any(), anyBoolean(),
513       anyBoolean(), anyInt())).thenReturn(new RegionLocations(null, null, null));
514 
515     try (MockClientScanner scanner = new MockClientScanner(conf, scan, TableName.valueOf("table"),
516       clusterConn, rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
517       Iterator<Result> iter = scanner.iterator();
518       while (iter.hasNext()) {
519         iter.next();
520       }
521       fail("Should have failed with RetriesExhaustedException");
522     } catch (RetriesExhaustedException expected) {
523 
524     }
525   }
526 
527   public static class MockRpcRetryingCallerFactory extends RpcRetryingCallerFactory {
528 
529     public MockRpcRetryingCallerFactory(Configuration conf) {
530       super(conf);
531     }
532 
533     @Override
534     public <T> RpcRetryingCaller<T> newCaller() {
535       return new RpcRetryingCaller<T>(0, 0, 0) {
536         @Override
537         public void cancel() {
538         }
539         @Override
540         public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
541             throws IOException, RuntimeException {
542           throw new IOException("Scanner exception");
543         }
544 
545         @Override
546         public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
547             throws IOException, RuntimeException {
548           try {
549             return callable.call(callTimeout);
550           } catch (IOException e) {
551             throw e;
552           } catch (Exception e) {
553             throw new RuntimeException(e);
554           }
555         }
556       };
557     }
558 
559   }
560 
561 }