/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
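
/**
 * Implementation of {@link WALProvider.Writer} for the legacy
 * SequenceFile-based WAL format; it delegates all writing to a wrapped
 * {@link SequenceFile.Writer}.
 *
 * Illustrative lifecycle (a sketch only; the fs, conf, path and entry values
 * here are example placeholders):
 * <pre>
 *   SequenceFileLogWriter writer = new SequenceFileLogWriter();
 *   writer.init(fs, new Path("/hbase/oldWALs/example"), conf, false);
 *   writer.append(entry);  // a WAL.Entry whose key is an HLogKey
 *   writer.sync();
 *   writer.close();
 * </pre>
 */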
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SequenceFileLogWriter extends WriterBase {
  private final Log LOG = LogFactory.getLog(this.getClass());

  // The SequenceFile.Writer we delegate all writing to.
  private SequenceFile.Writer writer;

  // The FSDataOutputStream that is the private 'out' field of the
  // SequenceFile.Writer above, made accessible via reflection in init().
  private FSDataOutputStream writer_out;

  // Keys and values for the metadata written into the SequenceFile header.
  private static final Text WAL_VERSION_KEY = new Text("version");
  private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type");
  private static final Text DICTIONARY_COMPRESSION_TYPE = new Text("dictionary");
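
  /**
   * Default constructor.
   */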
  public SequenceFileLogWriter() {
    super();
  }
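
  /**
   * Create SequenceFile Metadata for our WAL file: the WAL version and, if
   * compression is enabled, the compression type.
   */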
  private static Metadata createMetadata(final Configuration conf,
      final boolean compress) {
    TreeMap<Text, Text> metaMap = new TreeMap<Text, Text>();
    metaMap.put(WAL_VERSION_KEY, new Text("1"));
    if (compress) {
      // Dictionary compression is currently the only type we write.
      metaMap.put(WAL_COMPRESSION_TYPE_KEY, DICTIONARY_COMPRESSION_TYPE);
    }
    return new Metadata(metaMap);
  }

  @Override
  public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
      throws IOException {
    super.init(fs, path, conf, overwritable);
    boolean compress = initializeCompressionContext(conf, path);

    try {
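      // Try, via reflection, the SequenceFile.createWriter overload that
      // takes a createParent flag so the parent directory is not
      // auto-created (HADOOP-6840). Older Hadoop versions lack it, hence
      // the reflective lookup rather than a direct call.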
      this.writer = (SequenceFile.Writer) SequenceFile.class
        .getMethod("createWriter", new Class[] {FileSystem.class,
            Configuration.class, Path.class, Class.class, Class.class,
            Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
            CompressionType.class, CompressionCodec.class, Metadata.class})
        .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
            Integer.valueOf(FSUtils.getDefaultBufferSize(fs)),
            Short.valueOf((short)
              conf.getInt("hbase.regionserver.hlog.replication",
                FSUtils.getDefaultReplication(fs, path))),
            Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
              FSUtils.getDefaultBlockSize(fs, path))),
            Boolean.valueOf(false) /* createParent */,
            SequenceFile.CompressionType.NONE, new DefaultCodec(),
            createMetadata(conf, compress)
          });
    } catch (InvocationTargetException ite) {
      // The method was found and invoked, but threw an exception of its own.
      throw new IOException(ite.getCause());
    } catch (Exception e) {
      // Any other exception means the reflective lookup failed; fall
      // through and use the public createWriter API below.
    }

    // Reflection failed or the overload is unavailable; use the public
    // createWriter API.
    if (this.writer == null) {
      LOG.debug("new createWriter -- HADOOP-6840 -- not available");
      this.writer = SequenceFile.createWriter(fs, conf, path,
        HLogKey.class, WALEdit.class,
        FSUtils.getDefaultBufferSize(fs),
        (short) conf.getInt("hbase.regionserver.hlog.replication",
          FSUtils.getDefaultReplication(fs, path)),
        conf.getLong("hbase.regionserver.hlog.blocksize",
          FSUtils.getDefaultBlockSize(fs, path)),
        SequenceFile.CompressionType.NONE,
        new DefaultCodec(),
        null, // no Progressable
        createMetadata(conf, compress));
    } else {
      if (LOG.isTraceEnabled()) LOG.trace("Using new createWriter -- HADOOP-6840");
    }

    // Stash the stream underneath the SequenceFile.Writer so callers can
    // get at it via getWriterFSDataOutputStream().
    this.writer_out = getSequenceFilePrivateFSDataOutputStreamAccessible();
    if (LOG.isTraceEnabled()) LOG.trace("Path=" + path + ", compression=" + compress);
  }
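
  // Get at the private FSDataOutputStream inside the SequenceFile.Writer via
  // reflection, and hand it back made accessible.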
  private FSDataOutputStream getSequenceFilePrivateFSDataOutputStreamAccessible()
      throws IOException {
    FSDataOutputStream out = null;
    final Field[] fields = this.writer.getClass().getDeclaredFields();
    final String fieldName = "out";
    for (int i = 0; i < fields.length; ++i) {
      if (fieldName.equals(fields[i].getName())) {
        try {
          // Make the private 'out' field in SequenceFile.Writer accessible.
          fields[i].setAccessible(true);
          out = (FSDataOutputStream)fields[i].get(this.writer);
          break;
        } catch (IllegalAccessException ex) {
          throw new IOException("Accessing " + fieldName, ex);
        } catch (SecurityException e) {
          LOG.warn("Does not have access to out field from FSDataOutputStream",
            e);
        }
      }
    }
    return out;
  }

  @Override
  public void append(WAL.Entry entry) throws IOException {
    entry.setCompressionContext(compressionContext);
    try {
      this.writer.append(entry.getKey(), entry.getEdit());
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public void close() throws IOException {
    if (this.writer != null) {
      try {
        this.writer.close();
      } catch (NullPointerException npe) {
        // Can get an NPE from down in the DFS client when the stream is
        // already gone; log it and carry on.
        LOG.warn(npe);
      }
      this.writer = null;
    }
  }

  @Override
  public void sync() throws IOException {
    try {
      this.writer.syncFs();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }

  @Override
  public long getLength() throws IOException {
    try {
      return this.writer.getLength();
    } catch (NullPointerException npe) {
      // Concurrent close...
      throw new IOException(npe);
    }
  }
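
  /**
   * @return The FSDataOutputStream underlying the wrapped SequenceFile.Writer,
   * made accessible in init(), or null if it could not be obtained.
   */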
  public FSDataOutputStream getWriterFSDataOutputStream() {
    return this.writer_out;
  }
}