1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.quotas;
18
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertNotNull;
21 import static org.junit.Assert.assertNull;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.Set;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HRegionInfo;
40 import org.apache.hadoop.hbase.NamespaceDescriptor;
41 import org.apache.hadoop.hbase.NamespaceNotFoundException;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.client.Admin;
44 import org.apache.hadoop.hbase.client.Connection;
45 import org.apache.hadoop.hbase.master.HMaster;
46 import org.apache.hadoop.hbase.quotas.QuotaObserverChore.TablesWithQuotas;
47 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceQuota;
48 import org.apache.hadoop.hbase.testclassification.LargeTests;
49 import org.junit.AfterClass;
50 import org.junit.Before;
51 import org.junit.BeforeClass;
52 import org.junit.Rule;
53 import org.junit.Test;
54 import org.junit.experimental.categories.Category;
55 import org.junit.rules.TestName;
56
57 import com.google.common.collect.Iterables;
58 import com.google.common.collect.Multimap;
59
60
61
62
63 @Category(LargeTests.class)
64 public class TestQuotaObserverChoreWithMiniCluster {
65 private static final Log LOG = LogFactory.getLog(TestQuotaObserverChoreWithMiniCluster.class);
66 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
67 private static final AtomicLong COUNTER = new AtomicLong(0);
68
69 @Rule
70 public TestName testName = new TestName();
71
72 private HMaster master;
73 private QuotaObserverChore chore;
74 private SpaceQuotaSnapshotNotifierForTest snapshotNotifier;
75 private SpaceQuotaHelperForTests helper;
76
77 @BeforeClass
78 public static void setUp() throws Exception {
79 Configuration conf = TEST_UTIL.getConfiguration();
80 conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_DELAY_KEY, 1000);
81 conf.setInt(FileSystemUtilizationChore.FS_UTILIZATION_CHORE_PERIOD_KEY, 1000);
82 conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_DELAY_KEY, 1000);
83 conf.setInt(QuotaObserverChore.QUOTA_OBSERVER_CHORE_PERIOD_KEY, 1000);
84 conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
85 conf.setClass(SpaceQuotaSnapshotNotifierFactory.SNAPSHOT_NOTIFIER_KEY,
86 SpaceQuotaSnapshotNotifierForTest.class, SpaceQuotaSnapshotNotifier.class);
87 TEST_UTIL.startMiniCluster(1);
88 }
89
90 @AfterClass
91 public static void tearDown() throws Exception {
92 TEST_UTIL.shutdownMiniCluster();
93 }
94
95 @Before
96 public void removeAllQuotas() throws Exception {
97 final Connection conn = TEST_UTIL.getConnection();
98 if (helper == null) {
99 helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, COUNTER);
100 }
101
102 if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
103 helper.waitForQuotaTable(conn);
104 } else {
105
106 helper.removeAllQuotas(conn);
107 assertEquals(0, helper.listNumDefinedQuotas(conn));
108 }
109
110 master = TEST_UTIL.getMiniHBaseCluster().getMaster();
111 snapshotNotifier =
112 (SpaceQuotaSnapshotNotifierForTest) master.getSpaceQuotaSnapshotNotifier();
113 assertNotNull(snapshotNotifier);
114 snapshotNotifier.clearSnapshots();
115 chore = master.getQuotaObserverChore();
116 }
117
118 @Test
119 public void testTableViolatesQuota() throws Exception {
120 TableName tn = helper.createTableWithRegions(10);
121
122 final long sizeLimit = 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
123 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.NO_INSERTS;
124 QuotaSettings settings = QuotaSettingsFactory.limitTableSpace(tn, sizeLimit, violationPolicy);
125 TEST_UTIL.getHBaseAdmin().setQuota(settings);
126
127
128 helper.writeData(tn, 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
129
130 Map<TableName,SpaceQuotaSnapshot> quotaSnapshots = snapshotNotifier.copySnapshots();
131 boolean foundSnapshot = false;
132 while (!foundSnapshot) {
133 if (quotaSnapshots.isEmpty()) {
134 LOG.info("Found no violated quotas, sleeping and retrying. Current reports: "
135 + master.getMasterQuotaManager().snapshotRegionSizes());
136 sleepWithInterrupt(1000);
137 quotaSnapshots = snapshotNotifier.copySnapshots();
138 } else {
139 Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet());
140 assertEquals(tn, entry.getKey());
141 final SpaceQuotaSnapshot snapshot = entry.getValue();
142 if (!snapshot.getQuotaStatus().isInViolation()) {
143 LOG.info("Found a snapshot, but it was not yet in violation. " + snapshot);
144 sleepWithInterrupt(1000);
145 quotaSnapshots = snapshotNotifier.copySnapshots();
146 } else {
147 foundSnapshot = true;
148 }
149 }
150 }
151
152 Entry<TableName,SpaceQuotaSnapshot> entry = Iterables.getOnlyElement(quotaSnapshots.entrySet());
153 assertEquals(tn, entry.getKey());
154 final SpaceQuotaSnapshot snapshot = entry.getValue();
155 assertEquals("Snapshot was " + snapshot, violationPolicy, snapshot.getQuotaStatus().getPolicy());
156 assertEquals(sizeLimit, snapshot.getLimit());
157 assertTrue(
158 "The usage should be greater than the limit, but were " + snapshot.getUsage() + " and "
159 + snapshot.getLimit() + ", respectively", snapshot.getUsage() > snapshot.getLimit());
160 }
161
162 @Test
163 public void testNamespaceViolatesQuota() throws Exception {
164 final String namespace = testName.getMethodName();
165 final Admin admin = TEST_UTIL.getHBaseAdmin();
166
167 try {
168 admin.getNamespaceDescriptor(namespace);
169 } catch (NamespaceNotFoundException e) {
170 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
171 admin.createNamespace(desc);
172 }
173
174 TableName tn1 = helper.createTableWithRegions(namespace, 5);
175 TableName tn2 = helper.createTableWithRegions(namespace, 5);
176 TableName tn3 = helper.createTableWithRegions(namespace, 5);
177
178 final long sizeLimit = 5L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
179 final SpaceViolationPolicy violationPolicy = SpaceViolationPolicy.DISABLE;
180 QuotaSettings settings = QuotaSettingsFactory.limitNamespaceSpace(namespace, sizeLimit, violationPolicy);
181 admin.setQuota(settings);
182
183 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
184 admin.flush(tn1);
185 Map<TableName,SpaceQuotaSnapshot> snapshots = snapshotNotifier.copySnapshots();
186 for (int i = 0; i < 5; i++) {
187
188 assertEquals(
189 "Should not see any quota violations after writing 2MB of data: " + snapshots, 0,
190 numQuotaSnapshotsInViolation(snapshots));
191 try {
192 Thread.sleep(1000);
193 } catch (InterruptedException e) {
194 LOG.debug("Interrupted while sleeping." , e);
195 }
196 snapshots = snapshotNotifier.copySnapshots();
197 }
198
199 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
200 admin.flush(tn2);
201 snapshots = snapshotNotifier.copySnapshots();
202 for (int i = 0; i < 5; i++) {
203
204 assertEquals("Should not see any quota violations after writing 4MB of data", 0,
205 numQuotaSnapshotsInViolation(snapshots));
206 try {
207 Thread.sleep(1000);
208 } catch (InterruptedException e) {
209 LOG.debug("Interrupted while sleeping." , e);
210 }
211 snapshots = snapshotNotifier.copySnapshots();
212 }
213
214
215
216 helper.writeData(tn3, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
217 admin.flush(tn3);
218 snapshots = snapshotNotifier.copySnapshots();
219 while (numQuotaSnapshotsInViolation(snapshots) < 3) {
220 LOG.debug("Saw fewer violations than desired (expected 3): " + snapshots
221 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
222 try {
223 Thread.sleep(1000);
224 } catch (InterruptedException e) {
225 LOG.debug("Interrupted while sleeping.", e);
226 Thread.currentThread().interrupt();
227 }
228 snapshots = snapshotNotifier.copySnapshots();
229 }
230
231 SpaceQuotaSnapshot snapshot1 = snapshots.remove(tn1);
232 assertNotNull("tn1 should be in violation", snapshot1);
233 assertEquals(violationPolicy, snapshot1.getQuotaStatus().getPolicy());
234 SpaceQuotaSnapshot snapshot2 = snapshots.remove(tn2);
235 assertNotNull("tn2 should be in violation", snapshot2);
236 assertEquals(violationPolicy, snapshot2.getQuotaStatus().getPolicy());
237 SpaceQuotaSnapshot snapshot3 = snapshots.remove(tn3);
238 assertNotNull("tn3 should be in violation", snapshot3);
239 assertEquals(violationPolicy, snapshot3.getQuotaStatus().getPolicy());
240 assertTrue("Unexpected additional quota violations: " + snapshots, snapshots.isEmpty());
241 }
242
243 @Test
244 public void testTableQuotaOverridesNamespaceQuota() throws Exception {
245 final String namespace = testName.getMethodName();
246 final Admin admin = TEST_UTIL.getHBaseAdmin();
247
248 try {
249 admin.getNamespaceDescriptor(namespace);
250 } catch (NamespaceNotFoundException e) {
251 NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
252 admin.createNamespace(desc);
253 }
254
255 TableName tn1 = helper.createTableWithRegions(namespace, 5);
256 TableName tn2 = helper.createTableWithRegions(namespace, 5);
257
258 final long namespaceSizeLimit = 3L * SpaceQuotaHelperForTests.ONE_MEGABYTE;
259 final SpaceViolationPolicy namespaceViolationPolicy = SpaceViolationPolicy.DISABLE;
260 QuotaSettings namespaceSettings = QuotaSettingsFactory.limitNamespaceSpace(namespace,
261 namespaceSizeLimit, namespaceViolationPolicy);
262 admin.setQuota(namespaceSettings);
263
264 helper.writeData(tn1, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
265 admin.flush(tn1);
266 Map<TableName,SpaceQuotaSnapshot> violatedQuotas = snapshotNotifier.copySnapshots();
267 for (int i = 0; i < 5; i++) {
268
269 assertEquals("Should not see any quota violations after writing 2MB of data: " + violatedQuotas, 0,
270 numQuotaSnapshotsInViolation(violatedQuotas));
271 try {
272 Thread.sleep(1000);
273 } catch (InterruptedException e) {
274 LOG.debug("Interrupted while sleeping." , e);
275 }
276 violatedQuotas = snapshotNotifier.copySnapshots();
277 }
278
279 helper.writeData(tn2, 2L * SpaceQuotaHelperForTests.ONE_MEGABYTE);
280 admin.flush(tn2);
281 violatedQuotas = snapshotNotifier.copySnapshots();
282 while (numQuotaSnapshotsInViolation(violatedQuotas) < 2) {
283 LOG.debug("Saw fewer violations than desired (expected 2): " + violatedQuotas
284 + ". Current reports: " + master.getMasterQuotaManager().snapshotRegionSizes());
285 try {
286 Thread.sleep(1000);
287 } catch (InterruptedException e) {
288 LOG.debug("Interrupted while sleeping.", e);
289 Thread.currentThread().interrupt();
290 }
291 violatedQuotas = snapshotNotifier.copySnapshots();
292 }
293
294 SpaceQuotaSnapshot actualPolicyTN1 = violatedQuotas.get(tn1);
295 assertNotNull("Expected to see violation policy for tn1", actualPolicyTN1);
296 assertEquals(namespaceViolationPolicy, actualPolicyTN1.getQuotaStatus().getPolicy());
297 SpaceQuotaSnapshot actualPolicyTN2 = violatedQuotas.get(tn2);
298 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
299 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
300
301
302 final long tableSizeLimit = SpaceQuotaHelperForTests.ONE_MEGABYTE;
303 final SpaceViolationPolicy tableViolationPolicy = SpaceViolationPolicy.NO_INSERTS;
304 QuotaSettings tableSettings = QuotaSettingsFactory.limitTableSpace(tn1, tableSizeLimit,
305 tableViolationPolicy);
306 admin.setQuota(tableSettings);
307
308
309 while (true) {
310 violatedQuotas = snapshotNotifier.copySnapshots();
311 SpaceQuotaSnapshot actualTableSnapshot = violatedQuotas.get(tn1);
312 assertNotNull("Violation policy should never be null", actualTableSnapshot);
313 if (tableViolationPolicy != actualTableSnapshot.getQuotaStatus().getPolicy()) {
314 LOG.debug("Saw unexpected table violation policy, waiting and re-checking.");
315 try {
316 Thread.sleep(1000);
317 } catch (InterruptedException e) {
318 LOG.debug("Interrupted while sleeping");
319 Thread.currentThread().interrupt();
320 }
321 continue;
322 }
323 assertEquals(tableViolationPolicy, actualTableSnapshot.getQuotaStatus().getPolicy());
324 break;
325 }
326
327
328 actualPolicyTN2 = violatedQuotas.get(tn2);
329 assertNotNull("Expected to see violation policy for tn2", actualPolicyTN2);
330 assertEquals(namespaceViolationPolicy, actualPolicyTN2.getQuotaStatus().getPolicy());
331 }
332
333 @Test
334 public void testGetAllTablesWithQuotas() throws Exception {
335 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
336 Set<TableName> tablesWithQuotas = new HashSet<>();
337 Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
338
339 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
340
341 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
342 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
343 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
344 }
345
346 @Test
347 public void testRpcQuotaTablesAreFiltered() throws Exception {
348 final Multimap<TableName, QuotaSettings> quotas = helper.createTablesWithSpaceQuotas();
349 Set<TableName> tablesWithQuotas = new HashSet<>();
350 Set<TableName> namespaceTablesWithQuotas = new HashSet<>();
351
352 helper.partitionTablesByQuotaTarget(quotas, tablesWithQuotas, namespaceTablesWithQuotas);
353
354 TableName rpcQuotaTable = helper.createTable();
355 TEST_UTIL.getHBaseAdmin().setQuota(QuotaSettingsFactory
356 .throttleTable(rpcQuotaTable, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
357
358
359 TablesWithQuotas tables = chore.fetchAllTablesWithQuotasDefined();
360 assertEquals("Found tables: " + tables, tablesWithQuotas, tables.getTableQuotaTables());
361 assertEquals("Found tables: " + tables, namespaceTablesWithQuotas, tables.getNamespaceQuotaTables());
362 }
363
364 @Test
365 public void testFilterRegions() throws Exception {
366 final Map<TableName,Integer> mockReportedRegions = new HashMap<>();
367
368
369 TablesWithQuotas tables = new TablesWithQuotas(TEST_UTIL.getConnection(),
370 TEST_UTIL.getConfiguration()) {
371 @Override
372 int getNumReportedRegions(TableName table, QuotaSnapshotStore<TableName> tableStore) {
373 Integer i = mockReportedRegions.get(table);
374 if (i == null) {
375 return 0;
376 }
377 return i;
378 }
379 };
380
381
382 TableName tn1 = helper.createTableWithRegions(20);
383 TableName tn2 = helper.createTableWithRegions(20);
384 TableName tn3 = helper.createTableWithRegions(20);
385
386
387 tables.addTableQuotaTable(tn1);
388 tables.addTableQuotaTable(tn2);
389 tables.addTableQuotaTable(tn3);
390
391
392 mockReportedRegions.put(tn1, 10);
393 mockReportedRegions.put(tn2, 19);
394 mockReportedRegions.put(tn3, 20);
395
396
397 tables.filterInsufficientlyReportedTables(null);
398
399 assertEquals(new HashSet<>(Arrays.asList(tn2, tn3)), tables.getTableQuotaTables());
400 }
401
402 @Test
403 public void testFetchSpaceQuota() throws Exception {
404 Multimap<TableName,QuotaSettings> tables = helper.createTablesWithSpaceQuotas();
405
406 chore.initializeViolationStores(Collections.<HRegionInfo,Long> emptyMap());
407
408 for (Entry<TableName,QuotaSettings> entry : tables.entries()) {
409 final TableName table = entry.getKey();
410 final QuotaSettings qs = entry.getValue();
411
412 assertTrue("QuotaSettings was an instance of " + qs.getClass(),
413 qs instanceof SpaceLimitSettings);
414
415 SpaceQuota spaceQuota = null;
416 if (qs.getTableName() != null) {
417 spaceQuota = chore.getTableSnapshotStore().getSpaceQuota(table);
418 assertNotNull("Could not find table space quota for " + table, spaceQuota);
419 } else if (qs.getNamespace() != null) {
420 spaceQuota = chore.getNamespaceSnapshotStore().getSpaceQuota(table.getNamespaceAsString());
421 assertNotNull("Could not find namespace space quota for " + table.getNamespaceAsString(), spaceQuota);
422 } else {
423 fail("Expected table or namespace space quota");
424 }
425
426 final SpaceLimitSettings sls = (SpaceLimitSettings) qs;
427 assertEquals(sls.getProto().getQuota(), spaceQuota);
428 }
429
430 TableName tableWithoutQuota = helper.createTable();
431 assertNull(chore.getTableSnapshotStore().getSpaceQuota(tableWithoutQuota));
432 }
433
434 private int numQuotaSnapshotsInViolation(Map<TableName,SpaceQuotaSnapshot> snapshots) {
435 int sum = 0;
436 for (SpaceQuotaSnapshot snapshot : snapshots.values()) {
437 if (snapshot.getQuotaStatus().isInViolation()) {
438 sum++;
439 }
440 }
441 return sum;
442 }
443
444 private void sleepWithInterrupt(long millis) {
445 try {
446 Thread.sleep(millis);
447 } catch (InterruptedException e) {
448 LOG.debug("Interrupted while sleeping");
449 Thread.currentThread().interrupt();
450 }
451 }
452 }