1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.util.Collection;
21 import java.util.HashSet;
22 import java.util.Set;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
27 import org.apache.hadoop.hbase.classification.InterfaceAudience;
28
29
30
31
32
33 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
34 public class FlushLargeStoresPolicy extends FlushPolicy {
35
36 private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
37
38 public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
39 "hbase.hregion.percolumnfamilyflush.size.lower.bound";
40
41 public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN =
42 "hbase.hregion.percolumnfamilyflush.size.lower.bound.min";
43
44 private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN =
45 1024 * 1024 * 16L;
46
47 private long flushSizeLowerBound = -1;
48
49 @Override
50 protected void configureForRegion(HRegion region) {
51 super.configureForRegion(region);
52 int familyNumber = region.getTableDesc().getFamilies().size();
53 if (familyNumber <= 1) {
54
55
56 return;
57 }
58
59
60 long flushSizeLowerBound = region.getMemstoreFlushSize() / familyNumber;
61 long minimumLowerBound =
62 getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
63 DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN);
64 if (minimumLowerBound > flushSizeLowerBound) {
65 flushSizeLowerBound = minimumLowerBound;
66 }
67
68 String flushedSizeLowerBoundString =
69 region.getTableDesc().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
70 if (flushedSizeLowerBoundString == null) {
71 if (LOG.isDebugEnabled()) {
72 LOG.debug("No " + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
73 + " set in description of table " + region.getTableDesc().getTableName()
74 + ", use config (" + flushSizeLowerBound + ") instead");
75 }
76 } else {
77 try {
78 flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
79 } catch (NumberFormatException nfe) {
80
81 LOG.warn("Number format exception when parsing "
82 + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + " for table "
83 + region.getTableDesc().getTableName() + ":" + flushedSizeLowerBoundString + ". " + nfe
84 + ", use config (" + flushSizeLowerBound + ") instead");
85
86 }
87 }
88 this.flushSizeLowerBound = flushSizeLowerBound;
89 }
90
91 private boolean shouldFlush(Store store) {
92 if (store.getMemStoreSize() > this.flushSizeLowerBound) {
93 if (LOG.isDebugEnabled()) {
94 LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
95 + " will be flushed because of memstoreSize(" + store.getMemStoreSize()
96 + ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
97 }
98 return true;
99 }
100 return region.shouldFlushStore(store);
101 }
102
103 @Override
104 public Collection<Store> selectStoresToFlush() {
105
106 if (region.getTableDesc().getFamilies().size() == 1) {
107 return region.stores.values();
108 }
109
110 Collection<Store> stores = region.stores.values();
111 Set<Store> specificStoresToFlush = new HashSet<Store>();
112 for (Store store : stores) {
113 if (shouldFlush(store)) {
114 specificStoresToFlush.add(store);
115 }
116 }
117
118 if (specificStoresToFlush.isEmpty()) {
119 if (LOG.isDebugEnabled()) {
120 LOG.debug("Since none of the CFs were above the size, flushing all.");
121 }
122 return stores;
123 } else {
124 return specificStoresToFlush;
125 }
126 }
127
128 }