View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to you under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.hadoop.hbase.quotas;
18  
19  import java.io.IOException;
20  import java.util.HashMap;
21  import java.util.Map;
22  import java.util.Objects;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.atomic.AtomicReference;
25  import java.util.Map.Entry;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.classification.InterfaceStability;
32  import org.apache.hadoop.hbase.client.Connection;
33  import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
34  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
35  
36  import com.google.common.annotations.VisibleForTesting;
37  
38  /**
39   * A manager for filesystem space quotas in the RegionServer.
40   *
41   * This class is the centralized point for what a RegionServer knows about space quotas
42   * on tables. For each table, it tracks two different things: the {@link SpaceQuotaSnapshot}
43   * and a {@link SpaceViolationPolicyEnforcement} (which may be null when a quota is not
44   * being violated). Both of these are sensitive on when they were last updated. The
45   * {link SpaceQutoaViolationPolicyRefresherChore} periodically runs and updates
46   * the state on <code>this</code>.
47   */
48  @InterfaceAudience.Private
49  @InterfaceStability.Evolving
50  public class RegionServerSpaceQuotaManager {
51    private static final Log LOG = LogFactory.getLog(RegionServerSpaceQuotaManager.class);
52  
53    private final RegionServerServices rsServices;
54  
55    private SpaceQuotaRefresherChore spaceQuotaRefresher;
56    private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
57    private ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
58    private SpaceViolationPolicyEnforcementFactory factory;
59  
60    public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
61      this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
62    }
63  
64    @VisibleForTesting
65    RegionServerSpaceQuotaManager(
66        RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
67      this.rsServices = Objects.requireNonNull(rsServices);
68      this.factory = factory;
69      this.enforcedPolicies = new ConcurrentHashMap<>();
70      Map<TableName,SpaceQuotaSnapshot> initialMap = new HashMap<>();
71      this.currentQuotaSnapshots = new AtomicReference<>(initialMap);
72    }
73  
74    public synchronized void start() throws IOException {
75      if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
76        LOG.info("Quota support disabled");
77        return;
78      }
79      if (null != spaceQuotaRefresher) {
80        LOG.warn("RegionServerSpaceQuotaManager has already been started!");
81        return;
82      }
83      this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getConnection());
84      rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
85    }
86  
87    public synchronized void stop() {
88      if (spaceQuotaRefresher != null) {
89        spaceQuotaRefresher.cancel();
90        spaceQuotaRefresher = null;
91      }
92    }
93  
94    /**
95     * Copies the last {@link SpaceQuotaSnapshot}s that were recorded. The current view
96     * of what the RegionServer thinks the table's utilization is.
97     */
98    public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() {
99      return new HashMap<>(currentQuotaSnapshots.get());
100   }
101 
102   /**
103    * Updates the current {@link SpaceQuotaSnapshot}s for the RegionServer.
104    *
105    * @param newSnapshots The space quota snapshots.
106    */
107   public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) {
108     currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
109   }
110 
111   /**
112    * Creates an object well-suited for the RegionServer to use in verifying active policies.
113    */
114   public ActivePolicyEnforcement getActiveEnforcements() {
115     return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
116   }
117 
118   /**
119    * Converts a map of table to {@link SpaceViolationPolicyEnforcement}s into
120    * {@link SpaceViolationPolicy}s.
121    */
122   public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
123     final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
124         copyActiveEnforcements();
125     final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
126     for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
127       final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
128       if (snapshot != null) {
129         policies.put(entry.getKey(), snapshot);
130       }
131     }
132     return policies;
133   }
134 
135   /**
136    * Enforces the given violationPolicy on the given table in this RegionServer.
137    */
138   public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
139     SpaceQuotaStatus status = snapshot.getQuotaStatus();
140     if (!status.isInViolation()) {
141       throw new IllegalStateException(
142           tableName + " is not in violation. Violation policy should not be enabled.");
143     }
144     if (LOG.isTraceEnabled()) {
145       LOG.trace(
146           "Enabling violation policy enforcement on " + tableName
147           + " with policy " + status.getPolicy());
148     }
149     // Construct this outside of the lock
150     final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
151         getRegionServerServices(), tableName, snapshot);
152     // "Enables" the policy
153     // HBASE-XXXX: Should this synchronize on the actual table name instead of the map? That would
154     // allow policy enable/disable on different tables to happen concurrently. As written now, only
155     // one table will be allowed to transition at a time. This is probably OK, but not sure if
156     // it would become a bottleneck at large clusters/number of tables.
157     synchronized (enforcedPolicies) {
158       try {
159         enforcement.enable();
160       } catch (IOException e) {
161         LOG.error("Failed to enable space violation policy for " + tableName
162             + ". This table will not enter violation.", e);
163         return;
164       }
165       enforcedPolicies.put(tableName, enforcement);
166     }
167   }
168 
169   /**
170    * Disables enforcement on any violation policy on the given <code>tableName</code>.
171    */
172   public void disableViolationPolicyEnforcement(TableName tableName) {
173     if (LOG.isTraceEnabled()) {
174       LOG.trace("Disabling violation policy enforcement on " + tableName);
175     }
176     // "Disables" the policy
177     synchronized (enforcedPolicies) {
178       SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
179       if (enforcement != null) {
180         try {
181           enforcement.disable();
182         } catch (IOException e) {
183           LOG.error("Failed to disable space violation policy for " + tableName
184               + ". This table will remain in violation.", e);
185           enforcedPolicies.put(tableName, enforcement);
186         }
187       }
188     }
189   }
190 
191   /**
192    * Returns whether or not compactions should be disabled for the given <code>tableName</code> per
193    * a space quota violation policy. A convenience method.
194    *
195    * @param tableName The table to check
196    * @return True if compactions should be disabled for the table, false otherwise.
197    */
198   public boolean areCompactionsDisabled(TableName tableName) {
199     SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName));
200     if (enforcement != null) {
201       return enforcement.areCompactionsDisabled();
202     }
203     return false;
204   }
205 
206   /**
207    * Returns the collection of tables which have quota violation policies enforced on
208    * this RegionServer.
209    */
210   Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
211     // Allows reads to happen concurrently (or while the map is being updated)
212     return new HashMap<>(this.enforcedPolicies);
213   }
214 
215   RegionServerServices getRegionServerServices() {
216     return rsServices;
217   }
218 
219   Connection getConnection() {
220     return rsServices.getConnection();
221   }
222 
223   SpaceViolationPolicyEnforcementFactory getFactory() {
224     return factory;
225   }
226 }