1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
import java.io.Closeable;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSizeCalculator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
52
53
54
55
56
57 @InterfaceAudience.Public
58 @InterfaceStability.Evolving
59 public abstract class MultiTableInputFormatBase extends
60 InputFormat<ImmutableBytesWritable, Result> {
61
62 final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
63
64
65 private List<Scan> scans;
66
67
68 private TableRecordReader tableRecordReader = null;
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @Override
84 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
85 InputSplit split, TaskAttemptContext context)
86 throws IOException, InterruptedException {
87 TableSplit tSplit = (TableSplit) split;
88 LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
89
90 if (tSplit.getTable() == null) {
91 throw new IOException("Cannot create a record reader because of a"
92 + " previous error. Please look at the previous logs lines from"
93 + " the task's full log for more details.");
94 }
95 Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
96 Table table = connection.getTable(tSplit.getTable());
97
98 TableRecordReader trr = this.tableRecordReader;
99
100 try {
101
102 if (trr == null) {
103 trr = new TableRecordReader();
104 }
105 Scan sc = tSplit.getScan();
106 sc.setStartRow(tSplit.getStartRow());
107 sc.setStopRow(tSplit.getEndRow());
108 trr.setScan(sc);
109 trr.setTable(table);
110 } catch (IOException ioe) {
111
112
113 connection.close();
114 table.close();
115 trr.close();
116 throw ioe;
117 }
118 return trr;
119 }
120
121
122
123
124
125
126
127
128
129
130 @Override
131 public List<InputSplit> getSplits(JobContext context) throws IOException {
132 if (scans.isEmpty()) {
133 throw new IOException("No scans were provided.");
134 }
135
136 Map<TableName, List<Scan>> tableMaps = new HashMap<TableName, List<Scan>>();
137 for (Scan scan : scans) {
138 byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
139 if (tableNameBytes == null)
140 throw new IOException("A scan object did not have a table name");
141
142 TableName tableName = TableName.valueOf(tableNameBytes);
143
144 List<Scan> scanList = tableMaps.get(tableName);
145 if (scanList == null) {
146 scanList = new ArrayList<Scan>();
147 tableMaps.put(tableName, scanList);
148 }
149 scanList.add(scan);
150 }
151
152 List<InputSplit> splits = new ArrayList<InputSplit>();
153 Iterator iter = tableMaps.entrySet().iterator();
154 while (iter.hasNext()) {
155 Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
156 TableName tableName = entry.getKey();
157 List<Scan> scanList = entry.getValue();
158
159 try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
160 Table table = conn.getTable(tableName);
161 RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
162 RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
163 regionLocator, conn.getAdmin());
164 Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
165 for (Scan scan : scanList) {
166 if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
167 throw new IOException("Expecting at least one region for table : "
168 + tableName.getNameAsString());
169 }
170 int count = 0;
171
172 byte[] startRow = scan.getStartRow();
173 byte[] stopRow = scan.getStopRow();
174
175 for (int i = 0; i < keys.getFirst().length; i++) {
176 if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
177 continue;
178 }
179
180 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
181 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
182 (stopRow.length == 0 || Bytes.compareTo(stopRow,
183 keys.getFirst()[i]) > 0)) {
184 byte[] splitStart = startRow.length == 0 ||
185 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
186 keys.getFirst()[i] : startRow;
187 byte[] splitStop = (stopRow.length == 0 ||
188 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
189 keys.getSecond()[i].length > 0 ?
190 keys.getSecond()[i] : stopRow;
191
192 HRegionLocation hregionLocation = regionLocator.getRegionLocation(
193 keys.getFirst()[i], false);
194 String regionHostname = hregionLocation.getHostname();
195 HRegionInfo regionInfo = hregionLocation.getRegionInfo();
196 long regionSize = sizeCalculator.getRegionSize(
197 regionInfo.getRegionName());
198
199 TableSplit split = new TableSplit(table.getName(),
200 scan, splitStart, splitStop, regionHostname, regionSize);
201
202 splits.add(split);
203
204 if (LOG.isDebugEnabled())
205 LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
206 }
207 }
208 }
209 }
210 }
211
212 return splits;
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237 protected boolean includeRegionInSplit(final byte[] startKey,
238 final byte[] endKey) {
239 return true;
240 }
241
242
243
244
245 protected List<Scan> getScans() {
246 return this.scans;
247 }
248
249
250
251
252
253
254 protected void setScans(List<Scan> scans) {
255 this.scans = scans;
256 }
257
258
259
260
261
262
263
264 protected void setTableRecordReader(TableRecordReader tableRecordReader) {
265 this.tableRecordReader = tableRecordReader;
266 }
267 }