1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.atomic.AtomicLong;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.Path;
34 import org.apache.hadoop.hbase.client.Admin;
35 import org.apache.hadoop.hbase.client.HBaseAdmin;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
39 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
40 import org.apache.hadoop.hbase.regionserver.HRegion;
41 import org.apache.hadoop.hbase.regionserver.HRegionServer;
42 import org.apache.hadoop.hbase.regionserver.HStore;
43 import org.apache.hadoop.hbase.regionserver.Region;
44 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
45 import org.apache.hadoop.hbase.regionserver.Store;
46 import org.apache.hadoop.hbase.regionserver.StoreFile;
47 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
48 import org.apache.hadoop.hbase.testclassification.MediumTests;
49 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
50 import org.apache.hadoop.hbase.wal.WAL;
51 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
52 import org.apache.hadoop.hbase.security.User;
53 import org.apache.hadoop.hbase.util.Bytes;
54 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57
58 import com.google.common.collect.Lists;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @Category(MediumTests.class)
79 public class TestIOFencing {
80 static final Log LOG = LogFactory.getLog(TestIOFencing.class);
81 static {
82
83
84
85
86
87
88
89
90 }
91
92 public abstract static class CompactionBlockerRegion extends HRegion {
93 volatile int compactCount = 0;
94 volatile CountDownLatch compactionsBlocked = new CountDownLatch(0);
95 volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
96
97 @SuppressWarnings("deprecation")
98 public CompactionBlockerRegion(Path tableDir, WAL log,
99 FileSystem fs, Configuration confParam, HRegionInfo info,
100 HTableDescriptor htd, RegionServerServices rsServices) {
101 super(tableDir, log, fs, confParam, info, htd, rsServices);
102 }
103
104 public void stopCompactions() {
105 compactionsBlocked = new CountDownLatch(1);
106 compactionsWaiting = new CountDownLatch(1);
107 }
108
109 public void allowCompactions() {
110 LOG.debug("allowing compactions");
111 compactionsBlocked.countDown();
112 }
113 public void waitForCompactionToBlock() throws IOException {
114 try {
115 LOG.debug("waiting for compaction to block");
116 compactionsWaiting.await();
117 LOG.debug("compaction block reached");
118 } catch (InterruptedException ex) {
119 throw new IOException(ex);
120 }
121 }
122
123 @Override
124 public boolean compact(CompactionContext compaction, Store store,
125 CompactionThroughputController throughputController) throws IOException {
126 try {
127 return super.compact(compaction, store, throughputController);
128 } finally {
129 compactCount++;
130 }
131 }
132
133 @Override
134 public boolean compact(CompactionContext compaction, Store store,
135 CompactionThroughputController throughputController, User user) throws IOException {
136 try {
137 return super.compact(compaction, store, throughputController, user);
138 } finally {
139 compactCount++;
140 }
141 }
142
143 public int countStoreFiles() {
144 int count = 0;
145 for (Store store : stores.values()) {
146 count += store.getStorefilesCount();
147 }
148 return count;
149 }
150 }
151
152
153
154
155
156 public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
157
158 public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
159 FileSystem fs, Configuration confParam, HRegionInfo info,
160 HTableDescriptor htd, RegionServerServices rsServices) {
161 super(tableDir, log, fs, confParam, info, htd, rsServices);
162 }
163 @Override
164 protected void doRegionCompactionPrep() throws IOException {
165 compactionsWaiting.countDown();
166 try {
167 compactionsBlocked.await();
168 } catch (InterruptedException ex) {
169 throw new IOException();
170 }
171 super.doRegionCompactionPrep();
172 }
173 }
174
175
176
177
178
179
180 public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
181 public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
182 FileSystem fs, Configuration confParam, HRegionInfo info,
183 HTableDescriptor htd, RegionServerServices rsServices) {
184 super(tableDir, log, fs, confParam, info, htd, rsServices);
185 }
186 @Override
187 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
188 return new BlockCompactionsInCompletionHStore(this, family, this.conf);
189 }
190 }
191
192 public static class BlockCompactionsInCompletionHStore extends HStore {
193 CompactionBlockerRegion r;
194 protected BlockCompactionsInCompletionHStore(HRegion region, HColumnDescriptor family,
195 Configuration confParam) throws IOException {
196 super(region, family, confParam);
197 r = (CompactionBlockerRegion) region;
198 }
199
200 @Override
201 protected void completeCompaction(final Collection<StoreFile> compactedFiles,
202 boolean removeFiles) throws IOException {
203 try {
204 r.compactionsWaiting.countDown();
205 r.compactionsBlocked.await();
206 } catch (InterruptedException ex) {
207 throw new IOException(ex);
208 }
209 super.completeCompaction(compactedFiles, removeFiles);
210 }
211 @Override
212 protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
213 try {
214 r.compactionsWaiting.countDown();
215 r.compactionsBlocked.await();
216 } catch (InterruptedException ex) {
217 throw new IOException(ex);
218 }
219 super.completeCompaction(compactedFiles);
220 }
221 }
222
223 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
224 private final static TableName TABLE_NAME =
225 TableName.valueOf("tabletest");
226 private final static byte[] FAMILY = Bytes.toBytes("family");
227 private static final int FIRST_BATCH_COUNT = 4000;
228 private static final int SECOND_BATCH_COUNT = FIRST_BATCH_COUNT;
229
230
231
232
233
234
235
236 @Test
237 public void testFencingAroundCompaction() throws Exception {
238 doTest(BlockCompactionsInPrepRegion.class, false);
239 doTest(BlockCompactionsInPrepRegion.class, true);
240 }
241
242
243
244
245
246
247
248 @Test
249 public void testFencingAroundCompactionAfterWALSync() throws Exception {
250 doTest(BlockCompactionsInCompletionRegion.class, false);
251 doTest(BlockCompactionsInCompletionRegion.class, true);
252 }
253
254 public void doTest(Class<?> regionClass, boolean distributedLogReplay) throws Exception {
255 Configuration c = TEST_UTIL.getConfiguration();
256 c.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, distributedLogReplay);
257
258 c.setClass(HConstants.REGION_IMPL, regionClass, HRegion.class);
259 c.setBoolean("dfs.support.append", true);
260
261 c.setLong("hbase.hregion.memstore.flush.size", 200000);
262 c.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, ConstantSizeRegionSplitPolicy.class.getName());
263
264 c.setInt("hbase.hstore.compactionThreshold", 1000);
265 c.setLong("hbase.hstore.blockingStoreFiles", 1000);
266
267 c.setInt("hbase.regionserver.thread.splitcompactcheckfrequency", 1000);
268 LOG.info("Starting mini cluster");
269 TEST_UTIL.startMiniCluster(1);
270 CompactionBlockerRegion compactingRegion = null;
271 Admin admin = null;
272 try {
273 LOG.info("Creating admin");
274 admin = TEST_UTIL.getConnection().getAdmin();
275 LOG.info("Creating table");
276 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
277 Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
278 LOG.info("Loading test table");
279
280 List<HRegion> testRegions = TEST_UTIL.getMiniHBaseCluster().findRegionsForTable(TABLE_NAME);
281 assertEquals(1, testRegions.size());
282 compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
283 LOG.info("Blocking compactions");
284 compactingRegion.stopCompactions();
285 long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores();
286
287 TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
288
289
290
291 HRegionInfo oldHri = new HRegionInfo(table.getName(),
292 HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
293 CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
294 FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
295 new Path("store_dir"));
296 WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
297 oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
298
299
300 long startWaitTime = System.currentTimeMillis();
301 while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime ||
302 compactingRegion.countStoreFiles() <= 1) {
303 LOG.info("Waiting for the region to flush " +
304 compactingRegion.getRegionInfo().getRegionNameAsString());
305 Thread.sleep(1000);
306 assertTrue("Timed out waiting for the region to flush",
307 System.currentTimeMillis() - startWaitTime < 30000);
308 }
309 assertTrue(compactingRegion.countStoreFiles() > 1);
310 final byte REGION_NAME[] = compactingRegion.getRegionInfo().getRegionName();
311 LOG.info("Asking for compaction");
312 ((HBaseAdmin)admin).majorCompact(TABLE_NAME.getName());
313 LOG.info("Waiting for compaction to be about to start");
314 compactingRegion.waitForCompactionToBlock();
315 LOG.info("Starting a new server");
316 RegionServerThread newServerThread = TEST_UTIL.getMiniHBaseCluster().startRegionServer();
317 final HRegionServer newServer = newServerThread.getRegionServer();
318 LOG.info("Killing region server ZK lease");
319 TEST_UTIL.expireRegionServerSession(0);
320 CompactionBlockerRegion newRegion = null;
321 startWaitTime = System.currentTimeMillis();
322 LOG.info("Waiting for the new server to pick up the region " + Bytes.toString(REGION_NAME));
323
324
325 Waiter.waitFor(c, 60000, new Waiter.Predicate<Exception>() {
326 @Override
327 public boolean evaluate() throws Exception {
328 Region newRegion = newServer.getOnlineRegion(REGION_NAME);
329 return newRegion != null && !newRegion.isRecovering();
330 }
331 });
332
333 newRegion = (CompactionBlockerRegion)newServer.getOnlineRegion(REGION_NAME);
334
335 LOG.info("Allowing compaction to proceed");
336 compactingRegion.allowCompactions();
337 while (compactingRegion.compactCount == 0) {
338 Thread.sleep(1000);
339 }
340
341
342 LOG.info("Compaction finished");
343
344
345 FileSystem fs = newRegion.getFilesystem();
346 for (String f: newRegion.getStoreFileList(new byte [][] {FAMILY})) {
347 assertTrue("After compaction, does not exist: " + f, fs.exists(new Path(f)));
348 }
349
350
351 TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
352 ((HBaseAdmin)admin).majorCompact(TABLE_NAME.getName());
353 startWaitTime = System.currentTimeMillis();
354 while (newRegion.compactCount == 0) {
355 Thread.sleep(1000);
356 assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 180000);
357 }
358 assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
359 } finally {
360 if (compactingRegion != null) {
361 compactingRegion.allowCompactions();
362 }
363 admin.close();
364 TEST_UTIL.shutdownMiniCluster();
365 }
366 }
367 }