/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import com.google.common.collect.Lists;

/**
 * Tests bulk loading of HFiles and exercises the atomicity (or lack thereof)
 * of the region server's bulkLoad functionality.
 */
@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestHRegionServerBulkLoad {
  final static Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final static Configuration conf = UTIL.getConfiguration();
  private final static byte[] QUAL = Bytes.toBytes("qual");
  private final static int NUM_CFS = 10;
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  private final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

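  /**
   * Runs the test twice: once with compactions proceeding at full speed, and
   * once with each compaction delayed by 30 seconds via {@link MyObserver},
   * which widens the window in which bulk loads and compactions can race.
   */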
  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<Object[]>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }

  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  /**
   * Create an HFile with the given number of rows with a specified value.
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family,
      byte[] qualifier, byte[] value, int numRows) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
                            .withCompression(COMPRESSION)
                            .build();
    HFile.Writer writer = HFile
        .getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    long now = System.currentTimeMillis();
    try {
      // write numRows cells, all carrying the same timestamp and value
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      // record the bulk load time in the file info, as expected of
      // bulk-loaded HFiles
      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }


  /**
   * Thread that continually bulk loads HFiles into the table.
   *
   * Each iteration loads one HFile per column family (NUM_CFS = 10 files),
   * each of which occupies an open file handle once loaded. So every 5
   * iterations (50 store files) it requests a region compaction to reduce the
   * number of open file handles.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    public AtomicHFileLoader(TableName tableName, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.tableName = tableName;
    }

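    /**
     * Writes one HFile per column family to the test filesystem, then bulk
     * loads all of them into the region holding row "aaa" in a single RPC.
     */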
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
          iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
          NUM_CFS);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
      }

      // bulk load HFiles
      final HConnection conn = UTIL.getHBaseAdmin().getConnection();
      RegionServerCallable<Void> callable =
          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
        @Override
        public Void call(int callTimeout) throws Exception {
          LOG.debug("Going to connect to server " + getLocation() + " for row "
              + Bytes.toStringBinary(getRow()));
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          // the final 'true' asks the server to assign a sequence number to
          // the loaded files
          BulkLoadHFileRequest request =
            RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
          getStub().bulkLoadHFile(null, request);
          return null;
        }
      };
      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
      caller.callWithRetries(callable, Integer.MAX_VALUE);

      // Periodically do compaction to reduce the number of open file handles.
      if (numBulkLoads.get() % 5 == 0) {
        // 5 iterations * 10 HFiles = 50 new store files since the last compaction
        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
          @Override
          public Void call(int callTimeout) throws Exception {
            LOG.debug("compacting " + getLocation() + " for row "
                + Bytes.toStringBinary(getRow()));
            AdminProtos.AdminService.BlockingInterface server =
              conn.getAdmin(getLocation().getServerName());
            CompactRegionRequest request =
              RequestConverter.buildCompactRegionRequest(
                getLocation().getRegionInfo().getRegionName(), true, null);
            server.compactRegion(null, request);
            numCompactions.incrementAndGet();
            return null;
          }
        };
        caller.callWithRetries(callable, Integer.MAX_VALUE);
      }
    }
  }

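  /**
   * Region observer that delays every compaction by {@code sleepDuration}
   * milliseconds, making it easier for bulk loads to overlap with in-flight
   * compactions.
   */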
  public static class MyObserver extends BaseRegionObserver {
    static int sleepDuration;
    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
        final Store store, final InternalScanner scanner, final ScanType scanType)
            throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }

  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    HTable table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = new HTable(conf, TABLE_NAME);
    }

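    /**
     * Scans the whole table once. Every bulk load writes the same value to all
     * column families of a row, so observing two different values within one
     * row means a bulk load was exposed non-atomically.
     */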
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);

      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte qualifier[] = QUAL;
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && thisValue != null
              && !Bytes.equals(gotValue, thisValue)) {

            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans)
                .append(" after scanning ").append(numRowsScanned)
                .append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
                + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }

          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }

  /**
   * Creates a table with the given table name and specified number of column
   * families if the table does not already exist.
   */
  private void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      HTableDescriptor htd = new HTableDescriptor(table);
      htd.addCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      for (int i = 0; i < cfs; i++) {
        htd.addFamily(new HColumnDescriptor(family(i)));
      }

      UTIL.getHBaseAdmin().createTable(htd);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Atomic bulk load: runs concurrent bulk loads and scanners for 30 seconds,
   * verifying that scanners never observe a partially loaded row and that a
   * bulk load marker makes it into the WAL.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    UTIL.startMiniCluster(1, false, true);
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

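  /**
   * Starts one {@link AtomicHFileLoader} and {@code numScanners}
   * {@link AtomicScanReader} threads against {@code tableName}, lets them race
   * for {@code millisToRun} milliseconds, then logs the totals.
   */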
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

  /**
   * Run the test on an HBase instance for 5 minutes. This assumes that the
   * table under test only has a single region.
   */
  public static void main(String args[]) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believed to be the LRU threadpool)
    }
  }

  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtility(c);
  }

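  /**
   * WAL listener that records whether any WAL edit carrying the
   * {@link WALEdit#BULK_LOAD} marker is written, i.e. whether the bulk load
   * was logged to the WAL.
   */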
  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}