1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.Tag;
35 import org.apache.hadoop.hbase.TagType;
36 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
37 import org.apache.hadoop.hbase.regionserver.*;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.apache.hadoop.util.StringUtils;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
58
59 private static final Log LOG = LogFactory.getLog(DefaultMobStoreFlusher.class);
60 private final Object flushLock = new Object();
61 private long mobCellValueSizeThreshold = 0;
62 private Path targetPath;
63 private HMobStore mobStore;
64
65 public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOException {
66 super(conf, store);
67 mobCellValueSizeThreshold = store.getFamily().getMobThreshold();
68 this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(),
69 store.getColumnFamilyName());
70 if (!this.store.getFileSystem().exists(targetPath)) {
71 this.store.getFileSystem().mkdirs(targetPath);
72 }
73 this.mobStore = (HMobStore) store;
74 }
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @Override
91 public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
92 MonitoredTask status) throws IOException {
93 ArrayList<Path> result = new ArrayList<Path>();
94 int cellsCount = snapshot.getCellsCount();
95 if (cellsCount == 0) return result;
96
97
98 long smallestReadPoint = store.getSmallestReadPoint();
99 InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
100 if (scanner == null) {
101 return result;
102 }
103 StoreFile.Writer writer;
104 try {
105
106
107 synchronized (flushLock) {
108 status.setStatus("Flushing " + store + ": creating writer");
109
110 writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
111 false, true, true);
112 writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
113 try {
114
115
116 performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
117 } finally {
118 finalizeWriter(writer, cacheFlushId, status);
119 }
120 }
121 } finally {
122 scanner.close();
123 }
124 LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
125 + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
126 ", hasBloomFilter=" + writer.hasGeneralBloom() +
127 ", into tmp file " + writer.getPath());
128 result.add(writer.getPath());
129 return result;
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
150 InternalScanner scanner, StoreFile.Writer writer, MonitoredTask status) throws IOException {
151 StoreFile.Writer mobFileWriter = null;
152 int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
153 HConstants.COMPACTION_KV_MAX_DEFAULT);
154 long mobCount = 0;
155 long mobSize = 0;
156 long time = snapshot.getTimeRangeTracker().getMaximumTimestamp();
157 mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
158 store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
159
160
161 byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
162 try {
163 Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, store.getTableName()
164 .getName());
165 List<Cell> cells = new ArrayList<Cell>();
166 boolean hasMore;
167 ScannerContext scannerContext =
168 ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
169
170 do {
171 hasMore = scanner.next(cells, scannerContext);
172 if (!cells.isEmpty()) {
173 for (Cell c : cells) {
174
175
176
177 if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c)
178 || c.getTypeByte() != KeyValue.Type.Put.getCode()) {
179 writer.append(c);
180 } else {
181
182 mobFileWriter.append(c);
183 mobSize += c.getValueLength();
184 mobCount++;
185
186
187
188 Cell reference = MobUtils.createMobRefCell(c, fileName, tableNameTag);
189 writer.append(reference);
190 }
191 }
192 cells.clear();
193 }
194 } while (hasMore);
195 } finally {
196 status.setStatus("Flushing mob file " + store + ": appending metadata");
197 mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
198 status.setStatus("Flushing mob file " + store + ": closing flushed file");
199 mobFileWriter.close();
200 }
201
202 if (mobCount > 0) {
203
204
205
206
207 mobStore.commitFile(mobFileWriter.getPath(), targetPath);
208 mobStore.updateMobFlushCount();
209 mobStore.updateMobFlushedCellsCount(mobCount);
210 mobStore.updateMobFlushedCellsSize(mobSize);
211 } else {
212 try {
213
214 store.getFileSystem().delete(mobFileWriter.getPath(), true);
215 } catch (IOException e) {
216 LOG.error("Failed to delete the temp mob file", e);
217 }
218 }
219 }
220 }