View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  package org.apache.hadoop.hbase.coprocessor;
12  
13  import java.io.IOException;
14  
15  import org.apache.commons.logging.Log;
16  import org.apache.commons.logging.LogFactory;
17  import org.apache.hadoop.conf.Configuration;
18  import org.apache.hadoop.hbase.HBaseTestingUtility;
19  import org.apache.hadoop.hbase.HConstants;
20  import org.apache.hadoop.hbase.TableName;
21  import org.apache.hadoop.hbase.client.Durability;
22  import org.apache.hadoop.hbase.client.Put;
23  import org.apache.hadoop.hbase.client.Table;
24  import org.apache.hadoop.hbase.regionserver.HRegion;
25  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
26  import org.apache.hadoop.hbase.testclassification.LargeTests;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.junit.AfterClass;
29  import org.junit.Assert;
30  import org.junit.BeforeClass;
31  import org.junit.Test;
32  import org.junit.experimental.categories.Category;
33  
34  /**
35   * Test that verifies we do not have memstore size negative when a postPut/Delete hook is
36   * slow/expensive and a flush is triggered at the same time the coprocessow is doing its work. To
37   * simulate this we call flush from the coprocessor itself
38   */
39  @Category(LargeTests.class)
40  public class TestNegativeMemstoreSizeWithSlowCoprocessor {
41  
42    static final Log LOG = LogFactory.getLog(TestNegativeMemstoreSizeWithSlowCoprocessor.class);
43    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
44    private static final byte[] tableName = Bytes.toBytes("test_table");
45    private static final byte[] family = Bytes.toBytes("f");
46    private static final byte[] qualifier = Bytes.toBytes("q");
47  
48    @BeforeClass
49    public static void setupBeforeClass() throws Exception {
50      Configuration conf = TEST_UTIL.getConfiguration();
51      conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
52        FlushingRegionObserver.class.getName());
53      conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
54      conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
55      TEST_UTIL.startMiniCluster(1);
56      TEST_UTIL.createTable(TableName.valueOf(tableName), family);
57    }
58  
59    @AfterClass
60    public static void tearDownAfterClass() throws Exception {
61      TEST_UTIL.shutdownMiniCluster();
62    }
63  
64    @Test
65    public void testNegativeMemstoreSize() throws IOException, InterruptedException {
66      boolean IOEthrown = false;
67      Table table = null;
68      try {
69        table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
70  
71        // Adding data
72        Put put1 = new Put(Bytes.toBytes("row1"));
73        put1.addColumn(family, qualifier, Bytes.toBytes("Value1"));
74        table.put(put1);
75        Put put2 = new Put(Bytes.toBytes("row2"));
76        put2.addColumn(family, qualifier, Bytes.toBytes("Value2"));
77        table.put(put2);
78        table.put(put2);
79      } catch (IOException e) {
80        IOEthrown = true;
81      } finally {
82        Assert.assertFalse("Shouldn't have thrown an exception", IOEthrown);
83        if (table != null) {
84          table.close();
85        }
86      }
87    }
88  
89    public static class FlushingRegionObserver extends SimpleRegionObserver {
90  
91      @Override
92      public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
93          final WALEdit edit, final Durability durability) throws IOException {
94        HRegion region = (HRegion) c.getEnvironment().getRegion();
95        super.postPut(c, put, edit, durability);
96  
97        if (Bytes.equals(put.getRow(), Bytes.toBytes("row2"))) {
98          region.flush(false);
99          Assert.assertTrue(region.addAndGetGlobalMemstoreSize(0) >= 0);
100       }
101     }
102   }
103 }