1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.nio.charset.StandardCharsets;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.KeyValue.Type;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory;
38 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
39 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
40 import org.apache.hadoop.hbase.testclassification.SmallTests;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45 import org.mockito.Mockito;
46 import org.mockito.invocation.InvocationOnMock;
47 import org.mockito.stubbing.Answer;
48
49
50
51
52 @Category(SmallTests.class)
53 public class TestClientSmallReversedScanner {
54
55 Scan scan;
56 ExecutorService pool;
57 Configuration conf;
58
59 ClusterConnection clusterConn;
60 RpcRetryingCallerFactory rpcFactory;
61 RpcControllerFactory controllerFactory;
62 RpcRetryingCaller<Result[]> caller;
63
64 @Before
65 @SuppressWarnings({"deprecation", "unchecked"})
66 public void setup() throws IOException {
67 clusterConn = Mockito.mock(ClusterConnection.class);
68 rpcFactory = Mockito.mock(RpcRetryingCallerFactory.class);
69 controllerFactory = Mockito.mock(RpcControllerFactory.class);
70 pool = Executors.newSingleThreadExecutor();
71 scan = new Scan();
72 conf = new Configuration();
73 Mockito.when(clusterConn.getConfiguration()).thenReturn(conf);
74
75 caller = Mockito.mock(RpcRetryingCaller.class);
76
77 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
78 }
79
80 @After
81 public void teardown() {
82 if (null != pool) {
83 pool.shutdownNow();
84 }
85 }
86
87
88
89
90 private Answer<Boolean> createTrueThenFalseAnswer() {
91 return new Answer<Boolean>() {
92 boolean first = true;
93
94 @Override
95 public Boolean answer(InvocationOnMock invocation) {
96 if (first) {
97 first = false;
98 return true;
99 }
100 return false;
101 }
102 };
103 }
104
105 private SmallReversedScannerCallableFactory getFactory(
106 final ScannerCallableWithReplicas callableWithReplicas) {
107 return new SmallReversedScannerCallableFactory() {
108 @Override
109 public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
110 Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
111 RpcControllerFactory controllerFactory, ExecutorService pool,
112 int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
113 RpcRetryingCaller<Result[]> caller, boolean isFirstRegionToLocate) {
114 return callableWithReplicas;
115 }
116 };
117 }
118
119 @Test
120 public void testContextPresent() throws Exception {
121 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
122 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
123 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
124 Type.Maximum);
125
126 ScannerCallableWithReplicas callableWithReplicas = Mockito
127 .mock(ScannerCallableWithReplicas.class);
128
129
130 @SuppressWarnings("unchecked")
131 RpcRetryingCaller<Result[]> caller = Mockito.mock(RpcRetryingCaller.class);
132
133 Mockito.when(rpcFactory.<Result[]> newCaller()).thenReturn(caller);
134
135
136
137
138 SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
139
140 try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
141 TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
142 Integer.MAX_VALUE)) {
143
144 csrs.setScannerCallableFactory(factory);
145
146
147 Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
148 .thenAnswer(new Answer<Result[]>() {
149 int count = 0;
150
151 @Override
152 public Result[] answer(InvocationOnMock invocation) {
153 Result[] results;
154 if (0 == count) {
155 results = new Result[] {Result.create(new Cell[] {kv3}),
156 Result.create(new Cell[] {kv2})};
157 } else if (1 == count) {
158 results = new Result[] {Result.create(new Cell[] {kv1})};
159 } else {
160 results = new Result[0];
161 }
162 count++;
163 return results;
164 }
165 });
166
167
168 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
169
170 Mockito.when(callableWithReplicas.getServerHasMoreResults()).thenAnswer(
171 createTrueThenFalseAnswer());
172
173
174 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
175 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
176
177 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
178
179 csrs.loadCache();
180
181 List<Result> results = csrs.cache;
182 Iterator<Result> iter = results.iterator();
183 assertEquals(3, results.size());
184 for (int i = 3; i >= 1 && iter.hasNext(); i--) {
185 Result result = iter.next();
186 byte[] row = result.getRow();
187 assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
188 assertEquals(1, result.getMap().size());
189 }
190 assertTrue(csrs.closed);
191 }
192 }
193
194 @Test
195 public void testNoContextFewerRecords() throws Exception {
196 final KeyValue kv1 = new KeyValue("row1".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
197 Type.Maximum), kv2 = new KeyValue("row2".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
198 Type.Maximum), kv3 = new KeyValue("row3".getBytes(), "cf".getBytes(), "cq".getBytes(), 1,
199 Type.Maximum);
200
201 ScannerCallableWithReplicas callableWithReplicas = Mockito
202 .mock(ScannerCallableWithReplicas.class);
203
204
205 scan.setCaching(2);
206
207 SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
208
209 try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
210 TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
211 Integer.MAX_VALUE)) {
212
213 csrs.setScannerCallableFactory(factory);
214
215
216 Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
217 .thenAnswer(new Answer<Result[]>() {
218 int count = 0;
219
220 @Override
221 public Result[] answer(InvocationOnMock invocation) {
222 Result[] results;
223 if (0 == count) {
224 results = new Result[] {Result.create(new Cell[] {kv3}),
225 Result.create(new Cell[] {kv2})};
226 } else if (1 == count) {
227
228 results = new Result[] {Result.create(new Cell[] {kv1})};
229 } else {
230 throw new RuntimeException("Should not fetch a third batch from the server");
231 }
232 count++;
233 return results;
234 }
235 });
236
237
238 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
239
240 Mockito.when(callableWithReplicas.getServerHasMoreResults())
241 .thenThrow(new RuntimeException("Should not be called"));
242
243
244 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
245 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
246
247 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
248
249 csrs.loadCache();
250
251 List<Result> results = csrs.cache;
252 Iterator<Result> iter = results.iterator();
253 assertEquals(2, results.size());
254 for (int i = 3; i >= 2 && iter.hasNext(); i--) {
255 Result result = iter.next();
256 byte[] row = result.getRow();
257 assertEquals("row" + i, new String(row, StandardCharsets.UTF_8));
258 assertEquals(1, result.getMap().size());
259 }
260
261
262 results.clear();
263
264 csrs.loadCache();
265
266 assertEquals(1, results.size());
267 Result result = results.get(0);
268 assertEquals("row1", new String(result.getRow(), StandardCharsets.UTF_8));
269 assertEquals(1, result.getMap().size());
270
271 assertTrue(csrs.closed);
272 }
273 }
274
275 @Test
276 public void testNoContextNoRecords() throws Exception {
277 ScannerCallableWithReplicas callableWithReplicas = Mockito
278 .mock(ScannerCallableWithReplicas.class);
279
280
281 scan.setCaching(2);
282
283 SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
284
285 try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
286 TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
287 Integer.MAX_VALUE)) {
288
289 csrs.setScannerCallableFactory(factory);
290
291
292 Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
293 .thenReturn(new Result[0]);
294
295
296 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(false);
297
298 Mockito.when(callableWithReplicas.getServerHasMoreResults())
299 .thenThrow(new RuntimeException("Should not be called"));
300
301
302 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
303 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
304
305 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
306
307 csrs.loadCache();
308
309 assertEquals(0, csrs.cache.size());
310 assertTrue(csrs.closed);
311 }
312 }
313
314 @Test
315 public void testContextNoRecords() throws Exception {
316 ScannerCallableWithReplicas callableWithReplicas = Mockito
317 .mock(ScannerCallableWithReplicas.class);
318
319 SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
320
321 try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
322 TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
323 Integer.MAX_VALUE)) {
324
325 csrs.setScannerCallableFactory(factory);
326
327
328 Mockito.when(caller.callWithoutRetries(callableWithReplicas, csrs.getScannerTimeout()))
329 .thenReturn(new Result[0]);
330
331
332 Mockito.when(callableWithReplicas.hasMoreResultsContext()).thenReturn(true);
333
334 Mockito.when(callableWithReplicas.getServerHasMoreResults())
335 .thenReturn(false);
336
337
338 HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);
339 Mockito.when(callableWithReplicas.getHRegionInfo()).thenReturn(regionInfo);
340
341 Mockito.when(regionInfo.getEndKey()).thenReturn(HConstants.EMPTY_BYTE_ARRAY);
342
343 csrs.loadCache();
344
345 assertEquals(0, csrs.cache.size());
346 assertTrue(csrs.closed);
347 }
348 }
349 }