View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.security.access;
20  
21  import com.google.common.net.HostAndPort;
22  
23  import java.io.IOException;
24  import java.net.InetAddress;
25  import java.security.PrivilegedExceptionAction;
26  import java.util.Collection;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Map.Entry;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.TreeSet;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellScanner;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.CompoundConfiguration;
43  import org.apache.hadoop.hbase.CoprocessorEnvironment;
44  import org.apache.hadoop.hbase.DoNotRetryIOException;
45  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
46  import org.apache.hadoop.hbase.HColumnDescriptor;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HTableDescriptor;
50  import org.apache.hadoop.hbase.KeyValue;
51  import org.apache.hadoop.hbase.KeyValue.Type;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.NamespaceDescriptor;
54  import org.apache.hadoop.hbase.ProcedureInfo;
55  import org.apache.hadoop.hbase.ServerName;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.Tag;
58  import org.apache.hadoop.hbase.TagRewriteCell;
59  import org.apache.hadoop.hbase.classification.InterfaceAudience;
60  import org.apache.hadoop.hbase.client.Append;
61  import org.apache.hadoop.hbase.client.Delete;
62  import org.apache.hadoop.hbase.client.Durability;
63  import org.apache.hadoop.hbase.client.Get;
64  import org.apache.hadoop.hbase.client.Increment;
65  import org.apache.hadoop.hbase.client.Mutation;
66  import org.apache.hadoop.hbase.client.Put;
67  import org.apache.hadoop.hbase.client.Query;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.client.Table;
71  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
72  import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
73  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
74  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
75  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
76  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
77  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
78  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
79  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
80  import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
81  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
82  import org.apache.hadoop.hbase.filter.CompareFilter;
83  import org.apache.hadoop.hbase.filter.Filter;
84  import org.apache.hadoop.hbase.filter.FilterList;
85  import org.apache.hadoop.hbase.io.hfile.HFile;
86  import org.apache.hadoop.hbase.ipc.RpcServer;
87  import org.apache.hadoop.hbase.master.MasterServices;
88  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
89  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
90  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
91  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
92  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
93  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
94  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
96  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
97  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest;
98  import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest;
99  import org.apache.hadoop.hbase.regionserver.InternalScanner;
100 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
101 import org.apache.hadoop.hbase.regionserver.Region;
102 import org.apache.hadoop.hbase.regionserver.RegionScanner;
103 import org.apache.hadoop.hbase.regionserver.ScanType;
104 import org.apache.hadoop.hbase.regionserver.ScannerContext;
105 import org.apache.hadoop.hbase.regionserver.Store;
106 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
107 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
108 import org.apache.hadoop.hbase.security.AccessDeniedException;
109 import org.apache.hadoop.hbase.security.Superusers;
110 import org.apache.hadoop.hbase.security.User;
111 import org.apache.hadoop.hbase.security.UserProvider;
112 import org.apache.hadoop.hbase.security.access.Permission.Action;
113 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
114 import org.apache.hadoop.hbase.util.ByteRange;
115 import org.apache.hadoop.hbase.util.Bytes;
116 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
117 import org.apache.hadoop.hbase.util.Pair;
118 import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
119 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
120 
121 import com.google.common.collect.ArrayListMultimap;
122 import com.google.common.collect.ImmutableSet;
123 import com.google.common.collect.ListMultimap;
124 import com.google.common.collect.Lists;
125 import com.google.common.collect.MapMaker;
126 import com.google.common.collect.Maps;
127 import com.google.common.collect.Sets;
128 import com.google.protobuf.Message;
129 import com.google.protobuf.RpcCallback;
130 import com.google.protobuf.RpcController;
131 import com.google.protobuf.Service;
132 
133 /**
134  * Provides basic authorization checks for data access and administrative
135  * operations.
136  *
137  * <p>
138  * {@code AccessController} performs authorization checks for HBase operations
139  * based on:
140  * <ul>
141  *   <li>the identity of the user performing the operation</li>
142  *   <li>the scope over which the operation is performed, in increasing
143  *   specificity: global, table, column family, or qualifier</li>
144  *   <li>the type of action being performed (as mapped to
145  *   {@link Permission.Action} values)</li>
146  * </ul>
147  * If the authorization check fails, an {@link AccessDeniedException}
148  * will be thrown for the operation.
149  * </p>
150  *
151  * <p>
152  * To perform authorization checks, {@code AccessController} relies on the
153  * RpcServerEngine being loaded to provide
154  * the user identities for remote requests.
155  * </p>
156  *
157  * <p>
158  * The access control lists used for authorization can be manipulated via the
159  * exposed {@link AccessControlService} Interface implementation, and the associated
160  * {@code grant}, {@code revoke}, and {@code user_permission} HBase shell
161  * commands.
162  * </p>
163  */
164 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
165 public class AccessController extends BaseMasterAndRegionObserver
166     implements RegionServerObserver,
167       AccessControlService.Interface, CoprocessorService, EndpointObserver, BulkLoadObserver {
168 
169   public static final Log LOG = LogFactory.getLog(AccessController.class);
170 
171   private static final Log AUDITLOG =
172     LogFactory.getLog("SecurityLogger."+AccessController.class.getName());
173   private static final String CHECK_COVERING_PERM = "check_covering_perm";
174   private static final String TAG_CHECK_PASSED = "tag_check_passed";
175   private static final byte[] TRUE = Bytes.toBytes(true);
176 
177   TableAuthManager authManager = null;
178 
179   /** flags if we are running on a region of the _acl_ table */
180   boolean aclRegion = false;
181 
182   /** defined only for Endpoint implementation, so it can have way to
183    access region services */
184   private RegionCoprocessorEnvironment regionEnv;
185 
186   /** Mapping of scanner instances to the user who created them */
187   private Map<InternalScanner,String> scannerOwners =
188       new MapMaker().weakKeys().makeMap();
189 
190   private Map<TableName, List<UserPermission>> tableAcls;
191 
192   /** Provider for mapping principal names to Users */
193   private UserProvider userProvider;
194 
195   /** if we are active, usually true, only not true if "hbase.security.authorization"
196    has been set to false in site configuration */
197   boolean authorizationEnabled;
198 
199   /** if we are able to support cell ACLs */
200   boolean cellFeaturesEnabled;
201 
202   /** if we should check EXEC permissions */
203   boolean shouldCheckExecPermission;
204 
205   /** if we should terminate access checks early as soon as table or CF grants
206     allow access; pre-0.98 compatible behavior */
207   boolean compatibleEarlyTermination;
208 
209   /** if we have been successfully initialized */
210   private volatile boolean initialized = false;
211 
212   /** if the ACL table is available, only relevant in the master */
213   private volatile boolean aclTabAvailable = false;
214 
215   public Region getRegion() {
216     return regionEnv != null ? regionEnv.getRegion() : null;
217   }
218 
219   public TableAuthManager getAuthManager() {
220     return authManager;
221   }
222 
223   void initialize(RegionCoprocessorEnvironment e) throws IOException {
224     final Region region = e.getRegion();
225     Configuration conf = e.getConfiguration();
226     Map<byte[], ListMultimap<String,TablePermission>> tables =
227         AccessControlLists.loadAll(region);
228     // For each table, write out the table's permissions to the respective
229     // znode for that table.
230     for (Map.Entry<byte[], ListMultimap<String,TablePermission>> t:
231       tables.entrySet()) {
232       byte[] entry = t.getKey();
233       ListMultimap<String,TablePermission> perms = t.getValue();
234       byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
235       this.authManager.getZKPermissionWatcher().writeToZookeeper(entry, serialized);
236     }
237     initialized = true;
238   }
239 
240   /**
241    * Writes all table ACLs for the tables in the given Map up into ZooKeeper
242    * znodes.  This is called to synchronize ACL changes following {@code _acl_}
243    * table updates.
244    */
245   void updateACL(RegionCoprocessorEnvironment e,
246       final Map<byte[], List<Cell>> familyMap) {
247     Set<byte[]> entries =
248         new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
249     for (Map.Entry<byte[], List<Cell>> f : familyMap.entrySet()) {
250       List<Cell> cells = f.getValue();
251       for (Cell cell: cells) {
252         if (Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
253             cell.getFamilyLength(), AccessControlLists.ACL_LIST_FAMILY, 0,
254             AccessControlLists.ACL_LIST_FAMILY.length)) {
255           entries.add(CellUtil.cloneRow(cell));
256         }
257       }
258     }
259     ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
260     Configuration conf = regionEnv.getConfiguration();
261     for (byte[] entry: entries) {
262       try {
263         try (Table t = regionEnv.getTable(AccessControlLists.ACL_TABLE_NAME)) {
264           ListMultimap<String,TablePermission> perms =
265               AccessControlLists.getPermissions(conf, entry, t);
266           byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
267           zkw.writeToZookeeper(entry, serialized);
268         }
269       } catch (IOException ex) {
270         LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
271             ex);
272       }
273     }
274   }
275 
276   /**
277    * Check the current user for authorization to perform a specific action
278    * against the given set of row data.
279    *
280    * <p>Note: Ordering of the authorization checks
281    * has been carefully optimized to short-circuit the most common requests
282    * and minimize the amount of processing required.</p>
283    *
284    * @param permRequest the action being requested
285    * @param e the coprocessor environment
286    * @param families the map of column families to qualifiers present in
287    * the request
288    * @return an authorization result
289    */
290   AuthResult permissionGranted(String request, User user, Action permRequest,
291       RegionCoprocessorEnvironment e,
292       Map<byte [], ? extends Collection<?>> families) {
293     HRegionInfo hri = e.getRegion().getRegionInfo();
294     TableName tableName = hri.getTable();
295 
296     // 1. All users need read access to hbase:meta table.
297     // this is a very common operation, so deal with it quickly.
298     if (hri.isMetaRegion()) {
299       if (permRequest == Action.READ) {
300         return AuthResult.allow(request, "All users allowed", user,
301           permRequest, tableName, families);
302       }
303     }
304 
305     if (user == null) {
306       return AuthResult.deny(request, "No user associated with request!", null,
307         permRequest, tableName, families);
308     }
309 
310     // 2. check for the table-level, if successful we can short-circuit
311     if (authManager.authorize(user, tableName, (byte[])null, permRequest)) {
312       return AuthResult.allow(request, "Table permission granted", user,
313         permRequest, tableName, families);
314     }
315 
316     // 3. check permissions against the requested families
317     if (families != null && families.size() > 0) {
318       // all families must pass
319       for (Map.Entry<byte [], ? extends Collection<?>> family : families.entrySet()) {
320         // a) check for family level access
321         if (authManager.authorize(user, tableName, family.getKey(),
322             permRequest)) {
323           continue;  // family-level permission overrides per-qualifier
324         }
325 
326         // b) qualifier level access can still succeed
327         if ((family.getValue() != null) && (family.getValue().size() > 0)) {
328           if (family.getValue() instanceof Set) {
329             // for each qualifier of the family
330             Set<byte[]> familySet = (Set<byte[]>)family.getValue();
331             for (byte[] qualifier : familySet) {
332               if (!authManager.authorize(user, tableName, family.getKey(),
333                                          qualifier, permRequest)) {
334                 return AuthResult.deny(request, "Failed qualifier check", user,
335                     permRequest, tableName, makeFamilyMap(family.getKey(), qualifier));
336               }
337             }
338           } else if (family.getValue() instanceof List) { // List<KeyValue>
339             List<KeyValue> kvList = (List<KeyValue>)family.getValue();
340             for (KeyValue kv : kvList) {
341               if (!authManager.authorize(user, tableName, family.getKey(),
342                       kv.getQualifier(), permRequest)) {
343                 return AuthResult.deny(request, "Failed qualifier check", user,
344                     permRequest, tableName, makeFamilyMap(family.getKey(), kv.getQualifier()));
345               }
346             }
347           }
348         } else {
349           // no qualifiers and family-level check already failed
350           return AuthResult.deny(request, "Failed family check", user, permRequest,
351               tableName, makeFamilyMap(family.getKey(), null));
352         }
353       }
354 
355       // all family checks passed
356       return AuthResult.allow(request, "All family checks passed", user, permRequest,
357           tableName, families);
358     }
359 
360     // 4. no families to check and table level access failed
361     return AuthResult.deny(request, "No families to check and table permission failed",
362         user, permRequest, tableName, families);
363   }
364 
365   /**
366    * Check the current user for authorization to perform a specific action
367    * against the given set of row data.
368    * @param opType the operation type
369    * @param user the user
370    * @param e the coprocessor environment
371    * @param families the map of column families to qualifiers present in
372    * the request
373    * @param actions the desired actions
374    * @return an authorization result
375    */
376   AuthResult permissionGranted(OpType opType, User user, RegionCoprocessorEnvironment e,
377       Map<byte [], ? extends Collection<?>> families, Action... actions) {
378     AuthResult result = null;
379     for (Action action: actions) {
380       result = permissionGranted(opType.toString(), user, action, e, families);
381       if (!result.isAllowed()) {
382         return result;
383       }
384     }
385     return result;
386   }
387 
388   private void logResult(AuthResult result) {
389     if (AUDITLOG.isTraceEnabled()) {
390       InetAddress remoteAddr = RpcServer.getRemoteAddress();
391       AUDITLOG.trace("Access " + (result.isAllowed() ? "allowed" : "denied") +
392           " for user " + (result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN") +
393           "; reason: " + result.getReason() +
394           "; remote address: " + (remoteAddr != null ? remoteAddr : "") +
395           "; request: " + result.getRequest() +
396           "; context: " + result.toContextString());
397     }
398   }
399 
400   /**
401    * Returns the active user to which authorization checks should be applied.
402    * If we are in the context of an RPC call, the remote user is used,
403    * otherwise the currently logged in user is used.
404    */
405   private User getActiveUser() throws IOException {
406     User user = RpcServer.getRequestUser();
407     if (user == null) {
408       // for non-rpc handling, fallback to system user
409       user = userProvider.getCurrent();
410     }
411     return user;
412   }
413 
414   /**
415    * Authorizes that the current user has any of the given permissions for the
416    * given table, column family and column qualifier.
417    * @param tableName Table requested
418    * @param family Column family requested
419    * @param qualifier Column qualifier requested
420    * @throws IOException if obtaining the current user fails
421    * @throws AccessDeniedException if user has no authorization
422    */
423   private void requirePermission(String request, TableName tableName, byte[] family,
424       byte[] qualifier, Action... permissions) throws IOException {
425     User user = getActiveUser();
426     AuthResult result = null;
427 
428     for (Action permission : permissions) {
429       if (authManager.authorize(user, tableName, family, qualifier, permission)) {
430         result = AuthResult.allow(request, "Table permission granted", user,
431                                   permission, tableName, family, qualifier);
432         break;
433       } else {
434         // rest of the world
435         result = AuthResult.deny(request, "Insufficient permissions", user,
436                                  permission, tableName, family, qualifier);
437       }
438     }
439     logResult(result);
440     if (authorizationEnabled && !result.isAllowed()) {
441       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
442     }
443   }
444 
445   /**
446    * Authorizes that the current user has any of the given permissions for the
447    * given table, column family and column qualifier.
448    * @param tableName Table requested
449    * @param family Column family param
450    * @param qualifier Column qualifier param
451    * @throws IOException if obtaining the current user fails
452    * @throws AccessDeniedException if user has no authorization
453    */
454   private void requireTablePermission(String request, TableName tableName, byte[] family,
455       byte[] qualifier, Action... permissions) throws IOException {
456     User user = getActiveUser();
457     AuthResult result = null;
458 
459     for (Action permission : permissions) {
460       if (authManager.authorize(user, tableName, null, null, permission)) {
461         result = AuthResult.allow(request, "Table permission granted", user,
462             permission, tableName, null, null);
463         result.getParams().setFamily(family).setQualifier(qualifier);
464         break;
465       } else {
466         // rest of the world
467         result = AuthResult.deny(request, "Insufficient permissions", user,
468             permission, tableName, family, qualifier);
469         result.getParams().setFamily(family).setQualifier(qualifier);
470       }
471     }
472     logResult(result);
473     if (authorizationEnabled && !result.isAllowed()) {
474       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
475     }
476   }
477 
478   /**
479    * Authorizes that the current user has any of the given permissions to access the table.
480    *
481    * @param tableName Table requested
482    * @param permissions Actions being requested
483    * @throws IOException if obtaining the current user fails
484    * @throws AccessDeniedException if user has no authorization
485    */
486   private void requireAccess(String request, TableName tableName,
487       Action... permissions) throws IOException {
488     User user = getActiveUser();
489     AuthResult result = null;
490 
491     for (Action permission : permissions) {
492       if (authManager.hasAccess(user, tableName, permission)) {
493         result = AuthResult.allow(request, "Table permission granted", user,
494                                   permission, tableName, null, null);
495         break;
496       } else {
497         // rest of the world
498         result = AuthResult.deny(request, "Insufficient permissions", user,
499                                  permission, tableName, null, null);
500       }
501     }
502     logResult(result);
503     if (authorizationEnabled && !result.isAllowed()) {
504       throw new AccessDeniedException("Insufficient permissions " + result.toContextString());
505     }
506   }
507 
508   /**
509    * Authorizes that the current user has global privileges for the given action.
510    * @param perm The action being requested
511    * @throws IOException if obtaining the current user fails
512    * @throws AccessDeniedException if authorization is denied
513    */
514   private void requirePermission(String request, Action perm) throws IOException {
515     requireGlobalPermission(request, perm, null, null);
516   }
517 
518   /**
519    * Checks that the user has the given global permission. The generated
520    * audit log message will contain context information for the operation
521    * being authorized, based on the given parameters.
522    * @param perm Action being requested
523    * @param tableName Affected table name.
524    * @param familyMap Affected column families.
525    */
526   private void requireGlobalPermission(String request, Action perm, TableName tableName,
527       Map<byte[], ? extends Collection<byte[]>> familyMap) throws IOException {
528     User user = getActiveUser();
529     AuthResult result = null;
530     if (authManager.authorize(user, perm)) {
531       result = AuthResult.allow(request, "Global check allowed", user, perm, tableName, familyMap);
532       result.getParams().setTableName(tableName).setFamilies(familyMap);
533       logResult(result);
534     } else {
535       result = AuthResult.deny(request, "Global check failed", user, perm, tableName, familyMap);
536       result.getParams().setTableName(tableName).setFamilies(familyMap);
537       logResult(result);
538       if (authorizationEnabled) {
539         throw new AccessDeniedException("Insufficient permissions for user '" +
540           (user != null ? user.getShortName() : "null") +"' (global, action=" +
541           perm.toString() + ")");
542       }
543     }
544   }
545 
546   /**
547    * Checks that the user has the given global permission. The generated
548    * audit log message will contain context information for the operation
549    * being authorized, based on the given parameters.
550    * @param perm Action being requested
551    * @param namespace
552    */
553   private void requireGlobalPermission(String request, Action perm,
554                                        String namespace) throws IOException {
555     User user = getActiveUser();
556     AuthResult authResult = null;
557     if (authManager.authorize(user, perm)) {
558       authResult = AuthResult.allow(request, "Global check allowed", user, perm, null);
559       authResult.getParams().setNamespace(namespace);
560       logResult(authResult);
561     } else {
562       authResult = AuthResult.deny(request, "Global check failed", user, perm, null);
563       authResult.getParams().setNamespace(namespace);
564       logResult(authResult);
565       if (authorizationEnabled) {
566         throw new AccessDeniedException("Insufficient permissions for user '" +
567           (user != null ? user.getShortName() : "null") +"' (global, action=" +
568           perm.toString() + ")");
569       }
570     }
571   }
572 
573   /**
574    * Checks that the user has the given global or namespace permission.
575    * @param namespace
576    * @param permissions Actions being requested
577    */
578   public void requireNamespacePermission(String request, String namespace,
579       Action... permissions) throws IOException {
580     User user = getActiveUser();
581     AuthResult result = null;
582 
583     for (Action permission : permissions) {
584       if (authManager.authorize(user, namespace, permission)) {
585         result = AuthResult.allow(request, "Namespace permission granted",
586             user, permission, namespace);
587         break;
588       } else {
589         // rest of the world
590         result = AuthResult.deny(request, "Insufficient permissions", user,
591             permission, namespace);
592       }
593     }
594     logResult(result);
595     if (authorizationEnabled && !result.isAllowed()) {
596       throw new AccessDeniedException("Insufficient permissions "
597           + result.toContextString());
598     }
599   }
600 
601   /**
602    * Checks that the user has the given global or namespace permission.
603    * @param namespace
604    * @param permissions Actions being requested
605    */
606   public void requireNamespacePermission(String request, String namespace, TableName tableName,
607       Map<byte[], ? extends Collection<byte[]>> familyMap, Action... permissions)
608       throws IOException {
609     User user = getActiveUser();
610     AuthResult result = null;
611 
612     for (Action permission : permissions) {
613       if (authManager.authorize(user, namespace, permission)) {
614         result = AuthResult.allow(request, "Namespace permission granted",
615             user, permission, namespace);
616         result.getParams().setTableName(tableName).setFamilies(familyMap);
617         break;
618       } else {
619         // rest of the world
620         result = AuthResult.deny(request, "Insufficient permissions", user,
621             permission, namespace);
622         result.getParams().setTableName(tableName).setFamilies(familyMap);
623       }
624     }
625     logResult(result);
626     if (authorizationEnabled && !result.isAllowed()) {
627       throw new AccessDeniedException("Insufficient permissions "
628           + result.toContextString());
629     }
630   }
631 
632   /**
633    * Returns <code>true</code> if the current user is allowed the given action
634    * over at least one of the column qualifiers in the given column families.
635    */
636   private boolean hasFamilyQualifierPermission(User user,
637       Action perm,
638       RegionCoprocessorEnvironment env,
639       Map<byte[], ? extends Collection<byte[]>> familyMap)
640     throws IOException {
641     HRegionInfo hri = env.getRegion().getRegionInfo();
642     TableName tableName = hri.getTable();
643 
644     if (user == null) {
645       return false;
646     }
647 
648     if (familyMap != null && familyMap.size() > 0) {
649       // at least one family must be allowed
650       for (Map.Entry<byte[], ? extends Collection<byte[]>> family :
651           familyMap.entrySet()) {
652         if (family.getValue() != null && !family.getValue().isEmpty()) {
653           for (byte[] qualifier : family.getValue()) {
654             if (authManager.matchPermission(user, tableName,
655                 family.getKey(), qualifier, perm)) {
656               return true;
657             }
658           }
659         } else {
660           if (authManager.matchPermission(user, tableName, family.getKey(),
661               perm)) {
662             return true;
663           }
664         }
665       }
666     } else if (LOG.isDebugEnabled()) {
667       LOG.debug("Empty family map passed for permission check");
668     }
669 
670     return false;
671   }
672 
673   private enum OpType {
674     GET_CLOSEST_ROW_BEFORE("getClosestRowBefore"),
675     GET("get"),
676     EXISTS("exists"),
677     SCAN("scan"),
678     PUT("put"),
679     DELETE("delete"),
680     CHECK_AND_PUT("checkAndPut"),
681     CHECK_AND_DELETE("checkAndDelete"),
682     INCREMENT_COLUMN_VALUE("incrementColumnValue"),
683     APPEND("append"),
684     INCREMENT("increment");
685 
686     private String type;
687 
688     private OpType(String type) {
689       this.type = type;
690     }
691 
692     @Override
693     public String toString() {
694       return type;
695     }
696   }
697 
698   /**
699    * Determine if cell ACLs covered by the operation grant access. This is expensive.
700    * @return false if cell ACLs failed to grant access, true otherwise
701    * @throws IOException
702    */
703   private boolean checkCoveringPermission(OpType request, RegionCoprocessorEnvironment e,
704       byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs, Action... actions)
705       throws IOException {
706     if (!cellFeaturesEnabled) {
707       return false;
708     }
709     long cellGrants = 0;
710     User user = getActiveUser();
711     long latestCellTs = 0;
712     Get get = new Get(row);
713     // Only in case of Put/Delete op, consider TS within cell (if set for individual cells).
714     // When every cell, within a Mutation, can be linked with diff TS we can not rely on only one
715     // version. We have to get every cell version and check its TS against the TS asked for in
716     // Mutation and skip those Cells which is outside this Mutation TS.In case of Put, we have to
717     // consider only one such passing cell. In case of Delete we have to consider all the cell
718     // versions under this passing version. When Delete Mutation contains columns which are a
719     // version delete just consider only one version for those column cells.
720     boolean considerCellTs  = (request == OpType.PUT || request == OpType.DELETE);
721     if (considerCellTs) {
722       get.setMaxVersions();
723     } else {
724       get.setMaxVersions(1);
725     }
726     boolean diffCellTsFromOpTs = false;
727     for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
728       byte[] col = entry.getKey();
729       // TODO: HBASE-7114 could possibly unify the collection type in family
730       // maps so we would not need to do this
731       if (entry.getValue() instanceof Set) {
732         Set<byte[]> set = (Set<byte[]>)entry.getValue();
733         if (set == null || set.isEmpty()) {
734           get.addFamily(col);
735         } else {
736           for (byte[] qual: set) {
737             get.addColumn(col, qual);
738           }
739         }
740       } else if (entry.getValue() instanceof List) {
741         List<Cell> list = (List<Cell>)entry.getValue();
742         if (list == null || list.isEmpty()) {
743           get.addFamily(col);
744         } else {
745           // In case of family delete, a Cell will be added into the list with Qualifier as null.
746           for (Cell cell : list) {
747             if (cell.getQualifierLength() == 0
748                 && (cell.getTypeByte() == Type.DeleteFamily.getCode()
749                 || cell.getTypeByte() == Type.DeleteFamilyVersion.getCode())) {
750               get.addFamily(col);
751             } else {
752               get.addColumn(col, CellUtil.cloneQualifier(cell));
753             }
754             if (considerCellTs) {
755               long cellTs = cell.getTimestamp();
756               latestCellTs = Math.max(latestCellTs, cellTs);
757               diffCellTsFromOpTs = diffCellTsFromOpTs || (opTs != cellTs);
758             }
759           }
760         }
761       } else if (entry.getValue() == null) {
762         get.addFamily(col);        
763       } else {
764         throw new RuntimeException("Unhandled collection type " +
765           entry.getValue().getClass().getName());
766       }
767     }
768     // We want to avoid looking into the future. So, if the cells of the
769     // operation specify a timestamp, or the operation itself specifies a
770     // timestamp, then we use the maximum ts found. Otherwise, we bound
771     // the Get to the current server time. We add 1 to the timerange since
772     // the upper bound of a timerange is exclusive yet we need to examine
773     // any cells found there inclusively.
774     long latestTs = Math.max(opTs, latestCellTs);
775     if (latestTs == 0 || latestTs == HConstants.LATEST_TIMESTAMP) {
776       latestTs = EnvironmentEdgeManager.currentTime();
777     }
778     get.setTimeRange(0, latestTs + 1);
779     // In case of Put operation we set to read all versions. This was done to consider the case
780     // where columns are added with TS other than the Mutation TS. But normally this wont be the
781     // case with Put. There no need to get all versions but get latest version only.
782     if (!diffCellTsFromOpTs && request == OpType.PUT) {
783       get.setMaxVersions(1);
784     }
785     if (LOG.isTraceEnabled()) {
786       LOG.trace("Scanning for cells with " + get);
787     }
788     // This Map is identical to familyMap. The key is a BR rather than byte[].
789     // It will be easy to do gets over this new Map as we can create get keys over the Cell cf by
790     // new SimpleByteRange(cell.familyArray, cell.familyOffset, cell.familyLen)
791     Map<ByteRange, List<Cell>> familyMap1 = new HashMap<ByteRange, List<Cell>>();
792     for (Entry<byte[], ? extends Collection<?>> entry : familyMap.entrySet()) {
793       if (entry.getValue() instanceof List) {
794         familyMap1.put(new SimpleMutableByteRange(entry.getKey()), (List<Cell>) entry.getValue());
795       }
796     }
797     RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
798     List<Cell> cells = Lists.newArrayList();
799     Cell prevCell = null;
800     ByteRange curFam = new SimpleMutableByteRange();
801     boolean curColAllVersions = (request == OpType.DELETE);
802     long curColCheckTs = opTs;
803     boolean foundColumn = false;
804     try {
805       boolean more = false;
806       ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(1).build();
807 
808       do {
809         cells.clear();
810         // scan with limit as 1 to hold down memory use on wide rows
811         more = scanner.next(cells, scannerContext);
812         for (Cell cell: cells) {
813           if (LOG.isTraceEnabled()) {
814             LOG.trace("Found cell " + cell);
815           }
816           boolean colChange = prevCell == null || !CellUtil.matchingColumn(prevCell, cell);
817           if (colChange) foundColumn = false;
818           prevCell = cell;
819           if (!curColAllVersions && foundColumn) {
820             continue;
821           }
822           if (colChange && considerCellTs) {
823             curFam.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
824             List<Cell> cols = familyMap1.get(curFam);
825             for (Cell col : cols) {
826               // null/empty qualifier is used to denote a Family delete. The TS and delete type
827               // associated with this is applicable for all columns within the family. That is
828               // why the below (col.getQualifierLength() == 0) check.
829               if ((col.getQualifierLength() == 0 && request == OpType.DELETE)
830                   || CellUtil.matchingQualifier(cell, col)) {
831                 byte type = col.getTypeByte();
832                 if (considerCellTs) {
833                   curColCheckTs = col.getTimestamp();
834                 }
835                 // For a Delete op we pass allVersions as true. When a Delete Mutation contains
836                 // a version delete for a column no need to check all the covering cells within
837                 // that column. Check all versions when Type is DeleteColumn or DeleteFamily
838                 // One version delete types are Delete/DeleteFamilyVersion
839                 curColAllVersions = (KeyValue.Type.DeleteColumn.getCode() == type)
840                     || (KeyValue.Type.DeleteFamily.getCode() == type);
841                 break;
842               }
843             }
844           }
845           if (cell.getTimestamp() > curColCheckTs) {
846             // Just ignore this cell. This is not a covering cell.
847             continue;
848           }
849           foundColumn = true;
850           for (Action action: actions) {
851             // Are there permissions for this user for the cell?
852             if (!authManager.authorize(user, getTableName(e), cell, action)) {
853               // We can stop if the cell ACL denies access
854               return false;
855             }
856           }
857           cellGrants++;
858         }
859       } while (more);
860     } catch (AccessDeniedException ex) {
861       throw ex;
862     } catch (IOException ex) {
863       LOG.error("Exception while getting cells to calculate covering permission", ex);
864     } finally {
865       scanner.close();
866     }
867     // We should not authorize unless we have found one or more cell ACLs that
868     // grant access. This code is used to check for additional permissions
869     // after no table or CF grants are found.
870     return cellGrants > 0;
871   }
872 
873   private static void addCellPermissions(final byte[] perms, Map<byte[], List<Cell>> familyMap) {
874     // Iterate over the entries in the familyMap, replacing the cells therein
875     // with new cells including the ACL data
876     for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
877       List<Cell> newCells = Lists.newArrayList();
878       for (Cell cell: e.getValue()) {
879         // Prepend the supplied perms in a new ACL tag to an update list of tags for the cell
880         List<Tag> tags = Lists.newArrayList(new Tag(AccessControlLists.ACL_TAG_TYPE, perms));
881         if (cell.getTagsLength() > 0) {
882           Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
883             cell.getTagsOffset(), cell.getTagsLength());
884           while (tagIterator.hasNext()) {
885             tags.add(tagIterator.next());
886           }
887         }
888         newCells.add(new TagRewriteCell(cell, Tag.fromList(tags)));
889       }
890       // This is supposed to be safe, won't CME
891       e.setValue(newCells);
892     }
893   }
894 
895   // Checks whether incoming cells contain any tag with type as ACL_TAG_TYPE. This tag
896   // type is reserved and should not be explicitly set by user.
897   private void checkForReservedTagPresence(User user, Mutation m) throws IOException {
898     // No need to check if we're not going to throw
899     if (!authorizationEnabled) {
900       m.setAttribute(TAG_CHECK_PASSED, TRUE);
901       return;
902     }
903     // Superusers are allowed to store cells unconditionally.
904     if (Superusers.isSuperUser(user)) {
905       m.setAttribute(TAG_CHECK_PASSED, TRUE);
906       return;
907     }
908     // We already checked (prePut vs preBatchMutation)
909     if (m.getAttribute(TAG_CHECK_PASSED) != null) {
910       return;
911     }
912     for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
913       Cell cell = cellScanner.current();
914       if (cell.getTagsLength() > 0) {
915         Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
916           cell.getTagsLength());
917         while (tagsItr.hasNext()) {
918           if (tagsItr.next().getType() == AccessControlLists.ACL_TAG_TYPE) {
919             throw new AccessDeniedException("Mutation contains cell with reserved type tag");
920           }
921         }
922       }
923     }
924     m.setAttribute(TAG_CHECK_PASSED, TRUE);
925   }
926 
927   /* ---- MasterObserver implementation ---- */
928   @Override
929   public void start(CoprocessorEnvironment env) throws IOException {
930     CompoundConfiguration conf = new CompoundConfiguration();
931     conf.add(env.getConfiguration());
932 
933     authorizationEnabled = conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
934     if (!authorizationEnabled) {
935       LOG.warn("The AccessController has been loaded with authorization checks disabled.");
936     }
937 
938     shouldCheckExecPermission = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
939       AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);
940 
941     cellFeaturesEnabled = HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS;
942     if (!cellFeaturesEnabled) {
943       LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
944           + " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
945           + " accordingly.");
946     }
947 
948     ZooKeeperWatcher zk = null;
949     if (env instanceof MasterCoprocessorEnvironment) {
950       // if running on HMaster
951       MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
952       zk = mEnv.getMasterServices().getZooKeeper();
953     } else if (env instanceof RegionServerCoprocessorEnvironment) {
954       RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
955       zk = rsEnv.getRegionServerServices().getZooKeeper();
956     } else if (env instanceof RegionCoprocessorEnvironment) {
957       // if running at region
958       regionEnv = (RegionCoprocessorEnvironment) env;
959       conf.addStringMap(regionEnv.getRegion().getTableDesc().getConfiguration());
960       zk = regionEnv.getRegionServerServices().getZooKeeper();
961       compatibleEarlyTermination = conf.getBoolean(AccessControlConstants.CF_ATTRIBUTE_EARLY_OUT,
962         AccessControlConstants.DEFAULT_ATTRIBUTE_EARLY_OUT);
963     }
964 
965     // set the user-provider.
966     this.userProvider = UserProvider.instantiate(env.getConfiguration());
967 
968     // If zk is null or IOException while obtaining auth manager,
969     // throw RuntimeException so that the coprocessor is unloaded.
970     if (zk != null) {
971       try {
972         this.authManager = TableAuthManager.get(zk, env.getConfiguration());
973       } catch (IOException ioe) {
974         throw new RuntimeException("Error obtaining TableAuthManager", ioe);
975       }
976     } else {
977       throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
978     }
979 
980     tableAcls = new MapMaker().weakValues().makeMap();
981   }
982 
983   @Override
984   public void stop(CoprocessorEnvironment env) {
985 
986   }
987 
988   @Override
989   public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> c,
990       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
991     Set<byte[]> families = desc.getFamiliesKeys();
992     Map<byte[], Set<byte[]>> familyMap = new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
993     for (byte[] family: families) {
994       familyMap.put(family, null);
995     }
996     requireNamespacePermission("createTable", desc.getTableName().getNamespaceAsString(),
997         desc.getTableName(), familyMap, Action.CREATE);
998   }
999 
1000   @Override
1001   public void postCreateTableHandler(final ObserverContext<MasterCoprocessorEnvironment> c,
1002       HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
1003     // When AC is used, it should be configured as the 1st CP.
1004     // In Master, the table operations like create, are handled by a Thread pool but the max size
1005     // for this pool is 1. So if multiple CPs create tables on startup, these creations will happen
1006     // sequentially only.
1007     // Related code in HMaster#startServiceThreads
1008     // {code}
1009     //   // We depend on there being only one instance of this executor running
1010     //   // at a time. To do concurrency, would need fencing of enable/disable of
1011     //   // tables.
1012     //   this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1013     // {code}
1014     // In future if we change this pool to have more threads, then there is a chance for thread,
1015     // creating acl table, getting delayed and by that time another table creation got over and
1016     // this hook is getting called. In such a case, we will need a wait logic here which will
1017     // wait till the acl table is created.
1018     if (AccessControlLists.isAclTable(desc)) {
1019       this.aclTabAvailable = true;
1020       LOG.info(AccessControlLists.ACL_TABLE_NAME + " is created.");
1021     } else if (!(TableName.NAMESPACE_TABLE_NAME.equals(desc.getTableName()))) {
1022       if (!aclTabAvailable) {
1023         LOG.warn("Not adding owner permission for table " + desc.getTableName() + ". "
1024             + AccessControlLists.ACL_TABLE_NAME + " is not yet created. "
1025             + getClass().getSimpleName() + " should be configured as the first Coprocessor");
1026       } else {
1027         String owner = desc.getOwnerString();
1028         // default the table owner to current user, if not specified.
1029         if (owner == null)
1030           owner = getActiveUser().getShortName();
1031         final UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1032             desc.getTableName(), null, Action.values());
1033         // switch to the real hbase master user for doing the RPC on the ACL table
1034         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1035           @Override
1036           public Void run() throws Exception {
1037             AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
1038                 userperm, c.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
1039             return null;
1040           }
1041         });
1042       }
1043     }
1044   }
1045 
1046   @Override
1047   public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1048       throws IOException {
1049     requirePermission("deleteTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1050   }
1051 
1052   @Override
1053   public void postDeleteTable(final ObserverContext<MasterCoprocessorEnvironment> c,
1054       final TableName tableName) throws IOException {
1055     final Configuration conf = c.getEnvironment().getConfiguration();
1056     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1057       @Override
1058       public Void run() throws Exception {
1059         AccessControlLists.removeTablePermissions(conf, tableName,
1060             c.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
1061         return null;
1062       }
1063     });
1064     this.authManager.getZKPermissionWatcher().deleteTableACLNode(tableName);
1065   }
1066 
1067   @Override
1068   public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
1069       final TableName tableName) throws IOException {
1070     requirePermission("truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1071 
1072     final Configuration conf = c.getEnvironment().getConfiguration();
1073     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1074       @Override
1075       public Void run() throws Exception {
1076         List<UserPermission> acls = AccessControlLists.getUserTablePermissions(conf, tableName);
1077         if (acls != null) {
1078           tableAcls.put(tableName, acls);
1079         }
1080         return null;
1081       }
1082     });
1083   }
1084 
1085   @Override
1086   public void postTruncateTable(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1087       final TableName tableName) throws IOException {
1088     final Configuration conf = ctx.getEnvironment().getConfiguration();
1089     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1090       @Override
1091       public Void run() throws Exception {
1092         List<UserPermission> perms = tableAcls.get(tableName);
1093         if (perms != null) {
1094           for (UserPermission perm : perms) {
1095             AccessControlLists.addUserPermission(conf, perm,
1096                 ctx.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
1097           }
1098         }
1099         tableAcls.remove(tableName);
1100         return null;
1101       }
1102     });
1103   }
1104 
1105   @Override
1106   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1107       HTableDescriptor htd) throws IOException {
1108     requirePermission("modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1109   }
1110 
1111   @Override
1112   public void postModifyTable(final ObserverContext<MasterCoprocessorEnvironment> c,
1113       TableName tableName, final HTableDescriptor htd) throws IOException {
1114     final Configuration conf = c.getEnvironment().getConfiguration();
1115     // default the table owner to current user, if not specified.
1116     final String owner = (htd.getOwnerString() != null) ? htd.getOwnerString() :
1117       getActiveUser().getShortName();
1118     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1119       @Override
1120       public Void run() throws Exception {
1121         UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
1122           htd.getTableName(), null, Action.values());
1123         AccessControlLists.addUserPermission(conf, userperm,
1124             c.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
1125         return null;
1126       }
1127     });
1128   }
1129 
1130   @Override
1131   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1132       HColumnDescriptor column) throws IOException {
1133     requireTablePermission("addColumn", tableName, column.getName(), null, Action.ADMIN,
1134         Action.CREATE);
1135   }
1136 
1137   @Override
1138   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1139       HColumnDescriptor descriptor) throws IOException {
1140     requirePermission("modifyColumn", tableName, descriptor.getName(), null, Action.ADMIN,
1141       Action.CREATE);
1142   }
1143 
1144   @Override
1145   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
1146       byte[] col) throws IOException {
1147     requirePermission("deleteColumn", tableName, col, null, Action.ADMIN, Action.CREATE);
1148   }
1149 
1150   @Override
1151   public void postDeleteColumn(final ObserverContext<MasterCoprocessorEnvironment> c,
1152       final TableName tableName, final byte[] col) throws IOException {
1153     final Configuration conf = c.getEnvironment().getConfiguration();
1154     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1155       @Override
1156       public Void run() throws Exception {
1157         AccessControlLists.removeTablePermissions(conf, tableName, col,
1158             c.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
1159         return null;
1160       }
1161     });
1162   }
1163 
1164   @Override
1165   public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1166       throws IOException {
1167     requirePermission("enableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1168   }
1169 
1170   @Override
1171   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
1172       throws IOException {
1173     if (Bytes.equals(tableName.getName(), AccessControlLists.ACL_GLOBAL_NAME)) {
1174       // We have to unconditionally disallow disable of the ACL table when we are installed,
1175       // even if not enforcing authorizations. We are still allowing grants and revocations,
1176       // checking permissions and logging audit messages, etc. If the ACL table is not
1177       // available we will fail random actions all over the place.
1178       throw new AccessDeniedException("Not allowed to disable "
1179           + AccessControlLists.ACL_TABLE_NAME + " table with AccessController installed");
1180     }
1181     requirePermission("disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1182   }
1183 
1184   @Override
1185   public void preAbortProcedure(
1186       ObserverContext<MasterCoprocessorEnvironment> ctx,
1187       final ProcedureExecutor<MasterProcedureEnv> procEnv,
1188       final long procId) throws IOException {
1189     if (!procEnv.isProcedureOwner(procId, getActiveUser())) {
1190       // If the user is not the procedure owner, then we should further probe whether
1191       // he can abort the procedure.
1192       requirePermission("abortProcedure", Action.ADMIN);
1193     }
1194   }
1195 
1196   @Override
1197   public void postAbortProcedure(ObserverContext<MasterCoprocessorEnvironment> ctx)
1198       throws IOException {
1199     // There is nothing to do at this time after the procedure abort request was sent.
1200   }
1201 
1202   @Override
1203   public void preListProcedures(ObserverContext<MasterCoprocessorEnvironment> ctx)
1204       throws IOException {
1205     // We are delegating the authorization check to postListProcedures as we don't have
1206     // any concrete set of procedures to work with
1207   }
1208 
1209   @Override
1210   public void postListProcedures(
1211       ObserverContext<MasterCoprocessorEnvironment> ctx,
1212       List<ProcedureInfo> procInfoList) throws IOException {
1213     if (procInfoList.isEmpty()) {
1214       return;
1215     }
1216 
1217     // Retains only those which passes authorization checks, as the checks weren't done as part
1218     // of preListProcedures.
1219     Iterator<ProcedureInfo> itr = procInfoList.iterator();
1220     User user = getActiveUser();
1221     while (itr.hasNext()) {
1222       ProcedureInfo procInfo = itr.next();
1223       try {
1224         if (!ProcedureInfo.isProcedureOwner(procInfo, user)) {
1225           // If the user is not the procedure owner, then we should further probe whether
1226           // he can see the procedure.
1227           requirePermission("listProcedures", Action.ADMIN);
1228         }
1229       } catch (AccessDeniedException e) {
1230         itr.remove();
1231       }
1232     }
1233   }
1234 
1235   @Override
1236   public void preMove(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo region,
1237       ServerName srcServer, ServerName destServer) throws IOException {
1238     requirePermission("move", region.getTable(), null, null, Action.ADMIN);
1239   }
1240 
1241   @Override
1242   public void preAssign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo)
1243       throws IOException {
1244     requirePermission("assign", regionInfo.getTable(), null, null, Action.ADMIN);
1245   }
1246 
1247   @Override
1248   public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> c, HRegionInfo regionInfo,
1249       boolean force) throws IOException {
1250     requirePermission("unassign", regionInfo.getTable(), null, null, Action.ADMIN);
1251   }
1252 
1253   @Override
1254   public void preRegionOffline(ObserverContext<MasterCoprocessorEnvironment> c,
1255       HRegionInfo regionInfo) throws IOException {
1256     requirePermission("regionOffline", regionInfo.getTable(), null, null, Action.ADMIN);
1257   }
1258 
1259   @Override
1260   public void preBalance(ObserverContext<MasterCoprocessorEnvironment> c)
1261       throws IOException {
1262     requirePermission("balance", Action.ADMIN);
1263   }
1264 
1265   @Override
1266   public boolean preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> c,
1267       boolean newValue) throws IOException {
1268     requirePermission("balanceSwitch", Action.ADMIN);
1269     return newValue;
1270   }
1271 
1272   @Override
1273   public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> c)
1274       throws IOException {
1275     requirePermission("shutdown", Action.ADMIN);
1276   }
1277 
1278   @Override
1279   public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> c)
1280       throws IOException {
1281     requirePermission("stopMaster", Action.ADMIN);
1282   }
1283 
1284   @Override
1285   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
1286       throws IOException {
1287     if (!MetaTableAccessor.tableExists(ctx.getEnvironment().getMasterServices()
1288       .getConnection(), AccessControlLists.ACL_TABLE_NAME)) {
1289       // initialize the ACL storage table
1290       AccessControlLists.createACLTable(ctx.getEnvironment().getMasterServices());
1291       LOG.info("Creating " + AccessControlLists.ACL_TABLE_NAME + " table.");
1292     } else {
1293       LOG.info(AccessControlLists.ACL_TABLE_NAME + " is existing.");
1294       aclTabAvailable = true;
1295     }
1296   }
1297 
1298   @Override
1299   public void preSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1300       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1301       throws IOException {
1302     requirePermission("snapshot", hTableDescriptor.getTableName(), null, null,
1303       Permission.Action.ADMIN);
1304   }
1305 
1306   @Override
1307   public void preListSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
1308       final SnapshotDescription snapshot) throws IOException {
1309     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1310       // list it, if user is the owner of snapshot
1311     } else {
1312       requirePermission("listSnapshot", Action.ADMIN);
1313     }
1314   }
1315   
1316   @Override
1317   public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1318       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1319       throws IOException {
1320     requirePermission("clone", Action.ADMIN);
1321   }
1322 
1323   @Override
1324   public void preRestoreSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1325       final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
1326       throws IOException {
1327     User usr = getActiveUser();
1328     LOG.info("Checking permission for " + usr + "(" + usr.getShortName() + ") on " +
1329         snapshot.getOwner());
1330     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, usr)) {
1331       requirePermission("restoreSnapshot", hTableDescriptor.getTableName(), null, null,
1332         Permission.Action.ADMIN);
1333     } else {
1334       requirePermission("restore", Action.ADMIN);
1335     }
1336   }
1337 
1338   @Override
1339   public void preDeleteSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1340       final SnapshotDescription snapshot) throws IOException {
1341     if (SnapshotDescriptionUtils.isSnapshotOwner(snapshot, getActiveUser())) {
1342       // Snapshot owner is allowed to delete the snapshot
1343       // TODO: We are not logging this for audit
1344     } else {
1345       requirePermission("deleteSnapshot", Action.ADMIN);
1346     }
1347   }
1348 
1349   @Override
1350   public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1351       NamespaceDescriptor ns) throws IOException {
1352     requireGlobalPermission("createNamespace", Action.ADMIN, ns.getName());
1353   }
1354 
1355   @Override
1356   public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1357       throws IOException {
1358     requireGlobalPermission("deleteNamespace", Action.ADMIN, namespace);
1359   }
1360 
1361   @Override
1362   public void postDeleteNamespace(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1363       final String namespace) throws IOException {
1364     final Configuration conf = ctx.getEnvironment().getConfiguration();
1365     User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
1366       @Override
1367       public Void run() throws Exception {
1368         AccessControlLists.removeNamespacePermissions(conf, namespace,
1369             ctx.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
1370         return null;
1371       }
1372     });
1373     this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
1374     LOG.info(namespace + " entry deleted in "+AccessControlLists.ACL_TABLE_NAME+" table.");
1375   }
1376 
1377   @Override
1378   public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
1379       NamespaceDescriptor ns) throws IOException {
1380     // We require only global permission so that 
1381     // a user with NS admin cannot altering namespace configurations. i.e. namespace quota
1382     requireGlobalPermission("modifyNamespace", Action.ADMIN, ns.getName());
1383   }
1384 
1385   @Override
1386   public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx, String namespace)
1387       throws IOException {
1388     requireNamespacePermission("getNamespaceDescriptor", namespace, Action.ADMIN);
1389   }
1390 
1391   @Override
1392   public void postListNamespaceDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
1393       List<NamespaceDescriptor> descriptors) throws IOException {
1394     // Retains only those which passes authorization checks, as the checks weren't done as part
1395     // of preGetTableDescriptors.
1396     Iterator<NamespaceDescriptor> itr = descriptors.iterator();
1397     while (itr.hasNext()) {
1398       NamespaceDescriptor desc = itr.next();
1399       try {
1400         requireNamespacePermission("listNamespaces", desc.getName(), Action.ADMIN);
1401       } catch (AccessDeniedException e) {
1402         itr.remove();
1403       }
1404     }
1405   }
1406 
1407   @Override
1408   public void preTableFlush(final ObserverContext<MasterCoprocessorEnvironment> ctx,
1409       final TableName tableName) throws IOException {
1410     requirePermission("flushTable", tableName, null, null, Action.ADMIN, Action.CREATE);
1411   }
1412 
1413   /* ---- RegionObserver implementation ---- */
1414 
1415   @Override
1416   public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e)
1417       throws IOException {
1418     RegionCoprocessorEnvironment env = e.getEnvironment();
1419     final Region region = env.getRegion();
1420     if (region == null) {
1421       LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
1422     } else {
1423       HRegionInfo regionInfo = region.getRegionInfo();
1424       if (regionInfo.getTable().isSystemTable()) {
1425         checkSystemOrSuperUser();
1426       } else {
1427         requirePermission("preOpen", Action.ADMIN);
1428       }
1429     }
1430   }
1431 
1432   @Override
1433   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
1434     RegionCoprocessorEnvironment env = c.getEnvironment();
1435     final Region region = env.getRegion();
1436     if (region == null) {
1437       LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
1438       return;
1439     }
1440     if (AccessControlLists.isAclRegion(region)) {
1441       aclRegion = true;
1442       // When this region is under recovering state, initialize will be handled by postLogReplay
1443       if (!region.isRecovering()) {
1444         try {
1445           initialize(env);
1446         } catch (IOException ex) {
1447           // if we can't obtain permissions, it's better to fail
1448           // than perform checks incorrectly
1449           throw new RuntimeException("Failed to initialize permissions cache", ex);
1450         }
1451       }
1452     } else {
1453       initialized = true;
1454     }
1455   }
1456 
1457   @Override
1458   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
1459     if (aclRegion) {
1460       try {
1461         initialize(c.getEnvironment());
1462       } catch (IOException ex) {
1463         // if we can't obtain permissions, it's better to fail
1464         // than perform checks incorrectly
1465         throw new RuntimeException("Failed to initialize permissions cache", ex);
1466       }
1467     }
1468   }
1469 
1470   @Override
1471   public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1472     requirePermission("flush", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1473         Action.CREATE);
1474   }
1475 
1476   @Override
1477   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
1478     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1479   }
1480 
1481   @Override
1482   public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e,
1483       byte[] splitRow) throws IOException {
1484     requirePermission("split", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
1485   }
1486 
1487   @Override
1488   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
1489       final Store store, final InternalScanner scanner, final ScanType scanType)
1490           throws IOException {
1491     requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN,
1492         Action.CREATE);
1493     return scanner;
1494   }
1495 
1496   @Override
1497   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> c,
1498       final byte [] row, final byte [] family, final Result result)
1499       throws IOException {
1500     assert family != null;
1501     RegionCoprocessorEnvironment env = c.getEnvironment();
1502     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, null);
1503     User user = getActiveUser();
1504     AuthResult authResult = permissionGranted(OpType.GET_CLOSEST_ROW_BEFORE, user, env, families,
1505       Action.READ);
1506     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1507       authResult.setAllowed(checkCoveringPermission(OpType.GET_CLOSEST_ROW_BEFORE, env, row,
1508         families, HConstants.LATEST_TIMESTAMP, Action.READ));
1509       authResult.setReason("Covering cell set");
1510     }
1511     logResult(authResult);
1512     if (authorizationEnabled && !authResult.isAllowed()) {
1513       throw new AccessDeniedException("Insufficient permissions " +
1514         authResult.toContextString());
1515     }
1516   }
1517 
1518   private void internalPreRead(final ObserverContext<RegionCoprocessorEnvironment> c,
1519       final Query query, OpType opType) throws IOException {
1520     Filter filter = query.getFilter();
1521     // Don't wrap an AccessControlFilter
1522     if (filter != null && filter instanceof AccessControlFilter) {
1523       return;
1524     }
1525     User user = getActiveUser();
1526     RegionCoprocessorEnvironment env = c.getEnvironment();
1527     Map<byte[],? extends Collection<byte[]>> families = null;
1528     switch (opType) {
1529     case GET:
1530     case EXISTS:
1531       families = ((Get)query).getFamilyMap();
1532       break;
1533     case SCAN:
1534       families = ((Scan)query).getFamilyMap();
1535       break;
1536     default:
1537       throw new RuntimeException("Unhandled operation " + opType);
1538     }
1539     AuthResult authResult = permissionGranted(opType, user, env, families, Action.READ);
1540     Region region = getRegion(env);
1541     TableName table = getTableName(region);
1542     Map<ByteRange, Integer> cfVsMaxVersions = Maps.newHashMap();
1543     for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
1544       cfVsMaxVersions.put(new SimpleMutableByteRange(hcd.getName()), hcd.getMaxVersions());
1545     }
1546     if (!authResult.isAllowed()) {
1547       if (!cellFeaturesEnabled || compatibleEarlyTermination) {
1548         // Old behavior: Scan with only qualifier checks if we have partial
1549         // permission. Backwards compatible behavior is to throw an
1550         // AccessDeniedException immediately if there are no grants for table
1551         // or CF or CF+qual. Only proceed with an injected filter if there are
1552         // grants for qualifiers. Otherwise we will fall through below and log
1553         // the result and throw an ADE. We may end up checking qualifier
1554         // grants three times (permissionGranted above, here, and in the
1555         // filter) but that's the price of backwards compatibility.
1556         if (hasFamilyQualifierPermission(user, Action.READ, env, families)) {
1557           authResult.setAllowed(true);
1558           authResult.setReason("Access allowed with filter");
1559           // Only wrap the filter if we are enforcing authorizations
1560           if (authorizationEnabled) {
1561             Filter ourFilter = new AccessControlFilter(authManager, user, table,
1562               AccessControlFilter.Strategy.CHECK_TABLE_AND_CF_ONLY,
1563               cfVsMaxVersions);
1564             // wrap any existing filter
1565             if (filter != null) {
1566               ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1567                 Lists.newArrayList(ourFilter, filter));
1568             }
1569             switch (opType) {
1570               case GET:
1571               case EXISTS:
1572                 ((Get)query).setFilter(ourFilter);
1573                 break;
1574               case SCAN:
1575                 ((Scan)query).setFilter(ourFilter);
1576                 break;
1577               default:
1578                 throw new RuntimeException("Unhandled operation " + opType);
1579             }
1580           }
1581         }
1582       } else {
1583         // New behavior: Any access we might be granted is more fine-grained
1584         // than whole table or CF. Simply inject a filter and return what is
1585         // allowed. We will not throw an AccessDeniedException. This is a
1586         // behavioral change since 0.96.
1587         authResult.setAllowed(true);
1588         authResult.setReason("Access allowed with filter");
1589         // Only wrap the filter if we are enforcing authorizations
1590         if (authorizationEnabled) {
1591           Filter ourFilter = new AccessControlFilter(authManager, user, table,
1592             AccessControlFilter.Strategy.CHECK_CELL_DEFAULT, cfVsMaxVersions);
1593           // wrap any existing filter
1594           if (filter != null) {
1595             ourFilter = new FilterList(FilterList.Operator.MUST_PASS_ALL,
1596               Lists.newArrayList(ourFilter, filter));
1597           }
1598           switch (opType) {
1599             case GET:
1600             case EXISTS:
1601               ((Get)query).setFilter(ourFilter);
1602               break;
1603             case SCAN:
1604               ((Scan)query).setFilter(ourFilter);
1605               break;
1606             default:
1607               throw new RuntimeException("Unhandled operation " + opType);
1608           }
1609         }
1610       }
1611     }
1612 
1613     logResult(authResult);
1614     if (authorizationEnabled && !authResult.isAllowed()) {
1615       throw new AccessDeniedException("Insufficient permissions for user '"
1616           + (user != null ? user.getShortName() : "null")
1617           + "' (table=" + table + ", action=READ)");
1618     }
1619   }
1620 
1621   @Override
1622   public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
1623       final Get get, final List<Cell> result) throws IOException {
1624     internalPreRead(c, get, OpType.GET);
1625   }
1626 
1627   @Override
1628   public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
1629       final Get get, final boolean exists) throws IOException {
1630     internalPreRead(c, get, OpType.EXISTS);
1631     return exists;
1632   }
1633 
1634   @Override
1635   public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
1636       final Put put, final WALEdit edit, final Durability durability)
1637       throws IOException {
1638     User user = getActiveUser();
1639     checkForReservedTagPresence(user, put);
1640 
1641     // Require WRITE permission to the table, CF, or top visible value, if any.
1642     // NOTE: We don't need to check the permissions for any earlier Puts
1643     // because we treat the ACLs in each Put as timestamped like any other
1644     // HBase value. A new ACL in a new Put applies to that Put. It doesn't
1645     // change the ACL of any previous Put. This allows simple evolution of
1646     // security policy over time without requiring expensive updates.
1647     RegionCoprocessorEnvironment env = c.getEnvironment();
1648     Map<byte[],? extends Collection<Cell>> families = put.getFamilyCellMap();
1649     AuthResult authResult = permissionGranted(OpType.PUT, user, env, families, Action.WRITE);
1650     logResult(authResult);
1651     if (!authResult.isAllowed()) {
1652       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1653         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1654       } else if (authorizationEnabled) {
1655         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1656       }
1657     }
1658 
1659     // Add cell ACLs from the operation to the cells themselves
1660     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1661     if (bytes != null) {
1662       if (cellFeaturesEnabled) {
1663         addCellPermissions(bytes, put.getFamilyCellMap());
1664       } else {
1665         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1666       }
1667     }
1668   }
1669 
1670   @Override
1671   public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1672       final Put put, final WALEdit edit, final Durability durability) {
1673     if (aclRegion) {
1674       updateACL(c.getEnvironment(), put.getFamilyCellMap());
1675     }
1676   }
1677 
1678   @Override
1679   public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1680       final Delete delete, final WALEdit edit, final Durability durability)
1681       throws IOException {
1682     // An ACL on a delete is useless, we shouldn't allow it
1683     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1684       throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
1685     }
1686     // Require WRITE permissions on all cells covered by the delete. Unlike
1687     // for Puts we need to check all visible prior versions, because a major
1688     // compaction could remove them. If the user doesn't have permission to
1689     // overwrite any of the visible versions ('visible' defined as not covered
1690     // by a tombstone already) then we have to disallow this operation.
1691     RegionCoprocessorEnvironment env = c.getEnvironment();
1692     Map<byte[],? extends Collection<Cell>> families = delete.getFamilyCellMap();
1693     User user = getActiveUser();
1694     AuthResult authResult = permissionGranted(OpType.DELETE, user, env, families, Action.WRITE);
1695     logResult(authResult);
1696     if (!authResult.isAllowed()) {
1697       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1698         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1699       } else if (authorizationEnabled) {
1700         throw new AccessDeniedException("Insufficient permissions " +
1701           authResult.toContextString());
1702       }
1703     }
1704   }
1705 
1706   @Override
1707   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
1708       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1709     if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1710       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1711       for (int i = 0; i < miniBatchOp.size(); i++) {
1712         Mutation m = miniBatchOp.getOperation(i);
1713         if (m.getAttribute(CHECK_COVERING_PERM) != null) {
1714           // We have a failure with table, cf and q perm checks and now giving a chance for cell
1715           // perm check
1716           OpType opType;
1717           if (m instanceof Put) {
1718             checkForReservedTagPresence(getActiveUser(), m);
1719             opType = OpType.PUT;
1720           } else {
1721             opType = OpType.DELETE;
1722           }
1723           AuthResult authResult = null;
1724           if (checkCoveringPermission(opType, c.getEnvironment(), m.getRow(),
1725             m.getFamilyCellMap(), m.getTimeStamp(), Action.WRITE)) {
1726             authResult = AuthResult.allow(opType.toString(), "Covering cell set",
1727               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1728           } else {
1729             authResult = AuthResult.deny(opType.toString(), "Covering cell set",
1730               getActiveUser(), Action.WRITE, table, m.getFamilyCellMap());
1731           }
1732           logResult(authResult);
1733           if (authorizationEnabled && !authResult.isAllowed()) {
1734             throw new AccessDeniedException("Insufficient permissions "
1735               + authResult.toContextString());
1736           }
1737         }
1738       }
1739     }
1740   }
1741 
1742   @Override
1743   public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1744       final Delete delete, final WALEdit edit, final Durability durability)
1745       throws IOException {
1746     if (aclRegion) {
1747       updateACL(c.getEnvironment(), delete.getFamilyCellMap());
1748     }
1749   }
1750 
1751   @Override
1752   public boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
1753       final byte [] row, final byte [] family, final byte [] qualifier,
1754       final CompareFilter.CompareOp compareOp,
1755       final ByteArrayComparable comparator, final Put put,
1756       final boolean result) throws IOException {
1757     User user = getActiveUser();
1758     checkForReservedTagPresence(user, put);
1759 
1760     // Require READ and WRITE permissions on the table, CF, and KV to update
1761     RegionCoprocessorEnvironment env = c.getEnvironment();
1762     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1763     AuthResult authResult = permissionGranted(OpType.CHECK_AND_PUT, user, env, families,
1764       Action.READ, Action.WRITE);
1765     logResult(authResult);
1766     if (!authResult.isAllowed()) {
1767       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1768         put.setAttribute(CHECK_COVERING_PERM, TRUE);
1769       } else if (authorizationEnabled) {
1770         throw new AccessDeniedException("Insufficient permissions " +
1771           authResult.toContextString());
1772       }
1773     }
1774 
1775     byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1776     if (bytes != null) {
1777       if (cellFeaturesEnabled) {
1778         addCellPermissions(bytes, put.getFamilyCellMap());
1779       } else {
1780         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1781       }
1782     }
1783     return result;
1784   }
1785 
1786   @Override
1787   public boolean preCheckAndPutAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1788       final byte[] row, final byte[] family, final byte[] qualifier,
1789       final CompareFilter.CompareOp compareOp, final ByteArrayComparable comparator, final Put put,
1790       final boolean result) throws IOException {
1791     if (put.getAttribute(CHECK_COVERING_PERM) != null) {
1792       // We had failure with table, cf and q perm checks and now giving a chance for cell
1793       // perm check
1794       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1795       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1796       AuthResult authResult = null;
1797       if (checkCoveringPermission(OpType.CHECK_AND_PUT, c.getEnvironment(), row, families,
1798           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1799         authResult = AuthResult.allow(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1800             getActiveUser(), Action.READ, table, families);
1801       } else {
1802         authResult = AuthResult.deny(OpType.CHECK_AND_PUT.toString(), "Covering cell set",
1803             getActiveUser(), Action.READ, table, families);
1804       }
1805       logResult(authResult);
1806       if (authorizationEnabled && !authResult.isAllowed()) {
1807         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1808       }
1809     }
1810     return result;
1811   }
1812 
1813   @Override
1814   public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
1815       final byte [] row, final byte [] family, final byte [] qualifier,
1816       final CompareFilter.CompareOp compareOp,
1817       final ByteArrayComparable comparator, final Delete delete,
1818       final boolean result) throws IOException {
1819     // An ACL on a delete is useless, we shouldn't allow it
1820     if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
1821       throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
1822           delete.toString());
1823     }
1824     // Require READ and WRITE permissions on the table, CF, and the KV covered
1825     // by the delete
1826     RegionCoprocessorEnvironment env = c.getEnvironment();
1827     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1828     User user = getActiveUser();
1829     AuthResult authResult = permissionGranted(OpType.CHECK_AND_DELETE, user, env, families,
1830       Action.READ, Action.WRITE);
1831     logResult(authResult);
1832     if (!authResult.isAllowed()) {
1833       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1834         delete.setAttribute(CHECK_COVERING_PERM, TRUE);
1835       } else if (authorizationEnabled) {
1836         throw new AccessDeniedException("Insufficient permissions " +
1837           authResult.toContextString());
1838       }
1839     }
1840     return result;
1841   }
1842 
1843   @Override
1844   public boolean preCheckAndDeleteAfterRowLock(
1845       final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] row, final byte[] family,
1846       final byte[] qualifier, final CompareFilter.CompareOp compareOp,
1847       final ByteArrayComparable comparator, final Delete delete, final boolean result)
1848       throws IOException {
1849     if (delete.getAttribute(CHECK_COVERING_PERM) != null) {
1850       // We had failure with table, cf and q perm checks and now giving a chance for cell
1851       // perm check
1852       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1853       Map<byte[], ? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1854       AuthResult authResult = null;
1855       if (checkCoveringPermission(OpType.CHECK_AND_DELETE, c.getEnvironment(), row, families,
1856           HConstants.LATEST_TIMESTAMP, Action.READ)) {
1857         authResult = AuthResult.allow(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1858             getActiveUser(), Action.READ, table, families);
1859       } else {
1860         authResult = AuthResult.deny(OpType.CHECK_AND_DELETE.toString(), "Covering cell set",
1861             getActiveUser(), Action.READ, table, families);
1862       }
1863       logResult(authResult);
1864       if (authorizationEnabled && !authResult.isAllowed()) {
1865         throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1866       }
1867     }
1868     return result;
1869   }
1870 
1871   @Override
1872   public long preIncrementColumnValue(final ObserverContext<RegionCoprocessorEnvironment> c,
1873       final byte [] row, final byte [] family, final byte [] qualifier,
1874       final long amount, final boolean writeToWAL)
1875       throws IOException {
1876     // Require WRITE permission to the table, CF, and the KV to be replaced by the
1877     // incremented value
1878     RegionCoprocessorEnvironment env = c.getEnvironment();
1879     Map<byte[],? extends Collection<byte[]>> families = makeFamilyMap(family, qualifier);
1880     User user = getActiveUser();
1881     AuthResult authResult = permissionGranted(OpType.INCREMENT_COLUMN_VALUE, user, env, families,
1882       Action.WRITE);
1883     if (!authResult.isAllowed() && cellFeaturesEnabled && !compatibleEarlyTermination) {
1884       authResult.setAllowed(checkCoveringPermission(OpType.INCREMENT_COLUMN_VALUE, env, row,
1885         families, HConstants.LATEST_TIMESTAMP, Action.WRITE));
1886       authResult.setReason("Covering cell set");
1887     }
1888     logResult(authResult);
1889     if (authorizationEnabled && !authResult.isAllowed()) {
1890       throw new AccessDeniedException("Insufficient permissions " + authResult.toContextString());
1891     }
1892     return -1;
1893   }
1894 
1895   @Override
1896   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
1897       throws IOException {
1898     User user = getActiveUser();
1899     checkForReservedTagPresence(user, append);
1900 
1901     // Require WRITE permission to the table, CF, and the KV to be appended
1902     RegionCoprocessorEnvironment env = c.getEnvironment();
1903     Map<byte[],? extends Collection<Cell>> families = append.getFamilyCellMap();
1904     AuthResult authResult = permissionGranted(OpType.APPEND, user, env, families, Action.WRITE);
1905     logResult(authResult);
1906     if (!authResult.isAllowed()) {
1907       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1908         append.setAttribute(CHECK_COVERING_PERM, TRUE);
1909       } else if (authorizationEnabled)  {
1910         throw new AccessDeniedException("Insufficient permissions " +
1911           authResult.toContextString());
1912       }
1913     }
1914 
1915     byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1916     if (bytes != null) {
1917       if (cellFeaturesEnabled) {
1918         addCellPermissions(bytes, append.getFamilyCellMap());
1919       } else {
1920         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1921       }
1922     }
1923 
1924     return null;
1925   }
1926 
1927   @Override
1928   public Result preAppendAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1929       final Append append) throws IOException {
1930     if (append.getAttribute(CHECK_COVERING_PERM) != null) {
1931       // We had failure with table, cf and q perm checks and now giving a chance for cell
1932       // perm check
1933       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1934       AuthResult authResult = null;
1935       if (checkCoveringPermission(OpType.APPEND, c.getEnvironment(), append.getRow(),
1936           append.getFamilyCellMap(), HConstants.LATEST_TIMESTAMP, Action.WRITE)) {
1937         authResult = AuthResult.allow(OpType.APPEND.toString(), "Covering cell set",
1938             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1939       } else {
1940         authResult = AuthResult.deny(OpType.APPEND.toString(), "Covering cell set",
1941             getActiveUser(), Action.WRITE, table, append.getFamilyCellMap());
1942       }
1943       logResult(authResult);
1944       if (authorizationEnabled && !authResult.isAllowed()) {
1945         throw new AccessDeniedException("Insufficient permissions " +
1946           authResult.toContextString());
1947       }
1948     }
1949     return null;
1950   }
1951 
1952   @Override
1953   public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
1954       final Increment increment)
1955       throws IOException {
1956     User user = getActiveUser();
1957     checkForReservedTagPresence(user, increment);
1958 
1959     // Require WRITE permission to the table, CF, and the KV to be replaced by
1960     // the incremented value
1961     RegionCoprocessorEnvironment env = c.getEnvironment();
1962     Map<byte[],? extends Collection<Cell>> families = increment.getFamilyCellMap();
1963     AuthResult authResult = permissionGranted(OpType.INCREMENT, user, env, families,
1964       Action.WRITE);
1965     logResult(authResult);
1966     if (!authResult.isAllowed()) {
1967       if (cellFeaturesEnabled && !compatibleEarlyTermination) {
1968         increment.setAttribute(CHECK_COVERING_PERM, TRUE);
1969       } else if (authorizationEnabled) {
1970         throw new AccessDeniedException("Insufficient permissions " +
1971           authResult.toContextString());
1972       }
1973     }
1974 
1975     byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
1976     if (bytes != null) {
1977       if (cellFeaturesEnabled) {
1978         addCellPermissions(bytes, increment.getFamilyCellMap());
1979       } else {
1980         throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
1981       }
1982     }
1983 
1984     return null;
1985   }
1986 
1987   @Override
1988   public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> c,
1989       final Increment increment) throws IOException {
1990     if (increment.getAttribute(CHECK_COVERING_PERM) != null) {
1991       // We had failure with table, cf and q perm checks and now giving a chance for cell
1992       // perm check
1993       TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
1994       AuthResult authResult = null;
1995       if (checkCoveringPermission(OpType.INCREMENT, c.getEnvironment(), increment.getRow(),
1996           increment.getFamilyCellMap(), increment.getTimeRange().getMax(), Action.WRITE)) {
1997         authResult = AuthResult.allow(OpType.INCREMENT.toString(), "Covering cell set",
1998             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
1999       } else {
2000         authResult = AuthResult.deny(OpType.INCREMENT.toString(), "Covering cell set",
2001             getActiveUser(), Action.WRITE, table, increment.getFamilyCellMap());
2002       }
2003       logResult(authResult);
2004       if (authorizationEnabled && !authResult.isAllowed()) {
2005         throw new AccessDeniedException("Insufficient permissions " +
2006           authResult.toContextString());
2007       }
2008     }
2009     return null;
2010   }
2011 
2012   @Override
2013   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
2014       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
2015     // If the HFile version is insufficient to persist tags, we won't have any
2016     // work to do here
2017     if (!cellFeaturesEnabled) {
2018       return newCell;
2019     }
2020 
2021     // Collect any ACLs from the old cell
2022     List<Tag> tags = Lists.newArrayList();
2023     ListMultimap<String,Permission> perms = ArrayListMultimap.create();
2024     if (oldCell != null) {
2025       // Save an object allocation where we can
2026       if (oldCell.getTagsLength() > 0) {
2027         Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
2028           oldCell.getTagsOffset(), oldCell.getTagsLength());
2029         while (tagIterator.hasNext()) {
2030           Tag tag = tagIterator.next();
2031           if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
2032             // Not an ACL tag, just carry it through
2033             if (LOG.isTraceEnabled()) {
2034               LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
2035                 " length " + tag.getTagLength());
2036             }
2037             tags.add(tag);
2038           } else {
2039             // Merge the perms from the older ACL into the current permission set
2040             // TODO: The efficiency of this can be improved. Don't build just to unpack
2041             // again, use the builder
2042             AccessControlProtos.UsersAndPermissions.Builder builder =
2043               AccessControlProtos.UsersAndPermissions.newBuilder();
2044             ProtobufUtil.mergeFrom(builder, tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
2045             ListMultimap<String,Permission> kvPerms =
2046               ProtobufUtil.toUsersAndPermissions(builder.build());
2047             perms.putAll(kvPerms);
2048           }
2049         }
2050       }
2051     }
2052 
2053     // Do we have an ACL on the operation?
2054     byte[] aclBytes = mutation.getACL();
2055     if (aclBytes != null) {
2056       // Yes, use it
2057       tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
2058     } else {
2059       // No, use what we carried forward
2060       if (perms != null) {
2061         // TODO: If we collected ACLs from more than one tag we may have a
2062         // List<Permission> of size > 1, this can be collapsed into a single
2063         // Permission
2064         if (LOG.isTraceEnabled()) {
2065           LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
2066         }
2067         tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
2068           ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
2069       }
2070     }
2071 
2072     // If we have no tags to add, just return
2073     if (tags.isEmpty()) {
2074       return newCell;
2075     }
2076 
2077     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
2078     return rewriteCell;
2079   }
2080 
2081   @Override
2082   public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2083       final Scan scan, final RegionScanner s) throws IOException {
2084     internalPreRead(c, scan, OpType.SCAN);
2085     return s;
2086   }
2087 
2088   @Override
2089   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
2090       final Scan scan, final RegionScanner s) throws IOException {
2091     User user = getActiveUser();
2092     if (user != null && user.getShortName() != null) {
2093       // store reference to scanner owner for later checks
2094       scannerOwners.put(s, user.getShortName());
2095     }
2096     return s;
2097   }
2098 
2099   @Override
2100   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
2101       final InternalScanner s, final List<Result> result,
2102       final int limit, final boolean hasNext) throws IOException {
2103     requireScannerOwner(s);
2104     return hasNext;
2105   }
2106 
2107   @Override
2108   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2109       final InternalScanner s) throws IOException {
2110     requireScannerOwner(s);
2111   }
2112 
2113   @Override
2114   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
2115       final InternalScanner s) throws IOException {
2116     // clean up any associated owner mapping
2117     scannerOwners.remove(s);
2118   }
2119 
2120   /**
2121    * Verify, when servicing an RPC, that the caller is the scanner owner.
2122    * If so, we assume that access control is correctly enforced based on
2123    * the checks performed in preScannerOpen()
2124    */
2125   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
2126     if (!RpcServer.isInRpcCallContext())
2127       return;
2128     String requestUserName = RpcServer.getRequestUserName();
2129     String owner = scannerOwners.get(s);
2130     if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
2131       throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner(" +
2132           owner + ")!");
2133     }
2134   }
2135 
2136   /**
2137    * Verifies user has CREATE privileges on
2138    * the Column Families involved in the bulkLoadHFile
2139    * request. Specific Column Write privileges are presently
2140    * ignored.
2141    */
2142   @Override
2143   public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
2144       List<Pair<byte[], String>> familyPaths) throws IOException {
2145     for(Pair<byte[],String> el : familyPaths) {
2146       requirePermission("preBulkLoadHFile",
2147           ctx.getEnvironment().getRegion().getTableDesc().getTableName(),
2148           el.getFirst(),
2149           null,
2150           Action.CREATE);
2151     }
2152   }
2153 
2154   /**
2155    * Authorization check for
2156    * SecureBulkLoadProtocol.prepareBulkLoad()
2157    * @param ctx the context
2158    * @param request the request
2159    * @throws IOException
2160    */
2161   @Override
2162   public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2163                                  PrepareBulkLoadRequest request) throws IOException {
2164     requireAccess("prePareBulkLoad",
2165         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2166   }
2167 
2168   /**
2169    * Authorization security check for
2170    * SecureBulkLoadProtocol.cleanupBulkLoad()
2171    * @param ctx the context
2172    * @param request the request
2173    * @throws IOException
2174    */
2175   @Override
2176   public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx,
2177                                  CleanupBulkLoadRequest request) throws IOException {
2178     requireAccess("preCleanupBulkLoad",
2179         ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE);
2180   }
2181 
2182   /* ---- EndpointObserver implementation ---- */
2183 
2184   @Override
2185   public Message preEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2186       Service service, String methodName, Message request) throws IOException {
2187     // Don't intercept calls to our own AccessControlService, we check for
2188     // appropriate permissions in the service handlers
2189     if (shouldCheckExecPermission && !(service instanceof AccessControlService)) {
2190       requirePermission("invoke(" + service.getDescriptorForType().getName() + "." +
2191         methodName + ")",
2192         getTableName(ctx.getEnvironment()), null, null,
2193         Action.EXEC);
2194     }
2195     return request;
2196   }
2197 
2198   @Override
2199   public void postEndpointInvocation(ObserverContext<RegionCoprocessorEnvironment> ctx,
2200       Service service, String methodName, Message request, Message.Builder responseBuilder)
2201       throws IOException { }
2202 
2203   /* ---- Protobuf AccessControlService implementation ---- */
2204 
2205   @Override
2206   public void grant(RpcController controller,
2207                     AccessControlProtos.GrantRequest request,
2208                     RpcCallback<AccessControlProtos.GrantResponse> done) {
2209     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2210     AccessControlProtos.GrantResponse response = null;
2211     try {
2212       // verify it's only running at .acl.
2213       if (aclRegion) {
2214         if (!initialized) {
2215           throw new CoprocessorException("AccessController not yet initialized");
2216         }
2217         if (LOG.isDebugEnabled()) {
2218           LOG.debug("Received request to grant access permission " + perm.toString());
2219         }
2220 
2221         switch(request.getUserPermission().getPermission().getType()) {
2222           case Global :
2223           case Table :
2224             requirePermission("grant", perm.getTableName(), perm.getFamily(),
2225               perm.getQualifier(), Action.ADMIN);
2226             break;
2227           case Namespace :
2228             requireNamespacePermission("grant", perm.getNamespace(), Action.ADMIN);
2229            break;
2230         }
2231 
2232         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2233           @Override
2234           public Void run() throws Exception {
2235             AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm,
2236                 regionEnv.getTable(AccessControlLists.ACL_TABLE_NAME));
2237             return null;
2238           }
2239         });
2240 
2241         if (AUDITLOG.isTraceEnabled()) {
2242           // audit log should store permission changes in addition to auth results
2243           AUDITLOG.trace("Granted permission " + perm.toString());
2244         }
2245       } else {
2246         throw new CoprocessorException(AccessController.class, "This method "
2247             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2248       }
2249       response = AccessControlProtos.GrantResponse.getDefaultInstance();
2250     } catch (IOException ioe) {
2251       // pass exception back up
2252       ResponseConverter.setControllerException(controller, ioe);
2253     }
2254     done.run(response);
2255   }
2256 
2257   @Override
2258   public void revoke(RpcController controller,
2259                      AccessControlProtos.RevokeRequest request,
2260                      RpcCallback<AccessControlProtos.RevokeResponse> done) {
2261     final UserPermission perm = ProtobufUtil.toUserPermission(request.getUserPermission());
2262     AccessControlProtos.RevokeResponse response = null;
2263     try {
2264       // only allowed to be called on _acl_ region
2265       if (aclRegion) {
2266         if (!initialized) {
2267           throw new CoprocessorException("AccessController not yet initialized");
2268         }
2269         if (LOG.isDebugEnabled()) {
2270           LOG.debug("Received request to revoke access permission " + perm.toString());
2271         }
2272 
2273         switch(request.getUserPermission().getPermission().getType()) {
2274           case Global :
2275           case Table :
2276             requirePermission("revoke", perm.getTableName(), perm.getFamily(),
2277               perm.getQualifier(), Action.ADMIN);
2278             break;
2279           case Namespace :
2280             requireNamespacePermission("revoke", perm.getNamespace(), Action.ADMIN);
2281             break;
2282         }
2283 
2284         User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
2285           @Override
2286           public Void run() throws Exception {
2287             AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm, null);
2288             return null;
2289           }
2290         });
2291 
2292         if (AUDITLOG.isTraceEnabled()) {
2293           // audit log should record all permission changes
2294           AUDITLOG.trace("Revoked permission " + perm.toString());
2295         }
2296       } else {
2297         throw new CoprocessorException(AccessController.class, "This method "
2298             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2299       }
2300       response = AccessControlProtos.RevokeResponse.getDefaultInstance();
2301     } catch (IOException ioe) {
2302       // pass exception back up
2303       ResponseConverter.setControllerException(controller, ioe);
2304     }
2305     done.run(response);
2306   }
2307 
2308   @Override
2309   public void getUserPermissions(RpcController controller,
2310                                  AccessControlProtos.GetUserPermissionsRequest request,
2311                                  RpcCallback<AccessControlProtos.GetUserPermissionsResponse> done) {
2312     AccessControlProtos.GetUserPermissionsResponse response = null;
2313     try {
2314       // only allowed to be called on _acl_ region
2315       if (aclRegion) {
2316         if (!initialized) {
2317           throw new CoprocessorException("AccessController not yet initialized");
2318         }
2319         List<UserPermission> perms = null;
2320         if (request.getType() == AccessControlProtos.Permission.Type.Table) {
2321           final TableName table = request.hasTableName() ?
2322             ProtobufUtil.toTableName(request.getTableName()) : null;
2323           requirePermission("userPermissions", table, null, null, Action.ADMIN);
2324           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2325             @Override
2326             public List<UserPermission> run() throws Exception {
2327               return AccessControlLists.getUserTablePermissions(regionEnv.getConfiguration(), table);
2328             }
2329           });
2330         } else if (request.getType() == AccessControlProtos.Permission.Type.Namespace) {
2331           final String namespace = request.getNamespaceName().toStringUtf8();
2332           requireNamespacePermission("userPermissions", namespace, Action.ADMIN);
2333           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2334             @Override
2335             public List<UserPermission> run() throws Exception {
2336               return AccessControlLists.getUserNamespacePermissions(regionEnv.getConfiguration(),
2337                 namespace);
2338             }
2339           });
2340         } else {
2341           requirePermission("userPermissions", Action.ADMIN);
2342           perms = User.runAsLoginUser(new PrivilegedExceptionAction<List<UserPermission>>() {
2343             @Override
2344             public List<UserPermission> run() throws Exception {
2345               return AccessControlLists.getUserPermissions(regionEnv.getConfiguration(), null);
2346             }
2347           });
2348         }
2349         response = ResponseConverter.buildGetUserPermissionsResponse(perms);
2350       } else {
2351         throw new CoprocessorException(AccessController.class, "This method "
2352             + "can only execute at " + AccessControlLists.ACL_TABLE_NAME + " table.");
2353       }
2354     } catch (IOException ioe) {
2355       // pass exception back up
2356       ResponseConverter.setControllerException(controller, ioe);
2357     }
2358     done.run(response);
2359   }
2360 
2361   @Override
2362   public void checkPermissions(RpcController controller,
2363                                AccessControlProtos.CheckPermissionsRequest request,
2364                                RpcCallback<AccessControlProtos.CheckPermissionsResponse> done) {
2365     Permission[] permissions = new Permission[request.getPermissionCount()];
2366     for (int i=0; i < request.getPermissionCount(); i++) {
2367       permissions[i] = ProtobufUtil.toPermission(request.getPermission(i));
2368     }
2369     AccessControlProtos.CheckPermissionsResponse response = null;
2370     try {
2371       User user = getActiveUser();
2372       TableName tableName = regionEnv.getRegion().getTableDesc().getTableName();
2373       for (Permission permission : permissions) {
2374         if (permission instanceof TablePermission) {
2375           // Check table permissions
2376 
2377           TablePermission tperm = (TablePermission) permission;
2378           for (Action action : permission.getActions()) {
2379             if (!tperm.getTableName().equals(tableName)) {
2380               throw new CoprocessorException(AccessController.class, String.format("This method "
2381                   + "can only execute at the table specified in TablePermission. " +
2382                   "Table of the region:%s , requested table:%s", tableName,
2383                   tperm.getTableName()));
2384             }
2385 
2386             Map<byte[], Set<byte[]>> familyMap =
2387                 new TreeMap<byte[], Set<byte[]>>(Bytes.BYTES_COMPARATOR);
2388             if (tperm.getFamily() != null) {
2389               if (tperm.getQualifier() != null) {
2390                 Set<byte[]> qualifiers = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
2391                 qualifiers.add(tperm.getQualifier());
2392                 familyMap.put(tperm.getFamily(), qualifiers);
2393               } else {
2394                 familyMap.put(tperm.getFamily(), null);
2395               }
2396             }
2397 
2398             AuthResult result = permissionGranted("checkPermissions", user, action, regionEnv,
2399               familyMap);
2400             logResult(result);
2401             if (!result.isAllowed()) {
2402               // Even if passive we need to throw an exception here, we support checking
2403               // effective permissions, so throw unconditionally
2404               throw new AccessDeniedException("Insufficient permissions (table=" + tableName +
2405                 (familyMap.size() > 0 ? ", family: " + result.toFamilyString() : "") +
2406                 ", action=" + action.toString() + ")");
2407             }
2408           }
2409 
2410         } else {
2411           // Check global permissions
2412 
2413           for (Action action : permission.getActions()) {
2414             AuthResult result;
2415             if (authManager.authorize(user, action)) {
2416               result = AuthResult.allow("checkPermissions", "Global action allowed", user,
2417                 action, null, null);
2418             } else {
2419               result = AuthResult.deny("checkPermissions", "Global action denied", user, action,
2420                 null, null);
2421             }
2422             logResult(result);
2423             if (!result.isAllowed()) {
2424               // Even if passive we need to throw an exception here, we support checking
2425               // effective permissions, so throw unconditionally
2426               throw new AccessDeniedException("Insufficient permissions (action=" +
2427                 action.toString() + ")");
2428             }
2429           }
2430         }
2431       }
2432       response = AccessControlProtos.CheckPermissionsResponse.getDefaultInstance();
2433     } catch (IOException ioe) {
2434       ResponseConverter.setControllerException(controller, ioe);
2435     }
2436     done.run(response);
2437   }
2438 
2439   @Override
2440   public Service getService() {
2441     return AccessControlProtos.AccessControlService.newReflectiveService(this);
2442   }
2443 
2444   private Region getRegion(RegionCoprocessorEnvironment e) {
2445     return e.getRegion();
2446   }
2447 
2448   private TableName getTableName(RegionCoprocessorEnvironment e) {
2449     Region region = e.getRegion();
2450     if (region != null) {
2451       return getTableName(region);
2452     }
2453     return null;
2454   }
2455 
2456   private TableName getTableName(Region region) {
2457     HRegionInfo regionInfo = region.getRegionInfo();
2458     if (regionInfo != null) {
2459       return regionInfo.getTable();
2460     }
2461     return null;
2462   }
2463 
2464   @Override
2465   public void preClose(ObserverContext<RegionCoprocessorEnvironment> e, boolean abortRequested)
2466       throws IOException {
2467     requirePermission("preClose", Action.ADMIN);
2468   }
2469 
2470   private void checkSystemOrSuperUser() throws IOException {
2471     // No need to check if we're not going to throw
2472     if (!authorizationEnabled) {
2473       return;
2474     }
2475     User activeUser = getActiveUser();
2476     if (!Superusers.isSuperUser(activeUser)) {
2477       throw new AccessDeniedException("User '" + (activeUser != null ?
2478         activeUser.getShortName() : "null") + "is not system or super user.");
2479     }
2480   }
2481 
2482   @Override
2483   public void preStopRegionServer(
2484       ObserverContext<RegionServerCoprocessorEnvironment> env)
2485       throws IOException {
2486     requirePermission("preStopRegionServer", Action.ADMIN);
2487   }
2488 
2489   private Map<byte[], ? extends Collection<byte[]>> makeFamilyMap(byte[] family,
2490       byte[] qualifier) {
2491     if (family == null) {
2492       return null;
2493     }
2494 
2495     Map<byte[], Collection<byte[]>> familyMap = new TreeMap<byte[], Collection<byte[]>>(Bytes.BYTES_COMPARATOR);
2496     familyMap.put(family, qualifier != null ? ImmutableSet.of(qualifier) : null);
2497     return familyMap;
2498   }
2499 
2500   @Override
2501   public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2502        List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2503        String regex) throws IOException {
2504     // We are delegating the authorization check to postGetTableDescriptors as we don't have
2505     // any concrete set of table names when a regex is present or the full list is requested.
2506     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2507       // Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
2508       // request can be granted.
2509       MasterServices masterServices = ctx.getEnvironment().getMasterServices();
2510       for (TableName tableName: tableNamesList) {
2511         // Skip checks for a table that does not exist
2512         if (masterServices.getTableDescriptors().get(tableName) == null) {
2513           continue;
2514         }
2515         requirePermission("getTableDescriptors", tableName, null, null,
2516             Action.ADMIN, Action.CREATE);
2517       }
2518     }
2519   }
2520 
2521   @Override
2522   public void postGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
2523       List<TableName> tableNamesList, List<HTableDescriptor> descriptors,
2524       String regex) throws IOException {
2525     // Skipping as checks in this case are already done by preGetTableDescriptors.
2526     if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
2527       return;
2528     }
2529 
2530     // Retains only those which passes authorization checks, as the checks weren't done as part
2531     // of preGetTableDescriptors.
2532     Iterator<HTableDescriptor> itr = descriptors.iterator();
2533     while (itr.hasNext()) {
2534       HTableDescriptor htd = itr.next();
2535       try {
2536         requirePermission("getTableDescriptors", htd.getTableName(), null, null,
2537             Action.ADMIN, Action.CREATE);
2538       } catch (AccessDeniedException e) {
2539         itr.remove();
2540       }
2541     }
2542   }
2543 
2544   @Override
2545   public void postGetTableNames(ObserverContext<MasterCoprocessorEnvironment> ctx,
2546       List<HTableDescriptor> descriptors, String regex) throws IOException {
2547     // Retains only those which passes authorization checks.
2548     Iterator<HTableDescriptor> itr = descriptors.iterator();
2549     while (itr.hasNext()) {
2550       HTableDescriptor htd = itr.next();
2551       try {
2552         requireAccess("getTableNames", htd.getTableName(), Action.values());
2553       } catch (AccessDeniedException e) {
2554         itr.remove();
2555       }
2556     }
2557   }
2558 
2559   @Override
2560   public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx, Region regionA,
2561       Region regionB) throws IOException {
2562     requirePermission("mergeRegions", regionA.getTableDesc().getTableName(), null, null,
2563       Action.ADMIN);
2564   }
2565 
2566   @Override
2567   public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c, Region regionA,
2568       Region regionB, Region mergedRegion) throws IOException { }
2569 
2570   @Override
2571   public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2572       Region regionA, Region regionB, List<Mutation> metaEntries) throws IOException { }
2573 
2574   @Override
2575   public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2576       Region regionA, Region regionB, Region mergedRegion) throws IOException { }
2577 
2578   @Override
2579   public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2580       Region regionA, Region regionB) throws IOException { }
2581 
2582   @Override
2583   public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2584       Region regionA, Region regionB) throws IOException { }
2585 
2586   @Override
2587   public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2588       throws IOException {
2589     requirePermission("preRollLogWriterRequest", Permission.Action.ADMIN);
2590   }
2591 
2592   @Override
2593   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
2594       throws IOException { }
2595 
2596   @Override
2597   public ReplicationEndpoint postCreateReplicationEndPoint(
2598       ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
2599     return endpoint;
2600   }
2601 
2602   @Override
2603   public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2604       List<WALEntry> entries, CellScanner cells) throws IOException {
2605     requirePermission("replicateLogEntries", Action.WRITE);
2606   }
2607 
2608   @Override
2609   public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
2610       List<WALEntry> entries, CellScanner cells) throws IOException {
2611   }
2612   
2613   @Override
2614   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2615       final String userName, final Quotas quotas) throws IOException {
2616     requirePermission("setUserQuota", Action.ADMIN);
2617   }
2618 
2619   @Override
2620   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2621       final String userName, final TableName tableName, final Quotas quotas) throws IOException {
2622     requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
2623   }
2624 
2625   @Override
2626   public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2627       final String userName, final String namespace, final Quotas quotas) throws IOException {
2628     requirePermission("setUserNamespaceQuota", Action.ADMIN);
2629   }
2630 
2631   @Override
2632   public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2633       final TableName tableName, final Quotas quotas) throws IOException {
2634     requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
2635   }
2636 
2637   @Override
2638   public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
2639       final String namespace, final Quotas quotas) throws IOException {
2640     requirePermission("setNamespaceQuota", Action.ADMIN);
2641   }
2642 
2643   @Override
2644   public void preMoveServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
2645                              Set<HostAndPort> servers, String targetGroup) throws IOException {
2646     requirePermission("moveServers", Action.ADMIN);
2647   }
2648 
2649   @Override
2650   public void preMoveTables(ObserverContext<MasterCoprocessorEnvironment> ctx,
2651                             Set<TableName> tables, String targetGroup) throws IOException {
2652     requirePermission("moveTables", Action.ADMIN);
2653   }
2654 
2655   @Override
2656   public void preAddRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2657                             String name) throws IOException {
2658     requirePermission("addRSGroup", Action.ADMIN);
2659   }
2660 
2661   @Override
2662   public void preRemoveRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2663                                String name) throws IOException {
2664     requirePermission("removeRSGroup", Action.ADMIN);
2665   }
2666 
2667   @Override
2668   public void preBalanceRSGroup(ObserverContext<MasterCoprocessorEnvironment> ctx,
2669                                 String groupName) throws IOException {
2670     requirePermission("balanceRSGroup", Action.ADMIN);
2671   }
2672 }