1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.security.PrivilegedExceptionAction;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.ListIterator;
27 import java.util.Map;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.HRegionInfo;
42 import org.apache.hadoop.hbase.Server;
43 import org.apache.hadoop.hbase.ServerName;
44 import org.apache.hadoop.hbase.MetaTableAccessor;
45 import org.apache.hadoop.hbase.client.HConnection;
46 import org.apache.hadoop.hbase.client.Mutation;
47 import org.apache.hadoop.hbase.client.Put;
48 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
49 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
50 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
51 import org.apache.hadoop.hbase.security.User;
52 import org.apache.hadoop.hbase.util.Bytes;
53 import org.apache.hadoop.hbase.util.CancelableProgressable;
54 import org.apache.hadoop.hbase.util.ConfigUtil;
55 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56 import org.apache.hadoop.hbase.util.FSUtils;
57 import org.apache.hadoop.hbase.util.HasThread;
58 import org.apache.hadoop.hbase.util.Pair;
59 import org.apache.hadoop.hbase.util.PairOfSameType;
60 import org.apache.zookeeper.KeeperException;
61
62 import com.google.common.util.concurrent.ThreadFactoryBuilder;
63
64 @InterfaceAudience.Private
65 public class SplitTransactionImpl implements SplitTransaction {
66 private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
67
68
69
70
71 private final HRegion parent;
72 private HRegionInfo hri_a;
73 private HRegionInfo hri_b;
74 private long fileSplitTimeout = 30000;
75 public SplitTransactionCoordination.SplitTransactionDetails std;
76 boolean useZKForAssignment;
77
78
79
80
81 private final byte [] splitrow;
82
83
84
85
86
87 private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
88 private Server server;
89 private RegionServerServices rsServices;
90
91 public static class JournalEntryImpl implements JournalEntry {
92 private SplitTransactionPhase type;
93 private long timestamp;
94
95 public JournalEntryImpl(SplitTransactionPhase type) {
96 this(type, EnvironmentEdgeManager.currentTime());
97 }
98
99 public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
100 this.type = type;
101 this.timestamp = timestamp;
102 }
103
104 @Override
105 public String toString() {
106 StringBuilder sb = new StringBuilder();
107 sb.append(type);
108 sb.append(" at ");
109 sb.append(timestamp);
110 return sb.toString();
111 }
112
113 @Override
114 public SplitTransactionPhase getPhase() {
115 return type;
116 }
117
118 @Override
119 public long getTimeStamp() {
120 return timestamp;
121 }
122 }
123
124
125
126
127 private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
128
129
130
131
132 private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
133
134
135
136
137
138
139 public SplitTransactionImpl(final Region r, final byte [] splitrow) {
140 this.parent = (HRegion)r;
141 this.splitrow = splitrow;
142 this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
143 useZKForAssignment = ConfigUtil.useZKForAssignment(parent.getBaseConf());
144 }
145
146 private void transition(SplitTransactionPhase nextPhase) throws IOException {
147 transition(nextPhase, false);
148 }
149
150 private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
151 throws IOException {
152 if (!isRollback) {
153
154
155 this.journal.add(new JournalEntryImpl(nextPhase));
156 }
157 for (int i = 0; i < listeners.size(); i++) {
158 TransactionListener listener = listeners.get(i);
159 if (!isRollback) {
160 listener.transition(this, currentPhase, nextPhase);
161 } else {
162 listener.rollback(this, currentPhase, nextPhase);
163 }
164 }
165 currentPhase = nextPhase;
166 }
167
168
169
170
171
172
173 public boolean prepare() throws IOException {
174 if (!this.parent.isSplittable()) return false;
175
176 if (this.splitrow == null) return false;
177 HRegionInfo hri = this.parent.getRegionInfo();
178 parent.prepareToSplit();
179
180 byte [] startKey = hri.getStartKey();
181 byte [] endKey = hri.getEndKey();
182 if (Bytes.equals(startKey, splitrow) ||
183 !this.parent.getRegionInfo().containsRow(splitrow)) {
184 LOG.info("Split row is not inside region key range or is equal to " +
185 "startkey: " + Bytes.toStringBinary(this.splitrow));
186 return false;
187 }
188 long rid = getDaughterRegionIdTimestamp(hri);
189 this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
190 this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
191
192 transition(SplitTransactionPhase.PREPARED);
193
194 return true;
195 }
196
197
198
199
200
201
202 private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
203 long rid = EnvironmentEdgeManager.currentTime();
204
205
206 if (rid < hri.getRegionId()) {
207 LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
208 " but current time here is " + rid);
209 rid = hri.getRegionId() + 1;
210 }
211 return rid;
212 }
213
214 private static IOException closedByOtherException = new IOException(
215 "Failed to close region: already closed by another thread");
216
217
218
219
220
221
222
223
224
225
226
227 @Deprecated
228
229 final RegionServerServices services) throws IOException {
230 return createDaughters(server, services, null);
231 }
232
233
234 final RegionServerServices services, User user) throws IOException {
235 LOG.info("Starting split of region " + this.parent);
236 if ((server != null && server.isStopped()) ||
237 (services != null && services.isStopping())) {
238 throw new IOException("Server is stopped or stopping");
239 }
240 assert !this.parent.lock.writeLock().isHeldByCurrentThread():
241 "Unsafe to hold write lock while performing RPCs";
242
243 transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
244
245
246 if (this.parent.getCoprocessorHost() != null) {
247 if (user == null) {
248
249 parent.getCoprocessorHost().preSplit();
250 parent.getCoprocessorHost().preSplit(splitrow);
251 } else {
252 try {
253 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
254 @Override
255 public Void run() throws Exception {
256 parent.getCoprocessorHost().preSplit();
257 parent.getCoprocessorHost().preSplit(splitrow);
258 return null;
259 }
260 });
261 } catch (InterruptedException ie) {
262 InterruptedIOException iioe = new InterruptedIOException();
263 iioe.initCause(ie);
264 throw iioe;
265 }
266 }
267 }
268
269 transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
270
271
272 boolean testing = server == null? true:
273 server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
274 this.fileSplitTimeout = testing ? this.fileSplitTimeout :
275 server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
276 this.fileSplitTimeout);
277
278 PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
279
280 final List<Mutation> metaEntries = new ArrayList<Mutation>();
281 boolean ret = false;
282 if (this.parent.getCoprocessorHost() != null) {
283 if (user == null) {
284 ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
285 } else {
286 try {
287 ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
288 @Override
289 public Boolean run() throws Exception {
290 return parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
291 }
292 });
293 } catch (InterruptedException ie) {
294 InterruptedIOException iioe = new InterruptedIOException();
295 iioe.initCause(ie);
296 throw iioe;
297 }
298 }
299 if (ret) {
300 throw new IOException("Coprocessor bypassing region "
301 + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
302 }
303 try {
304 for (Mutation p : metaEntries) {
305 HRegionInfo.parseRegionName(p.getRow());
306 }
307 } catch (IOException e) {
308 LOG.error("Row key of mutation from coprossor is not parsable as region name."
309 + "Mutations from coprocessor should only for hbase:meta table.");
310 throw e;
311 }
312 }
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329 transition(SplitTransactionPhase.PONR);
330
331
332
333
334
335
336 if (!testing && useZKForAssignment) {
337 if (metaEntries == null || metaEntries.isEmpty()) {
338 MetaTableAccessor.splitRegion(server.getConnection(),
339 parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
340 daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
341 parent.getTableDesc().getRegionReplication());
342 } else {
343 offlineParentInMetaAndputMetaEntries(server.getConnection(),
344 parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
345 .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
346 parent.getTableDesc().getRegionReplication());
347 }
348 } else if (services != null && !useZKForAssignment) {
349 if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
350 parent.getRegionInfo(), hri_a, hri_b)) {
351
352 throw new IOException("Failed to notify master that split passed PONR: "
353 + parent.getRegionInfo().getRegionNameAsString());
354 }
355 }
356 return daughterRegions;
357 }
358
359 public PairOfSameType<Region> stepsBeforePONR(final Server server,
360 final RegionServerServices services, boolean testing) throws IOException {
361
362 if (useCoordinatedStateManager(server)) {
363 if (std == null) {
364 std =
365 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
366 .getSplitTransactionCoordination().getDefaultDetails();
367 }
368 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
369 .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
370 hri_a, hri_b);
371 } else if (services != null && !useZKForAssignment) {
372 if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
373 parent.getRegionInfo(), hri_a, hri_b)) {
374 throw new IOException("Failed to get ok from master to split "
375 + parent.getRegionInfo().getRegionNameAsString());
376 }
377 }
378
379 transition(SplitTransactionPhase.SET_SPLITTING);
380
381 if (useCoordinatedStateManager(server)) {
382 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
383 .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
384 hri_b, std);
385 }
386
387 this.parent.getRegionFileSystem().createSplitsDir();
388
389 transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
390
391 Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
392 Exception exceptionToThrow = null;
393 try{
394 hstoreFilesToSplit = this.parent.close(false);
395 } catch (Exception e) {
396 exceptionToThrow = e;
397 }
398 if (exceptionToThrow == null && hstoreFilesToSplit == null) {
399
400
401
402
403
404 exceptionToThrow = closedByOtherException;
405 }
406 if (exceptionToThrow != closedByOtherException) {
407 transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
408 }
409 if (exceptionToThrow != null) {
410 if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
411 throw new IOException(exceptionToThrow);
412 }
413 if (!testing) {
414 services.removeFromOnlineRegions(this.parent, null);
415 }
416
417 transition(SplitTransactionPhase.OFFLINED_PARENT);
418
419
420
421
422
423
424
425 Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
426
427
428
429
430
431 transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
432
433 assertReferenceFileCount(expectedReferences.getFirst(),
434 this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
435 Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
436 assertReferenceFileCount(expectedReferences.getFirst(),
437 new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
438
439
440 transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
441
442 assertReferenceFileCount(expectedReferences.getSecond(),
443 this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
444 Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
445 assertReferenceFileCount(expectedReferences.getSecond(),
446 new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
447
448 return new PairOfSameType<Region>(a, b);
449 }
450
451 void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
452 throws IOException {
453 if (expectedReferenceFileCount != 0 &&
454 expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
455 dir)) {
456 throw new IOException("Failing split. Expected reference file count isn't equal.");
457 }
458 }
459
460
461
462
463
464
465
466
467
468
469
470 final RegionServerServices services, Region a, Region b)
471 throws IOException {
472 boolean stopped = server != null && server.isStopped();
473 boolean stopping = services != null && services.isStopping();
474
475 if (stopped || stopping) {
476 LOG.info("Not opening daughters " +
477 b.getRegionInfo().getRegionNameAsString() +
478 " and " +
479 a.getRegionInfo().getRegionNameAsString() +
480 " because stopping=" + stopping + ", stopped=" + stopped);
481 } else {
482
483 DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
484 DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
485 aOpener.start();
486 bOpener.start();
487 try {
488 aOpener.join();
489 if (aOpener.getException() == null) {
490 transition(SplitTransactionPhase.OPENED_REGION_A);
491 }
492 bOpener.join();
493 if (bOpener.getException() == null) {
494 transition(SplitTransactionPhase.OPENED_REGION_B);
495 }
496 } catch (InterruptedException e) {
497 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
498 }
499 if (aOpener.getException() != null) {
500 throw new IOException("Failed " +
501 aOpener.getName(), aOpener.getException());
502 }
503 if (bOpener.getException() != null) {
504 throw new IOException("Failed " +
505 bOpener.getName(), bOpener.getException());
506 }
507 if (services != null) {
508 try {
509 if (useZKForAssignment) {
510
511 services.postOpenDeployTasks(b);
512 } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
513 parent.getRegionInfo(), hri_a, hri_b)) {
514 throw new IOException("Failed to report split region to master: "
515 + parent.getRegionInfo().getShortNameToLog());
516 }
517
518 services.addToOnlineRegions(b);
519 if (useZKForAssignment) {
520 services.postOpenDeployTasks(a);
521 }
522 services.addToOnlineRegions(a);
523 } catch (KeeperException ke) {
524 throw new IOException(ke);
525 }
526 }
527 }
528 }
529
530 public PairOfSameType<Region> execute(final Server server,
531 final RegionServerServices services)
532 throws IOException {
533 if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
534 LOG.warn("Should use execute(Server, RegionServerServices, User)");
535 }
536 return execute(server, services, null);
537 }
538
539
540
541
542
543
544
545
546
547
548
549 @Override
550 public PairOfSameType<Region> execute(final Server server,
551 final RegionServerServices services, User user) throws IOException {
552 this.server = server;
553 this.rsServices = services;
554 useZKForAssignment = server == null ? true :
555 ConfigUtil.useZKForAssignment(server.getConfiguration());
556 if (useCoordinatedStateManager(server)) {
557 std =
558 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
559 .getSplitTransactionCoordination().getDefaultDetails();
560 }
561 PairOfSameType<Region> regions = createDaughters(server, services, user);
562 if (this.parent.getCoprocessorHost() != null) {
563 if (user == null) {
564 parent.getCoprocessorHost().preSplitAfterPONR();
565 } else {
566 try {
567 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
568 @Override
569 public Void run() throws Exception {
570 parent.getCoprocessorHost().preSplitAfterPONR();
571 return null;
572 }
573 });
574 } catch (InterruptedException ie) {
575 InterruptedIOException iioe = new InterruptedIOException();
576 iioe.initCause(ie);
577 throw iioe;
578 }
579 }
580 }
581 regions = stepsAfterPONR(server, services, regions, user);
582
583 transition(SplitTransactionPhase.COMPLETED);
584
585 return regions;
586 }
587
588 @Deprecated
589 public PairOfSameType<Region> stepsAfterPONR(final Server server,
590 final RegionServerServices services, final PairOfSameType<Region> regions)
591 throws IOException {
592 return stepsAfterPONR(server, services, regions, null);
593 }
594
595 public PairOfSameType<Region> stepsAfterPONR(final Server server,
596 final RegionServerServices services, final PairOfSameType<Region> regions, User user)
597 throws IOException {
598 openDaughters(server, services, regions.getFirst(), regions.getSecond());
599 if (useCoordinatedStateManager(server)) {
600 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
601 .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
602 regions.getSecond(), std, parent);
603 }
604
605 transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
606
607
608 if (parent.getCoprocessorHost() != null) {
609 if (user == null) {
610 this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
611 } else {
612 try {
613 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
614 @Override
615 public Void run() throws Exception {
616 parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
617 return null;
618 }
619 });
620 } catch (InterruptedException ie) {
621 InterruptedIOException iioe = new InterruptedIOException();
622 iioe.initCause(ie);
623 throw iioe;
624 }
625 }
626 }
627
628 transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
629
630 return regions;
631 }
632
633 private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
634 HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
635 ServerName serverName, List<Mutation> metaEntries, int regionReplication)
636 throws IOException {
637 List<Mutation> mutations = metaEntries;
638 HRegionInfo copyOfParent = new HRegionInfo(parent);
639 copyOfParent.setOffline(true);
640 copyOfParent.setSplit(true);
641
642
643 Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
644 MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
645 mutations.add(putParent);
646
647
648 Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
649 Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
650
651 addLocation(putA, serverName, 1);
652 addLocation(putB, serverName, 1);
653 mutations.add(putA);
654 mutations.add(putB);
655
656
657
658 for (int i = 1; i < regionReplication; i++) {
659 addEmptyLocation(putA, i);
660 addEmptyLocation(putB, i);
661 }
662
663 MetaTableAccessor.mutateMetaTable(hConnection, mutations);
664 }
665
666 private static Put addEmptyLocation(final Put p, int replicaId){
667 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
668 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
669 null);
670 p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
671 return p;
672 }
673
674 public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
675 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
676 Bytes.toBytes(sn.getHostAndPort()));
677 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
678 Bytes.toBytes(sn.getStartcode()));
679 p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
680 Bytes.toBytes(openSeqNum));
681 return p;
682 }
683
684
685
686
687
688 class DaughterOpener extends HasThread {
689 private final Server server;
690 private final HRegion r;
691 private Throwable t = null;
692
693 DaughterOpener(final Server s, final HRegion r) {
694 super((s == null? "null-services": s.getServerName()) +
695 "-daughterOpener=" + r.getRegionInfo().getEncodedName());
696 setDaemon(true);
697 this.server = s;
698 this.r = r;
699 }
700
701
702
703
704
705 Throwable getException() {
706 return this.t;
707 }
708
709 @Override
710 public void run() {
711 try {
712 openDaughterRegion(this.server, r);
713 } catch (Throwable t) {
714 this.t = t;
715 }
716 }
717 }
718
719
720
721
722
723
724
725
726 void openDaughterRegion(final Server server, final HRegion daughter)
727 throws IOException, KeeperException {
728 HRegionInfo hri = daughter.getRegionInfo();
729 LoggingProgressable reporter = server == null ? null
730 : new LoggingProgressable(hri, server.getConfiguration().getLong(
731 "hbase.regionserver.split.daughter.open.log.interval", 10000));
732 daughter.openHRegion(reporter);
733 }
734
735 static class LoggingProgressable implements CancelableProgressable {
736 private final HRegionInfo hri;
737 private long lastLog = -1;
738 private final long interval;
739
740 LoggingProgressable(final HRegionInfo hri, final long interval) {
741 this.hri = hri;
742 this.interval = interval;
743 }
744
745 @Override
746 public boolean progress() {
747 long now = EnvironmentEdgeManager.currentTime();
748 if (now - lastLog > this.interval) {
749 LOG.info("Opening " + this.hri.getRegionNameAsString());
750 this.lastLog = now;
751 }
752 return true;
753 }
754 }
755
756 private boolean useCoordinatedStateManager(final Server server) {
757 return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
758 }
759
760
761
762
763
764
765
766 private Pair<Integer, Integer> splitStoreFiles(
767 final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
768 throws IOException {
769 if (hstoreFilesToSplit == null) {
770
771 throw new IOException("Close returned empty list of StoreFiles");
772 }
773
774
775
776 int nbFiles = 0;
777 for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
778 nbFiles += entry.getValue().size();
779 }
780 if (nbFiles == 0) {
781
782 return new Pair<Integer, Integer>(0,0);
783 }
784
785
786 int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
787 HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
788 Runtime.getRuntime().availableProcessors());
789
790 int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
791 defMaxThreads), nbFiles);
792 LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
793 " using " + maxThreads + " threads");
794 ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
795 builder.setNameFormat("StoreFileSplitter-%1$d");
796 ThreadFactory factory = builder.build();
797 ThreadPoolExecutor threadPool =
798 (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
799 List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
800
801
802 for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
803 for (StoreFile sf: entry.getValue()) {
804 StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
805 futures.add(threadPool.submit(sfs));
806 }
807 }
808
809 threadPool.shutdown();
810
811
812 try {
813 boolean stillRunning = !threadPool.awaitTermination(
814 this.fileSplitTimeout, TimeUnit.MILLISECONDS);
815 if (stillRunning) {
816 threadPool.shutdownNow();
817
818 while (!threadPool.isTerminated()) {
819 Thread.sleep(50);
820 }
821 throw new IOException("Took too long to split the" +
822 " files and create the references, aborting split");
823 }
824 } catch (InterruptedException e) {
825 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
826 }
827
828 int created_a = 0;
829 int created_b = 0;
830
831 for (Future<Pair<Path, Path>> future : futures) {
832 try {
833 Pair<Path, Path> p = future.get();
834 created_a += p.getFirst() != null ? 1 : 0;
835 created_b += p.getSecond() != null ? 1 : 0;
836 } catch (InterruptedException e) {
837 throw (InterruptedIOException) new InterruptedIOException().initCause(e);
838 } catch (ExecutionException e) {
839 throw new IOException(e);
840 }
841 }
842
843 if (LOG.isDebugEnabled()) {
844 LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
845 + " storefiles, Daughter B: " + created_b + " storefiles.");
846 }
847 return new Pair<Integer, Integer>(created_a, created_b);
848 }
849
850 private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
851 throws IOException {
852 if (LOG.isDebugEnabled()) {
853 LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
854 this.parent);
855 }
856 HRegionFileSystem fs = this.parent.getRegionFileSystem();
857 String familyName = Bytes.toString(family);
858
859 Path path_a =
860 fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
861 this.parent.getSplitPolicy());
862 Path path_b =
863 fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
864 this.parent.getSplitPolicy());
865 if (LOG.isDebugEnabled()) {
866 LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
867 this.parent);
868 }
869 return new Pair<Path,Path>(path_a, path_b);
870 }
871
872
873
874
875
876 class StoreFileSplitter implements Callable<Pair<Path,Path>> {
877 private final byte[] family;
878 private final StoreFile sf;
879
880
881
882
883
884
885 public StoreFileSplitter(final byte[] family, final StoreFile sf) {
886 this.sf = sf;
887 this.family = family;
888 }
889
890 public Pair<Path,Path> call() throws IOException {
891 return splitStoreFile(family, sf);
892 }
893 }
894
895 @Override
896 public boolean rollback(final Server server, final RegionServerServices services)
897 throws IOException {
898 if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
899 LOG.warn("Should use rollback(Server, RegionServerServices, User)");
900 }
901 return rollback(server, services, null);
902 }
903
904
905
906
907
908
909
910
911 @Override
912 @SuppressWarnings("deprecation")
913 public boolean rollback(final Server server, final RegionServerServices services, User user)
914 throws IOException {
915
916 if (this.parent.getCoprocessorHost() != null) {
917 if (user == null) {
918 this.parent.getCoprocessorHost().preRollBackSplit();
919 } else {
920 try {
921 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
922 @Override
923 public Void run() throws Exception {
924 parent.getCoprocessorHost().preRollBackSplit();
925 return null;
926 }
927 });
928 } catch (InterruptedException ie) {
929 InterruptedIOException iioe = new InterruptedIOException();
930 iioe.initCause(ie);
931 throw iioe;
932 }
933 }
934 }
935
936 boolean result = true;
937 ListIterator<JournalEntry> iterator =
938 this.journal.listIterator(this.journal.size());
939
940 while (iterator.hasPrevious()) {
941 JournalEntry je = iterator.previous();
942
943 transition(je.getPhase(), true);
944
945 switch(je.getPhase()) {
946
947 case SET_SPLITTING:
948 if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
949 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
950 .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
951 } else if (services != null && !useZKForAssignment
952 && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
953 parent.getRegionInfo(), hri_a, hri_b)) {
954 return false;
955 }
956 break;
957
958 case CREATE_SPLIT_DIR:
959 this.parent.writestate.writesEnabled = true;
960 this.parent.getRegionFileSystem().cleanupSplitsDir();
961 break;
962
963 case CLOSED_PARENT_REGION:
964 try {
965
966
967
968
969
970 this.parent.initialize();
971 } catch (IOException e) {
972 LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
973 this.parent.getRegionInfo().getRegionNameAsString(), e);
974 throw new RuntimeException(e);
975 }
976 break;
977
978 case STARTED_REGION_A_CREATION:
979 this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
980 break;
981
982 case STARTED_REGION_B_CREATION:
983 this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
984 break;
985
986 case OFFLINED_PARENT:
987 if (services != null) services.addToOnlineRegions(this.parent);
988 break;
989
990 case PONR:
991
992
993
994
995 return false;
996
997
998 case STARTED:
999 case PREPARED:
1000 case BEFORE_PRE_SPLIT_HOOK:
1001 case AFTER_PRE_SPLIT_HOOK:
1002 case BEFORE_POST_SPLIT_HOOK:
1003 case AFTER_POST_SPLIT_HOOK:
1004 case OPENED_REGION_A:
1005 case OPENED_REGION_B:
1006 case COMPLETED:
1007 break;
1008
1009 default:
1010 throw new RuntimeException("Unhandled journal entry: " + je);
1011 }
1012 }
1013
1014 if (this.parent.getCoprocessorHost() != null) {
1015 if (user == null) {
1016 this.parent.getCoprocessorHost().postRollBackSplit();
1017 } else {
1018 try {
1019 user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1020 @Override
1021 public Void run() throws Exception {
1022 parent.getCoprocessorHost().postRollBackSplit();
1023 return null;
1024 }
1025 });
1026 } catch (InterruptedException ie) {
1027 InterruptedIOException iioe = new InterruptedIOException();
1028 iioe.initCause(ie);
1029 throw iioe;
1030 }
1031 }
1032 }
1033 return result;
1034 }
1035
1036 HRegionInfo getFirstDaughter() {
1037 return hri_a;
1038 }
1039
1040 HRegionInfo getSecondDaughter() {
1041 return hri_b;
1042 }
1043
1044 @Override
1045 public List<JournalEntry> getJournal() {
1046 return journal;
1047 }
1048
1049 @Override
1050 public SplitTransaction registerTransactionListener(TransactionListener listener) {
1051 listeners.add(listener);
1052 return this;
1053 }
1054
1055 @Override
1056 public Server getServer() {
1057 return server;
1058 }
1059
1060 @Override
1061 public RegionServerServices getRegionServerServices() {
1062 return rsServices;
1063 }
1064 }