1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
22 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
23
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.Arrays;
28 import java.util.HashSet;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.commons.lang.math.RandomUtils;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.client.Append;
41 import org.apache.hadoop.hbase.client.Delete;
42 import org.apache.hadoop.hbase.client.Get;
43 import org.apache.hadoop.hbase.client.HTableInterface;
44 import org.apache.hadoop.hbase.client.Increment;
45 import org.apache.hadoop.hbase.client.Mutation;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
49 import org.apache.hadoop.hbase.client.Table;
50 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
51 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
52 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
53 import org.apache.hadoop.util.StringUtils;
54
55 import com.google.common.base.Preconditions;
56
57
58 public class MultiThreadedUpdater extends MultiThreadedWriterBase {
59 private static final Log LOG = LogFactory.getLog(MultiThreadedUpdater.class);
60
61 protected Set<HBaseUpdaterThread> updaters = new HashSet<HBaseUpdaterThread>();
62
63 private MultiThreadedWriterBase writer = null;
64 private boolean isBatchUpdate = false;
65 private boolean ignoreNonceConflicts = false;
66 private final double updatePercent;
67
68 public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
69 TableName tableName, double updatePercent) throws IOException {
70 super(dataGen, conf, tableName, "U");
71 this.updatePercent = updatePercent;
72 }
73
74
75 public void setBatchUpdate(boolean isBatchUpdate) {
76 this.isBatchUpdate = isBatchUpdate;
77 }
78
79 public void linkToWriter(MultiThreadedWriterBase writer) {
80 this.writer = writer;
81 writer.setTrackWroteKeys(true);
82 }
83
84 @Override
85 public void start(long startKey, long endKey, int numThreads) throws IOException {
86 super.start(startKey, endKey, numThreads);
87
88 if (verbose) {
89 LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
90 }
91
92 addUpdaterThreads(numThreads);
93
94 startThreads(updaters);
95 }
96
97 protected void addUpdaterThreads(int numThreads) throws IOException {
98 for (int i = 0; i < numThreads; ++i) {
99 HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
100 updaters.add(updater);
101 }
102 }
103
104 private long getNextKeyToUpdate() {
105 if (writer == null) {
106 return nextKeyToWrite.getAndIncrement();
107 }
108 synchronized (this) {
109 if (nextKeyToWrite.get() >= endKey) {
110
111 return endKey;
112 }
113 while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
114 Threads.sleepWithoutInterrupt(100);
115 }
116 long k = nextKeyToWrite.getAndIncrement();
117 if (writer.failedToWriteKey(k)) {
118 failedKeySet.add(k);
119 return getNextKeyToUpdate();
120 }
121 return k;
122 }
123 }
124
125 protected class HBaseUpdaterThread extends Thread {
126 protected final Table table;
127
128 public HBaseUpdaterThread(int updaterId) throws IOException {
129 setName(getClass().getSimpleName() + "_" + updaterId);
130 table = createTable();
131 }
132
133 protected HTableInterface createTable() throws IOException {
134 return connection.getTable(tableName);
135 }
136
137 @Override
138 public void run() {
139 try {
140 long rowKeyBase;
141 StringBuilder buf = new StringBuilder();
142 byte[][] columnFamilies = dataGenerator.getColumnFamilies();
143 while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
144 if (RandomUtils.nextInt(100) < updatePercent) {
145 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
146 Increment inc = new Increment(rowKey);
147 Append app = new Append(rowKey);
148 numKeys.addAndGet(1);
149 int columnCount = 0;
150 for (byte[] cf : columnFamilies) {
151 long cfHash = Arrays.hashCode(cf);
152 inc.addColumn(cf, INCREMENT, cfHash);
153 buf.setLength(0);
154 buf.append("#").append(Bytes.toString(INCREMENT));
155 buf.append(":").append(MutationType.INCREMENT.getNumber());
156 app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
157 ++columnCount;
158 if (!isBatchUpdate) {
159 mutate(table, inc, rowKeyBase);
160 numCols.addAndGet(1);
161 inc = new Increment(rowKey);
162 mutate(table, app, rowKeyBase);
163 numCols.addAndGet(1);
164 app = new Append(rowKey);
165 }
166 Get get = new Get(rowKey);
167 get.addFamily(cf);
168 try {
169 get = dataGenerator.beforeGet(rowKeyBase, get);
170 } catch (Exception e) {
171
172 LOG.warn("Failed to modify the get from the load generator = [" + get.getRow()
173 + "], column family = [" + Bytes.toString(cf) + "]", e);
174 }
175 Result result = getRow(get, rowKeyBase, cf);
176 Map<byte[], byte[]> columnValues =
177 result != null ? result.getFamilyMap(cf) : null;
178 if (columnValues == null) {
179 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
180 if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) {
181 LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
182 } else {
183 failedKeySet.add(rowKeyBase);
184 LOG.error("Failed to update the row with key = [" + rowKey
185 + "], since we could not get the original row");
186 }
187 }
188 if(columnValues != null) {
189 for (byte[] column : columnValues.keySet()) {
190 if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
191 continue;
192 }
193 MutationType mt = MutationType
194 .valueOf(RandomUtils.nextInt(MutationType.values().length));
195 long columnHash = Arrays.hashCode(column);
196 long hashCode = cfHash + columnHash;
197 byte[] hashCodeBytes = Bytes.toBytes(hashCode);
198 byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
199 if (hashCode % 2 == 0) {
200 Cell kv = result.getColumnLatestCell(cf, column);
201 checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
202 Preconditions.checkNotNull(checkedValue,
203 "Column value to be checked should not be null");
204 }
205 buf.setLength(0);
206 buf.append("#").append(Bytes.toString(column)).append(":");
207 ++columnCount;
208 switch (mt) {
209 case PUT:
210 Put put = new Put(rowKey);
211 put.add(cf, column, hashCodeBytes);
212 mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
213 buf.append(MutationType.PUT.getNumber());
214 break;
215 case DELETE:
216 Delete delete = new Delete(rowKey);
217
218
219 delete.deleteColumns(cf, column);
220 mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
221 buf.append(MutationType.DELETE.getNumber());
222 break;
223 default:
224 buf.append(MutationType.APPEND.getNumber());
225 app.add(cf, column, hashCodeBytes);
226 }
227 app.add(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
228 if (!isBatchUpdate) {
229 mutate(table, app, rowKeyBase);
230 numCols.addAndGet(1);
231 app = new Append(rowKey);
232 }
233 }
234 }
235 }
236 if (isBatchUpdate) {
237 if (verbose) {
238 LOG.debug("Preparing increment and append for key = ["
239 + rowKey + "], " + columnCount + " columns");
240 }
241 mutate(table, inc, rowKeyBase);
242 mutate(table, app, rowKeyBase);
243 numCols.addAndGet(columnCount);
244 }
245 }
246 if (trackWroteKeys) {
247 wroteKeys.add(rowKeyBase);
248 }
249 }
250 } finally {
251 closeHTable();
252 numThreadsWorking.decrementAndGet();
253 }
254 }
255
256 protected void closeHTable() {
257 try {
258 if (table != null) {
259 table.close();
260 }
261 } catch (IOException e) {
262 LOG.error("Error closing table", e);
263 }
264 }
265
266 protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
267 Result result = null;
268 try {
269 result = table.get(get);
270 } catch (IOException ie) {
271 LOG.warn(
272 "Failed to get the row for key = [" + get.getRow() + "], column family = ["
273 + Bytes.toString(cf) + "]", ie);
274 }
275 return result;
276 }
277
278 public void mutate(Table table, Mutation m, long keyBase) {
279 mutate(table, m, keyBase, null, null, null, null);
280 }
281
282 public void mutate(Table table, Mutation m,
283 long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
284 long start = System.currentTimeMillis();
285 try {
286 m = dataGenerator.beforeMutate(keyBase, m);
287 if (m instanceof Increment) {
288 table.increment((Increment)m);
289 } else if (m instanceof Append) {
290 table.append((Append)m);
291 } else if (m instanceof Put) {
292 table.checkAndPut(row, cf, q, v, (Put)m);
293 } else if (m instanceof Delete) {
294 table.checkAndDelete(row, cf, q, v, (Delete)m);
295 } else {
296 throw new IllegalArgumentException(
297 "unsupported mutation " + m.getClass().getSimpleName());
298 }
299 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
300 } catch (IOException e) {
301 if (ignoreNonceConflicts && (e instanceof OperationConflictException)) {
302 LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
303 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
304 return;
305 }
306 failedKeySet.add(keyBase);
307 String exceptionInfo;
308 if (e instanceof RetriesExhaustedWithDetailsException) {
309 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
310 exceptionInfo = aggEx.getExhaustiveDescription();
311 } else {
312 exceptionInfo = StringUtils.stringifyException(e);
313 }
314 LOG.error("Failed to mutate: " + keyBase + " after " +
315 (System.currentTimeMillis() - start) +
316 "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
317 + exceptionInfo);
318 }
319 }
320 }
321
322 @Override
323 public void waitForFinish() {
324 super.waitForFinish();
325 System.out.println("Failed to update keys: " + failedKeySet.size());
326 for (Long key : failedKeySet) {
327 System.out.println("Failed to update key: " + key);
328 }
329 }
330
331 public void mutate(Table table, Mutation m, long keyBase) {
332 mutate(table, m, keyBase, null, null, null, null);
333 }
334
335 public void mutate(Table table, Mutation m,
336 long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
337 long start = System.currentTimeMillis();
338 try {
339 m = dataGenerator.beforeMutate(keyBase, m);
340 if (m instanceof Increment) {
341 table.increment((Increment)m);
342 } else if (m instanceof Append) {
343 table.append((Append)m);
344 } else if (m instanceof Put) {
345 table.checkAndPut(row, cf, q, v, (Put)m);
346 } else if (m instanceof Delete) {
347 table.checkAndDelete(row, cf, q, v, (Delete)m);
348 } else {
349 throw new IllegalArgumentException(
350 "unsupported mutation " + m.getClass().getSimpleName());
351 }
352 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
353 } catch (IOException e) {
354 failedKeySet.add(keyBase);
355 String exceptionInfo;
356 if (e instanceof RetriesExhaustedWithDetailsException) {
357 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
358 exceptionInfo = aggEx.getExhaustiveDescription();
359 } else {
360 StringWriter stackWriter = new StringWriter();
361 PrintWriter pw = new PrintWriter(stackWriter);
362 e.printStackTrace(pw);
363 pw.flush();
364 exceptionInfo = StringUtils.stringifyException(e);
365 }
366 LOG.error("Failed to mutate: " + keyBase + " after " + (System.currentTimeMillis() - start) +
367 "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
368 + exceptionInfo);
369 }
370 }
371
372 public void setIgnoreNonceConflicts(boolean value) {
373 this.ignoreNonceConflicts = value;
374 }
375 }