1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21 import java.util.ArrayList;
22 import java.util.concurrent.CancellationException;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.RunnableFuture;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
33 import org.apache.htrace.Trace;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 public class ResultBoundedCompletionService<V> {
50 private static final Log LOG = LogFactory.getLog(ResultBoundedCompletionService.class);
51 private final RpcRetryingCallerFactory retryingCallerFactory;
52 private final Executor executor;
53 private final QueueingFuture<V>[] tasks;
54 private final ArrayList<QueueingFuture> completedTasks;
55 private volatile boolean cancelled = false;
56
57 class QueueingFuture<T> implements RunnableFuture<T> {
58 private final RetryingCallable<T> future;
59 private T result = null;
60 private ExecutionException exeEx = null;
61 private volatile boolean cancelled = false;
62 private final int callTimeout;
63 private final RpcRetryingCaller<T> retryingCaller;
64 private boolean resultObtained = false;
65 private final int replicaId;
66
67
68 public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
69 this.future = future;
70 this.callTimeout = callTimeout;
71 this.retryingCaller = retryingCallerFactory.<T>newCaller();
72 this.replicaId = id;
73 }
74
75 @SuppressWarnings("unchecked")
76 @Override
77 public void run() {
78 try {
79 if (!cancelled) {
80 result = this.retryingCaller.callWithRetries(future, callTimeout);
81 resultObtained = true;
82 }
83 } catch (Throwable t) {
84 exeEx = new ExecutionException(t);
85 } finally {
86 synchronized (tasks) {
87
88 if (!cancelled) {
89 completedTasks.add(QueueingFuture.this);
90 }
91
92
93
94 tasks.notify();
95 }
96 }
97 }
98
99 @Override
100 public boolean cancel(boolean mayInterruptIfRunning) {
101 if (resultObtained || exeEx != null) return false;
102 retryingCaller.cancel();
103 if (future instanceof Cancellable) ((Cancellable)future).cancel();
104 cancelled = true;
105 return true;
106 }
107
108 @Override
109 public boolean isCancelled() {
110 return cancelled;
111 }
112
113 @Override
114 public boolean isDone() {
115 return resultObtained || exeEx != null;
116 }
117
118 @Override
119 public T get() throws InterruptedException, ExecutionException {
120 try {
121 return get(1000, TimeUnit.DAYS);
122 } catch (TimeoutException e) {
123 throw new RuntimeException("You did wait for 1000 days here?", e);
124 }
125 }
126
127 @Override
128 public T get(long timeout, TimeUnit unit)
129 throws InterruptedException, ExecutionException, TimeoutException {
130 synchronized (tasks) {
131 if (resultObtained) {
132 return result;
133 }
134 if (exeEx != null) {
135 throw exeEx;
136 }
137 unit.timedWait(tasks, timeout);
138 }
139 if (resultObtained) {
140 return result;
141 }
142 if (exeEx != null) {
143 throw exeEx;
144 }
145
146 throw new TimeoutException("timeout=" + timeout + ", " + unit);
147 }
148
149 public int getReplicaId() {
150 return replicaId;
151 }
152
153 public ExecutionException getExeEx() {
154 return exeEx;
155 }
156 }
157
158 @SuppressWarnings("unchecked")
159 public ResultBoundedCompletionService(
160 RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
161 int maxTasks) {
162 this.retryingCallerFactory = retryingCallerFactory;
163 this.executor = executor;
164 this.tasks = new QueueingFuture[maxTasks];
165 this.completedTasks = new ArrayList<>(maxTasks);
166 }
167
168
169 public void submit(RetryingCallable<V> task, int callTimeout, int id) {
170 QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout, id);
171 executor.execute(Trace.wrap(newFuture));
172 tasks[id] = newFuture;
173 }
174
175 public QueueingFuture<V> take() throws InterruptedException {
176 synchronized (tasks) {
177 while (!cancelled && (completedTasks.size() < 1)) tasks.wait();
178 }
179 return completedTasks.get(0);
180 }
181
182
183
184
185
186
187
188 public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
189 return pollForSpecificCompletedTask(timeout, unit, 0);
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
206 int startIndex, int endIndex)
207 throws InterruptedException, CancellationException, ExecutionException {
208
209 QueueingFuture<V> f;
210 long start, duration;
211 for (int i = startIndex; i < endIndex; i ++) {
212
213 start = EnvironmentEdgeManager.currentTime();
214 f = pollForSpecificCompletedTask(timeout, unit, i);
215 duration = EnvironmentEdgeManager.currentTime() - start;
216
217
218
219 timeout -= duration;
220
221 if (f == null) {
222 return null;
223 } else if (f.getExeEx() != null) {
224
225 if (LOG.isDebugEnabled()) {
226 LOG.debug("Replica " + ((f == null) ? 0 : f.getReplicaId()) + " returns " +
227 f.getExeEx().getCause());
228 }
229
230 if (i == (endIndex - 1)) {
231
232 throw f.getExeEx();
233 }
234 continue;
235 }
236
237 return f;
238 }
239
240
241 return null;
242 }
243
244
245
246
247
248
249
250
251 private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
252 throws InterruptedException {
253 if (index < 0) {
254 return null;
255 }
256
257 synchronized (tasks) {
258 if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout);
259 if (completedTasks.size() <= index) return null;
260 }
261 return completedTasks.get(index);
262 }
263
264 public void cancelAll() {
265
266 synchronized (tasks) {
267 cancelled = true;
268 }
269 for (QueueingFuture<V> future : tasks) {
270 if (future != null) future.cancel(true);
271 }
272 }
273 }