View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3    * agreements. See the NOTICE file distributed with this work for additional information regarding
4    * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7    * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8    * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9    * for the specific language governing permissions and limitations under the License.
10   */
11  
12  package org.apache.hadoop.hbase.quotas;
13  
14  import java.io.IOException;
15  import java.util.Collections;
16  import java.util.HashMap;
17  import java.util.HashSet;
18  import java.util.Iterator;
19  import java.util.Map;
20  import java.util.Map.Entry;
21  import java.util.concurrent.ConcurrentHashMap;
22  
23  import org.apache.commons.lang.builder.HashCodeBuilder;
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.DoNotRetryIOException;
27  import org.apache.hadoop.hbase.HRegionInfo;
28  import org.apache.hadoop.hbase.MetaTableAccessor;
29  import org.apache.hadoop.hbase.NamespaceDescriptor;
30  import org.apache.hadoop.hbase.TableName;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.classification.InterfaceStability;
33  import org.apache.hadoop.hbase.master.MasterServices;
34  import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
35  import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
36  import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
37  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
38  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
39  import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaResponse;
40  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
41  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceLimitRequest;
42  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceQuota;
43  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
44  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.ThrottleRequest;
45  import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  
48  import com.google.common.annotations.VisibleForTesting;
49  
50  /**
51   * Master Quota Manager. It is responsible for initialize the quota table on the first-run and
52   * provide the admin operations to interact with the quota table. TODO: FUTURE: The master will be
53   * responsible to notify each RS of quota changes and it will do the "quota aggregation" when the
54   * QuotaScope is CLUSTER.
55   */
56  @InterfaceAudience.Private
57  @InterfaceStability.Evolving
58  public class MasterQuotaManager implements RegionStateListener {
59    private static final Log LOG = LogFactory.getLog(MasterQuotaManager.class);
60    private static final Map<HRegionInfo, Long> EMPTY_MAP = Collections.unmodifiableMap(
61        new HashMap<HRegionInfo, Long>());
62  
63    private final MasterServices masterServices;
64    private NamedLock<String> namespaceLocks;
65    private NamedLock<TableName> tableLocks;
66    private NamedLock<String> userLocks;
67    private boolean initialized = false;
68    private NamespaceAuditor namespaceQuotaManager;
69    private ConcurrentHashMap<HRegionInfo, SizeSnapshot> regionSizes;
70  
71    public MasterQuotaManager(final MasterServices masterServices) {
72      this.masterServices = masterServices;
73    }
74  
75    public void start() throws IOException {
76      // If the user doesn't want the quota support skip all the initializations.
77      if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
78        LOG.info("Quota support disabled");
79        return;
80      }
81  
82      // Create the quota table if missing
83      if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
84          QuotaUtil.QUOTA_TABLE_NAME)) {
85        LOG.info("Quota table not found. Creating...");
86        createQuotaTable();
87      }
88  
89      LOG.info("Initializing quota support");
90      namespaceLocks = new NamedLock<String>();
91      tableLocks = new NamedLock<TableName>();
92      userLocks = new NamedLock<String>();
93      regionSizes = new ConcurrentHashMap<>();
94  
95      namespaceQuotaManager = new NamespaceAuditor(masterServices);
96      namespaceQuotaManager.start();
97      initialized = true;
98    }
99  
100   public void stop() {
101   }
102 
103   public boolean isQuotaInitialized() {
104     return initialized && namespaceQuotaManager.isInitialized();
105   }
106 
107   /*
108    * ========================================================================== Admin operations to
109    * manage the quota table
110    */
111   public SetQuotaResponse setQuota(final SetQuotaRequest req) throws IOException,
112       InterruptedException {
113     checkQuotaSupport();
114 
115     if (req.hasUserName()) {
116       userLocks.lock(req.getUserName());
117       try {
118         if (req.hasTableName()) {
119           setUserQuota(req.getUserName(), ProtobufUtil.toTableName(req.getTableName()), req);
120         } else if (req.hasNamespace()) {
121           setUserQuota(req.getUserName(), req.getNamespace(), req);
122         } else {
123           setUserQuota(req.getUserName(), req);
124         }
125       } finally {
126         userLocks.unlock(req.getUserName());
127       }
128     } else if (req.hasTableName()) {
129       TableName table = ProtobufUtil.toTableName(req.getTableName());
130       tableLocks.lock(table);
131       try {
132         setTableQuota(table, req);
133       } finally {
134         tableLocks.unlock(table);
135       }
136     } else if (req.hasNamespace()) {
137       namespaceLocks.lock(req.getNamespace());
138       try {
139         setNamespaceQuota(req.getNamespace(), req);
140       } finally {
141         namespaceLocks.unlock(req.getNamespace());
142       }
143     } else {
144       throw new DoNotRetryIOException(new UnsupportedOperationException(
145           "a user, a table or a namespace must be specified"));
146     }
147     return SetQuotaResponse.newBuilder().build();
148   }
149 
150   public void setUserQuota(final String userName, final SetQuotaRequest req) throws IOException,
151       InterruptedException {
152     setQuota(req, new SetQuotaOperations() {
153       @Override
154       public Quotas fetch() throws IOException {
155         return QuotaUtil.getUserQuota(masterServices.getConnection(), userName);
156       }
157 
158       @Override
159       public void update(final Quotas quotas) throws IOException {
160         QuotaUtil.addUserQuota(masterServices.getConnection(), userName, quotas);
161       }
162 
163       @Override
164       public void delete() throws IOException {
165         QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName);
166       }
167 
168       @Override
169       public void preApply(final Quotas quotas) throws IOException {
170         masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, quotas);
171       }
172 
173       @Override
174       public void postApply(final Quotas quotas) throws IOException {
175         masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, quotas);
176       }
177     });
178   }
179 
180   public void setUserQuota(final String userName, final TableName table, final SetQuotaRequest req)
181       throws IOException, InterruptedException {
182     setQuota(req, new SetQuotaOperations() {
183       @Override
184       public Quotas fetch() throws IOException {
185         return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table);
186       }
187 
188       @Override
189       public void update(final Quotas quotas) throws IOException {
190         QuotaUtil.addUserQuota(masterServices.getConnection(), userName, table, quotas);
191       }
192 
193       @Override
194       public void delete() throws IOException {
195         QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, table);
196       }
197 
198       @Override
199       public void preApply(final Quotas quotas) throws IOException {
200         masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, table, quotas);
201       }
202 
203       @Override
204       public void postApply(final Quotas quotas) throws IOException {
205         masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, table, quotas);
206       }
207     });
208   }
209 
210   public void
211       setUserQuota(final String userName, final String namespace, final SetQuotaRequest req)
212           throws IOException, InterruptedException {
213     setQuota(req, new SetQuotaOperations() {
214       @Override
215       public Quotas fetch() throws IOException {
216         return QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace);
217       }
218 
219       @Override
220       public void update(final Quotas quotas) throws IOException {
221         QuotaUtil.addUserQuota(masterServices.getConnection(), userName, namespace, quotas);
222       }
223 
224       @Override
225       public void delete() throws IOException {
226         QuotaUtil.deleteUserQuota(masterServices.getConnection(), userName, namespace);
227       }
228 
229       @Override
230       public void preApply(final Quotas quotas) throws IOException {
231         masterServices.getMasterCoprocessorHost().preSetUserQuota(userName, namespace, quotas);
232       }
233 
234       @Override
235       public void postApply(final Quotas quotas) throws IOException {
236         masterServices.getMasterCoprocessorHost().postSetUserQuota(userName, namespace, quotas);
237       }
238     });
239   }
240 
241   public void setTableQuota(final TableName table, final SetQuotaRequest req) throws IOException,
242       InterruptedException {
243     setQuota(req, new SetQuotaOperations() {
244       @Override
245       public Quotas fetch() throws IOException {
246         return QuotaUtil.getTableQuota(masterServices.getConnection(), table);
247       }
248 
249       @Override
250       public void update(final Quotas quotas) throws IOException {
251         QuotaUtil.addTableQuota(masterServices.getConnection(), table, quotas);
252       }
253 
254       @Override
255       public void delete() throws IOException {
256         QuotaUtil.deleteTableQuota(masterServices.getConnection(), table);
257       }
258 
259       @Override
260       public void preApply(final Quotas quotas) throws IOException {
261         masterServices.getMasterCoprocessorHost().preSetTableQuota(table, quotas);
262       }
263 
264       @Override
265       public void postApply(final Quotas quotas) throws IOException {
266         masterServices.getMasterCoprocessorHost().postSetTableQuota(table, quotas);
267       }
268     });
269   }
270 
271   public void setNamespaceQuota(final String namespace, final SetQuotaRequest req)
272       throws IOException, InterruptedException {
273     setQuota(req, new SetQuotaOperations() {
274       @Override
275       public Quotas fetch() throws IOException {
276         return QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace);
277       }
278 
279       @Override
280       public void update(final Quotas quotas) throws IOException {
281         QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace, quotas);
282       }
283 
284       @Override
285       public void delete() throws IOException {
286         QuotaUtil.deleteNamespaceQuota(masterServices.getConnection(), namespace);
287       }
288 
289       @Override
290       public void preApply(final Quotas quotas) throws IOException {
291         masterServices.getMasterCoprocessorHost().preSetNamespaceQuota(namespace, quotas);
292       }
293 
294       @Override
295       public void postApply(final Quotas quotas) throws IOException {
296         masterServices.getMasterCoprocessorHost().postSetNamespaceQuota(namespace, quotas);
297       }
298     });
299   }
300 
301   public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
302     if (initialized) {
303       this.namespaceQuotaManager.addNamespace(desc);
304     }
305   }
306 
307   public void removeNamespaceQuota(String namespace) throws IOException {
308     if (initialized) {
309       this.namespaceQuotaManager.deleteNamespace(namespace);
310     }
311   }
312 
313   private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
314       throws IOException, InterruptedException {
315     if (req.hasRemoveAll() && req.getRemoveAll() == true) {
316       quotaOps.preApply(null);
317       quotaOps.delete();
318       quotaOps.postApply(null);
319       return;
320     }
321 
322     // Apply quota changes
323     Quotas quotas = quotaOps.fetch();
324     quotaOps.preApply(quotas);
325 
326     // Copy the user request into the Quotas object
327     Quotas.Builder builder = (quotas != null) ? quotas.toBuilder() : Quotas.newBuilder();
328     if (req.hasThrottle()) applyThrottle(builder, req.getThrottle());
329     if (req.hasBypassGlobals()) applyBypassGlobals(builder, req.getBypassGlobals());
330     if (req.hasSpaceLimit()) applySpaceLimit(builder, req.getSpaceLimit());
331 
332     // Submit new changes
333     quotas = builder.build();
334     if (QuotaUtil.isEmptyQuota(quotas)) {
335       quotaOps.delete();
336     } else {
337       quotaOps.update(quotas);
338     }
339     quotaOps.postApply(quotas);
340   }
341 
342   public void checkNamespaceTableAndRegionQuota(TableName tName, int regions) throws IOException {
343     if (initialized) {
344       namespaceQuotaManager.checkQuotaToCreateTable(tName, regions);
345     }
346   }
347   
348   public void checkAndUpdateNamespaceRegionQuota(TableName tName, int regions) throws IOException {
349     if (initialized) {
350       namespaceQuotaManager.checkQuotaToUpdateRegion(tName, regions);
351     }
352   }
353 
354   /**
355    * @return cached region count, or -1 if quota manager is disabled or table status not found
356   */
357   public int getRegionCountOfTable(TableName tName) throws IOException {
358     if (initialized) {
359       return namespaceQuotaManager.getRegionCountOfTable(tName);
360     }
361     return -1;
362   }
363 
364   public void onRegionMerged(HRegionInfo hri) throws IOException {
365     if (initialized) {
366       namespaceQuotaManager.updateQuotaForRegionMerge(hri);
367     }
368   }
369 
370   public void onRegionSplit(HRegionInfo hri) throws IOException {
371     if (initialized) {
372       namespaceQuotaManager.checkQuotaToSplitRegion(hri);
373     }
374   }
375 
376   /**
377    * Remove table from namespace quota.
378    * @param tName - The table name to update quota usage.
379    * @throws IOException Signals that an I/O exception has occurred.
380    */
381   public void removeTableFromNamespaceQuota(TableName tName) throws IOException {
382     if (initialized) {
383       namespaceQuotaManager.removeFromNamespaceUsage(tName);
384     }
385   }
386 
387   public NamespaceAuditor getNamespaceQuotaManager() {
388     return this.namespaceQuotaManager;
389   }
390 
391   private static interface SetQuotaOperations {
392     Quotas fetch() throws IOException;
393 
394     void delete() throws IOException;
395 
396     void update(final Quotas quotas) throws IOException;
397 
398     void preApply(final Quotas quotas) throws IOException;
399 
400     void postApply(final Quotas quotas) throws IOException;
401   }
402 
403   /*
404    * ========================================================================== Helpers to apply
405    * changes to the quotas
406    */
407   private void applyThrottle(final Quotas.Builder quotas, final ThrottleRequest req)
408       throws IOException {
409     Throttle.Builder throttle;
410 
411     if (req.hasType() && (req.hasTimedQuota() || quotas.hasThrottle())) {
412       // Validate timed quota if present
413       if (req.hasTimedQuota()) {
414         validateTimedQuota(req.getTimedQuota());
415       }
416 
417       // apply the new settings
418       throttle = quotas.hasThrottle() ? quotas.getThrottle().toBuilder() : Throttle.newBuilder();
419 
420       switch (req.getType()) {
421       case REQUEST_NUMBER:
422         if (req.hasTimedQuota()) {
423           throttle.setReqNum(req.getTimedQuota());
424         } else {
425           throttle.clearReqNum();
426         }
427         break;
428       case REQUEST_SIZE:
429         if (req.hasTimedQuota()) {
430           throttle.setReqSize(req.getTimedQuota());
431         } else {
432           throttle.clearReqSize();
433         }
434         break;
435       case WRITE_NUMBER:
436         if (req.hasTimedQuota()) {
437           throttle.setWriteNum(req.getTimedQuota());
438         } else {
439           throttle.clearWriteNum();
440         }
441         break;
442       case WRITE_SIZE:
443         if (req.hasTimedQuota()) {
444           throttle.setWriteSize(req.getTimedQuota());
445         } else {
446           throttle.clearWriteSize();
447         }
448         break;
449       case READ_NUMBER:
450         if (req.hasTimedQuota()) {
451           throttle.setReadNum(req.getTimedQuota());
452         } else {
453           throttle.clearReqNum();
454         }
455         break;
456       case READ_SIZE:
457         if (req.hasTimedQuota()) {
458           throttle.setReadSize(req.getTimedQuota());
459         } else {
460           throttle.clearReadSize();
461         }
462         break;
463       default:
464         throw new RuntimeException("Invalid throttle type: " + req.getType());
465       }
466       quotas.setThrottle(throttle.build());
467     } else {
468       quotas.clearThrottle();
469     }
470   }
471 
472   private void applyBypassGlobals(final Quotas.Builder quotas, boolean bypassGlobals) {
473     if (bypassGlobals) {
474       quotas.setBypassGlobals(bypassGlobals);
475     } else {
476       quotas.clearBypassGlobals();
477     }
478   }
479 
480   /**
481    * Adds the information from the provided {@link SpaceLimitRequest} to the {@link Quotas} builder.
482    *
483    * @param quotas The builder to update.
484    * @param req The request to extract space quota information from.
485    */
486   void applySpaceLimit(final Quotas.Builder quotas, final SpaceLimitRequest req) {
487     if (req.hasQuota()) {
488       SpaceQuota spaceQuota = req.getQuota();
489       // If we have the remove flag, unset the space quota.
490       if (spaceQuota.getRemove()) {
491         quotas.setSpace(SpaceQuota.getDefaultInstance());
492       } else {
493         // Otherwise, update the new quota
494         applySpaceQuota(quotas, req.getQuota());
495       }
496     }
497   }
498 
499   /**
500    * Merges the provided {@link SpaceQuota} into the given {@link Quotas} builder.
501    *
502    * @param quotas The Quotas builder instance to update
503    * @param quota The SpaceQuota instance to update from
504    */
505   void applySpaceQuota(final Quotas.Builder quotas, final SpaceQuota quota) {
506     // Create a builder for Quotas
507     SpaceQuota.Builder builder = quotas.hasSpace() ? quotas.getSpace().toBuilder() :
508         SpaceQuota.newBuilder();
509     // Update the values from the provided quota into the new one and set it on Quotas.
510     quotas.setSpace(builder.mergeFrom(quota).build());
511   }
512 
513   private void validateTimedQuota(final TimedQuota timedQuota) throws IOException {
514     if (timedQuota.getSoftLimit() < 1) {
515       throw new DoNotRetryIOException(new UnsupportedOperationException(
516           "The throttle limit must be greater then 0, got " + timedQuota.getSoftLimit()));
517     }
518   }
519 
520   /*
521    * ========================================================================== Helpers
522    */
523 
524   private void checkQuotaSupport() throws IOException {
525     if (!QuotaUtil.isQuotaEnabled(masterServices.getConfiguration())) {
526       throw new DoNotRetryIOException(
527         new UnsupportedOperationException("quota support disabled"));
528     }
529     if (!initialized) {
530       long maxWaitTime = masterServices.getConfiguration().getLong(
531         "hbase.master.wait.for.quota.manager.init", 30000); // default is 30 seconds.
532       long startTime = EnvironmentEdgeManager.currentTime();
533       do {
534         try {
535           Thread.sleep(100);
536         } catch (InterruptedException e) {
537           LOG.warn("Interrupted while waiting for Quota Manager to be initialized.");
538           break;
539         }
540       } while (!initialized && (EnvironmentEdgeManager.currentTime() - startTime) < maxWaitTime);
541       if (!initialized) {
542         throw new IOException("Quota manager is uninitialized, please retry later.");
543       }
544     }
545   }
546 
547   private void createQuotaTable() throws IOException {
548     HRegionInfo[] newRegions = new HRegionInfo[] { new HRegionInfo(QuotaUtil.QUOTA_TABLE_NAME) };
549 
550     if (masterServices.isMasterProcedureExecutorEnabled()) {
551       masterServices.getMasterProcedureExecutor()
552         .submitProcedure(new CreateTableProcedure(
553           masterServices.getMasterProcedureExecutor().getEnvironment(),
554           QuotaUtil.QUOTA_TABLE_DESC,
555           newRegions));
556     } else {
557       masterServices.getExecutorService().submit(
558         new CreateTableHandler(masterServices, masterServices.getMasterFileSystem(),
559           QuotaUtil.QUOTA_TABLE_DESC, masterServices.getConfiguration(), newRegions,
560           masterServices).prepare());
561     }
562   }
563 
564   private static class NamedLock<T> {
565     private HashSet<T> locks = new HashSet<T>();
566 
567     public void lock(final T name) throws InterruptedException {
568       synchronized (locks) {
569         while (locks.contains(name)) {
570           locks.wait();
571         }
572         locks.add(name);
573       }
574     }
575 
576     public void unlock(final T name) {
577       synchronized (locks) {
578         locks.remove(name);
579         locks.notifyAll();
580       }
581     }
582   }
583 
584   @Override
585   public void onRegionSplitReverted(HRegionInfo hri) throws IOException {
586     if (initialized) {
587       this.namespaceQuotaManager.removeRegionFromNamespaceUsage(hri);
588     }
589   }
590 
591   /**
592    * Holds the size of a region at the given time, millis since the epoch.
593    */
594   private static class SizeSnapshot {
595     private final long size;
596     private final long time;
597 
598     public SizeSnapshot(long size, long time) {
599       this.size = size;
600       this.time = time;
601     }
602 
603     public long getSize() {
604       return size;
605     }
606 
607     public long getTime() {
608       return time;
609     }
610 
611     public boolean equals(Object o) {
612       if (o instanceof SizeSnapshot) {
613         SizeSnapshot other = (SizeSnapshot) o;
614         return size == other.size && time == other.time;
615       }
616       return false;
617     }
618 
619     public int hashCode() {
620       HashCodeBuilder hcb = new HashCodeBuilder();
621       return hcb.append(size).append(time).toHashCode();
622     }
623 
624     @Override
625     public String toString() {
626       StringBuilder sb = new StringBuilder(32);
627       sb.append("SizeSnapshot={size=").append(size).append("B, ");
628       sb.append("time=").append(time).append("}");
629       return sb.toString();
630     }
631   }
632 
633   @VisibleForTesting
634   void initializeRegionSizes() {
635     assert regionSizes == null;
636     this.regionSizes = new ConcurrentHashMap<>();
637   }
638 
639   public void addRegionSize(HRegionInfo hri, long size, long time) {
640     if (regionSizes == null) {
641       return;
642     }
643     regionSizes.put(hri, new SizeSnapshot(size, time));
644   }
645 
646   public Map<HRegionInfo, Long> snapshotRegionSizes() {
647     if (regionSizes == null) {
648       return EMPTY_MAP;
649     }
650 
651     Map<HRegionInfo, Long> copy = new HashMap<>();
652     for (Entry<HRegionInfo,SizeSnapshot> entry : regionSizes.entrySet()) {
653       copy.put(entry.getKey(), entry.getValue().getSize());
654     }
655     return copy;
656   }
657 
658   int pruneEntriesOlderThan(long timeToPruneBefore) {
659     if (regionSizes == null) {
660       return 0;
661     }
662     int numEntriesRemoved = 0;
663     Iterator<Entry<HRegionInfo,SizeSnapshot>> iterator = regionSizes.entrySet().iterator();
664     while (iterator.hasNext()) {
665       long currentEntryTime = iterator.next().getValue().getTime();
666       if (currentEntryTime < timeToPruneBefore) {
667         iterator.remove();
668         numEntriesRemoved++;
669       }
670     }
671     return numEntriesRemoved;
672   }
673 }