1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
import static org.apache.hadoop.util.StringUtils.humanReadableInt;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

import com.google.common.base.Preconditions;
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Private
75 class MemStoreFlusher implements FlushRequester {
76 static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
77
78 private Configuration conf;
79
80
81 private final BlockingQueue<FlushQueueEntry> flushQueue =
82 new DelayQueue<FlushQueueEntry>();
83 private final Map<Region, FlushRegionEntry> regionsInQueue =
84 new HashMap<Region, FlushRegionEntry>();
85 private AtomicBoolean wakeupPending = new AtomicBoolean();
86
87 private final long threadWakeFrequency;
88 private final HRegionServer server;
89 private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
90 private final Object blockSignal = new Object();
91
92 protected long globalMemStoreLimit;
93 protected float globalMemStoreLimitLowMarkPercent;
94 protected long globalMemStoreLimitLowMark;
95
96 private long blockingWaitTime;
97 private final Counter updatesBlockedMsHighWater = new Counter();
98
99 private final FlushHandler[] flushHandlers;
100 private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);
101
102
103
104
105
106 public MemStoreFlusher(final Configuration conf,
107 final HRegionServer server) {
108 super();
109 this.conf = conf;
110 this.server = server;
111 this.threadWakeFrequency =
112 conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
113 long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
114 float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
115 this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
116 this.globalMemStoreLimitLowMarkPercent =
117 HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
118 this.globalMemStoreLimitLowMark =
119 (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
120
121 this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
122 90000);
123 int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
124 this.flushHandlers = new FlushHandler[handlerCount];
125 LOG.info("globalMemStoreLimit="
126 + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
127 + ", globalMemStoreLimitLowMark="
128 + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
129 + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
130 }
131
132 public Counter getUpdatesBlockedMsHighWater() {
133 return this.updatesBlockedMsHighWater;
134 }
135
136
137
138
139
140
141
142 private boolean flushOneForGlobalPressure() {
143 SortedMap<Long, Region> regionsBySize = server.getCopyOfOnlineRegionsSortedBySize();
144 Set<Region> excludedRegions = new HashSet<Region>();
145
146 double secondaryMultiplier
147 = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf);
148
149 boolean flushedOne = false;
150 while (!flushedOne) {
151
152
153 Region bestFlushableRegion = getBiggestMemstoreRegion(regionsBySize, excludedRegions, true);
154
155 Region bestAnyRegion = getBiggestMemstoreRegion(
156 regionsBySize, excludedRegions, false);
157
158 Region bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize,
159 excludedRegions);
160
161 if (bestAnyRegion == null && bestRegionReplica == null) {
162 LOG.error("Above memory mark but there are no flushable regions!");
163 return false;
164 }
165
166 Region regionToFlush;
167 if (bestFlushableRegion != null &&
168 bestAnyRegion.getMemstoreSize() > 2 * bestFlushableRegion.getMemstoreSize()) {
169
170
171
172
173 if (LOG.isDebugEnabled()) {
174 LOG.debug("Under global heap pressure: " + "Region "
175 + bestAnyRegion.getRegionInfo().getRegionNameAsString()
176 + " has too many " + "store files, but is "
177 + TraditionalBinaryPrefix.long2String(bestAnyRegion.getMemstoreSize(), "", 1)
178 + " vs best flushable region's "
179 + TraditionalBinaryPrefix.long2String(bestFlushableRegion.getMemstoreSize(), "", 1)
180 + ". Choosing the bigger.");
181 }
182 regionToFlush = bestAnyRegion;
183 } else {
184 if (bestFlushableRegion == null) {
185 regionToFlush = bestAnyRegion;
186 } else {
187 regionToFlush = bestFlushableRegion;
188 }
189 }
190
191 Preconditions.checkState(
192 (regionToFlush != null && regionToFlush.getMemstoreSize() > 0) ||
193 (bestRegionReplica != null && bestRegionReplica.getMemstoreSize() > 0));
194
195 if (regionToFlush == null ||
196 (bestRegionReplica != null &&
197 ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
198 (bestRegionReplica.getMemstoreSize()
199 > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
200 LOG.info("Refreshing storefiles of region " + bestRegionReplica +
201 " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
202 server.getRegionServerAccounting().getGlobalMemstoreSize()));
203 flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
204 if (!flushedOne) {
205 LOG.info("Excluding secondary region " + bestRegionReplica +
206 " - trying to find a different region to refresh files.");
207 excludedRegions.add(bestRegionReplica);
208 }
209 } else {
210 LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
211 + "Total Memstore size="
212 + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
213 + ", Region memstore size="
214 + humanReadableInt(regionToFlush.getMemstoreSize()));
215 flushedOne = flushRegion(regionToFlush, true, true);
216
217 if (!flushedOne) {
218 LOG.info("Excluding unflushable region " + regionToFlush +
219 " - trying to find a different region to flush.");
220 excludedRegions.add(regionToFlush);
221 }
222 }
223 }
224 return true;
225 }
226
227 private class FlushHandler extends HasThread {
228
229 private FlushHandler(String name) {
230 super(name);
231 }
232
233 @Override
234 public void run() {
235 while (!server.isStopped()) {
236 FlushQueueEntry fqe = null;
237 try {
238 wakeupPending.set(false);
239 fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
240 if (fqe == null || fqe instanceof WakeupFlushThread) {
241 if (isAboveLowWaterMark()) {
242 LOG.debug("Flush thread woke up because memory above low water="
243 + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
244 if (!flushOneForGlobalPressure()) {
245
246
247
248
249
250 Thread.sleep(1000);
251 wakeUpIfBlocking();
252 }
253
254 wakeupFlushThread();
255 }
256 continue;
257 }
258 FlushRegionEntry fre = (FlushRegionEntry) fqe;
259 if (!flushRegion(fre)) {
260 break;
261 }
262 } catch (InterruptedException ex) {
263 continue;
264 } catch (ConcurrentModificationException ex) {
265 continue;
266 } catch (Exception ex) {
267 LOG.error("Cache flusher failed for entry " + fqe, ex);
268 if (!server.checkFileSystem()) {
269 break;
270 }
271 }
272 }
273 synchronized (regionsInQueue) {
274 regionsInQueue.clear();
275 flushQueue.clear();
276 }
277
278
279 wakeUpIfBlocking();
280 LOG.info(getName() + " exiting");
281 }
282 }
283
284
285 private void wakeupFlushThread() {
286 if (wakeupPending.compareAndSet(false, true)) {
287 flushQueue.add(new WakeupFlushThread());
288 }
289 }
290
291 private Region getBiggestMemstoreRegion(
292 SortedMap<Long, Region> regionsBySize,
293 Set<Region> excludedRegions,
294 boolean checkStoreFileCount) {
295 synchronized (regionsInQueue) {
296 for (Region region : regionsBySize.values()) {
297 if (excludedRegions.contains(region)) {
298 continue;
299 }
300
301 if (((HRegion)region).writestate.flushing ||
302 !((HRegion)region).writestate.writesEnabled) {
303 continue;
304 }
305
306 if (checkStoreFileCount && isTooManyStoreFiles(region)) {
307 continue;
308 }
309 return region;
310 }
311 }
312 return null;
313 }
314
315 private Region getBiggestMemstoreOfRegionReplica(SortedMap<Long, Region> regionsBySize,
316 Set<Region> excludedRegions) {
317 synchronized (regionsInQueue) {
318 for (Region region : regionsBySize.values()) {
319 if (excludedRegions.contains(region)) {
320 continue;
321 }
322
323 if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
324 continue;
325 }
326
327 return region;
328 }
329 }
330 return null;
331 }
332
333 private boolean refreshStoreFilesAndReclaimMemory(Region region) {
334 try {
335 return region.refreshStoreFiles();
336 } catch (IOException e) {
337 LOG.warn("Refreshing store files failed with exception", e);
338 }
339 return false;
340 }
341
342
343
344
345 private boolean isAboveHighWaterMark() {
346 return server.getRegionServerAccounting().
347 getGlobalMemstoreSize() >= globalMemStoreLimit;
348 }
349
350
351
352
353 private boolean isAboveLowWaterMark() {
354 return server.getRegionServerAccounting().
355 getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
356 }
357
358 @Override
359 public void requestFlush(Region r, boolean forceFlushAllStores) {
360 synchronized (regionsInQueue) {
361 if (!regionsInQueue.containsKey(r)) {
362
363
364 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
365 this.regionsInQueue.put(r, fqe);
366 this.flushQueue.add(fqe);
367 }
368 }
369 }
370
371 @Override
372 public void requestDelayedFlush(Region r, long delay, boolean forceFlushAllStores) {
373 synchronized (regionsInQueue) {
374 if (!regionsInQueue.containsKey(r)) {
375
376 FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
377 fqe.requeue(delay);
378 this.regionsInQueue.put(r, fqe);
379 this.flushQueue.add(fqe);
380 }
381 }
382 }
383
384 public int getFlushQueueSize() {
385 return flushQueue.size();
386 }
387
388
389
390
391 void interruptIfNecessary() {
392 lock.writeLock().lock();
393 try {
394 for (FlushHandler flushHander : flushHandlers) {
395 if (flushHander != null) flushHander.interrupt();
396 }
397 } finally {
398 lock.writeLock().unlock();
399 }
400 }
401
402 synchronized void start(UncaughtExceptionHandler eh) {
403 ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
404 server.getServerName().toShortString() + "-MemStoreFlusher", eh);
405 for (int i = 0; i < flushHandlers.length; i++) {
406 flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
407 flusherThreadFactory.newThread(flushHandlers[i]);
408 flushHandlers[i].start();
409 }
410 }
411
412 boolean isAlive() {
413 for (FlushHandler flushHander : flushHandlers) {
414 if (flushHander != null && flushHander.isAlive()) {
415 return true;
416 }
417 }
418 return false;
419 }
420
421 void join() {
422 for (FlushHandler flushHander : flushHandlers) {
423 if (flushHander != null) {
424 Threads.shutdown(flushHander.getThread());
425 }
426 }
427 }
428
429
430
431
432
433
434
435
436
437 private boolean flushRegion(final FlushRegionEntry fqe) {
438 Region region = fqe.region;
439 if (!region.getRegionInfo().isMetaRegion() &&
440 isTooManyStoreFiles(region)) {
441 if (fqe.isMaximumWait(this.blockingWaitTime)) {
442 LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
443 "ms on a compaction to clean up 'too many store files'; waited " +
444 "long enough... proceeding with flush of " +
445 region.getRegionInfo().getRegionNameAsString());
446 } else {
447
448 if (fqe.getRequeueCount() <= 0) {
449
450 LOG.warn("Region " + region.getRegionInfo().getRegionNameAsString() + " has too many " +
451 "store files; delaying flush up to " + this.blockingWaitTime + "ms");
452 if (!this.server.compactSplitThread.requestSplit(region)) {
453 try {
454 this.server.compactSplitThread.requestSystemCompaction(
455 region, Thread.currentThread().getName());
456 } catch (IOException e) {
457 LOG.error("Cache flush failed for region " +
458 Bytes.toStringBinary(region.getRegionInfo().getRegionName()),
459 RemoteExceptionHandler.checkIOException(e));
460 }
461 }
462 }
463
464
465
466 this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
467
468 return true;
469 }
470 }
471 return flushRegion(region, false, fqe.isForceFlushAllStores());
472 }
473
474
475
476
477
478
479
480
481
482
483
484
485
486 private boolean flushRegion(final Region region, final boolean emergencyFlush,
487 boolean forceFlushAllStores) {
488 synchronized (this.regionsInQueue) {
489 FlushRegionEntry fqe = this.regionsInQueue.remove(region);
490
491 if (fqe != null && emergencyFlush) {
492
493
494 flushQueue.remove(fqe);
495 }
496 }
497
498 lock.readLock().lock();
499 try {
500 notifyFlushRequest(region, emergencyFlush);
501 FlushResult flushResult = region.flush(forceFlushAllStores);
502 boolean shouldCompact = flushResult.isCompactionNeeded();
503
504 boolean shouldSplit = ((HRegion)region).checkSplit() != null;
505 if (shouldSplit) {
506 this.server.compactSplitThread.requestSplit(region);
507 } else if (shouldCompact) {
508 server.compactSplitThread.requestSystemCompaction(
509 region, Thread.currentThread().getName());
510 }
511 } catch (DroppedSnapshotException ex) {
512
513
514
515
516
517 server.abort("Replay of WAL required. Forcing server shutdown", ex);
518 return false;
519 } catch (IOException ex) {
520 LOG.error("Cache flush failed" + (region != null ? (" for region " +
521 Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
522 RemoteExceptionHandler.checkIOException(ex));
523 if (!server.checkFileSystem()) {
524 return false;
525 }
526 } finally {
527 lock.readLock().unlock();
528 wakeUpIfBlocking();
529 }
530 return true;
531 }
532
533 private void notifyFlushRequest(Region region, boolean emergencyFlush) {
534 FlushType type = FlushType.NORMAL;
535 if (emergencyFlush) {
536 type = isAboveHighWaterMark() ? FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
537 }
538 for (FlushRequestListener listener : flushRequestListeners) {
539 listener.flushRequested(type, region);
540 }
541 }
542
543 private void wakeUpIfBlocking() {
544 synchronized (blockSignal) {
545 blockSignal.notifyAll();
546 }
547 }
548
549 private boolean isTooManyStoreFiles(Region region) {
550 for (Store store : region.getStores()) {
551 if (store.hasTooManyStoreFiles()) {
552 return true;
553 }
554 }
555 return false;
556 }
557
558
559
560
561
562
563
564 public void reclaimMemStoreMemory() {
565 TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
566 if (isAboveHighWaterMark()) {
567 if (Trace.isTracing()) {
568 scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
569 }
570 long start = EnvironmentEdgeManager.currentTime();
571 synchronized (this.blockSignal) {
572 boolean blocked = false;
573 long startTime = 0;
574 boolean interrupted = false;
575 try {
576 while (isAboveHighWaterMark() && !server.isStopped()) {
577 if (!blocked) {
578 startTime = EnvironmentEdgeManager.currentTime();
579 LOG.info("Blocking updates on "
580 + server.toString()
581 + ": the global memstore size "
582 + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
583 .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
584 + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
585 }
586 blocked = true;
587 wakeupFlushThread();
588 try {
589
590
591 blockSignal.wait(5 * 1000);
592 } catch (InterruptedException ie) {
593 LOG.warn("Interrupted while waiting");
594 interrupted = true;
595 }
596 long took = EnvironmentEdgeManager.currentTime() - start;
597 LOG.warn("Memstore is above high water mark and block " + took + "ms");
598 }
599 } finally {
600 if (interrupted) {
601 Thread.currentThread().interrupt();
602 }
603 }
604
605 if(blocked){
606 final long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
607 if(totalTime > 0){
608 this.updatesBlockedMsHighWater.add(totalTime);
609 }
610 LOG.info("Unblocking updates for server " + server.toString());
611 }
612 }
613 } else if (isAboveLowWaterMark()) {
614 wakeupFlushThread();
615 }
616 scope.close();
617 }
618 @Override
619 public String toString() {
620 return "flush_queue="
621 + flushQueue.size();
622 }
623
624 public String dumpQueue() {
625 StringBuilder queueList = new StringBuilder();
626 queueList.append("Flush Queue Queue dump:\n");
627 queueList.append(" Flush Queue:\n");
628 java.util.Iterator<FlushQueueEntry> it = flushQueue.iterator();
629
630 while(it.hasNext()){
631 queueList.append(" "+it.next().toString());
632 queueList.append("\n");
633 }
634
635 return queueList.toString();
636 }
637
638
639
640
641
642 @Override
643 public void registerFlushRequestListener(final FlushRequestListener listener) {
644 this.flushRequestListeners.add(listener);
645 }
646
647
648
649
650
651
652 @Override
653 public boolean unregisterFlushRequestListener(final FlushRequestListener listener) {
654 return this.flushRequestListeners.remove(listener);
655 }
656
657
658
659
660
661 @Override
662 public void setGlobalMemstoreLimit(long globalMemStoreSize) {
663 this.globalMemStoreLimit = globalMemStoreSize;
664 this.globalMemStoreLimitLowMark =
665 (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
666 reclaimMemStoreMemory();
667 }
668
669 public long getMemoryLimit() {
670 return this.globalMemStoreLimit;
671 }
672
673 interface FlushQueueEntry extends Delayed {
674 }
675
676
677
678
679 static class WakeupFlushThread implements FlushQueueEntry {
680 @Override
681 public long getDelay(TimeUnit unit) {
682 return 0;
683 }
684
685 @Override
686 public int compareTo(Delayed o) {
687 return -1;
688 }
689
690 @Override
691 public boolean equals(Object obj) {
692 return (this == obj);
693 }
694 }
695
696
697
698
699
700
701
702
703
704 static class FlushRegionEntry implements FlushQueueEntry {
705 private final Region region;
706
707 private final long createTime;
708 private long whenToExpire;
709 private int requeueCount = 0;
710
711 private boolean forceFlushAllStores;
712
713 FlushRegionEntry(final Region r, boolean forceFlushAllStores) {
714 this.region = r;
715 this.createTime = EnvironmentEdgeManager.currentTime();
716 this.whenToExpire = this.createTime;
717 this.forceFlushAllStores = forceFlushAllStores;
718 }
719
720
721
722
723
724 public boolean isMaximumWait(final long maximumWait) {
725 return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
726 }
727
728
729
730
731
732 public int getRequeueCount() {
733 return this.requeueCount;
734 }
735
736
737
738
739 public boolean isForceFlushAllStores() {
740 return forceFlushAllStores;
741 }
742
743
744
745
746
747
748
749 public FlushRegionEntry requeue(final long when) {
750 this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
751 this.requeueCount++;
752 return this;
753 }
754
755 @Override
756 public long getDelay(TimeUnit unit) {
757 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
758 TimeUnit.MILLISECONDS);
759 }
760
761 @Override
762 public int compareTo(Delayed other) {
763
764 int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
765 other.getDelay(TimeUnit.MILLISECONDS)).intValue();
766 if (ret != 0) {
767 return ret;
768 }
769 FlushQueueEntry otherEntry = (FlushQueueEntry) other;
770 return hashCode() - otherEntry.hashCode();
771 }
772
773 @Override
774 public String toString() {
775 return "[flush region "+Bytes.toStringBinary(region.getRegionInfo().getRegionName())+"]";
776 }
777
778 @Override
779 public int hashCode() {
780 int hash = (int) getDelay(TimeUnit.MILLISECONDS);
781 return hash ^ region.hashCode();
782 }
783
784 @Override
785 public boolean equals(Object obj) {
786 if (this == obj) {
787 return true;
788 }
789 if (obj == null || getClass() != obj.getClass()) {
790 return false;
791 }
792 Delayed other = (Delayed) obj;
793 return compareTo(other) == 0;
794 }
795 }
796 }
797
/**
 * Reason attached to a flush notification sent to flush-request listeners.
 */
enum FlushType {
  /** A regular flush request, not driven by global memstore pressure. */
  NORMAL,
  /** An emergency flush under global memstore pressure, below the high-water mark. */
  ABOVE_LOWER_MARK,
  /** An emergency flush while the global memstore is at or above the high-water mark. */
  ABOVE_HIGHER_MARK
}