1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.coprocessor.example;
19
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.Builder;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
57
58 @Category(MediumTests.class)
59 public class TestBulkDeleteProtocol {
60 private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
61 private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
62 private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
63 private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
64 private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
65 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
66
67
68 public static void setupBeforeClass() throws Exception {
69 TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
70 BulkDeleteEndpoint.class.getName());
71 TEST_UTIL.startMiniCluster(2);
72 }
73
74
75 public static void tearDownAfterClass() throws Exception {
76 TEST_UTIL.shutdownMiniCluster();
77 }
78
79
80 public void testBulkDeleteEndpoint() throws Throwable {
81 TableName tableName = TableName.valueOf("testBulkDeleteEndpoint");
82 Table ht = createTable(tableName);
83 List<Put> puts = new ArrayList<Put>(100);
84 for (int j = 0; j < 100; j++) {
85 byte[] rowkey = Bytes.toBytes(j);
86 puts.add(createPut(rowkey, "v1"));
87 }
88 ht.put(puts);
89
90 long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 5, DeleteType.ROW, null);
91 assertEquals(100, noOfRowsDeleted);
92
93 int rows = 0;
94 for (Result result : ht.getScanner(new Scan())) {
95 rows++;
96 }
97 assertEquals(0, rows);
98 ht.close();
99 }
100
101
102 public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
103 throws Throwable {
104 TableName tableName = TableName
105 .valueOf("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
106 Table ht = createTable(tableName);
107 List<Put> puts = new ArrayList<Put>(100);
108 for (int j = 0; j < 100; j++) {
109 byte[] rowkey = Bytes.toBytes(j);
110 puts.add(createPut(rowkey, "v1"));
111 }
112 ht.put(puts);
113
114 long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null);
115 assertEquals(100, noOfRowsDeleted);
116
117 int rows = 0;
118 for (Result result : ht.getScanner(new Scan())) {
119 rows++;
120 }
121 assertEquals(0, rows);
122 ht.close();
123 }
124
125 private long invokeBulkDeleteProtocol(TableName tableName, final Scan scan, final int rowBatchSize,
126 final DeleteType deleteType, final Long timeStamp) throws Throwable {
127 Table ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
128 long noOfDeletedRows = 0L;
129 Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
130 new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
131 ServerRpcController controller = new ServerRpcController();
132 BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
133 new BlockingRpcCallback<BulkDeleteResponse>();
134
135 public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
136 Builder builder = BulkDeleteRequest.newBuilder();
137 builder.setScan(ProtobufUtil.toScan(scan));
138 builder.setDeleteType(deleteType);
139 builder.setRowBatchSize(rowBatchSize);
140 if (timeStamp != null) {
141 builder.setTimestamp(timeStamp);
142 }
143 service.delete(controller, builder.build(), rpcCallback);
144 return rpcCallback.get();
145 }
146 };
147 Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
148 .getStartRow(), scan.getStopRow(), callable);
149 for (BulkDeleteResponse response : result.values()) {
150 noOfDeletedRows += response.getRowsDeleted();
151 }
152 ht.close();
153 return noOfDeletedRows;
154 }
155
156
157 public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
158 TableName tableName = TableName.valueOf("testBulkDeleteWithConditionBasedDelete");
159 Table ht = createTable(tableName);
160 List<Put> puts = new ArrayList<Put>(100);
161 for (int j = 0; j < 100; j++) {
162 byte[] rowkey = Bytes.toBytes(j);
163 String value = (j % 10 == 0) ? "v1" : "v2";
164 puts.add(createPut(rowkey, value));
165 }
166 ht.put(puts);
167 Scan scan = new Scan();
168 FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
169 SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
170 CompareOp.EQUAL, Bytes.toBytes("v1"));
171
172 fl.addFilter(scvf);
173 scan.setFilter(fl);
174
175 long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
176 assertEquals(10, noOfRowsDeleted);
177
178 int rows = 0;
179 for (Result result : ht.getScanner(new Scan())) {
180 rows++;
181 }
182 assertEquals(90, rows);
183 ht.close();
184 }
185
186
187 public void testBulkDeleteColumn() throws Throwable {
188 TableName tableName = TableName.valueOf("testBulkDeleteColumn");
189 Table ht = createTable(tableName);
190 List<Put> puts = new ArrayList<Put>(100);
191 for (int j = 0; j < 100; j++) {
192 byte[] rowkey = Bytes.toBytes(j);
193 String value = (j % 10 == 0) ? "v1" : "v2";
194 puts.add(createPut(rowkey, value));
195 }
196 ht.put(puts);
197 Scan scan = new Scan();
198 scan.addColumn(FAMILY1, QUALIFIER2);
199
200 long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null);
201 assertEquals(100, noOfRowsDeleted);
202
203 int rows = 0;
204 for (Result result : ht.getScanner(new Scan())) {
205 assertEquals(2, result.getFamilyMap(FAMILY1).size());
206 assertTrue(result.getColumnCells(FAMILY1, QUALIFIER2).isEmpty());
207 assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER1).size());
208 assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER3).size());
209 rows++;
210 }
211 assertEquals(100, rows);
212 ht.close();
213 }
214
215
216 public void testBulkDeleteFamily() throws Throwable {
217 TableName tableName = TableName.valueOf("testBulkDeleteFamily");
218 HTableDescriptor htd = new HTableDescriptor(tableName);
219 htd.addFamily(new HColumnDescriptor(FAMILY1));
220 htd.addFamily(new HColumnDescriptor(FAMILY2));
221 TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
222 Table ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
223 List<Put> puts = new ArrayList<Put>(100);
224 for (int j = 0; j < 100; j++) {
225 Put put = new Put(Bytes.toBytes(j));
226 put.add(FAMILY1, QUALIFIER1, "v1".getBytes());
227 put.add(FAMILY2, QUALIFIER2, "v2".getBytes());
228 puts.add(put);
229 }
230 ht.put(puts);
231 Scan scan = new Scan();
232 scan.addFamily(FAMILY1);
233
234 long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null);
235 assertEquals(100, noOfRowsDeleted);
236 int rows = 0;
237 for (Result result : ht.getScanner(new Scan())) {
238 assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
239 assertEquals(1, result.getColumnCells(FAMILY2, QUALIFIER2).size());
240 rows++;
241 }
242 assertEquals(100, rows);
243 ht.close();
244 }
245
246
247 public void testBulkDeleteColumnVersion() throws Throwable {
248 TableName tableName = TableName.valueOf("testBulkDeleteColumnVersion");
249 Table ht = createTable(tableName);
250 List<Put> puts = new ArrayList<Put>(100);
251 for (int j = 0; j < 100; j++) {
252 Put put = new Put(Bytes.toBytes(j));
253 byte[] value = "v1".getBytes();
254 put.add(FAMILY1, QUALIFIER1, 1234L, value);
255 put.add(FAMILY1, QUALIFIER2, 1234L, value);
256 put.add(FAMILY1, QUALIFIER3, 1234L, value);
257
258 value = "v2".getBytes();
259 put.add(FAMILY1, QUALIFIER1, value);
260 put.add(FAMILY1, QUALIFIER2, value);
261 put.add(FAMILY1, QUALIFIER3, value);
262 put.add(FAMILY1, null, value);
263 puts.add(put);
264 }
265 ht.put(puts);
266 Scan scan = new Scan();
267 scan.addFamily(FAMILY1);
268
269 long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
270 HConstants.LATEST_TIMESTAMP);
271 assertEquals(100, noOfRowsDeleted);
272 int rows = 0;
273 scan = new Scan();
274 scan.setMaxVersions();
275 for (Result result : ht.getScanner(scan)) {
276 assertEquals(3, result.getFamilyMap(FAMILY1).size());
277 List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER1);
278 assertEquals(1, column.size());
279 assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
280
281 column = result.getColumnCells(FAMILY1, QUALIFIER2);
282 assertEquals(1, column.size());
283 assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
284
285 column = result.getColumnCells(FAMILY1, QUALIFIER3);
286 assertEquals(1, column.size());
287 assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
288 rows++;
289 }
290 assertEquals(100, rows);
291 ht.close();
292 }
293
294
295 public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
296 TableName tableName = TableName.valueOf("testBulkDeleteColumnVersionBasedOnTS");
297 Table ht = createTable(tableName);
298 List<Put> puts = new ArrayList<Put>(100);
299 for (int j = 0; j < 100; j++) {
300 Put put = new Put(Bytes.toBytes(j));
301
302 byte[] value = "v1".getBytes();
303 put.add(FAMILY1, QUALIFIER1, 1000L, value);
304 put.add(FAMILY1, QUALIFIER2, 1000L, value);
305 put.add(FAMILY1, QUALIFIER3, 1000L, value);
306
307 value = "v2".getBytes();
308 put.add(FAMILY1, QUALIFIER1, 1234L, value);
309 put.add(FAMILY1, QUALIFIER2, 1234L, value);
310 put.add(FAMILY1, QUALIFIER3, 1234L, value);
311
312 value = "v3".getBytes();
313 put.add(FAMILY1, QUALIFIER1, value);
314 put.add(FAMILY1, QUALIFIER2, value);
315 put.add(FAMILY1, QUALIFIER3, value);
316 puts.add(put);
317 }
318 ht.put(puts);
319 Scan scan = new Scan();
320 scan.addColumn(FAMILY1, QUALIFIER3);
321
322 long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L);
323 assertEquals(100, noOfRowsDeleted);
324 int rows = 0;
325 scan = new Scan();
326 scan.setMaxVersions();
327 for (Result result : ht.getScanner(scan)) {
328 assertEquals(3, result.getFamilyMap(FAMILY1).size());
329 assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER1).size());
330 assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER2).size());
331 List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER3);
332 assertEquals(2, column.size());
333 assertTrue(CellUtil.matchingValue(column.get(0), "v3".getBytes()));
334 assertTrue(CellUtil.matchingValue(column.get(1), "v1".getBytes()));
335 rows++;
336 }
337 assertEquals(100, rows);
338 ht.close();
339 }
340
341
342 public void testBulkDeleteWithNumberOfVersions() throws Throwable {
343 TableName tableName = TableName.valueOf("testBulkDeleteWithNumberOfVersions");
344 Table ht = createTable(tableName);
345 List<Put> puts = new ArrayList<Put>(100);
346 for (int j = 0; j < 100; j++) {
347 Put put = new Put(Bytes.toBytes(j));
348
349 byte[] value = "v1".getBytes();
350 put.add(FAMILY1, QUALIFIER1, 1000L, value);
351 put.add(FAMILY1, QUALIFIER2, 1000L, value);
352 put.add(FAMILY1, QUALIFIER3, 1000L, value);
353
354 value = "v2".getBytes();
355 put.add(FAMILY1, QUALIFIER1, 1234L, value);
356 put.add(FAMILY1, QUALIFIER2, 1234L, value);
357 put.add(FAMILY1, QUALIFIER3, 1234L, value);
358
359 value = "v3".getBytes();
360 put.add(FAMILY1, QUALIFIER1, 2000L, value);
361 put.add(FAMILY1, QUALIFIER2, 2000L, value);
362 put.add(FAMILY1, QUALIFIER3, 2000L, value);
363
364 value = "v4".getBytes();
365 put.add(FAMILY1, QUALIFIER1, value);
366 put.add(FAMILY1, QUALIFIER2, value);
367 put.add(FAMILY1, QUALIFIER3, value);
368 puts.add(put);
369 }
370 ht.put(puts);
371
372
373
374 final Scan scan = new Scan();
375 scan.addColumn(FAMILY1, QUALIFIER1);
376 scan.addColumn(FAMILY1, QUALIFIER2);
377 scan.setTimeRange(1000L, 2000L);
378 scan.setMaxVersions();
379
380 long noOfDeletedRows = 0L;
381 long noOfVersionsDeleted = 0L;
382 Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
383 new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
384 ServerRpcController controller = new ServerRpcController();
385 BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
386 new BlockingRpcCallback<BulkDeleteResponse>();
387
388 public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
389 Builder builder = BulkDeleteRequest.newBuilder();
390 builder.setScan(ProtobufUtil.toScan(scan));
391 builder.setDeleteType(DeleteType.VERSION);
392 builder.setRowBatchSize(500);
393 service.delete(controller, builder.build(), rpcCallback);
394 return rpcCallback.get();
395 }
396 };
397 Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
398 .getStartRow(), scan.getStopRow(), callable);
399 for (BulkDeleteResponse response : result.values()) {
400 noOfDeletedRows += response.getRowsDeleted();
401 noOfVersionsDeleted += response.getVersionsDeleted();
402 }
403 assertEquals(100, noOfDeletedRows);
404 assertEquals(400, noOfVersionsDeleted);
405
406 int rows = 0;
407 Scan scan1 = new Scan();
408 scan1.setMaxVersions();
409 for (Result res : ht.getScanner(scan1)) {
410 assertEquals(3, res.getFamilyMap(FAMILY1).size());
411 List<Cell> column = res.getColumnCells(FAMILY1, QUALIFIER1);
412 assertEquals(2, column.size());
413 assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
414 assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
415 column = res.getColumnCells(FAMILY1, QUALIFIER2);
416 assertEquals(2, column.size());
417 assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
418 assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
419 assertEquals(4, res.getColumnCells(FAMILY1, QUALIFIER3).size());
420 rows++;
421 }
422 assertEquals(100, rows);
423 ht.close();
424 }
425
426 private Table createTable(TableName tableName) throws IOException {
427 HTableDescriptor htd = new HTableDescriptor(tableName);
428 HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
429 hcd.setMaxVersions(10);
430 htd.addFamily(hcd);
431 TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
432 Table ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
433 return ht;
434 }
435
436 private Put createPut(byte[] rowkey, String value) throws IOException {
437 Put put = new Put(rowkey);
438 put.add(FAMILY1, QUALIFIER1, value.getBytes());
439 put.add(FAMILY1, QUALIFIER2, value.getBytes());
440 put.add(FAMILY1, QUALIFIER3, value.getBytes());
441 return put;
442 }
443 }