View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import java.lang.reflect.Method;
21  import java.util.HashMap;
22  import java.util.Map;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.conf.Configuration;
27  import org.apache.hadoop.hbase.HConstants;
28  import org.apache.hadoop.hbase.TableName;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.ipc.PriorityFunction;
31  import org.apache.hadoop.hbase.ipc.QosPriority;
32  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
33  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
34  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
35  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
36  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
37  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
38  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
39  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
40  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
41  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
42  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
43  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
44  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
45  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
46  import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
47  
48  import com.google.common.annotations.VisibleForTesting;
49  import com.google.protobuf.Message;
50  import com.google.protobuf.TextFormat;
51  
52  
53  /**
54   * Reads special method annotations and table names to figure a priority for use by QoS facility in
55   * ipc; e.g: rpcs to hbase:meta get priority.
56   */
57  // TODO: Remove.  This is doing way too much work just to figure a priority.  Do as Elliott
58  // suggests and just have the client specify a priority.
59  
60  //The logic for figuring out high priority RPCs is as follows:
61  //1. if the method is annotated with a QosPriority of QOS_HIGH,
62  //   that is honored
63  //2. parse out the protobuf message and see if the request is for meta
64  //   region, and if so, treat it as a high priority RPC
65  //Some optimizations for (2) are done here -
66  //Clients send the argument classname as part of making the RPC. The server
67  //decides whether to deserialize the proto argument message based on the
68  //pre-established set of argument classes (knownArgumentClasses below).
69  //This prevents the server from having to deserialize all proto argument
70  //messages prematurely.
71  //All the argument classes declare a 'getRegion' method that returns a
72  //RegionSpecifier object. Methods can be invoked on the returned object
73  //to figure out whether it is a meta region or not.
74  @InterfaceAudience.Private
75  class AnnotationReadingPriorityFunction implements PriorityFunction {
76    public static final Log LOG =
77      LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
78  
79    /** Used to control the scan delay, currently sqrt(numNextCall * weight) */
80    public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
81  
82    private final Map<String, Integer> annotatedQos;
83    //We need to mock the regionserver instance for some unit tests (set via
84    //setRegionServer method.
85    private RSRpcServices rpcServices;
86    @SuppressWarnings("unchecked")
87    private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
88        GetRegionInfoRequest.class,
89        GetStoreFileRequest.class,
90        CloseRegionRequest.class,
91        FlushRegionRequest.class,
92        SplitRegionRequest.class,
93        CompactRegionRequest.class,
94        GetRequest.class,
95        MutateRequest.class,
96        ScanRequest.class
97    };
98  
99    // Some caches for helping performance
100   private final Map<String, Class<? extends Message>> argumentToClassMap =
101     new HashMap<String, Class<? extends Message>>();
102   private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
103     new HashMap<String, Map<Class<? extends Message>, Method>>();
104 
105   private final float scanVirtualTimeWeight;
106 
107   /**
108    * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
109    * {@code rpcServices#getClass()}
110    *
111    * @param rpcServices
112    *          The RPC server implementation
113    */
114   AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
115     this(rpcServices, rpcServices.getClass());
116   }
117 
118   /**
119    * Constructs the priority function given the RPC server implementation and the annotations on the
120    * methods in the provided {@code clz}.
121    *
122    * @param rpcServices
123    *          The RPC server implementation
124    * @param clz
125    *          The concrete RPC server implementation's class
126    */
127   AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
128       Class<? extends RSRpcServices> clz) {
129     Map<String,Integer> qosMap = new HashMap<String,Integer>();
130     for (Method m : clz.getMethods()) {
131       QosPriority p = m.getAnnotation(QosPriority.class);
132       if (p != null) {
133         // Since we protobuf'd, and then subsequently, when we went with pb style, method names
134         // are capitalized.  This meant that this brittle compare of method names gotten by
135         // reflection no longer matched the method names coming in over pb.  TODO: Get rid of this
136         // check.  For now, workaround is to capitalize the names we got from reflection so they
137         // have chance of matching the pb ones.
138         String capitalizedMethodName = capitalize(m.getName());
139         qosMap.put(capitalizedMethodName, p.priority());
140       }
141     }
142     this.rpcServices = rpcServices;
143     this.annotatedQos = qosMap;
144     if (methodMap.get("getRegion") == null) {
145       methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
146       methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
147     }
148     for (Class<? extends Message> cls : knownArgumentClasses) {
149       argumentToClassMap.put(cls.getName(), cls);
150       try {
151         methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
152         methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
153       } catch (Exception e) {
154         throw new RuntimeException(e);
155       }
156     }
157 
158     Configuration conf = rpcServices.getConfiguration();
159     scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
160   }
161 
162   private String capitalize(final String s) {
163     StringBuilder strBuilder = new StringBuilder(s);
164     strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
165     return strBuilder.toString();
166   }
167 
168   /**
169    * Returns a 'priority' based on the request type.
170    *
171    * Currently the returned priority is used for queue selection.
172    * See the SimpleRpcScheduler as example. It maintains a queue per 'priory type'
173    * HIGH_QOS (meta requests), REPLICATION_QOS (replication requests),
174    * NORMAL_QOS (user requests).
175    */
176   @Override
177   public int getPriority(RequestHeader header, Message param) {
178     String methodName = header.getMethodName();
179     Integer priorityByAnnotation = annotatedQos.get(methodName);
180     if (priorityByAnnotation != null) {
181       return priorityByAnnotation;
182     }
183     if (param == null) {
184       return HConstants.NORMAL_QOS;
185     }
186 
187     // Trust the client-set priorities if set
188     if (header.hasPriority()) {
189       return header.getPriority();
190     }
191     String cls = param.getClass().getName();
192     Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
193     RegionSpecifier regionSpecifier = null;
194     //check whether the request has reference to meta region or now.
195     try {
196       // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
197       // hasRegion returns true.  Not all listed methods have region specifier each time.  For
198       // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
199       // send the region over every time.
200       Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
201       if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
202         Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
203         regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
204         Region region = rpcServices.getRegion(regionSpecifier);
205         if (region.getRegionInfo().isSystemTable()) {
206           if (LOG.isTraceEnabled()) {
207             LOG.trace("High priority because region=" +
208               region.getRegionInfo().getRegionNameAsString());
209           }
210           return HConstants.SYSTEMTABLE_QOS;
211         }
212       }
213     } catch (Exception ex) {
214       // Not good throwing an exception out of here, a runtime anyways.  Let the query go into the
215       // server and have it throw the exception if still an issue.  Just mark it normal priority.
216       if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
217       return HConstants.NORMAL_QOS;
218     }
219 
220     if (param instanceof ScanRequest) { // scanner methods...
221       ScanRequest request = (ScanRequest)param;
222       if (!request.hasScannerId()) {
223         return HConstants.NORMAL_QOS;
224       }
225       RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
226       if (scanner != null && scanner.getRegionInfo().isSystemTable()) {
227         if (LOG.isTraceEnabled()) {
228           // Scanner requests are small in size so TextFormat version should not overwhelm log.
229           LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
230         }
231         return HConstants.SYSTEMTABLE_QOS;
232       }
233     }
234 
235     // If meta is moving then all the rest of report the report state transitions will be
236     // blocked. We shouldn't be in the same queue.
237     if (param instanceof ReportRegionStateTransitionRequest) { // Regions are moving
238       ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param;
239       for (RegionStateTransition transition : tRequest.getTransitionList()) {
240         if (transition.getRegionInfoList() != null) {
241           for (HBaseProtos.RegionInfo info : transition.getRegionInfoList()) {
242             TableName tn = ProtobufUtil.toTableName(info.getTableName());
243             if (tn.isSystemTable()) {
244               return HConstants.SYSTEMTABLE_QOS;
245             }
246           }
247         }
248       }
249     }
250     return HConstants.NORMAL_QOS;
251   }
252 
253   /**
254    * Based on the request content, returns the deadline of the request.
255    *
256    * @param header
257    * @param param
258    * @return Deadline of this request. 0 now, otherwise msec of 'delay'
259    */
260   @Override
261   public long getDeadline(RequestHeader header, Message param) {
262     if (param instanceof ScanRequest) {
263       ScanRequest request = (ScanRequest)param;
264       if (!request.hasScannerId()) {
265         return 0;
266       }
267 
268       // get the 'virtual time' of the scanner, and applies sqrt() to get a
269       // nice curve for the delay. More a scanner is used the less priority it gets.
270       // The weight is used to have more control on the delay.
271       long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
272       return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
273     }
274     return 0;
275   }
276 
277   @VisibleForTesting
278   void setRegionServer(final HRegionServer hrs) {
279     this.rpcServices = hrs.getRSRpcServices();
280   }
281 }