1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.backup.regionserver;
20
21 import java.util.HashMap;
22 import java.util.concurrent.Callable;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
27 import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
28 import org.apache.hadoop.hbase.client.Connection;
29 import org.apache.hadoop.hbase.errorhandling.ForeignException;
30 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
31 import org.apache.hadoop.hbase.procedure.ProcedureMember;
32 import org.apache.hadoop.hbase.procedure.Subprocedure;
33 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
34 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
35
36
37
38
39 public class LogRollBackupSubprocedure extends Subprocedure {
40 private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
41
42 private final RegionServerServices rss;
43 private final LogRollBackupSubprocedurePool taskManager;
44 private FSHLog hlog;
45 private String backupRoot;
46
47 public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
48 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
49 LogRollBackupSubprocedurePool taskManager, byte[] data) {
50
51 super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
52 wakeFrequency, timeout);
53 LOG.info("Constructing a LogRollBackupSubprocedure.");
54 this.rss = rss;
55 this.taskManager = taskManager;
56 if(data != null) {
57 backupRoot = new String(data);
58 }
59 }
60
61
62
63
64
65 class RSRollLogTask implements Callable<Void> {
66 RSRollLogTask() {
67 }
68
69 @Override
70 public Void call() throws Exception {
71 if (LOG.isDebugEnabled()) {
72 LOG.debug("++ DRPC started: " + rss.getServerName());
73 }
74 hlog = (FSHLog) rss.getWAL(null);
75 long filenum = hlog.getFilenum();
76
77 LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum);
78 hlog.rollWriter(true);
79 LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
80
81 Connection connection = rss.getConnection();
82 try(final BackupSystemTable table = new BackupSystemTable(connection)) {
83
84 HashMap<String, Long> serverTimestampMap = table.readRegionServerLastLogRollResult(backupRoot);
85 String host = rss.getServerName().getHostname();
86 int port = rss.getServerName().getPort();
87 String server = host + ":" + port;
88 Long sts = serverTimestampMap.get(host);
89 if (sts != null && sts > filenum) {
90 LOG.warn("Won't update server's last roll log result: current="
91 + sts + " new=" + filenum);
92 return null;
93 }
94
95 table.writeRegionServerLastLogRollResult(server, filenum, backupRoot);
96 return null;
97 } catch (Exception e) {
98 LOG.error(e);
99 throw e;
100 }
101 }
102 }
103
104 private void rolllog() throws ForeignException {
105 monitor.rethrowException();
106
107 taskManager.submitTask(new RSRollLogTask());
108 monitor.rethrowException();
109
110
111 taskManager.waitForOutstandingTasks();
112 monitor.rethrowException();
113
114 }
115
116 @Override
117 public void acquireBarrier() throws ForeignException {
118
119 }
120
121
122
123
124
125 @Override
126 public byte[] insideBarrier() throws ForeignException {
127 rolllog();
128
129 return null;
130 }
131
132
133
134
135 @Override
136 public void cleanup(Exception e) {
137 taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
138 }
139
140
141
142
143 public void releaseBarrier() {
144
145 }
146
147 }