1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.quotas;
18
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceQuota;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
36
37
38
39
40 public class NamespaceQuotaSnapshotStore implements QuotaSnapshotStore<String> {
41 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
42 private final ReadLock rlock = lock.readLock();
43 private final WriteLock wlock = lock.writeLock();
44
45 private final Connection conn;
46 private final QuotaObserverChore chore;
47 private Map<HRegionInfo,Long> regionUsage;
48
49 public NamespaceQuotaSnapshotStore(Connection conn, QuotaObserverChore chore, Map<HRegionInfo,Long> regionUsage) {
50 this.conn = Objects.requireNonNull(conn);
51 this.chore = Objects.requireNonNull(chore);
52 this.regionUsage = Objects.requireNonNull(regionUsage);
53 }
54
55 @Override
56 public SpaceQuota getSpaceQuota(String namespace) throws IOException {
57 Quotas quotas = getQuotaForNamespace(namespace);
58 if (quotas != null && quotas.hasSpace()) {
59 return quotas.getSpace();
60 }
61 return null;
62 }
63
64
65
66
67 Quotas getQuotaForNamespace(String namespace) throws IOException {
68 return QuotaTableUtil.getNamespaceQuota(conn, namespace);
69 }
70
71 @Override
72 public SpaceQuotaSnapshot getCurrentState(String namespace) {
73
74 return this.chore.getNamespaceQuotaSnapshot(namespace);
75 }
76
77 @Override
78 public SpaceQuotaSnapshot getTargetState(String subject, SpaceQuota spaceQuota) {
79 rlock.lock();
80 try {
81 final long sizeLimitInBytes = spaceQuota.getSoftLimit();
82 long sum = 0L;
83 for (Entry<HRegionInfo,Long> entry : filterBySubject(subject)) {
84 sum += entry.getValue();
85 }
86
87 SpaceQuotaStatus status = sum <= sizeLimitInBytes ? SpaceQuotaStatus.notInViolation()
88 : new SpaceQuotaStatus(ProtobufUtil.toViolationPolicy(spaceQuota.getViolationPolicy()));
89 return new SpaceQuotaSnapshot(status, sum, sizeLimitInBytes);
90 } finally {
91 rlock.unlock();
92 }
93 }
94
95 @Override
96 public Iterable<Entry<HRegionInfo,Long>> filterBySubject(final String namespace) {
97 rlock.lock();
98 try {
99 return Iterables.filter(regionUsage.entrySet(), new Predicate<Entry<HRegionInfo,Long>>() {
100 @Override
101 public boolean apply(Entry<HRegionInfo,Long> input) {
102 return namespace.equals(input.getKey().getTable().getNamespaceAsString());
103 }
104 });
105 } finally {
106 rlock.unlock();
107 }
108 }
109
110 @Override
111 public void setCurrentState(String namespace, SpaceQuotaSnapshot snapshot) {
112
113 this.chore.setNamespaceQuotaViolation(namespace, snapshot);
114 }
115
116 @Override
117 public void setRegionUsage(Map<HRegionInfo,Long> regionUsage) {
118 wlock.lock();
119 try {
120 this.regionUsage = Objects.requireNonNull(regionUsage);
121 } finally {
122 wlock.unlock();
123 }
124 }
125 }