View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements. See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership. The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License. You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.backup.regionserver;
20  
21  import java.util.HashMap;
22  import java.util.concurrent.Callable;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
27  import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
28  import org.apache.hadoop.hbase.client.Connection;
29  import org.apache.hadoop.hbase.errorhandling.ForeignException;
30  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
31  import org.apache.hadoop.hbase.procedure.ProcedureMember;
32  import org.apache.hadoop.hbase.procedure.Subprocedure;
33  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
34  import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
35  
36  /**
37   * This backup subprocedure implementation forces a log roll on the RS.
38   */
39  public class LogRollBackupSubprocedure extends Subprocedure {
40    private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class);
41  
42    private final RegionServerServices rss;
43    private final LogRollBackupSubprocedurePool taskManager;
44    private FSHLog hlog;
45    private String backupRoot;
46  
47    public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
48        ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
49        LogRollBackupSubprocedurePool taskManager, byte[] data) {
50  
51      super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
52        wakeFrequency, timeout);
53      LOG.info("Constructing a LogRollBackupSubprocedure.");
54      this.rss = rss;
55      this.taskManager = taskManager;
56      if(data != null) {
57        backupRoot = new String(data);
58      }
59    }
60  
61    /**
62     * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified
63     * with no use of subprocedurepool.
64     */
65    class RSRollLogTask implements Callable<Void> {
66      RSRollLogTask() {
67      }
68  
69      @Override
70      public Void call() throws Exception {
71        if (LOG.isDebugEnabled()) {
72          LOG.debug("++ DRPC started: " + rss.getServerName());
73        }
74        hlog = (FSHLog) rss.getWAL(null);
75        long filenum = hlog.getFilenum();
76  
77        LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum);
78        hlog.rollWriter(true);
79        LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
80  
81        Connection connection = rss.getConnection();
82        try(final BackupSystemTable table = new BackupSystemTable(connection)) {
83          // sanity check, good for testing
84          HashMap<String, Long> serverTimestampMap = table.readRegionServerLastLogRollResult(backupRoot);
85          String host = rss.getServerName().getHostname();
86          int port = rss.getServerName().getPort();
87          String server = host + ":" + port;
88          Long sts = serverTimestampMap.get(host);
89          if (sts != null && sts > filenum) {
90            LOG.warn("Won't update server's last roll log result: current="
91                + sts + " new=" + filenum);
92            return null;
93          }
94          // write the log number to hbase:backup.
95          table.writeRegionServerLastLogRollResult(server, filenum, backupRoot);
96          return null;
97        } catch (Exception e) {
98          LOG.error(e);
99          throw e; // TODO: is this correct?
100       }
101     }
102   }
103 
104   private void rolllog() throws ForeignException {
105     monitor.rethrowException();
106 
107     taskManager.submitTask(new RSRollLogTask());
108     monitor.rethrowException();
109 
110     // wait for everything to complete.
111     taskManager.waitForOutstandingTasks();
112     monitor.rethrowException();
113 
114   }
115 
116   @Override
117   public void acquireBarrier() throws ForeignException {
118     // do nothing, executing in inside barrier step.
119   }
120 
121   /**
122    * do a log roll.
123    * @return some bytes
124    */
125   @Override
126   public byte[] insideBarrier() throws ForeignException {
127     rolllog();
128     // FIXME
129     return null;
130   }
131 
132   /**
133    * Cancel threads if they haven't finished.
134    */
135   @Override
136   public void cleanup(Exception e) {
137     taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
138   }
139 
140   /**
141    * Hooray!
142    */
143   public void releaseBarrier() {
144     // NO OP
145   }
146 
147 }