1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.backup.regionserver;
20
21
22 import java.io.IOException;
23 import java.util.concurrent.ThreadPoolExecutor;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
29 import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
30 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
31 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
32 import org.apache.hadoop.hbase.procedure.ProcedureMember;
33 import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
34 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
35 import org.apache.hadoop.hbase.procedure.Subprocedure;
36 import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
37 import org.apache.hadoop.hbase.regionserver.HRegionServer;
38 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager {
53
54 private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class);
55
56
57 public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
58
59 public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
60
61 public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
62 public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
63
64
65 public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
66
67 private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
68
69 private RegionServerServices rss;
70 private ProcedureMemberRpcs memberRpcs;
71 private ProcedureMember member;
72
73
74
75
76 public LogRollRegionServerProcedureManager() {
77 }
78
79
80
81
82 @Override
83 public void start() {
84 this.memberRpcs.start(rss.getServerName().toString(), member);
85 LOG.info("Started region server backup manager.");
86 }
87
88
89
90
91
92
93 @Override
94 public void stop(boolean force) throws IOException {
95 String mode = force ? "abruptly" : "gracefully";
96 LOG.info("Stopping RegionServerBackupManager " + mode + ".");
97
98 try {
99 this.member.close();
100 } finally {
101 this.memberRpcs.close();
102 }
103 }
104
105
106
107
108
109 public Subprocedure buildSubprocedure(byte[] data) {
110
111
112 if (rss.isStopping() || rss.isStopped()) {
113 throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
114 + ", because stopping/stopped!");
115 }
116
117 LOG.info("Attempting to run a roll log procedure for backup.");
118 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
119 Configuration conf = rss.getConfiguration();
120 long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
121 long wakeMillis =
122 conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
123
124 LogRollBackupSubprocedurePool taskManager =
125 new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf);
126 return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
127 taskManager, data);
128
129 }
130
131
132
133
134 public class BackupSubprocedureBuilder implements SubprocedureFactory {
135
136 @Override
137 public Subprocedure buildSubprocedure(String name, byte[] data) {
138 return LogRollRegionServerProcedureManager.this.buildSubprocedure(data);
139 }
140 }
141
142 @Override
143 public void initialize(RegionServerServices rss) throws IOException {
144 this.rss = rss;
145 BaseCoordinatedStateManager coordManager =
146 (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.getCoordinatedStateManager(rss
147 .getConfiguration());
148 coordManager.initialize(rss);
149 this.memberRpcs =
150 coordManager
151 .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
152
153
154 Configuration conf = rss.getConfiguration();
155 long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
156 int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
157
158 ThreadPoolExecutor pool =
159 ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
160 this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
161 }
162
163 @Override
164 public String getProcedureSignature() {
165 return "backup-proc";
166 }
167
168 }