1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.Random;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
30 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31 import org.apache.hadoop.hbase.client.Get;
32 import org.apache.hadoop.hbase.client.HBaseAdmin;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
40 import org.apache.hadoop.hbase.testclassification.MediumTests;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.util.StringUtils;
43 import org.apache.hadoop.util.Tool;
44 import org.apache.hadoop.util.ToolRunner;
45 import org.junit.Test;
46 import org.junit.experimental.categories.Category;
47
48 import com.google.common.collect.Lists;
49
50
51
52
53
54
55
56
57 @Category(MediumTests.class)
58 public class TestAcidGuarantees implements Tool {
59 protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
60 public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
61 public static final byte [] FAMILY_A = Bytes.toBytes("A");
62 public static final byte [] FAMILY_B = Bytes.toBytes("B");
63 public static final byte [] FAMILY_C = Bytes.toBytes("C");
64 public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
65
66 public static final byte[][] FAMILIES = new byte[][] {
67 FAMILY_A, FAMILY_B, FAMILY_C };
68
69 private HBaseTestingUtility util;
70
71 public static int NUM_COLS_TO_CHECK = 50;
72
73
74 private Configuration conf;
75
76 private void createTableIfMissing(boolean useMob)
77 throws IOException {
78 try {
79 util.createTable(TABLE_NAME, FAMILIES);
80 } catch (TableExistsException tee) {
81 }
82
83 if (useMob) {
84 HTableDescriptor htd = util.getHBaseAdmin().getTableDescriptor(TABLE_NAME);
85 HColumnDescriptor hcd = htd.getColumnFamilies()[0];
86
87 hcd.setMobEnabled(true);
88 hcd.setMobThreshold(4);
89 util.getHBaseAdmin().modifyColumn(TABLE_NAME, hcd);
90 }
91 }
92
/**
 * Builds the test with a fresh HBase configuration tuned for this test and a testing utility
 * created from it.
 *
 * <p>The configuration is also stored in the {@code conf} field so that {@link #getConf()}
 * returns it even before {@link #setConf(Configuration)} has been called; previously a local
 * variable shadowed the field and {@code getConf()} returned {@code null}.
 */
public TestAcidGuarantees() {
  // 128KB memstore flush size (presumably chosen so flushes happen often during the test).
  Configuration configuration = HBaseConfiguration.create();
  configuration.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
  // Pin the split policy to ConstantSizeRegionSplitPolicy.
  configuration.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      ConstantSizeRegionSplitPolicy.class.getName());
  // MOB runs are expected to use hfile format version 3 (see the check in run()).
  configuration.setInt("hfile.format.version", 3);
  this.conf = configuration;
  this.util = new HBaseTestingUtility(configuration);
}
103
104 public void setHBaseTestingUtil(HBaseTestingUtility util) {
105 this.util = util;
106 }
107
108
109
110
111 public static class AtomicityWriter extends RepeatingTestThread {
112 Random rand = new Random();
113 byte data[] = new byte[10];
114 byte targetRows[][];
115 byte targetFamilies[][];
116 Table table;
117 AtomicLong numWritten = new AtomicLong();
118
119 public AtomicityWriter(TestContext ctx, byte targetRows[][],
120 byte targetFamilies[][]) throws IOException {
121 super(ctx);
122 this.targetRows = targetRows;
123 this.targetFamilies = targetFamilies;
124 table = new HTable(ctx.getConf(), TABLE_NAME);
125 }
126 public void doAnAction() throws Exception {
127
128 byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
129 Put p = new Put(targetRow);
130 rand.nextBytes(data);
131
132 for (byte[] family : targetFamilies) {
133 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
134 byte qualifier[] = Bytes.toBytes("col" + i);
135 p.add(family, qualifier, data);
136 }
137 }
138 table.put(p);
139 numWritten.getAndIncrement();
140 }
141 }
142
143
144
145
146
147 public static class AtomicGetReader extends RepeatingTestThread {
148 byte targetRow[];
149 byte targetFamilies[][];
150 Table table;
151 int numVerified = 0;
152 AtomicLong numRead = new AtomicLong();
153
154 public AtomicGetReader(TestContext ctx, byte targetRow[],
155 byte targetFamilies[][]) throws IOException {
156 super(ctx);
157 this.targetRow = targetRow;
158 this.targetFamilies = targetFamilies;
159 table = new HTable(ctx.getConf(), TABLE_NAME);
160 }
161
162 public void doAnAction() throws Exception {
163 Get g = new Get(targetRow);
164 Result res = table.get(g);
165 byte[] gotValue = null;
166 if (res.getRow() == null) {
167
168
169
170 return;
171 }
172
173 for (byte[] family : targetFamilies) {
174 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
175 byte qualifier[] = Bytes.toBytes("col" + i);
176 byte thisValue[] = res.getValue(family, qualifier);
177 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
178 gotFailure(gotValue, res);
179 }
180 numVerified++;
181 gotValue = thisValue;
182 }
183 }
184 numRead.getAndIncrement();
185 }
186
187 private void gotFailure(byte[] expected, Result res) {
188 StringBuilder msg = new StringBuilder();
189 msg.append("Failed after ").append(numVerified).append("!");
190 msg.append("Expected=").append(Bytes.toStringBinary(expected));
191 msg.append("Got:\n");
192 for (Cell kv : res.listCells()) {
193 msg.append(kv.toString());
194 msg.append(" val= ");
195 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
196 msg.append("\n");
197 }
198 throw new RuntimeException(msg.toString());
199 }
200 }
201
202
203
204
205
206 public static class AtomicScanReader extends RepeatingTestThread {
207 byte targetFamilies[][];
208 Table table;
209 AtomicLong numScans = new AtomicLong();
210 AtomicLong numRowsScanned = new AtomicLong();
211
212 public AtomicScanReader(TestContext ctx,
213 byte targetFamilies[][]) throws IOException {
214 super(ctx);
215 this.targetFamilies = targetFamilies;
216 table = new HTable(ctx.getConf(), TABLE_NAME);
217 }
218
219 public void doAnAction() throws Exception {
220 Scan s = new Scan();
221 for (byte[] family : targetFamilies) {
222 s.addFamily(family);
223 }
224 ResultScanner scanner = table.getScanner(s);
225
226 for (Result res : scanner) {
227 byte[] gotValue = null;
228
229 for (byte[] family : targetFamilies) {
230 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
231 byte qualifier[] = Bytes.toBytes("col" + i);
232 byte thisValue[] = res.getValue(family, qualifier);
233 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
234 gotFailure(gotValue, res);
235 }
236 gotValue = thisValue;
237 }
238 }
239 numRowsScanned.getAndIncrement();
240 }
241 numScans.getAndIncrement();
242 }
243
244 private void gotFailure(byte[] expected, Result res) {
245 StringBuilder msg = new StringBuilder();
246 msg.append("Failed after ").append(numRowsScanned).append("!");
247 msg.append("Expected=").append(Bytes.toStringBinary(expected));
248 msg.append("Got:\n");
249 for (Cell kv : res.listCells()) {
250 msg.append(kv.toString());
251 msg.append(" val= ");
252 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
253 msg.append("\n");
254 }
255 throw new RuntimeException(msg.toString());
256 }
257 }
258
259 public void runTestAtomicity(long millisToRun,
260 int numWriters,
261 int numGetters,
262 int numScanners,
263 int numUniqueRows) throws Exception {
264 runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
265 }
266
267 public void runTestAtomicity(long millisToRun,
268 int numWriters,
269 int numGetters,
270 int numScanners,
271 int numUniqueRows,
272 final boolean systemTest) throws Exception {
273 runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, systemTest,
274 false);
275 }
276
277 public void runTestAtomicity(long millisToRun,
278 int numWriters,
279 int numGetters,
280 int numScanners,
281 int numUniqueRows,
282 final boolean systemTest,
283 final boolean useMob) throws Exception {
284
285 createTableIfMissing(useMob);
286 TestContext ctx = new TestContext(util.getConfiguration());
287
288 byte rows[][] = new byte[numUniqueRows][];
289 for (int i = 0; i < numUniqueRows; i++) {
290 rows[i] = Bytes.toBytes("test_row_" + i);
291 }
292
293 List<AtomicityWriter> writers = Lists.newArrayList();
294 for (int i = 0; i < numWriters; i++) {
295 AtomicityWriter writer = new AtomicityWriter(
296 ctx, rows, FAMILIES);
297 writers.add(writer);
298 ctx.addThread(writer);
299 }
300
301 ctx.addThread(new RepeatingTestThread(ctx) {
302 HBaseAdmin admin = util.getHBaseAdmin();
303 public void doAnAction() throws Exception {
304 try {
305 admin.flush(TABLE_NAME);
306 } catch(IOException ioe) {
307 LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
308 }
309
310
311
312
313
314
315
316
317 if (systemTest) Thread.sleep(60000);
318 }
319 });
320
321 List<AtomicGetReader> getters = Lists.newArrayList();
322 for (int i = 0; i < numGetters; i++) {
323 AtomicGetReader getter = new AtomicGetReader(
324 ctx, rows[i % numUniqueRows], FAMILIES);
325 getters.add(getter);
326 ctx.addThread(getter);
327 }
328
329 List<AtomicScanReader> scanners = Lists.newArrayList();
330 for (int i = 0; i < numScanners; i++) {
331 AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
332 scanners.add(scanner);
333 ctx.addThread(scanner);
334 }
335
336 ctx.startThreads();
337 ctx.waitFor(millisToRun);
338 ctx.stop();
339
340 LOG.info("Finished test. Writers:");
341 for (AtomicityWriter writer : writers) {
342 LOG.info(" wrote " + writer.numWritten.get());
343 }
344 LOG.info("Readers:");
345 for (AtomicGetReader reader : getters) {
346 LOG.info(" read " + reader.numRead.get());
347 }
348 LOG.info("Scanners:");
349 for (AtomicScanReader scanner : scanners) {
350 LOG.info(" scanned " + scanner.numScans.get());
351 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
352 }
353 }
354
355 @Test
356 public void testGetAtomicity() throws Exception {
357 util.startMiniCluster(1);
358 try {
359 runTestAtomicity(20000, 5, 5, 0, 3);
360 } finally {
361 util.shutdownMiniCluster();
362 }
363 }
364
365 @Test
366 public void testScanAtomicity() throws Exception {
367 util.startMiniCluster(1);
368 try {
369 runTestAtomicity(20000, 5, 0, 5, 3);
370 } finally {
371 util.shutdownMiniCluster();
372 }
373 }
374
375 @Test
376 public void testMixedAtomicity() throws Exception {
377 util.startMiniCluster(1);
378 try {
379 runTestAtomicity(20000, 5, 2, 2, 3);
380 } finally {
381 util.shutdownMiniCluster();
382 }
383 }
384
385 @Test
386 public void testMobGetAtomicity() throws Exception {
387 util.startMiniCluster(1);
388 try {
389 boolean systemTest = false;
390 boolean useMob = true;
391 runTestAtomicity(20000, 5, 5, 0, 3, systemTest, useMob);
392 } finally {
393 util.shutdownMiniCluster();
394 }
395 }
396
397 @Test
398 public void testMobScanAtomicity() throws Exception {
399 util.startMiniCluster(1);
400 try {
401 boolean systemTest = false;
402 boolean useMob = true;
403 runTestAtomicity(20000, 5, 0, 5, 3, systemTest, useMob);
404 } finally {
405 util.shutdownMiniCluster();
406 }
407 }
408
409 @Test
410 public void testMobMixedAtomicity() throws Exception {
411 util.startMiniCluster(1);
412 try {
413 boolean systemTest = false;
414 boolean useMob = true;
415 runTestAtomicity(20000, 5, 2, 2, 3, systemTest, useMob);
416 } finally {
417 util.shutdownMiniCluster();
418 }
419 }
420
421
422
423
424 @Override
425 public Configuration getConf() {
426 return conf;
427 }
428
429 @Override
430 public void setConf(Configuration c) {
431 this.conf = c;
432 this.util = new HBaseTestingUtility(c);
433 }
434
435 @Override
436 public int run(String[] arg0) throws Exception {
437 Configuration c = getConf();
438 int millis = c.getInt("millis", 5000);
439 int numWriters = c.getInt("numWriters", 50);
440 int numGetters = c.getInt("numGetters", 2);
441 int numScanners = c.getInt("numScanners", 2);
442 int numUniqueRows = c.getInt("numUniqueRows", 3);
443 boolean useMob = c.getBoolean("useMob",false);
444 assert useMob && c.getInt("hfile.format.version", 2) == 3 : "Mob runs must use hfile v3";
445 runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
446 return 0;
447 }
448
449 public static void main(String args[]) throws Exception {
450 Configuration c = HBaseConfiguration.create();
451 int status;
452 try {
453 TestAcidGuarantees test = new TestAcidGuarantees();
454 status = ToolRunner.run(c, test, args);
455 } catch (Exception e) {
456 LOG.error("Exiting due to error", e);
457 status = -1;
458 }
459 System.exit(status);
460 }
461
462
463 }
464