1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.DataInput;
22 import java.io.DataOutput;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.io.NullWritable;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.mapreduce.InputFormat;
31 import org.apache.hadoop.mapreduce.InputSplit;
32 import org.apache.hadoop.mapreduce.JobContext;
33 import org.apache.hadoop.mapreduce.RecordReader;
34 import org.apache.hadoop.mapreduce.TaskAttemptContext;
35
36
37
38
39
40
41
42 public class NMapInputFormat extends InputFormat<NullWritable, NullWritable> {
43 private static final String NMAPS_KEY = "nmapinputformat.num.maps";
44
45 @Override
46 public RecordReader<NullWritable, NullWritable> createRecordReader(
47 InputSplit split,
48 TaskAttemptContext tac) throws IOException, InterruptedException {
49 return new SingleRecordReader<NullWritable, NullWritable>(
50 NullWritable.get(), NullWritable.get());
51 }
52
53 @Override
54 public List<InputSplit> getSplits(JobContext context) throws IOException,
55 InterruptedException {
56 int count = getNumMapTasks(context.getConfiguration());
57 List<InputSplit> splits = new ArrayList<InputSplit>(count);
58 for (int i = 0; i < count; i++) {
59 splits.add(new NullInputSplit());
60 }
61 return splits;
62 }
63
64 public static void setNumMapTasks(Configuration conf, int numTasks) {
65 conf.setInt(NMAPS_KEY, numTasks);
66 }
67
68 public static int getNumMapTasks(Configuration conf) {
69 return conf.getInt(NMAPS_KEY, 1);
70 }
71
72 private static class NullInputSplit extends InputSplit implements Writable {
73 @Override
74 public long getLength() throws IOException, InterruptedException {
75 return 0;
76 }
77
78 @Override
79 public String[] getLocations() throws IOException, InterruptedException {
80 return new String[] {};
81 }
82
83 @Override
84 public void readFields(DataInput in) throws IOException {
85 }
86
87 @Override
88 public void write(DataOutput out) throws IOException {
89 }
90 }
91
92 private static class SingleRecordReader<K, V>
93 extends RecordReader<K, V> {
94
95 private final K key;
96 private final V value;
97 boolean providedKey = false;
98
99 SingleRecordReader(K key, V value) {
100 this.key = key;
101 this.value = value;
102 }
103
104 @Override
105 public void close() {
106 }
107
108 @Override
109 public K getCurrentKey() {
110 return key;
111 }
112
113 @Override
114 public V getCurrentValue(){
115 return value;
116 }
117
118 @Override
119 public float getProgress() {
120 return 0;
121 }
122
123 @Override
124 public void initialize(InputSplit split, TaskAttemptContext tac) {
125 }
126
127 @Override
128 public boolean nextKeyValue() {
129 if (providedKey) return false;
130 providedKey = true;
131 return true;
132 }
133
134 }
135 }