1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.quotas;
18
19 import java.io.IOException;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Objects;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.atomic.AtomicReference;
25 import java.util.Map.Entry;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.hbase.classification.InterfaceStability;
32 import org.apache.hadoop.hbase.client.Connection;
33 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
34 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
35
36 import com.google.common.annotations.VisibleForTesting;
37
38
39
40
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 @InterfaceStability.Evolving
50 public class RegionServerSpaceQuotaManager {
51 private static final Log LOG = LogFactory.getLog(RegionServerSpaceQuotaManager.class);
52
53 private final RegionServerServices rsServices;
54
55 private SpaceQuotaRefresherChore spaceQuotaRefresher;
56 private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
57 private ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
58 private SpaceViolationPolicyEnforcementFactory factory;
59
60 public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {
61 this(rsServices, SpaceViolationPolicyEnforcementFactory.getInstance());
62 }
63
64 @VisibleForTesting
65 RegionServerSpaceQuotaManager(
66 RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
67 this.rsServices = Objects.requireNonNull(rsServices);
68 this.factory = factory;
69 this.enforcedPolicies = new ConcurrentHashMap<>();
70 Map<TableName,SpaceQuotaSnapshot> initialMap = new HashMap<>();
71 this.currentQuotaSnapshots = new AtomicReference<>(initialMap);
72 }
73
74 public synchronized void start() throws IOException {
75 if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
76 LOG.info("Quota support disabled");
77 return;
78 }
79 if (null != spaceQuotaRefresher) {
80 LOG.warn("RegionServerSpaceQuotaManager has already been started!");
81 return;
82 }
83 this.spaceQuotaRefresher = new SpaceQuotaRefresherChore(this, rsServices.getConnection());
84 rsServices.getChoreService().scheduleChore(spaceQuotaRefresher);
85 }
86
87 public synchronized void stop() {
88 if (spaceQuotaRefresher != null) {
89 spaceQuotaRefresher.cancel();
90 spaceQuotaRefresher = null;
91 }
92 }
93
94
95
96
97
98 public Map<TableName,SpaceQuotaSnapshot> copyQuotaSnapshots() {
99 return new HashMap<>(currentQuotaSnapshots.get());
100 }
101
102
103
104
105
106
107 public void updateQuotaSnapshot(Map<TableName,SpaceQuotaSnapshot> newSnapshots) {
108 currentQuotaSnapshots.set(Objects.requireNonNull(newSnapshots));
109 }
110
111
112
113
114 public ActivePolicyEnforcement getActiveEnforcements() {
115 return new ActivePolicyEnforcement(copyActiveEnforcements(), copyQuotaSnapshots(), rsServices);
116 }
117
118
119
120
121
122 public Map<TableName, SpaceQuotaSnapshot> getActivePoliciesAsMap() {
123 final Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
124 copyActiveEnforcements();
125 final Map<TableName, SpaceQuotaSnapshot> policies = new HashMap<>();
126 for (Entry<TableName, SpaceViolationPolicyEnforcement> entry : enforcements.entrySet()) {
127 final SpaceQuotaSnapshot snapshot = entry.getValue().getQuotaSnapshot();
128 if (snapshot != null) {
129 policies.put(entry.getKey(), snapshot);
130 }
131 }
132 return policies;
133 }
134
135
136
137
138 public void enforceViolationPolicy(TableName tableName, SpaceQuotaSnapshot snapshot) {
139 SpaceQuotaStatus status = snapshot.getQuotaStatus();
140 if (!status.isInViolation()) {
141 throw new IllegalStateException(
142 tableName + " is not in violation. Violation policy should not be enabled.");
143 }
144 if (LOG.isTraceEnabled()) {
145 LOG.trace(
146 "Enabling violation policy enforcement on " + tableName
147 + " with policy " + status.getPolicy());
148 }
149
150 final SpaceViolationPolicyEnforcement enforcement = getFactory().create(
151 getRegionServerServices(), tableName, snapshot);
152
153
154
155
156
157 synchronized (enforcedPolicies) {
158 try {
159 enforcement.enable();
160 } catch (IOException e) {
161 LOG.error("Failed to enable space violation policy for " + tableName
162 + ". This table will not enter violation.", e);
163 return;
164 }
165 enforcedPolicies.put(tableName, enforcement);
166 }
167 }
168
169
170
171
172 public void disableViolationPolicyEnforcement(TableName tableName) {
173 if (LOG.isTraceEnabled()) {
174 LOG.trace("Disabling violation policy enforcement on " + tableName);
175 }
176
177 synchronized (enforcedPolicies) {
178 SpaceViolationPolicyEnforcement enforcement = enforcedPolicies.remove(tableName);
179 if (enforcement != null) {
180 try {
181 enforcement.disable();
182 } catch (IOException e) {
183 LOG.error("Failed to disable space violation policy for " + tableName
184 + ". This table will remain in violation.", e);
185 enforcedPolicies.put(tableName, enforcement);
186 }
187 }
188 }
189 }
190
191
192
193
194
195
196
197
198 public boolean areCompactionsDisabled(TableName tableName) {
199 SpaceViolationPolicyEnforcement enforcement = this.enforcedPolicies.get(Objects.requireNonNull(tableName));
200 if (enforcement != null) {
201 return enforcement.areCompactionsDisabled();
202 }
203 return false;
204 }
205
206
207
208
209
210 Map<TableName,SpaceViolationPolicyEnforcement> copyActiveEnforcements() {
211
212 return new HashMap<>(this.enforcedPolicies);
213 }
214
215 RegionServerServices getRegionServerServices() {
216 return rsServices;
217 }
218
219 Connection getConnection() {
220 return rsServices.getConnection();
221 }
222
223 SpaceViolationPolicyEnforcementFactory getFactory() {
224 return factory;
225 }
226 }