1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.io.hfile.bucket;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Comparator;
26 import java.util.HashSet;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import com.google.common.collect.MinMaxPriorityQueue;
33 import org.apache.commons.collections.map.LinkedMap;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
38 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
39 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
40 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
41
42 import com.google.common.base.Objects;
43 import com.google.common.base.Preconditions;
44 import com.google.common.primitives.Ints;
45
46
47
48
49
50
51
52
53
54 @InterfaceAudience.Private
55 @JsonIgnoreProperties({"indexStatistics", "freeSize", "usedSize"})
56 public final class BucketAllocator {
57 static final Log LOG = LogFactory.getLog(BucketAllocator.class);
58
59 @JsonIgnoreProperties({"completelyFree", "uninstantiated"})
60 public final static class Bucket {
61 private long baseOffset;
62 private int itemAllocationSize, sizeIndex;
63 private int itemCount;
64 private int freeList[];
65 private int freeCount, usedCount;
66
67 public Bucket(long offset) {
68 baseOffset = offset;
69 sizeIndex = -1;
70 }
71
72 void reconfigure(int sizeIndex, int[] bucketSizes, long bucketCapacity) {
73 Preconditions.checkElementIndex(sizeIndex, bucketSizes.length);
74 this.sizeIndex = sizeIndex;
75 itemAllocationSize = bucketSizes[sizeIndex];
76 itemCount = (int) (bucketCapacity / (long) itemAllocationSize);
77 freeCount = itemCount;
78 usedCount = 0;
79 freeList = new int[itemCount];
80 for (int i = 0; i < freeCount; ++i)
81 freeList[i] = i;
82 }
83
84 public boolean isUninstantiated() {
85 return sizeIndex == -1;
86 }
87
88 public int sizeIndex() {
89 return sizeIndex;
90 }
91
92 public int getItemAllocationSize() {
93 return itemAllocationSize;
94 }
95
96 public boolean hasFreeSpace() {
97 return freeCount > 0;
98 }
99
100 public boolean isCompletelyFree() {
101 return usedCount == 0;
102 }
103
104 public int freeCount() {
105 return freeCount;
106 }
107
108 public int usedCount() {
109 return usedCount;
110 }
111
112 public int getFreeBytes() {
113 return freeCount * itemAllocationSize;
114 }
115
116 public int getUsedBytes() {
117 return usedCount * itemAllocationSize;
118 }
119
120 public long getBaseOffset() {
121 return baseOffset;
122 }
123
124
125
126
127
128
129 public long allocate() {
130 assert freeCount > 0;
131 assert sizeIndex != -1;
132 ++usedCount;
133 long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
134 assert offset >= 0;
135 return offset;
136 }
137
138 public void addAllocation(long offset) throws BucketAllocatorException {
139 offset -= baseOffset;
140 if (offset < 0 || offset % itemAllocationSize != 0)
141 throw new BucketAllocatorException(
142 "Attempt to add allocation for bad offset: " + offset + " base="
143 + baseOffset + ", bucket size=" + itemAllocationSize);
144 int idx = (int) (offset / itemAllocationSize);
145 boolean matchFound = false;
146 for (int i = 0; i < freeCount; ++i) {
147 if (matchFound) freeList[i - 1] = freeList[i];
148 else if (freeList[i] == idx) matchFound = true;
149 }
150 if (!matchFound)
151 throw new BucketAllocatorException("Couldn't find match for index "
152 + idx + " in free list");
153 ++usedCount;
154 --freeCount;
155 }
156
157 private void free(long offset) {
158 offset -= baseOffset;
159 assert offset >= 0;
160 assert offset < itemCount * itemAllocationSize;
161 assert offset % itemAllocationSize == 0;
162 assert usedCount > 0;
163 assert freeCount < itemCount;
164 int item = (int) (offset / (long) itemAllocationSize);
165 assert !freeListContains(item);
166 --usedCount;
167 freeList[freeCount++] = item;
168 }
169
170 private boolean freeListContains(int blockNo) {
171 for (int i = 0; i < freeCount; ++i) {
172 if (freeList[i] == blockNo) return true;
173 }
174 return false;
175 }
176 }
177
178 final class BucketSizeInfo {
179
180
181 private LinkedMap bucketList, freeBuckets, completelyFreeBuckets;
182 private int sizeIndex;
183
184 BucketSizeInfo(int sizeIndex) {
185 bucketList = new LinkedMap();
186 freeBuckets = new LinkedMap();
187 completelyFreeBuckets = new LinkedMap();
188 this.sizeIndex = sizeIndex;
189 }
190
191 public synchronized void instantiateBucket(Bucket b) {
192 assert b.isUninstantiated() || b.isCompletelyFree();
193 b.reconfigure(sizeIndex, bucketSizes, bucketCapacity);
194 bucketList.put(b, b);
195 freeBuckets.put(b, b);
196 completelyFreeBuckets.put(b, b);
197 }
198
199 public int sizeIndex() {
200 return sizeIndex;
201 }
202
203
204
205
206
207 public long allocateBlock() {
208 Bucket b = null;
209 if (freeBuckets.size() > 0) {
210
211 b = (Bucket) freeBuckets.lastKey();
212 }
213 if (b == null) {
214 b = grabGlobalCompletelyFreeBucket();
215 if (b != null) instantiateBucket(b);
216 }
217 if (b == null) return -1;
218 long result = b.allocate();
219 blockAllocated(b);
220 return result;
221 }
222
223 void blockAllocated(Bucket b) {
224 if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
225 if (!b.hasFreeSpace()) freeBuckets.remove(b);
226 }
227
228 public Bucket findAndRemoveCompletelyFreeBucket() {
229 Bucket b = null;
230 assert bucketList.size() > 0;
231 if (bucketList.size() == 1) {
232
233 return null;
234 }
235
236 if (completelyFreeBuckets.size() > 0) {
237 b = (Bucket) completelyFreeBuckets.firstKey();
238 removeBucket(b);
239 }
240 return b;
241 }
242
243 private synchronized void removeBucket(Bucket b) {
244 assert b.isCompletelyFree();
245 bucketList.remove(b);
246 freeBuckets.remove(b);
247 completelyFreeBuckets.remove(b);
248 }
249
250 public void freeBlock(Bucket b, long offset) {
251 assert bucketList.containsKey(b);
252
253 assert (!completelyFreeBuckets.containsKey(b));
254 b.free(offset);
255 if (!freeBuckets.containsKey(b)) freeBuckets.put(b, b);
256 if (b.isCompletelyFree()) completelyFreeBuckets.put(b, b);
257 }
258
259 public synchronized IndexStatistics statistics() {
260 long free = 0, used = 0;
261 for (Object obj : bucketList.keySet()) {
262 Bucket b = (Bucket) obj;
263 free += b.freeCount();
264 used += b.usedCount();
265 }
266 return new IndexStatistics(free, used, bucketSizes[sizeIndex]);
267 }
268
269 @Override
270 public String toString() {
271 return Objects.toStringHelper(this.getClass())
272 .add("sizeIndex", sizeIndex)
273 .add("bucketSize", bucketSizes[sizeIndex])
274 .toString();
275 }
276 }
277
278
279
280
281 private static final int DEFAULT_BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
282 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
283 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
284 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
285 512 * 1024 + 1024 };
286
287
288
289
290
291 public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
292 for (int i = 0; i < bucketSizes.length; ++i)
293 if (blockSize <= bucketSizes[i])
294 return bucketSizeInfos[i];
295 return null;
296 }
297
298 static public final int FEWEST_ITEMS_IN_BUCKET = 4;
299
300 private final int[] bucketSizes;
301 private final int bigItemSize;
302
303 private final long bucketCapacity;
304 private Bucket[] buckets;
305 private BucketSizeInfo[] bucketSizeInfos;
306 private final long totalSize;
307 private long usedSize = 0;
308
309 BucketAllocator(long availableSpace, int[] bucketSizes)
310 throws BucketAllocatorException {
311 this.bucketSizes = bucketSizes == null ? DEFAULT_BUCKET_SIZES : bucketSizes;
312 Arrays.sort(this.bucketSizes);
313 this.bigItemSize = Ints.max(this.bucketSizes);
314 this.bucketCapacity = FEWEST_ITEMS_IN_BUCKET * bigItemSize;
315 buckets = new Bucket[(int) (availableSpace / bucketCapacity)];
316 if (buckets.length < this.bucketSizes.length)
317 throw new BucketAllocatorException(
318 "Bucket allocator size too small - must have room for at least "
319 + this.bucketSizes.length + " buckets");
320 bucketSizeInfos = new BucketSizeInfo[this.bucketSizes.length];
321 for (int i = 0; i < this.bucketSizes.length; ++i) {
322 bucketSizeInfos[i] = new BucketSizeInfo(i);
323 }
324 for (int i = 0; i < buckets.length; ++i) {
325 buckets[i] = new Bucket(bucketCapacity * i);
326 bucketSizeInfos[i < this.bucketSizes.length ? i : this.bucketSizes.length - 1]
327 .instantiateBucket(buckets[i]);
328 }
329 this.totalSize = ((long) buckets.length) * bucketCapacity;
330 }
331
332
333
334
335
336
337
338
339
340 BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
341 AtomicLong realCacheSize) throws BucketAllocatorException {
342 this(availableSpace, bucketSizes);
343
344
345
346
347
348 boolean[] reconfigured = new boolean[buckets.length];
349 for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
350 long foundOffset = entry.getValue().offset();
351 int foundLen = entry.getValue().getLength();
352 int bucketSizeIndex = -1;
353 for (int i = 0; i < bucketSizes.length; ++i) {
354 if (foundLen <= bucketSizes[i]) {
355 bucketSizeIndex = i;
356 break;
357 }
358 }
359 if (bucketSizeIndex == -1) {
360 throw new BucketAllocatorException(
361 "Can't match bucket size for the block with size " + foundLen);
362 }
363 int bucketNo = (int) (foundOffset / bucketCapacity);
364 if (bucketNo < 0 || bucketNo >= buckets.length)
365 throw new BucketAllocatorException("Can't find bucket " + bucketNo
366 + ", total buckets=" + buckets.length
367 + "; did you shrink the cache?");
368 Bucket b = buckets[bucketNo];
369 if (reconfigured[bucketNo]) {
370 if (b.sizeIndex() != bucketSizeIndex)
371 throw new BucketAllocatorException(
372 "Inconsistent allocation in bucket map;");
373 } else {
374 if (!b.isCompletelyFree())
375 throw new BucketAllocatorException("Reconfiguring bucket "
376 + bucketNo + " but it's already allocated; corrupt data");
377
378
379 BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
380 BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
381 oldbsi.removeBucket(b);
382 bsi.instantiateBucket(b);
383 reconfigured[bucketNo] = true;
384 }
385 realCacheSize.addAndGet(foundLen);
386 buckets[bucketNo].addAllocation(foundOffset);
387 usedSize += buckets[bucketNo].getItemAllocationSize();
388 bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
389 }
390 }
391
392 public String toString() {
393 StringBuilder sb = new StringBuilder(1024);
394 for (int i = 0; i < buckets.length; ++i) {
395 Bucket b = buckets[i];
396 if (i > 0) sb.append(", ");
397 sb.append("bucket.").append(i).append(": size=").append(b.getItemAllocationSize());
398 sb.append(", freeCount=").append(b.freeCount()).append(", used=").append(b.usedCount());
399 }
400 return sb.toString();
401 }
402
403 public long getUsedSize() {
404 return this.usedSize;
405 }
406
407 public long getFreeSize() {
408 return this.totalSize - getUsedSize();
409 }
410
411 public long getTotalSize() {
412 return this.totalSize;
413 }
414
415
416
417
418
419
420
421 public synchronized long allocateBlock(int blockSize) throws CacheFullException,
422 BucketAllocatorException {
423 assert blockSize > 0;
424 BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
425 if (bsi == null) {
426 throw new BucketAllocatorException("Allocation too big size=" + blockSize +
427 "; adjust BucketCache sizes " + CacheConfig.BUCKET_CACHE_BUCKETS_KEY +
428 " to accomodate if size seems reasonable and you want it cached.");
429 }
430 long offset = bsi.allocateBlock();
431
432
433 if (offset < 0)
434 throw new CacheFullException(blockSize, bsi.sizeIndex());
435 usedSize += bucketSizes[bsi.sizeIndex()];
436 return offset;
437 }
438
439 private Bucket grabGlobalCompletelyFreeBucket() {
440 for (BucketSizeInfo bsi : bucketSizeInfos) {
441 Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
442 if (b != null) return b;
443 }
444 return null;
445 }
446
447
448
449
450
451
452 public synchronized int freeBlock(long offset) {
453 int bucketNo = (int) (offset / bucketCapacity);
454 assert bucketNo >= 0 && bucketNo < buckets.length;
455 Bucket targetBucket = buckets[bucketNo];
456 bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
457 usedSize -= targetBucket.getItemAllocationSize();
458 return targetBucket.getItemAllocationSize();
459 }
460
461 public int sizeIndexOfAllocation(long offset) {
462 int bucketNo = (int) (offset / bucketCapacity);
463 assert bucketNo >= 0 && bucketNo < buckets.length;
464 Bucket targetBucket = buckets[bucketNo];
465 return targetBucket.sizeIndex();
466 }
467
468 public int sizeOfAllocation(long offset) {
469 int bucketNo = (int) (offset / bucketCapacity);
470 assert bucketNo >= 0 && bucketNo < buckets.length;
471 Bucket targetBucket = buckets[bucketNo];
472 return targetBucket.getItemAllocationSize();
473 }
474
475 static class IndexStatistics {
476 private long freeCount, usedCount, itemSize, totalCount;
477
478 public long freeCount() {
479 return freeCount;
480 }
481
482 public long usedCount() {
483 return usedCount;
484 }
485
486 public long totalCount() {
487 return totalCount;
488 }
489
490 public long freeBytes() {
491 return freeCount * itemSize;
492 }
493
494 public long usedBytes() {
495 return usedCount * itemSize;
496 }
497
498 public long totalBytes() {
499 return totalCount * itemSize;
500 }
501
502 public long itemSize() {
503 return itemSize;
504 }
505
506 public IndexStatistics(long free, long used, long itemSize) {
507 setTo(free, used, itemSize);
508 }
509
510 public IndexStatistics() {
511 setTo(-1, -1, 0);
512 }
513
514 public void setTo(long free, long used, long itemSize) {
515 this.itemSize = itemSize;
516 this.freeCount = free;
517 this.usedCount = used;
518 this.totalCount = free + used;
519 }
520 }
521
522 public Bucket [] getBuckets() {
523 return this.buckets;
524 }
525
526 void logStatistics() {
527 IndexStatistics total = new IndexStatistics();
528 IndexStatistics[] stats = getIndexStatistics(total);
529 LOG.info("Bucket allocator statistics follow:\n");
530 LOG.info(" Free bytes=" + total.freeBytes() + "+; used bytes="
531 + total.usedBytes() + "; total bytes=" + total.totalBytes());
532 for (IndexStatistics s : stats) {
533 LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount()
534 + "; free=" + s.freeCount() + "; total=" + s.totalCount());
535 }
536 }
537
538 IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
539 IndexStatistics[] stats = getIndexStatistics();
540 long totalfree = 0, totalused = 0;
541 for (IndexStatistics stat : stats) {
542 totalfree += stat.freeBytes();
543 totalused += stat.usedBytes();
544 }
545 grandTotal.setTo(totalfree, totalused, 1);
546 return stats;
547 }
548
549 IndexStatistics[] getIndexStatistics() {
550 IndexStatistics[] stats = new IndexStatistics[bucketSizes.length];
551 for (int i = 0; i < stats.length; ++i)
552 stats[i] = bucketSizeInfos[i].statistics();
553 return stats;
554 }
555
556 public long freeBlock(long freeList[]) {
557 long sz = 0;
558 for (int i = 0; i < freeList.length; ++i)
559 sz += freeBlock(freeList[i]);
560 return sz;
561 }
562
563 public int getBucketIndex(long offset) {
564 return (int) (offset / bucketCapacity);
565 }
566
567
568
569
570
571
572
573
574
575
576
577
578 public Set<Integer> getLeastFilledBuckets(Set<Integer> excludedBuckets,
579 int bucketCount) {
580 Queue<Integer> queue = MinMaxPriorityQueue.<Integer>orderedBy(
581 new Comparator<Integer>() {
582 @Override
583 public int compare(Integer left, Integer right) {
584
585 return Float.compare(
586 ((float) buckets[left].usedCount) / buckets[left].itemCount,
587 ((float) buckets[right].usedCount) / buckets[right].itemCount);
588 }
589 }).maximumSize(bucketCount).create();
590
591 for (int i = 0; i < buckets.length; i ++ ) {
592 if (!excludedBuckets.contains(i) && !buckets[i].isUninstantiated() &&
593
594 bucketSizeInfos[buckets[i].sizeIndex()].bucketList.size() != 1) {
595 queue.add(i);
596 }
597 }
598
599 Set<Integer> result = new HashSet<>(bucketCount);
600 result.addAll(queue);
601
602 return result;
603 }
604 }