View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.fs.FileSystem;
23  import org.apache.hadoop.fs.Path;
24  import org.apache.hadoop.hbase.HBaseTestingUtility;
25  import org.apache.hadoop.hbase.HConstants;
26  import org.apache.hadoop.hbase.HRegionInfo;
27  import org.apache.hadoop.hbase.HTableDescriptor;
28  import org.apache.hadoop.hbase.KeyValue;
29  import org.apache.hadoop.hbase.testclassification.LargeTests;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
32  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
33  import org.apache.hadoop.hbase.wal.WAL;
34  import org.apache.hadoop.hbase.wal.WALFactory;
35  import org.apache.hadoop.hbase.wal.WALKey;
36  import org.apache.hadoop.hbase.util.Bytes;
37  import org.apache.hadoop.hdfs.MiniDFSCluster;
38  import org.junit.After;
39  import org.junit.AfterClass;
40  import org.junit.Before;
41  import org.junit.BeforeClass;
42  import org.junit.Test;
43  import org.junit.experimental.categories.Category;
44  import org.junit.runner.RunWith;
45  import org.junit.runners.Parameterized;
46  import org.junit.runners.Parameterized.Parameters;
47  
48  import static org.junit.Assert.*;
49  
50  import java.io.IOException;
51  import java.util.ArrayList;
52  import java.util.Collection;
53  import java.util.List;
54  import java.util.concurrent.atomic.AtomicLong;
55  
56  @Category(LargeTests.class)
57  @RunWith(Parameterized.class)
58  public class TestReplicationWALReaderManager {
59  
60    private static HBaseTestingUtility TEST_UTIL;
61    private static Configuration conf;
62    private static Path hbaseDir;
63    private static FileSystem fs;
64    private static MiniDFSCluster cluster;
65    private static final TableName tableName = TableName.valueOf("tablename");
66    private static final byte [] family = Bytes.toBytes("column");
67    private static final byte [] qualifier = Bytes.toBytes("qualifier");
68    private static final HRegionInfo info = new HRegionInfo(tableName,
69        HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
70    private static final HTableDescriptor htd = new HTableDescriptor(tableName);
71  
72    private WAL log;
73    private ReplicationWALReaderManager logManager;
74    private PathWatcher pathWatcher;
75    private int nbRows;
76    private int walEditKVs;
77    private final AtomicLong sequenceId = new AtomicLong(1);
78  
79    @Parameters
80    public static Collection<Object[]> parameters() {
81      // Try out different combinations of row count and KeyValue count
82      int[] NB_ROWS = { 1500, 60000 };
83      int[] NB_KVS = { 1, 100 };
84      // whether compression is used
85      Boolean[] BOOL_VALS = { false, true };
86      List<Object[]> parameters = new ArrayList<Object[]>();
87      for (int nbRows : NB_ROWS) {
88        for (int walEditKVs : NB_KVS) {
89          for (boolean b : BOOL_VALS) {
90            Object[] arr = new Object[3];
91            arr[0] = nbRows;
92            arr[1] = walEditKVs;
93            arr[2] = b;
94            parameters.add(arr);
95          }
96        }
97      }
98      return parameters;
99    }
100 
101   public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
102     this.nbRows = nbRows;
103     this.walEditKVs = walEditKVs;
104     TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
105       enableCompression);
106   }
107   
108   @BeforeClass
109   public static void setUpBeforeClass() throws Exception {
110     TEST_UTIL = new HBaseTestingUtility();
111     conf = TEST_UTIL.getConfiguration();
112     TEST_UTIL.startMiniDFSCluster(3);
113 
114     hbaseDir = TEST_UTIL.createRootDir();
115     cluster = TEST_UTIL.getDFSCluster();
116     fs = cluster.getFileSystem();
117   }
118 
119   @AfterClass
120   public static void tearDownAfterClass() throws Exception {
121     TEST_UTIL.shutdownMiniCluster();
122   }
123 
124   @Before
125   public void setUp() throws Exception {
126     logManager = new ReplicationWALReaderManager(fs, conf);
127     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
128     pathWatcher = new PathWatcher();
129     listeners.add(pathWatcher);
130     final WALFactory wals = new WALFactory(conf, listeners, "some server");
131     log = wals.getWAL(info.getEncodedNameAsBytes());
132   }
133 
134   @After
135   public void tearDown() throws Exception {
136     log.close();
137   }
138 
139   @Test
140   public void test() throws Exception {
141     // Grab the path that was generated when the log rolled as part of its creation
142     Path path = pathWatcher.currentPath;
143 
144     assertEquals(0, logManager.getPosition());
145 
146     appendToLog();
147 
148     // There's one edit in the log, read it. Reading past it needs to return nulls
149     assertNotNull(logManager.openReader(path));
150     logManager.seek();
151     WAL.Entry entry = logManager.readNextAndSetPosition();
152     assertNotNull(entry);
153     entry = logManager.readNextAndSetPosition();
154     assertNull(entry);
155     logManager.closeReader();
156     long oldPos = logManager.getPosition();
157 
158     appendToLog();
159 
160     // Read the newly added entry, make sure we made progress
161     assertNotNull(logManager.openReader(path));
162     logManager.seek();
163     entry = logManager.readNextAndSetPosition();
164     assertNotEquals(oldPos, logManager.getPosition());
165     assertNotNull(entry);
166     logManager.closeReader();
167     oldPos = logManager.getPosition();
168 
169     log.rollWriter();
170 
171     // We rolled but we still should see the end of the first log and not get data
172     assertNotNull(logManager.openReader(path));
173     logManager.seek();
174     entry = logManager.readNextAndSetPosition();
175     assertEquals(oldPos, logManager.getPosition());
176     assertNull(entry);
177     logManager.finishCurrentFile();
178 
179     path = pathWatcher.currentPath;
180 
181     for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
182     log.rollWriter();
183     logManager.openReader(path);
184     logManager.seek();
185     for (int i = 0; i < nbRows; i++) {
186       WAL.Entry e = logManager.readNextAndSetPosition();
187       if (e == null) {
188         fail("Should have enough entries");
189       }
190     }
191   }
192 
193   private void appendToLog() throws IOException {
194     appendToLogPlus(1);
195   }
196 
197   private void appendToLogPlus(int count) throws IOException {
198     final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
199         System.currentTimeMillis()), getWALEdits(count), sequenceId, true, null);
200     log.sync(txid);
201   }
202 
203   private WALEdit getWALEdits(int count) {
204     WALEdit edit = new WALEdit();
205     for (int i = 0; i < count; i++) {
206       edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
207         System.currentTimeMillis(), qualifier));
208     }
209     return edit;
210   }
211 
212   class PathWatcher extends WALActionsListener.Base {
213 
214     Path currentPath;
215 
216     @Override
217     public void preLogRoll(Path oldPath, Path newPath) throws IOException {
218       currentPath = newPath;
219     }
220   }
221 }