1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.net.UnknownHostException;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellScanner;
31 import org.apache.hadoop.hbase.CellUtil;
32 import org.apache.hadoop.hbase.DoNotRetryIOException;
33 import org.apache.hadoop.hbase.HBaseIOException;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HRegionLocation;
36 import org.apache.hadoop.hbase.NotServingRegionException;
37 import org.apache.hadoop.hbase.RegionLocations;
38 import org.apache.hadoop.hbase.RemoteExceptionHandler;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.UnknownScannerException;
42 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
43 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
44 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
47 import org.apache.hadoop.hbase.protobuf.RequestConverter;
48 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
49 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
50 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
51 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
52 import org.apache.hadoop.ipc.RemoteException;
53 import org.apache.hadoop.net.DNS;
54
55 import com.google.protobuf.ServiceException;
56 import com.google.protobuf.TextFormat;
57
58
59
60
61
62
63 @InterfaceAudience.Private
64 public class ScannerCallable extends RegionServerCallable<Result[]> {
65 public static final String LOG_SCANNER_LATENCY_CUTOFF
66 = "hbase.client.log.scanner.latency.cutoff";
67 public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
68
69 public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
70 protected long scannerId = -1L;
71 protected boolean instantiated = false;
72 protected boolean closed = false;
73 protected boolean renew = false;
74 private Scan scan;
75 private int caching = 1;
76 protected final ClusterConnection cConnection;
77 protected ScanMetrics scanMetrics;
78 private boolean logScannerActivity = false;
79 private int logCutOffLatency = 1000;
80 private static String myAddress;
81 protected final int id;
82 protected boolean serverHasMoreResultsContext;
83 protected boolean serverHasMoreResults;
84
85
86
87
88
89 protected boolean heartbeatMessage = false;
90 static {
91 try {
92 myAddress = DNS.getDefaultHost("default", "default");
93 } catch (UnknownHostException uhe) {
94 LOG.error("cannot determine my address", uhe);
95 }
96 }
97
98
99 protected boolean isRegionServerRemote = true;
100 private long nextCallSeq = 0;
101 protected RpcControllerFactory controllerFactory;
102 protected PayloadCarryingRpcController controller;
103
104
105
106
107
108
109
110
111
112
113 public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
114 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
115 this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
116 }
117
118
119
120
121
122
123
124
125 public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
126 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
127 super(connection, tableName, scan.getStartRow());
128 this.id = id;
129 this.cConnection = connection;
130 this.scan = scan;
131 this.scanMetrics = scanMetrics;
132 Configuration conf = connection.getConfiguration();
133 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
134 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
135 this.controllerFactory = rpcControllerFactory;
136 }
137
138 PayloadCarryingRpcController getController() {
139 return controller;
140 }
141
142
143
144
145
146 @Override
147 public void prepare(boolean reload) throws IOException {
148 if (Thread.interrupted()) {
149 throw new InterruptedIOException();
150 }
151 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
152 id, getConnection(), getTableName(), getRow());
153 location = id < rl.size() ? rl.getRegionLocation(id) : null;
154 if (location == null || location.getServerName() == null) {
155
156
157 throw new HBaseIOException("There is no location for replica id #" + id);
158 }
159 ServerName dest = location.getServerName();
160 setStub(super.getConnection().getClient(dest));
161 if (!instantiated || reload) {
162 checkIfRegionServerIsRemote();
163 instantiated = true;
164 }
165
166
167
168
169 if (reload && this.scanMetrics != null) {
170 this.scanMetrics.countOfRPCRetries.incrementAndGet();
171 if (isRegionServerRemote) {
172 this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
173 }
174 }
175 }
176
177
178
179
180
181 protected void checkIfRegionServerIsRemote() {
182 if (getLocation().getHostname().equalsIgnoreCase(myAddress)) {
183 isRegionServerRemote = false;
184 } else {
185 isRegionServerRemote = true;
186 }
187 }
188
189
190 @Override
191 public Result [] call(int callTimeout) throws IOException {
192 if (Thread.interrupted()) {
193 throw new InterruptedIOException();
194 }
195
196 if (controller == null) {
197 controller = controllerFactory.newController();
198 controller.setPriority(getTableName());
199 controller.setCallTimeout(callTimeout);
200 }
201
202 if (closed) {
203 if (scannerId != -1) {
204 close();
205 }
206 } else {
207 if (scannerId == -1L) {
208 this.scannerId = openScanner();
209 } else {
210 Result [] rrs = null;
211 ScanRequest request = null;
212
213 setHeartbeatMessage(false);
214 try {
215 incRPCcallsMetrics();
216 request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, renew);
217 ScanResponse response = null;
218 try {
219 response = getStub().scan(controller, request);
220
221
222
223
224
225
226
227
228
229 nextCallSeq++;
230 long timestamp = System.currentTimeMillis();
231 setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage());
232
233 CellScanner cellScanner = controller.cellScanner();
234 rrs = ResponseConverter.getResults(cellScanner, response);
235 if (logScannerActivity) {
236 long now = System.currentTimeMillis();
237 if (now - timestamp > logCutOffLatency) {
238 int rows = rrs == null ? 0 : rrs.length;
239 LOG.info("Took " + (now-timestamp) + "ms to fetch "
240 + rows + " rows from scanner=" + scannerId);
241 }
242 }
243
244 if (response.hasMoreResults() && !response.getMoreResults()) {
245 scannerId = -1L;
246 closed = true;
247
248 return null;
249 }
250
251
252 if (response.hasMoreResultsInRegion()) {
253
254 setHasMoreResultsContext(true);
255 setServerHasMoreResults(response.getMoreResultsInRegion());
256 } else {
257
258 setHasMoreResultsContext(false);
259 }
260 } catch (ServiceException se) {
261 throw ProtobufUtil.getRemoteException(se);
262 }
263 updateResultsMetrics(rrs);
264 } catch (IOException e) {
265 if (logScannerActivity) {
266 LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
267 + " to " + getLocation(), e);
268 }
269 IOException ioe = e;
270 if (e instanceof RemoteException) {
271 ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
272 }
273 if (logScannerActivity) {
274 if (ioe instanceof UnknownScannerException) {
275 try {
276 HRegionLocation location =
277 getConnection().relocateRegion(getTableName(), scan.getStartRow());
278 LOG.info("Scanner=" + scannerId
279 + " expired, current region location is " + location.toString());
280 } catch (Throwable t) {
281 LOG.info("Failed to relocate region", t);
282 }
283 } else if (ioe instanceof ScannerResetException) {
284 LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
285 + "asked us to reset the scanner state.", ioe);
286 }
287 }
288
289
290
291
292
293
294 if (ioe instanceof NotServingRegionException) {
295
296
297
298 if (this.scanMetrics != null) {
299 this.scanMetrics.countOfNSRE.incrementAndGet();
300 }
301 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
302 } else if (ioe instanceof RegionServerStoppedException) {
303
304
305 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
306 } else {
307
308 throw ioe;
309 }
310 }
311 return rrs;
312 }
313 }
314 return null;
315 }
316
317
318
319
320
321
322
323 protected boolean isHeartbeatMessage() {
324 return heartbeatMessage;
325 }
326
327 protected void setHeartbeatMessage(boolean heartbeatMessage) {
328 this.heartbeatMessage = heartbeatMessage;
329 }
330
331 private void incRPCcallsMetrics() {
332 if (this.scanMetrics == null) {
333 return;
334 }
335 this.scanMetrics.countOfRPCcalls.incrementAndGet();
336 if (isRegionServerRemote) {
337 this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
338 }
339 }
340
341 protected void updateResultsMetrics(Result[] rrs) {
342 if (this.scanMetrics == null || rrs == null || rrs.length == 0) {
343 return;
344 }
345 long resultSize = 0;
346 for (Result rr : rrs) {
347 for (Cell cell : rr.rawCells()) {
348 resultSize += CellUtil.estimatedSerializedSizeOf(cell);
349 }
350 }
351 this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
352 if (isRegionServerRemote) {
353 this.scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
354 }
355 }
356
357 private void close() {
358 if (this.scannerId == -1L) {
359 return;
360 }
361 try {
362 incRPCcallsMetrics();
363 ScanRequest request =
364 RequestConverter.buildScanRequest(this.scannerId, 0, true);
365 try {
366 getStub().scan(controller, request);
367 } catch (ServiceException se) {
368 throw ProtobufUtil.getRemoteException(se);
369 }
370 } catch (IOException e) {
371 LOG.warn("Ignore, probably already closed", e);
372 }
373 this.scannerId = -1L;
374 }
375
376 protected long openScanner() throws IOException {
377 incRPCcallsMetrics();
378 ScanRequest request =
379 RequestConverter.buildScanRequest(
380 getLocation().getRegionInfo().getRegionName(),
381 this.scan, 0, false);
382 try {
383 ScanResponse response = getStub().scan(controller, request);
384 long id = response.getScannerId();
385 if (logScannerActivity) {
386 LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
387 + " on region " + getLocation().toString());
388 }
389 return id;
390 } catch (ServiceException se) {
391 throw ProtobufUtil.getRemoteException(se);
392 }
393 }
394
395 protected Scan getScan() {
396 return scan;
397 }
398
399
400
401
402 public void setClose() {
403 this.closed = true;
404 }
405
406
407
408
409
410
411 public void setRenew(boolean val) {
412 this.renew = val;
413 }
414
415
416
417
418 @Override
419 public HRegionInfo getHRegionInfo() {
420 if (!instantiated) {
421 return null;
422 }
423 return getLocation().getRegionInfo();
424 }
425
426
427
428
429
430 public int getCaching() {
431 return caching;
432 }
433
434 @Override
435 public ClusterConnection getConnection() {
436 return cConnection;
437 }
438
439
440
441
442
443 public void setCaching(int caching) {
444 this.caching = caching;
445 }
446
447 public ScannerCallable getScannerCallableForReplica(int id) {
448 ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName,
449 this.getScan(), this.scanMetrics, controllerFactory, id);
450 s.setCaching(this.caching);
451 return s;
452 }
453
454
455
456
457
458 protected boolean getServerHasMoreResults() {
459 assert serverHasMoreResultsContext;
460 return this.serverHasMoreResults;
461 }
462
463 protected void setServerHasMoreResults(boolean serverHasMoreResults) {
464 this.serverHasMoreResults = serverHasMoreResults;
465 }
466
467
468
469
470
471
472 protected boolean hasMoreResultsContext() {
473 return serverHasMoreResultsContext;
474 }
475
476 protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
477 this.serverHasMoreResultsContext = serverHasMoreResultsContext;
478 }
479 }