View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.util.ArrayList;
22  import java.util.concurrent.CancellationException;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.RunnableFuture;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.TimeoutException;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
33  import org.apache.htrace.Trace;
34  
35  /**
36   * A completion service for the RpcRetryingCallerFactory.
37   * Keeps the list of the futures, and allows to cancel them all.
38   * This means as well that it can be used for a small set of tasks only.
39   * <br>Implementation is not Thread safe.
40   *
41   * CompletedTasks is implemented as a queue, the entry is added based on the time order. I.e,
42   * when the first task completes (whether it is a success or failure), it is added as a first
43   * entry in the queue, the next completed task is added as a second entry in the queue, ...
44   * When iterating through the queue, we know it is based on time order. If the first
45   * completed task succeeds, it is returned. If it is failure, the iteration goes on until it
46   * finds a success.
47   */
48  @InterfaceAudience.Private
49  public class ResultBoundedCompletionService<V> {
50    private static final Log LOG = LogFactory.getLog(ResultBoundedCompletionService.class);
51    private final RpcRetryingCallerFactory retryingCallerFactory;
52    private final Executor executor;
53    private final QueueingFuture<V>[] tasks; // all the tasks
54    private final ArrayList<QueueingFuture> completedTasks; // completed tasks
55    private volatile boolean cancelled = false;
56    
57    class QueueingFuture<T> implements RunnableFuture<T> {
58      private final RetryingCallable<T> future;
59      private T result = null;
60      private ExecutionException exeEx = null;
61      private volatile boolean cancelled = false;
62      private final int callTimeout;
63      private final RpcRetryingCaller<T> retryingCaller;
64      private boolean resultObtained = false;
65      private final int replicaId;  // replica id
66  
67  
68      public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
69        this.future = future;
70        this.callTimeout = callTimeout;
71        this.retryingCaller = retryingCallerFactory.<T>newCaller();
72        this.replicaId = id;
73      }
74  
75      @SuppressWarnings("unchecked")
76      @Override
77      public void run() {
78        try {
79          if (!cancelled) {
80            result = this.retryingCaller.callWithRetries(future, callTimeout);
81            resultObtained = true;
82          }
83        } catch (Throwable t) {
84          exeEx = new ExecutionException(t);
85        } finally {
86          synchronized (tasks) {
87            // If this wasn't canceled then store the result.
88            if (!cancelled) {
89              completedTasks.add(QueueingFuture.this);
90            }
91  
92            // Notify just in case there was someone waiting and this was canceled.
93            // That shouldn't happen but better safe than sorry.
94            tasks.notify();
95          }
96        }
97      }
98  
99      @Override
100     public boolean cancel(boolean mayInterruptIfRunning) {
101       if (resultObtained || exeEx != null) return false;
102       retryingCaller.cancel();
103       if (future instanceof Cancellable) ((Cancellable)future).cancel();
104       cancelled = true;
105       return true;
106     }
107 
108     @Override
109     public boolean isCancelled() {
110       return cancelled;
111     }
112 
113     @Override
114     public boolean isDone() {
115       return resultObtained || exeEx != null;
116     }
117 
118     @Override
119     public T get() throws InterruptedException, ExecutionException {
120       try {
121         return get(1000, TimeUnit.DAYS);
122       } catch (TimeoutException e) {
123         throw new RuntimeException("You did wait for 1000 days here?", e);
124       }
125     }
126 
127     @Override
128     public T get(long timeout, TimeUnit unit)
129         throws InterruptedException, ExecutionException, TimeoutException {
130       synchronized (tasks) {
131         if (resultObtained) {
132           return result;
133         }
134         if (exeEx != null) {
135           throw exeEx;
136         }
137         unit.timedWait(tasks, timeout);
138       }
139       if (resultObtained) {
140         return result;
141       }
142       if (exeEx != null) {
143         throw exeEx;
144       }
145 
146       throw new TimeoutException("timeout=" + timeout + ", " + unit);
147     }
148 
149     public int getReplicaId() {
150       return replicaId;
151     }
152 
153     public ExecutionException getExeEx() {
154       return exeEx;
155     }
156   }
157 
158   @SuppressWarnings("unchecked")
159   public ResultBoundedCompletionService(
160       RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
161       int maxTasks) {
162     this.retryingCallerFactory = retryingCallerFactory;
163     this.executor = executor;
164     this.tasks = new QueueingFuture[maxTasks];
165     this.completedTasks = new ArrayList<>(maxTasks);
166   }
167 
168 
169   public void submit(RetryingCallable<V> task, int callTimeout, int id) {
170     QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout, id);
171     executor.execute(Trace.wrap(newFuture));
172     tasks[id] = newFuture;
173   }
174 
175   public QueueingFuture<V> take() throws InterruptedException {
176     synchronized (tasks) {
177       while (!cancelled && (completedTasks.size() < 1)) tasks.wait();
178     }
179     return completedTasks.get(0);
180   }
181 
182   /**
183    * Poll for the first completed task whether it is a success or execution exception.
184    *
185    * @param timeout  - time to wait before it times out
186    * @param unit  - time unit for timeout
187    */
188   public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
189     return pollForSpecificCompletedTask(timeout, unit, 0);
190   }
191 
192   /**
193    * Poll for the first successfully completed task whose completed order is in startIndex,
194    * endIndex(exclusive) range
195    *
196    * @param timeout  - time to wait before it times out
197    * @param unit  - time unit for timeout
198    * @param startIndex - start index, starting from 0, inclusive
199    * @param endIndex - end index, exclusive
200    *
201    * @return If within timeout time, there is no successfully completed task, return null; If all
202    *         tasks get execution exception, it will throw out the last execution exception,
203    *         otherwise return the first successfully completed task's result.
204    */
205   public QueueingFuture<V> pollForFirstSuccessfullyCompletedTask(long timeout, TimeUnit unit,
206       int startIndex, int endIndex)
207       throws InterruptedException, CancellationException, ExecutionException {
208 
209     QueueingFuture<V>  f;
210     long start, duration;
211     for (int i = startIndex; i < endIndex; i ++) {
212 
213       start = EnvironmentEdgeManager.currentTime();
214       f = pollForSpecificCompletedTask(timeout, unit, i);
215       duration = EnvironmentEdgeManager.currentTime() - start;
216 
217       // Even with operationTimeout less than 0, still loop through the rest as there could
218       // be other completed tasks before operationTimeout.
219       timeout -= duration;
220 
221       if (f == null) {
222         return null;
223       } else if (f.getExeEx() != null) {
224         // we continue here as we need to loop through all the results.
225         if (LOG.isDebugEnabled()) {
226           LOG.debug("Replica " + ((f == null) ? 0 : f.getReplicaId()) + " returns " +
227               f.getExeEx().getCause());
228         }
229 
230         if (i == (endIndex - 1)) {
231           // Rethrow this exception
232           throw f.getExeEx();
233         }
234         continue;
235       }
236 
237       return f;
238     }
239 
240     // impossible to reach
241     return null;
242   }
243 
244   /**
245    * Poll for the Nth completed task (index starts from 0 (the 1st), 1 (the second)...)
246    *
247    * @param timeout  - time to wait before it times out
248    * @param unit  - time unit for timeout
249    * @param index - the index(th) completed task, index starting from 0
250    */
251   private QueueingFuture<V> pollForSpecificCompletedTask(long timeout, TimeUnit unit, int index)
252       throws InterruptedException {
253     if (index < 0) {
254       return null;
255     }
256 
257     synchronized (tasks) {
258       if (!cancelled && (completedTasks.size() <= index)) unit.timedWait(tasks, timeout);
259       if (completedTasks.size() <= index) return null;
260     }
261     return completedTasks.get(index);
262   }
263 
264   public void cancelAll() {
265     // Grab the lock on tasks so that cancelled is visible everywhere
266     synchronized (tasks) {
267       cancelled = true;
268     }
269     for (QueueingFuture<V> future : tasks) {
270       if (future != null) future.cancel(true);
271     }
272   }
273 }