/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.spark;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.apache.hadoop.hbase.Cell;
23 import org.apache.hadoop.hbase.exceptions.DeserializationException;
24 import org.apache.hadoop.hbase.filter.FilterBase;
25 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
26 import org.apache.hadoop.hbase.spark.datasources.BytesEncoder;
27 import org.apache.hadoop.hbase.spark.datasources.JavaBytesEncoder;
28 import org.apache.hadoop.hbase.spark.protobuf.generated.SparkFilterProtos;
29 import org.apache.hadoop.hbase.util.ByteStringer;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.spark.sql.datasources.hbase.Field;
32 import scala.collection.mutable.MutableList;
33
34
35 import java.io.IOException;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39 import com.google.protobuf.InvalidProtocolBufferException;
40 import com.google.protobuf.ByteString;
41
42
43
44
45
46
47 public class SparkSQLPushDownFilter extends FilterBase{
48 protected static final Log log = LogFactory.getLog(SparkSQLPushDownFilter.class);
49
50
51 DynamicLogicExpression dynamicLogicExpression;
52 byte[][] valueFromQueryArray;
53 HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
54 currentCellToColumnIndexMap;
55
56
57 HashMap<String, ByteArrayComparable> columnToCurrentRowValueMap = null;
58
59 static final byte[] rowKeyFamily = new byte[0];
60 static final byte[] rowKeyQualifier = Bytes.toBytes("key");
61
62 String encoderClassName;
63
64 public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
65 byte[][] valueFromQueryArray,
66 HashMap<ByteArrayComparable,
67 HashMap<ByteArrayComparable, String>>
68 currentCellToColumnIndexMap, String encoderClassName) {
69 this.dynamicLogicExpression = dynamicLogicExpression;
70 this.valueFromQueryArray = valueFromQueryArray;
71 this.currentCellToColumnIndexMap = currentCellToColumnIndexMap;
72 this.encoderClassName = encoderClassName;
73 }
74
75 public SparkSQLPushDownFilter(DynamicLogicExpression dynamicLogicExpression,
76 byte[][] valueFromQueryArray,
77 MutableList<Field> fields, String encoderClassName) {
78 this.dynamicLogicExpression = dynamicLogicExpression;
79 this.valueFromQueryArray = valueFromQueryArray;
80 this.encoderClassName = encoderClassName;
81
82
83 this.currentCellToColumnIndexMap =
84 new HashMap<>();
85
86 for (int i = 0; i < fields.size(); i++) {
87 Field field = fields.apply(i);
88
89 byte[] cfBytes = field.cfBytes();
90 ByteArrayComparable familyByteComparable =
91 new ByteArrayComparable(cfBytes, 0, cfBytes.length);
92
93 HashMap<ByteArrayComparable, String> qualifierIndexMap =
94 currentCellToColumnIndexMap.get(familyByteComparable);
95
96 if (qualifierIndexMap == null) {
97 qualifierIndexMap = new HashMap<>();
98 currentCellToColumnIndexMap.put(familyByteComparable, qualifierIndexMap);
99 }
100 byte[] qBytes = field.colBytes();
101 ByteArrayComparable qualifierByteComparable =
102 new ByteArrayComparable(qBytes, 0, qBytes.length);
103
104 qualifierIndexMap.put(qualifierByteComparable, field.colName());
105 }
106 }
107
108 @Override
109 public ReturnCode filterKeyValue(Cell c) throws IOException {
110
111
112
113 if (columnToCurrentRowValueMap == null) {
114 columnToCurrentRowValueMap = new HashMap<>();
115 HashMap<ByteArrayComparable, String> qualifierColumnMap =
116 currentCellToColumnIndexMap.get(
117 new ByteArrayComparable(rowKeyFamily, 0, rowKeyFamily.length));
118
119 if (qualifierColumnMap != null) {
120 String rowKeyColumnName =
121 qualifierColumnMap.get(
122 new ByteArrayComparable(rowKeyQualifier, 0,
123 rowKeyQualifier.length));
124
125 if (rowKeyColumnName != null) {
126 columnToCurrentRowValueMap.put(rowKeyColumnName,
127 new ByteArrayComparable(c.getRowArray(),
128 c.getRowOffset(), c.getRowLength()));
129 }
130 }
131 }
132
133
134 ByteArrayComparable currentFamilyByteComparable =
135 new ByteArrayComparable(c.getFamilyArray(),
136 c.getFamilyOffset(),
137 c.getFamilyLength());
138
139 HashMap<ByteArrayComparable, String> qualifierColumnMap =
140 currentCellToColumnIndexMap.get(
141 currentFamilyByteComparable);
142
143 if (qualifierColumnMap != null) {
144
145 String columnName =
146 qualifierColumnMap.get(
147 new ByteArrayComparable(c.getQualifierArray(),
148 c.getQualifierOffset(),
149 c.getQualifierLength()));
150
151 if (columnName != null) {
152 columnToCurrentRowValueMap.put(columnName,
153 new ByteArrayComparable(c.getValueArray(),
154 c.getValueOffset(), c.getValueLength()));
155 }
156 }
157
158 return ReturnCode.INCLUDE;
159 }
160
161
162 @Override
163 public boolean filterRow() throws IOException {
164
165 try {
166 boolean result =
167 dynamicLogicExpression.execute(columnToCurrentRowValueMap,
168 valueFromQueryArray);
169 columnToCurrentRowValueMap = null;
170 return !result;
171 } catch (Throwable e) {
172 log.error("Error running dynamic logic on row", e);
173 }
174 return false;
175 }
176
177
178
179
180
181
182
183 @SuppressWarnings("unused")
184 public static SparkSQLPushDownFilter parseFrom(final byte[] pbBytes)
185 throws DeserializationException {
186
187 SparkFilterProtos.SQLPredicatePushDownFilter proto;
188 try {
189 proto = SparkFilterProtos.SQLPredicatePushDownFilter.parseFrom(pbBytes);
190 } catch (InvalidProtocolBufferException e) {
191 throw new DeserializationException(e);
192 }
193
194 String encoder = proto.getEncoderClassName();
195 BytesEncoder enc = JavaBytesEncoder.create(encoder);
196
197
198 DynamicLogicExpression dynamicLogicExpression =
199 DynamicLogicExpressionBuilder.build(proto.getDynamicLogicExpression(), enc);
200
201
202 final List<ByteString> valueFromQueryArrayList = proto.getValueFromQueryArrayList();
203 byte[][] valueFromQueryArray = new byte[valueFromQueryArrayList.size()][];
204 for (int i = 0; i < valueFromQueryArrayList.size(); i++) {
205 valueFromQueryArray[i] = valueFromQueryArrayList.get(i).toByteArray();
206 }
207
208
209 HashMap<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
210 currentCellToColumnIndexMap = new HashMap<>();
211
212 for (SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping
213 sqlPredicatePushDownCellToColumnMapping :
214 proto.getCellToColumnMappingList()) {
215
216 byte[] familyArray =
217 sqlPredicatePushDownCellToColumnMapping.getColumnFamily().toByteArray();
218 ByteArrayComparable familyByteComparable =
219 new ByteArrayComparable(familyArray, 0, familyArray.length);
220 HashMap<ByteArrayComparable, String> qualifierMap =
221 currentCellToColumnIndexMap.get(familyByteComparable);
222
223 if (qualifierMap == null) {
224 qualifierMap = new HashMap<>();
225 currentCellToColumnIndexMap.put(familyByteComparable, qualifierMap);
226 }
227 byte[] qualifierArray =
228 sqlPredicatePushDownCellToColumnMapping.getQualifier().toByteArray();
229
230 ByteArrayComparable qualifierByteComparable =
231 new ByteArrayComparable(qualifierArray, 0 ,qualifierArray.length);
232
233 qualifierMap.put(qualifierByteComparable,
234 sqlPredicatePushDownCellToColumnMapping.getColumnName());
235 }
236
237 return new SparkSQLPushDownFilter(dynamicLogicExpression,
238 valueFromQueryArray, currentCellToColumnIndexMap, encoder);
239 }
240
241
242
243
244 public byte[] toByteArray() {
245
246 SparkFilterProtos.SQLPredicatePushDownFilter.Builder builder =
247 SparkFilterProtos.SQLPredicatePushDownFilter.newBuilder();
248
249 SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.Builder columnMappingBuilder =
250 SparkFilterProtos.SQLPredicatePushDownCellToColumnMapping.newBuilder();
251
252 builder.setDynamicLogicExpression(dynamicLogicExpression.toExpressionString());
253 for (byte[] valueFromQuery: valueFromQueryArray) {
254 builder.addValueFromQueryArray(ByteStringer.wrap(valueFromQuery));
255 }
256
257 for (Map.Entry<ByteArrayComparable, HashMap<ByteArrayComparable, String>>
258 familyEntry : currentCellToColumnIndexMap.entrySet()) {
259 for (Map.Entry<ByteArrayComparable, String> qualifierEntry :
260 familyEntry.getValue().entrySet()) {
261 columnMappingBuilder.setColumnFamily(
262 ByteStringer.wrap(familyEntry.getKey().bytes()));
263 columnMappingBuilder.setQualifier(
264 ByteStringer.wrap(qualifierEntry.getKey().bytes()));
265 columnMappingBuilder.setColumnName(qualifierEntry.getValue());
266 builder.addCellToColumnMapping(columnMappingBuilder.build());
267 }
268 }
269 builder.setEncoderClassName(encoderClassName);
270
271
272 return builder.build().toByteArray();
273 }
274 }