View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.util;
18  
19  import static org.junit.Assert.assertEquals;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.List;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.TableName;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.HBaseTestingUtility;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HTableDescriptor;
34  import org.apache.hadoop.hbase.testclassification.LargeTests;
35  import org.apache.hadoop.hbase.TableNotFoundException;
36  import org.apache.hadoop.hbase.client.Admin;
37  import org.apache.hadoop.hbase.client.HBaseAdmin;
38  import org.apache.hadoop.hbase.io.compress.Compression;
39  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
40  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
41  import org.junit.After;
42  import org.junit.Before;
43  import org.junit.Test;
44  import org.junit.experimental.categories.Category;
45  import org.junit.runner.RunWith;
46  import org.junit.runners.Parameterized;
47  import org.junit.runners.Parameterized.Parameters;
48  
49  /**
50   * A write/read/verify load test on a mini HBase cluster. Tests reading
51   * and then writing.
52   */
53  @Category(LargeTests.class)
54  @RunWith(Parameterized.class)
55  public class TestMiniClusterLoadSequential {
56  
57    private static final Log LOG = LogFactory.getLog(
58        TestMiniClusterLoadSequential.class);
59  
60    protected static final TableName TABLE =
61        TableName.valueOf("load_test_tbl");
62    protected static final byte[] CF = Bytes.toBytes("load_test_cf");
63    protected static final int NUM_THREADS = 8;
64    protected static final int NUM_RS = 2;
65    protected static final int TIMEOUT_MS = 180000;
66    protected static final HBaseTestingUtility TEST_UTIL =
67        new HBaseTestingUtility();
68  
69    protected final Configuration conf = TEST_UTIL.getConfiguration();
70    protected final boolean isMultiPut;
71    protected final DataBlockEncoding dataBlockEncoding;
72  
73    protected MultiThreadedWriter writerThreads;
74    protected MultiThreadedReader readerThreads;
75    protected int numKeys;
76  
77    protected Compression.Algorithm compression = Compression.Algorithm.NONE;
78  
79    public TestMiniClusterLoadSequential(boolean isMultiPut,
80        DataBlockEncoding dataBlockEncoding) {
81      this.isMultiPut = isMultiPut;
82      this.dataBlockEncoding = dataBlockEncoding;
83      conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
84  
85      // We don't want any region reassignments by the load balancer during the test.
86      conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, 10.0f);
87    }
88  
89    @Parameters
90    public static Collection<Object[]> parameters() {
91      List<Object[]> parameters = new ArrayList<Object[]>();
92      for (boolean multiPut : new boolean[]{false, true}) {
93        for (DataBlockEncoding dataBlockEncoding : new DataBlockEncoding[] {
94            DataBlockEncoding.NONE, DataBlockEncoding.PREFIX }) {
95          parameters.add(new Object[]{multiPut, dataBlockEncoding});
96        }
97      }
98      return parameters;
99    }
100 
101   @Before
102   public void setUp() throws Exception {
103     LOG.debug("Test setup: isMultiPut=" + isMultiPut);
104     TEST_UTIL.startMiniCluster(1, NUM_RS);
105   }
106 
107   @After
108   public void tearDown() throws Exception {
109     LOG.debug("Test teardown: isMultiPut=" + isMultiPut);
110     TEST_UTIL.shutdownMiniCluster();
111   }
112 
113   protected MultiThreadedReader prepareReaderThreads(LoadTestDataGenerator dataGen,
114       Configuration conf, TableName tableName, double verifyPercent) throws IOException {
115     MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, verifyPercent);
116     return reader;
117   }
118 
119   protected MultiThreadedWriter prepareWriterThreads(LoadTestDataGenerator dataGen,
120       Configuration conf, TableName tableName) throws IOException {
121     MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
122     writer.setMultiPut(isMultiPut);
123     return writer;
124   }
125 
126   @Test(timeout=TIMEOUT_MS)
127   public void loadTest() throws Exception {
128     prepareForLoadTest();
129     runLoadTestOnExistingTable();
130   }
131 
132   protected void runLoadTestOnExistingTable() throws IOException {
133     writerThreads.start(0, numKeys, NUM_THREADS);
134     writerThreads.waitForFinish();
135     assertEquals(0, writerThreads.getNumWriteFailures());
136 
137     readerThreads.start(0, numKeys, NUM_THREADS);
138     readerThreads.waitForFinish();
139     assertEquals(0, readerThreads.getNumReadFailures());
140     assertEquals(0, readerThreads.getNumReadErrors());
141     assertEquals(numKeys, readerThreads.getNumKeysVerified());
142   }
143 
144   protected void createPreSplitLoadTestTable(HTableDescriptor htd, HColumnDescriptor hcd)
145       throws IOException {
146     HBaseTestingUtility.createPreSplitLoadTestTable(conf, htd, hcd);
147     TEST_UTIL.waitUntilAllRegionsAssigned(htd.getTableName());
148   }
149 
150   protected void prepareForLoadTest() throws IOException {
151     LOG.info("Starting load test: dataBlockEncoding=" + dataBlockEncoding +
152         ", isMultiPut=" + isMultiPut);
153     numKeys = numKeys();
154     Admin admin = new HBaseAdmin(conf);
155     while (admin.getClusterStatus().getServers().size() < NUM_RS) {
156       LOG.info("Sleeping until " + NUM_RS + " RSs are online");
157       Threads.sleepWithoutInterrupt(1000);
158     }
159     admin.close();
160 
161     HTableDescriptor htd = new HTableDescriptor(TABLE);
162     HColumnDescriptor hcd = new HColumnDescriptor(CF)
163       .setCompressionType(compression)
164       .setDataBlockEncoding(dataBlockEncoding);
165     createPreSplitLoadTestTable(htd, hcd);
166 
167     LoadTestDataGenerator dataGen = new MultiThreadedAction.DefaultDataGenerator(CF);
168     writerThreads = prepareWriterThreads(dataGen, conf, TABLE);
169     readerThreads = prepareReaderThreads(dataGen, conf, TABLE, 100);
170   }
171 
172   protected int numKeys() {
173     return 1000;
174   }
175 
176   protected HColumnDescriptor getColumnDesc(Admin admin)
177       throws TableNotFoundException, IOException {
178     return admin.getTableDescriptor(TABLE).getFamily(CF);
179   }
180 
181 }