View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.ListIterator;
27  import java.util.Map;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.Server;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.MetaTableAccessor;
45  import org.apache.hadoop.hbase.client.HConnection;
46  import org.apache.hadoop.hbase.client.Mutation;
47  import org.apache.hadoop.hbase.client.Put;
48  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
49  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination;
50  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
51  import org.apache.hadoop.hbase.security.User;
52  import org.apache.hadoop.hbase.util.Bytes;
53  import org.apache.hadoop.hbase.util.CancelableProgressable;
54  import org.apache.hadoop.hbase.util.ConfigUtil;
55  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
56  import org.apache.hadoop.hbase.util.FSUtils;
57  import org.apache.hadoop.hbase.util.HasThread;
58  import org.apache.hadoop.hbase.util.Pair;
59  import org.apache.hadoop.hbase.util.PairOfSameType;
60  import org.apache.zookeeper.KeeperException;
61  
62  import com.google.common.util.concurrent.ThreadFactoryBuilder;
63  
64  @InterfaceAudience.Private
65  public class SplitTransactionImpl implements SplitTransaction {
66    private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
      // NOTE(review): the logger is registered under SplitTransaction (the interface) rather than
      // SplitTransactionImpl; left as-is because log configuration may depend on that name.
67  
68    /*
69     * Region to split. Assigned once in the constructor (cast from the Region argument).
70     */
71    private final HRegion parent;
      // Daughter region infos (lower half / upper half of the key range); assigned by prepare().
72    private HRegionInfo hri_a;
73    private HRegionInfo hri_b;
      // Starts at 30000; createDaughters() replaces it with "hbase.regionserver.fileSplitTimeout"
      // unless testing. NOTE(review): presumably milliseconds and consumed by splitStoreFiles -- confirm.
74    private long fileSplitTimeout = 30000;
      // Split coordination details; filled in by execute() or stepsBeforePONR() when a
      // coordinated state manager is in use.
75    public SplitTransactionCoordination.SplitTransactionDetails std;
      // Seeded from the parent's base configuration in the constructor and re-read from the
      // server configuration in execute() (forced to true when the server is null).
76    boolean useZKForAssignment;
77  
78    /*
79     * Row to split around; it becomes the end key of hri_a and the start key of hri_b.
80     */
81    private final byte [] splitrow;
82  
83    /*
84     * Transaction state for listener, only valid during execute and
85     * rollback. transition() passes it to listeners as the "from" phase and then advances it.
86     */
87    private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
      // Captured at the start of execute(); not read in the visible part of this file
      // (NOTE(review): presumably used by rollback -- confirm).
88    private Server server;
89    private RegionServerServices rsServices;
90  
91    public static class JournalEntryImpl implements JournalEntry {
92      private SplitTransactionPhase type;
93      private long timestamp;
94  
95      public JournalEntryImpl(SplitTransactionPhase type) {
96        this(type, EnvironmentEdgeManager.currentTime());
97      }
98  
99      public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
100       this.type = type;
101       this.timestamp = timestamp;
102     }
103 
104     @Override
105     public String toString() {
106       StringBuilder sb = new StringBuilder();
107       sb.append(type);
108       sb.append(" at ");
109       sb.append(timestamp);
110       return sb.toString();
111     }
112 
113     @Override
114     public SplitTransactionPhase getPhase() {
115       return type;
116     }
117 
118     @Override
119     public long getTimeStamp() {
120       return timestamp;
121     }
122   }
123 
124   /*
125    * Journal of how far the split transaction has progressed. The constructor seeds it with a
       * STARTED entry, and transition() appends one entry per forward (non-rollback) phase change
       * before notifying listeners.
126    */
127   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
128 
129   /**
130    * Listeners notified by transition(): their transition() callback on forward moves and their
       * rollback() callback when rolling back, iterated in list order.
       * NOTE(review): registration of listeners is not visible in this chunk.
131    */
132   private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
133 
/**
 * Creates a split transaction for a single region.
 * @param r region to split; it is cast to {@link HRegion}
 * @param splitrow row key around which the region is divided
 */
public SplitTransactionImpl(final Region r, final byte [] splitrow) {
  HRegion region = (HRegion) r;
  this.parent = region;
  // Initial assignment mode comes from the region's own configuration; execute() re-reads it.
  this.useZKForAssignment = ConfigUtil.useZKForAssignment(region.getBaseConf());
  this.splitrow = splitrow;
  // Every journal begins with the STARTED phase.
  this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
}
145 
146   private void transition(SplitTransactionPhase nextPhase) throws IOException {
147     transition(nextPhase, false);
148   }
149 
150   private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
151       throws IOException {
152     if (!isRollback) {
153       // Add to the journal first, because if the listener throws an exception
154       // we need to roll back starting at 'nextPhase'
155       this.journal.add(new JournalEntryImpl(nextPhase));
156     }
157     for (int i = 0; i < listeners.size(); i++) {
158       TransactionListener listener = listeners.get(i);
159       if (!isRollback) {
160         listener.transition(this, currentPhase, nextPhase);
161       } else {
162         listener.rollback(this, currentPhase, nextPhase);
163       }
164     }
165     currentPhase = nextPhase;
166   }
167 
168   /**
169    * Does checks on split inputs.
170    * @return <code>true</code> if the region is splittable else
171    * <code>false</code> if it is not (e.g. its already closed, etc.).
172    */
173   public boolean prepare() throws IOException {
174     if (!this.parent.isSplittable()) return false;
175     // Split key can be null if this region is unsplittable; i.e. has refs.
176     if (this.splitrow == null) return false;
177     HRegionInfo hri = this.parent.getRegionInfo();
178     parent.prepareToSplit();
179     // Check splitrow.
180     byte [] startKey = hri.getStartKey();
181     byte [] endKey = hri.getEndKey();
182     if (Bytes.equals(startKey, splitrow) ||
183         !this.parent.getRegionInfo().containsRow(splitrow)) {
184       LOG.info("Split row is not inside region key range or is equal to " +
185           "startkey: " + Bytes.toStringBinary(this.splitrow));
186       return false;
187     }
188     long rid = getDaughterRegionIdTimestamp(hri);
189     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
190     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
191 
192     transition(SplitTransactionPhase.PREPARED);
193 
194     return true;
195   }
196 
197   /**
198    * Calculate daughter regionid to use.
199    * @param hri Parent {@link HRegionInfo}
200    * @return Daughter region id (timestamp) to use.
201    */
202   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
203     long rid = EnvironmentEdgeManager.currentTime();
204     // Regionid is timestamp.  Can't be less than that of parent else will insert
205     // at wrong location in hbase:meta (See HBASE-710).
206     if (rid < hri.getRegionId()) {
207       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
208         " but current time here is " + rid);
209       rid = hri.getRegionId() + 1;
210     }
211     return rid;
212   }
213 
214   private static IOException closedByOtherException = new IOException(
215       "Failed to close region: already closed by another thread");
216 
217   /**
218    * Prepare the regions and region files.
219    * @param server Hosting server instance.  Can be null when testing (won't try
220    * and update in zk if a null server)
221    * @param services Used to online/offline regions.
222    * @param user
223    * @throws IOException If thrown, transaction failed.
224    *    Call {@link #rollback(Server, RegionServerServices)}
225    * @return Regions created
226    */
227   @Deprecated
228   /* package */PairOfSameType<Region> createDaughters(final Server server,
229       final RegionServerServices services) throws IOException {
230     return createDaughters(server, services, null);
231   }
232 
      /**
       * Runs the split up to and including the point of no return (PONR).
       * <p>
       * In order:
       * <ol>
       *   <li>Runs the preSplit coprocessor hooks (as {@code user} when it is non-null).</li>
       *   <li>Runs {@link #stepsBeforePONR(Server, RegionServerServices, boolean)}.</li>
       *   <li>Lets coprocessors add hbase:meta mutations via preSplitBeforePONR.</li>
       *   <li>Journals PONR.</li>
       *   <li>Records the split. With ZK-based assignment (and not testing) it writes hbase:meta
       *       directly; with ZK-less assignment it reports SPLIT_PONR to the master.</li>
       * </ol>
       * @param server hosting server; may be null when testing
       * @param services used to online/offline regions; may be null
       * @param user user to run coprocessor hooks as, or null to run them directly
       * @return the two daughter regions; they are opened later by stepsAfterPONR
       * @throws IOException if:
       *   <ul>
       *     <li>the server is stopped or stopping;</li>
       *     <li>a coprocessor fails or bypasses the split;</li>
       *     <li>a coprocessor mutation's row is not a region name;</li>
       *     <li>the master reply is negative.</li>
       *   </ul>
       *   Failures after PONR are left for server shutdown handling (see the PONR comment below).
       */
233   /* package */PairOfSameType<Region> createDaughters(final Server server,
234       final RegionServerServices services, User user) throws IOException {
235     LOG.info("Starting split of region " + this.parent);
236     if ((server != null && server.isStopped()) ||
237         (services != null && services.isStopping())) {
238       throw new IOException("Server is stopped or stopping");
239     }
240     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
241       "Unsafe to hold write lock while performing RPCs";
242 
243     transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
244 
245     // Coprocessor callback
246     if (this.parent.getCoprocessorHost() != null) {
247       if (user == null) {
248         // TODO: Remove one of these
249         parent.getCoprocessorHost().preSplit();
250         parent.getCoprocessorHost().preSplit(splitrow);
251       } else {
252         try {
253           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
254             @Override
255             public Void run() throws Exception {
256               parent.getCoprocessorHost().preSplit();
257               parent.getCoprocessorHost().preSplit(splitrow);
258               return null;
259             }
260           });
261         } catch (InterruptedException ie) {
262           InterruptedIOException iioe = new InterruptedIOException();
263           iioe.initCause(ie);
264           throw iioe;
265         }
266       }
267     }
268 
269     transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
270 
271     // If true, no cluster to write meta edits to or to update znodes in.
272     boolean testing = server == null? true:
273         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
          // Keep the current timeout when testing; otherwise take it from configuration.
274     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
275         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
276           this.fileSplitTimeout);
277 
278     PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
279 
          // Mutations contributed by coprocessors; written to hbase:meta together with the split.
280     final List<Mutation> metaEntries = new ArrayList<Mutation>();
          // Set to true by preSplitBeforePONR when a coprocessor asks to bypass the split.
281     boolean ret = false;
282     if (this.parent.getCoprocessorHost() != null) {
283       if (user == null) {
284         ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
285       } else {
286         try {
287           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
288             @Override
289             public Boolean run() throws Exception {
290               return parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
291             }
292           });
293         } catch (InterruptedException ie) {
294           InterruptedIOException iioe = new InterruptedIOException();
295           iioe.initCause(ie);
296           throw iioe;
297         }
298       }
299       if (ret) {
300           throw new IOException("Coprocessor bypassing region "
301             + this.parent.getRegionInfo().getRegionNameAsString() + " split.");
302       }
            // Every coprocessor mutation must target an hbase:meta row, i.e. its row key must parse
            // as a region name; parseRegionName throws IOException otherwise.
303       try {
304         for (Mutation p : metaEntries) {
305           HRegionInfo.parseRegionName(p.getRow());
306         }
307       } catch (IOException e) {
308         LOG.error("Row key of mutation from coprossor is not parsable as region name."
309             + "Mutations from coprocessor should only for hbase:meta table.");
310         throw e;
311       }
312     }
313 
314     // This is the point of no return.  Adding subsequent edits to .META. as we
315     // do below when we do the daughter opens adding each to .META. can fail in
316     // various interesting ways the most interesting of which is a timeout
317     // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
318     // then subsequent failures need to crash out this regionserver; the
319     // server shutdown processing should be able to fix-up the incomplete split.
320     // The offlined parent will have the daughters as extra columns.  If
321     // we leave the daughter regions in place and do not remove them when we
322     // crash out, then they will have their references to the parent in place
323     // still and the server shutdown fixup of .META. will point to these
324     // regions.
325     // We should add PONR JournalEntry before offlineParentInMeta, so even if
326     // offlineParentInMeta times out, this will cause regionserver exit, and then
327     // master ServerShutdownHandler will fix daughter & avoid data loss. (See
328     // HBase-4562).
329     transition(SplitTransactionPhase.PONR);
330 
331     // Edit parent in meta.  Offlines parent region and adds splita and splitb
332     // as an atomic update. See HBASE-7721. This update to META determines
333     // whether the region is split or not in case of failures.
334     // If it is successful, master will roll-forward, if not, master will rollback
335     // and assign the parent region.
336     if (!testing && useZKForAssignment) {
          // metaEntries is never null here (it is created above); the null check is only defensive.
337       if (metaEntries == null || metaEntries.isEmpty()) {
338         MetaTableAccessor.splitRegion(server.getConnection(),
339           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
340           daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
341           parent.getTableDesc().getRegionReplication());
342       } else {
343         offlineParentInMetaAndputMetaEntries(server.getConnection(),
344           parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions
345               .getSecond().getRegionInfo(), server.getServerName(), metaEntries,
346               parent.getTableDesc().getRegionReplication());
347       }
348     } else if (services != null && !useZKForAssignment) {
          // ZK-less assignment: the master records the PONR on our behalf.
349       if (!services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
350           parent.getRegionInfo(), hri_a, hri_b)) {
351         // Passed PONR, let SSH clean it up
352         throw new IOException("Failed to notify master that split passed PONR: "
353           + parent.getRegionInfo().getRegionNameAsString());
354       }
355     }
356     return daughterRegions;
357   }
358 
      /**
       * Performs the part of the split that precedes the point of no return, journaling each phase
       * so that a failure can be rolled back.
       * <p>
       * In order:
       * <ol>
       *   <li>Announces the split: through the coordinated state manager, or, with ZK-less
       *       assignment, by reporting READY_TO_SPLIT to the master.</li>
       *   <li>Creates the splits directory.</li>
       *   <li>Closes the parent.</li>
       *   <li>Removes the parent from the online regions, unless testing.</li>
       *   <li>Writes the split store file references.</li>
       *   <li>Creates both daughter regions from the splits directory, checking reference file
       *       counts before and after each one.</li>
       * </ol>
       * @param server hosting server; may be null when testing
       * @param services region server services; may be null
       * @param testing true when there is no cluster; skips removing the parent from online regions
       * @return the two daughter regions (not yet opened)
       * @throws IOException on any failure, including when another thread already closed the
       *   parent; non-IOExceptions from close are wrapped
       */
359   public PairOfSameType<Region> stepsBeforePONR(final Server server,
360       final RegionServerServices services, boolean testing) throws IOException {
361 
362     if (useCoordinatedStateManager(server)) {
363       if (std == null) {
364         std =
365             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
366                 .getSplitTransactionCoordination().getDefaultDetails();
367       }
368       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
369           .getSplitTransactionCoordination().startSplitTransaction(parent, server.getServerName(),
370             hri_a, hri_b);
371     } else if (services != null && !useZKForAssignment) {
372       if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
373           parent.getRegionInfo(), hri_a, hri_b)) {
374         throw new IOException("Failed to get ok from master to split "
375           + parent.getRegionInfo().getRegionNameAsString());
376       }
377     }
378 
379     transition(SplitTransactionPhase.SET_SPLITTING);
380 
381     if (useCoordinatedStateManager(server)) {
382       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
383           .getSplitTransactionCoordination().waitForSplitTransaction(services, parent, hri_a,
384             hri_b, std);
385     }
386 
387     this.parent.getRegionFileSystem().createSplitsDir();
388 
389     transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
390 
          // close(false) returns the store files to split, or null when another thread already
          // closed the region (handled below).
391     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
392     Exception exceptionToThrow = null;
393     try{
394       hstoreFilesToSplit = this.parent.close(false);
395     } catch (Exception e) {
396       exceptionToThrow = e;
397     }
398     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
399       // The region was closed by a concurrent thread.  We can't continue
400       // with the split, instead we must just abandon the split.  If we
401       // reopen or split this could cause problems because the region has
402       // probably already been moved to a different server, or is in the
403       // process of moving to a different server.
404       exceptionToThrow = closedByOtherException;
405     }
          // Identity check against the sentinel: only journal CLOSED_PARENT_REGION when this
          // transaction attempted the close itself, so rollback never reopens a region that a
          // concurrent thread closed.
406     if (exceptionToThrow != closedByOtherException) {
407       transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
408     }
409     if (exceptionToThrow != null) {
410       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
411       throw new IOException(exceptionToThrow);
412     }
          // NOTE(review): services is dereferenced without a null check when not testing.
413     if (!testing) {
414       services.removeFromOnlineRegions(this.parent, null);
415     }
416 
417     transition(SplitTransactionPhase.OFFLINED_PARENT);
418 
419     // NOTE: splitStoreFiles already splits the store files in parallel on a thread pool
420     // sized from the number of files and the configured maximum.
421     //
422     // splitStoreFiles creates daughter region dirs under the parent splits dir
423     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
424     // clean this up.
425     Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
426 
427     // Log to the journal that we are creating region A, the first daughter
428     // region.  We could fail halfway through.  If we do, we could have left
429     // stuff in fs that needs cleanup -- a storefile or two.  That's why we
430     // add entry to journal BEFORE rather than AFTER the change.
431     transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
432 
          // Reference counts are checked in the splits dir before the daughter is created and in
          // the daughter's table-level dir afterwards.
433     assertReferenceFileCount(expectedReferences.getFirst(),
434         this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
435     Region a = this.parent.createDaughterRegionFromSplits(this.hri_a);
436     assertReferenceFileCount(expectedReferences.getFirst(),
437         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
438 
439     // Ditto
440     transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
441 
442     assertReferenceFileCount(expectedReferences.getSecond(),
443         this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
444     Region b = this.parent.createDaughterRegionFromSplits(this.hri_b);
445     assertReferenceFileCount(expectedReferences.getSecond(),
446         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
447 
448     return new PairOfSameType<Region>(a, b);
449   }
450 
451   void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
452       throws IOException {
453     if (expectedReferenceFileCount != 0 &&
454         expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
455           dir)) {
456       throw new IOException("Failing split. Expected reference file count isn't equal.");
457     }
458   }
459 
460   /**
461    * Perform time consuming opening of the daughter regions.
       * <p>
       * If the server is stopped or stopping, nothing is opened; the method logs and returns.
       * Otherwise:
       * <ul>
       *   <li>Both daughters are opened in parallel on DaughterOpener threads.</li>
       *   <li>Each successful open is journaled (OPENED_REGION_A, OPENED_REGION_B).</li>
       *   <li>If either open failed, an IOException wrapping that failure is thrown.</li>
       *   <li>When {@code services} is non-null, the daughters are handed over, b before a. With
       *       ZK assignment this uses postOpenDeployTasks; otherwise SPLIT is reported to the
       *       master once. Both are then added to the online regions.</li>
       * </ul>
462    * @param server Hosting server instance.  Can be null when testing
463    * @param services Used to online/offline regions.
464    * @param a first daughter region
465    * @param b second daughter region
466    * @throws IOException If thrown, transaction failed.
467    *          Call {@link #rollback(Server, RegionServerServices)}
468    */
469   /* package */void openDaughters(final Server server,
470       final RegionServerServices services, Region a, Region b)
471       throws IOException {
472     boolean stopped = server != null && server.isStopped();
473     boolean stopping = services != null && services.isStopping();
474     // TODO: Is this check needed here?
475     if (stopped || stopping) {
476       LOG.info("Not opening daughters " +
477           b.getRegionInfo().getRegionNameAsString() +
478           " and " +
479           a.getRegionInfo().getRegionNameAsString() +
480           " because stopping=" + stopping + ", stopped=" + stopped);
481     } else {
482       // Open daughters in parallel.
483       DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a);
484       DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b);
485       aOpener.start();
486       bOpener.start();
          // getException() is read only after join(), so each opener's result is visible here.
          // NOTE(review): if a listener throws from transition(OPENED_REGION_A), bOpener is not
          // joined before the exception propagates.
487       try {
488         aOpener.join();
489         if (aOpener.getException() == null) {
490           transition(SplitTransactionPhase.OPENED_REGION_A);
491         }
492         bOpener.join();
493         if (bOpener.getException() == null) {
494           transition(SplitTransactionPhase.OPENED_REGION_B);
495         }
496       } catch (InterruptedException e) {
497         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
498       }
499       if (aOpener.getException() != null) {
500         throw new IOException("Failed " +
501           aOpener.getName(), aOpener.getException());
502       }
503       if (bOpener.getException() != null) {
504         throw new IOException("Failed " +
505           bOpener.getName(), bOpener.getException());
506       }
507       if (services != null) {
508         try {
509           if (useZKForAssignment) {
510             // add 2nd daughter first (see HBASE-4335)
511             services.postOpenDeployTasks(b);
512           } else if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
513               parent.getRegionInfo(), hri_a, hri_b)) {
514             throw new IOException("Failed to report split region to master: "
515               + parent.getRegionInfo().getShortNameToLog());
516           }
517           // Should add it to OnlineRegions
518           services.addToOnlineRegions(b);
519           if (useZKForAssignment) {
520             services.postOpenDeployTasks(a);
521           }
522           services.addToOnlineRegions(a);
523         } catch (KeeperException ke) {
524           throw new IOException(ke);
525         }
526       }
527     }
528   }
529 
530   public PairOfSameType<Region> execute(final Server server,
531     final RegionServerServices services)
532         throws IOException {
533     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
534       LOG.warn("Should use execute(Server, RegionServerServices, User)");
535     }
536     return execute(server, services, null);
537   }
538 
539   /**
540    * Run the transaction.
       * <p>
       * In order:
       * <ol>
       *   <li>Records the server and services.</li>
       *   <li>Re-reads the assignment mode from the server configuration (ZK assignment when the
       *       server is null).</li>
       *   <li>Fetches default coordination details when a coordinated state manager is in use.</li>
       *   <li>Runs {@link #createDaughters(Server, RegionServerServices, User)} up to the point of
       *       no return.</li>
       *   <li>Invokes the preSplitAfterPONR coprocessor hook.</li>
       *   <li>Runs {@link #stepsAfterPONR(Server, RegionServerServices, PairOfSameType, User)}.</li>
       *   <li>Transitions to COMPLETED.</li>
       * </ol>
541    * @param server Hosting server instance.  Can be null when testing
542    * @param services Used to online/offline regions.
       * @param user user to run coprocessor hooks as; null runs them directly
543    * @throws IOException If thrown, transaction failed.
544    *          Call {@link #rollback(Server, RegionServerServices)}
545    * @return Regions created
547    * @see #rollback(Server, RegionServerServices)
548    */
549   @Override
550   public PairOfSameType<Region> execute(final Server server,
551       final RegionServerServices services, User user) throws IOException {
552     this.server = server;
553     this.rsServices = services;
554     useZKForAssignment = server == null ? true :
555       ConfigUtil.useZKForAssignment(server.getConfiguration());
556     if (useCoordinatedStateManager(server)) {
557       std =
558           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
559               .getSplitTransactionCoordination().getDefaultDetails();
560     }
561     PairOfSameType<Region> regions = createDaughters(server, services, user);
          // Past the PONR from here on: coprocessor hook, then opening the daughters.
562     if (this.parent.getCoprocessorHost() != null) {
563       if (user == null) {
564         parent.getCoprocessorHost().preSplitAfterPONR();
565       } else {
566         try {
567           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
568             @Override
569             public Void run() throws Exception {
570               parent.getCoprocessorHost().preSplitAfterPONR();
571               return null;
572             }
573           });
574         } catch (InterruptedException ie) {
575           InterruptedIOException iioe = new InterruptedIOException();
576           iioe.initCause(ie);
577           throw iioe;
578         }
579       }
580     }
581     regions = stepsAfterPONR(server, services, regions, user);
582 
583     transition(SplitTransactionPhase.COMPLETED);
584 
585     return regions;
586   }
587 
588 	@Deprecated
589   public PairOfSameType<Region> stepsAfterPONR(final Server server,
590       final RegionServerServices services, final PairOfSameType<Region> regions)
591       throws IOException {
592     return stepsAfterPONR(server, services, regions, null);
593   }
594 
      /**
       * Completes the split after the point of no return.
       * <p>
       * In order:
       * <ol>
       *   <li>Opens both daughters.</li>
       *   <li>Completes the coordinated split transaction when a coordinated state manager is in
       *       use.</li>
       *   <li>Runs the postSplit coprocessor hook (as {@code user} when it is non-null).</li>
       * </ol>
       * BEFORE_POST_SPLIT_HOOK and AFTER_POST_SPLIT_HOOK are journaled around the hook.
       * @param server hosting server; may be null when testing
       * @param services used to online/offline regions
       * @param regions the daughter regions created before the PONR
       * @param user user to run coprocessor hooks as, or null to run them directly
       * @return {@code regions}, unchanged
       * @throws IOException if any step fails
       */
595   public PairOfSameType<Region> stepsAfterPONR(final Server server,
596       final RegionServerServices services, final PairOfSameType<Region> regions, User user)
597       throws IOException {
598     openDaughters(server, services, regions.getFirst(), regions.getSecond());
599     if (useCoordinatedStateManager(server)) {
600       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
601           .getSplitTransactionCoordination().completeSplitTransaction(services, regions.getFirst(),
602             regions.getSecond(), std, parent);
603     }
604 
605     transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
606 
607     // Coprocessor callback
608     if (parent.getCoprocessorHost() != null) {
609       if (user == null) {
610         this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
611       } else {
612         try {
613           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
614             @Override
615             public Void run() throws Exception {
616               parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
617               return null;
618             }
619           });
620         } catch (InterruptedException ie) {
621           InterruptedIOException iioe = new InterruptedIOException();
622           iioe.initCause(ie);
623           throw iioe;
624         }
625       }
626     }
627 
628     transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
629 
630     return regions;
631   }
632 
633   private void offlineParentInMetaAndputMetaEntries(HConnection hConnection,
634       HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
635       ServerName serverName, List<Mutation> metaEntries, int regionReplication)
636           throws IOException {
637     List<Mutation> mutations = metaEntries;
638     HRegionInfo copyOfParent = new HRegionInfo(parent);
639     copyOfParent.setOffline(true);
640     copyOfParent.setSplit(true);
641 
642     //Put for parent
643     Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
644     MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
645     mutations.add(putParent);
646     
647     //Puts for daughters
648     Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
649     Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
650 
651     addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
652     addLocation(putB, serverName, 1);
653     mutations.add(putA);
654     mutations.add(putB);
655 
656     // Add empty locations for region replicas of daughters so that number of replicas can be
657     // cached whenever the primary region is looked up from meta
658     for (int i = 1; i < regionReplication; i++) {
659       addEmptyLocation(putA, i);
660       addEmptyLocation(putB, i);
661     }
662 
663     MetaTableAccessor.mutateMetaTable(hConnection, mutations);
664   }
665 
666   private static Put addEmptyLocation(final Put p, int replicaId){
667     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null);
668     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId),
669       null);
670     p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null);
671     return p;
672   }
673 
674   public Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
675     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
676       Bytes.toBytes(sn.getHostAndPort()));
677     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
678       Bytes.toBytes(sn.getStartcode()));
679     p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
680         Bytes.toBytes(openSeqNum));
681     return p;
682   }
683 
684   /*
685    * Open daughter region in its own thread.
686    * If we fail, abort this hosting server.
687    */
688   class DaughterOpener extends HasThread {
689     private final Server server;
690     private final HRegion r;
691     private Throwable t = null;
692 
693     DaughterOpener(final Server s, final HRegion r) {
694       super((s == null? "null-services": s.getServerName()) +
695         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
696       setDaemon(true);
697       this.server = s;
698       this.r = r;
699     }
700 
701     /**
702      * @return Null if open succeeded else exception that causes us fail open.
703      * Call it after this thread exits else you may get wrong view on result.
704      */
705     Throwable getException() {
706       return this.t;
707     }
708 
709     @Override
710     public void run() {
711       try {
712         openDaughterRegion(this.server, r);
713       } catch (Throwable t) {
714         this.t = t;
715       }
716     }
717   }
718 
719   /**
720    * Open daughter regions, add them to online list and update meta.
721    * @param server
722    * @param daughter
723    * @throws IOException
724    * @throws KeeperException
725    */
726   void openDaughterRegion(final Server server, final HRegion daughter)
727   throws IOException, KeeperException {
728     HRegionInfo hri = daughter.getRegionInfo();
729     LoggingProgressable reporter = server == null ? null
730         : new LoggingProgressable(hri, server.getConfiguration().getLong(
731             "hbase.regionserver.split.daughter.open.log.interval", 10000));
732     daughter.openHRegion(reporter);
733   }
734 
735   static class LoggingProgressable implements CancelableProgressable {
736     private final HRegionInfo hri;
737     private long lastLog = -1;
738     private final long interval;
739 
740     LoggingProgressable(final HRegionInfo hri, final long interval) {
741       this.hri = hri;
742       this.interval = interval;
743     }
744 
745     @Override
746     public boolean progress() {
747       long now = EnvironmentEdgeManager.currentTime();
748       if (now - lastLog > this.interval) {
749         LOG.info("Opening " + this.hri.getRegionNameAsString());
750         this.lastLog = now;
751       }
752       return true;
753     }
754   }
755 
756   private boolean useCoordinatedStateManager(final Server server) {
757     return server != null && useZKForAssignment && server.getCoordinatedStateManager() != null;
758   }
759 
760   /**
761    * Creates reference files for top and bottom half of the
762    * @param hstoreFilesToSplit map of store files to create half file references for.
763    * @return the number of reference files that were created.
764    * @throws IOException
765    */
766   private Pair<Integer, Integer> splitStoreFiles(
767       final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
768       throws IOException {
769     if (hstoreFilesToSplit == null) {
770       // Could be null because close didn't succeed -- for now consider it fatal
771       throw new IOException("Close returned empty list of StoreFiles");
772     }
773     // The following code sets up a thread pool executor with as many slots as
774     // there's files to split. It then fires up everything, waits for
775     // completion and finally checks for any exception
776     int nbFiles = 0;
777     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
778         nbFiles += entry.getValue().size();
779     }
780     if (nbFiles == 0) {
781       // no file needs to be splitted.
782       return new Pair<Integer, Integer>(0,0);
783     }
784     // Default max #threads to use is the smaller of table's configured number of blocking store
785     // files or the available number of logical cores.
786     int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
787                 HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
788             Runtime.getRuntime().availableProcessors());
789     // Max #threads is the smaller of the number of storefiles or the default max determined above.
790     int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
791                 defMaxThreads), nbFiles);
792     LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
793             " using " + maxThreads + " threads");
794     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
795     builder.setNameFormat("StoreFileSplitter-%1$d");
796     ThreadFactory factory = builder.build();
797     ThreadPoolExecutor threadPool =
798       (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
799     List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
800 
801     // Split each store file.
802     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
803       for (StoreFile sf: entry.getValue()) {
804         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
805         futures.add(threadPool.submit(sfs));
806       }
807     }
808     // Shutdown the pool
809     threadPool.shutdown();
810 
811     // Wait for all the tasks to finish
812     try {
813       boolean stillRunning = !threadPool.awaitTermination(
814           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
815       if (stillRunning) {
816         threadPool.shutdownNow();
817         // wait for the thread to shutdown completely.
818         while (!threadPool.isTerminated()) {
819           Thread.sleep(50);
820         }
821         throw new IOException("Took too long to split the" +
822             " files and create the references, aborting split");
823       }
824     } catch (InterruptedException e) {
825       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
826     }
827 
828     int created_a = 0;
829     int created_b = 0;
830     // Look for any exception
831     for (Future<Pair<Path, Path>> future : futures) {
832       try {
833         Pair<Path, Path> p = future.get();
834         created_a += p.getFirst() != null ? 1 : 0;
835         created_b += p.getSecond() != null ? 1 : 0;
836       } catch (InterruptedException e) {
837         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
838       } catch (ExecutionException e) {
839         throw new IOException(e);
840       }
841     }
842 
843     if (LOG.isDebugEnabled()) {
844       LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
845           + " storefiles, Daughter B: " + created_b + " storefiles.");
846     }
847     return new Pair<Integer, Integer>(created_a, created_b);
848   }
849 
850   private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
851       throws IOException {
852     if (LOG.isDebugEnabled()) {
853         LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
854                   this.parent);
855     }
856     HRegionFileSystem fs = this.parent.getRegionFileSystem();
857     String familyName = Bytes.toString(family);
858 
859     Path path_a =
860         fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
861           this.parent.getSplitPolicy());
862     Path path_b =
863         fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
864           this.parent.getSplitPolicy());
865     if (LOG.isDebugEnabled()) {
866         LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
867                   this.parent);
868     }
869     return new Pair<Path,Path>(path_a, path_b);
870   }
871 
872   /**
873    * Utility class used to do the file splitting / reference writing
874    * in parallel instead of sequentially.
875    */
876   class StoreFileSplitter implements Callable<Pair<Path,Path>> {
877     private final byte[] family;
878     private final StoreFile sf;
879 
880     /**
881      * Constructor that takes what it needs to split
882      * @param family Family that contains the store file
883      * @param sf which file
884      */
885     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
886       this.sf = sf;
887       this.family = family;
888     }
889 
890     public Pair<Path,Path> call() throws IOException {
891       return splitStoreFile(family, sf);
892     }
893   }
894   
895   @Override
896   public boolean rollback(final Server server, final RegionServerServices services)
897       throws IOException {
898     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
899       LOG.warn("Should use rollback(Server, RegionServerServices, User)");
900     }
901     return rollback(server, services, null);
902   }
903 
904   /**
905    * @param server Hosting server instance (May be null when testing).
906    * @param services
907    * @throws IOException If thrown, rollback failed.  Take drastic action.
908    * @return True if we successfully rolled back, false if we got to the point
909    * of no return and so now need to abort the server to minimize damage.
910    */
911   @Override
912   @SuppressWarnings("deprecation")
913   public boolean rollback(final Server server, final RegionServerServices services, User user)
914   throws IOException {
915     // Coprocessor callback
916     if (this.parent.getCoprocessorHost() != null) {
917       if (user == null) {
918         this.parent.getCoprocessorHost().preRollBackSplit();
919       } else {
920         try {
921           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
922             @Override
923             public Void run() throws Exception {
924               parent.getCoprocessorHost().preRollBackSplit();
925               return null;
926             }
927           });
928         } catch (InterruptedException ie) {
929           InterruptedIOException iioe = new InterruptedIOException();
930           iioe.initCause(ie);
931           throw iioe;
932         }
933       }
934     }
935 
936     boolean result = true;
937     ListIterator<JournalEntry> iterator =
938       this.journal.listIterator(this.journal.size());
939     // Iterate in reverse.
940     while (iterator.hasPrevious()) {
941       JournalEntry je = iterator.previous();
942 
943       transition(je.getPhase(), true);
944 
945       switch(je.getPhase()) {
946 
947       case SET_SPLITTING:
948         if (useCoordinatedStateManager(server) && server instanceof HRegionServer) {
949           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
950               .getSplitTransactionCoordination().clean(this.parent.getRegionInfo());
951         } else if (services != null && !useZKForAssignment
952             && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
953                 parent.getRegionInfo(), hri_a, hri_b)) {
954           return false;
955         }
956         break;
957 
958       case CREATE_SPLIT_DIR:
959         this.parent.writestate.writesEnabled = true;
960         this.parent.getRegionFileSystem().cleanupSplitsDir();
961         break;
962 
963       case CLOSED_PARENT_REGION:
964         try {
965           // So, this returns a seqid but if we just closed and then reopened, we
966           // should be ok. On close, we flushed using sequenceid obtained from
967           // hosting regionserver so no need to propagate the sequenceid returned
968           // out of initialize below up into regionserver as we normally do.
969           // TODO: Verify.
970           this.parent.initialize();
971         } catch (IOException e) {
972           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
973             this.parent.getRegionInfo().getRegionNameAsString(), e);
974           throw new RuntimeException(e);
975         }
976         break;
977 
978       case STARTED_REGION_A_CREATION:
979         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
980         break;
981 
982       case STARTED_REGION_B_CREATION:
983         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
984         break;
985 
986       case OFFLINED_PARENT:
987         if (services != null) services.addToOnlineRegions(this.parent);
988         break;
989 
990       case PONR:
991         // We got to the point-of-no-return so we need to just abort. Return
992         // immediately.  Do not clean up created daughter regions.  They need
993         // to be in place so we don't delete the parent region mistakenly.
994         // See HBASE-3872.
995         return false;
996 
997       // Informational only cases
998       case STARTED:
999       case PREPARED:
1000       case BEFORE_PRE_SPLIT_HOOK:
1001       case AFTER_PRE_SPLIT_HOOK:
1002       case BEFORE_POST_SPLIT_HOOK:
1003       case AFTER_POST_SPLIT_HOOK:
1004       case OPENED_REGION_A:
1005       case OPENED_REGION_B:
1006       case COMPLETED:
1007         break;
1008 
1009       default:
1010         throw new RuntimeException("Unhandled journal entry: " + je);
1011       }
1012     }
1013     // Coprocessor callback
1014     if (this.parent.getCoprocessorHost() != null) {
1015       if (user == null) {
1016         this.parent.getCoprocessorHost().postRollBackSplit();
1017       } else {
1018         try {
1019           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
1020             @Override
1021             public Void run() throws Exception {
1022               parent.getCoprocessorHost().postRollBackSplit();
1023               return null;
1024             }
1025           });
1026         } catch (InterruptedException ie) {
1027           InterruptedIOException iioe = new InterruptedIOException();
1028           iioe.initCause(ie);
1029           throw iioe;
1030         }
1031       }
1032     }
1033     return result;
1034   }
1035 
1036   HRegionInfo getFirstDaughter() {
1037     return hri_a;
1038   }
1039 
1040   HRegionInfo getSecondDaughter() {
1041     return hri_b;
1042   }
1043 
1044   @Override
1045   public List<JournalEntry> getJournal() {
1046     return journal;
1047   }
1048 
1049   @Override
1050   public SplitTransaction registerTransactionListener(TransactionListener listener) {
1051     listeners.add(listener);
1052     return this;
1053   }
1054 
1055   @Override
1056   public Server getServer() {
1057     return server;
1058   }
1059 
1060   @Override
1061   public RegionServerServices getRegionServerServices() {
1062     return rsServices;
1063   }
1064 }