1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mob.mapreduce;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.HColumnDescriptor;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.KeyValueUtil;
34 import org.apache.hadoop.hbase.Tag;
35 import org.apache.hadoop.hbase.TagType;
36 import org.apache.hadoop.hbase.client.BufferedMutator;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.io.crypto.Encryption;
39 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
40 import org.apache.hadoop.hbase.mob.MobConstants;
41 import org.apache.hadoop.hbase.mob.MobUtils;
42 import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
43 import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
44 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
45 import org.apache.hadoop.hbase.regionserver.MemStore;
46 import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
47 import org.apache.hadoop.hbase.regionserver.StoreFile;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.mapreduce.Reducer.Context;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 @InterfaceAudience.Private
65 public class MemStoreWrapper {
66
67 private static final Log LOG = LogFactory.getLog(MemStoreWrapper.class);
68
69 private MemStore memstore;
70 private long flushSize;
71 private CompactionPartitionId partitionId;
72 private Context context;
73 private Configuration conf;
74 private BufferedMutator table;
75 private HColumnDescriptor hcd;
76 private Path mobFamilyDir;
77 private FileSystem fs;
78 private CacheConfig cacheConfig;
79 private Encryption.Context cryptoContext = Encryption.Context.NONE;
80
81 public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table,
82 HColumnDescriptor hcd, MemStore memstore, CacheConfig cacheConfig) throws IOException {
83 this.memstore = memstore;
84 this.context = context;
85 this.fs = fs;
86 this.table = table;
87 this.hcd = hcd;
88 this.conf = context.getConfiguration();
89 this.cacheConfig = cacheConfig;
90 flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
91 MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
92 mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
93 cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
94 }
95
96 public void setPartitionId(CompactionPartitionId partitionId) {
97 this.partitionId = partitionId;
98 }
99
100
101
102
103
104 private void flushMemStoreIfNecessary() throws IOException {
105 if (memstore.heapSize() >= flushSize) {
106 flushMemStore();
107 }
108 }
109
110
111
112
113
114 public void flushMemStore() throws IOException {
115 MemStoreSnapshot snapshot = memstore.snapshot();
116 internalFlushCache(snapshot);
117 memstore.clearSnapshot(snapshot.getId());
118 }
119
120
121
122
123
124
125
126 private void internalFlushCache(final MemStoreSnapshot snapshot)
127 throws IOException {
128 if (snapshot.getCellsCount() == 0) {
129 return;
130 }
131
132 String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
133 StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
134 new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
135 partitionId.getStartKey(), cacheConfig, cryptoContext);
136
137 String relativePath = mobFileWriter.getPath().getName();
138 LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
139
140 byte[] referenceValue = Bytes.toBytes(relativePath);
141 KeyValueScanner scanner = snapshot.getScanner();
142 Cell cell = null;
143 while (null != (cell = scanner.next())) {
144 mobFileWriter.append(cell);
145 }
146 scanner.close();
147
148
149 mobFileWriter.appendMetadata(Long.MAX_VALUE, false, snapshot.getCellsCount());
150 mobFileWriter.close();
151
152 MobUtils.commitFile(conf, fs, mobFileWriter.getPath(), mobFamilyDir, cacheConfig);
153 context.getCounter(SweepCounter.FILE_AFTER_MERGE_OR_CLEAN).increment(1);
154
155 scanner = snapshot.getScanner();
156 scanner.seek(KeyValueUtil.createFirstOnRow(HConstants.EMPTY_START_ROW));
157 cell = null;
158 Tag tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes(this.table.getName()
159 .toString()));
160 long updatedCount = 0;
161 while (null != (cell = scanner.next())) {
162 Cell reference = MobUtils.createMobRefCell(cell, referenceValue, tableNameTag);
163 Put put =
164 new Put(reference.getRowArray(), reference.getRowOffset(), reference.getRowLength());
165 put.add(reference);
166 table.mutate(put);
167 updatedCount++;
168 }
169 table.flush();
170 context.getCounter(SweepCounter.RECORDS_UPDATED).increment(updatedCount);
171 scanner.close();
172 }
173
174
175
176
177
178
179 public void addToMemstore(KeyValue kv) throws IOException {
180 memstore.add(kv);
181
182 flushMemStoreIfNecessary();
183 }
184
185 }