1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import static org.junit.Assert.*;
23 import static org.mockito.Matchers.any;
24 import static org.mockito.Mockito.spy;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27
28 import java.io.IOException;
29 import java.lang.ref.SoftReference;
30 import java.security.PrivilegedExceptionAction;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.NavigableSet;
37 import java.util.concurrent.ConcurrentSkipListSet;
38 import java.util.concurrent.atomic.AtomicBoolean;
39
40 import org.apache.commons.logging.Log;
41 import org.apache.commons.logging.LogFactory;
42 import org.apache.hadoop.conf.Configuration;
43 import org.apache.hadoop.fs.FSDataOutputStream;
44 import org.apache.hadoop.fs.FileStatus;
45 import org.apache.hadoop.fs.FileSystem;
46 import org.apache.hadoop.fs.FilterFileSystem;
47 import org.apache.hadoop.fs.LocalFileSystem;
48 import org.apache.hadoop.fs.Path;
49 import org.apache.hadoop.fs.permission.FsPermission;
50 import org.apache.hadoop.hbase.Cell;
51 import org.apache.hadoop.hbase.CellUtil;
52 import org.apache.hadoop.hbase.HBaseConfiguration;
53 import org.apache.hadoop.hbase.HBaseTestingUtility;
54 import org.apache.hadoop.hbase.HColumnDescriptor;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.KeyValue;
58 import org.apache.hadoop.hbase.KeyValue.KVComparator;
59 import org.apache.hadoop.hbase.KeyValueUtil;
60 import org.apache.hadoop.hbase.testclassification.MediumTests;
61 import org.apache.hadoop.hbase.TableName;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.io.compress.Compression;
64 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
65 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
66 import org.apache.hadoop.hbase.io.hfile.HFile;
67 import org.apache.hadoop.hbase.io.hfile.HFileContext;
68 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
69 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
70 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
71 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
72 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
73 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
74 import org.apache.hadoop.hbase.wal.WALFactory;
75 import org.apache.hadoop.hbase.security.User;
76 import org.apache.hadoop.hbase.util.Bytes;
77 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
78 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
79 import org.apache.hadoop.hbase.util.FSUtils;
80 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
81 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
82 import org.apache.hadoop.util.Progressable;
83 import org.junit.After;
84 import org.junit.Assert;
85 import org.junit.Before;
86 import org.junit.Rule;
87 import org.junit.Test;
88 import org.junit.experimental.categories.Category;
89 import org.junit.rules.TestName;
90 import org.mockito.Mockito;
91
92 import com.google.common.collect.Lists;
93
94
95
96
97 @Category(MediumTests.class)
98 public class TestStore {
99 public static final Log LOG = LogFactory.getLog(TestStore.class);
100 @Rule public TestName name = new TestName();
101
102 HStore store;
103 byte [] table = Bytes.toBytes("table");
104 byte [] family = Bytes.toBytes("family");
105
106 byte [] row = Bytes.toBytes("row");
107 byte [] row2 = Bytes.toBytes("row2");
108 byte [] qf1 = Bytes.toBytes("qf1");
109 byte [] qf2 = Bytes.toBytes("qf2");
110 byte [] qf3 = Bytes.toBytes("qf3");
111 byte [] qf4 = Bytes.toBytes("qf4");
112 byte [] qf5 = Bytes.toBytes("qf5");
113 byte [] qf6 = Bytes.toBytes("qf6");
114
115 NavigableSet<byte[]> qualifiers =
116 new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
117
118 List<Cell> expected = new ArrayList<Cell>();
119 List<Cell> result = new ArrayList<Cell>();
120
121 long id = System.currentTimeMillis();
122 Get get = new Get(row);
123
124 private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
125 private final String DIR = TEST_UTIL.getDataTestDir("TestStore").toString();
126
127
128
129
130
131
132 @Before
133 public void setUp() throws IOException {
134 qualifiers.add(qf1);
135 qualifiers.add(qf3);
136 qualifiers.add(qf5);
137
138 Iterator<byte[]> iter = qualifiers.iterator();
139 while(iter.hasNext()){
140 byte [] next = iter.next();
141 expected.add(new KeyValue(row, family, next, 1, (byte[])null));
142 get.addColumn(family, next);
143 }
144 }
145
146 private void init(String methodName) throws IOException {
147 init(methodName, TEST_UTIL.getConfiguration());
148 }
149
150 private void init(String methodName, Configuration conf)
151 throws IOException {
152 HColumnDescriptor hcd = new HColumnDescriptor(family);
153
154
155 hcd.setMaxVersions(4);
156 init(methodName, conf, hcd);
157 }
158
159 private void init(String methodName, Configuration conf,
160 HColumnDescriptor hcd) throws IOException {
161 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
162 init(methodName, conf, htd, hcd);
163 }
164
165 @SuppressWarnings("deprecation")
166 private Store init(String methodName, Configuration conf, HTableDescriptor htd,
167 HColumnDescriptor hcd) throws IOException {
168
169 Path basedir = new Path(DIR+methodName);
170 Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
171 final Path logdir = new Path(basedir, DefaultWALProvider.getWALDirectoryName(methodName));
172
173 FileSystem fs = FileSystem.get(conf);
174
175 fs.delete(logdir, true);
176
177 if (htd.hasFamily(hcd.getName())) {
178 htd.modifyFamily(hcd);
179 } else {
180 htd.addFamily(hcd);
181 }
182 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
183 final Configuration walConf = new Configuration(conf);
184 FSUtils.setRootDir(walConf, basedir);
185 final WALFactory wals = new WALFactory(walConf, null, methodName);
186 HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes()), fs, conf,
187 info, htd, null);
188
189 store = new HStore(region, hcd, conf);
190 return store;
191 }
192
193
194
195
196
197
198 @Test
199 public void testFlushSizeAccounting() throws Exception {
200 LOG.info("Setting up a faulty file system that cannot write in " +
201 this.name.getMethodName());
202 final Configuration conf = HBaseConfiguration.create();
203
204 conf.setInt("hbase.hstore.flush.retries.number", 1);
205 User user = User.createUserForTesting(conf, this.name.getMethodName(),
206 new String[]{"foo"});
207
208 conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
209 user.runAs(new PrivilegedExceptionAction<Object>() {
210 @Override
211 public Object run() throws Exception {
212
213 FileSystem fs = FileSystem.get(conf);
214 Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
215 FaultyFileSystem ffs = (FaultyFileSystem)fs;
216
217
218 init(name.getMethodName(), conf);
219
220 long size = store.memstore.getFlushableSize();
221 Assert.assertEquals(0, size);
222 LOG.info("Adding some data");
223 long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null)).getFirst();
224 size = store.memstore.getFlushableSize();
225 Assert.assertEquals(kvSize, size);
226
227 try {
228 LOG.info("Flushing");
229 flushStore(store, id++);
230 Assert.fail("Didn't bubble up IOE!");
231 } catch (IOException ioe) {
232 Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
233 }
234 size = store.memstore.getFlushableSize();
235 Assert.assertEquals(kvSize, size);
236 store.add(new KeyValue(row, family, qf2, 2, (byte[])null));
237
238
239 Assert.assertEquals(kvSize, size);
240 ffs.fault.set(false);
241 flushStore(store, id++);
242 size = store.memstore.getFlushableSize();
243
244 Assert.assertEquals(kvSize, size);
245 flushStore(store, id++);
246 size = store.memstore.getFlushableSize();
247 Assert.assertEquals(0, size);
248 return null;
249 }
250 });
251 }
252
253
254
255
256
257 @Test
258 public void testCreateWriter() throws Exception {
259 Configuration conf = HBaseConfiguration.create();
260 FileSystem fs = FileSystem.get(conf);
261
262 HColumnDescriptor hcd = new HColumnDescriptor(family);
263 hcd.setCompressionType(Compression.Algorithm.GZ);
264 hcd.setDataBlockEncoding(DataBlockEncoding.DIFF);
265 init(name.getMethodName(), conf, hcd);
266
267
268 StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
269 Path path = writer.getPath();
270 writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
271 writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
272 writer.append(new KeyValue(row2, family, qf1, Bytes.toBytes(3)));
273 writer.append(new KeyValue(row2, family, qf2, Bytes.toBytes(4)));
274 writer.close();
275
276
277 HFile.Reader reader = HFile.createReader(fs, path, new CacheConfig(conf), conf);
278 Assert.assertEquals(hcd.getCompressionType(), reader.getCompressionAlgorithm());
279 Assert.assertEquals(hcd.getDataBlockEncoding(), reader.getDataBlockEncoding());
280 reader.close();
281 }
282
283 @Test
284 public void testDeleteExpiredStoreFiles() throws Exception {
285 testDeleteExpiredStoreFiles(0);
286 testDeleteExpiredStoreFiles(1);
287 }
288
289
290
291
292 public void testDeleteExpiredStoreFiles(int minVersions) throws Exception {
293 int storeFileNum = 4;
294 int ttl = 4;
295 IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
296 EnvironmentEdgeManagerTestHelper.injectEdge(edge);
297
298 Configuration conf = HBaseConfiguration.create();
299
300 conf.setBoolean("hbase.store.delete.expired.storefile", true);
301
302 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 5);
303
304 HColumnDescriptor hcd = new HColumnDescriptor(family);
305 hcd.setMinVersions(minVersions);
306 hcd.setTimeToLive(ttl);
307 init(name.getMethodName() + "-" + minVersions, conf, hcd);
308
309 long storeTtl = this.store.getScanInfo().getTtl();
310 long sleepTime = storeTtl / storeFileNum;
311 long timeStamp;
312
313
314 for (int i = 1; i <= storeFileNum; i++) {
315 LOG.info("Adding some data for the store file #" + i);
316 timeStamp = EnvironmentEdgeManager.currentTime();
317 this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null));
318 this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null));
319 this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null));
320 flush(i);
321 edge.incrementTime(sleepTime);
322 }
323
324
325 Assert.assertEquals(storeFileNum, this.store.getStorefiles().size());
326
327
328
329 for (int i = 1; i <= storeFileNum - 1; i++) {
330
331 assertNull(this.store.requestCompaction());
332 Collection<StoreFile> sfs = this.store.getStorefiles();
333
334 if (minVersions == 0) {
335 assertEquals(storeFileNum - i, sfs.size());
336
337 for (StoreFile sf : sfs) {
338 assertTrue(sf.getReader().getMaxTimestamp() >= (edge.currentTime() - storeTtl));
339 }
340 } else {
341 assertEquals(storeFileNum, sfs.size());
342 }
343
344 edge.incrementTime(sleepTime);
345 }
346 assertNull(this.store.requestCompaction());
347
348 Collection<StoreFile> sfs = this.store.getStorefiles();
349
350 if (minVersions == 0) {
351 assertEquals(1, sfs.size());
352 }
353 long ts = sfs.iterator().next().getReader().getMaxTimestamp();
354 assertTrue(ts < (edge.currentTime() - storeTtl));
355
356 for (StoreFile sf : sfs) {
357 sf.closeReader(true);
358 }
359 }
360
361 @Test
362 public void testLowestModificationTime() throws Exception {
363 Configuration conf = HBaseConfiguration.create();
364 FileSystem fs = FileSystem.get(conf);
365
366 init(name.getMethodName(), conf);
367
368 int storeFileNum = 4;
369 for (int i = 1; i <= storeFileNum; i++) {
370 LOG.info("Adding some data for the store file #"+i);
371 this.store.add(new KeyValue(row, family, qf1, i, (byte[])null));
372 this.store.add(new KeyValue(row, family, qf2, i, (byte[])null));
373 this.store.add(new KeyValue(row, family, qf3, i, (byte[])null));
374 flush(i);
375 }
376
377 long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
378 long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
379 Assert.assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
380
381
382 store.compact(store.requestCompaction(), NoLimitCompactionThroughputController.INSTANCE);
383 lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
384 lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
385 Assert.assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
386 }
387
388 private static long getLowestTimeStampFromFS(FileSystem fs,
389 final Collection<StoreFile> candidates) throws IOException {
390 long minTs = Long.MAX_VALUE;
391 if (candidates.isEmpty()) {
392 return minTs;
393 }
394 Path[] p = new Path[candidates.size()];
395 int i = 0;
396 for (StoreFile sf : candidates) {
397 p[i] = sf.getPath();
398 ++i;
399 }
400
401 FileStatus[] stats = fs.listStatus(p);
402 if (stats == null || stats.length == 0) {
403 return minTs;
404 }
405 for (FileStatus s : stats) {
406 minTs = Math.min(minTs, s.getModificationTime());
407 }
408 return minTs;
409 }
410
411
412
413
414
415 private static final int BLOCKSIZE_SMALL = 8192;
416
417
418
419
420 @Test
421 public void testEmptyStoreFile() throws IOException {
422 init(this.name.getMethodName());
423
424 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
425 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
426 flush(1);
427
428
429 StoreFile f = this.store.getStorefiles().iterator().next();
430 Path storedir = f.getPath().getParent();
431 long seqid = f.getMaxSequenceId();
432 Configuration c = HBaseConfiguration.create();
433 FileSystem fs = FileSystem.get(c);
434 HFileContext meta = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
435 StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
436 fs)
437 .withOutputDir(storedir)
438 .withFileContext(meta)
439 .build();
440 w.appendMetadata(seqid + 1, false);
441 w.close();
442 this.store.close();
443
444 this.store = new HStore(this.store.getHRegion(), this.store.getFamily(), c);
445 Assert.assertEquals(2, this.store.getStorefilesCount());
446
447 result = HBaseTestingUtility.getFromStoreFile(store,
448 get.getRow(),
449 qualifiers);
450 Assert.assertEquals(1, result.size());
451 }
452
453
454
455
456
457 @Test
458 public void testGet_FromMemStoreOnly() throws IOException {
459 init(this.name.getMethodName());
460
461
462 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
463 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
464 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
465 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
466 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
467 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
468
469
470 result = HBaseTestingUtility.getFromStoreFile(store,
471 get.getRow(), qualifiers);
472
473
474 assertCheck();
475 }
476
477
478
479
480
481 @Test
482 public void testGet_FromFilesOnly() throws IOException {
483 init(this.name.getMethodName());
484
485
486 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
487 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
488
489 flush(1);
490
491
492 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
493 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
494
495 flush(2);
496
497
498 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
499 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
500
501 flush(3);
502
503
504 result = HBaseTestingUtility.getFromStoreFile(store,
505 get.getRow(),
506 qualifiers);
507
508
509
510 Collections.sort(result, KeyValue.COMPARATOR);
511
512
513 assertCheck();
514 }
515
516
517
518
519
520 @Test
521 public void testGet_FromMemStoreAndFiles() throws IOException {
522 init(this.name.getMethodName());
523
524
525 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
526 this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
527
528 flush(1);
529
530
531 this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
532 this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null));
533
534 flush(2);
535
536
537 this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null));
538 this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null));
539
540
541 result = HBaseTestingUtility.getFromStoreFile(store,
542 get.getRow(), qualifiers);
543
544
545 Collections.sort(result, KeyValue.COMPARATOR);
546
547
548 assertCheck();
549 }
550
551 private void flush(int storeFilessize) throws IOException{
552 this.store.snapshot();
553 flushStore(store, id++);
554 Assert.assertEquals(storeFilessize, this.store.getStorefiles().size());
555 Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size());
556 }
557
558 private void assertCheck() {
559 Assert.assertEquals(expected.size(), result.size());
560 for(int i=0; i<expected.size(); i++) {
561 Assert.assertEquals(expected.get(i), result.get(i));
562 }
563 }
564
565
566
567
568
569
570
571 @Test
572 public void testIncrementColumnValue_ICVDuringFlush()
573 throws IOException, InterruptedException {
574 init(this.name.getMethodName());
575
576 long oldValue = 1L;
577 long newValue = 3L;
578 this.store.add(new KeyValue(row, family, qf1,
579 System.currentTimeMillis(),
580 Bytes.toBytes(oldValue)));
581
582
583 this.store.snapshot();
584
585
586 this.store.add(new KeyValue(row, family, qf2,
587 System.currentTimeMillis(),
588 Bytes.toBytes(oldValue)));
589
590
591 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
592
593
594 Assert.assertTrue(ret > 0);
595
596
597 flushStore(store, id++);
598 Assert.assertEquals(1, this.store.getStorefiles().size());
599
600 Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size());
601
602
603 Get get = new Get(row);
604 get.addColumn(family, qf1);
605 get.setMaxVersions();
606 List<Cell> results = new ArrayList<Cell>();
607
608 results = HBaseTestingUtility.getFromStoreFile(store, get);
609 Assert.assertEquals(2, results.size());
610
611 long ts1 = results.get(0).getTimestamp();
612 long ts2 = results.get(1).getTimestamp();
613
614 Assert.assertTrue(ts1 > ts2);
615
616 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
617 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
618 }
619
620 @After
621 public void tearDown() throws Exception {
622 EnvironmentEdgeManagerTestHelper.reset();
623 }
624
625 @Test
626 public void testICV_negMemstoreSize() throws IOException {
627 init(this.name.getMethodName());
628
629 long time = 100;
630 ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
631 ee.setValue(time);
632 EnvironmentEdgeManagerTestHelper.injectEdge(ee);
633 long newValue = 3L;
634 long size = 0;
635
636
637 size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1,
638 System.currentTimeMillis(),
639 Bytes.toBytes(newValue))).getFirst();
640 size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1,
641 System.currentTimeMillis(),
642 Bytes.toBytes(newValue))).getFirst();
643 size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1,
644 System.currentTimeMillis(),
645 Bytes.toBytes(newValue))).getFirst();
646 size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1,
647 System.currentTimeMillis(),
648 Bytes.toBytes(newValue))).getFirst();
649 size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1,
650 System.currentTimeMillis(),
651 Bytes.toBytes(newValue))).getFirst();
652
653
654 for ( int i = 0 ; i < 10000 ; ++i) {
655 newValue++;
656
657 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
658 long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue);
659
660 if (ret != 0) System.out.println("ret: " + ret);
661 if (ret2 != 0) System.out.println("ret2: " + ret2);
662
663 Assert.assertTrue("ret: " + ret, ret >= 0);
664 size += ret;
665 Assert.assertTrue("ret2: " + ret2, ret2 >= 0);
666 size += ret2;
667
668
669 if (i % 1000 == 0)
670 ee.setValue(++time);
671 }
672
673 long computedSize=0;
674 for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) {
675 long kvsize = DefaultMemStore.heapSizeChange(cell, true);
676
677 computedSize += kvsize;
678 }
679 Assert.assertEquals(computedSize, size);
680 }
681
682 @Test
683 public void testIncrementColumnValue_SnapshotFlushCombo() throws Exception {
684 ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
685 EnvironmentEdgeManagerTestHelper.injectEdge(mee);
686 init(this.name.getMethodName());
687
688 long oldValue = 1L;
689 long newValue = 3L;
690 this.store.add(new KeyValue(row, family, qf1,
691 EnvironmentEdgeManager.currentTime(),
692 Bytes.toBytes(oldValue)));
693
694
695 this.store.snapshot();
696
697
698 long ret = this.store.updateColumnValue(row, family, qf1, newValue);
699
700
701 Assert.assertTrue(ret > 0);
702
703
704 flushStore(store, id++);
705 Assert.assertEquals(1, this.store.getStorefiles().size());
706 Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size());
707
708
709 newValue += 1;
710 this.store.updateColumnValue(row, family, qf1, newValue);
711
712
713 newValue += 1;
714 this.store.updateColumnValue(row, family, qf1, newValue);
715
716
717
718
719
720 Get get = new Get(row);
721 get.addColumn(family, qf1);
722 get.setMaxVersions();
723 List<Cell> results = new ArrayList<Cell>();
724
725 results = HBaseTestingUtility.getFromStoreFile(store, get);
726 Assert.assertEquals(2, results.size());
727
728 long ts1 = results.get(0).getTimestamp();
729 long ts2 = results.get(1).getTimestamp();
730
731 Assert.assertTrue(ts1 > ts2);
732 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
733 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
734
735 mee.setValue(2);
736 newValue += 1;
737 this.store.updateColumnValue(row, family, qf1, newValue);
738
739 results = HBaseTestingUtility.getFromStoreFile(store, get);
740 Assert.assertEquals(2, results.size());
741
742 ts1 = results.get(0).getTimestamp();
743 ts2 = results.get(1).getTimestamp();
744
745 Assert.assertTrue(ts1 > ts2);
746 Assert.assertEquals(newValue, Bytes.toLong(CellUtil.cloneValue(results.get(0))));
747 Assert.assertEquals(oldValue, Bytes.toLong(CellUtil.cloneValue(results.get(1))));
748 }
749
750 @Test
751 public void testHandleErrorsInFlush() throws Exception {
752 LOG.info("Setting up a faulty file system that cannot write");
753
754 final Configuration conf = HBaseConfiguration.create();
755 User user = User.createUserForTesting(conf,
756 "testhandleerrorsinflush", new String[]{"foo"});
757
758 conf.setClass("fs.file.impl", FaultyFileSystem.class,
759 FileSystem.class);
760 user.runAs(new PrivilegedExceptionAction<Object>() {
761 @Override
762 public Object run() throws Exception {
763
764 FileSystem fs = FileSystem.get(conf);
765 Assert.assertEquals(FaultyFileSystem.class, fs.getClass());
766
767
768 init(name.getMethodName(), conf);
769
770 LOG.info("Adding some data");
771 store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
772 store.add(new KeyValue(row, family, qf2, 1, (byte[])null));
773 store.add(new KeyValue(row, family, qf3, 1, (byte[])null));
774
775 LOG.info("Before flush, we should have no files");
776
777 Collection<StoreFileInfo> files =
778 store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
779 Assert.assertEquals(0, files != null ? files.size() : 0);
780
781
782 try {
783 LOG.info("Flushing");
784 flush(1);
785 Assert.fail("Didn't bubble up IOE!");
786 } catch (IOException ioe) {
787 Assert.assertTrue(ioe.getMessage().contains("Fault injected"));
788 }
789
790 LOG.info("After failed flush, we should still have no files!");
791 files = store.getRegionFileSystem().getStoreFiles(store.getColumnFamilyName());
792 Assert.assertEquals(0, files != null ? files.size() : 0);
793 store.getHRegion().getWAL().close();
794 return null;
795 }
796 });
797 FileSystem.closeAllForUGI(user.getUGI());
798 }
799
800
801
802
803
804 static class FaultyFileSystem extends FilterFileSystem {
805 List<SoftReference<FaultyOutputStream>> outStreams =
806 new ArrayList<SoftReference<FaultyOutputStream>>();
807 private long faultPos = 200;
808 AtomicBoolean fault = new AtomicBoolean(true);
809
810 public FaultyFileSystem() {
811 super(new LocalFileSystem());
812 System.err.println("Creating faulty!");
813 }
814
815 @Override
816 public FSDataOutputStream create(Path p) throws IOException {
817 return new FaultyOutputStream(super.create(p), faultPos, fault);
818 }
819
820 @Override
821 public FSDataOutputStream create(Path f, FsPermission permission,
822 boolean overwrite, int bufferSize, short replication, long blockSize,
823 Progressable progress) throws IOException {
824 return new FaultyOutputStream(super.create(f, permission,
825 overwrite, bufferSize, replication, blockSize, progress), faultPos, fault);
826 }
827
828 @Override
829 public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
830 int bufferSize, short replication, long blockSize, Progressable progress)
831 throws IOException {
832
833
834 return create(f, overwrite, bufferSize, replication, blockSize, progress);
835 }
836 }
837
838 static class FaultyOutputStream extends FSDataOutputStream {
839 volatile long faultPos = Long.MAX_VALUE;
840 private final AtomicBoolean fault;
841
842 public FaultyOutputStream(FSDataOutputStream out, long faultPos, final AtomicBoolean fault)
843 throws IOException {
844 super(out, null);
845 this.faultPos = faultPos;
846 this.fault = fault;
847 }
848
849 @Override
850 public void write(byte[] buf, int offset, int length) throws IOException {
851 System.err.println("faulty stream write at pos " + getPos());
852 injectFault();
853 super.write(buf, offset, length);
854 }
855
856 private void injectFault() throws IOException {
857 if (this.fault.get() && getPos() >= faultPos) {
858 throw new IOException("Fault injected");
859 }
860 }
861 }
862
863 private static void flushStore(HStore store, long id) throws IOException {
864 StoreFlushContext storeFlushCtx = store.createFlushContext(id);
865 storeFlushCtx.prepare();
866 storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
867 storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
868 }
869
870
871
872
873
874
875
876
877
878 List<Cell> getKeyValueSet(long[] timestamps, int numRows,
879 byte[] qualifier, byte[] family) {
880 List<Cell> kvList = new ArrayList<Cell>();
881 for (int i=1;i<=numRows;i++) {
882 byte[] b = Bytes.toBytes(i);
883 for (long timestamp: timestamps) {
884 kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
885 }
886 }
887 return kvList;
888 }
889
890
891
892
893
894 @Test
895 public void testMultipleTimestamps() throws IOException {
896 int numRows = 1;
897 long[] timestamps1 = new long[] {1,5,10,20};
898 long[] timestamps2 = new long[] {30,80};
899
900 init(this.name.getMethodName());
901
902 List<Cell> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
903 for (Cell kv : kvList1) {
904 this.store.add(KeyValueUtil.ensureKeyValue(kv));
905 }
906
907 this.store.snapshot();
908 flushStore(store, id++);
909
910 List<Cell> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
911 for(Cell kv : kvList2) {
912 this.store.add(KeyValueUtil.ensureKeyValue(kv));
913 }
914
915 List<Cell> result;
916 Get get = new Get(Bytes.toBytes(1));
917 get.addColumn(family,qf1);
918
919 get.setTimeRange(0,15);
920 result = HBaseTestingUtility.getFromStoreFile(store, get);
921 Assert.assertTrue(result.size()>0);
922
923 get.setTimeRange(40,90);
924 result = HBaseTestingUtility.getFromStoreFile(store, get);
925 Assert.assertTrue(result.size()>0);
926
927 get.setTimeRange(10,45);
928 result = HBaseTestingUtility.getFromStoreFile(store, get);
929 Assert.assertTrue(result.size()>0);
930
931 get.setTimeRange(80,145);
932 result = HBaseTestingUtility.getFromStoreFile(store, get);
933 Assert.assertTrue(result.size()>0);
934
935 get.setTimeRange(1,2);
936 result = HBaseTestingUtility.getFromStoreFile(store, get);
937 Assert.assertTrue(result.size()>0);
938
939 get.setTimeRange(90,200);
940 result = HBaseTestingUtility.getFromStoreFile(store, get);
941 Assert.assertTrue(result.size()==0);
942 }
943
944
945
946
947
948
949 @Test
950 public void testSplitWithEmptyColFam() throws IOException {
951 init(this.name.getMethodName());
952 Assert.assertNull(store.getSplitPoint());
953 store.getHRegion().forceSplit(null);
954 Assert.assertNull(store.getSplitPoint());
955 store.getHRegion().clearSplit();
956 }
957
958 @Test
959 public void testStoreUsesConfigurationFromHcdAndHtd() throws Exception {
960 final String CONFIG_KEY = "hbase.regionserver.thread.compaction.throttle";
961 long anyValue = 10;
962
963
964
965
966 Configuration conf = HBaseConfiguration.create();
967 conf.setLong(CONFIG_KEY, anyValue);
968 init(name.getMethodName() + "-xml", conf);
969 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
970 Assert.assertFalse(store.throttleCompaction(anyValue));
971
972
973 --anyValue;
974 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
975 HColumnDescriptor hcd = new HColumnDescriptor(family);
976 htd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
977 init(name.getMethodName() + "-htd", conf, htd, hcd);
978 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
979 Assert.assertFalse(store.throttleCompaction(anyValue));
980
981
982 --anyValue;
983 hcd.setConfiguration(CONFIG_KEY, Long.toString(anyValue));
984 init(name.getMethodName() + "-hcd", conf, htd, hcd);
985 Assert.assertTrue(store.throttleCompaction(anyValue + 1));
986 Assert.assertFalse(store.throttleCompaction(anyValue));
987 }
988
989 public static class DummyStoreEngine extends DefaultStoreEngine {
990 public static DefaultCompactor lastCreatedCompactor = null;
991 @Override
992 protected void createComponents(
993 Configuration conf, Store store, KVComparator comparator) throws IOException {
994 super.createComponents(conf, store, comparator);
995 lastCreatedCompactor = this.compactor;
996 }
997 }
998
999 @Test
1000 public void testStoreUsesSearchEngineOverride() throws Exception {
1001 Configuration conf = HBaseConfiguration.create();
1002 conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DummyStoreEngine.class.getName());
1003 init(this.name.getMethodName(), conf);
1004 Assert.assertEquals(DummyStoreEngine.lastCreatedCompactor,
1005 this.store.storeEngine.getCompactor());
1006 }
1007
1008 private void addStoreFile() throws IOException {
1009 StoreFile f = this.store.getStorefiles().iterator().next();
1010 Path storedir = f.getPath().getParent();
1011 long seqid = this.store.getMaxSequenceId();
1012 Configuration c = TEST_UTIL.getConfiguration();
1013 FileSystem fs = FileSystem.get(c);
1014 HFileContext fileContext = new HFileContextBuilder().withBlockSize(BLOCKSIZE_SMALL).build();
1015 StoreFile.Writer w = new StoreFile.WriterBuilder(c, new CacheConfig(c),
1016 fs)
1017 .withOutputDir(storedir)
1018 .withFileContext(fileContext)
1019 .build();
1020 w.appendMetadata(seqid + 1, false);
1021 w.close();
1022 LOG.info("Added store file:" + w.getPath());
1023 }
1024
1025 private void archiveStoreFile(int index) throws IOException {
1026 Collection<StoreFile> files = this.store.getStorefiles();
1027 StoreFile sf = null;
1028 Iterator<StoreFile> it = files.iterator();
1029 for (int i = 0; i <= index; i++) {
1030 sf = it.next();
1031 }
1032 store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
1033 }
1034
1035 @Test
1036 public void testRefreshStoreFiles() throws Exception {
1037 init(name.getMethodName());
1038
1039 assertEquals(0, this.store.getStorefilesCount());
1040
1041
1042 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1043 flush(1);
1044 assertEquals(1, this.store.getStorefilesCount());
1045
1046
1047 addStoreFile();
1048
1049 assertEquals(1, this.store.getStorefilesCount());
1050 store.refreshStoreFiles();
1051 assertEquals(2, this.store.getStorefilesCount());
1052
1053
1054 addStoreFile();
1055 addStoreFile();
1056 addStoreFile();
1057
1058 assertEquals(2, this.store.getStorefilesCount());
1059 store.refreshStoreFiles();
1060 assertEquals(5, this.store.getStorefilesCount());
1061
1062 archiveStoreFile(0);
1063
1064 assertEquals(5, this.store.getStorefilesCount());
1065 store.refreshStoreFiles();
1066 assertEquals(4, this.store.getStorefilesCount());
1067
1068 archiveStoreFile(0);
1069 archiveStoreFile(1);
1070 archiveStoreFile(2);
1071
1072 assertEquals(4, this.store.getStorefilesCount());
1073 store.refreshStoreFiles();
1074 assertEquals(1, this.store.getStorefilesCount());
1075
1076 archiveStoreFile(0);
1077 store.refreshStoreFiles();
1078 assertEquals(0, this.store.getStorefilesCount());
1079 }
1080
1081 @SuppressWarnings("unchecked")
1082 @Test
1083 public void testRefreshStoreFilesNotChanged() throws IOException {
1084 init(name.getMethodName());
1085
1086 assertEquals(0, this.store.getStorefilesCount());
1087
1088
1089 this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null));
1090 flush(1);
1091
1092 addStoreFile();
1093
1094 HStore spiedStore = spy(store);
1095
1096
1097 spiedStore.refreshStoreFiles();
1098 assertEquals(2, this.store.getStorefilesCount());
1099 verify(spiedStore, times(1)).replaceStoreFiles(any(Collection.class), any(Collection.class));
1100
1101
1102 spiedStore.refreshStoreFiles();
1103
1104
1105 verify(spiedStore, times(0)).replaceStoreFiles(null, null);
1106 }
1107 }