View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.master.procedure;
20  
21  import java.io.IOException;
22  
23  import java.io.InterruptedIOException;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.client.HBaseAdmin;
32  import org.apache.hadoop.hbase.master.MasterServices;
33  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
34  import org.apache.hadoop.hbase.procedure2.Procedure;
35  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
36  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
37  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
38  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
39  import org.apache.hadoop.hbase.security.User;
40  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
41  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
42  import org.apache.hadoop.hbase.util.NonceKey;
43  import org.apache.hadoop.security.UserGroupInformation;
44  
45  @InterfaceAudience.Private
46  @InterfaceStability.Evolving
47  public final class MasterProcedureUtil {
48    private static final Log LOG = LogFactory.getLog(MasterProcedureUtil.class);
49  
50    private MasterProcedureUtil() {}
51  
52    public static UserInformation toProtoUserInfo(UserGroupInformation ugi) {
53      UserInformation.Builder userInfoPB = UserInformation.newBuilder();
54      userInfoPB.setEffectiveUser(ugi.getUserName());
55      if (ugi.getRealUser() != null) {
56        userInfoPB.setRealUser(ugi.getRealUser().getUserName());
57      }
58      return userInfoPB.build();
59    }
60  
61    public static UserGroupInformation toUserInfo(UserInformation userInfoProto) {
62      if (userInfoProto.hasEffectiveUser()) {
63        String effectiveUser = userInfoProto.getEffectiveUser();
64        if (userInfoProto.hasRealUser()) {
65          String realUser = userInfoProto.getRealUser();
66          UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
67          return UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
68        }
69        return UserGroupInformation.createRemoteUser(effectiveUser);
70      }
71      return null;
72    }
73  
74    public static ProcedureDescription buildProcedure(String signature, String instance,
75        Map<String, String> props) {
76      ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
77      builder.setSignature(signature).setInstance(instance);
78      for (Entry<String, String> entry : props.entrySet()) {
79        NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
80            .setValue(entry.getValue()).build();
81        builder.addConfiguration(pair);
82      }
83      ProcedureDescription desc = builder.build();
84      return desc;
85    }
86  
87    public static long execProcedure(MasterProcedureManager mpm, String signature, String instance,
88        Map<String, String> props) throws IOException {
89      if (mpm == null) {
90        throw new IOException("The procedure is not registered: " + signature);
91      }
92      ProcedureDescription desc = buildProcedure(signature, instance, props);
93      mpm.execProcedure(desc);
94  
95      // send back the max amount of time the client should wait for the procedure
96      // to complete
97      long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
98      return waitTime;
99    }
100   
101   public static void waitForProcedure(MasterProcedureManager mpm, String signature, String instance,
102       Map<String, String> props, long max, int numRetries, long pause) throws IOException {
103     ProcedureDescription desc = buildProcedure(signature, instance, props);
104     long start = EnvironmentEdgeManager.currentTime();
105     long maxPauseTime = max / numRetries;
106     int tries = 0;
107     LOG.debug("Waiting a max of " + max + " ms for procedure '" +
108         signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
109     boolean done = false;
110     while (tries == 0
111         || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
112       try {
113         // sleep a backoff <= pauseTime amount
114         long sleep = HBaseAdmin.getPauseTime(tries++, pause);
115         sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
116         LOG.debug("(#" + tries + ") Sleeping: " + sleep +
117           "ms while waiting for procedure completion.");
118         Thread.sleep(sleep);
119       } catch (InterruptedException e) {
120         throw (InterruptedIOException) new InterruptedIOException("Interrupted").initCause(e);
121       }
122       LOG.debug("Getting current status of procedure from master...");
123       done = mpm.isProcedureDone(desc);
124     }
125     if (!done) {
126       throw new IOException("Procedure '" + signature + " : " + instance
127           + "' wasn't completed in expectedTime:" + max + " ms");
128     }
129   }
130 
131   /**
132    * Helper Runnable used in conjunction with submitProcedure() to deal with
133    * submitting procs with nonce.
134    * See submitProcedure() for an example.
135    */
136   public static abstract class NonceProcedureRunnable {
137     private final MasterServices master;
138     private final NonceKey nonceKey;
139     private Long procId;
140 
141     public NonceProcedureRunnable(final MasterServices master,
142         final long nonceGroup, final long nonce) {
143       this.master = master;
144       this.nonceKey = getProcedureExecutor().createNonceKey(nonceGroup, nonce);
145     }
146 
147     protected NonceKey getNonceKey() {
148       return nonceKey;
149     }
150 
151     protected MasterServices getMaster() {
152       return master;
153     }
154 
155     protected ProcedureExecutor<MasterProcedureEnv> getProcedureExecutor() {
156       return master.getMasterProcedureExecutor();
157     }
158 
159     protected long getProcId() {
160       return procId != null ? procId.longValue() : -1;
161     }
162 
163     protected long setProcId(final long procId) {
164       this.procId = procId;
165       return procId;
166     }
167 
168     protected abstract void run() throws IOException;
169     protected abstract String getDescription();
170 
171     protected long submitProcedure(final Procedure proc) {
172       assert procId == null : "submitProcedure() was already called, running procId=" + procId;
173       procId = getProcedureExecutor().submitProcedure(proc, nonceKey);
174       return procId;
175     }
176   }
177 
178   /**
179    * Helper used to deal with submitting procs with nonce.
180    * Internally the NonceProcedureRunnable.run() will be called only if no one else
181    * registered the nonce. any Exception thrown by the run() method will be
182    * collected/handled and rethrown.
183    * <code>
184    * long procId = MasterProcedureUtil.submitProcedure(
185    *      new NonceProcedureRunnable(procExec, nonceGroup, nonce) {
186    *   {@literal @}Override
187    *   public void run() {
188    *     cpHost.preOperation();
189    *     submitProcedure(new MyProc());
190    *     cpHost.postOperation();
191    *   }
192    * });
193    * </code>
194    */
195   public static long submitProcedure(final NonceProcedureRunnable runnable) throws IOException {
196     final ProcedureExecutor<MasterProcedureEnv> procExec = runnable.getProcedureExecutor();
197     final long procId = procExec.registerNonce(runnable.getNonceKey());
198     if (procId >= 0) return procId; // someone already registered the nonce
199     try {
200       runnable.run();
201     } catch (IOException e) {
202       procExec.setFailureResultForNonce(runnable.getNonceKey(),
203           runnable.getDescription(),
204           procExec.getEnvironment().getRequestUser(), e);
205       throw e;
206     } finally {
207       procExec.unregisterNonceIfProcedureWasNotSubmitted(runnable.getNonceKey());
208     }
209     return runnable.getProcId();
210   }
211 }