1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.procedure.flush;
19
20 import java.util.List;
21 import java.util.concurrent.Callable;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.errorhandling.ForeignException;
27 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
28 import org.apache.hadoop.hbase.procedure.ProcedureMember;
29 import org.apache.hadoop.hbase.procedure.Subprocedure;
30 import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool;
31 import org.apache.hadoop.hbase.regionserver.Region;
32
33
34
35
36
37
38 @InterfaceAudience.Private
39 public class FlushTableSubprocedure extends Subprocedure {
40 private static final Log LOG = LogFactory.getLog(FlushTableSubprocedure.class);
41
42 private final String table;
43 private final List<Region> regions;
44 private final FlushTableSubprocedurePool taskManager;
45
46 public FlushTableSubprocedure(ProcedureMember member,
47 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
48 List<Region> regions, String table,
49 FlushTableSubprocedurePool taskManager) {
50 super(member, table, errorListener, wakeFrequency, timeout);
51 this.table = table;
52 this.regions = regions;
53 this.taskManager = taskManager;
54 }
55
56 private static class RegionFlushTask implements Callable<Void> {
57 Region region;
58 RegionFlushTask(Region region) {
59 this.region = region;
60 }
61
62 @Override
63 public Void call() throws Exception {
64 LOG.debug("Starting region operation on " + region);
65 region.startRegionOperation();
66 try {
67 LOG.debug("Flush region " + region.toString() + " started...");
68 region.flush(true);
69
70 } finally {
71 LOG.debug("Closing region operation on " + region);
72 region.closeRegionOperation();
73 }
74 return null;
75 }
76 }
77
78 private void flushRegions() throws ForeignException {
79 if (regions.isEmpty()) {
80
81 return;
82 }
83
84 monitor.rethrowException();
85
86
87 if (taskManager.hasTasks()) {
88 throw new IllegalStateException("Attempting to flush "
89 + table + " but we currently have outstanding tasks");
90 }
91
92
93 for (Region region : regions) {
94
95 taskManager.submitTask(new RegionFlushTask(region));
96 monitor.rethrowException();
97 }
98
99
100 LOG.debug("Flush region tasks submitted for " + regions.size() + " regions");
101 try {
102 taskManager.waitForOutstandingTasks();
103 } catch (InterruptedException e) {
104 throw new ForeignException(getMemberName(), e);
105 }
106 }
107
108
109
110
111 @Override
112 public void acquireBarrier() throws ForeignException {
113 flushRegions();
114 }
115
116 @Override
117 public byte[] insideBarrier() throws ForeignException {
118
119 return new byte[0];
120 }
121
122
123
124
125 @Override
126 public void cleanup(Exception e) {
127 LOG.info("Aborting all flush region subprocedure task threads for '"
128 + table + "' due to error", e);
129 try {
130 taskManager.cancelTasks();
131 } catch (InterruptedException e1) {
132 Thread.currentThread().interrupt();
133 }
134 }
135
136 public void releaseBarrier() {
137
138 }
139
140 }