1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
41
42
43
44
45 @InterfaceAudience.Public
46 @InterfaceStability.Stable
47 public class TsvImporterMapper
48 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
49 {
50
51
52 protected long ts;
53
54
55 private String separator;
56
57
58 private boolean skipBadLines;
59 private Counter badLineCount;
60
61 protected ImportTsv.TsvParser parser;
62
63 protected Configuration conf;
64
65 protected String cellVisibilityExpr;
66
67 protected long ttl;
68
69 protected CellCreator kvCreator;
70
71 private String hfileOutPath;
72
73 public long getTs() {
74 return ts;
75 }
76
77 public boolean getSkipBadLines() {
78 return skipBadLines;
79 }
80
81 public Counter getBadLineCount() {
82 return badLineCount;
83 }
84
85 public void incrementBadLineCount(int count) {
86 this.badLineCount.increment(count);
87 }
88
89
90
91
92
93
94
95
96
97 @Override
98 protected void setup(Context context) {
99 doSetup(context);
100
101 conf = context.getConfiguration();
102 parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
103 separator);
104 if (parser.getRowKeyColumnIndex() == -1) {
105 throw new RuntimeException("No row key column specified");
106 }
107 this.kvCreator = new CellCreator(conf);
108 }
109
110
111
112
113
114 protected void doSetup(Context context) {
115 Configuration conf = context.getConfiguration();
116
117
118
119 separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
120 if (separator == null) {
121 separator = ImportTsv.DEFAULT_SEPARATOR;
122 } else {
123 separator = new String(Base64.decode(separator));
124 }
125
126
127 ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
128
129 skipBadLines = context.getConfiguration().getBoolean(
130 ImportTsv.SKIP_LINES_CONF_KEY, true);
131 badLineCount = context.getCounter("ImportTsv", "Bad Lines");
132 hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
133 }
134
135
136
137
138 @Override
139 public void map(LongWritable offset, Text value,
140 Context context)
141 throws IOException {
142 byte[] lineBytes = value.getBytes();
143
144 try {
145 ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
146 lineBytes, value.getLength());
147 ImmutableBytesWritable rowKey =
148 new ImmutableBytesWritable(lineBytes,
149 parsed.getRowKeyOffset(),
150 parsed.getRowKeyLength());
151
152 ts = parsed.getTimestamp(ts);
153 cellVisibilityExpr = parsed.getCellVisibility();
154 ttl = parsed.getCellTTL();
155
156 Put put = new Put(rowKey.copyBytes());
157 for (int i = 0; i < parsed.getColumnCount(); i++) {
158 if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
159 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
160 || i == parser.getCellTTLColumnIndex()) {
161 continue;
162 }
163 populatePut(lineBytes, parsed, put, i);
164 }
165 context.write(rowKey, put);
166 } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
167 if (skipBadLines) {
168 System.err.println(
169 "Bad line at offset: " + offset.get() + ":\n" +
170 badLine.getMessage());
171 incrementBadLineCount(1);
172 return;
173 } else {
174 throw new IOException(badLine);
175 }
176 } catch (IllegalArgumentException e) {
177 if (skipBadLines) {
178 System.err.println(
179 "Bad line at offset: " + offset.get() + ":\n" +
180 e.getMessage());
181 incrementBadLineCount(1);
182 return;
183 } else {
184 throw new IOException(e);
185 }
186 } catch (InterruptedException e) {
187 e.printStackTrace();
188 }
189 }
190
191 protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
192 int i) throws BadTsvLineException, IOException {
193 Cell cell = null;
194 if (hfileOutPath == null) {
195 cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
196 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
197 parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
198 parsed.getColumnOffset(i), parsed.getColumnLength(i));
199 if (cellVisibilityExpr != null) {
200
201
202 put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
203 }
204 if (ttl > 0) {
205 put.setTTL(ttl);
206 }
207 } else {
208
209
210 List<Tag> tags = new ArrayList<Tag>();
211 if (cellVisibilityExpr != null) {
212 tags.addAll(kvCreator.getVisibilityExpressionResolver()
213 .createVisibilityExpTags(cellVisibilityExpr));
214 }
215
216
217 if (ttl > 0) {
218 tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
219 }
220 cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
221 parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
222 parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
223 parsed.getColumnLength(i), tags);
224 }
225 put.add(cell);
226 }
227 }