1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.token;
20
21 import javax.crypto.SecretKey;
22 import java.io.IOException;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Stoppable;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
36 import org.apache.hadoop.hbase.zookeeper.ZKLeaderManager;
37 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
38 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
39 import org.apache.hadoop.io.Text;
40 import org.apache.hadoop.security.token.SecretManager;
41 import org.apache.hadoop.security.token.Token;
42 import org.apache.zookeeper.KeeperException;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 @InterfaceAudience.Private
58 public class AuthenticationTokenSecretManager
59 extends SecretManager<AuthenticationTokenIdentifier> {
60
61 static final String NAME_PREFIX = "SecretManager-";
62
63 private static Log LOG = LogFactory.getLog(
64 AuthenticationTokenSecretManager.class);
65
66 private long lastKeyUpdate;
67 private long keyUpdateInterval;
68 private long tokenMaxLifetime;
69 private ZKSecretWatcher zkWatcher;
70 private LeaderElector leaderElector;
71 private ZKClusterId clusterId;
72
73 private Map<Integer,AuthenticationKey> allKeys =
74 new ConcurrentHashMap<Integer, AuthenticationKey>();
75 private AuthenticationKey currentKey;
76
77 private int idSeq;
78 private AtomicLong tokenSeq = new AtomicLong();
79 private String name;
80
81
82
83
84
85
86
87
88
89
90
91
92 public AuthenticationTokenSecretManager(Configuration conf,
93 ZooKeeperWatcher zk, String serverName,
94 long keyUpdateInterval, long tokenMaxLifetime) {
95 this.zkWatcher = new ZKSecretWatcher(conf, zk, this);
96 this.keyUpdateInterval = keyUpdateInterval;
97 this.tokenMaxLifetime = tokenMaxLifetime;
98 this.leaderElector = new LeaderElector(zk, serverName);
99 this.name = NAME_PREFIX+serverName;
100 this.clusterId = new ZKClusterId(zk, zk);
101 }
102
103 public void start() {
104 try {
105
106 this.zkWatcher.start();
107
108 this.leaderElector.start();
109 } catch (KeeperException ke) {
110 LOG.error("Zookeeper initialization failed", ke);
111 }
112 }
113
114 public void stop() {
115 this.leaderElector.stop("SecretManager stopping");
116 }
117
118 public boolean isMaster() {
119 return leaderElector.isMaster();
120 }
121
122 public String getName() {
123 return name;
124 }
125
126 @Override
127 protected byte[] createPassword(AuthenticationTokenIdentifier identifier) {
128 long now = EnvironmentEdgeManager.currentTime();
129 AuthenticationKey secretKey = currentKey;
130 identifier.setKeyId(secretKey.getKeyId());
131 identifier.setIssueDate(now);
132 identifier.setExpirationDate(now + tokenMaxLifetime);
133 identifier.setSequenceNumber(tokenSeq.getAndIncrement());
134 return createPassword(identifier.getBytes(),
135 secretKey.getKey());
136 }
137
138 @Override
139 public byte[] retrievePassword(AuthenticationTokenIdentifier identifier)
140 throws InvalidToken {
141 long now = EnvironmentEdgeManager.currentTime();
142 if (identifier.getExpirationDate() < now) {
143 throw new InvalidToken("Token has expired");
144 }
145 AuthenticationKey masterKey = allKeys.get(identifier.getKeyId());
146 if(masterKey == null) {
147 if(zkWatcher.getWatcher().isAborted()) {
148 LOG.error("ZookeeperWatcher is abort");
149 throw new InvalidToken("Token keys could not be sync from zookeeper"
150 + " because of ZookeeperWatcher abort");
151 }
152 synchronized (this) {
153 if (!leaderElector.isAlive() || leaderElector.isStopped()) {
154 LOG.warn("Thread leaderElector[" + leaderElector.getName() + ":"
155 + leaderElector.getId() + "] is stoped or not alive");
156 leaderElector.start();
157 LOG.info("Thread leaderElector [" + leaderElector.getName() + ":"
158 + leaderElector.getId() + "] is started");
159 }
160 }
161 zkWatcher.refreshKeys();
162 if (LOG.isDebugEnabled()) {
163 LOG.debug("Sync token keys from zookeeper");
164 }
165 masterKey = allKeys.get(identifier.getKeyId());
166 }
167 if (masterKey == null) {
168 throw new InvalidToken("Unknown master key for token (id="+
169 identifier.getKeyId()+")");
170 }
171
172 return createPassword(identifier.getBytes(),
173 masterKey.getKey());
174 }
175
176 @Override
177 public AuthenticationTokenIdentifier createIdentifier() {
178 return new AuthenticationTokenIdentifier();
179 }
180
181 public Token<AuthenticationTokenIdentifier> generateToken(String username) {
182 AuthenticationTokenIdentifier ident =
183 new AuthenticationTokenIdentifier(username);
184 Token<AuthenticationTokenIdentifier> token =
185 new Token<AuthenticationTokenIdentifier>(ident, this);
186 if (clusterId.hasId()) {
187 token.setService(new Text(clusterId.getId()));
188 }
189 return token;
190 }
191
192 public synchronized void addKey(AuthenticationKey key) throws IOException {
193
194 if (leaderElector.isMaster()) {
195 if (LOG.isDebugEnabled()) {
196 LOG.debug("Running as master, ignoring new key "+key.getKeyId());
197 }
198 return;
199 }
200
201 if (LOG.isDebugEnabled()) {
202 LOG.debug("Adding key "+key.getKeyId());
203 }
204
205 allKeys.put(key.getKeyId(), key);
206 if (currentKey == null || key.getKeyId() > currentKey.getKeyId()) {
207 currentKey = key;
208 }
209
210 if (key.getKeyId() > idSeq) {
211 idSeq = key.getKeyId();
212 }
213 }
214
215 synchronized boolean removeKey(Integer keyId) {
216
217 if (leaderElector.isMaster()) {
218 if (LOG.isDebugEnabled()) {
219 LOG.debug("Running as master, ignoring removed key "+keyId);
220 }
221 return false;
222 }
223
224 if (LOG.isDebugEnabled()) {
225 LOG.debug("Removing key "+keyId);
226 }
227
228 allKeys.remove(keyId);
229 return true;
230 }
231
232 AuthenticationKey getCurrentKey() {
233 return currentKey;
234 }
235
236 AuthenticationKey getKey(int keyId) {
237 return allKeys.get(keyId);
238 }
239
240 synchronized void removeExpiredKeys() {
241 if (!leaderElector.isMaster()) {
242 LOG.info("Skipping removeExpiredKeys() because not running as master.");
243 return;
244 }
245
246 long now = EnvironmentEdgeManager.currentTime();
247 Iterator<AuthenticationKey> iter = allKeys.values().iterator();
248 while (iter.hasNext()) {
249 AuthenticationKey key = iter.next();
250 if (key.getExpiration() < now) {
251 if (LOG.isDebugEnabled()) {
252 LOG.debug("Removing expired key "+key.getKeyId());
253 }
254 iter.remove();
255 zkWatcher.removeKeyFromZK(key);
256 }
257 }
258 }
259
260 synchronized boolean isCurrentKeyRolled() {
261 return currentKey != null;
262 }
263
264 synchronized void rollCurrentKey() {
265 if (!leaderElector.isMaster()) {
266 LOG.info("Skipping rollCurrentKey() because not running as master.");
267 return;
268 }
269
270 long now = EnvironmentEdgeManager.currentTime();
271 AuthenticationKey prev = currentKey;
272 AuthenticationKey newKey = new AuthenticationKey(++idSeq,
273 Long.MAX_VALUE,
274 generateSecret());
275 allKeys.put(newKey.getKeyId(), newKey);
276 currentKey = newKey;
277 zkWatcher.addKeyToZK(newKey);
278 lastKeyUpdate = now;
279
280 if (prev != null) {
281
282 prev.setExpiration(now + tokenMaxLifetime);
283 allKeys.put(prev.getKeyId(), prev);
284 zkWatcher.updateKeyInZK(prev);
285 }
286 }
287
288 public static SecretKey createSecretKey(byte[] raw) {
289 return SecretManager.createSecretKey(raw);
290 }
291
292 private class LeaderElector extends Thread implements Stoppable {
293 private boolean stopped = false;
294
295 private boolean isMaster = false;
296 private ZKLeaderManager zkLeader;
297
298 public LeaderElector(ZooKeeperWatcher watcher, String serverName) {
299 setDaemon(true);
300 setName("ZKSecretWatcher-leaderElector");
301 zkLeader = new ZKLeaderManager(watcher,
302 ZKUtil.joinZNode(zkWatcher.getRootKeyZNode(), "keymaster"),
303 Bytes.toBytes(serverName), this);
304 }
305
306 public boolean isMaster() {
307 return isMaster;
308 }
309
310 @Override
311 public boolean isStopped() {
312 return stopped;
313 }
314
315 @Override
316 public void stop(String reason) {
317 if (stopped) {
318 return;
319 }
320
321 stopped = true;
322
323 if (isMaster) {
324 zkLeader.stepDownAsLeader();
325 }
326 isMaster = false;
327 LOG.info("Stopping leader election, because: "+reason);
328 interrupt();
329 }
330
331 public void run() {
332 zkLeader.start();
333 zkLeader.waitToBecomeLeader();
334 isMaster = true;
335
336 while (!stopped) {
337 long now = EnvironmentEdgeManager.currentTime();
338
339
340 removeExpiredKeys();
341
342 if (lastKeyUpdate + keyUpdateInterval < now) {
343
344 rollCurrentKey();
345 }
346
347 try {
348 Thread.sleep(5000);
349 } catch (InterruptedException ie) {
350 if (LOG.isDebugEnabled()) {
351 LOG.debug("Interrupted waiting for next update", ie);
352 }
353 }
354 }
355 }
356 }
357 }