1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.util;
18
19 import java.io.IOException;
20 import java.security.PrivilegedExceptionAction;
21 import java.util.HashMap;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.TableName;
28 import org.apache.hadoop.hbase.client.Get;
29 import org.apache.hadoop.hbase.client.HTable;
30 import org.apache.hadoop.hbase.client.HTableInterface;
31 import org.apache.hadoop.hbase.client.Result;
32 import org.apache.hadoop.hbase.client.Table;
33 import org.apache.hadoop.hbase.security.User;
34 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
35 import org.apache.hadoop.security.UserGroupInformation;
36
37
38
39
40 public class MultiThreadedReaderWithACL extends MultiThreadedReader {
41 private static final Log LOG = LogFactory.getLog(MultiThreadedReaderWithACL.class);
42
43 private static final String COMMA = ",";
44
45
46
47
48 private Map<String, Table> userVsTable = new HashMap<String, Table>();
49 private Map<String, User> users = new HashMap<String, User>();
50 private String[] userNames;
51
52 public MultiThreadedReaderWithACL(LoadTestDataGenerator dataGen, Configuration conf,
53 TableName tableName, double verifyPercent, String userNames) throws IOException {
54 super(dataGen, conf, tableName, verifyPercent);
55 this.userNames = userNames.split(COMMA);
56 }
57
58 @Override
59 protected void addReaderThreads(int numThreads) throws IOException {
60 for (int i = 0; i < numThreads; ++i) {
61 HBaseReaderThread reader = new HBaseReaderThreadWithACL(i);
62 readers.add(reader);
63 }
64 }
65
66 public class HBaseReaderThreadWithACL extends HBaseReaderThread {
67
68 public HBaseReaderThreadWithACL(int readerId) throws IOException {
69 super(readerId);
70 }
71
72 @Override
73 protected HTableInterface createTable() throws IOException {
74 return null;
75 }
76
77 @Override
78 protected void closeTable() {
79 for (Table table : userVsTable.values()) {
80 try {
81 table.close();
82 } catch (Exception e) {
83 LOG.error("Error while closing the table " + table.getName(), e);
84 }
85 }
86 }
87
88 @Override
89 public void queryKey(final Get get, final boolean verify, final long keyToRead)
90 throws IOException {
91 final String rowKey = Bytes.toString(get.getRow());
92
93
94 final long start = System.nanoTime();
95 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
96 @Override
97 public Object run() throws Exception {
98 Table localTable = null;
99 try {
100 Result result = null;
101 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
102 int mod = ((int) keyToRead % userNames.length);
103 if (userVsTable.get(userNames[mod]) == null) {
104 localTable = new HTable(conf, tableName);
105 userVsTable.put(userNames[mod], localTable);
106 result = localTable.get(get);
107 } else {
108 localTable = userVsTable.get(userNames[mod]);
109 result = localTable.get(get);
110 }
111 boolean isNullExpected = ((((int) keyToRead % specialPermCellInsertionFactor)) == 0);
112 long end = System.nanoTime();
113 verifyResultsAndUpdateMetrics(verify, get, end - start, result, localTable, isNullExpected);
114 } catch (IOException e) {
115 recordFailure(keyToRead);
116 }
117 return null;
118 }
119 };
120 if (userNames != null && userNames.length > 0) {
121 int mod = ((int) keyToRead % userNames.length);
122 User user;
123 UserGroupInformation realUserUgi;
124 if(!users.containsKey(userNames[mod])) {
125 if(User.isHBaseSecurityEnabled(conf)) {
126 realUserUgi = LoadTestTool.loginAndReturnUGI(conf, userNames[mod]);
127 } else {
128 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
129 }
130 user = User.create(realUserUgi);
131 users.put(userNames[mod], user);
132 } else {
133 user = users.get(userNames[mod]);
134 }
135 try {
136 user.runAs(action);
137 } catch (Exception e) {
138 recordFailure(keyToRead);
139 }
140 }
141 }
142
143 private void recordFailure(final long keyToRead) {
144 numReadFailures.addAndGet(1);
145 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "") + ", "
146 + "time from start: " + (System.currentTimeMillis() - startTimeMs) + " ms");
147 }
148 }
149
150 }