1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.assertArrayEquals;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.NavigableMap;
29 import java.util.Random;
30 import java.util.Set;
31 import java.util.TreeSet;
32
33 import org.apache.commons.io.IOUtils;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.ChoreService;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HRegionInfo;
41 import org.apache.hadoop.hbase.HRegionLocation;
42 import org.apache.hadoop.hbase.MetaTableAccessor;
43 import org.apache.hadoop.hbase.NotServingRegionException;
44 import org.apache.hadoop.hbase.ScheduledChore;
45 import org.apache.hadoop.hbase.ServerName;
46 import org.apache.hadoop.hbase.Stoppable;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.Admin;
49 import org.apache.hadoop.hbase.client.Connection;
50 import org.apache.hadoop.hbase.client.ConnectionFactory;
51 import org.apache.hadoop.hbase.client.Get;
52 import org.apache.hadoop.hbase.client.HTable;
53 import org.apache.hadoop.hbase.client.MetaScanner;
54 import org.apache.hadoop.hbase.client.Put;
55 import org.apache.hadoop.hbase.client.Result;
56 import org.apache.hadoop.hbase.client.Scan;
57 import org.apache.hadoop.hbase.client.Table;
58 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
59 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
60 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61 import org.apache.hadoop.hbase.protobuf.RequestConverter;
62 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
63 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
64 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
65 import org.apache.hadoop.hbase.testclassification.LargeTests;
66 import org.apache.hadoop.hbase.util.Bytes;
67 import org.apache.hadoop.hbase.util.Pair;
68 import org.apache.hadoop.hbase.util.PairOfSameType;
69 import org.apache.hadoop.hbase.util.StoppableImplementation;
70 import org.apache.hadoop.hbase.util.Threads;
71 import org.junit.AfterClass;
72 import org.junit.BeforeClass;
73 import org.junit.Test;
74 import org.junit.experimental.categories.Category;
75
76 import com.google.common.collect.Iterators;
77 import com.google.common.collect.Sets;
78 import com.google.protobuf.ServiceException;
79
80 @Category(LargeTests.class)
81 public class TestEndToEndSplitTransaction {
82 private static final Log LOG = LogFactory.getLog(TestEndToEndSplitTransaction.class);
83 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
84 private static final Configuration CONF = TEST_UTIL.getConfiguration();
85
86 @BeforeClass
87 public static void beforeAllTests() throws Exception {
88 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
89 TEST_UTIL.startMiniCluster();
90 }
91
92 @AfterClass
93 public static void afterAllTests() throws Exception {
94 TEST_UTIL.shutdownMiniCluster();
95 }
96
97 @Test
98 public void testMasterOpsWhileSplitting() throws Exception {
99 TableName tableName = TableName.valueOf("TestSplit");
100 byte[] familyName = Bytes.toBytes("fam");
101 try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) {
102 TEST_UTIL.loadTable(ht, familyName, false);
103 }
104 HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
105 byte[] firstRow = Bytes.toBytes("aaa");
106 byte[] splitRow = Bytes.toBytes("lll");
107 byte[] lastRow = Bytes.toBytes("zzz");
108 try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
109
110 byte[] regionName = conn.getRegionLocator(tableName).getRegionLocation(splitRow)
111 .getRegionInfo().getRegionName();
112 Region region = server.getRegion(regionName);
113 SplitTransactionImpl split = new SplitTransactionImpl((HRegion) region, splitRow);
114 split.prepare();
115
116
117 PairOfSameType<Region> regions = split.createDaughters(server, server, null);
118 assertFalse(test(conn, tableName, firstRow, server));
119 assertFalse(test(conn, tableName, lastRow, server));
120
121
122
123 split.openDaughters(server, null, regions.getFirst(), regions.getSecond());
124 assertFalse(test(conn, tableName, firstRow, server));
125 assertFalse(test(conn, tableName, lastRow, server));
126
127
128
129
130 if (split.useZKForAssignment) {
131 server.postOpenDeployTasks(regions.getSecond());
132 } else {
133 server.reportRegionStateTransition(
134 RegionServerStatusProtos.RegionStateTransition.TransitionCode.SPLIT,
135 region.getRegionInfo(), regions.getFirst().getRegionInfo(),
136 regions.getSecond().getRegionInfo());
137 }
138
139
140 server.addToOnlineRegions(regions.getSecond());
141
142
143 assertFalse(test(conn, tableName, firstRow, server));
144
145 assertTrue(test(conn, tableName, lastRow, server));
146
147
148 server.addToOnlineRegions(regions.getFirst());
149 assertTrue(test(conn, tableName, firstRow, server));
150 assertTrue(test(conn, tableName, lastRow, server));
151
152 if (split.useZKForAssignment) {
153
154 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
155 .getSplitTransactionCoordination().completeSplitTransaction(server, regions.getFirst(),
156 regions.getSecond(), split.std, region);
157 }
158
159 assertTrue(test(conn, tableName, firstRow, server));
160 assertTrue(test(conn, tableName, lastRow, server));
161 }
162 }
163
164
165
166
167
168 private boolean test(Connection conn, TableName tableName, byte[] row,
169 HRegionServer server) {
170
171 try {
172 byte[] regionName = conn.getRegionLocator(tableName).getRegionLocation(row, true)
173 .getRegionInfo().getRegionName();
174
175 ClientProtos.GetRequest request =
176 RequestConverter.buildGetRequest(regionName, new Get(row));
177 server.getRSRpcServices().get(null, request);
178 ScanRequest scanRequest = RequestConverter.buildScanRequest(
179 regionName, new Scan(row), 1, true);
180 try {
181 server.getRSRpcServices().scan(
182 new PayloadCarryingRpcController(), scanRequest);
183 } catch (ServiceException se) {
184 throw ProtobufUtil.getRemoteException(se);
185 }
186 } catch (IOException e) {
187 return false;
188 } catch (ServiceException e) {
189 return false;
190 }
191 return true;
192 }
193
194
195
196
197 @Test
198 public void testFromClientSideWhileSplitting() throws Throwable {
199 LOG.info("Starting testFromClientSideWhileSplitting");
200 final TableName TABLENAME =
201 TableName.valueOf("testFromClientSideWhileSplitting");
202 final byte[] FAMILY = Bytes.toBytes("family");
203
204
205
206 Table table = TEST_UTIL.createTable(TABLENAME, FAMILY);
207
208 Stoppable stopper = new StoppableImplementation();
209 RegionSplitter regionSplitter = new RegionSplitter(table);
210 RegionChecker regionChecker = new RegionChecker(CONF, stopper, TABLENAME);
211 final ChoreService choreService = new ChoreService("TEST_SERVER");
212
213 choreService.scheduleChore(regionChecker);
214 regionSplitter.start();
215
216
217 regionSplitter.join();
218 stopper.stop(null);
219
220 if (regionChecker.ex != null) {
221 throw regionChecker.ex;
222 }
223
224 if (regionSplitter.ex != null) {
225 throw regionSplitter.ex;
226 }
227
228
229 regionChecker.verify();
230 }
231
232 static class RegionSplitter extends Thread {
233 final Connection connection;
234 Throwable ex;
235 Table table;
236 TableName tableName;
237 byte[] family;
238 Admin admin;
239 HRegionServer rs;
240
241 RegionSplitter(Table table) throws IOException {
242 this.table = table;
243 this.tableName = table.getName();
244 this.family = table.getTableDescriptor().getFamiliesKeys().iterator().next();
245 admin = TEST_UTIL.getHBaseAdmin();
246 rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
247 connection = TEST_UTIL.getConnection();
248 }
249
250 @Override
251 public void run() {
252 try {
253 Random random = new Random();
254 for (int i= 0; i< 5; i++) {
255 NavigableMap<HRegionInfo, ServerName> regions =
256 MetaScanner.allTableRegions(connection, tableName);
257 if (regions.size() == 0) {
258 continue;
259 }
260 int regionIndex = random.nextInt(regions.size());
261
262
263 HRegionInfo region = Iterators.get(regions.keySet().iterator(), regionIndex);
264
265
266 int start = 0, end = Integer.MAX_VALUE;
267 if (region.getStartKey().length > 0) {
268 start = Bytes.toInt(region.getStartKey());
269 }
270 if (region.getEndKey().length > 0) {
271 end = Bytes.toInt(region.getEndKey());
272 }
273 int mid = start + ((end - start) / 2);
274 byte[] splitPoint = Bytes.toBytes(mid);
275
276
277 addData(start);
278 addData(mid);
279
280 flushAndBlockUntilDone(admin, rs, region.getRegionName());
281 compactAndBlockUntilDone(admin, rs, region.getRegionName());
282
283 log("Initiating region split for:" + region.getRegionNameAsString());
284 try {
285 admin.splitRegion(region.getRegionName(), splitPoint);
286
287 blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);
288
289 } catch (NotServingRegionException ex) {
290
291 }
292 }
293 } catch (Throwable ex) {
294 this.ex = ex;
295 }
296 }
297
298 void addData(int start) throws IOException {
299 List<Put> puts = new ArrayList<>();
300 for (int i=start; i< start + 100; i++) {
301 Put put = new Put(Bytes.toBytes(i));
302 put.addColumn(family, family, Bytes.toBytes(i));
303 puts.add(put);
304 }
305 table.put(puts);
306 }
307 }
308
309
310
311
312 static class RegionChecker extends ScheduledChore {
313 Connection connection;
314 Configuration conf;
315 TableName tableName;
316 Throwable ex;
317
318 RegionChecker(Configuration conf, Stoppable stopper, TableName tableName) throws IOException {
319 super("RegionChecker", stopper, 10);
320 this.conf = conf;
321 this.tableName = tableName;
322
323 this.connection = ConnectionFactory.createConnection(conf);
324 }
325
326
327 void verifyRegionsUsingMetaScanner() throws Exception {
328
329
330 NavigableMap<HRegionInfo, ServerName> regions = MetaScanner.allTableRegions(connection,
331 tableName);
332 verifyTableRegions(regions.keySet());
333
334
335 List<HRegionInfo> regionList = MetaScanner.listAllRegions(conf, connection, false);
336 verifyTableRegions(Sets.newTreeSet(regionList));
337 }
338
339
340 void verifyRegionsUsingHTable() throws IOException {
341 HTable table = null;
342 try {
343 table = (HTable) connection.getTable(tableName);
344 Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
345 verifyStartEndKeys(keys);
346
347
348 Set<HRegionInfo> regions = new TreeSet<HRegionInfo>();
349 for (HRegionLocation loc : table.getRegionLocator().getAllRegionLocations()) {
350 regions.add(loc.getRegionInfo());
351 }
352 verifyTableRegions(regions);
353 } finally {
354 IOUtils.closeQuietly(table);
355 }
356 }
357
358 void verify() throws Exception {
359 verifyRegionsUsingMetaScanner();
360 verifyRegionsUsingHTable();
361 }
362
363 void verifyTableRegions(Set<HRegionInfo> regions) {
364 log("Verifying " + regions.size() + " regions: " + regions);
365
366 byte[][] startKeys = new byte[regions.size()][];
367 byte[][] endKeys = new byte[regions.size()][];
368
369 int i=0;
370 for (HRegionInfo region : regions) {
371 startKeys[i] = region.getStartKey();
372 endKeys[i] = region.getEndKey();
373 i++;
374 }
375
376 Pair<byte[][], byte[][]> keys = new Pair<byte[][], byte[][]>(startKeys, endKeys);
377 verifyStartEndKeys(keys);
378 }
379
380 void verifyStartEndKeys(Pair<byte[][], byte[][]> keys) {
381 byte[][] startKeys = keys.getFirst();
382 byte[][] endKeys = keys.getSecond();
383 assertEquals(startKeys.length, endKeys.length);
384 assertTrue("Found 0 regions for the table", startKeys.length > 0);
385
386 assertArrayEquals("Start key for the first region is not byte[0]",
387 HConstants.EMPTY_START_ROW, startKeys[0]);
388 byte[] prevEndKey = HConstants.EMPTY_START_ROW;
389
390
391 for (int i=0; i<startKeys.length; i++) {
392 assertArrayEquals(
393 "Hole in hbase:meta is detected. prevEndKey=" + Bytes.toStringBinary(prevEndKey)
394 + " ,regionStartKey=" + Bytes.toStringBinary(startKeys[i]), prevEndKey,
395 startKeys[i]);
396 prevEndKey = endKeys[i];
397 }
398 assertArrayEquals("End key for the last region is not byte[0]", HConstants.EMPTY_END_ROW,
399 endKeys[endKeys.length - 1]);
400 }
401
402 @Override
403 protected void chore() {
404 try {
405 verify();
406 } catch (Throwable ex) {
407 this.ex = ex;
408 getStopper().stop("caught exception");
409 }
410 }
411 }
412
413 public static void log(String msg) {
414 LOG.info(msg);
415 }
416
417
418
419 public static void flushAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
420 throws IOException, InterruptedException {
421 log("flushing region: " + Bytes.toStringBinary(regionName));
422 admin.flushRegion(regionName);
423 log("blocking until flush is complete: " + Bytes.toStringBinary(regionName));
424 Threads.sleepWithoutInterrupt(500);
425 while (rs.getOnlineRegion(regionName).getMemstoreSize() > 0) {
426 Threads.sleep(50);
427 }
428 }
429
430 public static void compactAndBlockUntilDone(Admin admin, HRegionServer rs, byte[] regionName)
431 throws IOException, InterruptedException {
432 log("Compacting region: " + Bytes.toStringBinary(regionName));
433 admin.majorCompactRegion(regionName);
434 log("blocking until compaction is complete: " + Bytes.toStringBinary(regionName));
435 Threads.sleepWithoutInterrupt(500);
436 outer: for (;;) {
437 for (Store store : rs.getOnlineRegion(regionName).getStores()) {
438 if (store.getStorefilesCount() > 1) {
439 Threads.sleep(50);
440 continue outer;
441 }
442 }
443 break;
444 }
445 }
446
447
448 public static void blockUntilRegionSplit(Configuration conf, long timeout,
449 final byte[] regionName, boolean waitForDaughters)
450 throws IOException, InterruptedException {
451 long start = System.currentTimeMillis();
452 log("blocking until region is split:" + Bytes.toStringBinary(regionName));
453 HRegionInfo daughterA = null, daughterB = null;
454 try (Connection conn = ConnectionFactory.createConnection(conf);
455 Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) {
456 Result result = null;
457 HRegionInfo region = null;
458 while ((System.currentTimeMillis() - start) < timeout) {
459 result = metaTable.get(new Get(regionName));
460 if (result == null) {
461 break;
462 }
463
464 region = MetaTableAccessor.getHRegionInfo(result);
465 if (region.isSplitParent()) {
466 log("found parent region: " + region.toString());
467 PairOfSameType<HRegionInfo> pair = MetaTableAccessor.getDaughterRegions(result);
468 daughterA = pair.getFirst();
469 daughterB = pair.getSecond();
470 break;
471 }
472 Threads.sleep(100);
473 }
474 if (daughterA == null || daughterB == null) {
475 throw new IOException("Failed to get daughters, daughterA=" + daughterA + ", daughterB=" +
476 daughterB + ", timeout=" + timeout + ", result=" + result + ", regionName=" + regionName +
477 ", region=" + region);
478 }
479
480
481 if (waitForDaughters) {
482 long rem = timeout - (System.currentTimeMillis() - start);
483 blockUntilRegionIsInMeta(conn, rem, daughterA);
484
485 rem = timeout - (System.currentTimeMillis() - start);
486 blockUntilRegionIsInMeta(conn, rem, daughterB);
487
488 rem = timeout - (System.currentTimeMillis() - start);
489 blockUntilRegionIsOpened(conf, rem, daughterA);
490
491 rem = timeout - (System.currentTimeMillis() - start);
492 blockUntilRegionIsOpened(conf, rem, daughterB);
493 }
494 }
495 }
496
497 public static void blockUntilRegionIsInMeta(Connection conn, long timeout, HRegionInfo hri)
498 throws IOException, InterruptedException {
499 log("blocking until region is in META: " + hri.getRegionNameAsString());
500 long start = System.currentTimeMillis();
501 while (System.currentTimeMillis() - start < timeout) {
502 HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri);
503 if (loc != null && !loc.getRegionInfo().isOffline()) {
504 log("found region in META: " + hri.getRegionNameAsString());
505 break;
506 }
507 Threads.sleep(10);
508 }
509 }
510
511 public static void blockUntilRegionIsOpened(Configuration conf, long timeout, HRegionInfo hri)
512 throws IOException, InterruptedException {
513 log("blocking until region is opened for reading:" + hri.getRegionNameAsString());
514 long start = System.currentTimeMillis();
515 try (Connection conn = ConnectionFactory.createConnection(conf);
516 Table table = conn.getTable(hri.getTable())) {
517 byte[] row = hri.getStartKey();
518
519 if (row == null || row.length <= 0) row = new byte[] { '0' };
520 Get get = new Get(row);
521 while (System.currentTimeMillis() - start < timeout) {
522 try {
523 table.get(get);
524 break;
525 } catch (IOException ex) {
526
527 }
528 Threads.sleep(10);
529 }
530 }
531 }
532 }
533