1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.SynchronousQueue;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.hbase.HColumnDescriptor;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.mob.MobUtils;
38 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39
40
41
42
43 @InterfaceAudience.Private
44 public class MasterMobCompactionThread {
45 static final Log LOG = LogFactory.getLog(MasterMobCompactionThread.class);
46 private final HMaster master;
47 private final Configuration conf;
48 private final ExecutorService mobCompactorPool;
49 private final ExecutorService masterMobPool;
50
51 public MasterMobCompactionThread(HMaster master) {
52 this.master = master;
53 this.conf = master.getConfiguration();
54 final String n = Thread.currentThread().getName();
55
56 this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
57 new SynchronousQueue<Runnable>(), new ThreadFactory() {
58 @Override
59 public Thread newThread(Runnable r) {
60 Thread t = new Thread(r);
61 t.setName(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime());
62 return t;
63 }
64 });
65 ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
66
67
68 this.mobCompactorPool = MobUtils
69 .createMobCompactorThreadPool(master.getConfiguration());
70 }
71
72
73
74
75
76
77
78
79
80
81 public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
82 List<HColumnDescriptor> columns, TableLockManager tableLockManager, boolean allFiles)
83 throws IOException {
84 master.reportMobCompactionStart(tableName);
85 try {
86 masterMobPool.execute(new CompactionRunner(fs, tableName, columns, tableLockManager,
87 allFiles, mobCompactorPool));
88 } catch (RejectedExecutionException e) {
89
90 try {
91 master.reportMobCompactionEnd(tableName);
92 } catch (IOException e1) {
93 LOG.error("Failed to mark end of mob compation", e1);
94 }
95 throw e;
96 }
97 if (LOG.isDebugEnabled()) {
98 LOG.debug("The mob compaction is requested for the columns " + columns
99 + " of the table " + tableName.getNameAsString());
100 }
101 }
102
103 private class CompactionRunner implements Runnable {
104 private FileSystem fs;
105 private TableName tableName;
106 private List<HColumnDescriptor> hcds;
107 private TableLockManager tableLockManager;
108 private boolean allFiles;
109 private ExecutorService pool;
110
111 public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds,
112 TableLockManager tableLockManager, boolean allFiles, ExecutorService pool) {
113 super();
114 this.fs = fs;
115 this.tableName = tableName;
116 this.hcds = hcds;
117 this.tableLockManager = tableLockManager;
118 this.allFiles = allFiles;
119 this.pool = pool;
120 }
121
122 @Override
123 public void run() {
124 try {
125 for (HColumnDescriptor hcd : hcds) {
126 MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
127 allFiles);
128 }
129 } catch (IOException e) {
130 LOG.error("Failed to perform the mob compaction", e);
131 } finally {
132 try {
133 master.reportMobCompactionEnd(tableName);
134 } catch (IOException e) {
135 LOG.error("Failed to mark end of mob compation", e);
136 }
137 }
138 }
139 }
140
141
142
143
144 private void interruptIfNecessary() {
145 mobCompactorPool.shutdown();
146 masterMobPool.shutdown();
147 }
148
149
150
151
152 private void join() {
153 waitFor(mobCompactorPool, "Mob Compaction Thread");
154 waitFor(masterMobPool, "Region Server Mob Compaction Thread");
155 }
156
157
158
159
160 public void close() {
161 interruptIfNecessary();
162 join();
163 }
164
165
166
167
168
169
170 private void waitFor(ExecutorService t, String name) {
171 boolean done = false;
172 while (!done) {
173 try {
174 done = t.awaitTermination(60, TimeUnit.SECONDS);
175 LOG.info("Waiting for " + name + " to finish...");
176 if (!done) {
177 t.shutdownNow();
178 }
179 } catch (InterruptedException ie) {
180 LOG.warn("Interrupted waiting for " + name + " to finish...");
181 }
182 }
183 }
184 }