1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.compactions;
19
20 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
21 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertTrue;
24 import static org.mockito.Matchers.any;
25 import static org.mockito.Matchers.anyBoolean;
26 import static org.mockito.Matchers.anyLong;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.when;
29
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.List;
34
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HColumnDescriptor;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.KeyValue.KVComparator;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.io.compress.Compression;
45 import org.apache.hadoop.hbase.regionserver.InternalScanner;
46 import org.apache.hadoop.hbase.regionserver.ScanInfo;
47 import org.apache.hadoop.hbase.regionserver.ScanType;
48 import org.apache.hadoop.hbase.regionserver.Store;
49 import org.apache.hadoop.hbase.regionserver.StoreFile;
50 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
51 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
52 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
53 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
54 import org.apache.hadoop.hbase.testclassification.SmallTests;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.junit.Test;
57 import org.junit.experimental.categories.Category;
58 import org.junit.runner.RunWith;
59 import org.junit.runners.Parameterized;
60 import org.junit.runners.Parameterized.Parameter;
61 import org.junit.runners.Parameterized.Parameters;
62
63 @RunWith(Parameterized.class)
64 @Category({ SmallTests.class })
65 public class TestDateTieredCompactor {
66
67 private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
68
69 private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
70
71 private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
72
73 private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
74
75 private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
76
77 private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
78
79 @Parameters(name = "{index}: usePrivateReaders={0}")
80 public static Iterable<Object[]> data() {
81 return Arrays.asList(new Object[] { true }, new Object[] { false });
82 }
83
84 @Parameter
85 public boolean usePrivateReaders;
86
87 private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
88 final KeyValue[] input, List<StoreFile> storefiles) throws Exception {
89 Configuration conf = HBaseConfiguration.create();
90 conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
91 final Scanner scanner = new Scanner(input);
92
93 HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
94 ScanInfo si = new ScanInfo(col, Long.MAX_VALUE, 0, new KVComparator());
95 final Store store = mock(Store.class);
96 when(store.getStorefiles()).thenReturn(storefiles);
97 when(store.getFamily()).thenReturn(col);
98 when(store.getScanInfo()).thenReturn(si);
99 when(store.areWritesEnabled()).thenReturn(true);
100 when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
101 when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
102 when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
103 anyBoolean(), anyBoolean())).thenAnswer(writers);
104 when(store.getComparator()).thenReturn(new KVComparator());
105 long maxSequenceId = StoreFile.getMaxSequenceIdInList(storefiles);
106 when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
107
108 return new DateTieredCompactor(conf, store) {
109 @Override
110 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
111 long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
112 byte[] dropDeletesToRow) throws IOException {
113 return scanner;
114 }
115
116 @Override
117 protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
118 ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
119 return scanner;
120 }
121 };
122 }
123
124 private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
125 boolean allFiles) throws Exception {
126 StoreFileWritersCapture writers = new StoreFileWritersCapture();
127 StoreFile sf1 = createDummyStoreFile(1L);
128 StoreFile sf2 = createDummyStoreFile(2L);
129 DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
130 List<Path> paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)),
131 boundaries.subList(0, boundaries.size() - 1), NoLimitCompactionThroughputController.INSTANCE,
132 null);
133 writers.verifyKvs(output, allFiles, boundaries);
134 if (allFiles) {
135 assertEquals(output.length, paths.size());
136 }
137 }
138
139 @SuppressWarnings("unchecked")
140 private static <T> T[] a(T... a) {
141 return a;
142 }
143
144 @Test
145 public void test() throws Exception {
146 verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
147 a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
148 verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
149 a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
150 verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
151 new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
152 }
153
154 @Test
155 public void testEmptyOutputFile() throws Exception {
156 StoreFileWritersCapture writers = new StoreFileWritersCapture();
157 CompactionRequest request = createDummyRequest();
158 DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
159 new ArrayList<StoreFile>(request.getFiles()));
160 List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
161 NoLimitCompactionThroughputController.INSTANCE, null);
162 assertEquals(1, paths.size());
163 List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
164 assertEquals(1, dummyWriters.size());
165 StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
166 assertTrue(dummyWriter.kvs.isEmpty());
167 assertTrue(dummyWriter.hasMetadata);
168 }
169 }