View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.TreeSet;
26  
27  import org.apache.hadoop.hbase.classification.InterfaceAudience;
28  import org.apache.hadoop.hbase.classification.InterfaceStability;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.KeyValue;
32  import org.apache.hadoop.hbase.KeyValueUtil;
33  import org.apache.hadoop.hbase.Tag;
34  import org.apache.hadoop.hbase.TagType;
35  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36  import org.apache.hadoop.hbase.util.Base64;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.io.Text;
39  import org.apache.hadoop.mapreduce.Counter;
40  import org.apache.hadoop.mapreduce.Reducer;
41  import org.apache.hadoop.util.StringUtils;
42  
43  /**
44   * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit.
45   * @see HFileOutputFormat
46   * @see KeyValueSortReducer
47   * @see PutSortReducer
48   */
49  @InterfaceAudience.Public
50  @InterfaceStability.Evolving
51  public class TextSortReducer extends
52      Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
53    
54    /** Timestamp for all inserted rows */
55    private long ts;
56  
57    /** Column seperator */
58    private String separator;
59  
60    /** Should skip bad lines */
61    private boolean skipBadLines;
62    
63    private Counter badLineCount;
64  
65    private ImportTsv.TsvParser parser;
66  
67    /** Cell visibility expr **/
68    private String cellVisibilityExpr;
69  
70    /** Cell TTL */
71    private long ttl;
72  
73    private CellCreator kvCreator;
74  
75    public long getTs() {
76      return ts;
77    }
78  
79    public boolean getSkipBadLines() {
80      return skipBadLines;
81    }
82  
83    public Counter getBadLineCount() {
84      return badLineCount;
85    }
86  
87    public void incrementBadLineCount(int count) {
88      this.badLineCount.increment(count);
89    }
90  
91    /**
92     * Handles initializing this class with objects specific to it (i.e., the parser).
93     * Common initialization that might be leveraged by a subsclass is done in
94     * <code>doSetup</code>. Hence a subclass may choose to override this method
95     * and call <code>doSetup</code> as well before handling it's own custom params.
96     *
97     * @param context
98     */
99    @Override
100   protected void setup(Context context) {
101     doSetup(context);
102 
103     Configuration conf = context.getConfiguration();
104 
105     parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
106     if (parser.getRowKeyColumnIndex() == -1) {
107       throw new RuntimeException("No row key column specified");
108     }
109     this.kvCreator = new CellCreator(conf);
110   }
111 
112   /**
113    * Handles common parameter initialization that a subclass might want to leverage.
114    * @param context
115    */
116   protected void doSetup(Context context) {
117     Configuration conf = context.getConfiguration();
118 
119     // If a custom separator has been used,
120     // decode it back from Base64 encoding.
121     separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
122     if (separator == null) {
123       separator = ImportTsv.DEFAULT_SEPARATOR;
124     } else {
125       separator = new String(Base64.decode(separator));
126     }
127 
128     // Should never get 0 as we are setting this to a valid value in job configuration.
129     ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
130 
131     skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
132     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
133   }
134   
135   @Override
136   protected void reduce(
137       ImmutableBytesWritable rowKey,
138       java.lang.Iterable<Text> lines,
139       Reducer<ImmutableBytesWritable, Text,
140               ImmutableBytesWritable, KeyValue>.Context context)
141       throws java.io.IOException, InterruptedException
142   {
143     // although reduce() is called per-row, handle pathological case
144     long threshold = context.getConfiguration().getLong(
145         "reducer.row.threshold", 1L * (1<<30));
146     Iterator<Text> iter = lines.iterator();
147     while (iter.hasNext()) {
148       Set<KeyValue> kvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
149       long curSize = 0;
150       // stop at the end or the RAM threshold
151       while (iter.hasNext() && curSize < threshold) {
152         Text line = iter.next();
153         byte[] lineBytes = line.getBytes();
154         try {
155           ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
156           // Retrieve timestamp if exists
157           ts = parsed.getTimestamp(ts);
158           cellVisibilityExpr = parsed.getCellVisibility();
159           ttl = parsed.getCellTTL();
160 
161           for (int i = 0; i < parsed.getColumnCount(); i++) {
162             if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
163                 || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
164                 || i == parser.getCellTTLColumnIndex()) {
165               continue;
166             }
167             // Creating the KV which needs to be directly written to HFiles. Using the Facade
168             // KVCreator for creation of kvs.
169             List<Tag> tags = new ArrayList<Tag>();
170             if (cellVisibilityExpr != null) {
171               tags.addAll(kvCreator.getVisibilityExpressionResolver()
172                 .createVisibilityExpTags(cellVisibilityExpr));
173             }
174             // Add TTL directly to the KV so we can vary them when packing more than one KV
175             // into puts
176             if (ttl > 0) {
177               tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
178             }
179             Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
180                 parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
181                 parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
182                 parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
183             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
184             kvs.add(kv);
185             curSize += kv.heapSize();
186           }
187         } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
188           if (skipBadLines) {
189             System.err.println("Bad line." + badLine.getMessage());
190             incrementBadLineCount(1);
191             continue;
192           }
193           throw new IOException(badLine);
194         } catch (IllegalArgumentException e) {
195           if (skipBadLines) {
196             System.err.println("Bad line." + e.getMessage());
197             incrementBadLineCount(1);
198             continue;
199           } 
200           throw new IOException(e);
201         } 
202       }
203       context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
204           + "(" + StringUtils.humanReadableInt(curSize) + ")");
205       int index = 0;
206       for (KeyValue kv : kvs) {
207         context.write(rowKey, kv);
208         if (++index > 0 && index % 100 == 0)
209           context.setStatus("Wrote " + index + " key values.");
210       }
211 
212       // if we have more entries to process
213       if (iter.hasNext()) {
214         // force flush because we cannot guarantee intra-row sorted order
215         context.write(null, null);
216       }
217     }
218   }
219 }