1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.util;
19
20
21
22 import static org.junit.Assert.assertEquals;
23
24 import java.io.IOException;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.NavigableSet;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.HBaseTestingUtility;
38 import org.apache.hadoop.hbase.HColumnDescriptor;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HTableDescriptor;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.KeyValueUtil;
43 import org.apache.hadoop.hbase.testclassification.MediumTests;
44 import org.apache.hadoop.hbase.client.Get;
45 import org.apache.hadoop.hbase.client.HTable;
46 import org.apache.hadoop.hbase.client.IsolationLevel;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.client.Result;
49 import org.apache.hadoop.hbase.client.Scan;
50 import org.apache.hadoop.hbase.client.Durability;
51 import org.apache.hadoop.hbase.client.Table;
52 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
53 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
54 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
55 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
56 import org.apache.hadoop.hbase.regionserver.HStore;
57 import org.apache.hadoop.hbase.regionserver.InternalScanner;
58 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
59 import org.apache.hadoop.hbase.regionserver.ScanType;
60 import org.apache.hadoop.hbase.regionserver.Store;
61 import org.apache.hadoop.hbase.regionserver.ScanInfo;
62 import org.apache.hadoop.hbase.regionserver.StoreScanner;
63 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
64 import org.junit.AfterClass;
65 import org.junit.BeforeClass;
66 import org.junit.Test;
67 import org.junit.experimental.categories.Category;
68
69 import org.junit.runner.RunWith;
70 import org.junit.runners.Parameterized;
71 import org.junit.runners.Parameterized.Parameters;
72
73 @Category(MediumTests.class)
74 @RunWith(Parameterized.class)
75 public class TestCoprocessorScanPolicy {
76 final Log LOG = LogFactory.getLog(getClass());
77 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
78 private static final byte[] F = Bytes.toBytes("fam");
79 private static final byte[] Q = Bytes.toBytes("qual");
80 private static final byte[] R = Bytes.toBytes("row");
81
82 @BeforeClass
83 public static void setUpBeforeClass() throws Exception {
84 Configuration conf = TEST_UTIL.getConfiguration();
85 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
86 ScanObserver.class.getName());
87 TEST_UTIL.startMiniCluster();
88 }
89
90 @AfterClass
91 public static void tearDownAfterClass() throws Exception {
92 TEST_UTIL.shutdownMiniCluster();
93 }
94
95 @Parameters
96 public static Collection<Object[]> parameters() {
97 return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
98 }
99
100 public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
101 TEST_UTIL.getMiniHBaseCluster().getConf()
102 .setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
103 }
104
105 @Test
106 public void testBaseCases() throws Exception {
107 TableName tableName =
108 TableName.valueOf("baseCases");
109 if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
110 TEST_UTIL.deleteTable(tableName);
111 }
112 Table t = TEST_UTIL.createTable(tableName, F, 1);
113
114 Put p = new Put(R);
115 p.setAttribute("versions", new byte[]{});
116 p.add(F, tableName.getName(), Bytes.toBytes(2));
117 t.put(p);
118
119 long now = EnvironmentEdgeManager.currentTime();
120
121
122 p = new Put(R);
123 p.add(F, Q, now, Q);
124 t.put(p);
125 p = new Put(R);
126 p.add(F, Q, now+1, Q);
127 t.put(p);
128 Get g = new Get(R);
129 g.setMaxVersions(10);
130 Result r = t.get(g);
131 assertEquals(2, r.size());
132
133 TEST_UTIL.flush(tableName);
134 TEST_UTIL.compact(tableName, true);
135
136
137 g = new Get(R);
138 g.setMaxVersions(10);
139 r = t.get(g);
140 assertEquals(2, r.size());
141
142
143 p = new Put(R);
144 p.add(F, Q, now+2, Q);
145 t.put(p);
146 g = new Get(R);
147 g.setMaxVersions(10);
148 r = t.get(g);
149
150 assertEquals(2, r.size());
151
152 t.close();
153 }
154
155 @Test
156 public void testTTL() throws Exception {
157 TableName tableName =
158 TableName.valueOf("testTTL");
159 if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
160 TEST_UTIL.deleteTable(tableName);
161 }
162 HTableDescriptor desc = new HTableDescriptor(tableName);
163 HColumnDescriptor hcd = new HColumnDescriptor(F)
164 .setMaxVersions(10)
165 .setTimeToLive(1);
166 desc.addFamily(hcd);
167 TEST_UTIL.getHBaseAdmin().createTable(desc);
168 Table t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
169 long now = EnvironmentEdgeManager.currentTime();
170 ManualEnvironmentEdge me = new ManualEnvironmentEdge();
171 me.setValue(now);
172 EnvironmentEdgeManagerTestHelper.injectEdge(me);
173
174 long ts = now - 2000;
175
176 Put p = new Put(R);
177 p.setAttribute("ttl", new byte[]{});
178 p.add(F, tableName.getName(), Bytes.toBytes(3000L));
179 t.put(p);
180
181 p = new Put(R);
182 p.add(F, Q, ts, Q);
183 t.put(p);
184 p = new Put(R);
185 p.add(F, Q, ts+1, Q);
186 t.put(p);
187
188
189
190 Get g = new Get(R);
191 g.setMaxVersions(10);
192 Result r = t.get(g);
193
194 assertEquals(2, r.size());
195
196 TEST_UTIL.flush(tableName);
197 TEST_UTIL.compact(tableName, true);
198
199 g = new Get(R);
200 g.setMaxVersions(10);
201 r = t.get(g);
202
203 assertEquals(2, r.size());
204
205
206 me.setValue(now + 2000);
207
208 g = new Get(R);
209 g.setMaxVersions(10);
210 r = t.get(g);
211
212 assertEquals(0, r.size());
213 t.close();
214 }
215
216 public static class ScanObserver extends BaseRegionObserver {
217 private Map<TableName, Long> ttls =
218 new HashMap<TableName, Long>();
219 private Map<TableName, Integer> versions =
220 new HashMap<TableName, Integer>();
221
222
223
224 @Override
225 public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
226 final WALEdit edit, final Durability durability) throws IOException {
227 if (put.getAttribute("ttl") != null) {
228 Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
229 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
230 ttls.put(TableName.valueOf(kv.getQualifier()), Bytes.toLong(kv.getValue()));
231 c.bypass();
232 } else if (put.getAttribute("versions") != null) {
233 Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
234 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
235 versions.put(TableName.valueOf(kv.getQualifier()), Bytes.toInt(kv.getValue()));
236 c.bypass();
237 }
238 }
239
240 @Override
241 public InternalScanner preFlushScannerOpen(
242 final ObserverContext<RegionCoprocessorEnvironment> c,
243 Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
244 Long newTtl = ttls.get(store.getTableName());
245 if (newTtl != null) {
246 System.out.println("PreFlush:" + newTtl);
247 }
248 Integer newVersions = versions.get(store.getTableName());
249 ScanInfo oldSI = store.getScanInfo();
250 HColumnDescriptor family = store.getFamily();
251 ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
252 newVersions == null ? family.getMaxVersions() : newVersions,
253 newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
254 oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
255 Scan scan = new Scan();
256 scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
257 return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
258 ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
259 HConstants.OLDEST_TIMESTAMP);
260 }
261
262 @Override
263 public InternalScanner preCompactScannerOpen(
264 final ObserverContext<RegionCoprocessorEnvironment> c,
265 Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
266 long earliestPutTs, InternalScanner s) throws IOException {
267 Long newTtl = ttls.get(store.getTableName());
268 Integer newVersions = versions.get(store.getTableName());
269 ScanInfo oldSI = store.getScanInfo();
270 HColumnDescriptor family = store.getFamily();
271 ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
272 newVersions == null ? family.getMaxVersions() : newVersions,
273 newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
274 oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
275 Scan scan = new Scan();
276 scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
277 return new StoreScanner(store, scanInfo, scan, scanners, scanType,
278 store.getSmallestReadPoint(), earliestPutTs);
279 }
280
281 @Override
282 public KeyValueScanner preStoreScannerOpen(
283 final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
284 final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException {
285 TableName tn = store.getTableName();
286 if (!tn.isSystemTable()) {
287 Long newTtl = ttls.get(store.getTableName());
288 Integer newVersions = versions.get(store.getTableName());
289 ScanInfo oldSI = store.getScanInfo();
290 HColumnDescriptor family = store.getFamily();
291 ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
292 newVersions == null ? family.getMaxVersions() : newVersions,
293 newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
294 oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
295 return new StoreScanner(store, scanInfo, scan, targetCols,
296 ((HStore) store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
297 } else {
298 return s;
299 }
300 }
301 }
302
303 }