1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.backup.master;
20
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.zookeeper.KeeperException;
43
44 public class LogRollMasterProcedureManager extends MasterProcedureManager {
45
46 public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
47 public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
48 public static final String BACKUP_WAKE_MILLIS_KEY = "hbase.backup.master.wake.millis";
49 public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.master.timeout.millis";
50 public static final String BACKUP_POOL_THREAD_NUMBER_KEY = "hbase.backup.master.pool.thread.number";
51
52 public static final int BACKUP_WAKE_MILLIS_DEFAULT = 500;
53 public static final int BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
54 public static final int BACKUP_POOL_THREAD_NUMBER_DEFAULT = 8;
55
56
57 private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
58
59 private MasterServices master;
60 private ProcedureCoordinator coordinator;
61 private boolean done;
62
63 @Override
64 public void stop(String why) {
65 LOG.info("stop: " + why);
66 }
67
68 @Override
69 public boolean isStopped() {
70 return false;
71 }
72
73 @Override
74 public void initialize(MasterServices master, MetricsMaster metricsMaster)
75 throws KeeperException, IOException, UnsupportedOperationException {
76 this.master = master;
77 this.done = false;
78
79
80 String name = master.getServerName().toString();
81
82
83
84 Configuration conf = master.getConfiguration();
85 long wakeFrequency = conf.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT);
86 long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY,BACKUP_TIMEOUT_MILLIS_DEFAULT);
87 int opThreads = conf.getInt(BACKUP_POOL_THREAD_NUMBER_KEY,
88 BACKUP_POOL_THREAD_NUMBER_DEFAULT);
89
90
91 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
92 BaseCoordinatedStateManager coordManager =
93 (BaseCoordinatedStateManager) CoordinatedStateManagerFactory
94 .getCoordinatedStateManager(master.getConfiguration());
95 coordManager.initialize(master);
96 ProcedureCoordinatorRpcs comms =
97 coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
98 this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
99
100 }
101
102 @Override
103 public String getProcedureSignature() {
104 return ROLLLOG_PROCEDURE_SIGNATURE;
105 }
106
107 @Override
108 public void execProcedure(ProcedureDescription desc) throws IOException {
109 this.done = false;
110
111 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance());
112 List<ServerName> serverNames = master.getServerManager().getOnlineServersList();
113 List<String> servers = new ArrayList<String>();
114 for (ServerName sn : serverNames) {
115 servers.add(sn.toString());
116 }
117
118 List<NameStringPair> conf = desc.getConfigurationList();
119 byte[] data = new byte[0];
120 if(conf.size() > 0){
121
122 data = conf.get(0).getValue().getBytes();
123 }
124 Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), data, servers);
125 if (proc == null) {
126 String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'";
127 LOG.error(msg);
128 throw new IOException(msg);
129 }
130
131 try {
132
133
134 proc.waitForCompleted();
135 LOG.info("Done waiting - exec procedure for " + desc.getInstance());
136 LOG.info("Distributed roll log procedure is successful!");
137 this.done = true;
138 } catch (InterruptedException e) {
139 ForeignException ee =
140 new ForeignException("Interrupted while waiting for roll log procdure to finish", e);
141 monitor.receive(ee);
142 Thread.currentThread().interrupt();
143 } catch (ForeignException e) {
144 ForeignException ee =
145 new ForeignException("Exception while waiting for roll log procdure to finish", e);
146 monitor.receive(ee);
147 }
148 monitor.rethrowException();
149 }
150
151 @Override
152 public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
153 return done;
154 }
155
156 }