1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.security.token;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26
27 import java.io.IOException;
28 import java.net.InetSocketAddress;
29 import java.security.PrivilegedExceptionAction;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.ExecutorService;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.ChoreService;
39 import org.apache.hadoop.hbase.ClusterId;
40 import org.apache.hadoop.hbase.CoordinatedStateManager;
41 import org.apache.hadoop.hbase.Coprocessor;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.HRegionInfo;
45 import org.apache.hadoop.hbase.Server;
46 import org.apache.hadoop.hbase.ServerName;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.ClusterConnection;
49 import org.apache.hadoop.hbase.client.Connection;
50 import org.apache.hadoop.hbase.client.ConnectionFactory;
51 import org.apache.hadoop.hbase.client.HTableInterface;
52 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
53 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
54 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
55 import org.apache.hadoop.hbase.ipc.RpcClient;
56 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
57 import org.apache.hadoop.hbase.ipc.RpcServer;
58 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
59 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
60 import org.apache.hadoop.hbase.ipc.ServerRpcController;
61 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
62 import org.apache.hadoop.hbase.regionserver.HRegion;
63 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
64 import org.apache.hadoop.hbase.security.SecurityInfo;
65 import org.apache.hadoop.hbase.security.User;
66 import org.apache.hadoop.hbase.testclassification.MediumTests;
67 import org.apache.hadoop.hbase.util.Bytes;
68 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
69 import org.apache.hadoop.hbase.util.Sleeper;
70 import org.apache.hadoop.hbase.util.Strings;
71 import org.apache.hadoop.hbase.util.Threads;
72 import org.apache.hadoop.hbase.util.Writables;
73 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
74 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
75 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
76 import org.apache.hadoop.net.DNS;
77 import org.apache.hadoop.security.UserGroupInformation;
78 import org.apache.hadoop.security.authorize.PolicyProvider;
79 import org.apache.hadoop.security.authorize.Service;
80 import org.apache.hadoop.security.token.SecretManager;
81 import org.apache.hadoop.security.token.Token;
82 import org.apache.hadoop.security.token.TokenIdentifier;
83 import org.junit.AfterClass;
84 import org.junit.BeforeClass;
85 import org.junit.Test;
86 import org.junit.experimental.categories.Category;
87
88 import com.google.protobuf.BlockingRpcChannel;
89 import com.google.protobuf.BlockingService;
90 import com.google.protobuf.RpcController;
91 import com.google.protobuf.ServiceException;
92
93
94
95
96 @Category(MediumTests.class)
97 public class TestTokenAuthentication {
98 static {
99
100
101 System.setProperty("java.security.krb5.realm", "hbase");
102 System.setProperty("java.security.krb5.kdc", "blah");
103 }
104 private static Log LOG = LogFactory.getLog(TestTokenAuthentication.class);
105
106 public interface AuthenticationServiceSecurityInfo {}
107
108
109
110
111 private static class TokenServer extends TokenProvider
112 implements AuthenticationProtos.AuthenticationService.BlockingInterface, Runnable, Server {
113 private static Log LOG = LogFactory.getLog(TokenServer.class);
114 private Configuration conf;
115 private RpcServerInterface rpcServer;
116 private InetSocketAddress isa;
117 private ZooKeeperWatcher zookeeper;
118 private Sleeper sleeper;
119 private boolean started = false;
120 private boolean aborted = false;
121 private boolean stopped = false;
122 private long startcode;
123
124 public TokenServer(Configuration conf) throws IOException {
125 this.conf = conf;
126 this.startcode = EnvironmentEdgeManager.currentTime();
127
128 String hostname =
129 Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default"));
130 int port = 0;
131
132 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
133 if (initialIsa.getAddress() == null) {
134 throw new IllegalArgumentException("Failed resolve of " + initialIsa);
135 }
136 final List<BlockingServiceAndInterface> sai =
137 new ArrayList<BlockingServiceAndInterface>(1);
138 BlockingService service =
139 AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
140 sai.add(new BlockingServiceAndInterface(service,
141 AuthenticationProtos.AuthenticationService.BlockingInterface.class));
142 this.rpcServer =
143 new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
144 InetSocketAddress address = rpcServer.getListenerAddress();
145 if (address == null) {
146 throw new IOException("Listener channel is closed");
147 }
148 this.isa = address;
149 this.sleeper = new Sleeper(1000, this);
150 }
151
152 @Override
153 public Configuration getConfiguration() {
154 return conf;
155 }
156
157 @Override
158 public ClusterConnection getConnection() {
159 return null;
160 }
161
162 @Override
163 public MetaTableLocator getMetaTableLocator() {
164 return null;
165 }
166
167 @Override
168 public ZooKeeperWatcher getZooKeeper() {
169 return zookeeper;
170 }
171
172 @Override
173 public CoordinatedStateManager getCoordinatedStateManager() {
174 return null;
175 }
176
177 @Override
178 public boolean isAborted() {
179 return aborted;
180 }
181
182 @Override
183 public ServerName getServerName() {
184 return ServerName.valueOf(isa.getHostName(), isa.getPort(), startcode);
185 }
186
187 @Override
188 public void abort(String reason, Throwable error) {
189 LOG.fatal("Aborting on: "+reason, error);
190 this.aborted = true;
191 this.stopped = true;
192 sleeper.skipSleepCycle();
193 }
194
195 private void initialize() throws IOException {
196
197 Configuration zkConf = new Configuration(conf);
198 zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
199 this.zookeeper = new ZooKeeperWatcher(zkConf, TokenServer.class.getSimpleName(),
200 this, true);
201 this.rpcServer.start();
202
203
204 final RegionServerServices mockServices = TEST_UTIL.createMockRegionServerService(rpcServer);
205
206
207 super.start(new RegionCoprocessorEnvironment() {
208 @Override
209 public HRegion getRegion() { return null; }
210
211 @Override
212 public RegionServerServices getRegionServerServices() {
213 return mockServices;
214 }
215
216 @Override
217 public ConcurrentMap<String, Object> getSharedData() { return null; }
218
219 @Override
220 public int getVersion() { return 0; }
221
222 @Override
223 public String getHBaseVersion() { return null; }
224
225 @Override
226 public Coprocessor getInstance() { return null; }
227
228 @Override
229 public int getPriority() { return 0; }
230
231 @Override
232 public int getLoadSequence() { return 0; }
233
234 @Override
235 public Configuration getConfiguration() { return conf; }
236
237 @Override
238 public HTableInterface getTable(TableName tableName) throws IOException
239 { return null; }
240
241 @Override
242 public HTableInterface getTable(TableName tableName, ExecutorService service)
243 throws IOException {
244 return null;
245 }
246
247 @Override
248 public ClassLoader getClassLoader() {
249 return Thread.currentThread().getContextClassLoader();
250 }
251
252 @Override
253 public HRegionInfo getRegionInfo() {
254 return null;
255 }
256 });
257
258 started = true;
259 }
260
261 public void run() {
262 try {
263 initialize();
264 while (!stopped) {
265 this.sleeper.sleep();
266 }
267 } catch (Exception e) {
268 abort(e.getMessage(), e);
269 }
270 this.rpcServer.stop();
271 }
272
273 public boolean isStarted() {
274 return started;
275 }
276
277 @Override
278 public void stop(String reason) {
279 LOG.info("Stopping due to: "+reason);
280 this.stopped = true;
281 sleeper.skipSleepCycle();
282 }
283
284 @Override
285 public boolean isStopped() {
286 return stopped;
287 }
288
289 public InetSocketAddress getAddress() {
290 return isa;
291 }
292
293 public SecretManager<? extends TokenIdentifier> getSecretManager() {
294 return ((RpcServer)rpcServer).getSecretManager();
295 }
296
297 @Override
298 public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
299 RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
300 throws ServiceException {
301 LOG.debug("Authentication token request from " + RpcServer.getRequestUserName());
302
303 ServerRpcController serverController = new ServerRpcController();
304 BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> callback =
305 new BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>();
306 getAuthenticationToken(serverController, request, callback);
307 try {
308 serverController.checkFailed();
309 return callback.get();
310 } catch (IOException ioe) {
311 throw new ServiceException(ioe);
312 }
313 }
314
315 @Override
316 public AuthenticationProtos.WhoAmIResponse whoAmI(
317 RpcController controller, AuthenticationProtos.WhoAmIRequest request)
318 throws ServiceException {
319 LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName());
320
321 ServerRpcController serverController = new ServerRpcController();
322 BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
323 new BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse>();
324 whoAmI(serverController, request, callback);
325 try {
326 serverController.checkFailed();
327 return callback.get();
328 } catch (IOException ioe) {
329 throw new ServiceException(ioe);
330 }
331 }
332
333 @Override
334 public ChoreService getChoreService() {
335 return null;
336 }
337 }
338
339 private static HBaseTestingUtility TEST_UTIL;
340 private static TokenServer server;
341 private static Thread serverThread;
342 private static AuthenticationTokenSecretManager secretManager;
343 private static ClusterId clusterId = new ClusterId();
344
345 @BeforeClass
346 public static void setupBeforeClass() throws Exception {
347 TEST_UTIL = new HBaseTestingUtility();
348 TEST_UTIL.startMiniZKCluster();
349
350 SecurityInfo.addInfo(AuthenticationProtos.AuthenticationService.getDescriptor().getName(),
351 new SecurityInfo("hbase.test.kerberos.principal",
352 AuthenticationProtos.TokenIdentifier.Kind.HBASE_AUTH_TOKEN));
353
354 Configuration conf = TEST_UTIL.getConfiguration();
355 conf.set("hadoop.security.authentication", "kerberos");
356 conf.set("hbase.security.authentication", "kerberos");
357 conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION, true);
358 server = new TokenServer(conf);
359 serverThread = new Thread(server);
360 Threads.setDaemonThreadRunning(serverThread, "TokenServer:"+server.getServerName().toString());
361
362 while (!server.isStarted() && !server.isStopped()) {
363 Thread.sleep(10);
364 }
365 server.rpcServer.refreshAuthManager(new PolicyProvider() {
366 @Override
367 public Service[] getServices() {
368 return new Service [] {
369 new Service("security.client.protocol.acl",
370 AuthenticationProtos.AuthenticationService.BlockingInterface.class)};
371 }
372 });
373 ZKClusterId.setClusterId(server.getZooKeeper(), clusterId);
374 secretManager = (AuthenticationTokenSecretManager)server.getSecretManager();
375 while(secretManager.getCurrentKey() == null) {
376 Thread.sleep(1);
377 }
378 }
379
380 @AfterClass
381 public static void tearDownAfterClass() throws Exception {
382 server.stop("Test complete");
383 Threads.shutdown(serverThread);
384 TEST_UTIL.shutdownMiniZKCluster();
385 }
386
387 @Test
388 public void testTokenCreation() throws Exception {
389 Token<AuthenticationTokenIdentifier> token =
390 secretManager.generateToken("testuser");
391
392 AuthenticationTokenIdentifier ident = new AuthenticationTokenIdentifier();
393 Writables.getWritable(token.getIdentifier(), ident);
394 assertEquals("Token username should match", "testuser",
395 ident.getUsername());
396 byte[] passwd = secretManager.retrievePassword(ident);
397 assertTrue("Token password and password from secret manager should match",
398 Bytes.equals(token.getPassword(), passwd));
399 }
400
401 @Test
402 public void testTokenAuthentication() throws Exception {
403 UserGroupInformation testuser =
404 UserGroupInformation.createUserForTesting("testuser", new String[]{"testgroup"});
405
406 testuser.setAuthenticationMethod(
407 UserGroupInformation.AuthenticationMethod.TOKEN);
408 final Configuration conf = TEST_UTIL.getConfiguration();
409 UserGroupInformation.setConfiguration(conf);
410 Token<AuthenticationTokenIdentifier> token =
411 secretManager.generateToken("testuser");
412 LOG.debug("Got token: " + token.toString());
413 testuser.addToken(token);
414
415
416 testuser.doAs(new PrivilegedExceptionAction<Object>() {
417 public Object run() throws Exception {
418 Configuration c = server.getConfiguration();
419 RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
420 ServerName sn =
421 ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
422 System.currentTimeMillis());
423 try {
424 BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
425 User.getCurrent(), HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
426 AuthenticationProtos.AuthenticationService.BlockingInterface stub =
427 AuthenticationProtos.AuthenticationService.newBlockingStub(channel);
428 AuthenticationProtos.WhoAmIResponse response =
429 stub.whoAmI(null, AuthenticationProtos.WhoAmIRequest.getDefaultInstance());
430 String myname = response.getUsername();
431 assertEquals("testuser", myname);
432 String authMethod = response.getAuthMethod();
433 assertEquals("TOKEN", authMethod);
434 } finally {
435 rpcClient.close();
436 }
437 return null;
438 }
439 });
440 }
441
442 @Test
443 public void testUseExistingToken() throws Exception {
444 User user = User.createUserForTesting(TEST_UTIL.getConfiguration(), "testuser2",
445 new String[]{"testgroup"});
446 Token<AuthenticationTokenIdentifier> token =
447 secretManager.generateToken(user.getName());
448 assertNotNull(token);
449 user.addToken(token);
450
451
452 Token<AuthenticationTokenIdentifier> firstToken =
453 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
454 assertNotNull(firstToken);
455 assertEquals(token, firstToken);
456
457 Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
458 try {
459 assertFalse(TokenUtil.addTokenIfMissing(conn, user));
460
461 Token<AuthenticationTokenIdentifier> secondToken =
462 new AuthenticationTokenSelector().selectToken(token.getService(), user.getTokens());
463 assertEquals(firstToken, secondToken);
464 } finally {
465 conn.close();
466 }
467 }
468 }