/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.security.access;

import java.io.IOException;
import java.math.BigInteger;
import java.security.PrivilegedAction;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse;
import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.SpaceLimitingException;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener;
import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
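/**
 * Coprocessor service for bulk loads in secure mode.
 * This coprocessor has to be installed as part of enabling security in HBase.
 *
 * It addresses two issues:
 * <ol>
 * <li>Moving files in a secure filesystem, wherein the HBase client
 * and HBase server are different filesystem users.</li>
 * <li>Doing the move in a secure manner, assuming the filesystem
 * is POSIX compliant.</li>
 * </ol>
 *
 * The mechanism is as follows:
 * <ol>
 * <li>Create an hbase-owned staging directory which is world traversable
 * (-rwx--x--x), e.g. /hbase/staging</li>
 * <li>A user writes out data to a secure output directory, e.g. /user/foo/data</li>
 * <li>A call is made to hbase to create a secret staging directory which is
 * globally rwx (-rwxrwxrwx), named by a long random string</li>
 * <li>The user moves the data into the random staging directory,
 * then calls bulkLoadHFiles()</li>
 * </ol>
 *
 * As with delegation tokens, the strength of the security lies in the length
 * and randomness of the secret directory name.
 */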
@InterfaceAudience.Private
public class SecureBulkLoadEndpoint extends SecureBulkLoadService
    implements CoprocessorService, Coprocessor {

  public static final long VERSION = 0L;

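  // 320 bits of entropy encoded in radix 32: 320 / 5 = 64 characters.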
  private static final int RANDOM_WIDTH = 320;
  private static final int RANDOM_RADIX = 32;

  private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class);

  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
  private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x");
  private final static String[] FsWithoutSupportPermission =
      {"s3", "s3a", "s3n", "wasb", "wasbs", "swift"};

  private SecureRandom random;
  private FileSystem fs;
  private Configuration conf;
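  // Two levels deep so the directory does not get deleted accidentally;
  // there is no sticky bit in Hadoop 1.0.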
  private Path baseStagingDir;

  private RegionCoprocessorEnvironment env;

  private UserProvider userProvider;

  @Override
  public void start(CoprocessorEnvironment env) {
    this.env = (RegionCoprocessorEnvironment) env;
    random = new SecureRandom();
    conf = env.getConfiguration();
    baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf);
    this.userProvider = UserProvider.instantiate(conf);
    Set<String> fsSet = new HashSet<String>(Arrays.asList(FsWithoutSupportPermission));

    try {
      fs = baseStagingDir.getFileSystem(conf);
      if (!fs.exists(baseStagingDir)) {
        fs.mkdirs(baseStagingDir, PERM_HIDDEN);
      } else {
        fs.setPermission(baseStagingDir, PERM_HIDDEN);
      }
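      // There is no sticky bit in hadoop-1.0; keep a child directory inside so
      // the staging directory is never empty and never gets erased by mistake.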
      fs.mkdirs(new Path(baseStagingDir, "DONOTERASE"), PERM_HIDDEN);
      FileStatus status = fs.getFileStatus(baseStagingDir);
      if (status == null) {
        throw new IllegalStateException("Failed to create staging directory");
      }
      LOG.debug("Permission for " + baseStagingDir + " is " + status.getPermission());
      String scheme = fs.getScheme().toLowerCase();
      if (!fsSet.contains(scheme) && !status.getPermission().equals(PERM_HIDDEN)) {
        throw new IllegalStateException(
            "Directory already exists but permissions aren't set to '-rwx--x--x'");
      }
    } catch (IOException e) {
      throw new IllegalStateException("Failed to get FileSystem instance", e);
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
  }

  @Override
  public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request,
      RpcCallback<PrepareBulkLoadResponse> done) {
    PrepareBulkLoadResponse response = null;
    try {
      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();

      if (bulkLoadObservers != null) {
        ObserverContext<RegionCoprocessorEnvironment> ctx =
            new ObserverContext<RegionCoprocessorEnvironment>();
        ctx.prepare(env);

        for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
          bulkLoadObserver.prePrepareBulkLoad(ctx, request);
        }
      }

      String bulkToken = createStagingDir(baseStagingDir,
          getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
      response = PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    // Run the callback exactly once: with the response on success, or with null
    // after the controller exception has been set on failure.
    done.run(response);
  }

  @Override
  public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request,
      RpcCallback<CleanupBulkLoadResponse> done) {
    CleanupBulkLoadResponse response = null;
    try {
      List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();

      if (bulkLoadObservers != null) {
        ObserverContext<RegionCoprocessorEnvironment> ctx =
            new ObserverContext<RegionCoprocessorEnvironment>();
        ctx.prepare(env);

        for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
          bulkLoadObserver.preCleanupBulkLoad(ctx, request);
        }
      }

      Path p = new Path(request.getBulkToken());
      LOG.info("Deleting " + p);
      boolean b = fs.delete(p, true);
      LOG.info("Deleted " + p + " " + b);
      response = CleanupBulkLoadResponse.newBuilder().build();
    } catch (IOException e) {
      ResponseConverter.setControllerException(controller, e);
    }
    // As above, run the callback exactly once.
    done.run(response);
  }

  @Override
  public void secureBulkLoadHFiles(RpcController controller,
      final SecureBulkLoadHFilesRequest request,
      RpcCallback<SecureBulkLoadHFilesResponse> done) {
    final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
    for (ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
      familyPaths.add(new Pair<byte[], String>(el.getFamily().toByteArray(), el.getPath()));
    }

    Token userToken = null;
    if (userProvider.isHadoopSecurityEnabled()) {
      userToken = new Token(request.getFsToken().getIdentifier().toByteArray(),
          request.getFsToken().getPassword().toByteArray(),
          new Text(request.getFsToken().getKind()),
          new Text(request.getFsToken().getService()));
    }
    final String bulkToken = request.getBulkToken();
    User user = getActiveUser();
    final UserGroupInformation ugi = user.getUGI();
    if (userProvider.isHadoopSecurityEnabled()) {
      try (Connection connection = ConnectionFactory.createConnection(conf)) {
        Token tok = TokenUtil.obtainToken(connection);
        if (tok != null) {
          boolean b = ugi.addToken(tok);
          LOG.debug("extra token added " + tok + ", ret=" + b);
        }
      } catch (IOException ioe) {
        LOG.warn("unable to add credentials", ioe);
      }
    }
    if (userToken != null) {
      ugi.addToken(userToken);
    } else if (userProvider.isHadoopSecurityEnabled()) {
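      // A missing token is only allowed to pass through in "simple" security
      // mode (e.g. mini cluster testing); in secure mode it is an error.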
      ResponseConverter.setControllerException(controller,
          new DoNotRetryIOException("User token cannot be null"));
      done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
      return;
    }

    Region region = env.getRegion();
    boolean bypass = false;
    if (region.getCoprocessorHost() != null) {
      try {
        bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
      } catch (IOException e) {
        ResponseConverter.setControllerException(controller, e);
        done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
        return;
      }
    }

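    // If space quotas are enabled, verify that this bulk load would not push
    // the table over its limit before any files are moved.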
    if (QuotaUtil.isQuotaEnabled(conf)) {
      ActivePolicyEnforcement activeSpaceQuotas = env.getRegionServerServices()
          .getRegionServerSpaceQuotaManager().getActiveEnforcements();
      SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(region);
      if (enforcement != null && enforcement.shouldCheckBulkLoads()) {
        // Collect the source file paths so the policy can size-check them.
        List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
        for (FamilyPath familyPath : request.getFamilyPathList()) {
          filePaths.add(familyPath.getPath());
        }
        try {
          // Throws if adding these files would violate the table's space quota.
          enforcement.checkBulkLoad(env.getRegionServerServices().getFileSystem(), filePaths);
        } catch (SpaceLimitingException e) {
          ResponseConverter.setControllerException(controller, e);
          done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
          return;
        }
      }
    }

    boolean loaded = false;
    Map<byte[], List<Path>> map = null;
    if (!bypass) {
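      // Get the target fs (HBase region server fs) delegation token. Since the
      // permission has already been checked via 'preBulkLoadHFile', give the
      // request user the token needed to operate on the target fs. From this
      // point on, the 'doAs' user holds two tokens: one for the source fs
      // (request user), one for the target fs (region server fs).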
      if (userProvider.isHadoopSecurityEnabled()) {
        FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer");
        try {
          targetfsDelegationToken.acquireDelegationToken(fs);
        } catch (IOException e) {
          ResponseConverter.setControllerException(controller, e);
          done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
          return;
        }
        Token<?> targetFsToken = targetfsDelegationToken.getUserToken();
        if (targetFsToken != null
            && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) {
          ugi.addToken(targetFsToken);
        }
      }

      map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() {
        @Override
        public Map<byte[], List<Path>> run() {
          FileSystem fs = null;
          try {
            Configuration conf = env.getConfiguration();
            fs = FileSystem.get(conf);
            // Make sure each family's staging subdirectory exists and is writable.
            for (Pair<byte[], String> el : familyPaths) {
              Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
              if (!fs.exists(stageFamily)) {
                fs.mkdirs(stageFamily);
                fs.setPermission(stageFamily, PERM_ALL_ACCESS);
              }
            }
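            // Call bulkLoadHFiles as the requesting user, so the staged files
            // remain accessible to that user prior to being moved into place.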
            return env.getRegion().bulkLoadHFiles(familyPaths, true,
                new SecureBulkLoadListener(fs, bulkToken, conf),
                request.hasCopyFiles() ? request.getCopyFiles() : false);
          } catch (Exception e) {
            LOG.error("Failed to complete bulk load", e);
          }
          return null;
        }
      });
      loaded = map != null && !map.isEmpty();
    }
    if (region.getCoprocessorHost() != null) {
      try {
        loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
      } catch (IOException e) {
        ResponseConverter.setControllerException(controller, e);
        done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());
        return;
      }
    }
    done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());
  }

  private List<BulkLoadObserver> getBulkLoadObservers() {
    return this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);
  }

  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", "_");
    String randomDir = user.getShortName() + "__" + tblName + "__"
        + (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    fs.mkdirs(p, PERM_ALL_ACCESS);
    fs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  private User getActiveUser() {
    User user = RpcServer.getRequestUser();
    if (user == null) {
      return null;
    }

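    // This path is for testing: in "simple" security mode, wrap the caller in
    // a local user so bulk loads still work without Kerberos credentials.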
    if (userProvider.isHadoopSecurityEnabled()
        && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
      return User.createUserForTesting(conf, user.getShortName(), new String[]{});
    }

    return user;
  }

  @Override
  public Service getService() {
    return this;
  }

  private static class SecureBulkLoadListener implements BulkLoadListener {

    private final FileSystem fs;
    private final String stagingDir;
    private final Configuration conf;
    // Filesystem of the source hfiles, resolved lazily on first use.
    private FileSystem srcFs = null;
    // Original permissions of files moved into staging, keyed by source path,
    // so they can be restored if the bulk load fails.
    private final Map<String, FsPermission> origPermissions;

    public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
      this.fs = fs;
      this.stagingDir = stagingDir;
      this.conf = conf;
      this.origPermissions = new HashMap<String, FsPermission>();
    }

    @Override
    public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile)
        throws IOException {
      Path p = new Path(srcPath);
      Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));

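      // In the case of replicated bulk-load files, the hfiles may already have
      // been copied into the staging directory.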
      if (p.equals(stageP)) {
        LOG.debug(p.getName()
            + " is already available in staging directory. Skipping copy or rename.");
        return stageP.toString();
      }

      if (srcFs == null) {
        srcFs = FileSystem.get(p.toUri(), conf);
      }

      if (!isFile(p)) {
        throw new IOException("Path does not reference a file: " + p);
      }

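      // Check if the source and target filesystems are the same: if not, the
      // file has to be copied; if they are, a cheap rename is enough.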
      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
        LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than "
            + "the destination filesystem. Copying file over to destination staging dir.");
        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
      } else if (copyFile) {
        LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
      } else {
        LOG.debug("Moving " + p + " to " + stageP);
        FileStatus origFileStatus = fs.getFileStatus(p);
        origPermissions.put(srcPath, origFileStatus.getPermission());
        if (!fs.rename(p, stageP)) {
          throw new IOException("Failed to move HFile: " + p + " to " + stageP);
        }
      }
      fs.setPermission(stageP, PERM_ALL_ACCESS);
      return stageP.toString();
    }

    @Override
    public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
      LOG.debug("Bulk Load done for: " + srcPath);
    }

    @Override
    public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
      if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {
        // The files were copied, not moved, so there is nothing to move back.
        return;
      }
      Path p = new Path(srcPath);
      Path stageP = new Path(stagingDir,
          new Path(Bytes.toString(family), p.getName()));

      // In the case of replicated bulk-load files, the hfiles were never
      // renamed during the prepare stage, so there is no rename to undo.
      if (p.equals(stageP)) {
        LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
        return;
      }

      LOG.debug("Moving " + stageP + " back to " + p);
      if (!fs.rename(stageP, p)) {
        throw new IOException("Failed to move HFile: " + stageP + " to " + p);
      }

      // Restore the file's original permission, recorded in prepareBulkLoad().
      if (origPermissions.containsKey(srcPath)) {
        fs.setPermission(p, origPermissions.get(srcPath));
      } else {
        LOG.warn("Can't find previous permission for path=" + srcPath);
      }
    }

    /**
     * Check if the path references a plain file, using reflection to also
     * reject symlinks on Hadoop versions that support them.
     * @param p path to check
     * @return true if the path references a file
     * @throws IOException if the file status cannot be retrieved
     */
    private boolean isFile(Path p) throws IOException {
      FileStatus status = srcFs.getFileStatus(p);
      boolean isFile = !status.isDirectory();
      try {
        isFile = isFile
            && !(Boolean) Methods.call(FileStatus.class, status, "isSymlink", null, null);
      } catch (Exception e) {
        // FileStatus.isSymlink() is not available on older Hadoop versions;
        // fall back to the directory check alone.
      }
      return isFile;
    }
  }
}