1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.fs.FSDataOutputStream;
23 import org.apache.hadoop.fs.Path;
24 import org.apache.hadoop.hbase.CellUtil;
25 import org.apache.hadoop.hbase.DoNotRetryIOException;
26 import org.apache.hadoop.hbase.HBaseConfiguration;
27 import org.apache.hadoop.hbase.HColumnDescriptor;
28 import org.apache.hadoop.hbase.HRegionInfo;
29 import org.apache.hadoop.hbase.HTableDescriptor;
30 import org.apache.hadoop.hbase.KeyValue;
31 import org.apache.hadoop.hbase.testclassification.SmallTests;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.io.hfile.HFile;
34 import org.apache.hadoop.hbase.io.hfile.HFileContext;
35 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
36 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
37 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
38 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.Pair;
41 import org.apache.hadoop.hbase.wal.WAL;
42 import org.apache.hadoop.hbase.wal.WALKey;
43 import org.hamcrest.Description;
44 import org.hamcrest.Matcher;
45 import org.hamcrest.TypeSafeMatcher;
46 import org.jmock.Expectations;
47 import org.jmock.integration.junit4.JUnitRuleMockery;
48 import org.jmock.lib.concurrent.Synchroniser;
49 import org.junit.Before;
50 import org.junit.ClassRule;
51 import org.junit.Rule;
52 import org.junit.Test;
53 import org.junit.experimental.categories.Category;
54 import org.junit.rules.TemporaryFolder;
55 import org.junit.rules.TestName;
56
57 import java.io.File;
58 import java.io.FileNotFoundException;
59 import java.io.FileOutputStream;
60 import java.io.IOException;
61 import java.util.ArrayList;
62 import java.util.Arrays;
63 import java.util.List;
64 import java.util.Random;
65 import java.util.concurrent.atomic.AtomicLong;
66
67 import static java.util.Arrays.asList;
68 import static org.junit.Assert.assertEquals;
69 import static org.junit.Assert.assertNotNull;
70 import static org.junit.Assert.assertTrue;
71
72
73
74
75 @Category(SmallTests.class)
76 public class TestBulkLoad {
77
78 @ClassRule
79 public static TemporaryFolder testFolder = new TemporaryFolder();
80 @Rule
81 public final JUnitRuleMockery context = new JUnitRuleMockery() {{
82 setThreadingPolicy(new Synchroniser());
83 }};
84 private final WAL log = context.mock(WAL.class);
85 private final Configuration conf = HBaseConfiguration.create();
86 private final Random random = new Random();
87 private final byte[] randomBytes = new byte[100];
88 private final byte[] family1 = Bytes.toBytes("family1");
89 private final byte[] family2 = Bytes.toBytes("family2");
90 private final Expectations callOnce;
91 @Rule
92 public TestName name = new TestName();
93
94 public TestBulkLoad() throws IOException {
95 callOnce = new Expectations() {
96 {
97 oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
98 with(any(WALKey.class)), with(bulkLogWalEditType(WALEdit.BULK_LOAD)),
99 with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
100 will(returnValue(0l));
101 oneOf(log).sync(with(any(long.class)));
102 }
103 };
104 }
105
106 @Before
107 public void before() throws IOException {
108 random.nextBytes(randomBytes);
109 }
110
111 @Test
112 public void verifyBulkLoadEvent() throws IOException {
113 TableName tableName = TableName.valueOf("test", "test");
114 List<Pair<byte[], String>> familyPaths = withFamilyPathsFor(family1);
115 byte[] familyName = familyPaths.get(0).getFirst();
116 String storeFileName = familyPaths.get(0).getSecond();
117 storeFileName = (new Path(storeFileName)).getName();
118 List<String> storeFileNames = new ArrayList<String>();
119 storeFileNames.add(storeFileName);
120 final Matcher<WALEdit> bulkEventMatcher = bulkLogWalEdit(WALEdit.BULK_LOAD,
121 tableName.toBytes(), familyName, storeFileNames);
122 Expectations expection = new Expectations() {
123 {
124 oneOf(log).append(with(any(HTableDescriptor.class)), with(any(HRegionInfo.class)),
125 with(any(WALKey.class)), with(bulkEventMatcher),
126 with(any(AtomicLong.class)), with(any(boolean.class)), with(any(List.class)));
127 will(returnValue(0l));
128 oneOf(log).sync(with(any(long.class)));
129 }
130 };
131 context.checking(expection);
132 testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
133 .bulkLoadHFiles(familyPaths, false, null);
134 }
135
136 @Test
137 public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
138 testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<Pair<byte[], String>>(),
139 false, null);
140 }
141
142 @Test
143 public void shouldBulkLoadSingleFamilyHLog() throws IOException {
144 context.checking(callOnce);
145 testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
146 }
147
148 @Test
149 public void shouldBulkLoadManyFamilyHLog() throws IOException {
150 context.checking(callOnce);
151 testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
152 false, null);
153 }
154
155 @Test
156 public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
157 context.checking(callOnce);
158 TableName tableName = TableName.valueOf("test", "test");
159 testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
160 .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
161 }
162
163 @Test(expected = DoNotRetryIOException.class)
164 public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
165 testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
166 null);
167 }
168
169 @Test(expected = DoNotRetryIOException.class)
170 public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
171 throws IOException {
172 testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
173 }
174
175 @Test(expected = DoNotRetryIOException.class)
176 public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
177 testRegionWithFamilies()
178 .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
179 false, null);
180 }
181
182 @Test(expected = FileNotFoundException.class)
183 public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
184 List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
185 testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
186 }
187
188 private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
189 return new Pair<byte[], String>(family, "/tmp/does_not_exist");
190 }
191
192 private Pair<byte[], String> withInvalidColumnFamilyButProperHFileLocation(byte[] family)
193 throws IOException {
194 createHFileForFamilies(family);
195 return new Pair<byte[], String>(new byte[]{0x00, 0x01, 0x02}, "/tmp/does_not_exist");
196 }
197
198
199 private HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
200 byte[]... families)
201 throws IOException {
202 HRegionInfo hRegionInfo = new HRegionInfo(tableName);
203 HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
204 for (byte[] family : families) {
205 hTableDescriptor.addFamily(new HColumnDescriptor(family));
206 }
207
208
209 return HRegion.createHRegion(hRegionInfo,
210 new Path(testFolder.newFolder().toURI()),
211 conf,
212 hTableDescriptor,
213 log);
214
215 }
216
217 private HRegion testRegionWithFamilies(byte[]... families) throws IOException {
218 TableName tableName = TableName.valueOf(name.getMethodName());
219 return testRegionWithFamiliesAndSpecifiedTableName(tableName, families);
220 }
221
222 private List<Pair<byte[], String>> getBlankFamilyPaths(){
223 return new ArrayList<Pair<byte[], String>>();
224 }
225
226 private List<Pair<byte[], String>> withFamilyPathsFor(byte[]... families) throws IOException {
227 List<Pair<byte[], String>> familyPaths = getBlankFamilyPaths();
228 for (byte[] family : families) {
229 familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(family)));
230 }
231 return familyPaths;
232 }
233
234 private String createHFileForFamilies(byte[] family) throws IOException {
235 HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(conf);
236
237 File hFileLocation = testFolder.newFile();
238 FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
239 try {
240 hFileFactory.withOutputStream(out);
241 hFileFactory.withFileContext(new HFileContext());
242 HFile.Writer writer = hFileFactory.create();
243 try {
244 writer.append(new KeyValue(CellUtil.createCell(randomBytes,
245 family,
246 randomBytes,
247 0l,
248 KeyValue.Type.Put.getCode(),
249 randomBytes)));
250 } finally {
251 writer.close();
252 }
253 } finally {
254 out.close();
255 }
256 return hFileLocation.getAbsoluteFile().getAbsolutePath();
257 }
258
259 private static Matcher<WALEdit> bulkLogWalEditType(byte[] typeBytes) {
260 return new WalMatcher(typeBytes);
261 }
262
263 private static Matcher<WALEdit> bulkLogWalEdit(byte[] typeBytes, byte[] tableName,
264 byte[] familyName, List<String> storeFileNames) {
265 return new WalMatcher(typeBytes, tableName, familyName, storeFileNames);
266 }
267
268 private static class WalMatcher extends TypeSafeMatcher<WALEdit> {
269 private final byte[] typeBytes;
270 private final byte[] tableName;
271 private final byte[] familyName;
272 private final List<String> storeFileNames;
273
274 public WalMatcher(byte[] typeBytes) {
275 this(typeBytes, null, null, null);
276 }
277
278 public WalMatcher(byte[] typeBytes, byte[] tableName, byte[] familyName,
279 List<String> storeFileNames) {
280 this.typeBytes = typeBytes;
281 this.tableName = tableName;
282 this.familyName = familyName;
283 this.storeFileNames = storeFileNames;
284 }
285
286 @Override
287 protected boolean matchesSafely(WALEdit item) {
288 assertTrue(Arrays.equals(item.getCells().get(0).getQualifier(), typeBytes));
289 BulkLoadDescriptor desc;
290 try {
291 desc = WALEdit.getBulkLoadDescriptor(item.getCells().get(0));
292 } catch (IOException e) {
293 return false;
294 }
295 assertNotNull(desc);
296
297 if (tableName != null) {
298 assertTrue(Bytes.equals(ProtobufUtil.toTableName(desc.getTableName()).getName(),
299 tableName));
300 }
301
302 if(storeFileNames != null) {
303 int index=0;
304 StoreDescriptor store = desc.getStores(0);
305 assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
306 assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
307 assertEquals(storeFileNames.size(), store.getStoreFileCount());
308 }
309
310 return true;
311 }
312
313 @Override
314 public void describeTo(Description description) {
315
316 }
317 }
318 }