1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22
23 import com.google.protobuf.ServiceException;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.classification.InterfaceAudience;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.Cell;
29 import org.apache.hadoop.hbase.CellUtil;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
33 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
34 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.RequestConverter;
37 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
38 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39 import org.apache.hadoop.hbase.util.Bytes;
40
41 import com.google.common.annotations.VisibleForTesting;
42
43 import java.io.IOException;
44 import java.io.InterruptedIOException;
45 import java.util.concurrent.ExecutorService;
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 public class ClientSmallReversedScanner extends ReversedClientScanner {
56 private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
57 private ScannerCallableWithReplicas smallReversedScanCallable = null;
58 private SmallReversedScannerCallableFactory callableFactory;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
 * Creates a small reversed scanner that builds its callables with a newly constructed
 * {@link SmallReversedScannerCallableFactory}.
 * <p>
 * Every argument is forwarded unchanged to the factory-accepting constructor of this class.
 *
 * @param conf forwarded to the delegate constructor
 * @param scan forwarded to the delegate constructor
 * @param tableName forwarded to the delegate constructor
 * @param connection forwarded to the delegate constructor
 * @param rpcFactory forwarded to the delegate constructor
 * @param controllerFactory forwarded to the delegate constructor
 * @param pool forwarded to the delegate constructor
 * @param primaryOperationTimeout forwarded to the delegate constructor
 * @throws IOException if the delegate constructor fails
 */
public ClientSmallReversedScanner(
    final Configuration conf,
    final Scan scan,
    final TableName tableName,
    ClusterConnection connection,
    RpcRetryingCallerFactory rpcFactory,
    RpcControllerFactory controllerFactory,
    ExecutorService pool,
    int primaryOperationTimeout) throws IOException {
  this(
      conf,
      scan,
      tableName,
      connection,
      rpcFactory,
      controllerFactory,
      pool,
      primaryOperationTimeout,
      new SmallReversedScannerCallableFactory());
}
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
 * Creates a small reversed scanner that obtains its callables from the supplied factory.
 * <p>
 * All arguments except {@code callableFactory} are handed to the superclass constructor; the
 * factory is stored on this instance once the superclass constructor has returned.
 *
 * @param conf forwarded to the superclass constructor
 * @param scan forwarded to the superclass constructor
 * @param tableName forwarded to the superclass constructor
 * @param connection forwarded to the superclass constructor
 * @param rpcFactory forwarded to the superclass constructor
 * @param controllerFactory forwarded to the superclass constructor
 * @param pool forwarded to the superclass constructor
 * @param primaryOperationTimeout forwarded to the superclass constructor
 * @param callableFactory factory later used by {@code nextScanner} to create callables
 * @throws IOException if the superclass constructor fails
 */
@VisibleForTesting
ClientSmallReversedScanner(
    final Configuration conf,
    final Scan scan,
    final TableName tableName,
    ClusterConnection connection,
    RpcRetryingCallerFactory rpcFactory,
    RpcControllerFactory controllerFactory,
    ExecutorService pool,
    int primaryOperationTimeout,
    SmallReversedScannerCallableFactory callableFactory) throws IOException {
  super(
      conf,
      scan,
      tableName,
      connection,
      rpcFactory,
      controllerFactory,
      pool,
      primaryOperationTimeout);
  this.callableFactory = callableFactory;
}
125
126
127
128
129
130
131
132
133
134
135
136
137
138 private boolean nextScanner(int nbRows, final boolean done,
139 boolean currentRegionDone) throws IOException {
140
141 byte[] localStartKey;
142 int cacheNum = nbRows;
143 boolean regionChanged = true;
144 boolean isFirstRegionToLocate = false;
145
146 if (this.currentRegion != null && currentRegionDone) {
147 byte[] startKey = this.currentRegion.getStartKey();
148 if (startKey == null
149 || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
150 || checkScanStopRow(startKey) || done) {
151 close();
152 if (LOG.isDebugEnabled()) {
153 LOG.debug("Finished with small scan at " + this.currentRegion);
154 }
155 return false;
156 }
157
158 localStartKey = createClosestRowBefore(startKey);
159 if (LOG.isDebugEnabled()) {
160 LOG.debug("Finished with region " + this.currentRegion);
161 }
162 } else if (this.lastResult != null) {
163 regionChanged = false;
164 localStartKey = createClosestRowBefore(lastResult.getRow());
165 } else {
166 localStartKey = this.scan.getStartRow();
167 isFirstRegionToLocate = true;
168 }
169
170 if (!isFirstRegionToLocate
171 && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
172
173
174 return false;
175 }
176
177 if (LOG.isTraceEnabled()) {
178 LOG.trace("Advancing internal small scanner to startKey at '"
179 + Bytes.toStringBinary(localStartKey) + "'");
180 }
181
182 smallReversedScanCallable =
183 callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
184 localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
185 getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
186
187 if (this.scanMetrics != null && regionChanged) {
188 this.scanMetrics.countOfRegions.incrementAndGet();
189 }
190 return true;
191 }
192
193 @Override
194 public Result next() throws IOException {
195
196
197 if (cache.size() == 0 && this.closed) {
198 return null;
199 }
200 if (cache.size() == 0) {
201 loadCache();
202 }
203
204 if (cache.size() > 0) {
205 return cache.poll();
206 }
207
208
209 writeScanMetrics();
210 return null;
211 }
212
213 @Override
214 protected void loadCache() throws IOException {
215 Result[] values = null;
216 long remainingResultSize = maxScannerResultSize;
217 int countdown = this.caching;
218 boolean currentRegionDone = false;
219
220 while (remainingResultSize > 0 && countdown > 0
221 && nextScanner(countdown, values == null, currentRegionDone)) {
222
223
224
225
226
227 values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
228 this.currentRegion = smallReversedScanCallable.getHRegionInfo();
229 long currentTime = System.currentTimeMillis();
230 if (this.scanMetrics != null) {
231 this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
232 - lastNext);
233 }
234 lastNext = currentTime;
235 if (values != null && values.length > 0) {
236 for (int i = 0; i < values.length; i++) {
237 Result rs = values[i];
238 cache.add(rs);
239
240 for (Cell cell : rs.rawCells()) {
241 remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
242 }
243 countdown--;
244 this.lastResult = rs;
245 }
246 }
247 if (smallReversedScanCallable.hasMoreResultsContext()) {
248 currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
249 } else {
250 currentRegionDone = countdown > 0;
251 }
252 }
253 }
254
255 @Override
256 protected void initializeScannerInConstruction() throws IOException {
257
258
259 }
260
261 @Override
262 public void close() {
263 if (!scanMetricsPublished) writeScanMetrics();
264 closed = true;
265 }
266
267 @VisibleForTesting
268 protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
269 this.callableFactory = callableFactory;
270 }
271
272
273
274
275 static class SmallReversedScannerCallable extends ReversedScannerCallable {
276
277 public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
278 ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
279 int caching, int replicaId) {
280 super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
281 this.setCaching(caching);
282 }
283
284 @Override
285 public Result[] call(int timeout) throws IOException {
286 if (this.closed) return null;
287 if (Thread.interrupted()) {
288 throw new InterruptedIOException();
289 }
290 ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
291 getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
292 ClientProtos.ScanResponse response = null;
293 controller = controllerFactory.newController();
294 try {
295 controller.setPriority(getTableName());
296 controller.setCallTimeout(timeout);
297 response = getStub().scan(controller, request);
298 Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
299 if (response.hasMoreResultsInRegion()) {
300 setHasMoreResultsContext(true);
301 setServerHasMoreResults(response.getMoreResultsInRegion());
302 } else {
303 setHasMoreResultsContext(false);
304 }
305
306 updateResultsMetrics(results);
307 return results;
308 } catch (ServiceException se) {
309 throw ProtobufUtil.getRemoteException(se);
310 }
311 }
312
313 @Override
314 public ScannerCallable getScannerCallableForReplica(int id) {
315 return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
316 scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
317 }
318 }
319
320 protected static class SmallReversedScannerCallableFactory {
321
322 public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
323 Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
324 RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
325 int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
326 boolean isFirstRegionToLocate) {
327 byte[] locateStartRow = null;
328 if (isFirstRegionToLocate
329 && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
330
331
332 locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
333 }
334
335 scan.setStartRow(localStartKey);
336 SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
337 scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
338 ScannerCallableWithReplicas scannerCallableWithReplicas =
339 new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
340 retries, scannerTimeout, cacheNum, conf, caller);
341 return scannerCallableWithReplicas;
342 }
343 }
344 }