1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.IOException;
22 import java.util.Collection;
23 import java.util.List;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.util.FSUtils;
33 import org.apache.hadoop.hbase.wal.WAL.Entry;
34
35 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.DEFAULT_PROVIDER_ID;
36 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.META_WAL_PROVIDER_ID;
37 import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER;
38
39
40
41 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
42 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
43 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 @InterfaceAudience.Private
73 public class IOTestProvider implements WALProvider {
74 private static final Log LOG = LogFactory.getLog(IOTestProvider.class);
75
76 private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations";
77 private enum AllowedOperations {
78 all,
79 append,
80 sync,
81 fileroll,
82 none;
83 }
84
85 private FSHLog log = null;
86
87
88
89
90
91
92
93
94 @Override
95 public void init(final WALFactory factory, final Configuration conf,
96 final List<WALActionsListener> listeners, String providerId) throws IOException {
97 if (null != log) {
98 throw new IllegalStateException("WALProvider.init should only be called once.");
99 }
100 if (null == providerId) {
101 providerId = DEFAULT_PROVIDER_ID;
102 }
103 final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
104 log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
105 DefaultWALProvider.getWALDirectoryName(factory.factoryId),
106 HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
107 true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
108 }
109
110 @Override
111 public WAL getWAL(final byte[] identifier) throws IOException {
112 return log;
113 }
114
115 @Override
116 public void close() throws IOException {
117 log.close();
118 }
119
120 @Override
121 public void shutdown() throws IOException {
122 log.shutdown();
123 }
124
125 private static class IOTestWAL extends FSHLog {
126
127 private final boolean doFileRolls;
128
129
130 private final boolean initialized;
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 public IOTestWAL(final FileSystem fs, final Path rootDir, final String logDir,
157 final String archiveDir, final Configuration conf,
158 final List<WALActionsListener> listeners,
159 final boolean failIfWALExists, final String prefix, final String suffix)
160 throws IOException {
161 super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
162 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
163 doFileRolls = operations.isEmpty() || operations.contains(AllowedOperations.all.name()) ||
164 operations.contains(AllowedOperations.fileroll.name());
165 initialized = true;
166 LOG.info("Initialized with file rolling " + (doFileRolls ? "enabled" : "disabled"));
167 }
168
169 private Writer noRollsWriter;
170
171
172
173 @Override
174 protected Writer createWriterInstance(final Path path) throws IOException {
175
176
177
178 if (!initialized || doFileRolls) {
179 LOG.info("creating new writer instance.");
180 final ProtobufLogWriter writer = new IOTestWriter();
181 writer.init(fs, path, conf, false);
182 if (!initialized) {
183 LOG.info("storing initial writer instance in case file rolling isn't allowed.");
184 noRollsWriter = writer;
185 }
186 return writer;
187 } else {
188 LOG.info("WAL rolling disabled, returning the first writer.");
189
190
191 return noRollsWriter;
192 }
193 }
194 }
195
196
197
198
199 private static class IOTestWriter extends ProtobufLogWriter {
200 private boolean doAppends;
201 private boolean doSyncs;
202
203 @Override
204 public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
205 Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
206 if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
207 doAppends = doSyncs = true;
208 } else if (operations.contains(AllowedOperations.none.name())) {
209 doAppends = doSyncs = false;
210 } else {
211 doAppends = operations.contains(AllowedOperations.append.name());
212 doSyncs = operations.contains(AllowedOperations.sync.name());
213 }
214 LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
215 " and syncs " + (doSyncs ? "enabled" : "disabled"));
216 super.init(fs, path, conf, overwritable);
217 }
218
219 @Override
220 public void append(Entry entry) throws IOException {
221 if (doAppends) {
222 super.append(entry);
223 }
224 }
225
226 @Override
227 public void sync() throws IOException {
228 if (doSyncs) {
229 super.sync();
230 }
231 }
232 }
233 }