View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import static org.junit.Assert.*;
22  
23  import java.io.IOException;
24  import java.util.Arrays;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.hbase.Cell;
31  import org.apache.hadoop.hbase.HBaseConfiguration;
32  import org.apache.hadoop.hbase.HBaseTestingUtility;
33  import org.apache.hadoop.hbase.HColumnDescriptor;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.HTableDescriptor;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.Delete;
38  import org.apache.hadoop.hbase.client.Mutation;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.client.Table;
44  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
45  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
46  import org.apache.hadoop.hbase.testclassification.MediumTests;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.wal.WALKey;
49  import org.junit.AfterClass;
50  import org.junit.Before;
51  import org.junit.BeforeClass;
52  import org.junit.Rule;
53  import org.junit.Test;
54  import org.junit.experimental.categories.Category;
55  import org.junit.rules.TestName;
56  
57  import com.google.common.collect.Lists;
58  
59  @Category(MediumTests.class)
60  public class TestRegionObserverForAddingMutationsFromCoprocessors {
61  
62    private static final Log LOG
63      = LogFactory.getLog(TestRegionObserverForAddingMutationsFromCoprocessors.class);
64  
65    private static HBaseTestingUtility util;
66    private static final byte[] dummy = Bytes.toBytes("dummy");
67    private static final byte[] row1 = Bytes.toBytes("r1");
68    private static final byte[] row2 = Bytes.toBytes("r2");
69    private static final byte[] row3 = Bytes.toBytes("r3");
70    private static final byte[] test = Bytes.toBytes("test");
71  
72    @Rule
73    public TestName name = new TestName();
74    private TableName tableName;
75  
76    @BeforeClass
77    public static void setUpBeforeClass() throws Exception {
78      Configuration conf = HBaseConfiguration.create();
79      conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName());
80      util = new HBaseTestingUtility(conf);
81      util.startMiniCluster();
82    }
83  
84    @AfterClass
85    public static void tearDownAfterClass() throws Exception {
86      util.shutdownMiniCluster();
87    }
88  
89    @Before
90    public void setUp() throws Exception {
91      tableName = TableName.valueOf(name.getMethodName());
92    }
93  
94    private void createTable(String coprocessor) throws IOException {
95      HTableDescriptor htd = new HTableDescriptor(tableName)
96          .addFamily(new HColumnDescriptor(dummy))
97          .addFamily(new HColumnDescriptor(test))
98          .addCoprocessor(coprocessor);
99      util.getHBaseAdmin().createTable(htd);
100   }
101 
102   /**
103    * Test various multiput operations.
104    * @throws Exception
105    */
106   @Test
107   public void testMulti() throws Exception {
108     createTable(TestMultiMutationCoprocessor.class.getName());
109 
110     try (Table t = util.getConnection().getTable(tableName)) {
111       t.put(new Put(row1).addColumn(test, dummy, dummy));
112       assertRowCount(t, 3);
113     }
114   }
115 
116   /**
117    * Tests that added mutations from coprocessors end up in the WAL.
118    */
119   @Test
120   public void testCPMutationsAreWrittenToWALEdit() throws Exception {
121     createTable(TestMultiMutationCoprocessor.class.getName());
122 
123     try (Table t = util.getConnection().getTable(tableName)) {
124       t.put(new Put(row1).addColumn(test, dummy, dummy));
125       assertRowCount(t, 3);
126     }
127 
128     assertNotNull(TestWALObserver.savedEdit);
129     assertEquals(4, TestWALObserver.savedEdit.getCells().size());
130   }
131 
132   private static void assertRowCount(Table t, int expected) throws IOException {
133     try (ResultScanner scanner = t.getScanner(new Scan())) {
134       int i = 0;
135       for (Result r: scanner) {
136         LOG.info(r.toString());
137         i++;
138       }
139       assertEquals(expected, i);
140     }
141   }
142 
143   @Test
144   public void testDeleteCell() throws Exception {
145     createTable(TestDeleteCellCoprocessor.class.getName());
146 
147     try (Table t = util.getConnection().getTable(tableName)) {
148       t.put(Lists.newArrayList(
149         new Put(row1).addColumn(test, dummy, dummy),
150         new Put(row2).addColumn(test, dummy, dummy),
151         new Put(row3).addColumn(test, dummy, dummy)
152           ));
153 
154       assertRowCount(t, 3);
155 
156       t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
157       assertRowCount(t, 1);
158     }
159   }
160 
161   @Test
162   public void testDeleteFamily() throws Exception {
163     createTable(TestDeleteFamilyCoprocessor.class.getName());
164 
165     try (Table t = util.getConnection().getTable(tableName)) {
166       t.put(Lists.newArrayList(
167         new Put(row1).addColumn(test, dummy, dummy),
168         new Put(row2).addColumn(test, dummy, dummy),
169         new Put(row3).addColumn(test, dummy, dummy)
170           ));
171 
172       assertRowCount(t, 3);
173 
174       t.delete(new Delete(test).addFamily(test)); // delete non-existing row
175       assertRowCount(t, 1);
176     }
177   }
178 
179   @Test
180   public void testDeleteRow() throws Exception {
181     createTable(TestDeleteRowCoprocessor.class.getName());
182 
183     try (Table t = util.getConnection().getTable(tableName)) {
184       t.put(Lists.newArrayList(
185         new Put(row1).addColumn(test, dummy, dummy),
186         new Put(row2).addColumn(test, dummy, dummy),
187         new Put(row3).addColumn(test, dummy, dummy)
188           ));
189 
190       assertRowCount(t, 3);
191 
192       t.delete(new Delete(test).addColumn(test, dummy)); // delete non-existing row
193       assertRowCount(t, 1);
194     }
195   }
196 
197   public static class TestMultiMutationCoprocessor extends BaseRegionObserver {
198     @Override
199     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
200         MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
201       Mutation mut = miniBatchOp.getOperation(0);
202       List<Cell> cells = mut.getFamilyCellMap().get(test);
203       Put[] puts = new Put[] {
204           new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(),
205             Bytes.toBytes("cpdummy")),
206           new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
207           new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
208       };
209       LOG.info("Putting:" + puts);
210       miniBatchOp.addOperationsFromCP(0, puts);
211     }
212   }
213 
214   public static class TestDeleteCellCoprocessor extends BaseRegionObserver {
215     @Override
216     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
217         MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
218       Mutation mut = miniBatchOp.getOperation(0);
219 
220       if (mut instanceof Delete) {
221         List<Cell> cells = mut.getFamilyCellMap().get(test);
222         Delete[] deletes = new Delete[] {
223             // delete only 2 rows
224             new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()),
225             new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()),
226         };
227         LOG.info("Deleting:" + Arrays.toString(deletes));
228         miniBatchOp.addOperationsFromCP(0, deletes);
229       }
230     }
231   }
232 
233   public static class TestDeleteFamilyCoprocessor extends BaseRegionObserver {
234     @Override
235     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
236         MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
237       Mutation mut = miniBatchOp.getOperation(0);
238 
239       if (mut instanceof Delete) {
240         List<Cell> cells = mut.getFamilyCellMap().get(test);
241         Delete[] deletes = new Delete[] {
242             // delete only 2 rows
243             new Delete(row1).addFamily(test, cells.get(0).getTimestamp()),
244             new Delete(row2).addFamily(test, cells.get(0).getTimestamp()),
245         };
246         LOG.info("Deleting:" + Arrays.toString(deletes));
247         miniBatchOp.addOperationsFromCP(0, deletes);
248       }
249     }
250   }
251 
252   public static class TestDeleteRowCoprocessor extends BaseRegionObserver {
253     @Override
254     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
255         MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
256       Mutation mut = miniBatchOp.getOperation(0);
257 
258       if (mut instanceof Delete) {
259         List<Cell> cells = mut.getFamilyCellMap().get(test);
260         Delete[] deletes = new Delete[] {
261             // delete only 2 rows
262             new Delete(row1, cells.get(0).getTimestamp()),
263             new Delete(row2, cells.get(0).getTimestamp()),
264         };
265         LOG.info("Deleting:" + Arrays.toString(deletes));
266         miniBatchOp.addOperationsFromCP(0, deletes);
267       }
268     }
269   }
270 
271   public static class TestWALObserver extends BaseWALObserver {
272     static WALEdit savedEdit = null;
273     @Override
274     public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
275         HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
276       if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) {
277         savedEdit = logEdit;
278       }
279       super.postWALWrite(ctx, info, logKey, logEdit);
280     }
281   }
282 }