View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.DataInput;
23  import java.io.DataInputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.TreeMap;
32  import java.util.TreeSet;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.AuthUtil;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellUtil;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.NamespaceDescriptor;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.Tag;
46  import org.apache.hadoop.hbase.TagType;
47  import org.apache.hadoop.hbase.classification.InterfaceAudience;
48  import org.apache.hadoop.hbase.client.Connection;
49  import org.apache.hadoop.hbase.client.ConnectionFactory;
50  import org.apache.hadoop.hbase.client.Delete;
51  import org.apache.hadoop.hbase.client.Get;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.client.Table;
57  import org.apache.hadoop.hbase.exceptions.DeserializationException;
58  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
59  import org.apache.hadoop.hbase.filter.QualifierFilter;
60  import org.apache.hadoop.hbase.filter.RegexStringComparator;
61  import org.apache.hadoop.hbase.master.MasterServices;
62  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
63  import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
64  import org.apache.hadoop.hbase.regionserver.BloomType;
65  import org.apache.hadoop.hbase.regionserver.InternalScanner;
66  import org.apache.hadoop.hbase.regionserver.Region;
67  import org.apache.hadoop.hbase.security.User;
68  import org.apache.hadoop.hbase.util.Bytes;
69  import org.apache.hadoop.hbase.util.Pair;
70  import org.apache.hadoop.io.Text;
71  
72  import com.google.common.collect.ArrayListMultimap;
73  import com.google.common.collect.ListMultimap;
74  import com.google.common.collect.Lists;
75  
76  /**
77   * Maintains lists of permission grants to users and groups to allow for
78   * authorization checks by {@link AccessController}.
79   *
80   * <p>
81   * Access control lists are stored in an "internal" metadata table named
82   * {@code _acl_}. Each table's permission grants are stored as a separate row,
83   * keyed by the table name. KeyValues for permissions assignments are stored
84   * in one of the formats:
85   * <pre>
86   * Key                      Desc
87   * --------                 --------
88   * user                     table level permissions for a user [R=read, W=write]
89   * group                    table level permissions for a group
90   * user,family              column family level permissions for a user
91   * group,family             column family level permissions for a group
92   * user,family,qualifier    column qualifier level permissions for a user
93   * group,family,qualifier   column qualifier level permissions for a group
94   * </pre>
95   * All values are encoded as byte arrays containing the codes from the
96   * org.apache.hadoop.hbase.security.access.TablePermission.Action enum.
97   * </p>
98   */
99  @InterfaceAudience.Private
100 public class AccessControlLists {
101   /** Internal storage table for access control lists */
102   public static final TableName ACL_TABLE_NAME =
103       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "acl");
104   public static final byte[] ACL_GLOBAL_NAME = ACL_TABLE_NAME.getName();
105   /** Column family used to store ACL grants */
106   public static final String ACL_LIST_FAMILY_STR = "l";
107   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
108   /** KV tag to store per cell access control lists */
109   public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
110 
111   public static final char NAMESPACE_PREFIX = '@';
112 
113   /**
114    * Delimiter to separate user, column family, and qualifier in
115    * _acl_ table info: column keys */
116   public static final char ACL_KEY_DELIMITER = ',';
117 
118   private static Log LOG = LogFactory.getLog(AccessControlLists.class);
119 
120   /**
121    * Create the ACL table
122    * @param master
123    * @throws IOException
124    */
125   static void createACLTable(MasterServices master) throws IOException {
126     master.createTable(new HTableDescriptor(ACL_TABLE_NAME)
127       .addFamily(new HColumnDescriptor(ACL_LIST_FAMILY)
128         .setMaxVersions(1)
129         .setInMemory(true)
130         .setBlockCacheEnabled(true)
131         .setBlocksize(8 * 1024)
132         .setBloomFilterType(BloomType.NONE)
133         .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
134         // Set cache data blocks in L1 if more than one cache tier deployed; e.g. this will
135         // be the case if we are using CombinedBlockCache (Bucket Cache).
136         .setCacheDataInL1(true)),
137     null,
138     HConstants.NO_NONCE,
139     HConstants.NO_NONCE);
140   }
141 
142   /**
143    * Stores a new user permission grant in the access control lists table.
144    * @param conf the configuration
145    * @param userPerm the details of the permission to be granted
146    * @param t acl table instance. It is closed upon method return
147    * @throws IOException in the case of an error accessing the metadata table
148    */
149   static void addUserPermission(Configuration conf, UserPermission userPerm, Table t)
150       throws IOException {
151     Permission.Action[] actions = userPerm.getActions();
152     byte[] rowKey = userPermissionRowKey(userPerm);
153     Put p = new Put(rowKey);
154     byte[] key = userPermissionKey(userPerm);
155 
156     if ((actions == null) || (actions.length == 0)) {
157       String msg = "No actions associated with user '" + Bytes.toString(userPerm.getUser()) + "'";
158       LOG.warn(msg);
159       throw new IOException(msg);
160     }
161 
162     byte[] value = new byte[actions.length];
163     for (int i = 0; i < actions.length; i++) {
164       value[i] = actions[i].code();
165     }
166     p.addImmutable(ACL_LIST_FAMILY, key, value);
167     if (LOG.isDebugEnabled()) {
168       LOG.debug("Writing permission with rowKey "+
169           Bytes.toString(rowKey)+" "+
170           Bytes.toString(key)+": "+Bytes.toStringBinary(value)
171       );
172     }
173     try {
174       t.put(p);
175     } finally {
176       t.close();
177     }
178   }
179 
180   /**
181    * Removes a previously granted permission from the stored access control
182    * lists.  The {@link TablePermission} being removed must exactly match what
183    * is stored -- no wildcard matching is attempted.  Ie, if user "bob" has
184    * been granted "READ" access to the "data" table, but only to column family
185    * plus qualifier "info:colA", then trying to call this method with only
186    * user "bob" and the table name "data" (but without specifying the
187    * column qualifier "info:colA") will have no effect.
188    *
189    * @param conf the configuration
190    * @param userPerm the details of the permission to be revoked
191    * @param t acl table
192    * @throws IOException if there is an error accessing the metadata table
193    */
194   static void removeUserPermission(Configuration conf, UserPermission userPerm, Table t)
195       throws IOException {
196     Delete d = new Delete(userPermissionRowKey(userPerm));
197     byte[] key = userPermissionKey(userPerm);
198 
199     if (LOG.isDebugEnabled()) {
200       LOG.debug("Removing permission "+ userPerm.toString());
201     }
202     d.addColumns(ACL_LIST_FAMILY, key);
203     if (t == null) {
204       try (Connection connection = ConnectionFactory.createConnection(conf)) {
205         try (Table table = connection.getTable(ACL_TABLE_NAME)) {
206           table.delete(d);
207         }
208       }
209     } else {
210       try {
211         t.delete(d);
212       } finally {
213         t.close();
214       }
215     }
216   }
217 
218   /**
219    * Remove specified table from the _acl_ table.
220    */
221   static void removeTablePermissions(Configuration conf, TableName tableName, Table t)
222       throws IOException{
223     Delete d = new Delete(tableName.getName());
224 
225     if (LOG.isDebugEnabled()) {
226       LOG.debug("Removing permissions of removed table "+ tableName);
227     }
228     try {
229       t.delete(d);
230     } finally {
231       t.close();
232     }
233   }
234 
235   /**
236    * Remove specified namespace from the acl table.
237    */
238   static void removeNamespacePermissions(Configuration conf, String namespace, Table t)
239       throws IOException{
240     Delete d = new Delete(Bytes.toBytes(toNamespaceEntry(namespace)));
241 
242     if (LOG.isDebugEnabled()) {
243       LOG.debug("Removing permissions of removed namespace "+ namespace);
244     }
245 
246     try {
247       t.delete(d);
248     } finally {
249       t.close();
250     }
251   }
252 
253   /**
254    * Remove specified table column from the acl table.
255    */
256   static void removeTablePermissions(TableName tableName, byte[] column, Table table,
257       boolean closeTable) throws IOException{
258 
259     if (LOG.isDebugEnabled()) {
260       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
261                 " from table "+ tableName);
262     }
263     Scan scan = new Scan();
264     scan.addFamily(ACL_LIST_FAMILY);
265 
266     String columnName = Bytes.toString(column);
267     scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
268         String.format("(%s%s%s)|(%s%s)$",
269             ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
270             ACL_KEY_DELIMITER, columnName))));
271 
272     Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
273     ResultScanner scanner = null;
274     try {
275       scanner = table.getScanner(scan);
276       for (Result res : scanner) {
277         for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
278           qualifierSet.add(q);
279         }
280       }
281 
282       if (qualifierSet.size() > 0) {
283         Delete d = new Delete(tableName.getName());
284         for (byte[] qualifier : qualifierSet) {
285           d.addColumns(ACL_LIST_FAMILY, qualifier);
286         }
287         table.delete(d);
288       }
289     }  finally {
290       if (scanner != null) scanner.close();
291       if (closeTable) table.close();
292     }
293   }
294 
295   static void removeTablePermissions(Configuration conf, TableName tableName, byte[] column,
296       Table t) throws IOException {
297     if (LOG.isDebugEnabled()) {
298       LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
299           " from table "+ tableName);
300     }
301     removeTablePermissions(tableName, column, t, true);
302   }
303 
304   static byte[] userPermissionRowKey(UserPermission userPerm) {
305     byte[] row;
306     if(userPerm.hasNamespace()) {
307       row = Bytes.toBytes(toNamespaceEntry(userPerm.getNamespace()));
308     } else if(userPerm.isGlobal()) {
309       row = ACL_GLOBAL_NAME;
310     } else {
311       row = userPerm.getTableName().getName();
312     }
313     return row;
314   }
315 
316   /**
317    * Build qualifier key from user permission:
318    *  username
319    *  username,family
320    *  username,family,qualifier
321    */
322   static byte[] userPermissionKey(UserPermission userPerm) {
323     byte[] qualifier = userPerm.getQualifier();
324     byte[] family = userPerm.getFamily();
325     byte[] key = userPerm.getUser();
326 
327     if (family != null && family.length > 0) {
328       key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, family));
329       if (qualifier != null && qualifier.length > 0) {
330         key = Bytes.add(key, Bytes.add(new byte[]{ACL_KEY_DELIMITER}, qualifier));
331       }
332     }
333 
334     return key;
335   }
336 
337   /**
338    * Returns {@code true} if the given region is part of the {@code _acl_}
339    * metadata table.
340    */
341   static boolean isAclRegion(Region region) {
342     return ACL_TABLE_NAME.equals(region.getTableDesc().getTableName());
343   }
344 
345   /**
346    * Returns {@code true} if the given table is {@code _acl_} metadata table.
347    */
348   static boolean isAclTable(HTableDescriptor desc) {
349     return ACL_TABLE_NAME.equals(desc.getTableName());
350   }
351 
352   /**
353    * Loads all of the permission grants stored in a region of the {@code _acl_}
354    * table.
355    *
356    * @param aclRegion
357    * @return a map of the permissions for this table.
358    * @throws IOException
359    */
360   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(Region aclRegion)
361     throws IOException {
362 
363     if (!isAclRegion(aclRegion)) {
364       throw new IOException("Can only load permissions from "+ACL_TABLE_NAME);
365     }
366 
367     Map<byte[], ListMultimap<String, TablePermission>> allPerms =
368         new TreeMap<byte[], ListMultimap<String, TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
369 
370     // do a full scan of _acl_ table
371 
372     Scan scan = new Scan();
373     scan.addFamily(ACL_LIST_FAMILY);
374 
375     InternalScanner iScanner = null;
376     try {
377       iScanner = aclRegion.getScanner(scan);
378 
379       while (true) {
380         List<Cell> row = new ArrayList<Cell>();
381 
382         boolean hasNext = iScanner.next(row);
383         ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
384         byte[] entry = null;
385         for (Cell kv : row) {
386           if (entry == null) {
387             entry = CellUtil.cloneRow(kv);
388           }
389           Pair<String,TablePermission> permissionsOfUserOnTable =
390               parsePermissionRecord(entry, kv);
391           if (permissionsOfUserOnTable != null) {
392             String username = permissionsOfUserOnTable.getFirst();
393             TablePermission permissions = permissionsOfUserOnTable.getSecond();
394             perms.put(username, permissions);
395           }
396         }
397         if (entry != null) {
398           allPerms.put(entry, perms);
399         }
400         if (!hasNext) {
401           break;
402         }
403       }
404     } finally {
405       if (iScanner != null) {
406         iScanner.close();
407       }
408     }
409 
410     return allPerms;
411   }
412 
413   /**
414    * Load all permissions from the region server holding {@code _acl_},
415    * primarily intended for testing purposes.
416    */
417   static Map<byte[], ListMultimap<String,TablePermission>> loadAll(
418       Configuration conf) throws IOException {
419     Map<byte[], ListMultimap<String,TablePermission>> allPerms =
420         new TreeMap<byte[], ListMultimap<String,TablePermission>>(Bytes.BYTES_RAWCOMPARATOR);
421 
422     // do a full scan of _acl_, filtering on only first table region rows
423 
424     Scan scan = new Scan();
425     scan.addFamily(ACL_LIST_FAMILY);
426 
427     ResultScanner scanner = null;
428     // TODO: Pass in a Connection rather than create one each time.
429     try (Connection connection = ConnectionFactory.createConnection(conf)) {
430       try (Table table = connection.getTable(ACL_TABLE_NAME)) {
431         scanner = table.getScanner(scan);
432         try {
433           for (Result row : scanner) {
434             ListMultimap<String,TablePermission> resultPerms = parsePermissions(row.getRow(), row);
435             allPerms.put(row.getRow(), resultPerms);
436           }
437         } finally {
438           if (scanner != null) scanner.close();
439         }
440       }
441     }
442 
443     return allPerms;
444   }
445 
446   public static ListMultimap<String, TablePermission> getTablePermissions(Configuration conf,
447         TableName tableName) throws IOException {
448     return getPermissions(conf, tableName != null ? tableName.getName() : null, null);
449   }
450 
451   static ListMultimap<String, TablePermission> getNamespacePermissions(Configuration conf,
452         String namespace) throws IOException {
453     return getPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)), null);
454   }
455 
456   /**
457    * Reads user permission assignments stored in the <code>l:</code> column
458    * family of the first table row in <code>_acl_</code>.
459    *
460    * <p>
461    * See {@link AccessControlLists class documentation} for the key structure
462    * used for storage.
463    * </p>
464    */
465   static ListMultimap<String, TablePermission> getPermissions(Configuration conf,
466       byte[] entryName, Table t) throws IOException {
467     if (entryName == null) entryName = ACL_GLOBAL_NAME;
468 
469     // for normal user tables, we just read the table row from _acl_
470     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
471     Get get = new Get(entryName);
472     get.addFamily(ACL_LIST_FAMILY);
473     Result row = null;
474     if (t == null) {
475       try (Connection connection = ConnectionFactory.createConnection(conf)) {
476         try (Table table = connection.getTable(ACL_TABLE_NAME)) {
477           row = table.get(get);
478         }
479       }
480     } else {
481       row = t.get(get);
482     }
483 
484     if (!row.isEmpty()) {
485       perms = parsePermissions(entryName, row);
486     } else {
487       LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
488           + Bytes.toString(entryName));
489     }
490 
491     return perms;
492   }
493 
494   /**
495    * Returns the currently granted permissions for a given table as a list of
496    * user plus associated permissions.
497    */
498   static List<UserPermission> getUserTablePermissions(
499       Configuration conf, TableName tableName) throws IOException {
500     return getUserPermissions(conf, tableName == null ? null : tableName.getName());
501   }
502 
503   static List<UserPermission> getUserNamespacePermissions(
504       Configuration conf, String namespace) throws IOException {
505     return getUserPermissions(conf, Bytes.toBytes(toNamespaceEntry(namespace)));
506   }
507 
508   static List<UserPermission> getUserPermissions(
509       Configuration conf, byte[] entryName)
510   throws IOException {
511     ListMultimap<String,TablePermission> allPerms = getPermissions(
512       conf, entryName, null);
513 
514     List<UserPermission> perms = new ArrayList<UserPermission>();
515 
516     if(isNamespaceEntry(entryName)) {  // Namespace
517       for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
518         UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
519           entry.getValue().getNamespace(), entry.getValue().getActions());
520         perms.add(up);
521       }
522     } else {  // Table
523       for (Map.Entry<String, TablePermission> entry : allPerms.entries()) {
524         UserPermission up = new UserPermission(Bytes.toBytes(entry.getKey()),
525             entry.getValue().getTableName(), entry.getValue().getFamily(),
526             entry.getValue().getQualifier(), entry.getValue().getActions());
527         perms.add(up);
528       }
529     }
530     return perms;
531   }
532 
533   private static ListMultimap<String, TablePermission> parsePermissions(
534       byte[] entryName, Result result) {
535     ListMultimap<String, TablePermission> perms = ArrayListMultimap.create();
536     if (result != null && result.size() > 0) {
537       for (Cell kv : result.rawCells()) {
538 
539         Pair<String,TablePermission> permissionsOfUserOnTable =
540             parsePermissionRecord(entryName, kv);
541 
542         if (permissionsOfUserOnTable != null) {
543           String username = permissionsOfUserOnTable.getFirst();
544           TablePermission permissions = permissionsOfUserOnTable.getSecond();
545           perms.put(username, permissions);
546         }
547       }
548     }
549     return perms;
550   }
551 
552   private static Pair<String, TablePermission> parsePermissionRecord(
553       byte[] entryName, Cell kv) {
554     // return X given a set of permissions encoded in the permissionRecord kv.
555     byte[] family = CellUtil.cloneFamily(kv);
556 
557     if (!Bytes.equals(family, ACL_LIST_FAMILY)) {
558       return null;
559     }
560 
561     byte[] key = CellUtil.cloneQualifier(kv);
562     byte[] value = CellUtil.cloneValue(kv);
563     if (LOG.isDebugEnabled()) {
564       LOG.debug("Read acl: kv ["+
565                 Bytes.toStringBinary(key)+": "+
566                 Bytes.toStringBinary(value)+"]");
567     }
568 
569     // check for a column family appended to the key
570     // TODO: avoid the string conversion to make this more efficient
571     String username = Bytes.toString(key);
572 
573     //Handle namespace entry
574     if(isNamespaceEntry(entryName)) {
575       return new Pair<String, TablePermission>(username,
576           new TablePermission(Bytes.toString(fromNamespaceEntry(entryName)), value));
577     }
578 
579     //Handle table and global entry
580     //TODO global entry should be handled differently
581     int idx = username.indexOf(ACL_KEY_DELIMITER);
582     byte[] permFamily = null;
583     byte[] permQualifier = null;
584     if (idx > 0 && idx < username.length()-1) {
585       String remainder = username.substring(idx+1);
586       username = username.substring(0, idx);
587       idx = remainder.indexOf(ACL_KEY_DELIMITER);
588       if (idx > 0 && idx < remainder.length()-1) {
589         permFamily = Bytes.toBytes(remainder.substring(0, idx));
590         permQualifier = Bytes.toBytes(remainder.substring(idx+1));
591       } else {
592         permFamily = Bytes.toBytes(remainder);
593       }
594     }
595 
596     return new Pair<String,TablePermission>(username,
597         new TablePermission(TableName.valueOf(entryName), permFamily, permQualifier, value));
598   }
599 
600   /**
601    * Writes a set of permissions as {@link org.apache.hadoop.io.Writable} instances
602    * and returns the resulting byte array.
603    *
604    * Writes a set of permission [user: table permission]
605    */
606   public static byte[] writePermissionsAsBytes(ListMultimap<String, TablePermission> perms,
607       Configuration conf) {
608     return ProtobufUtil.prependPBMagic(ProtobufUtil.toUserTablePermissions(perms).toByteArray());
609   }
610 
611   /**
612    * Reads a set of permissions as {@link org.apache.hadoop.io.Writable} instances
613    * from the input stream.
614    */
615   public static ListMultimap<String, TablePermission> readPermissions(byte[] data,
616       Configuration conf)
617   throws DeserializationException {
618     if (ProtobufUtil.isPBMagicPrefix(data)) {
619       int pblen = ProtobufUtil.lengthOfPBMagic();
620       try {
621         AccessControlProtos.UsersAndPermissions.Builder builder =
622           AccessControlProtos.UsersAndPermissions.newBuilder();
623         ProtobufUtil.mergeFrom(builder, data, pblen, data.length - pblen);
624         return ProtobufUtil.toUserTablePermissions(builder.build());
625       } catch (IOException e) {
626         throw new DeserializationException(e);
627       }
628     } else {
629       ListMultimap<String,TablePermission> perms = ArrayListMultimap.create();
630       try {
631         DataInput in = new DataInputStream(new ByteArrayInputStream(data));
632         int length = in.readInt();
633         for (int i=0; i<length; i++) {
634           String user = Text.readString(in);
635           List<TablePermission> userPerms =
636             (List)HbaseObjectWritableFor96Migration.readObject(in, conf);
637           perms.putAll(user, userPerms);
638         }
639       } catch (IOException e) {
640         throw new DeserializationException(e);
641       }
642       return perms;
643     }
644   }
645 
646   public static boolean isNamespaceEntry(String entryName) {
647     return entryName != null && entryName.charAt(0) == NAMESPACE_PREFIX;
648   }
649 
650   public static boolean isNamespaceEntry(byte[] entryName) {
651     return entryName != null && entryName.length !=0 && entryName[0] == NAMESPACE_PREFIX;
652   }
653 
654   public static String toNamespaceEntry(String namespace) {
655      return NAMESPACE_PREFIX + namespace;
656    }
657 
658    public static String fromNamespaceEntry(String namespace) {
659      if(namespace.charAt(0) != NAMESPACE_PREFIX)
660        throw new IllegalArgumentException("Argument is not a valid namespace entry");
661      return namespace.substring(1);
662    }
663 
664    public static byte[] toNamespaceEntry(byte[] namespace) {
665      byte[] ret = new byte[namespace.length+1];
666      ret[0] = NAMESPACE_PREFIX;
667      System.arraycopy(namespace, 0, ret, 1, namespace.length);
668      return ret;
669    }
670 
671    public static byte[] fromNamespaceEntry(byte[] namespace) {
672      if(namespace[0] != NAMESPACE_PREFIX) {
673        throw new IllegalArgumentException("Argument is not a valid namespace entry: " +
674            Bytes.toString(namespace));
675      }
676      return Arrays.copyOfRange(namespace, 1, namespace.length);
677    }
678 
679    public static List<Permission> getCellPermissionsForUser(User user, Cell cell)
680        throws IOException {
681      // Save an object allocation where we can
682      if (cell.getTagsLength() == 0) {
683        return null;
684      }
685      List<Permission> results = Lists.newArrayList();
686      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
687         cell.getTagsLength());
688      while (tagsIterator.hasNext()) {
689        Tag tag = tagsIterator.next();
690        if (tag.getType() == ACL_TAG_TYPE) {
691          // Deserialize the table permissions from the KV
692          // TODO: This can be improved. Don't build UsersAndPermissions just to unpack it again,
693          // use the builder
694          AccessControlProtos.UsersAndPermissions.Builder builder = 
695            AccessControlProtos.UsersAndPermissions.newBuilder();
696          ProtobufUtil.mergeFrom(builder, tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
697          ListMultimap<String,Permission> kvPerms =
698            ProtobufUtil.toUsersAndPermissions(builder.build());
699          // Are there permissions for this user?
700          List<Permission> userPerms = kvPerms.get(user.getShortName());
701          if (userPerms != null) {
702            results.addAll(userPerms);
703          }
704          // Are there permissions for any of the groups this user belongs to?
705          String groupNames[] = user.getGroupNames();
706          if (groupNames != null) {
707            for (String group : groupNames) {
708              List<Permission> groupPerms = kvPerms.get(AuthUtil.toGroupEntry(group));
709              if (results != null) {
710                results.addAll(groupPerms);
711              }
712            }
713          }
714        }
715      }
716      return results;
717    }
718 }