
1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.Date;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.ConcurrentSkipListMap;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.atomic.AtomicLong;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.hbase.classification.InterfaceAudience;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HRegionLocation;
50  import org.apache.hadoop.hbase.RegionLocations;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.TableName;
53  import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
54  import org.apache.hadoop.hbase.client.coprocessor.Batch;
55  import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
56  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
57  import org.apache.hadoop.hbase.util.Bytes;
58  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
59  import org.apache.htrace.Trace;
60  
61  import com.google.common.annotations.VisibleForTesting;
62  
63  /**
64   * This class allows a continuous flow of requests. It's written to be compatible with a
65   * synchronous caller such as HTable.
66   * <p>
67   * The caller sends a buffer of operations by calling submit. This class extracts from this
68   * list the operations it can send, i.e. the operations whose regions are not considered
69   * busy. The process is asynchronous, i.e. it returns immediately once it has finished
70   * iterating over the list. If, and only if, the maximum number of concurrent tasks is
71   * reached, the call to submit will block. Alternatively, the caller can call submitAll, in
72   * which case all the operations will be sent. Each call to submit returns a future-like
73   * object that can be used to track operation progress.
74   * </p>
75   * <p>
76   * The class manages retries internally.
77   * </p>
78   * <p>
79   * The class can be constructed in regular mode, or "global error" mode. In global error mode,
80   * AP tracks errors across all calls (each "future" also has a global view of all errors). That
81   * mode is necessary for backward compatibility with HTable behavior, where multiple submissions
82   * are made and the errors from previous calls can propagate through any put/flush call.
83   * In "regular" mode, the errors are tracked inside the Future object that is returned.
84   * The results are always tracked inside the Future object and can be retrieved when the call
85   * has finished. Partial results can also be retrieved if part of a multi-request failed.
86   * </p>
87   * <p>
88   * This class is thread safe in regular mode; in global error mode, submitting operations and
89   * retrieving errors from different threads may not be thread safe.
90   * Internally, the class is thread safe enough to manage new submissions and results arising
91   * from older operations simultaneously.
92   * </p>
93   * <p>
94   * Internally, this class works with {@link Row}, which means it could theoretically be used
95   * for gets as well.
96   * </p>
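     * <p>
     * A hypothetical usage sketch (added for illustration, not part of the original source):
     * a caller hands a batch of Puts to submit and then waits on the returned future. The
     * variable names below are illustrative only.
     * </p>
     * <pre>
     * AsyncProcess ap = ...;        // typically created by the connection / HTable internals
     * List&lt;Put&gt; puts = ...;   // the operations to send
     * AsyncRequestFuture future = ap.submit(tableName, puts, true, null, false);
     * future.waitUntilDone();       // block until every task succeeded or failed definitively
     * if (future.hasError()) {
     *   throw future.getErrors();   // RetriesExhaustedWithDetailsException
     * }
     * </pre>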
97   */
98  @InterfaceAudience.Private
99  class AsyncProcess {
100   protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
101   protected static final AtomicLong COUNTER = new AtomicLong();
102 
103   public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
104 
105   /**
106    * Configure the number of failures after which the client will start logging. A few failures
107    * are fine: the region moved, then is not opened, then is overloaded. We try to have an
108    * acceptable heuristic for the number of errors we don't log. 9 was chosen because we wait
109    * for 1s at this stage.
110    */
111   public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
112       "hbase.client.start.log.errors.counter";
113   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
114 
115   private final int thresholdToLogUndoneTaskDetails;
116   private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
117       "hbase.client.threshold.log.details";
118   private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
119   private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
120 
121   /**
122    * The context used to wait for results from one submit call.
123    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
124    *    then errors and failed operations in this object will reflect global errors.
125    * 2) If the submit call is made with needResults false, results will not be saved.
126    */
127   public static interface AsyncRequestFuture {
128     public boolean hasError();
129     public RetriesExhaustedWithDetailsException getErrors();
130     public List<? extends Row> getFailedOperations();
131     public Object[] getResults() throws InterruptedIOException;
132     /** Wait until all tasks are executed, successfully or not. */
133     public void waitUntilDone() throws InterruptedIOException;
134   }
135 
136   /** Return value from a submit that didn't contain any requests. */
137   private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
138     public final Object[] result = new Object[0];
139     @Override
140     public boolean hasError() { return false; }
141     @Override
142     public RetriesExhaustedWithDetailsException getErrors() { return null; }
143     @Override
144     public List<? extends Row> getFailedOperations() { return null; }
145     @Override
146     public Object[] getResults() { return result; }
147     @Override
148     public void waitUntilDone() throws InterruptedIOException {}
149   };
150 
151   /** Sync point for calls to multiple replicas for the same user request (Get).
152    * Created and put in the results array (we assume replica calls require results) when
153    * the replica calls are launched. See results for details of this process.
154    * POJO, all fields are public. To modify them, the object itself is locked. */
155   private static class ReplicaResultState {
156     public ReplicaResultState(int callCount) {
157       this.callCount = callCount;
158     }
159 
160     /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
161     int callCount;
162     /** Errors for which it is not decided whether we will report them to the user. If one of the
163      * calls succeeds, we will discard the errors that may have happened in the other calls. */
164     BatchErrors replicaErrors = null;
165 
166     @Override
167     public String toString() {
168       return "[call count " + callCount + "; errors " + replicaErrors + "]";
169     }
170   }
171 
172 
173   // TODO: many of the fields should be made private
174   protected final long id;
175 
176   protected final ClusterConnection connection;
177   protected final RpcRetryingCallerFactory rpcCallerFactory;
178   protected final RpcControllerFactory rpcFactory;
179   protected final BatchErrors globalErrors;
180   protected final ExecutorService pool;
181 
182   protected final AtomicLong tasksInProgress = new AtomicLong(0);
183   protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
184       new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
185   protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
186       new ConcurrentHashMap<ServerName, AtomicInteger>();
187 
188   // Start configuration settings.
189   private final int startLogErrorsCnt;
190 
191   /**
192    * The number of tasks simultaneously executed on the cluster.
193    */
194   protected final int maxTotalConcurrentTasks;
195 
196   /**
197    * The number of tasks we run in parallel on a single region.
198    * With 1 (the default), we ensure that the ordering of the queries is respected: we don't
199    * start a new set of operations on a region before the previous one is done. This also limits
200    * the pressure we put on the region server.
201    */
202   protected final int maxConcurrentTasksPerRegion;
203 
204   /**
205    * The number of tasks simultaneously executed on a single region server.
206    */
207   protected final int maxConcurrentTasksPerServer;
208   protected final long pause;
209   protected int numTries;
210   protected int serverTrackerTimeout;
211   protected int timeout;
212   protected long primaryCallTimeoutMicroseconds;
213   // End configuration settings.
214 
215   protected static class BatchErrors {
216     private final List<Throwable> throwables = new ArrayList<Throwable>();
217     private final List<Row> actions = new ArrayList<Row>();
218     private final List<String> addresses = new ArrayList<String>();
219 
220     public synchronized void add(Throwable ex, Row row, ServerName serverName) {
221       if (row == null){
222         throw new IllegalArgumentException("row cannot be null. location=" + serverName);
223       }
224 
225       throwables.add(ex);
226       actions.add(row);
227       addresses.add(serverName != null ? serverName.toString() : "null");
228     }
229 
230     public boolean hasErrors() {
231       return !throwables.isEmpty();
232     }
233 
234     private synchronized RetriesExhaustedWithDetailsException makeException() {
235       return new RetriesExhaustedWithDetailsException(
236           new ArrayList<Throwable>(throwables),
237           new ArrayList<Row>(actions), new ArrayList<String>(addresses));
238     }
239 
240     public synchronized void clear() {
241       throwables.clear();
242       actions.clear();
243       addresses.clear();
244     }
245 
246     public synchronized void merge(BatchErrors other) {
247       throwables.addAll(other.throwables);
248       actions.addAll(other.actions);
249       addresses.addAll(other.addresses);
250     }
251   }
252 
253   public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
254       RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
255     if (hc == null) {
256       throw new IllegalArgumentException("HConnection cannot be null.");
257     }
258 
259     this.connection = hc;
260     this.pool = pool;
261     this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
262 
263     this.id = COUNTER.incrementAndGet();
264 
265     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
266         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
267     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
268         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
269     this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
270         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
271     this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
272 
273     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
274       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
275     this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
276           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
277     this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
278           HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
279 
280     this.startLogErrorsCnt =
281         conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
282 
283     if (this.maxTotalConcurrentTasks <= 0) {
284       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
285     }
286     if (this.maxConcurrentTasksPerServer <= 0) {
287       throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
288           maxConcurrentTasksPerServer);
289     }
290     if (this.maxConcurrentTasksPerRegion <= 0) {
291       throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
292           maxConcurrentTasksPerRegion);
293     }
294 
295     // Server tracker allows us to do faster, and yet useful (hopefully), retries.
296     // However, if we are too useful, we might fail very quickly due to retry count limit.
297     // To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
298     // retry time if normal retries were used. Then we will retry until this time runs out.
299     // If we keep hitting one server, the net effect will be the incremental backoff, and
300     // essentially the same number of retries as planned. If we have to do faster retries,
301     // we will do more retries in aggregate, but the user will be none the wiser.
302     this.serverTrackerTimeout = 0;
303     for (int i = 0; i < this.numTries; ++i) {
304       serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
305     }
306 
307     this.rpcCallerFactory = rpcCaller;
308     this.rpcFactory = rpcFactory;
309     this.thresholdToLogUndoneTaskDetails =
310         conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
311             DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
312   }
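
      // Hedged configuration example (added for illustration, not part of the original source).
      // The throttling limits above are read from the Configuration passed to this constructor,
      // so a caller could tune them as below; the numeric values are illustrative only.
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, 200);   // cluster-wide cap
      //   conf.setInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS, 5); // per region server
      //   conf.setInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS, 1); // preserves per-region ordering
      //   conf.setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 9);            // when to start logging errors
      //   AsyncProcess ap = new AsyncProcess(connection, conf, pool,
      //       rpcCallerFactory, false, rpcControllerFactory);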
313 
314   /**
315    * @return the given pool if non-null, otherwise this.pool if non-null, otherwise throws
316    *         RuntimeException
317    */
318   private ExecutorService getPool(ExecutorService pool) {
319     if (pool != null) return pool;
320     if (this.pool != null) return this.pool;
321     throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
322   }
323 
324   /**
325    * See {@link #submit(ExecutorService, TableName, List, boolean, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, boolean)}.
326    * Uses default ExecutorService for this AP (must have been created with one).
327    */
328   public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
329       boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
330       throws InterruptedIOException {
331     return submit(null, tableName, rows, atLeastOne, callback, needResults);
332   }
333 
334   /**
335    * Extract from the rows list what we can submit. The rows we cannot submit are kept in the
336    * list. Does not send requests to replicas (not currently used for anything other
337    * than streaming puts anyway).
338    *
339    * @param pool ExecutorService to use.
340    * @param tableName The table for which this request is needed.
341    * @param rows the submitted rows. Modified by the method: we remove the rows we took.
342    * @param atLeastOne true if we should submit at least a subset.
343    * @param callback Batch callback. Only called on success (94 behavior).
344    * @param needResults Whether results are needed, or can be discarded.
345    */
346   public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
347       List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
348       boolean needResults) throws InterruptedIOException {
349     if (rows.isEmpty()) {
350       return NO_REQS_RESULT;
351     }
352 
353     Map<ServerName, MultiAction<Row>> actionsByServer =
354         new HashMap<ServerName, MultiAction<Row>>();
355     List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
356 
357     NonceGenerator ng = this.connection.getNonceGenerator();
358     long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
359 
360     // Location errors that happen before we decide what requests to take.
361     List<Exception> locationErrors = null;
362     List<Integer> locationErrorRows = null;
363     do {
364       // Wait until there is at least one slot for a new task.
365       waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
366 
367       // Remember the previous decisions about regions or region servers we put in the
368       //  final multi.
369       Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
370       Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
371 
372       int posInList = -1;
373       Iterator<? extends Row> it = rows.iterator();
374       while (it.hasNext()) {
375         Row r = it.next();
376         HRegionLocation loc;
377         try {
378           if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
379           // Make sure we get 0-s replica.
380           RegionLocations locs = connection.locateRegion(
381               tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
382           if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
383             throw new IOException("#" + id + ", no location found, aborting submit for"
384                 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
385           }
386           loc = locs.getDefaultRegionLocation();
387         } catch (IOException ex) {
388           locationErrors = new ArrayList<Exception>();
389           locationErrorRows = new ArrayList<Integer>();
390           LOG.error("Failed to get region location ", ex);
391           // This action failed before creating ars. Retain it, but do not add to submit list.
392           // We will then add it to ars in an already-failed state.
393           retainedActions.add(new Action<Row>(r, ++posInList));
394           locationErrors.add(ex);
395           locationErrorRows.add(posInList);
396           it.remove();
397           break; // Backward compat: we stop considering actions on location error.
398         }
399 
400         if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
401           Action<Row> action = new Action<Row>(r, ++posInList);
402           setNonce(ng, r, action);
403           retainedActions.add(action);
404           // TODO: replica-get is not supported on this path
405           byte[] regionName = loc.getRegionInfo().getRegionName();
406           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
407           it.remove();
408         }
409       }
410     } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
411 
412     if (retainedActions.isEmpty()) return NO_REQS_RESULT;
413 
414     return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
415       locationErrors, locationErrorRows, actionsByServer, pool);
416   }
417 
418   <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
419       List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
420       Object[] results, boolean needResults, List<Exception> locationErrors,
421       List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
422       ExecutorService pool) {
423     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
424       tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
425     // Add location errors if any
426     if (locationErrors != null) {
427       for (int i = 0; i < locationErrors.size(); ++i) {
428         int originalIndex = locationErrorRows.get(i);
429         Row row = retainedActions.get(originalIndex).getAction();
430         ars.manageError(originalIndex, row,
431           Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
432       }
433     }
434     ars.sendMultiAction(actionsByServer, 1, null, false);
435     return ars;
436   }
437 
438   /**
439    * Helper that is used when grouping the actions per region server.
440    *
441    * @param loc - the destination. Must not be null.
442    * @param action - the action to add to the multiaction
443    * @param actionsByServer the multiaction per server
444    * @param nonceGroup Nonce group.
445    */
446   private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
447       Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
448     MultiAction<Row> multiAction = actionsByServer.get(server);
449     if (multiAction == null) {
450       multiAction = new MultiAction<Row>();
451       actionsByServer.put(server, multiAction);
452     }
453     if (action.hasNonce() && !multiAction.hasNonceGroup()) {
454       multiAction.setNonceGroup(nonceGroup);
455     }
456 
457     multiAction.add(regionName, action);
458   }
459 
460   /**
461    * Check if we should send new operations to this region or region server.
462    * We take past decisions into account; if we have already accepted an
463    * operation on a given region, we accept all operations for this region.
464    *
465    * @param loc the region and the server name we want to use.
466    * @return true if the operation can be sent to this region or server, false if it is considered busy.
467    */
468   protected boolean canTakeOperation(HRegionLocation loc,
469                                      Map<Long, Boolean> regionsIncluded,
470                                      Map<ServerName, Boolean> serversIncluded) {
471     long regionId = loc.getRegionInfo().getRegionId();
472     Boolean regionPrevious = regionsIncluded.get(regionId);
473 
474     if (regionPrevious != null) {
475       // We already know what to do with this region.
476       return regionPrevious;
477     }
478 
479     Boolean serverPrevious = serversIncluded.get(loc.getServerName());
480     if (Boolean.FALSE.equals(serverPrevious)) {
481       // It's a new region, on a region server that we have already excluded.
482       regionsIncluded.put(regionId, Boolean.FALSE);
483       return false;
484     }
485 
486     AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
487     if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
488       // Too many tasks on this region already.
489       regionsIncluded.put(regionId, Boolean.FALSE);
490       return false;
491     }
492 
493     if (serverPrevious == null) {
494       // The region is ok, but we need to decide for this region server.
495       int newServers = 0; // number of servers we're going to contact so far
496       for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
497         if (kv.getValue()) {
498           newServers++;
499         }
500       }
501 
502       // Do we have too many total tasks already?
503       boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
504 
505       if (ok) {
506         // If the total is fine, is it ok for this individual server?
507         AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
508         ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
509       }
510 
511       if (!ok) {
512         regionsIncluded.put(regionId, Boolean.FALSE);
513         serversIncluded.put(loc.getServerName(), Boolean.FALSE);
514         return false;
515       }
516 
517       serversIncluded.put(loc.getServerName(), Boolean.TRUE);
518     } else {
519       assert serverPrevious.equals(Boolean.TRUE);
520     }
521 
522     regionsIncluded.put(regionId, Boolean.TRUE);
523 
524     return true;
525   }
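
      // Worked example (added for clarity, not from the original source): with the default
      // maxConcurrentTasksPerRegion = 1, if region R already has one task in flight
      // (taskCounterPerRegion >= 1), canTakeOperation excludes R for this pass; the rows for R
      // stay in the caller's list and are picked up by a later submit call once the in-flight
      // task completes, which is how per-region ordering is preserved.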
526 
527   /**
528    * See {@link #submitAll(ExecutorService, TableName, List, org.apache.hadoop.hbase.client.coprocessor.Batch.Callback, Object[])}.
529    * Uses default ExecutorService for this AP (must have been created with one).
530    */
531   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
532       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
533     return submitAll(null, tableName, rows, callback, results);
534   }
535 
536   /**
537    * Submit the list of rows immediately, whatever the server status. Kept for backward
538    * compatibility: it allows the batch interface, which returns an array of objects, to be used.
539    *
540    * @param pool ExecutorService to use.
541    * @param tableName name of the table for which the submission is made.
542    * @param rows the list of rows.
543    * @param callback the callback.
544    * @param results Optional array to return the results thru; backward compat.
545    */
546   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
547       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
548     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
549 
550     // The position will be used by the processBatch to match the object array returned.
551     int posInList = -1;
552     NonceGenerator ng = this.connection.getNonceGenerator();
553     for (Row r : rows) {
554       posInList++;
555       if (r instanceof Put) {
556         Put put = (Put) r;
557         if (put.isEmpty()) {
558           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
559         }
560       }
561       Action<Row> action = new Action<Row>(r, posInList);
562       setNonce(ng, r, action);
563       actions.add(action);
564     }
565     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
566         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
567     ars.groupAndSendMultiAction(actions, 1);
568     return ars;
569   }
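
      // Hedged usage sketch for the backward-compatible path (added for illustration, not part
      // of the original source; the variable names are illustrative only):
      //
      //   Object[] results = new Object[rows.size()];
      //   AsyncRequestFuture f = ap.submitAll(pool, tableName, rows, null, results);
      //   f.waitUntilDone();  // afterwards results[i] holds a result or a Throwable for rows.get(i)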
570 
571   private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
572     if (!(r instanceof Append) && !(r instanceof Increment)) return;
573     action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
574   }
575 
576   /**
577    * The context, and return value, for a single submit/submitAll call.
578    * Note on how this class (one AP submit) works. Initially, all requests are split into groups
579    * by server; a request is sent to each server in parallel; the RPC calls are not async so a
580    * thread per server is used. Every time some actions fail, regions/locations might have
581    * changed, so we re-group them by server and region and send these groups in parallel
582    * again. The result, in case of retries, is a "tree" of threads, with the parent exiting after
583    * scheduling its children. This is why lots of code doesn't require any synchronization.
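       * <p>
       * A rough illustration of one submit with a retry (added for clarity, not part of the
       * original source):
       * </p>
       * <pre>
       *   caller thread: groupAndSendMultiAction(attempt 1)
       *     +-- pool thread: SingleServerRequestRunnable(server A)
       *     |     on failure: groupAndSendMultiAction(attempt 2) schedules new runnables,
       *     |     then this thread exits
       *     +-- pool thread: SingleServerRequestRunnable(server B)
       * </pre>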
584    */
585   protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
586 
587     /**
588      * Runnable (that can be submitted to thread pool) that waits for when it's time
589      * to issue replica calls, finds region replicas, groups the requests by replica and
590      * issues the calls (on separate threads, via sendMultiAction).
591      * This is done on a separate thread because we don't want to wait on user thread for
592      * our asynchronous call, and usually we have to wait before making replica calls.
593      */
594     private final class ReplicaCallIssuingRunnable implements Runnable {
595       private final long startTime;
596       private final List<Action<Row>> initialActions;
597 
598       public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
599         this.initialActions = initialActions;
600         this.startTime = startTime;
601       }
602 
603       @Override
604       public void run() {
605         boolean done = false;
606         if (primaryCallTimeoutMicroseconds > 0) {
607           try {
608             done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
609           } catch (InterruptedException ex) {
610             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
611             return;
612           }
613         }
614         if (done) return; // Done within primary timeout
615         Map<ServerName, MultiAction<Row>> actionsByServer =
616             new HashMap<ServerName, MultiAction<Row>>();
617         List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
618         if (replicaGetIndices == null) {
619           for (int i = 0; i < results.length; ++i) {
620             addReplicaActions(i, actionsByServer, unknownLocActions);
621           }
622         } else {
623           for (int replicaGetIndice : replicaGetIndices) {
624             addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
625           }
626         }
627         if (!actionsByServer.isEmpty()) {
628           sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
629         }
630         if (!unknownLocActions.isEmpty()) {
631           actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
632           for (Action<Row> action : unknownLocActions) {
633             addReplicaActionsAgain(action, actionsByServer);
634           }
635           // Some actions may have completely failed, they are handled inside addAgain.
636           if (!actionsByServer.isEmpty()) {
637             sendMultiAction(actionsByServer, 1, null, true);
638           }
639         }
640       }
641 
642       /**
643        * Add replica actions to action map by server.
644        * @param index Index of the original action.
645        * @param actionsByServer The map by server to add it to.
646        */
647       private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
648           List<Action<Row>> unknownReplicaActions) {
649         if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
650         Action<Row> action = initialActions.get(index);
651         RegionLocations loc = findAllLocationsOrFail(action, true);
652         if (loc == null) return;
653         HRegionLocation[] locs = loc.getRegionLocations();
654         if (locs.length == 1) {
655           LOG.warn("No replicas found for " + action.getAction());
656           return;
657         }
658         synchronized (replicaResultLock) {
659           // Don't run replica calls if the original has finished. We could do it e.g. if
660           // original has already failed before first replica call (unlikely given retries),
661           // but that would require additional synchronization w.r.t. returning to caller.
662           if (results[index] != null) return;
663           // We set the number of calls here. After that any path must call setResult/setError.
664           // True even for replicas that are not found - if we refuse to send we MUST set error.
665           results[index] = new ReplicaResultState(locs.length);
666         }
667         for (int i = 1; i < locs.length; ++i) {
668           Action<Row> replicaAction = new Action<Row>(action, i);
669           if (locs[i] != null) {
670             addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
671                 replicaAction, actionsByServer, nonceGroup);
672           } else {
673             unknownReplicaActions.add(replicaAction);
674           }
675         }
676       }
677 
678       private void addReplicaActionsAgain(
679           Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
680         if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
681           throw new AssertionError("Cannot have default replica here");
682         }
683         HRegionLocation loc = getReplicaLocationOrFail(action);
684         if (loc == null) return;
685         addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
686             action, actionsByServer, nonceGroup);
687       }
688     }
689 
690     /**
691      * Runnable (that can be submitted to thread pool) that submits MultiAction to a
692      * single server. The server call is synchronous, therefore we do it on a thread pool.
693      */
694     private final class SingleServerRequestRunnable implements Runnable {
695       private final MultiAction<Row> multiAction;
696       private final int numAttempt;
697       private final ServerName server;
698       private final Set<MultiServerCallable<Row>> callsInProgress;
699 
700       private SingleServerRequestRunnable(
701           MultiAction<Row> multiAction, int numAttempt, ServerName server,
702           Set<MultiServerCallable<Row>> callsInProgress) {
703         this.multiAction = multiAction;
704         this.numAttempt = numAttempt;
705         this.server = server;
706         this.callsInProgress = callsInProgress;
707       }
708 
709       @Override
710       public void run() {
711         MultiResponse res;
712         MultiServerCallable<Row> callable = null;
713         try {
714           callable = createCallable(server, tableName, multiAction);
715           try {
716             RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
717             if (callsInProgress != null) callsInProgress.add(callable);
718             res = caller.callWithoutRetries(callable, timeout);
719 
720             if (res == null) {
721               // Cancelled
722               return;
723             }
724 
725           } catch (IOException e) {
726             // The service itself failed . It may be an error coming from the communication
727             //   layer, but, as well, a functional error raised by the server.
728             receiveGlobalFailure(multiAction, server, numAttempt, e);
729             return;
730           } catch (Throwable t) {
731             // This should not happen. Let's log & retry anyway.
732             LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
733                 " Retrying. Server is " + server + ", tableName=" + tableName, t);
734             receiveGlobalFailure(multiAction, server, numAttempt, t);
735             return;
736           }
737 
738           // Normal case: we received an answer from the server, and it's not an exception.
739           receiveMultiAction(multiAction, server, res, numAttempt);
740         } catch (Throwable t) {
741           // Something really bad happened. We are on the send thread that will now die.
742           LOG.error("Internal AsyncProcess #" + id + " error for "
743               + tableName + " processing for " + server, t);
744           throw new RuntimeException(t);
745         } finally {
746           decTaskCounters(multiAction.getRegions(), server);
747           if (callsInProgress != null && callable != null) {
748             callsInProgress.remove(callable);
749           }
750         }
751       }
752     }
753 
754     private final Batch.Callback<CResult> callback;
755     private final BatchErrors errors;
756     private final ConnectionManager.ServerErrorTracker errorsByServer;
757     private final ExecutorService pool;
758     private final Set<MultiServerCallable<Row>> callsInProgress;
759 
760 
761     private final TableName tableName;
762     private final AtomicLong actionsInProgress = new AtomicLong(-1);
763     /** The lock controls access to results. It is only held when populating results where
764      * there might be several callers (eventual consistency gets). For other requests,
765      * there's one unique call going on per result index. */
766     private final Object replicaResultLock = new Object();
767     /** Result array.  Null if results are not needed. Otherwise, each index corresponds to
768      * the action index in initial actions submitted. For most request types, has null-s for
769      * requests that are not done, and result/exception for those that are done.
770      * For eventual-consistency gets, initially the same applies; at some point, replica calls
771      * might be started, and ReplicaResultState is put at the corresponding indices. The
772      * returning calls check the type to detect when this is the case. After all calls are done,
773      * ReplicaResultState-s are replaced with results for the user. */
774     private final Object[] results;
775     /** Indices of replica gets in results. If null, all or no actions are replica-gets. */
776     private final int[] replicaGetIndices;
777     private final boolean hasAnyReplicaGets;
778     private final long nonceGroup;
779 
780     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
781         ExecutorService pool, boolean needResults, Object[] results,
782         Batch.Callback<CResult> callback) {
783       this.pool = pool;
784       this.callback = callback;
785       this.nonceGroup = nonceGroup;
786       this.tableName = tableName;
787       this.actionsInProgress.set(actions.size());
788       if (results != null) {
789         assert needResults;
790         if (results.length != actions.size()) throw new AssertionError("results.length");
791         this.results = results;
792         for (int i = 0; i != this.results.length; ++i) {
793           results[i] = null;
794         }
795       } else {
796         this.results = needResults ? new Object[actions.size()] : null;
797       }
798       List<Integer> replicaGetIndices = null;
799       boolean hasAnyReplicaGets = false;
800       if (needResults) {
801         // Check to see if any requests might require replica calls.
802         // We expect that many requests will consist of all or no multi-replica gets; in such
803         // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
804         // store the list of action indexes for which replica gets are possible, and set
805         // hasAnyReplicaGets to true.
806         boolean hasAnyNonReplicaReqs = false;
807         int posInList = 0;
808         for (Action<Row> action : actions) {
809           boolean isReplicaGet = isReplicaGet(action.getAction());
810           if (isReplicaGet) {
811             hasAnyReplicaGets = true;
812             if (hasAnyNonReplicaReqs) { // Mixed case
813               if (replicaGetIndices == null) {
814                 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
815               }
816               replicaGetIndices.add(posInList);
817             }
818           } else if (!hasAnyNonReplicaReqs) {
819             // The first non-multi-replica request in the action list.
820             hasAnyNonReplicaReqs = true;
821             if (posInList > 0) {
822               // Add all the previous requests to the index lists. We know they are all
823               // replica-gets because this is the first non-multi-replica request in the list.
824               replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
825               for (int i = 0; i < posInList; ++i) {
826                 replicaGetIndices.add(i);
827               }
828             }
829           }
830           ++posInList;
831         }
832       }
833       this.hasAnyReplicaGets = hasAnyReplicaGets;
834       if (replicaGetIndices != null) {
835         this.replicaGetIndices = new int[replicaGetIndices.size()];
836         int i = 0;
837         for (Integer el : replicaGetIndices) {
838           this.replicaGetIndices[i++] = el;
839         }
840       } else {
841         this.replicaGetIndices = null;
842       }
843       this.callsInProgress = !hasAnyReplicaGets ? null :
844           Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
845 
846       this.errorsByServer = createServerErrorTracker();
847       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
848     }
849 
850     public Set<MultiServerCallable<Row>> getCallsInProgress() {
851       return callsInProgress;
852     }
853 
854     /**
855      * Group a list of actions per region server, and send them.
856      *
857      * @param currentActions - the list of rows to submit
858      * @param numAttempt - the current numAttempt (first attempt is 1)
859      */
860     private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
861       Map<ServerName, MultiAction<Row>> actionsByServer =
862           new HashMap<ServerName, MultiAction<Row>>();
863 
864       boolean isReplica = false;
865       List<Action<Row>> unknownReplicaActions = null;
866       for (Action<Row> action : currentActions) {
867         RegionLocations locs = findAllLocationsOrFail(action, true);
868         if (locs == null) continue;
869         boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
870         if (isReplica && !isReplicaAction) {
871           // This is the property of the current implementation, not a requirement.
872           throw new AssertionError("Replica and non-replica actions in the same retry");
873         }
874         isReplica = isReplicaAction;
875         HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
876         if (loc == null || loc.getServerName() == null) {
877           if (isReplica) {
878             if (unknownReplicaActions == null) {
879               unknownReplicaActions = new ArrayList<Action<Row>>();
880             }
881             unknownReplicaActions.add(action);
882           } else {
883             if (LOG.isInfoEnabled()) {
884               LOG.info("Failed to find location: " + loc + " for replica: " + action.getReplicaId() + " and action: " + action.getAction());
885             }
886             // TODO: relies on primary location always being fetched
887             manageLocationError(action, null);
888           }
889         } else {
890           byte[] regionName = loc.getRegionInfo().getRegionName();
891           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
892         }
893       }
894       boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
895       boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
896 
897       if (!actionsByServer.isEmpty()) {
898         // If this is a first attempt to group and send, and these are not replica actions, we need the replica thread.
899         sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
900             ? currentActions : null, numAttempt > 1 && !hasUnknown);
901       }
902 
903       if (hasUnknown) {
904         actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
905         for (Action<Row> action : unknownReplicaActions) {
906           HRegionLocation loc = getReplicaLocationOrFail(action);
907           if (loc == null) continue;
908           byte[] regionName = loc.getRegionInfo().getRegionName();
909           addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
910         }
911         if (!actionsByServer.isEmpty()) {
912           sendMultiAction(
913               actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
914         }
915       }
916     }
917 
918     private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
919       // We are going to try to get the location once again. For each action, we'll do it once
920       // from cache, because the previous calls in the loop might populate it.
921       int replicaId = action.getReplicaId();
922       RegionLocations locs = findAllLocationsOrFail(action, true);
923       if (locs == null) return null; // manageError already called
924       HRegionLocation loc = locs.getRegionLocation(replicaId);
925       if (loc == null || loc.getServerName() == null) {
926         locs = findAllLocationsOrFail(action, false);
927         if (locs == null) return null; // manageError already called
928         loc = locs.getRegionLocation(replicaId);
929       }
930       if (loc == null || loc.getServerName() == null) {
931         if (LOG.isInfoEnabled()) {
932           LOG.info("Failed to find location: " + loc + " for replica: " + replicaId + " and action: " + action.getAction());
933         }
934         manageLocationError(action, null);
935         return null;
936       }
937       return loc;
938     }
939 
940     private void manageLocationError(Action<Row> action, Exception ex) {
941       String msg = "Cannot get replica " + action.getReplicaId()
942           + " location for " + action.getAction();
943       if (ex == null) {
944         LOG.error(msg);
945         ex = new IOException(msg);
946       } else {
947         LOG.error(msg, ex);
948       }
949
950       manageError(action.getOriginalIndex(), action.getAction(),
951           Retry.NO_LOCATION_PROBLEM, ex, null);
952     }
953 
954     private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
955       if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
956           ", row cannot be null");
957       RegionLocations loc = null;
958       try {
959         loc = connection.locateRegion(
960             tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
961       } catch (IOException ex) {
962         manageLocationError(action, ex);
963       }
964       return loc;
965     }
966 
967     /**
968      * Send a multi action structure to the servers, after a delay depending on the attempt
969      * number. Asynchronous.
970      *
971      * @param actionsByServer the actions grouped by server
972      * @param numAttempt the attempt number.
973      * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
974      */
975     private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
976         int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
977       // Run the last item on the same thread if we are already on a send thread.
978       // We hope most of the time it will be the only item, so we can cut down on threads.
979       int actionsRemaining = actionsByServer.size();
980       // This iteration is by server (the HRegionLocation comparator is by server portion only).
981       for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
982         ServerName server = e.getKey();
983         MultiAction<Row> multiAction = e.getValue();
984         incTaskCounters(multiAction.getRegions(), server);
985         Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
986             numAttempt);
987         // make sure we correctly count the number of runnables before we try to reuse the send
988         // thread, in case we had to split the request into different runnables because of backoff
989         if (runnables.size() > actionsRemaining) {
990           actionsRemaining = runnables.size();
991         }
992 
993         // run all the runnables
994         for (Runnable runnable : runnables) {
995           if ((--actionsRemaining == 0) && reuseThread) {
996             runnable.run();
997           } else {
998             try {
999               pool.submit(runnable);
1000             } catch (Throwable t) {
1001               if (t instanceof RejectedExecutionException) {
1002                 // This should never happen. But as the pool is provided by the end user,
1003                 // let's secure this a little.
1004                 LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1005                     " Server is " + server.getServerName(), t);
1006               } else {
1007                 // see #HBASE-14359 for more details
1008                 LOG.warn("Caught unexpected exception/error: ", t);
1009               }
1010               decTaskCounters(multiAction.getRegions(), server);
1011               // We're likely to fail again, but this will increment the attempt counter,
1012               // so it will finish.
1013               receiveGlobalFailure(multiAction, server, numAttempt, t);
1014             }
1015           }
1016         }
1017       }
1018 
1019       if (actionsForReplicaThread != null) {
1020         startWaitingForReplicaCalls(actionsForReplicaThread);
1021       }
1022     }
1023 
1024     private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1025         MultiAction<Row> multiAction,
1026         int numAttempt) {
1027       // no stats to manage, just do the standard action
1028       if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1029         return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1030             new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1031       }
1032 
1033       // group the actions by the amount of delay
1034       Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1035           .size());
1036 
1037       // split up the actions
1038       for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1039         Long backoff = getBackoff(server, e.getKey());
1040         DelayingRunner runner = actions.get(backoff);
1041         if (runner == null) {
1042           actions.put(backoff, new DelayingRunner(backoff, e));
1043         } else {
1044           runner.add(e);
1045         }
1046       }
1047 
1048       List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1049       for (DelayingRunner runner : actions.values()) {
1050         String traceText = "AsyncProcess.sendMultiAction";
1051         Runnable runnable =
1052             new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1053                 callsInProgress);
1054         // use a delay runner only if we need to sleep for some time
1055         if (runner.getSleepTime() > 0) {
1056           runner.setRunner(runnable);
1057           traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1058           runnable = runner;
1059         }
1060         runnable = Trace.wrap(traceText, runnable);
1061         toReturn.add(runnable);
1062 
1063       }
1064       return toReturn;
1065     }
1066 
1067     /**
1068      * @param server server location where the target region is hosted
1069      * @param regionName name of the region to which we are going to write some data
1070      * @return the amount of time the client should wait before it submits a request to the
1071      * specified server and region
1072      */
1073     private Long getBackoff(ServerName server, byte[] regionName) {
1074       ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1075       ServerStatistics stats = tracker.getStats(server);
1076       return AsyncProcess.this.connection.getBackoffPolicy()
1077           .getBackoffTime(server, regionName, stats);
1078     }
1079 
1080     /**
1081      * Starts waiting to issue replica calls on a different thread; or issues them immediately.
1082      */
1083     private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1084       long startTime = EnvironmentEdgeManager.currentTime();
1085       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1086           actionsForReplicaThread, startTime);
1087       if (primaryCallTimeoutMicroseconds == 0) {
1088         // Start replica calls immediately.
1089         replicaRunnable.run();
1090       } else {
1091         // Start the thread that may kick off replica gets.
1092         // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
1093         try {
1094           pool.submit(replicaRunnable);
1095         } catch (RejectedExecutionException ree) {
1096           LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1097         }
1098       }
1099     }
1100 
1101     /**
1102      * Checks whether we can retry, and acts accordingly: logs, sets the error status.
1103      *
1104      * @param originalIndex the position in the list sent
1105      * @param row           the row
1106      * @param canRetry      if not Retry.YES, we won't retry whatever the settings.
1107      * @param throwable     the throwable, if any (can be null)
1108      * @param server        the location, if any (can be null)
1109      * @return the retry decision: Retry.YES if the action can be retried, otherwise the reason not to.
1110      */
1111     public Retry manageError(int originalIndex, Row row, Retry canRetry,
1112                                 Throwable throwable, ServerName server) {
1113       if (canRetry == Retry.YES
1114           && throwable != null && throwable instanceof DoNotRetryIOException) {
1115         canRetry = Retry.NO_NOT_RETRIABLE;
1116       }
1117 
1118       if (canRetry != Retry.YES) {
1119         // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
1120         setError(originalIndex, row, throwable, server);
1121       } else if (isActionComplete(originalIndex, row)) {
1122         canRetry = Retry.NO_OTHER_SUCCEEDED;
1123       }
1124       return canRetry;
1125     }
1126 
1127     /**
1128      * Resubmit all the actions from this multiaction after a failure.
1129      *
1130      * @param rsActions  the actions still to do from the initial list
1131      * @param server   the destination
1132      * @param numAttempt the number of attempts so far
1133      * @param t the throwable (if any) that caused the resubmit
1134      */
1135     private void receiveGlobalFailure(
1136         MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1137       errorsByServer.reportServerError(server);
1138       Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1139           ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1140 
1141       if (tableName == null) {
1142         // tableName is null when we made a cross-table RPC call.
1143         connection.clearCaches(server);
1144       }
1145       int failed = 0, stopped = 0;
1146       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1147       for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1148         byte[] regionName = e.getKey();
1149         byte[] row = e.getValue().iterator().next().getAction().getRow();
1150         // Do not use the exception for updating cache because it might be coming from
1151         // any of the regions in the MultiAction.
1152         // TODO: depending on type of exception we might not want to update cache at all?
1153         if (tableName != null) {
1154           connection.updateCachedLocations(tableName, regionName, row, null, server);
1155         }
1156         for (Action<Row> action : e.getValue()) {
1157           Retry retry = manageError(
1158               action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1159           if (retry == Retry.YES) {
1160             toReplay.add(action);
1161           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1162             ++stopped;
1163           } else {
1164             ++failed;
1165           }
1166         }
1167       }
1168 
1169       if (toReplay.isEmpty()) {
1170         logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1171       } else {
1172         resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1173       }
1174     }
1175 
1176     /**
1177      * Log as much info as possible, and, if there is something to replay,
1178      * submit it again after a back off sleep.
1179      */
1180     private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1181         int numAttempt, int failureCount, Throwable throwable) {
1182       // We have something to replay. We're going to sleep a little before.
1183 
1184       // We have two contradicting needs here:
1185       //  1) We want to get the new location after having slept, as it may change.
1186       //  2) We want to take into account the location when calculating the sleep time.
1187       // It should be possible to have some heuristics to take the right decision. Short term,
1188       //  we go for one.
1189       long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
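           // As a rough illustration (assuming the default hbase.client.pause of 100 ms and the
           // standard HConstants.RETRY_BACKOFF multipliers), successive attempts against the same
           // server back off approximately as 100, 200, 300, 500, 1000 ms, ... plus a small jitter.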
1190       if (numAttempt > startLogErrorsCnt) {
1191         // We use this value to have some logs when we have multiple failures, but not too many
1192         //  logs, as errors are to be expected when a region moves, splits and so on
1193         LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1194             oldServer, throwable, backOffTime, true, null, -1, -1));
1195       }
1196 
1197       try {
1198         Thread.sleep(backOffTime);
1199       } catch (InterruptedException e) {
1200         LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1201         Thread.currentThread().interrupt();
1202         return;
1203       }
1204 
1205       groupAndSendMultiAction(toReplay, numAttempt + 1);
1206     }
1207 
1208     private void logNoResubmit(ServerName oldServer, int numAttempt,
1209         int failureCount, Throwable throwable, int failed, int stopped) {
1210       if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1211         String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1212         String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1213             throwable, -1, false, timeStr, failed, stopped);
1214         if (failed != 0) {
1215           // Only log final failures as warning
1216           LOG.warn(logMessage);
1217         } else {
1218           LOG.info(logMessage);
1219         }
1220       }
1221     }
1222 
1223     /**
1224      * Called when we receive the result of a server query.
1225      *
1226      * @param multiAction    - the multiAction we sent
1227      * @param server       - the server we sent the multiAction to; used as a server name.
1228      * @param responses      - the response, if any
1229      * @param numAttempt     - the attempt
1230      */
1231     private void receiveMultiAction(MultiAction<Row> multiAction,
1232         ServerName server, MultiResponse responses, int numAttempt) {
1233        assert responses != null;
1234 
1235       // Success or partial success
1236       // Analyze detailed results. We can still have individual failures to be redone.
1237       // Two specific throwables are handled specially:
1238       //  - DoNotRetryIOException: we do not retry that action, but continue retrying the others
1239       //  - RegionMovedException: we update the cache with the new region location
1240 
1241       List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1242       Throwable throwable = null;
1243       int failureCount = 0;
1244       boolean canRetry = true;
1245 
1246       // Go by original action.
1247       int failed = 0, stopped = 0;
1248       for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
1249         byte[] regionName = regionEntry.getKey();
1250         Map<Integer, Object> regionResults = responses.getResults().get(regionName);
1251         if (regionResults == null) {
1252           if (!responses.getExceptions().containsKey(regionName)) {
1253             LOG.error("Server sent us neither results nor exceptions for "
1254                 + Bytes.toStringBinary(regionName));
1255             responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
1256           }
1257           continue;
1258         }
1259         boolean regionFailureRegistered = false;
1260         for (Action<Row> sentAction : regionEntry.getValue()) {
1261           Object result = regionResults.get(sentAction.getOriginalIndex());
1262           // Failure: retry if it makes sense, else update the error lists
1263           if (result == null || result instanceof Throwable) {
1264             Row row = sentAction.getAction();
1265             throwable = ClientExceptionsUtil.findException(result);
1266             // Register corresponding failures once per server/once per region.
1267             if (!regionFailureRegistered) {
1268               regionFailureRegistered = true;
1269               connection.updateCachedLocations(
1270                   tableName, regionName, row.getRow(), result, server);
1271             }
1272             if (failureCount == 0) {
1273               errorsByServer.reportServerError(server);
1274               // We determine canRetry only once for all calls, after reporting server failure.
1275               canRetry = errorsByServer.canRetryMore(numAttempt);
1276             }
1277             ++failureCount;
1278             Retry retry = manageError(sentAction.getOriginalIndex(), row,
1279                 canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
1280             if (retry == Retry.YES) {
1281               toReplay.add(sentAction);
1282             } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1283               ++stopped;
1284             } else {
1285               ++failed;
1286             }
1287           } else {
1288             // update the stats about the region, if it's a user table. We don't want to slow down
1289             // updates to meta tables, especially from internal updates (master, etc).
1290             if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
1291               result = ResultStatsUtil.updateStats(result,
1292                   AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
1293             }
1294 
1295             if (callback != null) {
1296               try {
1297                 //noinspection unchecked
1298                 // TODO: would callback expect a replica region name if it gets one?
1299                 this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
1300               } catch (Throwable t) {
1301                 LOG.error("User callback threw an exception for "
1302                     + Bytes.toStringBinary(regionName) + ", ignoring", t);
1303               }
1304             }
1305             setResult(sentAction, result);
1306           }
1307         }
1308       }
1309 
1310       // The failures global to a region. We use the multiAction we sent previously to find the
1311       //   actions to replay.
1312       for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
1313         throwable = throwableEntry.getValue();
1314         byte[] region = throwableEntry.getKey();
1315         List<Action<Row>> actions = multiAction.actions.get(region);
1316         if (actions == null || actions.isEmpty()) {
1317           throw new IllegalStateException("Wrong response for the region: " +
1318               HRegionInfo.encodeRegionName(region));
1319         }
1320 
1321         if (failureCount == 0) {
1322           errorsByServer.reportServerError(server);
1323           canRetry = errorsByServer.canRetryMore(numAttempt);
1324         }
1325         if (null == tableName) {
1326           // For multi-actions, we don't have a table name, but we want to make sure to clear the
1327           // cache in case there were location-related exceptions
1328           connection.clearCaches(server);
1329         } else {
1330           connection.updateCachedLocations(
1331               tableName, region, actions.get(0).getAction().getRow(), throwable, server);
1332         }
1333         failureCount += actions.size();
1334 
1335         for (Action<Row> action : actions) {
1336           Row row = action.getAction();
1337           Retry retry = manageError(action.getOriginalIndex(), row,
1338               canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
1339           if (retry == Retry.YES) {
1340             toReplay.add(action);
1341           } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1342             ++stopped;
1343           } else {
1344             ++failed;
1345           }
1346         }
1347       }
1348 
1349       if (toReplay.isEmpty()) {
1350         logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
1351       } else {
1352         resubmit(server, toReplay, numAttempt, failureCount, throwable);
1353       }
1354     }
1355 
1356     private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1357         Throwable error, long backOffTime, boolean willRetry, String startTime,
1358         int failed, int stopped) {
1359       StringBuilder sb = new StringBuilder();
1360       sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1361         .append("attempt=").append(numAttempt)
1362         .append("/").append(numTries).append(" ");
1363 
1364       if (failureCount > 0 || error != null){
1365         sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1366             append(error == null ? "null" : error);
1367       } else {
1368         sb.append("succeeded");
1369       }
1370 
1371       sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1372 
1373       if (willRetry) {
1374         sb.append(", retrying after=").append(backOffTime).append("ms").
1375             append(", replay=").append(replaySize).append("ops");
1376       } else if (failureCount > 0) {
1377         if (stopped > 0) {
1378           sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1379         }
1380         if (failed > 0) {
1381           sb.append("; not retrying ").append(failed).append(" - final failure");
1382         }
1383 
1384       }
1385 
1386       return sb.toString();
1387     }
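         // A typical message assembled above, for a batch that will be retried, looks roughly like
         // (server name, counts and timings are illustrative values):
         //   #3, table=t1, attempt=5/35 failed=2ops, last exception: org.apache.hadoop.hbase.NotServingRegionException: ...
         //   on example-host,16020,1449453437004, tracking started null, retrying after=1004ms, replay=2ops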
1388 
1389     /**
1390      * Sets the non-error result from a particular action.
1391      * @param action Action (request) that the server responded to.
1392      * @param result The result.
1393      */
1394     private void setResult(Action<Row> action, Object result) {
1395       if (result == null) {
1396         throw new RuntimeException("Result cannot be null");
1397       }
1398       ReplicaResultState state = null;
1399       boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
1400       int index = action.getOriginalIndex();
1401       if (results == null) {
1402          decActionCounter(index);
1403          return; // Simple case, no replica requests.
1404       } else if ((state = trySetResultSimple(
1405           index, action.getAction(), false, result, null, isStale)) == null) {
1406         return; // Simple case, no replica requests.
1407       }
1408       assert state != null;
1409       // At this point we know that state is set to the replica tracking class.
1410       // It could be that someone else is also looking at it; however, we know there can
1411       // only be one state object, and only one thread can set callCount to 0. Other threads
1412       // will either see the state with callCount 0 after locking it, or will not see the state
1413       // at all because we will have replaced it with the result.
1414       synchronized (state) {
1415         if (state.callCount == 0) return; // someone already set the result
1416         state.callCount = 0;
1417       }
1418       synchronized (replicaResultLock) {
1419         if (results[index] != state) {
1420           throw new AssertionError("We set the callCount but someone else replaced the result");
1421         }
1422         results[index] = result;
1423       }
1424 
1425       decActionCounter(index);
1426     }
1427 
1428     /**
1429      * Sets the error from a particular action.
1430      * @param index Original action index.
1431      * @param row Original request.
1432      * @param throwable The resulting error.
1433      * @param server The source server.
1434      */
1435     private void setError(int index, Row row, Throwable throwable, ServerName server) {
1436       ReplicaResultState state = null;
1437       if (results == null) {
1438         // Note that we currently cannot have replica requests with null results. So it shouldn't
1439         // happen that multiple replica calls will call decActionCounter for the same action with
1440         // results == null. Only one call per action should be present in this case.
1441         errors.add(throwable, row, server);
1442         decActionCounter(index);
1443         return; // Simple case, no replica requests.
1444       } else if ((state = trySetResultSimple(
1445           index, row, true, throwable, server, false)) == null) {
1446         return; // Simple case, no replica requests.
1447       }
1448       assert state != null;
1449       BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
1450       boolean isActionDone = false;
1451       synchronized (state) {
1452         switch (state.callCount) {
1453           case 0: return; // someone already set the result
1454           case 1: { // All calls failed, we are the last error.
1455             target = errors;
1456             isActionDone = true;
1457             break;
1458           }
1459           default: {
1460             assert state.callCount > 1;
1461             if (state.replicaErrors == null) {
1462               state.replicaErrors = new BatchErrors();
1463             }
1464             target = state.replicaErrors;
1465             break;
1466           }
1467         }
1468         --state.callCount;
1469       }
1470       target.add(throwable, row, server);
1471       if (isActionDone) {
1472         if (state.replicaErrors != null) { // last call, no need to lock
1473           errors.merge(state.replicaErrors);
1474         }
1475         // See setResult for explanations.
1476         synchronized (replicaResultLock) {
1477           if (results[index] != state) {
1478             throw new AssertionError("We set the callCount but someone else replaced the result");
1479           }
1480           results[index] = throwable;
1481         }
1482         decActionCounter(index);
1483       }
1484     }
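         // setResult() and setError() share the same protocol for replica fan-out: while replica
         // calls are in flight, the slot in 'results' holds a ReplicaResultState whose callCount
         // is the number of outstanding calls. The first success (or the last error) wins the
         // slot: it zeroes callCount, swaps the real result or throwable in under
         // replicaResultLock, and decrements the action counter exactly once.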
1485 
1486     /**
1487      * Checks if the action is complete; used on error to prevent needless retries.
1488      * Does not synchronize, assuming element index/field accesses are atomic.
1489      * This is an opportunistic optimization check, doesn't have to be strict.
1490      * @param index Original action index.
1491      * @param row Original request.
1492      */
1493     private boolean isActionComplete(int index, Row row) {
1494       if (!isReplicaGet(row)) return false;
1495       Object resObj = results[index];
1496       return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1497           || ((ReplicaResultState)resObj).callCount == 0);
1498     }
1499 
1500     /**
1501      * Tries to set the result or error for a particular action as if there were no replica calls.
1502      * @return null if successful; replica state if there were in fact replica calls.
1503      */
1504     private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
1505         Object result, ServerName server, boolean isFromReplica) {
1506       Object resObj = null;
1507       if (!isReplicaGet(row)) {
1508         if (isFromReplica) {
1509           throw new AssertionError("Unexpected stale result for " + row);
1510         }
1511         results[index] = result;
1512       } else {
1513         synchronized (replicaResultLock) {
1514           if ((resObj = results[index]) == null) {
1515             if (isFromReplica) {
1516               throw new AssertionError("Unexpected stale result for " + row);
1517             }
1518             results[index] = result;
1519           }
1520         }
1521       }
1522 
1523       ReplicaResultState rrs =
1524           (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
1525       if (rrs == null && isError) {
1526         // The resObj is not replica state (null or already set).
1527         errors.add((Throwable)result, row, server);
1528       }
1529 
1530       if (resObj == null) {
1531         // resObj is null - no replica calls were made.
1532         decActionCounter(index);
1533         return null;
1534       }
1535       return rrs;
1536     }
1537 
1538     private void decActionCounter(int index) {
1539       long actionsRemaining = actionsInProgress.decrementAndGet();
1540       if (actionsRemaining < 0) {
1541         String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1542         throw new AssertionError(error);
1543       } else if (actionsRemaining == 0) {
1544         synchronized (actionsInProgress) {
1545           actionsInProgress.notifyAll();
1546         }
1547       }
1548     }
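         // This is the producer side of a simple wait/notify handshake: waitUntilDone(long) below
         // waits on actionsInProgress and is woken here once the counter reaches zero.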
1549 
1550     private String buildDetailedErrorMsg(String string, int index) {
1551       String error = string + "; called for " + index +
1552           ", actionsInProgress " + actionsInProgress.get() + "; replica gets: ";
1553       if (replicaGetIndices != null) {
1554         for (int i = 0; i < replicaGetIndices.length; ++i) {
1555           error += replicaGetIndices[i] + ", ";
1556         }
1557       } else {
1558         error += (hasAnyReplicaGets ? "all" : "none");
1559       }
1560       error += "; results ";
1561       if (results != null) {
1562         for (int i = 0; i < results.length; ++i) {
1563           Object o = results[i];
1564           error += ((o == null) ? "null" : o.toString()) + ", ";
1565         }
1566       }
1567       return error;
1568     }
1569 
1570     @Override
1571     public void waitUntilDone() throws InterruptedIOException {
1572       try {
1573         waitUntilDone(Long.MAX_VALUE);
1574       } catch (InterruptedException iex) {
1575         throw new InterruptedIOException(iex.getMessage());
1576       } finally {
1577         if (callsInProgress != null) {
1578           for (MultiServerCallable<Row> clb : callsInProgress) {
1579             clb.cancel();
1580           }
1581         }
1582       }
1583     }
1584 
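         // 'cutoff' below is an absolute deadline in microseconds since the epoch (hence the
         // now * 1000L conversions); Long.MAX_VALUE means "wait with no deadline".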
1585     private boolean waitUntilDone(long cutoff) throws InterruptedException {
1586       boolean hasWait = cutoff != Long.MAX_VALUE;
1587       long lastLog = EnvironmentEdgeManager.currentTime();
1588       long currentInProgress;
1589       while (0 != (currentInProgress = actionsInProgress.get())) {
1590         long now = EnvironmentEdgeManager.currentTime();
1591         if (hasWait && (now * 1000L) > cutoff) {
1592           return false;
1593         }
1594         if (!hasWait) { // Only log if wait is infinite.
1595           if (now > lastLog + 10000) {
1596             lastLog = now;
1597             LOG.info("#" + id + ", waiting for " + currentInProgress + "  actions to finish");
1598           }
1599         }
1600         synchronized (actionsInProgress) {
1601           if (actionsInProgress.get() == 0) break;
1602           if (!hasWait) {
1603             actionsInProgress.wait(10);
1604           } else {
1605             long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
1606             TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
1607           }
1608         }
1609       }
1610       return true;
1611     }
1612 
1613     @Override
1614     public boolean hasError() {
1615       return errors.hasErrors();
1616     }
1617 
1618     @Override
1619     public List<? extends Row> getFailedOperations() {
1620       return errors.actions;
1621     }
1622 
1623     @Override
1624     public RetriesExhaustedWithDetailsException getErrors() {
1625       return errors.makeException();
1626     }
1627 
1628     @Override
1629     public Object[] getResults() throws InterruptedIOException {
1630       waitUntilDone();
1631       return results;
1632     }
1633   }
1634 
1635   @VisibleForTesting
1636   /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */
1637   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1638       TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1639       Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1640     return new AsyncRequestFutureImpl<CResult>(
1641         tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1642   }
1643 
1644   /**
1645    * Create a callable. Isolated to be easily overridden in the tests.
1646    */
1647   @VisibleForTesting
1648   protected MultiServerCallable<Row> createCallable(final ServerName server,
1649       TableName tableName, final MultiAction<Row> multi) {
1650     return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1651   }
1652 
1653   /**
1654    * Create a caller. Isolated to be easily overridden in the tests.
1655    */
1656   @VisibleForTesting
1657   protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1658     return rpcCallerFactory.<MultiResponse> newCaller();
1659   }
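       // The three factory methods above are the seams used by unit tests: a test can subclass
       // AsyncProcess and return instrumented futures, callables or callers (for example, ones
       // that inject failures or count invocations) without changing the submission logic.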
1660 
1661   @VisibleForTesting
1662   /** Waits until all outstanding tasks are done. Used in tests. */
1663   void waitUntilDone() throws InterruptedIOException {
1664     waitForMaximumCurrentTasks(0, null);
1665   }
1666 
1667   /** Wait until the async process does not have more than max tasks in progress. */
1668   private void waitForMaximumCurrentTasks(int max, String tableName)
1669       throws InterruptedIOException {
1670     waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1671   }
1672 
1673   // Break out this method so testable
1674   // Broken out into its own method so that it can be tested.
1675   void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1676       String tableName) throws InterruptedIOException {
1677     long lastLog = EnvironmentEdgeManager.currentTime();
1678     long currentInProgress, oldInProgress = Long.MAX_VALUE;
1679     while ((currentInProgress = tasksInProgress.get()) > max) {
1680       if (oldInProgress != currentInProgress) { // Wait for in progress to change.
1681         long now = EnvironmentEdgeManager.currentTime();
1682         if (now > lastLog + 10000) {
1683           lastLog = now;
1684           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1685               + max + ", tasksInProgress=" + currentInProgress +
1686               " hasError=" + hasError() + (tableName == null ? "" : ", tableName=" + tableName));
1687           if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1688             logDetailsOfUndoneTasks(currentInProgress);
1689           }
1690         }
1691       }
1692       oldInProgress = currentInProgress;
1693       try {
1694         synchronized (tasksInProgress) {
1695           if (tasksInProgress.get() == oldInProgress) {
1696             tasksInProgress.wait(10);
1697           }
1698         }
1699       } catch (InterruptedException e) {
1700         throw new InterruptedIOException("#" + id + ", interrupted." +
1701             " currentNumberOfTask=" + currentInProgress);
1702       }
1703     }
1704   }
1705 
1706   private void logDetailsOfUndoneTasks(long taskInProgress) {
1707     ArrayList<ServerName> servers = new ArrayList<ServerName>();
1708     for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1709       if (entry.getValue().get() > 0) {
1710         servers.add(entry.getKey());
1711       }
1712     }
1713     LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1714     if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1715       ArrayList<String> regions = new ArrayList<String>();
1716       for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1717         if (entry.getValue().get() > 0) {
1718           regions.add(Bytes.toString(entry.getKey()));
1719         }
1720       }
1721       LOG.info("Regions against which left over task(s) are processed: " + regions);
1722     }
1723   }
1724 
1725   /**
1726    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1727    * @return Whether there were any errors in any request since the last time
1728    *          {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created.
1729    */
1730   public boolean hasError() {
1731     return globalErrors.hasErrors();
1732   }
1733 
1734   /**
1735    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
1736    * Waits for all previous operations to finish, and returns errors and (optionally)
1737    * failed operations themselves.
1738    * @param failedRows an optional list into which the rows that failed since the last time
1739    *        {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
1740    * @param tableName name of the table
1741    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
1742    *          was called, or AP was created.
1743    */
1744   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1745       List<Row> failedRows, String tableName) throws InterruptedIOException {
1746     waitForMaximumCurrentTasks(0, tableName);
1747     if (!globalErrors.hasErrors()) {
1748       return null;
1749     }
1750     if (failedRows != null) {
1751       failedRows.addAll(globalErrors.actions);
1752     }
1753     RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1754     globalErrors.clear();
1755     return result;
1756   }
1757 
1758   /**
1759    * Increments the task counters for a given set of regions. MT safe.
1760    */
1761   protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1762     tasksInProgress.incrementAndGet();
1763 
1764     AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1765     if (serverCnt == null) {
1766       taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1767       serverCnt = taskCounterPerServer.get(sn);
1768     }
1769     serverCnt.incrementAndGet();
1770 
1771     for (byte[] regBytes : regions) {
1772       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1773       if (regionCnt == null) {
1774         regionCnt = new AtomicInteger();
1775         AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1776         if (oldCnt != null) {
1777           regionCnt = oldCnt;
1778         }
1779       }
1780       regionCnt.incrementAndGet();
1781     }
1782   }
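       // The get / putIfAbsent / get sequence above creates each per-server and per-region counter
       // lazily without extra locking; on a pre-Java-8 ConcurrentMap it is the usual stand-in for
       // computeIfAbsent.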
1783 
1784   /**
1785    * Decrements the counters for a given region and the region server. MT Safe.
1786    */
1787   protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1788     for (byte[] regBytes : regions) {
1789       AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1790       regionCnt.decrementAndGet();
1791     }
1792 
1793     taskCounterPerServer.get(sn).decrementAndGet();
1794     tasksInProgress.decrementAndGet();
1795     synchronized (tasksInProgress) {
1796       tasksInProgress.notifyAll();
1797     }
1798   }
1799 
1800   /**
1801    * Creates the server error tracker to use inside the process.
1802    * Currently, to preserve the main assumption about current retries, and to work well with
1803    * the retry-limit-based calculation, the calculation is local per Process object.
1804    * We may benefit from connection-wide tracking of server errors.
1805    * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
1806    */
1807   protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1808     return new ConnectionManager.ServerErrorTracker(
1809         this.serverTrackerTimeout, this.numTries);
1810   }
1811 
1812   private static boolean isReplicaGet(Row row) {
1813     return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1814   }
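       // Illustrative example (hypothetical client code, not part of this class): the kind of
       // request the predicate above classifies as a replica get, because TIMELINE consistency
       // allows it to be served by a secondary region replica.
       //
       //   Get get = new Get(Bytes.toBytes("row-1"));
       //   get.setConsistency(Consistency.TIMELINE);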
1815 
1816   /**
1817    * For manageError. Only used to make logging clearer; we don't actually care why we don't retry.
1818    */
1819   private enum Retry {
1820     YES,
1821     NO_LOCATION_PROBLEM,
1822     NO_NOT_RETRIABLE,
1823     NO_RETRIES_EXHAUSTED,
1824     NO_OTHER_SUCCEEDED
1825   }
1826 }