1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.coprocessor;
20
21 import static org.junit.Assert.*;
22
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.Cell;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.HBaseTestingUtility;
33 import org.apache.hadoop.hbase.HColumnDescriptor;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HTableDescriptor;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.client.Delete;
38 import org.apache.hadoop.hbase.client.Mutation;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.ResultScanner;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.client.Table;
44 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
45 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
46 import org.apache.hadoop.hbase.testclassification.MediumTests;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.wal.WALKey;
49 import org.junit.AfterClass;
50 import org.junit.Before;
51 import org.junit.BeforeClass;
52 import org.junit.Rule;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55 import org.junit.rules.TestName;
56
57 import com.google.common.collect.Lists;
58
59 @Category(MediumTests.class)
60 public class TestRegionObserverForAddingMutationsFromCoprocessors {
61
62 private static final Log LOG
63 = LogFactory.getLog(TestRegionObserverForAddingMutationsFromCoprocessors.class);
64
65 private static HBaseTestingUtility util;
66 private static final byte[] dummy = Bytes.toBytes("dummy");
67 private static final byte[] row1 = Bytes.toBytes("r1");
68 private static final byte[] row2 = Bytes.toBytes("r2");
69 private static final byte[] row3 = Bytes.toBytes("r3");
70 private static final byte[] test = Bytes.toBytes("test");
71
72 @Rule
73 public TestName name = new TestName();
74 private TableName tableName;
75
76 @BeforeClass
77 public static void setUpBeforeClass() throws Exception {
78 Configuration conf = HBaseConfiguration.create();
79 conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, TestWALObserver.class.getName());
80 util = new HBaseTestingUtility(conf);
81 util.startMiniCluster();
82 }
83
84 @AfterClass
85 public static void tearDownAfterClass() throws Exception {
86 util.shutdownMiniCluster();
87 }
88
89 @Before
90 public void setUp() throws Exception {
91 tableName = TableName.valueOf(name.getMethodName());
92 }
93
94 private void createTable(String coprocessor) throws IOException {
95 HTableDescriptor htd = new HTableDescriptor(tableName)
96 .addFamily(new HColumnDescriptor(dummy))
97 .addFamily(new HColumnDescriptor(test))
98 .addCoprocessor(coprocessor);
99 util.getHBaseAdmin().createTable(htd);
100 }
101
102
103
104
105
106 @Test
107 public void testMulti() throws Exception {
108 createTable(TestMultiMutationCoprocessor.class.getName());
109
110 try (Table t = util.getConnection().getTable(tableName)) {
111 t.put(new Put(row1).addColumn(test, dummy, dummy));
112 assertRowCount(t, 3);
113 }
114 }
115
116
117
118
119 @Test
120 public void testCPMutationsAreWrittenToWALEdit() throws Exception {
121 createTable(TestMultiMutationCoprocessor.class.getName());
122
123 try (Table t = util.getConnection().getTable(tableName)) {
124 t.put(new Put(row1).addColumn(test, dummy, dummy));
125 assertRowCount(t, 3);
126 }
127
128 assertNotNull(TestWALObserver.savedEdit);
129 assertEquals(4, TestWALObserver.savedEdit.getCells().size());
130 }
131
132 private static void assertRowCount(Table t, int expected) throws IOException {
133 try (ResultScanner scanner = t.getScanner(new Scan())) {
134 int i = 0;
135 for (Result r: scanner) {
136 LOG.info(r.toString());
137 i++;
138 }
139 assertEquals(expected, i);
140 }
141 }
142
143 @Test
144 public void testDeleteCell() throws Exception {
145 createTable(TestDeleteCellCoprocessor.class.getName());
146
147 try (Table t = util.getConnection().getTable(tableName)) {
148 t.put(Lists.newArrayList(
149 new Put(row1).addColumn(test, dummy, dummy),
150 new Put(row2).addColumn(test, dummy, dummy),
151 new Put(row3).addColumn(test, dummy, dummy)
152 ));
153
154 assertRowCount(t, 3);
155
156 t.delete(new Delete(test).addColumn(test, dummy));
157 assertRowCount(t, 1);
158 }
159 }
160
161 @Test
162 public void testDeleteFamily() throws Exception {
163 createTable(TestDeleteFamilyCoprocessor.class.getName());
164
165 try (Table t = util.getConnection().getTable(tableName)) {
166 t.put(Lists.newArrayList(
167 new Put(row1).addColumn(test, dummy, dummy),
168 new Put(row2).addColumn(test, dummy, dummy),
169 new Put(row3).addColumn(test, dummy, dummy)
170 ));
171
172 assertRowCount(t, 3);
173
174 t.delete(new Delete(test).addFamily(test));
175 assertRowCount(t, 1);
176 }
177 }
178
179 @Test
180 public void testDeleteRow() throws Exception {
181 createTable(TestDeleteRowCoprocessor.class.getName());
182
183 try (Table t = util.getConnection().getTable(tableName)) {
184 t.put(Lists.newArrayList(
185 new Put(row1).addColumn(test, dummy, dummy),
186 new Put(row2).addColumn(test, dummy, dummy),
187 new Put(row3).addColumn(test, dummy, dummy)
188 ));
189
190 assertRowCount(t, 3);
191
192 t.delete(new Delete(test).addColumn(test, dummy));
193 assertRowCount(t, 1);
194 }
195 }
196
197 public static class TestMultiMutationCoprocessor extends BaseRegionObserver {
198 @Override
199 public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
200 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
201 Mutation mut = miniBatchOp.getOperation(0);
202 List<Cell> cells = mut.getFamilyCellMap().get(test);
203 Put[] puts = new Put[] {
204 new Put(row1).addColumn(test, dummy, cells.get(0).getTimestamp(),
205 Bytes.toBytes("cpdummy")),
206 new Put(row2).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
207 new Put(row3).addColumn(test, dummy, cells.get(0).getTimestamp(), dummy),
208 };
209 LOG.info("Putting:" + puts);
210 miniBatchOp.addOperationsFromCP(0, puts);
211 }
212 }
213
214 public static class TestDeleteCellCoprocessor extends BaseRegionObserver {
215 @Override
216 public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
217 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
218 Mutation mut = miniBatchOp.getOperation(0);
219
220 if (mut instanceof Delete) {
221 List<Cell> cells = mut.getFamilyCellMap().get(test);
222 Delete[] deletes = new Delete[] {
223
224 new Delete(row1).addColumns(test, dummy, cells.get(0).getTimestamp()),
225 new Delete(row2).addColumns(test, dummy, cells.get(0).getTimestamp()),
226 };
227 LOG.info("Deleting:" + Arrays.toString(deletes));
228 miniBatchOp.addOperationsFromCP(0, deletes);
229 }
230 }
231 }
232
233 public static class TestDeleteFamilyCoprocessor extends BaseRegionObserver {
234 @Override
235 public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
236 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
237 Mutation mut = miniBatchOp.getOperation(0);
238
239 if (mut instanceof Delete) {
240 List<Cell> cells = mut.getFamilyCellMap().get(test);
241 Delete[] deletes = new Delete[] {
242
243 new Delete(row1).addFamily(test, cells.get(0).getTimestamp()),
244 new Delete(row2).addFamily(test, cells.get(0).getTimestamp()),
245 };
246 LOG.info("Deleting:" + Arrays.toString(deletes));
247 miniBatchOp.addOperationsFromCP(0, deletes);
248 }
249 }
250 }
251
252 public static class TestDeleteRowCoprocessor extends BaseRegionObserver {
253 @Override
254 public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
255 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
256 Mutation mut = miniBatchOp.getOperation(0);
257
258 if (mut instanceof Delete) {
259 List<Cell> cells = mut.getFamilyCellMap().get(test);
260 Delete[] deletes = new Delete[] {
261
262 new Delete(row1, cells.get(0).getTimestamp()),
263 new Delete(row2, cells.get(0).getTimestamp()),
264 };
265 LOG.info("Deleting:" + Arrays.toString(deletes));
266 miniBatchOp.addOperationsFromCP(0, deletes);
267 }
268 }
269 }
270
271 public static class TestWALObserver extends BaseWALObserver {
272 static WALEdit savedEdit = null;
273 @Override
274 public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
275 HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
276 if (info.getTable().equals(TableName.valueOf("testCPMutationsAreWrittenToWALEdit"))) {
277 savedEdit = logEdit;
278 }
279 super.postWALWrite(ctx, info, logKey, logEdit);
280 }
281 }
282 }