/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.spark;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.spark.datasources.BytesEncoder;
import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder;
import org.apache.hadoop.hbase.spark.protobuf.generated.SparkFilterProtos;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.sql.datasources.hbase.Field;

import scala.collection.mutable.MutableList;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This filter pushes down all of the qualifier logic given to us by
 * Spark SQL so that the filtering is applied at the region server level,
 * avoiding sending the data back to the client to be filtered.
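 *
 * <p>A minimal construction sketch. The expression string, value array, and
 * column mapping names here are illustrative assumptions; in practice the
 * connector derives them from a Spark SQL query plan:
 *
 * <pre>{@code
 * BytesEncoder enc = JavaBytesEncoder.create(encoderClassName);
 * DynamicLogicExpression expr =
 *     DynamicLogicExpressionBuilder.build("Col0 < 0", enc);
 * SparkSQLPushDownFilter filter = new SparkSQLPushDownFilter(
 *     expr, valuesFromQuery, columnMapping, encoderClassName);
 * scan.setFilter(filter);
 * }</pre>
 */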
public class SparkSQLPushDownFilter extends FilterBase {
  protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class);

  //The following values are populated from the protobuf serialized form
  DynamicLogicExpression dynamicLogicExpression;
  byte[][] valueFromQueryArray;
  HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
          currentCellToColumnIndexMap;
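  //Illustrative shape of the two-level mapping above
  //(family -> (qualifier -> Spark SQL column name)); the zero-length
  //rowKeyFamily and "key" qualifier defined below stand in for the row key:
  //  { "cf" -> { "a" -> "colA" }, "" -> { "key" -> "keyColumn" } }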

  //The following values are transient
  HashMap<String, ByteArrayComparable> columnToCurrentRowValueMap = null;

  static final byte[] rowKeyFamily = new byte[0];
  static final byte[] rowKeyQualifier = Bytes.toBytes("key");

  String encoderClassName;

  public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
                                byte[][] valueFromQueryArray,
                                HashMap<ByteArrayComparable,
                                        HashMap<ByteArrayComparable, String>>
                                        currentCellToColumnIndexMap, String encoderClassName) {
    this.dynamicLogicExpression = dynamicLogicExpression;
    this.valueFromQueryArray = valueFromQueryArray;
    this.currentCellToColumnIndexMap = currentCellToColumnIndexMap;
    this.encoderClassName = encoderClassName;
  }

  public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
                                byte[][] valueFromQueryArray,
                                MutableList<Field> fields, String encoderClassName) {
    this.dynamicLogicExpression = dynamicLogicExpression;
    this.valueFromQueryArray = valueFromQueryArray;
    this.encoderClassName = encoderClassName;

    //Generate the family/qualifier to column name mapping
    this.currentCellToColumnIndexMap = new HashMap<>();

    for (int i = 0; i < fields.size(); i++) {
      Field field = fields.apply(i);

      byte[] cfBytes = field.cfBytes();
      ByteArrayComparable familyByteComparable =
          new ByteArrayComparable(cfBytes, 0, cfBytes.length);

      HashMap<ByteArrayComparable, String> qualifierIndexMap =
              currentCellToColumnIndexMap.get(familyByteComparable);

      if (qualifierIndexMap == null) {
        qualifierIndexMap = new HashMap<>();
        currentCellToColumnIndexMap.put(familyByteComparable, qualifierIndexMap);
      }
      byte[] qBytes = field.colBytes();
      ByteArrayComparable qualifierByteComparable =
          new ByteArrayComparable(qBytes, 0, qBytes.length);

      qualifierIndexMap.put(qualifierByteComparable, field.colName());
    }
  }

  @Override
  public ReturnCode filterKeyValue(Cell c) throws IOException {

    //If the row-value map has not been built for this row yet,
    //populate it with the row key entry
    if (columnToCurrentRowValueMap == null) {
      columnToCurrentRowValueMap = new HashMap<>();
      HashMap<ByteArrayComparable, String> qualifierColumnMap =
              currentCellToColumnIndexMap.get(
                      new ByteArrayComparable(rowKeyFamily, 0, rowKeyFamily.length));

      if (qualifierColumnMap != null) {
        String rowKeyColumnName =
                qualifierColumnMap.get(
                        new ByteArrayComparable(rowKeyQualifier, 0,
                                rowKeyQualifier.length));
        //Make sure that the rowKey is part of the where clause
        if (rowKeyColumnName != null) {
          columnToCurrentRowValueMap.put(rowKeyColumnName,
                  new ByteArrayComparable(c.getRowArray(),
                          c.getRowOffset(), c.getRowLength()));
        }
      }
    }

    //Always populate the column value into the RowValueMap
    ByteArrayComparable currentFamilyByteComparable =
            new ByteArrayComparable(c.getFamilyArray(),
            c.getFamilyOffset(),
            c.getFamilyLength());

    HashMap<ByteArrayComparable, String> qualifierColumnMap =
            currentCellToColumnIndexMap.get(
                    currentFamilyByteComparable);

    if (qualifierColumnMap != null) {

      String columnName =
              qualifierColumnMap.get(
                      new ByteArrayComparable(c.getQualifierArray(),
                              c.getQualifierOffset(),
                              c.getQualifierLength()));

      if (columnName != null) {
        columnToCurrentRowValueMap.put(columnName,
                new ByteArrayComparable(c.getValueArray(),
                        c.getValueOffset(), c.getValueLength()));
      }
    }

    //Every cell is included here; whole-row filtering happens in filterRow()
    return ReturnCode.INCLUDE;
  }

  @Override
  public boolean filterRow() throws IOException {

    try {
      boolean result =
              dynamicLogicExpression.execute(columnToCurrentRowValueMap,
                      valueFromQueryArray);
      //execute() returns true when the row satisfies the predicate, while
      //filterRow() returns true to exclude the row, hence the negation
      return !result;
    } catch (Throwable e) {
      log.error("Error running dynamic logic on row", e);
    } finally {
      //Reset the per-row value map so the next row starts clean
      columnToCurrentRowValueMap = null;
    }
    //If evaluation failed, include the row rather than silently dropping it
    return false;
  }

  /**
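   * Deserializes a protobuf-serialized {@link SparkSQLPushDownFilter}. HBase
   * invokes this method reflectively when it rebuilds the filter on the
   * region server side, which is why it is marked
   * {@code @SuppressWarnings("unused")}.
   *
   * <p>A round-trip sketch using this class's own methods:
   *
   * <pre>{@code
   * byte[] bytes = filter.toByteArray();
   * SparkSQLPushDownFilter copy = SparkSQLPushDownFilter.parseFrom(bytes);
   * }</pre>
   *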
   * @param pbBytes A pb serialized instance
   * @return An instance of SparkSQLPushDownFilter
   * @throws DeserializationException if the given bytes cannot be parsed
   */
  @SuppressWarnings("unused")
  public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes)
          throws DeserializationException {

    SparkFilterProtos.SQLPredicatePushDownFilter proto;
    try {
      proto = SparkFilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes);
    } catch (InvalidProtocolBufferException e) {
      throw new DeserializationException(e);
    }

    String encoder = proto.getEncoderClassName();
    BytesEncoder enc = JavaBytesEncoder.create(encoder);

    //Load DynamicLogicExpression
    DynamicLogicExpression dynamicLogicExpression =
            DynamicLogicExpressionBuilder.build(proto.getDynamicLogicExpression(), enc);

    //Load valuesFromQuery
    final List<ByteString> valueFromQueryArrayList = proto.getValueFromQueryArrayList();
    byte[][] valueFromQueryArray = new byte[valueFromQueryArrayList.size()][];
    for (int i = 0; i < valueFromQueryArrayList.size(); i++) {
      valueFromQueryArray[i] = valueFromQueryArrayList.get(i).toByteArray();
    }

    //Load mapping from HBase family/qualifier to Spark SQL columnName
    HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
            currentCellToColumnIndexMap = new HashMap<>();

    for (SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping
            sqlPredicatePushDownCellToColumnMapping :
            proto.getCellToColumnMappingList()) {

      byte[] familyArray =
              sqlPredicatePushDownCellToColumnMapping.getColumnFamily().toByteArray();
      ByteArrayComparable familyByteComparable =
              new ByteArrayComparable(familyArray, 0, familyArray.length);
      HashMap<ByteArrayComparable, String> qualifierMap =
              currentCellToColumnIndexMap.get(familyByteComparable);

      if (qualifierMap == null) {
        qualifierMap = new HashMap<>();
        currentCellToColumnIndexMap.put(familyByteComparable, qualifierMap);
      }
      byte[] qualifierArray =
              sqlPredicatePushDownCellToColumnMapping.getQualifier().toByteArray();

      ByteArrayComparable qualifierByteComparable =
              new ByteArrayComparable(qualifierArray, 0, qualifierArray.length);

      qualifierMap.put(qualifierByteComparable,
              sqlPredicatePushDownCellToColumnMapping.getColumnName());
    }

    return new SparkSQLPushDownFilter(dynamicLogicExpression,
            valueFromQueryArray, currentCellToColumnIndexMap, encoder);
  }

  /**
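   * Serializes this filter into its protobuf form; these are the bytes that
   * travel with the {@code Scan} to the region servers and that
   * {@link #parseFrom(byte[])} reads back. The serialized form can also be
   * decoded directly for inspection, e.g.:
   *
   * <pre>{@code
   * SparkFilterProtos.SQLPredicatePushDownFilter proto =
   *     SparkFilterProtos.SQLPredicatePushDownFilter.parseFrom(filter.toByteArray());
   * String expression = proto.getDynamicLogicExpression();
   * }</pre>
   *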
   * @return The filter serialized using pb
   */
  public byte[] toByteArray() {

    SparkFilterProtos.SQLPredicatePushDownFilter.Builder builder =
            SparkFilterProtos.SQLPredicatePushDownFilter.newBuilder();

    SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.Builder columnMappingBuilder =
            SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.newBuilder();

    builder.setDynamicLogicExpression(dynamicLogicExpression.toExpressionString());
    for (byte[] valueFromQuery : valueFromQueryArray) {
      builder.addValueFromQueryArray(ByteStringer.wrap(valueFromQuery));
    }

    for (Map.Entry<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
            familyEntry : currentCellToColumnIndexMap.entrySet()) {
      for (Map.Entry<ByteArrayComparable, String> qualifierEntry :
              familyEntry.getValue().entrySet()) {
        columnMappingBuilder.setColumnFamily(
                ByteStringer.wrap(familyEntry.getKey().bytes()));
        columnMappingBuilder.setQualifier(
                ByteStringer.wrap(qualifierEntry.getKey().bytes()));
        columnMappingBuilder.setColumnName(qualifierEntry.getValue());
        builder.addCellToColumnMapping(columnMappingBuilder.build());
      }
    }
    builder.setEncoderClassName(encoderClassName);

    return builder.build().toByteArray();
  }
}