1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.replication.regionserver;
20
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.FileSystem;
23 import org.apache.hadoop.fs.Path;
24 import org.apache.hadoop.hbase.HBaseTestingUtility;
25 import org.apache.hadoop.hbase.HConstants;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.HTableDescriptor;
28 import org.apache.hadoop.hbase.KeyValue;
29 import org.apache.hadoop.hbase.testclassification.LargeTests;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
32 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
33 import org.apache.hadoop.hbase.wal.WAL;
34 import org.apache.hadoop.hbase.wal.WALFactory;
35 import org.apache.hadoop.hbase.wal.WALKey;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.hdfs.MiniDFSCluster;
38 import org.junit.After;
39 import org.junit.AfterClass;
40 import org.junit.Before;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.junit.runner.RunWith;
45 import org.junit.runners.Parameterized;
46 import org.junit.runners.Parameterized.Parameters;
47
48 import static org.junit.Assert.*;
49
50 import java.io.IOException;
51 import java.util.ArrayList;
52 import java.util.Collection;
53 import java.util.List;
54 import java.util.concurrent.atomic.AtomicLong;
55
56 @Category(LargeTests.class)
57 @RunWith(Parameterized.class)
58 public class TestReplicationWALReaderManager {
59
60 private static HBaseTestingUtility TEST_UTIL;
61 private static Configuration conf;
62 private static Path hbaseDir;
63 private static FileSystem fs;
64 private static MiniDFSCluster cluster;
65 private static final TableName tableName = TableName.valueOf("tablename");
66 private static final byte [] family = Bytes.toBytes("column");
67 private static final byte [] qualifier = Bytes.toBytes("qualifier");
68 private static final HRegionInfo info = new HRegionInfo(tableName,
69 HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
70 private static final HTableDescriptor htd = new HTableDescriptor(tableName);
71
72 private WAL log;
73 private ReplicationWALReaderManager logManager;
74 private PathWatcher pathWatcher;
75 private int nbRows;
76 private int walEditKVs;
77 private final AtomicLong sequenceId = new AtomicLong(1);
78
79 @Parameters
80 public static Collection<Object[]> parameters() {
81
82 int[] NB_ROWS = { 1500, 60000 };
83 int[] NB_KVS = { 1, 100 };
84
85 Boolean[] BOOL_VALS = { false, true };
86 List<Object[]> parameters = new ArrayList<Object[]>();
87 for (int nbRows : NB_ROWS) {
88 for (int walEditKVs : NB_KVS) {
89 for (boolean b : BOOL_VALS) {
90 Object[] arr = new Object[3];
91 arr[0] = nbRows;
92 arr[1] = walEditKVs;
93 arr[2] = b;
94 parameters.add(arr);
95 }
96 }
97 }
98 return parameters;
99 }
100
101 public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
102 this.nbRows = nbRows;
103 this.walEditKVs = walEditKVs;
104 TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
105 enableCompression);
106 }
107
108 @BeforeClass
109 public static void setUpBeforeClass() throws Exception {
110 TEST_UTIL = new HBaseTestingUtility();
111 conf = TEST_UTIL.getConfiguration();
112 TEST_UTIL.startMiniDFSCluster(3);
113
114 hbaseDir = TEST_UTIL.createRootDir();
115 cluster = TEST_UTIL.getDFSCluster();
116 fs = cluster.getFileSystem();
117 }
118
119 @AfterClass
120 public static void tearDownAfterClass() throws Exception {
121 TEST_UTIL.shutdownMiniCluster();
122 }
123
124 @Before
125 public void setUp() throws Exception {
126 logManager = new ReplicationWALReaderManager(fs, conf);
127 List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
128 pathWatcher = new PathWatcher();
129 listeners.add(pathWatcher);
130 final WALFactory wals = new WALFactory(conf, listeners, "some server");
131 log = wals.getWAL(info.getEncodedNameAsBytes());
132 }
133
134 @After
135 public void tearDown() throws Exception {
136 log.close();
137 }
138
139 @Test
140 public void test() throws Exception {
141
142 Path path = pathWatcher.currentPath;
143
144 assertEquals(0, logManager.getPosition());
145
146 appendToLog();
147
148
149 assertNotNull(logManager.openReader(path));
150 logManager.seek();
151 WAL.Entry entry = logManager.readNextAndSetPosition();
152 assertNotNull(entry);
153 entry = logManager.readNextAndSetPosition();
154 assertNull(entry);
155 logManager.closeReader();
156 long oldPos = logManager.getPosition();
157
158 appendToLog();
159
160
161 assertNotNull(logManager.openReader(path));
162 logManager.seek();
163 entry = logManager.readNextAndSetPosition();
164 assertNotEquals(oldPos, logManager.getPosition());
165 assertNotNull(entry);
166 logManager.closeReader();
167 oldPos = logManager.getPosition();
168
169 log.rollWriter();
170
171
172 assertNotNull(logManager.openReader(path));
173 logManager.seek();
174 entry = logManager.readNextAndSetPosition();
175 assertEquals(oldPos, logManager.getPosition());
176 assertNull(entry);
177 logManager.finishCurrentFile();
178
179 path = pathWatcher.currentPath;
180
181 for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
182 log.rollWriter();
183 logManager.openReader(path);
184 logManager.seek();
185 for (int i = 0; i < nbRows; i++) {
186 WAL.Entry e = logManager.readNextAndSetPosition();
187 if (e == null) {
188 fail("Should have enough entries");
189 }
190 }
191 }
192
193 private void appendToLog() throws IOException {
194 appendToLogPlus(1);
195 }
196
197 private void appendToLogPlus(int count) throws IOException {
198 final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
199 System.currentTimeMillis()), getWALEdits(count), sequenceId, true, null);
200 log.sync(txid);
201 }
202
203 private WALEdit getWALEdits(int count) {
204 WALEdit edit = new WALEdit();
205 for (int i = 0; i < count; i++) {
206 edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
207 System.currentTimeMillis(), qualifier));
208 }
209 return edit;
210 }
211
212 class PathWatcher extends WALActionsListener.Base {
213
214 Path currentPath;
215
216 @Override
217 public void preLogRoll(Path oldPath, Path newPath) throws IOException {
218 currentPath = newPath;
219 }
220 }
221 }