1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.hamcrest.core.Is.is;
21 import static org.junit.Assert.assertThat;
22
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.HBaseConfiguration;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HColumnDescriptor;
40 import org.apache.hadoop.hbase.HTableDescriptor;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.client.HTable;
43 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
44 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
45 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
46 import org.apache.hadoop.hbase.testclassification.LargeTests;
47 import org.apache.hadoop.hbase.KeyValueUtil;
48 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
49 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
50 import org.apache.hadoop.hbase.TableExistsException;
51 import org.apache.hadoop.hbase.TableName;
52 import org.apache.hadoop.hbase.client.HConnection;
53 import org.apache.hadoop.hbase.client.RegionServerCallable;
54 import org.apache.hadoop.hbase.client.Result;
55 import org.apache.hadoop.hbase.client.ResultScanner;
56 import org.apache.hadoop.hbase.client.RpcRetryingCaller;
57 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
58 import org.apache.hadoop.hbase.client.Scan;
59 import org.apache.hadoop.hbase.io.compress.Compression;
60 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
61 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
62 import org.apache.hadoop.hbase.io.hfile.HFile;
63 import org.apache.hadoop.hbase.io.hfile.HFileContext;
64 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
65 import org.apache.hadoop.hbase.protobuf.RequestConverter;
66 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
67 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
68 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
69 import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
70 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.util.Pair;
73 import org.apache.hadoop.hbase.wal.WAL;
74 import org.apache.hadoop.hbase.wal.WALKey;
75 import org.junit.BeforeClass;
76 import org.junit.Test;
77 import org.junit.experimental.categories.Category;
78 import org.junit.runner.RunWith;
79 import org.junit.runners.Parameterized;
80 import org.junit.runners.Parameterized.Parameters;
81
82 import com.google.common.collect.Lists;
83
84
85
86
87
88 @RunWith(Parameterized.class)
89 @Category(LargeTests.class)
90 public class TestHRegionServerBulkLoad {
/** Logger shared by the test and its nested helper classes. */
static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
/** Testing utility; left non-final because {@code setConf} swaps in a new instance. */
private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
/** Configuration taken from the initial {@code UTIL}; it is not refreshed if UTIL is replaced. */
private static final Configuration conf = UTIL.getConfiguration();
/** Qualifier used for every cell written and read by this test. */
private static final byte[] QUAL = Bytes.toBytes("qual");
/** Number of column families that are bulk loaded and scanned. */
private static final int NUM_CFS = 10;
/** Per-run parameter; copied into {@code MyObserver.sleepDuration} by {@code setupTable}. */
private int sleepDuration;
/** Block size handed to the HFile context in {@code createHFile}. */
public static int BLOCKSIZE = 64 * 1024;
/** Compression algorithm handed to the HFile context in {@code createHFile}. */
public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

/** Encoded names of the {@code NUM_CFS} column families, indexed by family number. */
private static final byte[][] families = new byte[NUM_CFS][];
static {
  int familyIndex = 0;
  while (familyIndex < NUM_CFS) {
    families[familyIndex] = Bytes.toBytes(family(familyIndex));
    familyIndex++;
  }
}
106 @Parameters
107 public static final Collection<Object[]> parameters() {
108 int[] sleepDurations = new int[] { 0, 30000 };
109 List<Object[]> configurations = new ArrayList<Object[]>();
110 for (int i : sleepDurations) {
111 configurations.add(new Object[] { i });
112 }
113 return configurations;
114 }
115
/**
 * Creates one parameterized instance of the test.
 *
 * @param duration value stored as this instance's sleep duration
 */
public TestHRegionServerBulkLoad(int duration) {
  sleepDuration = duration;
}
119
120 @BeforeClass
121 public static void setUpBeforeClass() throws Exception {
122 conf.setInt("hbase.rpc.timeout", 10 * 1000);
123 }
124
125
126
127
128
129 public static byte[] rowkey(int i) {
130 return Bytes.toBytes(String.format("row_%08d", i));
131 }
132
133 static String family(int i) {
134 return String.format("family_%04d", i);
135 }
136
137
138
139
140 public static void createHFile(FileSystem fs, Path path, byte[] family,
141 byte[] qualifier, byte[] value, int numRows) throws IOException {
142 HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
143 .withCompression(COMPRESSION)
144 .build();
145 HFile.Writer writer = HFile
146 .getWriterFactory(conf, new CacheConfig(conf))
147 .withPath(fs, path)
148 .withFileContext(context)
149 .create();
150 long now = System.currentTimeMillis();
151 try {
152
153 for (int i = 0; i < numRows; i++) {
154 KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
155 writer.append(kv);
156 }
157 writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now));
158 } finally {
159 writer.close();
160 }
161 }
162
163
164
165
166
167
168
169
170
171 public static class AtomicHFileLoader extends RepeatingTestThread {
172 final AtomicLong numBulkLoads = new AtomicLong();
173 final AtomicLong numCompactions = new AtomicLong();
174 private TableName tableName;
175
176 public AtomicHFileLoader(TableName tableName, TestContext ctx,
177 byte targetFamilies[][]) throws IOException {
178 super(ctx);
179 this.tableName = tableName;
180 }
181
182 public void doAnAction() throws Exception {
183 long iteration = numBulkLoads.getAndIncrement();
184 Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
185 iteration));
186
187
188 FileSystem fs = UTIL.getTestFileSystem();
189 byte[] val = Bytes.toBytes(String.format("%010d", iteration));
190 final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
191 NUM_CFS);
192 for (int i = 0; i < NUM_CFS; i++) {
193 Path hfile = new Path(dir, family(i));
194 byte[] fam = Bytes.toBytes(family(i));
195 createHFile(fs, hfile, fam, QUAL, val, 1000);
196 famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
197 }
198
199
200 final HConnection conn = UTIL.getHBaseAdmin().getConnection();
201 RegionServerCallable<Void> callable =
202 new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
203 @Override
204 public Void call(int callTimeout) throws Exception {
205 LOG.debug("Going to connect to server " + getLocation() + " for row "
206 + Bytes.toStringBinary(getRow()));
207 byte[] regionName = getLocation().getRegionInfo().getRegionName();
208 BulkLoadHFileRequest request =
209 RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
210 getStub().bulkLoadHFile(null, request);
211 return null;
212 }
213 };
214 RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
215 RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
216 caller.callWithRetries(callable, Integer.MAX_VALUE);
217
218
219 if (numBulkLoads.get() % 5 == 0) {
220
221 callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
222 @Override
223 public Void call(int callTimeout) throws Exception {
224 LOG.debug("compacting " + getLocation() + " for row "
225 + Bytes.toStringBinary(getRow()));
226 AdminProtos.AdminService.BlockingInterface server =
227 conn.getAdmin(getLocation().getServerName());
228 CompactRegionRequest request =
229 RequestConverter.buildCompactRegionRequest(
230 getLocation().getRegionInfo().getRegionName(), true, null);
231 server.compactRegion(null, request);
232 numCompactions.incrementAndGet();
233 return null;
234 }
235 };
236 caller.callWithRetries(callable, Integer.MAX_VALUE);
237 }
238 }
239 }
240
241 public static class MyObserver extends BaseRegionObserver {
242 static int sleepDuration;
243 @Override
244 public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
245 final Store store, final InternalScanner scanner, final ScanType scanType)
246 throws IOException {
247 try {
248 Thread.sleep(sleepDuration);
249 } catch (InterruptedException ie) {
250 IOException ioe = new InterruptedIOException();
251 ioe.initCause(ie);
252 throw ioe;
253 }
254 return scanner;
255 }
256 }
257
258
259
260
261
262 public static class AtomicScanReader extends RepeatingTestThread {
263 byte targetFamilies[][];
264 HTable table;
265 AtomicLong numScans = new AtomicLong();
266 AtomicLong numRowsScanned = new AtomicLong();
267 TableName TABLE_NAME;
268
269 public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
270 byte targetFamilies[][]) throws IOException {
271 super(ctx);
272 this.TABLE_NAME = TABLE_NAME;
273 this.targetFamilies = targetFamilies;
274 table = new HTable(conf, TABLE_NAME);
275 }
276
277 public void doAnAction() throws Exception {
278 Scan s = new Scan();
279 for (byte[] family : targetFamilies) {
280 s.addFamily(family);
281 }
282 ResultScanner scanner = table.getScanner(s);
283
284 for (Result res : scanner) {
285 byte[] lastRow = null, lastFam = null, lastQual = null;
286 byte[] gotValue = null;
287 for (byte[] family : targetFamilies) {
288 byte qualifier[] = QUAL;
289 byte thisValue[] = res.getValue(family, qualifier);
290 if (gotValue != null && thisValue != null
291 && !Bytes.equals(gotValue, thisValue)) {
292
293 StringBuilder msg = new StringBuilder();
294 msg.append("Failed on scan ").append(numScans)
295 .append(" after scanning ").append(numRowsScanned)
296 .append(" rows!\n");
297 msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
298 + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
299 + " = " + Bytes.toString(thisValue) + "\n");
300 msg.append("Previous was " + Bytes.toString(lastRow) + "/"
301 + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
302 + " = " + Bytes.toString(gotValue));
303 throw new RuntimeException(msg.toString());
304 }
305
306 lastFam = family;
307 lastQual = qualifier;
308 lastRow = res.getRow();
309 gotValue = thisValue;
310 }
311 numRowsScanned.getAndIncrement();
312 }
313 numScans.getAndIncrement();
314 }
315 }
316
317
318
319
320
321 private void setupTable(TableName table, int cfs) throws IOException {
322 try {
323 LOG.info("Creating table " + table);
324 HTableDescriptor htd = new HTableDescriptor(table);
325 htd.addCoprocessor(MyObserver.class.getName());
326 MyObserver.sleepDuration = this.sleepDuration;
327 for (int i = 0; i < 10; i++) {
328 htd.addFamily(new HColumnDescriptor(family(i)));
329 }
330
331 UTIL.getHBaseAdmin().createTable(htd);
332 } catch (TableExistsException tee) {
333 LOG.info("Table " + table + " already exists");
334 }
335 }
336
337
338
339
340 @Test
341 public void testAtomicBulkLoad() throws Exception {
342 TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
343
344 int millisToRun = 30000;
345 int numScanners = 50;
346
347 UTIL.startMiniCluster(1, false, true);
348 try {
349 WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
350 FindBulkHBaseListener listener = new FindBulkHBaseListener();
351 log.registerWALActionsListener(listener);
352 runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
353 assertThat(listener.isFound(), is(true));
354 } finally {
355 UTIL.shutdownMiniCluster();
356 }
357 }
358
359 void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
360 throws Exception {
361 setupTable(tableName, 10);
362
363 TestContext ctx = new TestContext(UTIL.getConfiguration());
364
365 AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
366 ctx.addThread(loader);
367
368 List<AtomicScanReader> scanners = Lists.newArrayList();
369 for (int i = 0; i < numScanners; i++) {
370 AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
371 scanners.add(scanner);
372 ctx.addThread(scanner);
373 }
374
375 ctx.startThreads();
376 ctx.waitFor(millisToRun);
377 ctx.stop();
378
379 LOG.info("Loaders:");
380 LOG.info(" loaded " + loader.numBulkLoads.get());
381 LOG.info(" compations " + loader.numCompactions.get());
382
383 LOG.info("Scanners:");
384 for (AtomicScanReader scanner : scanners) {
385 LOG.info(" scanned " + scanner.numScans.get());
386 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
387 }
388 }
389
390
391
392
393
394 public static void main(String args[]) throws Exception {
395 try {
396 Configuration c = HBaseConfiguration.create();
397 TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
398 test.setConf(c);
399 test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
400 } finally {
401 System.exit(0);
402 }
403 }
404
405 private void setConf(Configuration c) {
406 UTIL = new HBaseTestingUtility(c);
407 }
408
409 static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
410 private boolean found = false;
411
412 @Override
413 public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
414 for (Cell cell : logEdit.getCells()) {
415 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
416 for (Map.Entry entry : kv.toStringMap().entrySet()) {
417 if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
418 found = true;
419 }
420 }
421 }
422 }
423
424 public boolean isFound() {
425 return found;
426 }
427 }
428 }
429
430