View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.Random;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
30  import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31  import org.apache.hadoop.hbase.client.Get;
32  import org.apache.hadoop.hbase.client.HBaseAdmin;
33  import org.apache.hadoop.hbase.client.HTable;
34  import org.apache.hadoop.hbase.client.Put;
35  import org.apache.hadoop.hbase.client.Result;
36  import org.apache.hadoop.hbase.client.ResultScanner;
37  import org.apache.hadoop.hbase.client.Scan;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
40  import org.apache.hadoop.hbase.testclassification.MediumTests;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.util.StringUtils;
43  import org.apache.hadoop.util.Tool;
44  import org.apache.hadoop.util.ToolRunner;
45  import org.junit.Test;
46  import org.junit.experimental.categories.Category;
47  
48  import com.google.common.collect.Lists;
49  
50  /**
51   * Test case that uses multiple threads to read and write multifamily rows
52   * into a table, verifying that reads never see partially-complete writes.
53   *
54   * This can run as a junit test, or with a main() function which runs against
55   * a real cluster (eg for testing with failures, region movement, etc)
56   */
57  @Category(MediumTests.class)
58  public class TestAcidGuarantees implements Tool {
  // Shared logger; also used by the anonymous flusher thread and by main().
  protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
  // Table that every writer, getter and scanner thread in this class operates on.
  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
  // The three column families each written row spans.
  public static final byte [] FAMILY_A = Bytes.toBytes("A");
  public static final byte [] FAMILY_B = Bytes.toBytes("B");
  public static final byte [] FAMILY_C = Bytes.toBytes("C");
  // NOTE(review): not referenced by the code visible in this class (cells use "col<i>"
  // qualifiers) -- presumably kept for external callers; confirm before removing.
  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");

  // All families, in the order passed to table creation and to the worker threads.
  public static final byte[][] FAMILIES = new byte[][] {
    FAMILY_A, FAMILY_B, FAMILY_C };

  // Testing utility used to create the table, obtain the admin and start/stop the mini
  // cluster. Replaced by setHBaseTestingUtil() and setConf().
  private HBaseTestingUtility util;

  // Number of "col<i>" qualifiers written per family by writers and checked by readers.
  public static int NUM_COLS_TO_CHECK = 50;

  // when run as main
  // Only assigned by setConf(); the constructor uses a local Configuration instead.
  private Configuration conf;
75  
76    private void createTableIfMissing(boolean useMob)
77      throws IOException {
78      try {
79        util.createTable(TABLE_NAME, FAMILIES);
80      } catch (TableExistsException tee) {
81      }
82  
83      if (useMob) {
84        HTableDescriptor htd = util.getHBaseAdmin().getTableDescriptor(TABLE_NAME);
85        HColumnDescriptor hcd =  htd.getColumnFamilies()[0];
86        // force mob enabled such that all data is mob data
87        hcd.setMobEnabled(true);
88        hcd.setMobThreshold(4);
89        util.getHBaseAdmin().modifyColumn(TABLE_NAME, hcd);
90      }
91    }
92  
93    public TestAcidGuarantees() {
94      // Set small flush size for minicluster so we exercise reseeking scanners
95      Configuration conf = HBaseConfiguration.create();
96      conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
97      // prevent aggressive region split
98      conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
99              ConstantSizeRegionSplitPolicy.class.getName());
100     conf.setInt("hfile.format.version", 3); // for mob tests
101     util = new HBaseTestingUtility(conf);
102   }
103 
104   public void setHBaseTestingUtil(HBaseTestingUtility util) {
105     this.util = util;
106   }
107 
108   /**
109    * Thread that does random full-row writes into a table.
110    */
111   public static class AtomicityWriter extends RepeatingTestThread {
112     Random rand = new Random();
113     byte data[] = new byte[10];
114     byte targetRows[][];
115     byte targetFamilies[][];
116     Table table;
117     AtomicLong numWritten = new AtomicLong();
118 
119     public AtomicityWriter(TestContext ctx, byte targetRows[][],
120                            byte targetFamilies[][]) throws IOException {
121       super(ctx);
122       this.targetRows = targetRows;
123       this.targetFamilies = targetFamilies;
124       table = new HTable(ctx.getConf(), TABLE_NAME);
125     }
126     public void doAnAction() throws Exception {
127       // Pick a random row to write into
128       byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
129       Put p = new Put(targetRow);
130       rand.nextBytes(data);
131 
132       for (byte[] family : targetFamilies) {
133         for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
134           byte qualifier[] = Bytes.toBytes("col" + i);
135           p.add(family, qualifier, data);
136         }
137       }
138       table.put(p);
139       numWritten.getAndIncrement();
140     }
141   }
142 
143   /**
144    * Thread that does single-row reads in a table, looking for partially
145    * completed rows.
146    */
147   public static class AtomicGetReader extends RepeatingTestThread {
148     byte targetRow[];
149     byte targetFamilies[][];
150     Table table;
151     int numVerified = 0;
152     AtomicLong numRead = new AtomicLong();
153 
154     public AtomicGetReader(TestContext ctx, byte targetRow[],
155                            byte targetFamilies[][]) throws IOException {
156       super(ctx);
157       this.targetRow = targetRow;
158       this.targetFamilies = targetFamilies;
159       table = new HTable(ctx.getConf(), TABLE_NAME);
160     }
161 
162     public void doAnAction() throws Exception {
163       Get g = new Get(targetRow);
164       Result res = table.get(g);
165       byte[] gotValue = null;
166       if (res.getRow() == null) {
167         // Trying to verify but we didn't find the row - the writing
168         // thread probably just hasn't started writing yet, so we can
169         // ignore this action
170         return;
171       }
172 
173       for (byte[] family : targetFamilies) {
174         for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
175           byte qualifier[] = Bytes.toBytes("col" + i);
176           byte thisValue[] = res.getValue(family, qualifier);
177           if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
178             gotFailure(gotValue, res);
179           }
180           numVerified++;
181           gotValue = thisValue;
182         }
183       }
184       numRead.getAndIncrement();
185     }
186 
187     private void gotFailure(byte[] expected, Result res) {
188       StringBuilder msg = new StringBuilder();
189       msg.append("Failed after ").append(numVerified).append("!");
190       msg.append("Expected=").append(Bytes.toStringBinary(expected));
191       msg.append("Got:\n");
192       for (Cell kv : res.listCells()) {
193         msg.append(kv.toString());
194         msg.append(" val= ");
195         msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
196         msg.append("\n");
197       }
198       throw new RuntimeException(msg.toString());
199     }
200   }
201 
202   /**
203    * Thread that does full scans of the table looking for any partially completed
204    * rows.
205    */
206   public static class AtomicScanReader extends RepeatingTestThread {
207     byte targetFamilies[][];
208     Table table;
209     AtomicLong numScans = new AtomicLong();
210     AtomicLong numRowsScanned = new AtomicLong();
211 
212     public AtomicScanReader(TestContext ctx,
213                            byte targetFamilies[][]) throws IOException {
214       super(ctx);
215       this.targetFamilies = targetFamilies;
216       table = new HTable(ctx.getConf(), TABLE_NAME);
217     }
218 
219     public void doAnAction() throws Exception {
220       Scan s = new Scan();
221       for (byte[] family : targetFamilies) {
222         s.addFamily(family);
223       }
224       ResultScanner scanner = table.getScanner(s);
225 
226       for (Result res : scanner) {
227         byte[] gotValue = null;
228 
229         for (byte[] family : targetFamilies) {
230           for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
231             byte qualifier[] = Bytes.toBytes("col" + i);
232             byte thisValue[] = res.getValue(family, qualifier);
233             if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
234               gotFailure(gotValue, res);
235             }
236             gotValue = thisValue;
237           }
238         }
239         numRowsScanned.getAndIncrement();
240       }
241       numScans.getAndIncrement();
242     }
243 
244     private void gotFailure(byte[] expected, Result res) {
245       StringBuilder msg = new StringBuilder();
246       msg.append("Failed after ").append(numRowsScanned).append("!");
247       msg.append("Expected=").append(Bytes.toStringBinary(expected));
248       msg.append("Got:\n");
249       for (Cell kv : res.listCells()) {
250         msg.append(kv.toString());
251         msg.append(" val= ");
252         msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
253         msg.append("\n");
254       }
255       throw new RuntimeException(msg.toString());
256     }
257   }
258 
259   public void runTestAtomicity(long millisToRun,
260       int numWriters,
261       int numGetters,
262       int numScanners,
263       int numUniqueRows) throws Exception {
264     runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false);
265   }
266 
267   public void runTestAtomicity(long millisToRun,
268       int numWriters,
269       int numGetters,
270       int numScanners,
271       int numUniqueRows,
272       final boolean systemTest) throws Exception {
273     runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, systemTest,
274             false);
275   }
276 
277   public void runTestAtomicity(long millisToRun,
278     int numWriters,
279     int numGetters,
280     int numScanners,
281     int numUniqueRows,
282     final boolean systemTest,
283     final boolean useMob) throws Exception {
284 
285     createTableIfMissing(useMob);
286     TestContext ctx = new TestContext(util.getConfiguration());
287 
288     byte rows[][] = new byte[numUniqueRows][];
289     for (int i = 0; i < numUniqueRows; i++) {
290       rows[i] = Bytes.toBytes("test_row_" + i);
291     }
292 
293     List<AtomicityWriter> writers = Lists.newArrayList();
294     for (int i = 0; i < numWriters; i++) {
295       AtomicityWriter writer = new AtomicityWriter(
296           ctx, rows, FAMILIES);
297       writers.add(writer);
298       ctx.addThread(writer);
299     }
300     // Add a flusher
301     ctx.addThread(new RepeatingTestThread(ctx) {
302       HBaseAdmin admin = util.getHBaseAdmin();
303       public void doAnAction() throws Exception {
304         try {
305           admin.flush(TABLE_NAME);
306         } catch(IOException ioe) {
307           LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
308         }
309         // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
310         // we would flush as often as possible.  On a running cluster, this isn't practical:
311         // (1) we will cause a lot of load due to all the flushing and compacting
312         // (2) we cannot change the flushing/compacting related Configuration options to try to
313         // alleviate this
314         // (3) it is an unrealistic workload, since no one would actually flush that often.
315         // Therefore, let's flush every minute to have more flushes than usual, but not overload
316         // the running cluster.
317         if (systemTest) Thread.sleep(60000);
318       }
319     });
320 
321     List<AtomicGetReader> getters = Lists.newArrayList();
322     for (int i = 0; i < numGetters; i++) {
323       AtomicGetReader getter = new AtomicGetReader(
324           ctx, rows[i % numUniqueRows], FAMILIES);
325       getters.add(getter);
326       ctx.addThread(getter);
327     }
328 
329     List<AtomicScanReader> scanners = Lists.newArrayList();
330     for (int i = 0; i < numScanners; i++) {
331       AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
332       scanners.add(scanner);
333       ctx.addThread(scanner);
334     }
335 
336     ctx.startThreads();
337     ctx.waitFor(millisToRun);
338     ctx.stop();
339 
340     LOG.info("Finished test. Writers:");
341     for (AtomicityWriter writer : writers) {
342       LOG.info("  wrote " + writer.numWritten.get());
343     }
344     LOG.info("Readers:");
345     for (AtomicGetReader reader : getters) {
346       LOG.info("  read " + reader.numRead.get());
347     }
348     LOG.info("Scanners:");
349     for (AtomicScanReader scanner : scanners) {
350       LOG.info("  scanned " + scanner.numScans.get());
351       LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
352     }
353   }
354 
355   @Test
356   public void testGetAtomicity() throws Exception {
357     util.startMiniCluster(1);
358     try {
359       runTestAtomicity(20000, 5, 5, 0, 3);
360     } finally {
361       util.shutdownMiniCluster();
362     }
363   }
364 
365   @Test
366   public void testScanAtomicity() throws Exception {
367     util.startMiniCluster(1);
368     try {
369       runTestAtomicity(20000, 5, 0, 5, 3);
370     } finally {
371       util.shutdownMiniCluster();
372     }
373   }
374 
375   @Test
376   public void testMixedAtomicity() throws Exception {
377     util.startMiniCluster(1);
378     try {
379       runTestAtomicity(20000, 5, 2, 2, 3);
380     } finally {
381       util.shutdownMiniCluster();
382     }
383   }
384 
385   @Test
386   public void testMobGetAtomicity() throws Exception {
387     util.startMiniCluster(1);
388     try {
389       boolean systemTest = false;
390       boolean useMob = true;
391       runTestAtomicity(20000, 5, 5, 0, 3, systemTest, useMob);
392     } finally {
393       util.shutdownMiniCluster();
394     }
395   }
396 
397   @Test
398   public void testMobScanAtomicity() throws Exception {
399     util.startMiniCluster(1);
400     try {
401       boolean systemTest = false;
402       boolean useMob = true;
403       runTestAtomicity(20000, 5, 0, 5, 3, systemTest, useMob);
404     } finally {
405       util.shutdownMiniCluster();
406     }
407   }
408 
409   @Test
410   public void testMobMixedAtomicity() throws Exception {
411     util.startMiniCluster(1);
412     try {
413       boolean systemTest = false;
414       boolean useMob = true;
415       runTestAtomicity(20000, 5, 2, 2, 3, systemTest, useMob);
416     } finally {
417       util.shutdownMiniCluster();
418     }
419   }
420 
421   ////////////////////////////////////////////////////////////////////////////
422   // Tool interface
423   ////////////////////////////////////////////////////////////////////////////
424   @Override
425   public Configuration getConf() {
426     return conf;
427   }
428 
429   @Override
430   public void setConf(Configuration c) {
431     this.conf = c;
432     this.util = new HBaseTestingUtility(c);
433   }
434 
435   @Override
436   public int run(String[] arg0) throws Exception {
437     Configuration c = getConf();
438     int millis = c.getInt("millis", 5000);
439     int numWriters = c.getInt("numWriters", 50);
440     int numGetters = c.getInt("numGetters", 2);
441     int numScanners = c.getInt("numScanners", 2);
442     int numUniqueRows = c.getInt("numUniqueRows", 3);
443     boolean useMob = c.getBoolean("useMob",false);
444     assert useMob && c.getInt("hfile.format.version", 2) == 3 : "Mob runs must use hfile v3";
445     runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true, useMob);
446     return 0;
447   }
448 
449   public static void main(String args[]) throws Exception {
450     Configuration c = HBaseConfiguration.create();
451     int status;
452     try {
453       TestAcidGuarantees test = new TestAcidGuarantees();
454       status = ToolRunner.run(c, test, args);
455     } catch (Exception e) {
456       LOG.error("Exiting due to error", e);
457       status = -1;
458     }
459     System.exit(status);
460   }
461 
462 
463 }
464