1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20 import java.io.IOException;
21 import java.io.PrintWriter;
22 import java.io.StringWriter;
23 import java.security.PrivilegedExceptionAction;
24 import java.util.HashMap;
25 import java.util.Map;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.Append;
32 import org.apache.hadoop.hbase.client.Delete;
33 import org.apache.hadoop.hbase.client.Get;
34 import org.apache.hadoop.hbase.client.HTable;
35 import org.apache.hadoop.hbase.client.HTableInterface;
36 import org.apache.hadoop.hbase.client.Increment;
37 import org.apache.hadoop.hbase.client.Mutation;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.hbase.client.Result;
40 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
41 import org.apache.hadoop.hbase.client.Table;
42 import org.apache.hadoop.hbase.security.User;
43 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
44 import org.apache.hadoop.security.UserGroupInformation;
45 import org.apache.hadoop.util.StringUtils;
46
47
48
49
50 public class MultiThreadedUpdaterWithACL extends MultiThreadedUpdater {
51 private static final Log LOG = LogFactory.getLog(MultiThreadedUpdaterWithACL.class);
52 private final static String COMMA= ",";
53 private User userOwner;
54
55
56
57
58 private Map<String, Table> userVsTable = new HashMap<String, Table>();
59 private Map<String, User> users = new HashMap<String, User>();
60 private String[] userNames;
61
62 public MultiThreadedUpdaterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
63 TableName tableName, double updatePercent, User userOwner, String userNames)
64 throws IOException {
65 super(dataGen, conf, tableName, updatePercent);
66 this.userOwner = userOwner;
67 this.userNames = userNames.split(COMMA);
68 }
69
70 @Override
71 protected void addUpdaterThreads(int numThreads) throws IOException {
72 for (int i = 0; i < numThreads; ++i) {
73 HBaseUpdaterThread updater = new HBaseUpdaterThreadWithACL(i);
74 updaters.add(updater);
75 }
76 }
77
78 public class HBaseUpdaterThreadWithACL extends HBaseUpdaterThread {
79
80 private Table table;
81 private MutateAccessAction mutateAction = new MutateAccessAction();
82
83 public HBaseUpdaterThreadWithACL(int updaterId) throws IOException {
84 super(updaterId);
85 }
86
87 @Override
88 protected HTableInterface createTable() throws IOException {
89 return null;
90 }
91
92 @Override
93 protected void closeHTable() {
94 try {
95 if (table != null) {
96 table.close();
97 }
98 for (Table table : userVsTable.values()) {
99 try {
100 table.close();
101 } catch (Exception e) {
102 LOG.error("Error while closing the table " + table.getName(), e);
103 }
104 }
105 } catch (Exception e) {
106 LOG.error("Error while closing the HTable "+table.getName(), e);
107 }
108 }
109
110 @Override
111 protected Result getRow(final Get get, final long rowKeyBase, final byte[] cf) {
112 PrivilegedExceptionAction<Object> action = new PrivilegedExceptionAction<Object>() {
113
114 @Override
115 public Object run() throws Exception {
116 Result res = null;
117 Table localTable = null;
118 try {
119 int mod = ((int) rowKeyBase % userNames.length);
120 if (userVsTable.get(userNames[mod]) == null) {
121 localTable = new HTable(conf, tableName);
122 userVsTable.put(userNames[mod], localTable);
123 res = localTable.get(get);
124 } else {
125 localTable = userVsTable.get(userNames[mod]);
126 res = localTable.get(get);
127 }
128 } catch (IOException ie) {
129 LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = ["
130 + Bytes.toString(cf) + "]", ie);
131 }
132 return res;
133 }
134 };
135
136 if (userNames != null && userNames.length > 0) {
137 int mod = ((int) rowKeyBase % userNames.length);
138 User user;
139 UserGroupInformation realUserUgi;
140 try {
141 if (!users.containsKey(userNames[mod])) {
142 if (User.isHBaseSecurityEnabled(conf)) {
143 realUserUgi = LoadTestTool.loginAndReturnUGI(conf, userNames[mod]);
144 } else {
145 realUserUgi = UserGroupInformation.createRemoteUser(userNames[mod]);
146 }
147 user = User.create(realUserUgi);
148 users.put(userNames[mod], user);
149 } else {
150 user = users.get(userNames[mod]);
151 }
152 Result result = (Result) user.runAs(action);
153 return result;
154 } catch (Exception ie) {
155 LOG.warn("Failed to get the row for key = [" + get.getRow() + "], column family = ["
156 + Bytes.toString(cf) + "]", ie);
157 }
158 }
159
160 return null;
161 }
162
163 @Override
164 public void mutate(final Table table, Mutation m, final long keyBase, final byte[] row,
165 final byte[] cf, final byte[] q, final byte[] v) {
166 final long start = System.currentTimeMillis();
167 try {
168 m = dataGenerator.beforeMutate(keyBase, m);
169 mutateAction.setMutation(m);
170 mutateAction.setCF(cf);
171 mutateAction.setRow(row);
172 mutateAction.setQualifier(q);
173 mutateAction.setValue(v);
174 mutateAction.setStartTime(start);
175 mutateAction.setKeyBase(keyBase);
176 userOwner.runAs(mutateAction);
177 } catch (IOException e) {
178 recordFailure(m, keyBase, start, e);
179 } catch (InterruptedException e) {
180 failedKeySet.add(keyBase);
181 }
182 }
183
184 class MutateAccessAction implements PrivilegedExceptionAction<Object> {
185 private Table table;
186 private long start;
187 private Mutation m;
188 private long keyBase;
189 private byte[] row;
190 private byte[] cf;
191 private byte[] q;
192 private byte[] v;
193
194 public MutateAccessAction() {
195
196 }
197
198 public void setStartTime(final long start) {
199 this.start = start;
200 }
201
202 public void setMutation(final Mutation m) {
203 this.m = m;
204 }
205
206 public void setRow(final byte[] row) {
207 this.row = row;
208 }
209
210 public void setCF(final byte[] cf) {
211 this.cf = cf;
212 }
213
214 public void setQualifier(final byte[] q) {
215 this.q = q;
216 }
217
218 public void setValue(final byte[] v) {
219 this.v = v;
220 }
221
222 public void setKeyBase(final long keyBase) {
223 this.keyBase = keyBase;
224 }
225
226 @Override
227 public Object run() throws Exception {
228 try {
229 if (table == null) {
230 table = new HTable(conf, tableName);
231 }
232 if (m instanceof Increment) {
233 table.increment((Increment) m);
234 } else if (m instanceof Append) {
235 table.append((Append) m);
236 } else if (m instanceof Put) {
237 table.checkAndPut(row, cf, q, v, (Put) m);
238 } else if (m instanceof Delete) {
239 table.checkAndDelete(row, cf, q, v, (Delete) m);
240 } else {
241 throw new IllegalArgumentException("unsupported mutation "
242 + m.getClass().getSimpleName());
243 }
244 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
245 } catch (IOException e) {
246 recordFailure(m, keyBase, start, e);
247 }
248 return null;
249 }
250 }
251
252 private void recordFailure(final Mutation m, final long keyBase,
253 final long start, IOException e) {
254 failedKeySet.add(keyBase);
255 String exceptionInfo;
256 if (e instanceof RetriesExhaustedWithDetailsException) {
257 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
258 exceptionInfo = aggEx.getExhaustiveDescription();
259 } else {
260 StringWriter stackWriter = new StringWriter();
261 PrintWriter pw = new PrintWriter(stackWriter);
262 e.printStackTrace(pw);
263 pw.flush();
264 exceptionInfo = StringUtils.stringifyException(e);
265 }
266 LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start)
267 + "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
268 + exceptionInfo);
269 }
270 }
271 }