1
2
3
4
5
6
7
8
9
10
11
12 package org.apache.hadoop.hbase.quotas;
13
14 import java.io.IOException;
15 import java.util.Collections;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.Iterator;
19 import java.util.Map;
20 import java.util.Map.Entry;
21 import java.util.concurrent.ConcurrentHashMap;
22
23 import org.apache.commons.lang.builder.HashCodeBuilder;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.DoNotRetryIOException;
27 import org.apache.hadoop.hbase.HRegionInfo;
28 import org.apache.hadoop.hbase.MetaTableAccessor;
29 import org.apache.hadoop.hbase.NamespaceDescriptor;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.classification.InterfaceAudience;
32 import org.apache.hadoop.hbase.classification.InterfaceStability;
33 import org.apache.hadoop.hbase.master.MasterServices;
34 import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
35 import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
36 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
37 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
39 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
40 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
41 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceLimitRequest;
42 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceQuota;
43 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
44 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.ThrottleRequest;
45 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
46 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47
48 import com.google.common.annotations.VisibleForTesting;
49
50
51
52
53
54
55
56 @InterfaceAudience.Private
57 @InterfaceStability.Evolving
58 public class MasterQuotaManager implements RegionStateListener {
59 private static final Log LOG = LogFactory.getLog(MasterQuotaManager.class);
60 private static final Map<HRegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap(
61 new HashMap<HRegionInfo, Long>());
62
63 private final MasterServices masterServices;
64 private NamedLock<String> namespaceLocks;
65 private NamedLock<TableName> tableLocks;
66 private NamedLock<String> userLocks;
67 private boolean initialized = false;
68 private NamespaceAuditor namespaceQuotaManager;
69 private ConcurrentHashMap<HRegionInfo, SizeSnapshot> regionSizes;
70
/**
 * Creates a quota manager bound to the given master. No quota state is set up here;
 * that happens in {@link #start()}.
 *
 * @param masterServices the master services this manager operates on
 */
public MasterQuotaManager(final MasterServices masterServices) {
  this.masterServices = masterServices;
}
74
75 public void start() throws IOException {
76
77 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
78 LOG.info("Quota support disabled");
79 return;
80 }
81
82
83 if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
84 QuotaUtil.QUOTA_TABLE_NAME)) {
85 LOG.info("Quota table not found. Creating...");
86 createQuotaTable();
87 }
88
89 LOG.info("Initializing quota support");
90 namespaceLocks = new NamedLock<String>();
91 tableLocks = new NamedLock<TableName>();
92 userLocks = new NamedLock<String>();
93 regionSizes = new ConcurrentHashMap<>();
94
95 namespaceQuotaManager = new NamespaceAuditor(masterServices);
96 namespaceQuotaManager.start();
97 initialized = true;
98 }
99
100 public void stop() {
101 }
102
103 public boolean isQuotaInitialized() {
104 return initialized && namespaceQuotaManager.isInitialized();
105 }
106
107
108
109
110
111 public SetQuotaResponse setQuota(final SetQuotaRequest req) throws IOException,
112 InterruptedException {
113 checkQuotaSupport();
114
115 if (req.hasUserName()) {
116 userLocks.lock(req.getUserName());
117 try {
118 if (req.hasTableName()) {
119 setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
120 } else if (req.hasNamespace()) {
121 setUserQuota(req.getUserName(), req.getNamespace(), req);
122 } else {
123 setUserQuota(req.getUserName(), req);
124 }
125 } finally {
126 userLocks.unlock(req.getUserName());
127 }
128 } else if (req.hasTableName()) {
129 TableName table = ProtobufUtil.toTableName(req.getTableName());
130 tableLocks.lock(table);
131 try {
132 setTableQuota(table, req);
133 } finally {
134 tableLocks.unlock(table);
135 }
136 } else if (req.hasNamespace()) {
137 namespaceLocks.lock(req.getNamespace());
138 try {
139 setNamespaceQuota(req.getNamespace(), req);
140 } finally {
141 namespaceLocks.unlock(req.getNamespace());
142 }
143 } else {
144 throw new DoNotRetryIOException(new UnsupportedOperationException(
145 "a user, a table or a namespace must be specified"));
146 }
147 return SetQuotaResponse.newBuilder().build();
148 }
149
150 public void setUserQuota(final String userName, final SetQuotaRequest req) throws IOException,
151 InterruptedException {
152 setQuota(req, new SetQuotaOperations() {
153 @Override
154 public Quotas fetch() throws IOException {
155 return QuotaUtil.getUserQuota(masterServices.getConnection(), userName);
156 }
157
158 @Override
159 public void update(final Quotas quotas) throws IOException {
160 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotas);
161 }
162
163 @Override
164 public void delete() throws IOException {
165 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
166 }
167
168 @Override
169 public void preApply(final Quotas quotas) throws IOException {
170 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotas);
171 }
172
173 @Override
174 public void postApply(final Quotas quotas) throws IOException {
175 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotas);
176 }
177 });
178 }
179
180 public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
181 throws IOException, InterruptedException {
182 setQuota(req, new SetQuotaOperations() {
183 @Override
184 public Quotas fetch() throws IOException {
185 return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table);
186 }
187
188 @Override
189 public void update(final Quotas quotas) throws IOException {
190 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table, quotas);
191 }
192
193 @Override
194 public void delete() throws IOException {
195 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
196 }
197
198 @Override
199 public void preApply(final Quotas quotas) throws IOException {
200 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotas);
201 }
202
203 @Override
204 public void postApply(final Quotas quotas) throws IOException {
205 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotas);
206 }
207 });
208 }
209
210 public void
211 setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
212 throws IOException, InterruptedException {
213 setQuota(req, new SetQuotaOperations() {
214 @Override
215 public Quotas fetch() throws IOException {
216 return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace);
217 }
218
219 @Override
220 public void update(final Quotas quotas) throws IOException {
221 QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace, quotas);
222 }
223
224 @Override
225 public void delete() throws IOException {
226 QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
227 }
228
229 @Override
230 public void preApply(final Quotas quotas) throws IOException {
231 masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotas);
232 }
233
234 @Override
235 public void postApply(final Quotas quotas) throws IOException {
236 masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotas);
237 }
238 });
239 }
240
241 public void setTableQuota(final TableName table, final SetQuotaRequest req) throws IOException,
242 InterruptedException {
243 setQuota(req, new SetQuotaOperations() {
244 @Override
245 public Quotas fetch() throws IOException {
246 return QuotaUtil.getTableQuota(masterServices.getConnection(), table);
247 }
248
249 @Override
250 public void update(final Quotas quotas) throws IOException {
251 QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotas);
252 }
253
254 @Override
255 public void delete() throws IOException {
256 QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
257 }
258
259 @Override
260 public void preApply(final Quotas quotas) throws IOException {
261 masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotas);
262 }
263
264 @Override
265 public void postApply(final Quotas quotas) throws IOException {
266 masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotas);
267 }
268 });
269 }
270
271 public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
272 throws IOException, InterruptedException {
273 setQuota(req, new SetQuotaOperations() {
274 @Override
275 public Quotas fetch() throws IOException {
276 return QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace);
277 }
278
279 @Override
280 public void update(final Quotas quotas) throws IOException {
281 QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace, quotas);
282 }
283
284 @Override
285 public void delete() throws IOException {
286 QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
287 }
288
289 @Override
290 public void preApply(final Quotas quotas) throws IOException {
291 masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotas);
292 }
293
294 @Override
295 public void postApply(final Quotas quotas) throws IOException {
296 masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotas);
297 }
298 });
299 }
300
301 public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
302 if (initialized) {
303 this.namespaceQuotaManager.addNamespace(desc);
304 }
305 }
306
307 public void removeNamespaceQuota(String namespace) throws IOException {
308 if (initialized) {
309 this.namespaceQuotaManager.deleteNamespace(namespace);
310 }
311 }
312
313 private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
314 throws IOException, InterruptedException {
315 if (req.hasRemoveAll() && req.getRemoveAll() == true) {
316 quotaOps.preApply(null);
317 quotaOps.delete();
318 quotaOps.postApply(null);
319 return;
320 }
321
322
323 Quotas quotas = quotaOps.fetch();
324 quotaOps.preApply(quotas);
325
326
327 Quotas.Builder builder = (quotas != null) ? quotas.toBuilder() : Quotas.newBuilder();
328 if (req.hasThrottle()) applyThrottle(builder, req.getThrottle());
329 if (req.hasBypassGlobals()) applyBypassGlobals(builder, req.getBypassGlobals());
330 if (req.hasSpaceLimit()) applySpaceLimit(builder, req.getSpaceLimit());
331
332
333 quotas = builder.build();
334 if (QuotaUtil.isEmptyQuota(quotas)) {
335 quotaOps.delete();
336 } else {
337 quotaOps.update(quotas);
338 }
339 quotaOps.postApply(quotas);
340 }
341
342 public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
343 if (initialized) {
344 namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
345 }
346 }
347
348 public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
349 if (initialized) {
350 namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
351 }
352 }
353
354
355
356
357 public int getRegionCountOfTable(TableName tName) throws IOException {
358 if (initialized) {
359 return namespaceQuotaManager.getRegionCountOfTable(tName);
360 }
361 return -1;
362 }
363
364 public void onRegionMerged(HRegionInfo hri) throws IOException {
365 if (initialized) {
366 namespaceQuotaManager.updateQuotaForRegionMerge(hri);
367 }
368 }
369
370 public void onRegionSplit(HRegionInfo hri) throws IOException {
371 if (initialized) {
372 namespaceQuotaManager.checkQuotaToSplitRegion(hri);
373 }
374 }
375
376
377
378
379
380
381 public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
382 if (initialized) {
383 namespaceQuotaManager.removeFromNamespaceUsage(tName);
384 }
385 }
386
387 public NamespaceAuditor getNamespaceQuotaManager() {
388 return this.namespaceQuotaManager;
389 }
390
391 private static interface SetQuotaOperations {
392 Quotas fetch() throws IOException;
393
394 void delete() throws IOException;
395
396 void update(final Quotas quotas) throws IOException;
397
398 void preApply(final Quotas quotas) throws IOException;
399
400 void postApply(final Quotas quotas) throws IOException;
401 }
402
403
404
405
406
407 private void applyThrottle(final Quotas.Builder quotas, final ThrottleRequest req)
408 throws IOException {
409 Throttle.Builder throttle;
410
411 if (req.hasType() && (req.hasTimedQuota() || quotas.hasThrottle())) {
412
413 if (req.hasTimedQuota()) {
414 validateTimedQuota(req.getTimedQuota());
415 }
416
417
418 throttle = quotas.hasThrottle() ? quotas.getThrottle().toBuilder() : Throttle.newBuilder();
419
420 switch (req.getType()) {
421 case REQUEST_NUMBER:
422 if (req.hasTimedQuota()) {
423 throttle.setReqNum(req.getTimedQuota());
424 } else {
425 throttle.clearReqNum();
426 }
427 break;
428 case REQUEST_SIZE:
429 if (req.hasTimedQuota()) {
430 throttle.setReqSize(req.getTimedQuota());
431 } else {
432 throttle.clearReqSize();
433 }
434 break;
435 case WRITE_NUMBER:
436 if (req.hasTimedQuota()) {
437 throttle.setWriteNum(req.getTimedQuota());
438 } else {
439 throttle.clearWriteNum();
440 }
441 break;
442 case WRITE_SIZE:
443 if (req.hasTimedQuota()) {
444 throttle.setWriteSize(req.getTimedQuota());
445 } else {
446 throttle.clearWriteSize();
447 }
448 break;
449 case READ_NUMBER:
450 if (req.hasTimedQuota()) {
451 throttle.setReadNum(req.getTimedQuota());
452 } else {
453 throttle.clearReqNum();
454 }
455 break;
456 case READ_SIZE:
457 if (req.hasTimedQuota()) {
458 throttle.setReadSize(req.getTimedQuota());
459 } else {
460 throttle.clearReadSize();
461 }
462 break;
463 default:
464 throw new RuntimeException("Invalid throttle type: " + req.getType());
465 }
466 quotas.setThrottle(throttle.build());
467 } else {
468 quotas.clearThrottle();
469 }
470 }
471
472 private void applyBypassGlobals(final Quotas.Builder quotas, boolean bypassGlobals) {
473 if (bypassGlobals) {
474 quotas.setBypassGlobals(bypassGlobals);
475 } else {
476 quotas.clearBypassGlobals();
477 }
478 }
479
480
481
482
483
484
485
486 void applySpaceLimit(final Quotas.Builder quotas, final SpaceLimitRequest req) {
487 if (req.hasQuota()) {
488 SpaceQuota spaceQuota = req.getQuota();
489
490 if (spaceQuota.getRemove()) {
491 quotas.setSpace(SpaceQuota.getDefaultInstance());
492 } else {
493
494 applySpaceQuota(quotas, req.getQuota());
495 }
496 }
497 }
498
499
500
501
502
503
504
505 void applySpaceQuota(final Quotas.Builder quotas, final SpaceQuota quota) {
506
507 SpaceQuota.Builder builder = quotas.hasSpace() ? quotas.getSpace().toBuilder() :
508 SpaceQuota.newBuilder();
509
510 quotas.setSpace(builder.mergeFrom(quota).build());
511 }
512
513 private void validateTimedQuota(final TimedQuota timedQuota) throws IOException {
514 if (timedQuota.getSoftLimit() < 1) {
515 throw new DoNotRetryIOException(new UnsupportedOperationException(
516 "The throttle limit must be greater then 0, got " + timedQuota.getSoftLimit()));
517 }
518 }
519
520
521
522
523
524 private void checkQuotaSupport() throws IOException {
525 if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
526 throw new DoNotRetryIOException(
527 new UnsupportedOperationException("quota support disabled"));
528 }
529 if (!initialized) {
530 long maxWaitTime = masterServices.getConfiguration().getLong(
531 "hbase.master.wait.for.quota.manager.init", 30000);
532 long startTime = EnvironmentEdgeManager.currentTime();
533 do {
534 try {
535 Thread.sleep(100);
536 } catch (InterruptedException e) {
537 LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
538 break;
539 }
540 } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
541 if (!initialized) {
542 throw new IOException("Quota manager is uninitialized, please retry later.");
543 }
544 }
545 }
546
547 private void createQuotaTable() throws IOException {
548 HRegionInfo[] newRegions = new HRegionInfo[] { new HRegionInfo(QuotaUtil.QUOTA_TABLE_NAME) };
549
550 if (masterServices.isMasterProcedureExecutorEnabled()) {
551 masterServices.getMasterProcedureExecutor()
552 .submitProcedure(new CreateTableProcedure(
553 masterServices.getMasterProcedureExecutor().getEnvironment(),
554 QuotaUtil.QUOTA_TABLE_DESC,
555 newRegions));
556 } else {
557 masterServices.getExecutorService().submit(
558 new CreateTableHandler(masterServices, masterServices.getMasterFileSystem(),
559 QuotaUtil.QUOTA_TABLE_DESC, masterServices.getConfiguration(), newRegions,
560 masterServices).prepare());
561 }
562 }
563
564 private static class NamedLock<T> {
565 private HashSet<T> locks = new HashSet<T>();
566
567 public void lock(final T name) throws InterruptedException {
568 synchronized (locks) {
569 while (locks.contains(name)) {
570 locks.wait();
571 }
572 locks.add(name);
573 }
574 }
575
576 public void unlock(final T name) {
577 synchronized (locks) {
578 locks.remove(name);
579 locks.notifyAll();
580 }
581 }
582 }
583
584 @Override
585 public void onRegionSplitReverted(HRegionInfo hri) throws IOException {
586 if (initialized) {
587 this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
588 }
589 }
590
591
592
593
594 private static class SizeSnapshot {
595 private final long size;
596 private final long time;
597
598 public SizeSnapshot(long size, long time) {
599 this.size = size;
600 this.time = time;
601 }
602
603 public long getSize() {
604 return size;
605 }
606
607 public long getTime() {
608 return time;
609 }
610
611 public boolean equals(Object o) {
612 if (o instanceof SizeSnapshot) {
613 SizeSnapshot other = (SizeSnapshot) o;
614 return size == other.size && time == other.time;
615 }
616 return false;
617 }
618
619 public int hashCode() {
620 HashCodeBuilder hcb = new HashCodeBuilder();
621 return hcb.append(size).append(time).toHashCode();
622 }
623
624 @Override
625 public String toString() {
626 StringBuilder sb = new StringBuilder(32);
627 sb.append("SizeSnapshot={size=").append(size).append("B, ");
628 sb.append("time=").append(time).append("}");
629 return sb.toString();
630 }
631 }
632
633 @VisibleForTesting
634 void initializeRegionSizes() {
635 assert regionSizes == null;
636 this.regionSizes = new ConcurrentHashMap<>();
637 }
638
639 public void addRegionSize(HRegionInfo hri, long size, long time) {
640 if (regionSizes == null) {
641 return;
642 }
643 regionSizes.put(hri, new SizeSnapshot(size, time));
644 }
645
646 public Map<HRegionInfo, Long> snapshotRegionSizes() {
647 if (regionSizes == null) {
648 return EMPTY_MAP;
649 }
650
651 Map<HRegionInfo, Long> copy = new HashMap<>();
652 for (Entry<HRegionInfo,SizeSnapshot> entry : regionSizes.entrySet()) {
653 copy.put(entry.getKey(), entry.getValue().getSize());
654 }
655 return copy;
656 }
657
658 int pruneEntriesOlderThan(long timeToPruneBefore) {
659 if (regionSizes == null) {
660 return 0;
661 }
662 int numEntriesRemoved = 0;
663 Iterator<Entry<HRegionInfo,SizeSnapshot>> iterator = regionSizes.entrySet().iterator();
664 while (iterator.hasNext()) {
665 long currentEntryTime = iterator.next().getValue().getTime();
666 if (currentEntryTime < timeToPruneBefore) {
667 iterator.remove();
668 numEntriesRemoved++;
669 }
670 }
671 return numEntriesRemoved;
672 }
673 }