View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.SynchronousQueue;
26  import java.util.concurrent.ThreadFactory;
27  import java.util.concurrent.ThreadPoolExecutor;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.hbase.HColumnDescriptor;
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.mob.MobUtils;
38  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
39  
40  /**
41   * The mob compaction thread used in {@link MasterRpcServices}
42   */
43  @InterfaceAudience.Private
44  public class MasterMobCompactionThread {
45    static final Log LOG = LogFactory.getLog(MasterMobCompactionThread.class);
46    private final HMaster master;
47    private final Configuration conf;
48    private final ExecutorService mobCompactorPool;
49    private final ExecutorService masterMobPool;
50  
51    public MasterMobCompactionThread(HMaster master) {
52      this.master = master;
53      this.conf = master.getConfiguration();
54      final String n = Thread.currentThread().getName();
55      // this pool is used to run the mob compaction
56      this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
57        new SynchronousQueue<Runnable>(), new ThreadFactory() {
58          @Override
59          public Thread newThread(Runnable r) {
60            Thread t = new Thread(r);
61            t.setName(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime());
62            return t;
63          }
64        });
65      ((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
66      // this pool is used in the mob compaction to compact the mob files by partitions
67      // in parallel
68      this.mobCompactorPool = MobUtils
69        .createMobCompactorThreadPool(master.getConfiguration());
70    }
71  
72    /**
73     * Requests mob compaction
74     * @param conf The Configuration
75     * @param fs The file system
76     * @param tableName The table the compact
77     * @param columns The column descriptors
78     * @param tableLockManager The tableLock manager
79     * @param allFiles Whether add all mob files into the compaction.
80     */
81    public void requestMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
82      List<HColumnDescriptor> columns, TableLockManager tableLockManager, boolean allFiles)
83      throws IOException {
84      master.reportMobCompactionStart(tableName);
85      try {
86        masterMobPool.execute(new CompactionRunner(fs, tableName, columns, tableLockManager,
87          allFiles, mobCompactorPool));
88      } catch (RejectedExecutionException e) {
89        // in case the request is rejected by the pool
90        try {
91          master.reportMobCompactionEnd(tableName);
92        } catch (IOException e1) {
93          LOG.error("Failed to mark end of mob compation", e1);
94        }
95        throw e;
96      }
97      if (LOG.isDebugEnabled()) {
98        LOG.debug("The mob compaction is requested for the columns " + columns
99          + " of the table " + tableName.getNameAsString());
100     }
101   }
102 
103   private class CompactionRunner implements Runnable {
104     private FileSystem fs;
105     private TableName tableName;
106     private List<HColumnDescriptor> hcds;
107     private TableLockManager tableLockManager;
108     private boolean allFiles;
109     private ExecutorService pool;
110 
111     public CompactionRunner(FileSystem fs, TableName tableName, List<HColumnDescriptor> hcds,
112       TableLockManager tableLockManager, boolean allFiles, ExecutorService pool) {
113       super();
114       this.fs = fs;
115       this.tableName = tableName;
116       this.hcds = hcds;
117       this.tableLockManager = tableLockManager;
118       this.allFiles = allFiles;
119       this.pool = pool;
120     }
121 
122     @Override
123     public void run() {
124       try {
125         for (HColumnDescriptor hcd : hcds) {
126           MobUtils.doMobCompaction(conf, fs, tableName, hcd, pool, tableLockManager,
127             allFiles);
128         }
129       } catch (IOException e) {
130         LOG.error("Failed to perform the mob compaction", e);
131       } finally {
132         try {
133           master.reportMobCompactionEnd(tableName);
134         } catch (IOException e) {
135           LOG.error("Failed to mark end of mob compation", e);
136         }
137       }
138     }
139   }
140 
141   /**
142    * Only interrupt once it's done with a run through the work loop.
143    */
144   private void interruptIfNecessary() {
145     mobCompactorPool.shutdown();
146     masterMobPool.shutdown();
147   }
148 
149   /**
150    * Wait for all the threads finish.
151    */
152   private void join() {
153     waitFor(mobCompactorPool, "Mob Compaction Thread");
154     waitFor(masterMobPool, "Region Server Mob Compaction Thread");
155   }
156 
157   /**
158    * Closes the MasterMobCompactionThread.
159    */
160   public void close() {
161     interruptIfNecessary();
162     join();
163   }
164 
165   /**
166    * Wait for thread finish.
167    * @param t the thread to wait
168    * @param name the thread name.
169    */
170   private void waitFor(ExecutorService t, String name) {
171     boolean done = false;
172     while (!done) {
173       try {
174         done = t.awaitTermination(60, TimeUnit.SECONDS);
175         LOG.info("Waiting for " + name + " to finish...");
176         if (!done) {
177           t.shutdownNow();
178         }
179       } catch (InterruptedException ie) {
180         LOG.warn("Interrupted waiting for " + name + " to finish...");
181       }
182     }
183   }
184 }