View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  
23  import com.google.protobuf.ServiceException;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.classification.InterfaceAudience;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellUtil;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.TableName;
32  import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
33  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
34  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
35  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36  import org.apache.hadoop.hbase.protobuf.RequestConverter;
37  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
38  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
39  import org.apache.hadoop.hbase.util.Bytes;
40  
41  import com.google.common.annotations.VisibleForTesting;
42  
43  import java.io.IOException;
44  import java.io.InterruptedIOException;
45  import java.util.concurrent.ExecutorService;
46  
47  /**
48   * Client scanner for small reversed scans. Generally, only one RPC is needed to fetch the
49   * scan results, unless the results cross multiple regions or the row count of the
50   * results exceeds the caching limit.
51   * <p>
52   * For small scans, it performs better than {@link ReversedClientScanner}.
53   */
54  @InterfaceAudience.Private
55  public class ClientSmallReversedScanner extends ReversedClientScanner {
56    private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
57    private ScannerCallableWithReplicas smallReversedScanCallable = null;
58    private SmallReversedScannerCallableFactory callableFactory;
59  
60    /**
61     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
62     * {@link Scan} 's start row maybe changed changed.
63     *
64     * @param conf
65     *          The {@link Configuration} to use.
66     * @param scan
67     *          {@link Scan} to use in this scanner
68     * @param tableName
69     *          The table that we wish to rangeGet
70     * @param connection
71     *          Connection identifying the cluster
72     * @param rpcFactory
73     *          Factory used to create the {@link RpcRetryingCaller}
74     * @param controllerFactory
75     *          Factory used to access RPC payloads
76     * @param pool
77     *          Threadpool for RPC threads
78     * @param primaryOperationTimeout
79     *          Call timeout
80     * @throws IOException
81     *           If the remote call fails
82     */
83    public ClientSmallReversedScanner(final Configuration conf, final Scan scan,
84        final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
85        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
86        throws IOException {
87      this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
88          primaryOperationTimeout, new SmallReversedScannerCallableFactory());
89    }
90  
91    /**
92     * Create a new ReversibleClientScanner for the specified table. Take note that the passed
93     * {@link Scan}'s start row may be changed.
94     *
95     * @param conf
96     *          The {@link Configuration} to use.
97     * @param scan
98     *          {@link Scan} to use in this scanner
99     * @param tableName
100    *          The table that we wish to rangeGet
101    * @param connection
102    *          Connection identifying the cluster
103    * @param rpcFactory
104    *          Factory used to create the {@link RpcRetryingCaller}
105    * @param controllerFactory
106    *          Factory used to access RPC payloads
107    * @param pool
108    *          Threadpool for RPC threads
109    * @param primaryOperationTimeout
110    *          Call timeout
111    * @param callableFactory
112    *          Factory used to create the {@link SmallScannerCallable}
113    * @throws IOException
114    *           If the remote call fails
115    */
116   @VisibleForTesting
117   ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
118       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
119       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
120       SmallReversedScannerCallableFactory callableFactory) throws IOException {
121     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
122         primaryOperationTimeout);
123     this.callableFactory = callableFactory;
124   }
125 
126   /**
127    * Gets a scanner for following scan. Move to next region or continue from the last result or
128    * start from the start row.
129    *
130    * @param nbRows
131    * @param done
132    *          true if Server-side says we're done scanning.
133    * @param currentRegionDone
134    *          true if scan is over on current region
135    * @return true if has next scanner
136    * @throws IOException
137    */
138   private boolean nextScanner(int nbRows, final boolean done,
139                               boolean currentRegionDone) throws IOException {
140     // Where to start the next getter
141     byte[] localStartKey;
142     int cacheNum = nbRows;
143     boolean regionChanged = true;
144     boolean isFirstRegionToLocate = false;
145     // if we're at end of table, close and return false to stop iterating
146     if (this.currentRegion != null && currentRegionDone) {
147       byte[] startKey = this.currentRegion.getStartKey();
148       if (startKey == null
149           || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
150           || checkScanStopRow(startKey) || done) {
151         close();
152         if (LOG.isDebugEnabled()) {
153           LOG.debug("Finished with small scan at " + this.currentRegion);
154         }
155         return false;
156       }
157       // We take the row just under to get to the previous region.
158       localStartKey = createClosestRowBefore(startKey);
159       if (LOG.isDebugEnabled()) {
160         LOG.debug("Finished with region " + this.currentRegion);
161       }
162     } else if (this.lastResult != null) {
163       regionChanged = false;
164       localStartKey = createClosestRowBefore(lastResult.getRow());
165     } else {
166       localStartKey = this.scan.getStartRow();
167       isFirstRegionToLocate = true;
168     }
169 
170     if (!isFirstRegionToLocate
171         && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
172       // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan.
173       // otherwise, maybe infinity results with RowKey=0x00 will return.
174       return false;
175     }
176 
177     if (LOG.isTraceEnabled()) {
178       LOG.trace("Advancing internal small scanner to startKey at '"
179           + Bytes.toStringBinary(localStartKey) + "'");
180     }
181 
182     smallReversedScanCallable =
183         callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
184           localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
185           getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
186 
187     if (this.scanMetrics != null && regionChanged) {
188       this.scanMetrics.countOfRegions.incrementAndGet();
189     }
190     return true;
191   }
192 
193   @Override
194   public Result next() throws IOException {
195     // If the scanner is closed and there's nothing left in the cache, next is a
196     // no-op.
197     if (cache.size() == 0 && this.closed) {
198       return null;
199     }
200     if (cache.size() == 0) {
201       loadCache();
202     }
203 
204     if (cache.size() > 0) {
205       return cache.poll();
206     }
207     // if we exhausted this scanner before calling close, write out the scan
208     // metrics
209     writeScanMetrics();
210     return null;
211   }
212 
213   @Override
214   protected void loadCache() throws IOException {
215     Result[] values = null;
216     long remainingResultSize = maxScannerResultSize;
217     int countdown = this.caching;
218     boolean currentRegionDone = false;
219     // Values == null means server-side filter has determined we must STOP
220     while (remainingResultSize > 0 && countdown > 0
221         && nextScanner(countdown, values == null, currentRegionDone)) {
222       // Server returns a null values if scanning is to stop. Else,
223       // returns an empty array if scanning is to go on and we've just
224       // exhausted current region.
225       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
226       // we do a callWithRetries
227       values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
228       this.currentRegion = smallReversedScanCallable.getHRegionInfo();
229       long currentTime = System.currentTimeMillis();
230       if (this.scanMetrics != null) {
231         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
232             - lastNext);
233       }
234       lastNext = currentTime;
235       if (values != null && values.length > 0) {
236         for (int i = 0; i < values.length; i++) {
237           Result rs = values[i];
238           cache.add(rs);
239           // We don't make Iterator here
240           for (Cell cell : rs.rawCells()) {
241             remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
242           }
243           countdown--;
244           this.lastResult = rs;
245         }
246       }
247       if (smallReversedScanCallable.hasMoreResultsContext()) {
248         currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
249       } else {
250         currentRegionDone = countdown > 0;
251       }
252     }
253   }
254 
255   @Override
256   protected void initializeScannerInConstruction() throws IOException {
257     // No need to initialize the scanner when constructing instance, do it when
258     // calling next(). Do nothing here.
259   }
260 
261   @Override
262   public void close() {
263     if (!scanMetricsPublished) writeScanMetrics();
264     closed = true;
265   }
266 
267   @VisibleForTesting
268   protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
269     this.callableFactory = callableFactory;
270   }
271 
272   /**
273    * A reversed ScannerCallable which supports backward small scanning.
274    */
275   static class SmallReversedScannerCallable extends ReversedScannerCallable {
276 
277     public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
278         ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
279         int caching, int replicaId) {
280       super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
281       this.setCaching(caching);
282     }
283 
284     @Override
285     public Result[] call(int timeout) throws IOException {
286       if (this.closed) return null;
287       if (Thread.interrupted()) {
288         throw new InterruptedIOException();
289       }
290       ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
291         getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
292       ClientProtos.ScanResponse response = null;
293       controller = controllerFactory.newController();
294       try {
295         controller.setPriority(getTableName());
296         controller.setCallTimeout(timeout);
297         response = getStub().scan(controller, request);
298         Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
299         if (response.hasMoreResultsInRegion()) {
300           setHasMoreResultsContext(true);
301           setServerHasMoreResults(response.getMoreResultsInRegion());
302         } else {
303           setHasMoreResultsContext(false);
304         }
305         // We need to update result metrics since we are overriding call()
306         updateResultsMetrics(results);
307         return results;
308       } catch (ServiceException se) {
309         throw ProtobufUtil.getRemoteException(se);
310       }
311     }
312 
313     @Override
314     public ScannerCallable getScannerCallableForReplica(int id) {
315       return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
316           scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
317     }
318   }
319 
320   protected static class SmallReversedScannerCallableFactory {
321 
322     public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
323         Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
324         RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
325         int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
326         boolean isFirstRegionToLocate) {
327       byte[] locateStartRow = null;
328       if (isFirstRegionToLocate
329           && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
330         // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to
331         // locate a region list, and the last one in region list is the region where our scan start.
332         locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
333       }
334 
335       scan.setStartRow(localStartKey);
336       SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
337           scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
338       ScannerCallableWithReplicas scannerCallableWithReplicas =
339           new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
340               retries, scannerTimeout, cacheNum, conf, caller);
341       return scannerCallableWithReplicas;
342     }
343   }
344 }