1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.quotas;
18
19 import java.io.IOException;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.concurrent.TimeUnit;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.ScheduledChore;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.client.Connection;
31 import org.apache.hadoop.hbase.client.Result;
32 import org.apache.hadoop.hbase.client.ResultScanner;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.util.Bytes;
35
36
37
38
39
40 public class SpaceQuotaRefresherChore extends ScheduledChore {
41 private static final Log LOG = LogFactory.getLog(SpaceQuotaRefresherChore.class);
42
43 static final String POLICY_REFRESHER_CHORE_PERIOD_KEY =
44 "hbase.regionserver.quotas.policy.refresher.chore.period";
45 static final int POLICY_REFRESHER_CHORE_PERIOD_DEFAULT = 1000 * 60 * 1;
46
47 static final String POLICY_REFRESHER_CHORE_DELAY_KEY =
48 "hbase.regionserver.quotas.policy.refresher.chore.delay";
49 static final long POLICY_REFRESHER_CHORE_DELAY_DEFAULT = 1000L * 15L;
50
51 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_KEY =
52 "hbase.regionserver.quotas.policy.refresher.chore.timeunit";
53 static final String POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();
54
55 static final String POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY =
56 "hbase.regionserver.quotas.policy.refresher.report.percent";
57 static final double POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT= 0.95;
58
59 private final RegionServerSpaceQuotaManager manager;
60 private final Connection conn;
61
62 public SpaceQuotaRefresherChore(RegionServerSpaceQuotaManager manager, Connection conn) {
63 super(SpaceQuotaRefresherChore.class.getSimpleName(),
64 manager.getRegionServerServices(),
65 getPeriod(manager.getRegionServerServices().getConfiguration()),
66 getInitialDelay(manager.getRegionServerServices().getConfiguration()),
67 getTimeUnit(manager.getRegionServerServices().getConfiguration()));
68 this.manager = manager;
69 this.conn = conn;
70 }
71
72 @Override
73 protected void chore() {
74 try {
75 if (LOG.isTraceEnabled()) {
76 LOG.trace("Reading current quota violations from hbase:quota.");
77 }
78
79 final Map<TableName, SpaceQuotaSnapshot> currentSnapshots =
80 getManager().copyQuotaSnapshots();
81
82 final Map<TableName, SpaceQuotaSnapshot> newSnapshots = fetchSnapshotsFromQuotaTable();
83 if (LOG.isTraceEnabled()) {
84 LOG.trace(currentSnapshots.size() + " table quota snapshots are collected, "
85 + "read " + newSnapshots.size() + " from the quota table.");
86 }
87
88 for (Entry<TableName, SpaceQuotaSnapshot> entry : newSnapshots.entrySet()) {
89 final TableName tableName = entry.getKey();
90 final SpaceQuotaSnapshot newSnapshot = entry.getValue();
91
92 final SpaceQuotaSnapshot currentSnapshot = currentSnapshots.get(tableName);
93 if (LOG.isTraceEnabled()) {
94 LOG.trace(tableName + ": current=" + currentSnapshot + ", new=" + newSnapshot);
95 }
96 if (!newSnapshot.equals(currentSnapshot)) {
97
98 if (!isInViolation(currentSnapshot) && newSnapshot.getQuotaStatus().isInViolation()) {
99 if (LOG.isTraceEnabled()) {
100 LOG.trace("Enabling " + newSnapshot + " on " + tableName);
101 }
102 getManager().enforceViolationPolicy(tableName, newSnapshot);
103 }
104 if (isInViolation(currentSnapshot) && !newSnapshot.getQuotaStatus().isInViolation()) {
105 if (LOG.isTraceEnabled()) {
106 LOG.trace("Removing quota violation policy on " + tableName);
107 }
108 getManager().disableViolationPolicyEnforcement(tableName);
109 }
110 }
111 }
112
113
114 getManager().updateQuotaSnapshot(newSnapshots);
115 } catch (IOException e) {
116 LOG.warn(
117 "Caught exception while refreshing enforced quota violation policies, will retry.", e);
118 }
119 }
120
121
122
123
124
125
126
127
128 boolean isInViolation(SpaceQuotaSnapshot snapshot) {
129 if (snapshot == null) {
130 return false;
131 }
132 return snapshot.getQuotaStatus().isInViolation();
133 }
134
135
136
137
138
139
140
141 public Map<TableName, SpaceQuotaSnapshot> fetchSnapshotsFromQuotaTable() throws IOException {
142 try (Table quotaTable = getConnection().getTable(QuotaUtil.QUOTA_TABLE_NAME);
143 ResultScanner scanner = quotaTable.getScanner(QuotaTableUtil.makeQuotaSnapshotScan())) {
144 Map<TableName,SpaceQuotaSnapshot> activeViolations = new HashMap<>();
145 for (Result result : scanner) {
146 try {
147 extractQuotaSnapshot(result, activeViolations);
148 } catch (IllegalArgumentException e) {
149 final String msg = "Failed to parse result for row " + Bytes.toString(result.getRow());
150 LOG.error(msg, e);
151 throw new IOException(msg, e);
152 }
153 }
154 return activeViolations;
155 }
156 }
157
158
159
160
161 void extractQuotaSnapshot(Result result, Map<TableName,SpaceQuotaSnapshot> snapshots) {
162 QuotaTableUtil.extractQuotaSnapshot(result, snapshots);
163 }
164
165 Connection getConnection() {
166 return conn;
167 }
168
169 RegionServerSpaceQuotaManager getManager() {
170 return manager;
171 }
172
173
174
175
176
177
178
179 static int getPeriod(Configuration conf) {
180 return conf.getInt(POLICY_REFRESHER_CHORE_PERIOD_KEY,
181 POLICY_REFRESHER_CHORE_PERIOD_DEFAULT);
182 }
183
184
185
186
187
188
189
190 static long getInitialDelay(Configuration conf) {
191 return conf.getLong(POLICY_REFRESHER_CHORE_DELAY_KEY,
192 POLICY_REFRESHER_CHORE_DELAY_DEFAULT);
193 }
194
195
196
197
198
199
200
201
202
203 static TimeUnit getTimeUnit(Configuration conf) {
204 return TimeUnit.valueOf(conf.get(POLICY_REFRESHER_CHORE_TIMEUNIT_KEY,
205 POLICY_REFRESHER_CHORE_TIMEUNIT_DEFAULT));
206 }
207
208
209
210
211
212
213
214
215 static Double getRegionReportPercent(Configuration conf) {
216 return conf.getDouble(POLICY_REFRESHER_CHORE_REPORT_PERCENT_KEY,
217 POLICY_REFRESHER_CHORE_REPORT_PERCENT_DEFAULT);
218 }
219 }