1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.ConcurrentSkipListMap;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.hbase.classification.InterfaceAudience;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.DoNotRetryIOException;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.HRegionLocation;
50 import org.apache.hadoop.hbase.RegionLocations;
51 import org.apache.hadoop.hbase.ServerName;
52 import org.apache.hadoop.hbase.TableName;
53 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
54 import org.apache.hadoop.hbase.client.coprocessor.Batch;
55 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
56 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
57 import org.apache.hadoop.hbase.util.Bytes;
58 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
59 import org.apache.htrace.Trace;
60
61 import com.google.common.annotations.VisibleForTesting;
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 @InterfaceAudience.Private
99 class AsyncProcess {
100 protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
101 protected static final AtomicLong COUNTER = new AtomicLong();
102
103 public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
104
105
106
107
108
109
110
111 public static final String START_LOG_ERRORS_AFTER_COUNT_KEY =
112 "hbase.client.start.log.errors.counter";
113 public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
114
115 private final int thresholdToLogUndoneTaskDetails;
116 private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
117 "hbase.client.threshold.log.details";
118 private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
119 private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
120
121
122
123
124
125
126
127 public static interface AsyncRequestFuture {
128 public boolean hasError();
129 public RetriesExhaustedWithDetailsException getErrors();
130 public List<? extends Row> getFailedOperations();
131 public Object[] getResults() throws InterruptedIOException;
132
133 public void waitUntilDone() throws InterruptedIOException;
134 }
135
136
137 private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture() {
138 public final Object[] result = new Object[0];
139 @Override
140 public boolean hasError() { return false; }
141 @Override
142 public RetriesExhaustedWithDetailsException getErrors() { return null; }
143 @Override
144 public List<? extends Row> getFailedOperations() { return null; }
145 @Override
146 public Object[] getResults() { return result; }
147 @Override
148 public void waitUntilDone() throws InterruptedIOException {}
149 };
150
151
152
153
154
155 private static class ReplicaResultState {
156 public ReplicaResultState(int callCount) {
157 this.callCount = callCount;
158 }
159
160
161 int callCount;
162
163
164 BatchErrors replicaErrors = null;
165
166 @Override
167 public String toString() {
168 return "[call count " + callCount + "; errors " + replicaErrors + "]";
169 }
170 }
171
172
173
174 protected final long id;
175
176 protected final ClusterConnection connection;
177 protected final RpcRetryingCallerFactory rpcCallerFactory;
178 protected final RpcControllerFactory rpcFactory;
179 protected final BatchErrors globalErrors;
180 protected final ExecutorService pool;
181
182 protected final AtomicLong tasksInProgress = new AtomicLong(0);
183 protected final ConcurrentMap<byte[], AtomicInteger> taskCounterPerRegion =
184 new ConcurrentSkipListMap<byte[], AtomicInteger>(Bytes.BYTES_COMPARATOR);
185 protected final ConcurrentMap<ServerName, AtomicInteger> taskCounterPerServer =
186 new ConcurrentHashMap<ServerName, AtomicInteger>();
187
188
189 private final int startLogErrorsCnt;
190
191
192
193
194 protected final int maxTotalConcurrentTasks;
195
196
197
198
199
200
201
202 protected final int maxConcurrentTasksPerRegion;
203
204
205
206
207 protected final int maxConcurrentTasksPerServer;
208 protected final long pause;
209 protected int numTries;
210 protected int serverTrackerTimeout;
211 protected int timeout;
212 protected long primaryCallTimeoutMicroseconds;
213
214
215 protected static class BatchErrors {
216 private final List<Throwable> throwables = new ArrayList<Throwable>();
217 private final List<Row> actions = new ArrayList<Row>();
218 private final List<String> addresses = new ArrayList<String>();
219
220 public synchronized void add(Throwable ex, Row row, ServerName serverName) {
221 if (row == null){
222 throw new IllegalArgumentException("row cannot be null. location=" + serverName);
223 }
224
225 throwables.add(ex);
226 actions.add(row);
227 addresses.add(serverName != null ? serverName.toString() : "null");
228 }
229
230 public boolean hasErrors() {
231 return !throwables.isEmpty();
232 }
233
234 private synchronized RetriesExhaustedWithDetailsException makeException() {
235 return new RetriesExhaustedWithDetailsException(
236 new ArrayList<Throwable>(throwables),
237 new ArrayList<Row>(actions), new ArrayList<String>(addresses));
238 }
239
240 public synchronized void clear() {
241 throwables.clear();
242 actions.clear();
243 addresses.clear();
244 }
245
246 public synchronized void merge(BatchErrors other) {
247 throwables.addAll(other.throwables);
248 actions.addAll(other.actions);
249 addresses.addAll(other.addresses);
250 }
251 }
252
253 public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
254 RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) {
255 if (hc == null) {
256 throw new IllegalArgumentException("HConnection cannot be null.");
257 }
258
259 this.connection = hc;
260 this.pool = pool;
261 this.globalErrors = useGlobalErrors ? new BatchErrors() : null;
262
263 this.id = COUNTER.incrementAndGet();
264
265 this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
266 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
267 this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
268 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
269 this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
270 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
271 this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
272
273 this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
274 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
275 this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
276 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
277 this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
278 HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
279
280 this.startLogErrorsCnt =
281 conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
282
283 if (this.maxTotalConcurrentTasks <= 0) {
284 throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
285 }
286 if (this.maxConcurrentTasksPerServer <= 0) {
287 throw new IllegalArgumentException("maxConcurrentTasksPerServer=" +
288 maxConcurrentTasksPerServer);
289 }
290 if (this.maxConcurrentTasksPerRegion <= 0) {
291 throw new IllegalArgumentException("maxConcurrentTasksPerRegion=" +
292 maxConcurrentTasksPerRegion);
293 }
294
295
296
297
298
299
300
301
302 this.serverTrackerTimeout = 0;
303 for (int i = 0; i < this.numTries; ++i) {
304 serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
305 }
306
307 this.rpcCallerFactory = rpcCaller;
308 this.rpcFactory = rpcFactory;
309 this.thresholdToLogUndoneTaskDetails =
310 conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
311 DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
312 }
313
314
315
316
317
318 private ExecutorService getPool(ExecutorService pool) {
319 if (pool != null) return pool;
320 if (this.pool != null) return this.pool;
321 throw new RuntimeException("Neither AsyncProcess nor request have ExecutorService");
322 }
323
324
325
326
327
328 public <CResult> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows,
329 boolean atLeastOne, Batch.Callback<CResult> callback, boolean needResults)
330 throws InterruptedIOException {
331 return submit(null, tableName, rows, atLeastOne, callback, needResults);
332 }
333
334
335
336
337
338
339
340
341
342
343
344
345
346 public <CResult> AsyncRequestFuture submit(ExecutorService pool, TableName tableName,
347 List<? extends Row> rows, boolean atLeastOne, Batch.Callback<CResult> callback,
348 boolean needResults) throws InterruptedIOException {
349 if (rows.isEmpty()) {
350 return NO_REQS_RESULT;
351 }
352
353 Map<ServerName, MultiAction<Row>> actionsByServer =
354 new HashMap<ServerName, MultiAction<Row>>();
355 List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
356
357 NonceGenerator ng = this.connection.getNonceGenerator();
358 long nonceGroup = ng.getNonceGroup();
359
360
361 List<Exception> locationErrors = null;
362 List<Integer> locationErrorRows = null;
363 do {
364
365 waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
366
367
368
369 Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
370 Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
371
372 int posInList = -1;
373 Iterator<? extends Row> it = rows.iterator();
374 while (it.hasNext()) {
375 Row r = it.next();
376 HRegionLocation loc;
377 try {
378 if (r == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
379
380 RegionLocations locs = connection.locateRegion(
381 tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
382 if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
383 throw new IOException("#" + id + ", no location found, aborting submit for"
384 + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
385 }
386 loc = locs.getDefaultRegionLocation();
387 } catch (IOException ex) {
388 locationErrors = new ArrayList<Exception>();
389 locationErrorRows = new ArrayList<Integer>();
390 LOG.error("Failed to get region location ", ex);
391
392
393 retainedActions.add(new Action<Row>(r, ++posInList));
394 locationErrors.add(ex);
395 locationErrorRows.add(posInList);
396 it.remove();
397 break;
398 }
399
400 if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
401 Action<Row> action = new Action<Row>(r, ++posInList);
402 setNonce(ng, r, action);
403 retainedActions.add(action);
404
405 byte[] regionName = loc.getRegionInfo().getRegionName();
406 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
407 it.remove();
408 }
409 }
410 } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null));
411
412 if (retainedActions.isEmpty()) return NO_REQS_RESULT;
413
414 return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
415 locationErrors, locationErrorRows, actionsByServer, pool);
416 }
417
418 <CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
419 List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
420 Object[] results, boolean needResults, List<Exception> locationErrors,
421 List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
422 ExecutorService pool) {
423 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
424 tableName, retainedActions, nonceGroup, pool, callback, results, needResults);
425
426 if (locationErrors != null) {
427 for (int i = 0; i < locationErrors.size(); ++i) {
428 int originalIndex = locationErrorRows.get(i);
429 Row row = retainedActions.get(originalIndex).getAction();
430 ars.manageError(originalIndex, row,
431 Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
432 }
433 }
434 ars.sendMultiAction(actionsByServer, 1, null, false);
435 return ars;
436 }
437
438
439
440
441
442
443
444
445
446 private static void addAction(ServerName server, byte[] regionName, Action<Row> action,
447 Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
448 MultiAction<Row> multiAction = actionsByServer.get(server);
449 if (multiAction == null) {
450 multiAction = new MultiAction<Row>();
451 actionsByServer.put(server, multiAction);
452 }
453 if (action.hasNonce() && !multiAction.hasNonceGroup()) {
454 multiAction.setNonceGroup(nonceGroup);
455 }
456
457 multiAction.add(regionName, action);
458 }
459
460
461
462
463
464
465
466
467
468 protected boolean canTakeOperation(HRegionLocation loc,
469 Map<Long, Boolean> regionsIncluded,
470 Map<ServerName, Boolean> serversIncluded) {
471 long regionId = loc.getRegionInfo().getRegionId();
472 Boolean regionPrevious = regionsIncluded.get(regionId);
473
474 if (regionPrevious != null) {
475
476 return regionPrevious;
477 }
478
479 Boolean serverPrevious = serversIncluded.get(loc.getServerName());
480 if (Boolean.FALSE.equals(serverPrevious)) {
481
482 regionsIncluded.put(regionId, Boolean.FALSE);
483 return false;
484 }
485
486 AtomicInteger regionCnt = taskCounterPerRegion.get(loc.getRegionInfo().getRegionName());
487 if (regionCnt != null && regionCnt.get() >= maxConcurrentTasksPerRegion) {
488
489 regionsIncluded.put(regionId, Boolean.FALSE);
490 return false;
491 }
492
493 if (serverPrevious == null) {
494
495 int newServers = 0;
496 for (Map.Entry<ServerName, Boolean> kv : serversIncluded.entrySet()) {
497 if (kv.getValue()) {
498 newServers++;
499 }
500 }
501
502
503 boolean ok = (newServers + tasksInProgress.get()) < maxTotalConcurrentTasks;
504
505 if (ok) {
506
507 AtomicInteger serverCnt = taskCounterPerServer.get(loc.getServerName());
508 ok = (serverCnt == null || serverCnt.get() < maxConcurrentTasksPerServer);
509 }
510
511 if (!ok) {
512 regionsIncluded.put(regionId, Boolean.FALSE);
513 serversIncluded.put(loc.getServerName(), Boolean.FALSE);
514 return false;
515 }
516
517 serversIncluded.put(loc.getServerName(), Boolean.TRUE);
518 } else {
519 assert serverPrevious.equals(Boolean.TRUE);
520 }
521
522 regionsIncluded.put(regionId, Boolean.TRUE);
523
524 return true;
525 }
526
527
528
529
530
531 public <CResult> AsyncRequestFuture submitAll(TableName tableName,
532 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
533 return submitAll(null, tableName, rows, callback, results);
534 }
535
536
537
538
539
540
541
542
543
544
545
546 public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
547 List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
548 List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
549
550
551 int posInList = -1;
552 NonceGenerator ng = this.connection.getNonceGenerator();
553 for (Row r : rows) {
554 posInList++;
555 if (r instanceof Put) {
556 Put put = (Put) r;
557 if (put.isEmpty()) {
558 throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
559 }
560 }
561 Action<Row> action = new Action<Row>(r, posInList);
562 setNonce(ng, r, action);
563 actions.add(action);
564 }
565 AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
566 tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null);
567 ars.groupAndSendMultiAction(actions, 1);
568 return ars;
569 }
570
571 private static void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
572 if (!(r instanceof Append) && !(r instanceof Increment)) return;
573 action.setNonce(ng.newNonce());
574 }
575
576
577
578
579
580
581
582
583
584
585 protected class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
586
587
588
589
590
591
592
593
594 private final class ReplicaCallIssuingRunnable implements Runnable {
595 private final long startTime;
596 private final List<Action<Row>> initialActions;
597
598 public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
599 this.initialActions = initialActions;
600 this.startTime = startTime;
601 }
602
603 @Override
604 public void run() {
605 boolean done = false;
606 if (primaryCallTimeoutMicroseconds > 0) {
607 try {
608 done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
609 } catch (InterruptedException ex) {
610 LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
611 return;
612 }
613 }
614 if (done) return;
615 Map<ServerName, MultiAction<Row>> actionsByServer =
616 new HashMap<ServerName, MultiAction<Row>>();
617 List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
618 if (replicaGetIndices == null) {
619 for (int i = 0; i < results.length; ++i) {
620 addReplicaActions(i, actionsByServer, unknownLocActions);
621 }
622 } else {
623 for (int replicaGetIndice : replicaGetIndices) {
624 addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
625 }
626 }
627 if (!actionsByServer.isEmpty()) {
628 sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
629 }
630 if (!unknownLocActions.isEmpty()) {
631 actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
632 for (Action<Row> action : unknownLocActions) {
633 addReplicaActionsAgain(action, actionsByServer);
634 }
635
636 if (!actionsByServer.isEmpty()) {
637 sendMultiAction(actionsByServer, 1, null, true);
638 }
639 }
640 }
641
642
643
644
645
646
647 private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
648 List<Action<Row>> unknownReplicaActions) {
649 if (results[index] != null) return;
650 Action<Row> action = initialActions.get(index);
651 RegionLocations loc = findAllLocationsOrFail(action, true);
652 if (loc == null) return;
653 HRegionLocation[] locs = loc.getRegionLocations();
654 if (locs.length == 1) {
655 LOG.warn("No replicas found for " + action.getAction());
656 return;
657 }
658 synchronized (replicaResultLock) {
659
660
661
662 if (results[index] != null) return;
663
664
665 results[index] = new ReplicaResultState(locs.length);
666 }
667 for (int i = 1; i < locs.length; ++i) {
668 Action<Row> replicaAction = new Action<Row>(action, i);
669 if (locs[i] != null) {
670 addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
671 replicaAction, actionsByServer, nonceGroup);
672 } else {
673 unknownReplicaActions.add(replicaAction);
674 }
675 }
676 }
677
678 private void addReplicaActionsAgain(
679 Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
680 if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
681 throw new AssertionError("Cannot have default replica here");
682 }
683 HRegionLocation loc = getReplicaLocationOrFail(action);
684 if (loc == null) return;
685 addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
686 action, actionsByServer, nonceGroup);
687 }
688 }
689
690
691
692
693
694 private final class SingleServerRequestRunnable implements Runnable {
695 private final MultiAction<Row> multiAction;
696 private final int numAttempt;
697 private final ServerName server;
698 private final Set<MultiServerCallable<Row>> callsInProgress;
699
700 private SingleServerRequestRunnable(
701 MultiAction<Row> multiAction, int numAttempt, ServerName server,
702 Set<MultiServerCallable<Row>> callsInProgress) {
703 this.multiAction = multiAction;
704 this.numAttempt = numAttempt;
705 this.server = server;
706 this.callsInProgress = callsInProgress;
707 }
708
709 @Override
710 public void run() {
711 MultiResponse res;
712 MultiServerCallable<Row> callable = null;
713 try {
714 callable = createCallable(server, tableName, multiAction);
715 try {
716 RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
717 if (callsInProgress != null) callsInProgress.add(callable);
718 res = caller.callWithoutRetries(callable, timeout);
719
720 if (res == null) {
721
722 return;
723 }
724
725 } catch (IOException e) {
726
727
728 receiveGlobalFailure(multiAction, server, numAttempt, e);
729 return;
730 } catch (Throwable t) {
731
732 LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." +
733 " Retrying. Server is " + server + ", tableName=" + tableName, t);
734 receiveGlobalFailure(multiAction, server, numAttempt, t);
735 return;
736 }
737
738
739 receiveMultiAction(multiAction, server, res, numAttempt);
740 } catch (Throwable t) {
741
742 LOG.error("Internal AsyncProcess #" + id + " error for "
743 + tableName + " processing for " + server, t);
744 throw new RuntimeException(t);
745 } finally {
746 decTaskCounters(multiAction.getRegions(), server);
747 if (callsInProgress != null && callable != null) {
748 callsInProgress.remove(callable);
749 }
750 }
751 }
752 }
753
754 private final Batch.Callback<CResult> callback;
755 private final BatchErrors errors;
756 private final ConnectionManager.ServerErrorTracker errorsByServer;
757 private final ExecutorService pool;
758 private final Set<MultiServerCallable<Row>> callsInProgress;
759
760
761 private final TableName tableName;
762 private final AtomicLong actionsInProgress = new AtomicLong(-1);
763
764
765
766 private final Object replicaResultLock = new Object();
767
768
769
770
771
772
773
774 private final Object[] results;
775
776 private final int[] replicaGetIndices;
777 private final boolean hasAnyReplicaGets;
778 private final long nonceGroup;
779
780 public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
781 ExecutorService pool, boolean needResults, Object[] results,
782 Batch.Callback<CResult> callback) {
783 this.pool = pool;
784 this.callback = callback;
785 this.nonceGroup = nonceGroup;
786 this.tableName = tableName;
787 this.actionsInProgress.set(actions.size());
788 if (results != null) {
789 assert needResults;
790 if (results.length != actions.size()) throw new AssertionError("results.length");
791 this.results = results;
792 for (int i = 0; i != this.results.length; ++i) {
793 results[i] = null;
794 }
795 } else {
796 this.results = needResults ? new Object[actions.size()] : null;
797 }
798 List<Integer> replicaGetIndices = null;
799 boolean hasAnyReplicaGets = false;
800 if (needResults) {
801
802
803
804
805
806 boolean hasAnyNonReplicaReqs = false;
807 int posInList = 0;
808 for (Action<Row> action : actions) {
809 boolean isReplicaGet = isReplicaGet(action.getAction());
810 if (isReplicaGet) {
811 hasAnyReplicaGets = true;
812 if (hasAnyNonReplicaReqs) {
813 if (replicaGetIndices == null) {
814 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
815 }
816 replicaGetIndices.add(posInList);
817 }
818 } else if (!hasAnyNonReplicaReqs) {
819
820 hasAnyNonReplicaReqs = true;
821 if (posInList > 0) {
822
823
824 replicaGetIndices = new ArrayList<Integer>(actions.size() - 1);
825 for (int i = 0; i < posInList; ++i) {
826 replicaGetIndices.add(i);
827 }
828 }
829 }
830 ++posInList;
831 }
832 }
833 this.hasAnyReplicaGets = hasAnyReplicaGets;
834 if (replicaGetIndices != null) {
835 this.replicaGetIndices = new int[replicaGetIndices.size()];
836 int i = 0;
837 for (Integer el : replicaGetIndices) {
838 this.replicaGetIndices[i++] = el;
839 }
840 } else {
841 this.replicaGetIndices = null;
842 }
843 this.callsInProgress = !hasAnyReplicaGets ? null :
844 Collections.newSetFromMap(new ConcurrentHashMap<MultiServerCallable<Row>, Boolean>());
845
846 this.errorsByServer = createServerErrorTracker();
847 this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
848 }
849
850 public Set<MultiServerCallable<Row>> getCallsInProgress() {
851 return callsInProgress;
852 }
853
854
855
856
857
858
859
860 private void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
861 Map<ServerName, MultiAction<Row>> actionsByServer =
862 new HashMap<ServerName, MultiAction<Row>>();
863
864 boolean isReplica = false;
865 List<Action<Row>> unknownReplicaActions = null;
866 for (Action<Row> action : currentActions) {
867 RegionLocations locs = findAllLocationsOrFail(action, true);
868 if (locs == null) continue;
869 boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
870 if (isReplica && !isReplicaAction) {
871
872 throw new AssertionError("Replica and non-replica actions in the same retry");
873 }
874 isReplica = isReplicaAction;
875 HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
876 if (loc == null || loc.getServerName() == null) {
877 if (isReplica) {
878 if (unknownReplicaActions == null) {
879 unknownReplicaActions = new ArrayList<Action<Row>>();
880 }
881 unknownReplicaActions.add(action);
882 } else {
883 if (LOG.isInfoEnabled()) {
884 LOG.info("Failed to find location: " + loc + " for replica: " + action.getReplicaId() + " and action: " + action.getAction());
885 }
886
887 manageLocationError(action, null);
888 }
889 } else {
890 byte[] regionName = loc.getRegionInfo().getRegionName();
891 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
892 }
893 }
894 boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
895 boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();
896
897 if (!actionsByServer.isEmpty()) {
898
899 sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
900 ? currentActions : null, numAttempt > 1 && !hasUnknown);
901 }
902
903 if (hasUnknown) {
904 actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
905 for (Action<Row> action : unknownReplicaActions) {
906 HRegionLocation loc = getReplicaLocationOrFail(action);
907 if (loc == null) continue;
908 byte[] regionName = loc.getRegionInfo().getRegionName();
909 addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
910 }
911 if (!actionsByServer.isEmpty()) {
912 sendMultiAction(
913 actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
914 }
915 }
916 }
917
918 private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
919
920
921 int replicaId = action.getReplicaId();
922 RegionLocations locs = findAllLocationsOrFail(action, true);
923 if (locs == null) return null;
924 HRegionLocation loc = locs.getRegionLocation(replicaId);
925 if (loc == null || loc.getServerName() == null) {
926 locs = findAllLocationsOrFail(action, false);
927 if (locs == null) return null;
928 loc = locs.getRegionLocation(replicaId);
929 }
930 if (loc == null || loc.getServerName() == null) {
931 if (LOG.isInfoEnabled()) {
932 LOG.info("Failed to find location: " + loc + " for replica: " + replicaId + " and action: " + action.getAction());
933 }
934 manageLocationError(action, null);
935 return null;
936 }
937 return loc;
938 }
939
940 private void manageLocationError(Action<Row> action, Exception ex) {
941 String msg = "Cannot get replica " + action.getReplicaId()
942 + " location for " + action.getAction();
943 if (ex == null) {
944 LOG.error(msg);
945 ex = new IOException(msg);
946 } else {
947 LOG.error(msg, ex);
948 }
949 LOG.error(msg, ex);
950 manageError(action.getOriginalIndex(), action.getAction(),
951 Retry.NO_LOCATION_PROBLEM, ex, null);
952 }
953
954 private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
955 if (action.getAction() == null) throw new IllegalArgumentException("#" + id +
956 ", row cannot be null");
957 RegionLocations loc = null;
958 try {
959 loc = connection.locateRegion(
960 tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
961 } catch (IOException ex) {
962 manageLocationError(action, ex);
963 }
964 return loc;
965 }
966
967
968
969
970
971
972
973
974
975 private void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
976 int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
977
978
979 int actionsRemaining = actionsByServer.size();
980
981 for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
982 ServerName server = e.getKey();
983 MultiAction<Row> multiAction = e.getValue();
984 incTaskCounters(multiAction.getRegions(), server);
985 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
986 numAttempt);
987
988
989 if (runnables.size() > actionsRemaining) {
990 actionsRemaining = runnables.size();
991 }
992
993
994 for (Runnable runnable : runnables) {
995 if ((--actionsRemaining == 0) && reuseThread) {
996 runnable.run();
997 } else {
998 try {
999 pool.submit(runnable);
1000 } catch (Throwable t) {
1001 if (t instanceof RejectedExecutionException) {
1002
1003
1004 LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
1005 " Server is " + server.getServerName(), t);
1006 } else {
1007
1008 LOG.warn("Caught unexpected exception/error: ", t);
1009 }
1010 decTaskCounters(multiAction.getRegions(), server);
1011
1012
1013 receiveGlobalFailure(multiAction, server, numAttempt, t);
1014 }
1015 }
1016 }
1017 }
1018
1019 if (actionsForReplicaThread != null) {
1020 startWaitingForReplicaCalls(actionsForReplicaThread);
1021 }
1022 }
1023
1024 private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
1025 MultiAction<Row> multiAction,
1026 int numAttempt) {
1027
1028 if (AsyncProcess.this.connection.getStatisticsTracker() == null) {
1029 return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction",
1030 new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)));
1031 }
1032
1033
1034 Map<Long, DelayingRunner> actions = new HashMap<Long, DelayingRunner>(multiAction
1035 .size());
1036
1037
1038 for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
1039 Long backoff = getBackoff(server, e.getKey());
1040 DelayingRunner runner = actions.get(backoff);
1041 if (runner == null) {
1042 actions.put(backoff, new DelayingRunner(backoff, e));
1043 } else {
1044 runner.add(e);
1045 }
1046 }
1047
1048 List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
1049 for (DelayingRunner runner : actions.values()) {
1050 String traceText = "AsyncProcess.sendMultiAction";
1051 Runnable runnable =
1052 new SingleServerRequestRunnable(runner.getActions(), numAttempt, server,
1053 callsInProgress);
1054
1055 if (runner.getSleepTime() > 0) {
1056 runner.setRunner(runnable);
1057 traceText = "AsyncProcess.clientBackoff.sendMultiAction";
1058 runnable = runner;
1059 }
1060 runnable = Trace.wrap(traceText, runnable);
1061 toReturn.add(runnable);
1062
1063 }
1064 return toReturn;
1065 }
1066
1067
1068
1069
1070
1071
1072
1073 private Long getBackoff(ServerName server, byte[] regionName) {
1074 ServerStatisticTracker tracker = AsyncProcess.this.connection.getStatisticsTracker();
1075 ServerStatistics stats = tracker.getStats(server);
1076 return AsyncProcess.this.connection.getBackoffPolicy()
1077 .getBackoffTime(server, regionName, stats);
1078 }
1079
1080
1081
1082
1083 private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
1084 long startTime = EnvironmentEdgeManager.currentTime();
1085 ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
1086 actionsForReplicaThread, startTime);
1087 if (primaryCallTimeoutMicroseconds == 0) {
1088
1089 replicaRunnable.run();
1090 } else {
1091
1092
1093 try {
1094 pool.submit(replicaRunnable);
1095 } catch (RejectedExecutionException ree) {
1096 LOG.warn("#" + id + ", replica task was rejected by the pool - no replica calls", ree);
1097 }
1098 }
1099 }
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111 public Retry manageError(int originalIndex, Row row, Retry canRetry,
1112 Throwable throwable, ServerName server) {
1113 if (canRetry == Retry.YES
1114 && throwable != null && throwable instanceof DoNotRetryIOException) {
1115 canRetry = Retry.NO_NOT_RETRIABLE;
1116 }
1117
1118 if (canRetry != Retry.YES) {
1119
1120 setError(originalIndex, row, throwable, server);
1121 } else if (isActionComplete(originalIndex, row)) {
1122 canRetry = Retry.NO_OTHER_SUCCEEDED;
1123 }
1124 return canRetry;
1125 }
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135 private void receiveGlobalFailure(
1136 MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
1137 errorsByServer.reportServerError(server);
1138 Retry canRetry = errorsByServer.canRetryMore(numAttempt)
1139 ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
1140
1141 if (tableName == null) {
1142
1143 connection.clearCaches(server);
1144 }
1145 int failed = 0, stopped = 0;
1146 List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
1147 for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
1148 byte[] regionName = e.getKey();
1149 byte[] row = e.getValue().iterator().next().getAction().getRow();
1150
1151
1152
1153 if (tableName != null) {
1154 connection.updateCachedLocations(tableName, regionName, row, null, server);
1155 }
1156 for (Action<Row> action : e.getValue()) {
1157 Retry retry = manageError(
1158 action.getOriginalIndex(), action.getAction(), canRetry, t, server);
1159 if (retry == Retry.YES) {
1160 toReplay.add(action);
1161 } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
1162 ++stopped;
1163 } else {
1164 ++failed;
1165 }
1166 }
1167 }
1168
1169 if (toReplay.isEmpty()) {
1170 logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
1171 } else {
1172 resubmit(server, toReplay, numAttempt, rsActions.size(), t);
1173 }
1174 }
1175
1176
1177
1178
1179
1180 private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
1181 int numAttempt, int failureCount, Throwable throwable) {
1182
1183
1184
1185
1186
1187
1188
1189 long backOffTime = errorsByServer.calculateBackoffTime(oldServer, pause);
1190 if (numAttempt > startLogErrorsCnt) {
1191
1192
1193 LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
1194 oldServer, throwable, backOffTime, true, null, -1, -1));
1195 }
1196
1197 try {
1198 Thread.sleep(backOffTime);
1199 } catch (InterruptedException e) {
1200 LOG.warn("#" + id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
1201 Thread.currentThread().interrupt();
1202 return;
1203 }
1204
1205 groupAndSendMultiAction(toReplay, numAttempt + 1);
1206 }
1207
1208 private void logNoResubmit(ServerName oldServer, int numAttempt,
1209 int failureCount, Throwable throwable, int failed, int stopped) {
1210 if (failureCount != 0 || numAttempt > startLogErrorsCnt + 1) {
1211 String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
1212 String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
1213 throwable, -1, false, timeStr, failed, stopped);
1214 if (failed != 0) {
1215
1216 LOG.warn(logMessage);
1217 } else {
1218 LOG.info(logMessage);
1219 }
1220 }
1221 }
1222
1223
1224
1225
1226
1227
1228
1229
1230
/**
 * Processes a server's response to a multi-action request.
 * <p>
 * Pass 1 walks every region that returned a per-action result map. Successful results are
 * stored, and optionally reported to the statistics tracker and the user callback. A missing
 * or Throwable result is a failure, handled through {@code manageError}. Pass 2 handles
 * region-level exceptions, failing every action sent for that region. Retriable actions are
 * then resubmitted; if there are none, the outcome is only logged.
 *
 * @param multiAction the actions that were sent to {@code server}
 * @param server the server that answered
 * @param responses the server's response; must not be null
 * @param numAttempt the attempt number of this request
 */
private void receiveMultiAction(MultiAction<Row> multiAction,
    ServerName server, MultiResponse responses, int numAttempt) {
  assert responses != null;

  // Actions to resend, the most recent error seen, and the number of failed actions so far.
  List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
  Throwable throwable = null;
  int failureCount = 0;
  boolean canRetry = true;

  // Actions not retried: "failed" are final failures, "stopped" were completed elsewhere.
  int failed = 0, stopped = 0;
  // Pass 1: per-action outcomes for each region that returned a result map.
  for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
    byte[] regionName = regionEntry.getKey();
    Map<Integer, Object> regionResults = responses.getResults().get(regionName);
    if (regionResults == null) {
      // No result map for this region: ensure an exception is registered for it so that
      // pass 2 below fails its actions instead of silently dropping them.
      if (!responses.getExceptions().containsKey(regionName)) {
        LOG.error("Server sent us neither results nor exceptions for "
            + Bytes.toStringBinary(regionName));
        responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
      }
      continue;
    }
    boolean regionFailureRegistered = false;
    for (Action<Row> sentAction : regionEntry.getValue()) {
      Object result = regionResults.get(sentAction.getOriginalIndex());
      // A missing result or a Throwable means this action failed.
      if (result == null || result instanceof Throwable) {
        Row row = sentAction.getAction();
        throwable = ClientExceptionsUtil.findException(result);
        // Update the cached location only once per region.
        if (!regionFailureRegistered) {
          regionFailureRegistered = true;
          connection.updateCachedLocations(
              tableName, regionName, row.getRow(), result, server);
        }
        // Report the server error and evaluate retriability once, on the first failure.
        if (failureCount == 0) {
          errorsByServer.reportServerError(server);

          canRetry = errorsByServer.canRetryMore(numAttempt);
        }
        ++failureCount;
        // NOTE(review): manageError receives the raw result, not the value returned by
        // findException above; confirm this is intended.
        Retry retry = manageError(sentAction.getOriginalIndex(), row,
            canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable)result, server);
        if (retry == Retry.YES) {
          toReplay.add(sentAction);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      } else {
        // Success. If a statistics tracker is configured, pass the result through
        // ResultStatsUtil.updateStats together with the tracker.
        if (AsyncProcess.this.connection.getStatisticsTracker() != null) {
          result = ResultStatsUtil.updateStats(result,
              AsyncProcess.this.connection.getStatisticsTracker(), server, regionName);
        }

        if (callback != null) {
          try {
            // Unchecked cast: the result is assumed to match the callback's result type.
            this.callback.update(regionName, sentAction.getAction().getRow(), (CResult)result);
          } catch (Throwable t) {
            // A failing user callback is logged and otherwise ignored.
            LOG.error("User callback threw an exception for "
                + Bytes.toStringBinary(regionName) + ", ignoring", t);
          }
        }
        setResult(sentAction, result);
      }
    }
  }

  // Pass 2: a region-level exception fails every action that was sent for that region.
  for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
    throwable = throwableEntry.getValue();
    byte[] region = throwableEntry.getKey();
    List<Action<Row>> actions = multiAction.actions.get(region);
    if (actions == null || actions.isEmpty()) {
      throw new IllegalStateException("Wrong response for the region: " +
          HRegionInfo.encodeRegionName(region));
    }

    // Report the server error only if pass 1 (or an earlier region) has not already.
    if (failureCount == 0) {
      errorsByServer.reportServerError(server);
      canRetry = errorsByServer.canRetryMore(numAttempt);
    }
    if (null == tableName) {
      // No table to target, so drop all cached locations for this server.
      connection.clearCaches(server);
    } else {
      connection.updateCachedLocations(
          tableName, region, actions.get(0).getAction().getRow(), throwable, server);
    }
    failureCount += actions.size();

    for (Action<Row> action : actions) {
      Row row = action.getAction();
      Retry retry = manageError(action.getOriginalIndex(), row,
          canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
      if (retry == Retry.YES) {
        toReplay.add(action);
      } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
        ++stopped;
      } else {
        ++failed;
      }
    }
  }

  // Resend what can be retried; otherwise just log the outcome.
  if (toReplay.isEmpty()) {
    logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
  } else {
    resubmit(server, toReplay, numAttempt, failureCount, throwable);
  }
}
1355
1356 private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
1357 Throwable error, long backOffTime, boolean willRetry, String startTime,
1358 int failed, int stopped) {
1359 StringBuilder sb = new StringBuilder();
1360 sb.append("#").append(id).append(", table=").append(tableName).append(", ")
1361 .append("attempt=").append(numAttempt)
1362 .append("/").append(numTries).append(" ");
1363
1364 if (failureCount > 0 || error != null){
1365 sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
1366 append(error == null ? "null" : error);
1367 } else {
1368 sb.append("succeeded");
1369 }
1370
1371 sb.append(" on ").append(sn).append(", tracking started ").append(startTime);
1372
1373 if (willRetry) {
1374 sb.append(", retrying after=").append(backOffTime).append("ms").
1375 append(", replay=").append(replaySize).append("ops");
1376 } else if (failureCount > 0) {
1377 if (stopped > 0) {
1378 sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
1379 }
1380 if (failed > 0) {
1381 sb.append("; not retrying ").append(failed).append(" - final failure");
1382 }
1383
1384 }
1385
1386 return sb.toString();
1387 }
1388
1389
1390
1391
1392
1393
/**
 * Records a successful result for {@code action} and accounts for the action's completion.
 * <p>
 * If no results array is kept, only the in-progress counter is decremented. Otherwise the
 * result is offered to {@code trySetResultSimple}. If that finds a ReplicaResultState in
 * the slot, the first caller that sees a non-zero callCount claims the slot: it replaces the
 * state with this result and decrements the counter. Later callers return without effect.
 *
 * @param action the action the result belongs to
 * @param result the result; must not be null
 * @throws RuntimeException if {@code result} is null
 */
private void setResult(Action<Row> action, Object result) {
  if (result == null) {
    throw new RuntimeException("Result cannot be null");
  }
  ReplicaResultState state = null;
  // Results from a non-default replica are flagged as stale.
  boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
  int index = action.getOriginalIndex();
  if (results == null) {
    // Results are not kept; only count the action as finished.
    decActionCounter(index);
    return;
  } else if ((state = trySetResultSimple(
      index, action.getAction(), false, result, null, isStale)) == null) {
    // Either the result was stored (and the counter decremented) or a final value was
    // already in the slot; nothing more to do.
    return;
  }
  assert state != null;
  // Claim the action: only the first caller seeing a non-zero callCount proceeds.
  synchronized (state) {
    if (state.callCount == 0) return;
    state.callCount = 0;
  }
  // Replace the ReplicaResultState placeholder with the final result.
  synchronized (replicaResultLock) {
    if (results[index] != state) {
      throw new AssertionError("We set the callCount but someone else replaced the result");
    }
    results[index] = result;
  }

  decActionCounter(index);
}
1427
1428
1429
1430
1431
1432
1433
1434
/**
 * Records an error for the action at {@code index}.
 * <p>
 * Without a results array, the error is added to {@code errors} and the counter is
 * decremented. Otherwise {@code trySetResultSimple} is tried first. If a ReplicaResultState
 * is in the slot, its callCount decides what happens:
 * <ul>
 * <li>0: the action is already done; the error is ignored.</li>
 * <li>1: this is the last outstanding call. The error goes to {@code errors}, errors from
 *     earlier calls are merged in, the slot is set to the throwable, and the counter is
 *     decremented.</li>
 * <li>more than 1: the error is stored in the state's replicaErrors and callCount is
 *     reduced.</li>
 * </ul>
 *
 * @param index index of the action in the original batch
 * @param row the row operation
 * @param throwable the error
 * @param server the server that reported the error
 */
private void setError(int index, Row row, Throwable throwable, ServerName server) {
  ReplicaResultState state = null;
  if (results == null) {
    // No results array: record the error directly and count the action as finished.
    errors.add(throwable, row, server);
    decActionCounter(index);
    return;
  } else if ((state = trySetResultSimple(
      index, row, true, throwable, server, false)) == null) {
    // trySetResultSimple already recorded the error; nothing more to do.
    return;
  }
  assert state != null;
  BatchErrors target = null;
  boolean isActionDone = false;
  synchronized (state) {
    switch (state.callCount) {
      case 0: return; // Already completed by another call; ignore this error.
      case 1: {
        // Last outstanding call: this error is final for the action.
        target = errors;
        isActionDone = true;
        break;
      }
      default: {
        // Other calls are still pending: keep the error on the state for now.
        assert state.callCount > 1;
        if (state.replicaErrors == null) {
          state.replicaErrors = new BatchErrors();
        }
        target = state.replicaErrors;
        break;
      }
    }
    --state.callCount;
  }
  target.add(throwable, row, server);
  if (isActionDone) {
    // Fold errors from earlier calls into the final error set.
    if (state.replicaErrors != null) {
      errors.merge(state.replicaErrors);
    }
    // Replace the ReplicaResultState placeholder with the final throwable.
    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      results[index] = throwable;
    }
    decActionCounter(index);
  }
}
1485
1486
1487
1488
1489
1490
1491
1492
1493 private boolean isActionComplete(int index, Row row) {
1494 if (!isReplicaGet(row)) return false;
1495 Object resObj = results[index];
1496 return (resObj != null) && (!(resObj instanceof ReplicaResultState)
1497 || ((ReplicaResultState)resObj).callCount == 0);
1498 }
1499
1500
1501
1502
1503
/**
 * Tries to store {@code result} directly in the results array.
 * <p>
 * For rows that are not replica gets, the slot is overwritten unconditionally. For replica
 * gets, the slot is written under {@code replicaResultLock} only if it is still empty, and
 * the previous content is remembered. The return value depends on what was in the slot:
 * <ul>
 * <li>Empty, or not a replica get: the in-progress counter is decremented and null is
 *     returned.</li>
 * <li>A ReplicaResultState: that state is returned for the caller to resolve.</li>
 * <li>Any other value: nothing is stored and null is returned.</li>
 * </ul>
 * An error is added to {@code errors} unless a ReplicaResultState was found.
 *
 * @param index index of the action in the original batch
 * @param row the row operation
 * @param isError whether {@code result} is a Throwable to record as an error
 * @param result the result or error
 * @param server the server that produced it (used for errors)
 * @param isFromReplica whether the result came from a non-default replica
 * @return the ReplicaResultState found in the slot, or null
 * @throws AssertionError if a replica result arrives for a row that is not a replica get, or
 *     for a replica get whose slot is still empty
 */
private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
    Object result, ServerName server, boolean isFromReplica) {
  // Previous slot content; stays null when the slot was empty or the row is not a replica get.
  Object resObj = null;
  if (!isReplicaGet(row)) {
    if (isFromReplica) {
      throw new AssertionError("Unexpected stale result for " + row);
    }
    results[index] = result;
  } else {
    synchronized (replicaResultLock) {
      if ((resObj = results[index]) == null) {
        if (isFromReplica) {
          throw new AssertionError("Unexpected stale result for " + row);
        }
        results[index] = result;
      }
    }
  }

  ReplicaResultState rrs =
      (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
  if (rrs == null && isError) {
    // No placeholder to defer to: record the error now.
    errors.add((Throwable)result, row, server);
  }

  if (resObj == null) {
    // The value was stored here, so the action is finished.
    decActionCounter(index);
    return null;
  }
  return rrs;
}
1537
1538 private void decActionCounter(int index) {
1539 long actionsRemaining = actionsInProgress.decrementAndGet();
1540 if (actionsRemaining < 0) {
1541 String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
1542 throw new AssertionError(error);
1543 } else if (actionsRemaining == 0) {
1544 synchronized (actionsInProgress) {
1545 actionsInProgress.notifyAll();
1546 }
1547 }
1548 }
1549
1550 private String buildDetailedErrorMsg(String string, int index) {
1551 String error = string + "; called for " + index +
1552 ", actionsInProgress " + actionsInProgress.get() + "; replica gets: ";
1553 if (replicaGetIndices != null) {
1554 for (int i = 0; i < replicaGetIndices.length; ++i) {
1555 error += replicaGetIndices[i] + ", ";
1556 }
1557 } else {
1558 error += (hasAnyReplicaGets ? "all" : "none");
1559 }
1560 error += "; results ";
1561 if (results != null) {
1562 for (int i = 0; i < results.length; ++i) {
1563 Object o = results[i];
1564 error += ((o == null) ? "null" : o.toString()) + ", ";
1565 }
1566 }
1567 return error;
1568 }
1569
1570 @Override
1571 public void waitUntilDone() throws InterruptedIOException {
1572 try {
1573 waitUntilDone(Long.MAX_VALUE);
1574 } catch (InterruptedException iex) {
1575 throw new InterruptedIOException(iex.getMessage());
1576 } finally {
1577 if (callsInProgress != null) {
1578 for (MultiServerCallable<Row> clb : callsInProgress) {
1579 clb.cancel();
1580 }
1581 }
1582 }
1583 }
1584
/**
 * Waits until no actions are in progress, or until {@code cutoff} passes.
 *
 * @param cutoff deadline, or {@link Long#MAX_VALUE} to wait without limit. It is compared
 *     with {@code EnvironmentEdgeManager.currentTime() * 1000}, so it appears to be in
 *     microseconds (assuming currentTime() is in ms) — confirm with callers.
 * @return true if all actions finished, false if the cutoff was reached first
 * @throws InterruptedException if interrupted while waiting
 */
private boolean waitUntilDone(long cutoff) throws InterruptedException {
  boolean hasWait = cutoff != Long.MAX_VALUE;
  long lastLog = EnvironmentEdgeManager.currentTime();
  long currentInProgress;
  while (0 != (currentInProgress = actionsInProgress.get())) {
    long now = EnvironmentEdgeManager.currentTime();
    if (hasWait && (now * 1000L) > cutoff) {
      return false;
    }
    if (!hasWait) {
      // Unbounded wait: log progress at most once per 10000 currentTime() units.
      if (now > lastLog + 10000) {
        lastLog = now;
        LOG.info("#" + id + ", waiting for " + currentInProgress + " actions to finish");
      }
    }
    synchronized (actionsInProgress) {
      // Re-check under the monitor that decActionCounter notifies on.
      if (actionsInProgress.get() == 0) break;
      if (!hasWait) {
        actionsInProgress.wait(10);
      } else {
        // Wait at most 100000 microseconds, or until the cutoff if that is sooner.
        long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
        TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
      }
    }
  }
  return true;
}
1612
1613 @Override
1614 public boolean hasError() {
1615 return errors.hasErrors();
1616 }
1617
1618 @Override
1619 public List<? extends Row> getFailedOperations() {
1620 return errors.actions;
1621 }
1622
1623 @Override
1624 public RetriesExhaustedWithDetailsException getErrors() {
1625 return errors.makeException();
1626 }
1627
1628 @Override
1629 public Object[] getResults() throws InterruptedIOException {
1630 waitUntilDone();
1631 return results;
1632 }
1633 }
1634
1635 @VisibleForTesting
1636
1637 protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
1638 TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
1639 Batch.Callback<CResult> callback, Object[] results, boolean needResults) {
1640 return new AsyncRequestFutureImpl<CResult>(
1641 tableName, actions, nonceGroup, getPool(pool), needResults, results, callback);
1642 }
1643
1644
1645
1646
1647 @VisibleForTesting
1648 protected MultiServerCallable<Row> createCallable(final ServerName server,
1649 TableName tableName, final MultiAction<Row> multi) {
1650 return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi);
1651 }
1652
1653
1654
1655
1656 @VisibleForTesting
1657 protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
1658 return rpcCallerFactory.<MultiResponse> newCaller();
1659 }
1660
1661 @VisibleForTesting
1662
1663 void waitUntilDone() throws InterruptedIOException {
1664 waitForMaximumCurrentTasks(0, null);
1665 }
1666
1667
1668 private void waitForMaximumCurrentTasks(int max, String tableName)
1669 throws InterruptedIOException {
1670 waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
1671 }
1672
1673
1674 @VisibleForTesting
1675 void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
1676 String tableName) throws InterruptedIOException {
1677 long lastLog = EnvironmentEdgeManager.currentTime();
1678 long currentInProgress, oldInProgress = Long.MAX_VALUE;
1679 while ((currentInProgress = tasksInProgress.get()) > max) {
1680 if (oldInProgress != currentInProgress) {
1681 long now = EnvironmentEdgeManager.currentTime();
1682 if (now > lastLog + 10000) {
1683 lastLog = now;
1684 LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
1685 + max + ", tasksInProgress=" + currentInProgress +
1686 " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
1687 if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
1688 logDetailsOfUndoneTasks(currentInProgress);
1689 }
1690 }
1691 }
1692 oldInProgress = currentInProgress;
1693 try {
1694 synchronized (tasksInProgress) {
1695 if (tasksInProgress.get() == oldInProgress) {
1696 tasksInProgress.wait(10);
1697 }
1698 }
1699 } catch (InterruptedException e) {
1700 throw new InterruptedIOException("#" + id + ", interrupted." +
1701 " currentNumberOfTask=" + currentInProgress);
1702 }
1703 }
1704 }
1705
1706 private void logDetailsOfUndoneTasks(long taskInProgress) {
1707 ArrayList<ServerName> servers = new ArrayList<ServerName>();
1708 for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
1709 if (entry.getValue().get() > 0) {
1710 servers.add(entry.getKey());
1711 }
1712 }
1713 LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
1714 if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
1715 ArrayList<String> regions = new ArrayList<String>();
1716 for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
1717 if (entry.getValue().get() > 0) {
1718 regions.add(Bytes.toString(entry.getKey()));
1719 }
1720 }
1721 LOG.info("Regions against which left over task(s) are processed: " + regions);
1722 }
1723 }
1724
1725
1726
1727
1728
1729
1730 public boolean hasError() {
1731 return globalErrors.hasErrors();
1732 }
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744 public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
1745 List<Row> failedRows, String tableName) throws InterruptedIOException {
1746 waitForMaximumCurrentTasks(0, tableName);
1747 if (!globalErrors.hasErrors()) {
1748 return null;
1749 }
1750 if (failedRows != null) {
1751 failedRows.addAll(globalErrors.actions);
1752 }
1753 RetriesExhaustedWithDetailsException result = globalErrors.makeException();
1754 globalErrors.clear();
1755 return result;
1756 }
1757
1758
1759
1760
1761 protected void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
1762 tasksInProgress.incrementAndGet();
1763
1764 AtomicInteger serverCnt = taskCounterPerServer.get(sn);
1765 if (serverCnt == null) {
1766 taskCounterPerServer.putIfAbsent(sn, new AtomicInteger());
1767 serverCnt = taskCounterPerServer.get(sn);
1768 }
1769 serverCnt.incrementAndGet();
1770
1771 for (byte[] regBytes : regions) {
1772 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1773 if (regionCnt == null) {
1774 regionCnt = new AtomicInteger();
1775 AtomicInteger oldCnt = taskCounterPerRegion.putIfAbsent(regBytes, regionCnt);
1776 if (oldCnt != null) {
1777 regionCnt = oldCnt;
1778 }
1779 }
1780 regionCnt.incrementAndGet();
1781 }
1782 }
1783
1784
1785
1786
1787 protected void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
1788 for (byte[] regBytes : regions) {
1789 AtomicInteger regionCnt = taskCounterPerRegion.get(regBytes);
1790 regionCnt.decrementAndGet();
1791 }
1792
1793 taskCounterPerServer.get(sn).decrementAndGet();
1794 tasksInProgress.decrementAndGet();
1795 synchronized (tasksInProgress) {
1796 tasksInProgress.notifyAll();
1797 }
1798 }
1799
1800
1801
1802
1803
1804
1805
1806
1807 protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
1808 return new ConnectionManager.ServerErrorTracker(
1809 this.serverTrackerTimeout, this.numTries);
1810 }
1811
1812 private static boolean isReplicaGet(Row row) {
1813 return (row instanceof Get) && (((Get)row).getConsistency() == Consistency.TIMELINE);
1814 }
1815
1816
1817
1818
1819 private enum Retry {
1820 YES,
1821 NO_LOCATION_PROBLEM,
1822 NO_NOT_RETRIABLE,
1823 NO_RETRIES_EXHAUSTED,
1824 NO_OTHER_SUCCEEDED
1825 }
1826 }