View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements. See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.backup.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.ThreadPoolExecutor;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
30  import org.apache.hadoop.hbase.ServerName;
31  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
32  import org.apache.hadoop.hbase.errorhandling.ForeignException;
33  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
34  import org.apache.hadoop.hbase.master.MasterServices;
35  import org.apache.hadoop.hbase.master.MetricsMaster;
36  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
37  import org.apache.hadoop.hbase.procedure.Procedure;
38  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
39  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
40  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
41  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
42  import org.apache.zookeeper.KeeperException;
43  
44  public class LogRollMasterProcedureManager extends MasterProcedureManager {
45  
46    public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
47    public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
48    public static final String BACKUP_WAKE_MILLIS_KEY = "hbase.backup.master.wake.millis";
49    public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.master.timeout.millis";
50    public static final String BACKUP_POOL_THREAD_NUMBER_KEY = "hbase.backup.master.pool.thread.number";
51  
52    public static final int BACKUP_WAKE_MILLIS_DEFAULT = 500;
53    public static final int BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
54    public static final int BACKUP_POOL_THREAD_NUMBER_DEFAULT = 8;
55  
56  
57    private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
58  
59    private MasterServices master;
60    private ProcedureCoordinator coordinator;
61    private boolean done;
62  
63    @Override
64    public void stop(String why) {
65      LOG.info("stop: " + why);
66    }
67  
68    @Override
69    public boolean isStopped() {
70      return false;
71    }
72  
73    @Override
74    public void initialize(MasterServices master, MetricsMaster metricsMaster)
75        throws KeeperException, IOException, UnsupportedOperationException {
76      this.master = master;
77      this.done = false;
78  
79      // setup the default procedure coordinator
80      String name = master.getServerName().toString();
81  
82  
83      // get the configuration for the coordinator
84      Configuration conf = master.getConfiguration();
85      long wakeFrequency = conf.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT);
86      long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY,BACKUP_TIMEOUT_MILLIS_DEFAULT);
87      int opThreads = conf.getInt(BACKUP_POOL_THREAD_NUMBER_KEY,
88                                      BACKUP_POOL_THREAD_NUMBER_DEFAULT);
89  
90      // setup the default procedure coordinator
91      ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
92      BaseCoordinatedStateManager coordManager =
93          (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
94          .getCoordinatedStateManager(master.getConfiguration());
95      coordManager.initialize(master);
96      ProcedureCoordinatorRpcs comms =
97          coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
98      this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
99  
100   }
101 
102   @Override
103   public String getProcedureSignature() {
104     return ROLLLOG_PROCEDURE_SIGNATURE;
105   }
106 
107   @Override
108   public void execProcedure(ProcedureDescription desc) throws IOException {
109     this.done = false;
110     // start the process on the RS
111     ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
112     List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
113     List<String> servers = new ArrayList<String>();
114     for (ServerName sn : serverNames) {
115       servers.add(sn.toString());
116     }
117 
118     List<NameStringPair> conf = desc.getConfigurationList();
119     byte[] data = new byte[0];
120     if(conf.size() > 0){
121       // Get backup root path
122       data = conf.get(0).getValue().getBytes();
123     }
124     Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers);
125     if (proc == null) {
126       String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
127       LOG.error(msg);
128       throw new IOException(msg);
129     }
130 
131     try {
132       // wait for the procedure to complete. A timer thread is kicked off that should cancel this
133       // if it takes too long.
134       proc.waitForCompleted();
135       LOG.info("Done waiting - exec procedure for " + desc.getInstance());
136       LOG.info("Distributed roll log procedure is successful!");
137       this.done = true;
138     } catch (InterruptedException e) {
139       ForeignException ee =
140           new ForeignException("Interrupted while waiting for roll log procdure to finish", e);
141       monitor.receive(ee);
142       Thread.currentThread().interrupt();
143     } catch (ForeignException e) {
144       ForeignException ee =
145           new ForeignException("Exception while waiting for roll log procdure to finish", e);
146       monitor.receive(ee);
147     }
148     monitor.rethrowException();
149   }
150 
151   @Override
152   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
153     return done;
154   }
155 
156 }