1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.ipc;
19
20 import java.io.DataInput;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.nio.BufferOverflowException;
25 import java.nio.ByteBuffer;
26
27 import org.apache.commons.io.IOUtils;
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configurable;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.CellScanner;
34 import org.apache.hadoop.hbase.DoNotRetryIOException;
35 import org.apache.hadoop.hbase.HBaseIOException;
36 import org.apache.hadoop.hbase.codec.Codec;
37 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
38 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
39 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
40 import org.apache.hadoop.hbase.io.HeapSize;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.util.ClassSize;
43 import org.apache.hadoop.io.compress.CodecPool;
44 import org.apache.hadoop.io.compress.CompressionCodec;
45 import org.apache.hadoop.io.compress.CompressionInputStream;
46 import org.apache.hadoop.io.compress.Compressor;
47 import org.apache.hadoop.io.compress.Decompressor;
48
49 import com.google.common.base.Preconditions;
50 import com.google.protobuf.CodedOutputStream;
51 import com.google.protobuf.Message;
52
53
54
55
56 @InterfaceAudience.Private
57 public class IPCUtil {
58 public static final Log LOG = LogFactory.getLog(IPCUtil.class);
59
60
61
62 private final int cellBlockDecompressionMultiplier;
63 private final int cellBlockBuildingInitialBufferSize;
64 private final Configuration conf;
65
66 public IPCUtil(final Configuration conf) {
67 super();
68 this.conf = conf;
69 this.cellBlockDecompressionMultiplier =
70 conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
71
72
73
74 this.cellBlockBuildingInitialBufferSize =
75 ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
76 }
77
78
79
80
81 public static class CellScannerButNoCodecException extends HBaseIOException {};
82
83
84
85
86
87
88
89
90
91
92
93
94 @SuppressWarnings("resource")
95 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
96 final CellScanner cellScanner)
97 throws IOException {
98 return buildCellBlock(codec, compressor, cellScanner, null);
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 @SuppressWarnings("resource")
117 public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
118 final CellScanner cellScanner, final BoundedByteBufferPool pool)
119 throws IOException {
120 if (cellScanner == null) return null;
121 if (codec == null) throw new CellScannerButNoCodecException();
122 int bufferSize = this.cellBlockBuildingInitialBufferSize;
123 ByteBufferOutputStream baos = null;
124 if (pool != null) {
125 ByteBuffer bb = pool.getBuffer();
126 bufferSize = bb.capacity();
127 baos = new ByteBufferOutputStream(bb);
128 } else {
129
130 if (cellScanner instanceof HeapSize) {
131 long longSize = ((HeapSize)cellScanner).heapSize();
132
133 if (longSize > Integer.MAX_VALUE) {
134 throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
135 }
136 bufferSize = ClassSize.align((int)longSize);
137 }
138 baos = new ByteBufferOutputStream(bufferSize);
139 }
140 OutputStream os = baos;
141 Compressor poolCompressor = null;
142 try {
143 if (compressor != null) {
144 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
145 poolCompressor = CodecPool.getCompressor(compressor);
146 os = compressor.createOutputStream(os, poolCompressor);
147 }
148 Codec.Encoder encoder = codec.getEncoder(os);
149 int count = 0;
150 while (cellScanner.advance()) {
151 encoder.write(cellScanner.current());
152 count++;
153 }
154 encoder.flush();
155
156
157 if (count == 0) return null;
158 } catch (BufferOverflowException e) {
159 throw new DoNotRetryIOException(e);
160 } finally {
161 os.close();
162 if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
163 }
164 if (LOG.isTraceEnabled()) {
165 if (bufferSize < baos.size()) {
166 LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
167 "; up hbase.ipc.cellblock.building.initial.buffersize?");
168 }
169 }
170 return baos.getByteBuffer();
171 }
172
173
174
175
176
177
178
179 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
180 final byte [] cellBlock)
181 throws IOException {
182 return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
183 }
184
185
186
187
188
189
190
191
192 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
193 final ByteBuffer cellBlock)
194 throws IOException {
195
196
197 InputStream is = null;
198 if (compressor != null) {
199
200 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
201 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
202 CompressionInputStream cis =
203 compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
204 ByteBufferOutputStream bbos = null;
205 try {
206
207
208 bbos = new ByteBufferOutputStream(cellBlock.remaining() *
209 this.cellBlockDecompressionMultiplier);
210 IOUtils.copy(cis, bbos);
211 bbos.close();
212 ByteBuffer bb = bbos.getByteBuffer();
213 is = new ByteBufferInputStream(bb);
214 } finally {
215 if (is != null) is.close();
216 if (bbos != null) bbos.close();
217
218 CodecPool.returnDecompressor(poolDecompressor);
219 }
220 } else {
221 is = new ByteBufferInputStream(cellBlock);
222 }
223 return codec.getDecoder(is);
224 }
225
226
227
228
229
230
231
232 public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
233 if (m == null) return null;
234 int serializedSize = m.getSerializedSize();
235 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
236 byte [] buffer = new byte[serializedSize + vintSize];
237
238 CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
239
240 cos.writeMessageNoTag(m);
241 cos.flush();
242 cos.checkNoSpaceLeft();
243 return ByteBuffer.wrap(buffer);
244 }
245
246
247
248
249
250
251
252
253
254
255 public static int write(final OutputStream dos, final Message header, final Message param,
256 final ByteBuffer cellBlock)
257 throws IOException {
258
259
260
261 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
262 if (cellBlock != null) totalSize += cellBlock.remaining();
263 return write(dos, header, param, cellBlock, totalSize);
264 }
265
266 private static int write(final OutputStream dos, final Message header, final Message param,
267 final ByteBuffer cellBlock, final int totalSize)
268 throws IOException {
269
270 dos.write(Bytes.toBytes(totalSize));
271
272 header.writeDelimitedTo(dos);
273 if (param != null) param.writeDelimitedTo(dos);
274 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
275 dos.flush();
276 return totalSize;
277 }
278
279
280
281
282
283
284
285
286
287 public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
288 throws IOException {
289 int maxRead = 8192;
290
291 for (; offset < len; offset += maxRead) {
292 in.readFully(dest, offset, Math.min(len - offset, maxRead));
293 }
294 }
295
296
297
298
299 public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
300 int totalSize = 0;
301 for (Message m: messages) {
302 if (m == null) continue;
303 totalSize += m.getSerializedSize();
304 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
305 }
306 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
307 return totalSize;
308 }
309 }