1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import static org.junit.Assert.assertFalse;
21
22 import java.io.IOException;
23 import java.util.concurrent.ThreadLocalRandom;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.HTableDescriptor;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.testclassification.SmallTests;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.util.FSTableDescriptors;
39 import org.apache.hadoop.hbase.util.FSUtils;
40 import org.apache.hadoop.hbase.util.Threads;
41 import org.apache.hadoop.hbase.wal.WAL;
42 import org.apache.hadoop.hbase.wal.WALFactory;
43 import org.apache.hadoop.hbase.wal.WALKey;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46
47
48
49
50 @Category(SmallTests.class)
51 public class TestLogRollingNoCluster {
52 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
53 private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
54 private static final int NUM_THREADS = 100;
55 private static final int NUM_ENTRIES = 100;
56
57
58 public static class HighLatencySyncWriter extends ProtobufLogWriter {
59 @Override
60 public void sync() throws IOException {
61 Threads.sleep(ThreadLocalRandom.current().nextInt(10));
62 super.sync();
63 Threads.sleep(ThreadLocalRandom.current().nextInt(10));
64 }
65 }
66
67
68
69
70
71
72
73 @Test
74 public void testContendedLogRolling() throws Exception {
75 TEST_UTIL.startMiniDFSCluster(3);
76 Path dir = TEST_UTIL.getDataTestDirOnTestFS();
77
78 TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, NUM_THREADS);
79 final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
80 FSUtils.setRootDir(conf, dir);
81 conf.set("hbase.regionserver.hlog.writer.impl", HighLatencySyncWriter.class.getName());
82 final WALFactory wals = new WALFactory(conf, null, TestLogRollingNoCluster.class.getName());
83 final WAL wal = wals.getWAL(new byte[]{});
84
85 Appender [] appenders = null;
86
87 final int numThreads = NUM_THREADS;
88 appenders = new Appender[numThreads];
89 try {
90 for (int i = 0; i < numThreads; i++) {
91
92 appenders[i] = new Appender(wal, i, NUM_ENTRIES);
93 }
94 for (int i = 0; i < numThreads; i++) {
95 appenders[i].start();
96 }
97 for (int i = 0; i < numThreads; i++) {
98
99 appenders[i].join();
100 }
101 } finally {
102 wals.close();
103 }
104 for (int i = 0; i < numThreads; i++) {
105 assertFalse(appenders[i].isException());
106 }
107 TEST_UTIL.shutdownMiniDFSCluster();
108 }
109
110
111
112
113 static class Appender extends Thread {
114 private final Log log;
115 private final WAL wal;
116 private final int count;
117 private Exception e = null;
118
119 Appender(final WAL wal, final int index, final int count) {
120 super("" + index);
121 this.wal = wal;
122 this.count = count;
123 this.log = LogFactory.getLog("Appender:" + getName());
124 }
125
126
127
128
129 boolean isException() {
130 return !isAlive() && this.e != null;
131 }
132
133 Exception getException() {
134 return this.e;
135 }
136
137 @Override
138 public void run() {
139 this.log.info(getName() +" started");
140 final AtomicLong sequenceId = new AtomicLong(1);
141 try {
142 for (int i = 0; i < this.count; i++) {
143 long now = System.currentTimeMillis();
144
145 if (i % 10 == 0) {
146 this.wal.rollWriter();
147 }
148 WALEdit edit = new WALEdit();
149 byte[] bytes = Bytes.toBytes(i);
150 edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
151 final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
152 final FSTableDescriptors fts = new FSTableDescriptors(TEST_UTIL.getConfiguration());
153 final HTableDescriptor htd = fts.get(TableName.META_TABLE_NAME);
154 final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
155 TableName.META_TABLE_NAME, now), edit, sequenceId, true, null);
156 Threads.sleep(ThreadLocalRandom.current().nextInt(5));
157 wal.sync(txid);
158 }
159 String msg = getName() + " finished";
160 if (isException())
161 this.log.info(msg, getException());
162 else
163 this.log.info(msg);
164 } catch (Exception e) {
165 this.e = e;
166 log.info("Caught exception from Appender:" + getName(), e);
167 } finally {
168
169 try {
170 this.wal.sync();
171 } catch (IOException e) {
172 throw new RuntimeException(e);
173 }
174 }
175 }
176 }
177
178
179
180
181 }