View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.CellScanner;
30  import org.apache.hadoop.hbase.CellUtil;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.TableName;
39  import org.apache.hadoop.hbase.Tag;
40  import org.apache.hadoop.hbase.client.Admin;
41  import org.apache.hadoop.hbase.client.Append;
42  import org.apache.hadoop.hbase.client.Durability;
43  import org.apache.hadoop.hbase.client.HTable;
44  import org.apache.hadoop.hbase.client.Increment;
45  import org.apache.hadoop.hbase.client.Mutation;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.client.ResultScanner;
49  import org.apache.hadoop.hbase.client.Scan;
50  import org.apache.hadoop.hbase.client.Table;
51  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
52  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
53  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
54  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
55  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
56  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
57  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.junit.After;
60  import org.junit.AfterClass;
61  import org.junit.BeforeClass;
62  import org.junit.Rule;
63  import org.junit.Test;
64  import org.junit.experimental.categories.Category;
65  import org.junit.rules.TestName;
66  
67  /**
68   * Class that test tags
69   */
70  @Category(MediumTests.class)
71  public class TestTags {
72    static boolean useFilter = false;
73  
74    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
75  
76    @Rule
77    public final TestName TEST_NAME = new TestName();
78  
79    @BeforeClass
80    public static void setUpBeforeClass() throws Exception {
81      Configuration conf = TEST_UTIL.getConfiguration();
82      conf.setInt("hfile.format.version", 3);
83      conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
84          TestCoprocessorForTags.class.getName());
85      TEST_UTIL.startMiniCluster(1, 2);
86    }
87  
88    @AfterClass
89    public static void tearDownAfterClass() throws Exception {
90      TEST_UTIL.shutdownMiniCluster();
91    }
92  
93    @After
94    public void tearDown() {
95      useFilter = false;
96    }
97  
98    @Test
99    public void testTags() throws Exception {
100     Table table = null;
101     try {
102       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
103       byte[] fam = Bytes.toBytes("info");
104       byte[] row = Bytes.toBytes("rowa");
105       // column names
106       byte[] qual = Bytes.toBytes("qual");
107 
108       byte[] row1 = Bytes.toBytes("rowb");
109 
110       byte[] row2 = Bytes.toBytes("rowc");
111 
112       HTableDescriptor desc = new HTableDescriptor(tableName);
113       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
114       colDesc.setBlockCacheEnabled(true);
115       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
116       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
117       desc.addFamily(colDesc);
118       Admin admin = TEST_UTIL.getHBaseAdmin();
119       admin.createTable(desc);
120       byte[] value = Bytes.toBytes("value");
121       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
122       Put put = new Put(row);
123       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
124       put.setAttribute("visibility", Bytes.toBytes("myTag"));
125       table.put(put);
126       admin.flush(tableName);
127       // We are lacking an API for confirming flush request compaction.
128       // Just sleep for a short time. We won't be able to confirm flush
129       // completion but the test won't hang now or in the future if
130       // default compaction policy causes compaction between flush and
131       // when we go to confirm it.
132       Thread.sleep(1000);
133 
134       Put put1 = new Put(row1);
135       byte[] value1 = Bytes.toBytes("1000dfsdf");
136       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
137       // put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
138       table.put(put1);
139       admin.flush(tableName);
140       Thread.sleep(1000);
141 
142       Put put2 = new Put(row2);
143       byte[] value2 = Bytes.toBytes("1000dfsdf");
144       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
145       put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
146       table.put(put2);
147       admin.flush(tableName);
148       Thread.sleep(1000);
149 
150       result(fam, row, qual, row2, table, value, value2, row1, value1);
151 
152       admin.compact(tableName);
153       while (admin.getCompactionState(tableName) != CompactionState.NONE) {
154         Thread.sleep(10);
155       }
156       result(fam, row, qual, row2, table, value, value2, row1, value1);
157     } finally {
158       if (table != null) {
159         table.close();
160       }
161     }
162   }
163 
164   @Test
165   public void testFlushAndCompactionWithoutTags() throws Exception {
166     Table table = null;
167     try {
168       TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
169       byte[] fam = Bytes.toBytes("info");
170       byte[] row = Bytes.toBytes("rowa");
171       // column names
172       byte[] qual = Bytes.toBytes("qual");
173 
174       byte[] row1 = Bytes.toBytes("rowb");
175 
176       byte[] row2 = Bytes.toBytes("rowc");
177 
178       HTableDescriptor desc = new HTableDescriptor(tableName);
179       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
180       colDesc.setBlockCacheEnabled(true);
181       // colDesc.setDataBlockEncoding(DataBlockEncoding.NONE);
182       colDesc.setDataBlockEncoding(DataBlockEncoding.PREFIX_TREE);
183       desc.addFamily(colDesc);
184       Admin admin = TEST_UTIL.getHBaseAdmin();
185       admin.createTable(desc);
186 
187       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
188       Put put = new Put(row);
189       byte[] value = Bytes.toBytes("value");
190       put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
191       table.put(put);
192       admin.flush(tableName);
193       // We are lacking an API for confirming flush request compaction.
194       // Just sleep for a short time. We won't be able to confirm flush
195       // completion but the test won't hang now or in the future if
196       // default compaction policy causes compaction between flush and
197       // when we go to confirm it.
198       Thread.sleep(1000);
199 
200       Put put1 = new Put(row1);
201       byte[] value1 = Bytes.toBytes("1000dfsdf");
202       put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
203       table.put(put1);
204       admin.flush(tableName);
205       Thread.sleep(1000);
206 
207       Put put2 = new Put(row2);
208       byte[] value2 = Bytes.toBytes("1000dfsdf");
209       put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
210       table.put(put2);
211       admin.flush(tableName);
212       Thread.sleep(1000);
213 
214       Scan s = new Scan(row);
215       ResultScanner scanner = table.getScanner(s);
216       try {
217         Result[] next = scanner.next(3);
218         for (Result result : next) {
219           CellScanner cellScanner = result.cellScanner();
220           cellScanner.advance();
221           KeyValue current = (KeyValue) cellScanner.current();
222           assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
223         }
224       } finally {
225         if (scanner != null)
226           scanner.close();
227       }
228       admin.compact(tableName);
229       while (admin.getCompactionState(tableName) != CompactionState.NONE) {
230         Thread.sleep(10);
231       }
232       s = new Scan(row);
233       scanner = table.getScanner(s);
234       try {
235         Result[] next = scanner.next(3);
236         for (Result result : next) {
237           CellScanner cellScanner = result.cellScanner();
238           cellScanner.advance();
239           KeyValue current = (KeyValue) cellScanner.current();
240           assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
241         }
242       } finally {
243         if (scanner != null) {
244           scanner.close();
245         }
246       }
247     } finally {
248       if (table != null) {
249         table.close();
250       }
251     }
252   }
253 
254   @Test
255   public void testFlushAndCompactionwithCombinations() throws Exception {
256     TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
257     byte[] fam = Bytes.toBytes("info");
258     byte[] row = Bytes.toBytes("rowa");
259     // column names
260     byte[] qual = Bytes.toBytes("qual");
261 
262     byte[] row1 = Bytes.toBytes("rowb");
263 
264     byte[] row2 = Bytes.toBytes("rowc");
265     byte[] rowd = Bytes.toBytes("rowd");
266     byte[] rowe = Bytes.toBytes("rowe");
267     Table table = null;
268     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
269       HTableDescriptor desc = new HTableDescriptor(tableName);
270       HColumnDescriptor colDesc = new HColumnDescriptor(fam);
271       colDesc.setBlockCacheEnabled(true);
272       colDesc.setDataBlockEncoding(encoding);
273       desc.addFamily(colDesc);
274       Admin admin = TEST_UTIL.getHBaseAdmin();
275       admin.createTable(desc);
276       try {
277         table = new HTable(TEST_UTIL.getConfiguration(), tableName);
278         Put put = new Put(row);
279         byte[] value = Bytes.toBytes("value");
280         put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
281         int bigTagLen = Short.MAX_VALUE - 5;
282         put.setAttribute("visibility", new byte[bigTagLen]);
283         table.put(put);
284         Put put1 = new Put(row1);
285         byte[] value1 = Bytes.toBytes("1000dfsdf");
286         put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
287         table.put(put1);
288         admin.flush(tableName);
289         // We are lacking an API for confirming flush request compaction.
290         // Just sleep for a short time. We won't be able to confirm flush
291         // completion but the test won't hang now or in the future if
292         // default compaction policy causes compaction between flush and
293         // when we go to confirm it.
294         Thread.sleep(1000);
295 
296         put1 = new Put(row2);
297         value1 = Bytes.toBytes("1000dfsdf");
298         put1.add(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
299         table.put(put1);
300         admin.flush(tableName);
301         Thread.sleep(1000);
302 
303         Put put2 = new Put(rowd);
304         byte[] value2 = Bytes.toBytes("1000dfsdf");
305         put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
306         table.put(put2);
307         put2 = new Put(rowe);
308         value2 = Bytes.toBytes("1000dfsddfdf");
309         put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
310         put.setAttribute("visibility", Bytes.toBytes("ram"));
311         table.put(put2);
312         admin.flush(tableName);
313         Thread.sleep(1000);
314 
315         TestCoprocessorForTags.checkTagPresence = true;
316         Scan s = new Scan(row);
317         s.setCaching(1);
318         ResultScanner scanner = table.getScanner(s);
319         try {
320           Result next = null;
321           while ((next = scanner.next()) != null) {
322             CellScanner cellScanner = next.cellScanner();
323             cellScanner.advance();
324             KeyValue current = (KeyValue) cellScanner.current();
325             if (CellUtil.matchingRow(current, row)) {
326               assertEquals(1, TestCoprocessorForTags.tags.size());
327               Tag tag = TestCoprocessorForTags.tags.get(0);
328               assertEquals(bigTagLen, tag.getTagLength());
329             } else {
330               assertEquals(0, TestCoprocessorForTags.tags.size());
331             }
332           }
333         } finally {
334           if (scanner != null) {
335             scanner.close();
336           }
337           TestCoprocessorForTags.checkTagPresence = false;
338         }
339         while (admin.getCompactionState(tableName) != CompactionState.NONE) {
340           Thread.sleep(10);
341         }
342         TestCoprocessorForTags.checkTagPresence = true;
343         scanner = table.getScanner(s);
344         try {
345           Result next = null;
346           while ((next = scanner.next()) != null) {
347             CellScanner cellScanner = next.cellScanner();
348             cellScanner.advance();
349             KeyValue current = (KeyValue) cellScanner.current();
350             if (CellUtil.matchingRow(current, row)) {
351               assertEquals(1, TestCoprocessorForTags.tags.size());
352               Tag tag = TestCoprocessorForTags.tags.get(0);
353               assertEquals(bigTagLen, tag.getTagLength());
354             } else {
355               assertEquals(0, TestCoprocessorForTags.tags.size());
356             }
357           }
358         } finally {
359           if (scanner != null) {
360             scanner.close();
361           }
362           TestCoprocessorForTags.checkTagPresence = false;
363         }
364       } finally {
365         if (table != null) {
366           table.close();
367         }
368         // delete the table
369         admin.disableTable(tableName);
370         admin.deleteTable(tableName);
371       }
372     }
373   }
374 
375   @Test
376   public void testTagsWithAppendAndIncrement() throws Exception {
377     TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
378     byte[] f = Bytes.toBytes("f");
379     byte[] q = Bytes.toBytes("q");
380     byte[] row1 = Bytes.toBytes("r1");
381     byte[] row2 = Bytes.toBytes("r2");
382 
383     HTableDescriptor desc = new HTableDescriptor(tableName);
384     HColumnDescriptor colDesc = new HColumnDescriptor(f);
385     desc.addFamily(colDesc);
386     TEST_UTIL.getHBaseAdmin().createTable(desc);
387 
388     Table table = null;
389     try {
390       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
391       Put put = new Put(row1);
392       byte[] v = Bytes.toBytes(2L);
393       put.add(f, q, v);
394       put.setAttribute("visibility", Bytes.toBytes("tag1"));
395       table.put(put);
396       Increment increment = new Increment(row1);
397       increment.addColumn(f, q, 1L);
398       table.increment(increment);
399       TestCoprocessorForTags.checkTagPresence = true;
400       ResultScanner scanner = table.getScanner(new Scan());
401       Result result = scanner.next();
402       KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
403       List<Tag> tags = TestCoprocessorForTags.tags;
404       assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
405       assertEquals(1, tags.size());
406       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
407       TestCoprocessorForTags.checkTagPresence = false;
408       TestCoprocessorForTags.tags = null;
409 
410       increment = new Increment(row1);
411       increment.add(new KeyValue(row1, f, q, 1234L, v));
412       increment.setAttribute("visibility", Bytes.toBytes("tag2"));
413       table.increment(increment);
414       TestCoprocessorForTags.checkTagPresence = true;
415       scanner = table.getScanner(new Scan());
416       result = scanner.next();
417       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
418       tags = TestCoprocessorForTags.tags;
419       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
420       assertEquals(2, tags.size());
421       // We cannot assume the ordering of tags
422       List<String> tagValues = new ArrayList<String>();
423       for (Tag tag: tags) {
424         tagValues.add(Bytes.toString(tag.getValue()));
425       }
426       assertTrue(tagValues.contains("tag1"));
427       assertTrue(tagValues.contains("tag2"));
428       TestCoprocessorForTags.checkTagPresence = false;
429       TestCoprocessorForTags.tags = null;
430 
431       put = new Put(row2);
432       v = Bytes.toBytes(2L);
433       put.add(f, q, v);
434       table.put(put);
435       increment = new Increment(row2);
436       increment.add(new KeyValue(row2, f, q, 1234L, v));
437       increment.setAttribute("visibility", Bytes.toBytes("tag2"));
438       table.increment(increment);
439       Scan scan = new Scan();
440       scan.setStartRow(row2);
441       TestCoprocessorForTags.checkTagPresence = true;
442       scanner = table.getScanner(scan);
443       result = scanner.next();
444       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
445       tags = TestCoprocessorForTags.tags;
446       assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
447       assertEquals(1, tags.size());
448       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
449       TestCoprocessorForTags.checkTagPresence = false;
450       TestCoprocessorForTags.tags = null;
451 
452       // Test Append
453       byte[] row3 = Bytes.toBytes("r3");
454       put = new Put(row3);
455       put.add(f, q, Bytes.toBytes("a"));
456       put.setAttribute("visibility", Bytes.toBytes("tag1"));
457       table.put(put);
458       Append append = new Append(row3);
459       append.add(f, q, Bytes.toBytes("b"));
460       table.append(append);
461       scan = new Scan();
462       scan.setStartRow(row3);
463       TestCoprocessorForTags.checkTagPresence = true;
464       scanner = table.getScanner(scan);
465       result = scanner.next();
466       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
467       tags = TestCoprocessorForTags.tags;
468       assertEquals(1, tags.size());
469       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
470       TestCoprocessorForTags.checkTagPresence = false;
471       TestCoprocessorForTags.tags = null;
472 
473       append = new Append(row3);
474       append.add(new KeyValue(row3, f, q, 1234L, v));
475       append.setAttribute("visibility", Bytes.toBytes("tag2"));
476       table.append(append);
477       TestCoprocessorForTags.checkTagPresence = true;
478       scanner = table.getScanner(scan);
479       result = scanner.next();
480       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
481       tags = TestCoprocessorForTags.tags;
482       assertEquals(2, tags.size());
483       // We cannot assume the ordering of tags
484       tagValues.clear();
485       for (Tag tag: tags) {
486         tagValues.add(Bytes.toString(tag.getValue()));
487       }
488       assertTrue(tagValues.contains("tag1"));
489       assertTrue(tagValues.contains("tag2"));
490       TestCoprocessorForTags.checkTagPresence = false;
491       TestCoprocessorForTags.tags = null;
492 
493       byte[] row4 = Bytes.toBytes("r4");
494       put = new Put(row4);
495       put.add(f, q, Bytes.toBytes("a"));
496       table.put(put);
497       append = new Append(row4);
498       append.add(new KeyValue(row4, f, q, 1234L, v));
499       append.setAttribute("visibility", Bytes.toBytes("tag2"));
500       table.append(append);
501       scan = new Scan();
502       scan.setStartRow(row4);
503       TestCoprocessorForTags.checkTagPresence = true;
504       scanner = table.getScanner(scan);
505       result = scanner.next();
506       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
507       tags = TestCoprocessorForTags.tags;
508       assertEquals(1, tags.size());
509       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
510     } finally {
511       TestCoprocessorForTags.checkTagPresence = false;
512       TestCoprocessorForTags.tags = null;
513       if (table != null) {
514         table.close();
515       }
516     }
517   }
518 
519   private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value,
520       byte[] value2, byte[] row1, byte[] value1) throws IOException {
521     Scan s = new Scan(row);
522     // If filters are used this attribute can be specifically check for in
523     // filterKV method and
524     // kvs can be filtered out if the tags of interest is not found in that kv
525     s.setAttribute("visibility", Bytes.toBytes("myTag"));
526     ResultScanner scanner = null;
527     try {
528       scanner = table.getScanner(s);
529       Result next = scanner.next();
530 
531       assertTrue(Bytes.equals(next.getRow(), row));
532       assertTrue(Bytes.equals(next.getValue(fam, qual), value));
533 
534       Result next2 = scanner.next();
535       assertTrue(next2 != null);
536       assertTrue(Bytes.equals(next2.getRow(), row1));
537       assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
538 
539       next2 = scanner.next();
540       assertTrue(next2 != null);
541       assertTrue(Bytes.equals(next2.getRow(), row2));
542       assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
543 
544     } finally {
545       if (scanner != null)
546         scanner.close();
547     }
548   }
549 
550   public static class TestCoprocessorForTags extends BaseRegionObserver {
551 
552     public static volatile boolean checkTagPresence = false;
553     public static List<Tag> tags = null;
554 
555     @Override
556     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
557         final WALEdit edit, final Durability durability) throws IOException {
558       updateMutationAddingTags(put);
559     }
560 
561     private void updateMutationAddingTags(final Mutation m) {
562       byte[] attribute = m.getAttribute("visibility");
563       byte[] cf = null;
564       List<Cell> updatedCells = new ArrayList<Cell>();
565       if (attribute != null) {
566         for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
567           for (Cell cell : edits) {
568             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
569             if (cf == null) {
570               cf = kv.getFamily();
571             }
572             Tag tag = new Tag((byte) 1, attribute);
573             List<Tag> tagList = new ArrayList<Tag>();
574             tagList.add(tag);
575 
576             KeyValue newKV = new KeyValue(kv.getRow(), 0, kv.getRowLength(), kv.getFamily(), 0,
577                 kv.getFamilyLength(), kv.getQualifier(), 0, kv.getQualifierLength(),
578                 kv.getTimestamp(), KeyValue.Type.codeToType(kv.getType()), kv.getValue(), 0,
579                 kv.getValueLength(), tagList);
580             ((List<Cell>) updatedCells).add(newKV);
581           }
582         }
583         m.getFamilyCellMap().remove(cf);
584         // Update the family map
585         m.getFamilyCellMap().put(cf, updatedCells);
586       }
587     }
588 
589     @Override
590     public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
591         throws IOException {
592       updateMutationAddingTags(increment);
593       return super.preIncrement(e, increment);
594     }
595 
596     @Override
597     public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
598         throws IOException {
599       updateMutationAddingTags(append);
600       return super.preAppend(e, append);
601     }
602 
603     @Override
604     public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
605         InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
606       if (checkTagPresence) {
607         if (results.size() > 0) {
608           // Check tag presence in the 1st cell in 1st Result
609           Result result = results.get(0);
610           CellScanner cellScanner = result.cellScanner();
611           if (cellScanner.advance()) {
612             Cell cell = cellScanner.current();
613             tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
614                 cell.getTagsLength());
615           }
616         }
617       }
618       return hasMore;
619     }
620   }
621 }