View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to you under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.hadoop.hbase.quotas;
18  
19  import java.io.IOException;
20  import java.util.HashMap;
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.concurrent.TimeUnit;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.ScheduledChore;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.client.Connection;
31  import org.apache.hadoop.hbase.client.Result;
32  import org.apache.hadoop.hbase.client.ResultScanner;
33  import org.apache.hadoop.hbase.client.Table;
34  import org.apache.hadoop.hbase.util.Bytes;
35  
36  /**
37   * A {@link ScheduledChore} which periodically updates the {@link RegionServerSpaceQuotaManager}
38   * with information from the hbase:quota.
39   */
40  public class SpaceQuotaRefresherChore extends ScheduledChore {
41    private static final Log LOG = LogFactory.getLog(SpaceQuotaRefresherChore.class);
42  
43    static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
44        "hbase.regionserver.quotas.policy.refresher.chore.period";
45    static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1; // 1 minute in millis
46  
47    static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
48        "hbase.regionserver.quotas.policy.refresher.chore.delay";
49    static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L; // 15 seconds in millis
50  
51    static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
52        "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
53    static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
54  
55    static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
56        "hbase.regionserver.quotas.policy.refresher.report.percent";
57    static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
58  
59    private final RegionServerSpaceQuotaManager manager;
60    private final Connection conn;
61  
62    public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
63      super(SpaceQuotaRefresherChore.class.getSimpleName(),
64          manager.getRegionServerServices(),
65          getPeriod(manager.getRegionServerServices().getConfiguration()),
66          getInitialDelay(manager.getRegionServerServices().getConfiguration()),
67          getTimeUnit(manager.getRegionServerServices().getConfiguration()));
68      this.manager = manager;
69      this.conn = conn;
70    }
71  
72    @Override
73    protected void chore() {
74      try {
75        if (LOG.isTraceEnabled()) {
76          LOG.trace("Reading current quota violations from hbase:quota.");
77        }
78        // Get the snapshots that the quota manager is currently aware of
79        final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
80            getManager().copyQuotaSnapshots();
81        // Read the new snapshots from the quota table
82        final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
83        if (LOG.isTraceEnabled()) {
84          LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, "
85              + "read " + newSnapshots.size() + " from the quota table.");
86        }
87        // Iterate over each new quota snapshot
88        for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
89          final TableName tableName = entry.getKey();
90          final SpaceQuotaSnapshot newSnapshot = entry.getValue();
91          // May be null!
92          final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
93          if (LOG.isTraceEnabled()) {
94            LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
95          }
96          if (!newSnapshot.equals(currentSnapshot)) {
97            // We have a new snapshot. We might need to enforce it or disable the enforcement
98            if (!isInViolation(currentSnapshot) && newSnapshot.getQuotaStatus().isInViolation()) {
99              if (LOG.isTraceEnabled()) {
100               LOG.trace("Enabling " + newSnapshot + " on " + tableName);
101             }
102             getManager().enforceViolationPolicy(tableName, newSnapshot);
103           }
104           if (isInViolation(currentSnapshot) && !newSnapshot.getQuotaStatus().isInViolation()) {
105             if (LOG.isTraceEnabled()) {
106               LOG.trace("Removing quota violation policy on " + tableName);
107             }
108             getManager().disableViolationPolicyEnforcement(tableName);
109           }
110         }
111       }
112 
113       // Update the snapshots in the manager
114       getManager().updateQuotaSnapshot(newSnapshots);
115     } catch (IOException e) {
116       LOG.warn(
117           "Caught exception while refreshing enforced quota violation policies, will retry.", e);
118     }
119   }
120 
121   /**
122    * Checks if the given <code>snapshot</code> is in violation, allowing the snapshot to be null.
123    * If the snapshot is null, this is interpreted as no snapshot which implies not in violation.
124    *
125    * @param snapshot The snapshot to operate on.
126    * @return true if the snapshot is in violation, false otherwise.
127    */
128   boolean isInViolation(SpaceQuotaSnapshot snapshot) {
129     if (snapshot == null) {
130       return false;
131     }
132     return snapshot.getQuotaStatus().isInViolation();
133   }
134 
135   /**
136    * Reads all quota violation policies which are to be enforced from the quota table.
137    *
138    * @return The collection of tables which are in violation of their quota and the policy which
139    *    should be enforced.
140    */
141   public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
142     try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
143         ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
144       Map<TableName,SpaceQuotaSnapshot> activeViolations = new HashMap<>();
145       for (Result result : scanner) {
146         try {
147           extractQuotaSnapshot(result, activeViolations);
148         } catch (IllegalArgumentException e) {
149           final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
150           LOG.error(msg, e);
151           throw new IOException(msg, e);
152         }
153       }
154       return activeViolations;
155     }
156   }
157 
158   /**
159    * Wrapper around {@link QuotaTableUtil#extractQuotaSnapshot(Result, Map)} for testing.
160    */
161   void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
162     QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
163   }
164 
165   Connection getConnection() {
166     return conn;
167   }
168 
169   RegionServerSpaceQuotaManager getManager() {
170     return manager;
171   }
172 
173   /**
174    * Extracts the period for the chore from the configuration.
175    *
176    * @param conf The configuration object.
177    * @return The configured chore period or the default value.
178    */
179   static int getPeriod(Configuration conf) {
180     return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
181         POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
182   }
183 
184   /**
185    * Extracts the initial delay for the chore from the configuration.
186    *
187    * @param conf The configuration object.
188    * @return The configured chore initial delay or the default value.
189    */
190   static long getInitialDelay(Configuration conf) {
191     return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
192         POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
193   }
194 
195   /**
196    * Extracts the time unit for the chore period and initial delay from the configuration. The
197    * configuration value for {@link #POLICY_REFRESHER_CHORE_TIMEUNIT_KEY} must correspond to
198    * a {@link TimeUnit} value.
199    *
200    * @param conf The configuration object.
201    * @return The configured time unit for the chore period and initial delay or the default value.
202    */
203   static TimeUnit getTimeUnit(Configuration conf) {
204     return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
205         POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
206   }
207 
208   /**
209    * Extracts the percent of Regions for a table to have been reported to enable quota violation
210    * state change.
211    *
212    * @param conf The configuration object.
213    * @return The percent of regions reported to use.
214    */
215   static Double getRegionReportPercent(Configuration conf) {
216     return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
217         POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
218   }
219 }