1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master.procedure;
20
21 import java.io.IOException;
22
23 import java.io.InterruptedIOException;
24 import java.util.Map;
25 import java.util.Map.Entry;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.hbase.client.HBaseAdmin;
32 import org.apache.hadoop.hbase.master.MasterServices;
33 import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
34 import org.apache.hadoop.hbase.procedure2.Procedure;
35 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
36 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
37 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
38 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
39 import org.apache.hadoop.hbase.security.User;
40 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
41 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
42 import org.apache.hadoop.hbase.util.NonceKey;
43 import org.apache.hadoop.security.UserGroupInformation;
44
45 @InterfaceAudience.Private
46 @InterfaceStability.Evolving
47 public final class MasterProcedureUtil {
48 private static final Log LOG = LogFactory.getLog(MasterProcedureUtil.class);
49
50 private MasterProcedureUtil() {}
51
52 public static UserInformation toProtoUserInfo(UserGroupInformation ugi) {
53 UserInformation.Builder userInfoPB = UserInformation.newBuilder();
54 userInfoPB.setEffectiveUser(ugi.getUserName());
55 if (ugi.getRealUser() != null) {
56 userInfoPB.setRealUser(ugi.getRealUser().getUserName());
57 }
58 return userInfoPB.build();
59 }
60
61 public static UserGroupInformation toUserInfo(UserInformation userInfoProto) {
62 if (userInfoProto.hasEffectiveUser()) {
63 String effectiveUser = userInfoProto.getEffectiveUser();
64 if (userInfoProto.hasRealUser()) {
65 String realUser = userInfoProto.getRealUser();
66 UserGroupInformation realUserUgi = UserGroupInformation.createRemoteUser(realUser);
67 return UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
68 }
69 return UserGroupInformation.createRemoteUser(effectiveUser);
70 }
71 return null;
72 }
73
74 public static ProcedureDescription buildProcedure(String signature, String instance,
75 Map<String, String> props) {
76 ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
77 builder.setSignature(signature).setInstance(instance);
78 for (Entry<String, String> entry : props.entrySet()) {
79 NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
80 .setValue(entry.getValue()).build();
81 builder.addConfiguration(pair);
82 }
83 ProcedureDescription desc = builder.build();
84 return desc;
85 }
86
87 public static long execProcedure(MasterProcedureManager mpm, String signature, String instance,
88 Map<String, String> props) throws IOException {
89 if (mpm == null) {
90 throw new IOException("The procedure is not registered: " + signature);
91 }
92 ProcedureDescription desc = buildProcedure(signature, instance, props);
93 mpm.execProcedure(desc);
94
95
96
97 long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
98 return waitTime;
99 }
100
101 public static void waitForProcedure(MasterProcedureManager mpm, String signature, String instance,
102 Map<String, String> props, long max, int numRetries, long pause) throws IOException {
103 ProcedureDescription desc = buildProcedure(signature, instance, props);
104 long start = EnvironmentEdgeManager.currentTime();
105 long maxPauseTime = max / numRetries;
106 int tries = 0;
107 LOG.debug("Waiting a max of " + max + " ms for procedure '" +
108 signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
109 boolean done = false;
110 while (tries == 0
111 || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
112 try {
113
114 long sleep = HBaseAdmin.getPauseTime(tries++, pause);
115 sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
116 LOG.debug("(#" + tries + ") Sleeping: " + sleep +
117 "ms while waiting for procedure completion.");
118 Thread.sleep(sleep);
119 } catch (InterruptedException e) {
120 throw (InterruptedIOException) new InterruptedIOException("Interrupted").initCause(e);
121 }
122 LOG.debug("Getting current status of procedure from master...");
123 done = mpm.isProcedureDone(desc);
124 }
125 if (!done) {
126 throw new IOException("Procedure '" + signature + " : " + instance
127 + "' wasn't completed in expectedTime:" + max + " ms");
128 }
129 }
130
131
132
133
134
135
136 public static abstract class NonceProcedureRunnable {
137 private final MasterServices master;
138 private final NonceKey nonceKey;
139 private Long procId;
140
141 public NonceProcedureRunnable(final MasterServices master,
142 final long nonceGroup, final long nonce) {
143 this.master = master;
144 this.nonceKey = getProcedureExecutor().createNonceKey(nonceGroup, nonce);
145 }
146
147 protected NonceKey getNonceKey() {
148 return nonceKey;
149 }
150
151 protected MasterServices getMaster() {
152 return master;
153 }
154
155 protected ProcedureExecutor<MasterProcedureEnv> getProcedureExecutor() {
156 return master.getMasterProcedureExecutor();
157 }
158
159 protected long getProcId() {
160 return procId != null ? procId.longValue() : -1;
161 }
162
163 protected long setProcId(final long procId) {
164 this.procId = procId;
165 return procId;
166 }
167
168 protected abstract void run() throws IOException;
169 protected abstract String getDescription();
170
171 protected long submitProcedure(final Procedure proc) {
172 assert procId == null : "submitProcedure() was already called, running procId=" + procId;
173 procId = getProcedureExecutor().submitProcedure(proc, nonceKey);
174 return procId;
175 }
176 }
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195 public static long submitProcedure(final NonceProcedureRunnable runnable) throws IOException {
196 final ProcedureExecutor<MasterProcedureEnv> procExec = runnable.getProcedureExecutor();
197 final long procId = procExec.registerNonce(runnable.getNonceKey());
198 if (procId >= 0) return procId;
199 try {
200 runnable.run();
201 } catch (IOException e) {
202 procExec.setFailureResultForNonce(runnable.getNonceKey(),
203 runnable.getDescription(),
204 procExec.getEnvironment().getRequestUser(), e);
205 throw e;
206 } finally {
207 procExec.unregisterNonceIfProcedureWasNotSubmitted(runnable.getNonceKey());
208 }
209 return runnable.getProcId();
210 }
211 }