/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

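// A rough usage sketch, not part of these tests: a MapReduce driver would typically wire up
// WALInputFormat along the lines below (WALPlayer in this package is the real-world consumer;
// startTs/endTs are placeholder names). The property names match the ones these tests exercise.
//
//   Job job = Job.getInstance(conf, "wal-scan");
//   job.setInputFormatClass(WALInputFormat.class);
//   job.getConfiguration().set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
//   job.getConfiguration().setLong(WALInputFormat.START_TIME_KEY, startTs); // optional lower bound
//   job.getConfiguration().setLong(WALInputFormat.END_TIME_KEY, endTs);     // optional upper bound
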
/**
 * JUnit tests for the WALRecordReader.
 */
@Category(MediumTests.class)
public class TestWALRecordReader {
  private final Log LOG = LogFactory.getLog(getClass());
  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static Configuration conf;
  private static FileSystem fs;
  private static Path hbaseDir;
  private static FileSystem walFs;
  private static Path walRootDir;
  // visible for TestHLogRecordReader
  static final TableName tableName = TableName.valueOf(getName());
  private static final byte [] rowName = tableName.getName();
  // visible for TestHLogRecordReader
  static final HRegionInfo info = new HRegionInfo(tableName,
      Bytes.toBytes(""), Bytes.toBytes(""), false);
  private static final byte [] family = Bytes.toBytes("column");
  private static final byte [] value = Bytes.toBytes("value");
  private static HTableDescriptor htd;
  private static Path logDir;

  private static String getName() {
    return "TestWALRecordReader";
  }

  @Before
  public void setUp() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Make block sizes small.
    conf = TEST_UTIL.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.replication", 1);
    TEST_UTIL.startMiniDFSCluster(1);

    fs = TEST_UTIL.getDFSCluster().getFileSystem();

    hbaseDir = TEST_UTIL.createRootDir();

    walRootDir = TEST_UTIL.createWALRootDir();
    walFs = FSUtils.getWALFileSystem(conf);
    logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);

    htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    fs.delete(hbaseDir, true);
    walFs.delete(walRootDir, true);
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Test partial reads from the log based on passed time range.
   */
  @Test
  public void testPartialRead() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, null, getName());
    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
    // This test depends on timestamp being millisecond based and the filename of the WAL also
    // being millisecond based.
    long ts = System.currentTimeMillis();
    WALEdit edit = new WALEdit();
    final AtomicLong sequenceId = new AtomicLong(0);
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
    log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts + 1, value));
    log.append(htd, info, getWalKey(ts + 1), edit, sequenceId, true, null);
    log.sync();
    LOG.info("Before 1st WAL roll " + log.toString());
    log.rollWriter();
    LOG.info("Past 1st WAL roll " + log.toString());

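    // Sleep so ts1 lands on a strictly later millisecond than ts; both the cell timestamps
    // and the WAL file names are millisecond based (see the comment above).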
    Thread.sleep(1);
    long ts1 = System.currentTimeMillis();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1 + 1, value));
    log.append(htd, info, getWalKey(ts1 + 1), edit, sequenceId, true, null);
    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1 + 2, value));
    log.append(htd, info, getWalKey(ts1 + 2), edit, sequenceId, true, null);
    log.sync();
    log.shutdown();
    walfactory.shutdown();
    LOG.info("Closed WAL " + log.toString());

    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);

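    // Note: the assertions below also pin down that START_TIME_KEY and END_TIME_KEY act as
    // inclusive bounds on an edit's write time.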
    // only 1st file is considered, and only its 1st entry is used
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));

    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    jobConf.setLong(WALInputFormat.START_TIME_KEY, ts + 1);
    jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1 + 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // both files need to be considered
    assertEquals(2, splits.size());
    // only the 2nd entry from the 1st file is used
    testSplit(splits.get(0), Bytes.toBytes("2"));
    // only the 1st entry from the 2nd file is used
    testSplit(splits.get(1), Bytes.toBytes("3"));
  }

  /**
   * Test basic functionality.
   */
  @Test
  public void testWALRecordReader() throws Exception {
    final WALFactory walfactory = new WALFactory(conf, null, getName());
    WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
    final AtomicLong sequenceId = new AtomicLong(0);
    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
        System.currentTimeMillis(), value));
    long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId,
        true, null);
    log.sync(txid);

    Thread.sleep(1); // make sure 2nd log gets a later timestamp
    long secondTs = System.currentTimeMillis();
    log.rollWriter();

    edit = new WALEdit();
    edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
        System.currentTimeMillis(), value));
    txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId,
        true, null);
    log.sync(txid);
    log.shutdown();
    walfactory.shutdown();
    long thirdTs = System.currentTimeMillis(); // every edit in both files predates this

    // should have 2 log files now
    WALInputFormat input = new WALInputFormat();
    Configuration jobConf = new Configuration(conf);
    jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());

    // make sure both logs are found
    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(2, splits.size());

    // should return exactly one KV
    testSplit(splits.get(0), Bytes.toBytes("1"));
    // same for the 2nd split
    testSplit(splits.get(1), Bytes.toBytes("2"));

    // now test basic time ranges:

    // with an end time before the roll, the 2nd log file can be ignored completely
    jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs - 1);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    assertEquals(1, splits.size());
    testSplit(splits.get(0), Bytes.toBytes("1"));

    // now set a start time
    jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
    jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
    // both logs need to be considered
    assertEquals(2, splits.size());
    // but both readers skip all edits
    testSplit(splits.get(0));
    testSplit(splits.get(1));
  }

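  /**
   * Builds the key for an appended edit. The tests pass a timestamp here, so the key's
   * sequence number doubles as the edit's write time. Kept protected, presumably so that
   * TestHLogRecordReader (see the "visible for" notes above) can supply the legacy key type.
   */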
  protected WALKey getWalKey(final long sequenceid) {
    return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
  }

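  /**
   * Returns the record reader under test. Kept protected, presumably so that
   * TestHLogRecordReader can swap in its own reader implementation.
   */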
  protected WALRecordReader getReader() {
    return new WALKeyRecordReader();
  }

  /**
   * Create a new reader from the split, and match the edits against the passed columns.
   * Passing no columns asserts that the reader finds no edits at all.
   */
  private void testSplit(InputSplit split, byte[]... columns) throws Exception {
    final WALRecordReader reader = getReader();
    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));

    for (byte[] column : columns) {
      assertTrue(reader.nextKeyValue());
      Cell cell = reader.getCurrentValue().getCells().get(0);
      if (!Bytes.equals(column, cell.getQualifier())) {
        fail("expected [" + Bytes.toString(column) + "], actual ["
            + Bytes.toString(cell.getQualifier()) + "]");
      }
    }
    assertFalse(reader.nextKeyValue());
    reader.close();
  }

}