View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations
15   * under the License.
16   */
17  package org.apache.hadoop.hbase.io.encoding;
18  
19  import static org.junit.Assert.assertTrue;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.List;
26  import java.util.Random;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.HColumnDescriptor;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.testclassification.LargeTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.client.Admin;
40  import org.apache.hadoop.hbase.client.Connection;
41  import org.apache.hadoop.hbase.client.ConnectionFactory;
42  import org.apache.hadoop.hbase.client.Durability;
43  import org.apache.hadoop.hbase.client.Get;
44  import org.apache.hadoop.hbase.client.HBaseAdmin;
45  import org.apache.hadoop.hbase.client.HTable;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.client.Table;
49  import org.apache.hadoop.hbase.regionserver.HRegionServer;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.Threads;
52  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
53  import org.junit.AfterClass;
54  import org.junit.BeforeClass;
55  import org.junit.Test;
56  import org.junit.experimental.categories.Category;
57  
58  /**
59   * Tests changing data block encoding settings of a column family.
60   */
61  @Category(LargeTests.class)
62  public class TestChangingEncoding {
63    private static final Log LOG = LogFactory.getLog(TestChangingEncoding.class);
64    static final String CF = "EncodingTestCF";
65    static final byte[] CF_BYTES = Bytes.toBytes(CF);
66  
67    private static final int NUM_ROWS_PER_BATCH = 100;
68    private static final int NUM_COLS_PER_ROW = 20;
69  
70    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
71    private static final Configuration conf = TEST_UTIL.getConfiguration();
72  
73    private static final int TIMEOUT_MS = 600000;
74  
75    private HColumnDescriptor hcd;
76  
77    private TableName tableName;
78    private static final List<DataBlockEncoding> ENCODINGS_TO_ITERATE =
79        createEncodingsToIterate();
80  
81    private static final List<DataBlockEncoding> createEncodingsToIterate() {
82      List<DataBlockEncoding> encodings = new ArrayList<DataBlockEncoding>(
83          Arrays.asList(DataBlockEncoding.values()));
84      encodings.add(DataBlockEncoding.NONE);
85      return Collections.unmodifiableList(encodings);
86    }
87  
88    /** A zero-based index of the current batch of test data being written */
89    private int numBatchesWritten;
90  
91    private void prepareTest(String testId) throws IOException {
92      tableName = TableName.valueOf("test_table_" + testId);
93      HTableDescriptor htd = new HTableDescriptor(tableName);
94      hcd = new HColumnDescriptor(CF);
95      htd.addFamily(hcd);
96      try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
97        admin.createTable(htd);
98      }
99      numBatchesWritten = 0;
100   }
101 
102   @BeforeClass
103   public static void setUpBeforeClass() throws Exception {
104     // Use a small flush size to create more HFiles.
105     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
106     // ((Log4JLogger)RpcServerImplementation.LOG).getLogger().setLevel(Level.TRACE);
107     // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.TRACE);
108     conf.setBoolean("hbase.online.schema.update.enable", true);
109     TEST_UTIL.startMiniCluster();
110   }
111 
112   @AfterClass
113   public static void tearDownAfterClass() throws Exception {
114     TEST_UTIL.shutdownMiniCluster();
115   }
116 
117   private static byte[] getRowKey(int batchId, int i) {
118     return Bytes.toBytes("batch" + batchId + "_row" + i);
119   }
120 
121   private static byte[] getQualifier(int j) {
122     return Bytes.toBytes("col" + j);
123   }
124 
125   private static byte[] getValue(int batchId, int i, int j) {
126     return Bytes.toBytes("value_for_" + Bytes.toString(getRowKey(batchId, i))
127         + "_col" + j);
128   }
129 
130   static void writeTestDataBatch(Configuration conf, TableName tableName,
131       int batchId) throws Exception {
132     LOG.debug("Writing test data batch " + batchId);
133     List<Put> puts = new ArrayList<>();
134     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
135       Put put = new Put(getRowKey(batchId, i));
136       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
137         put.add(CF_BYTES, getQualifier(j),
138             getValue(batchId, i, j));
139       }
140       put.setDurability(Durability.SKIP_WAL);
141       puts.add(put);
142     }
143     try (Connection conn = ConnectionFactory.createConnection(conf);
144         Table table = conn.getTable(tableName)) {
145       table.put(puts);
146     }
147   }
148 
149   static void verifyTestDataBatch(Configuration conf, TableName tableName,
150       int batchId) throws Exception {
151     LOG.debug("Verifying test data batch " + batchId);
152     Table table = new HTable(conf, tableName);
153     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
154       Get get = new Get(getRowKey(batchId, i));
155       Result result = table.get(get);
156       for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
157         Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j));
158         assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j)));
159       }
160     }
161     table.close();
162   }
163 
164   private void writeSomeNewData() throws Exception {
165     writeTestDataBatch(conf, tableName, numBatchesWritten);
166     ++numBatchesWritten;
167   }
168 
169   private void verifyAllData() throws Exception {
170     for (int i = 0; i < numBatchesWritten; ++i) {
171       verifyTestDataBatch(conf, tableName, i);
172     }
173   }
174 
175   private void setEncodingConf(DataBlockEncoding encoding,
176       boolean onlineChange) throws Exception {
177     LOG.debug("Setting CF encoding to " + encoding + " (ordinal="
178       + encoding.ordinal() + "), onlineChange=" + onlineChange);
179     hcd.setDataBlockEncoding(encoding);
180     try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
181       if (!onlineChange) {
182         admin.disableTable(tableName);
183       }
184       admin.modifyColumn(tableName, hcd);
185       if (!onlineChange) {
186         admin.enableTable(tableName);
187       }
188     }
189     // This is a unit test, not integration test. So let's
190     // wait for regions out of transition. Otherwise, for online
191     // encoding change, verification phase may be flaky because
192     // regions could be still in transition.
193     ZKAssign.blockUntilNoRIT(TEST_UTIL.getZooKeeperWatcher());
194   }
195 
196   @Test(timeout=TIMEOUT_MS)
197   public void testChangingEncoding() throws Exception {
198     prepareTest("ChangingEncoding");
199     for (boolean onlineChange : new boolean[]{false, true}) {
200       for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
201         setEncodingConf(encoding, onlineChange);
202         writeSomeNewData();
203         verifyAllData();
204       }
205     }
206   }
207 
208   @Test(timeout=TIMEOUT_MS)
209   public void testChangingEncodingWithCompaction() throws Exception {
210     prepareTest("ChangingEncodingWithCompaction");
211     for (boolean onlineChange : new boolean[]{false, true}) {
212       for (DataBlockEncoding encoding : ENCODINGS_TO_ITERATE) {
213         setEncodingConf(encoding, onlineChange);
214         writeSomeNewData();
215         verifyAllData();
216         compactAndWait();
217         verifyAllData();
218       }
219     }
220   }
221 
222   private void compactAndWait() throws IOException, InterruptedException {
223     LOG.debug("Compacting table " + tableName);
224     HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
225     HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
226     admin.majorCompact(tableName);
227 
228     // Waiting for the compaction to start, at least .5s.
229     final long maxWaitime = System.currentTimeMillis() + 500;
230     boolean cont;
231     do {
232       cont = rs.compactSplitThread.getCompactionQueueSize() == 0;
233       Threads.sleep(1);
234     } while (cont && System.currentTimeMillis() < maxWaitime);
235 
236     while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
237       Threads.sleep(1);
238     }
239     LOG.debug("Compaction queue size reached 0, continuing");
240   }
241 
242   @Test
243   public void testCrazyRandomChanges() throws Exception {
244     prepareTest("RandomChanges");
245     Random rand = new Random(2934298742974297L);
246     for (int i = 0; i < 20; ++i) {
247       int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length);
248       DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal];
249       setEncodingConf(encoding, rand.nextBoolean());
250       writeSomeNewData();
251       verifyAllData();
252     }
253   }
254 
255 }