1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.lang.reflect.Method;
21 import java.util.HashMap;
22 import java.util.Map;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.ipc.PriorityFunction;
31 import org.apache.hadoop.hbase.ipc.QosPriority;
32 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
33 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
34 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
35 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
36 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
37 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
38 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
39 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
40 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
41 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
42 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
43 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
44 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
45 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
46 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
47
48 import com.google.common.annotations.VisibleForTesting;
49 import com.google.protobuf.Message;
50 import com.google.protobuf.TextFormat;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 @InterfaceAudience.Private
75 class AnnotationReadingPriorityFunction implements PriorityFunction {
76 public static final Log LOG =
77 LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
78
79
80 public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
81
82 private final Map<String, Integer> annotatedQos;
83
84
85 private RSRpcServices rpcServices;
86 @SuppressWarnings("unchecked")
87 private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
88 GetRegionInfoRequest.class,
89 GetStoreFileRequest.class,
90 CloseRegionRequest.class,
91 FlushRegionRequest.class,
92 SplitRegionRequest.class,
93 CompactRegionRequest.class,
94 GetRequest.class,
95 MutateRequest.class,
96 ScanRequest.class
97 };
98
99
100 private final Map<String, Class<? extends Message>> argumentToClassMap =
101 new HashMap<String, Class<? extends Message>>();
102 private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
103 new HashMap<String, Map<Class<? extends Message>, Method>>();
104
105 private final float scanVirtualTimeWeight;
106
107
108
109
110
111
112
113
114 AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
115 this(rpcServices, rpcServices.getClass());
116 }
117
118
119
120
121
122
123
124
125
126
127 AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
128 Class<? extends RSRpcServices> clz) {
129 Map<String,Integer> qosMap = new HashMap<String,Integer>();
130 for (Method m : clz.getMethods()) {
131 QosPriority p = m.getAnnotation(QosPriority.class);
132 if (p != null) {
133
134
135
136
137
138 String capitalizedMethodName = capitalize(m.getName());
139 qosMap.put(capitalizedMethodName, p.priority());
140 }
141 }
142 this.rpcServices = rpcServices;
143 this.annotatedQos = qosMap;
144 if (methodMap.get("getRegion") == null) {
145 methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
146 methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
147 }
148 for (Class<? extends Message> cls : knownArgumentClasses) {
149 argumentToClassMap.put(cls.getName(), cls);
150 try {
151 methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
152 methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
153 } catch (Exception e) {
154 throw new RuntimeException(e);
155 }
156 }
157
158 Configuration conf = rpcServices.getConfiguration();
159 scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
160 }
161
162 private String capitalize(final String s) {
163 StringBuilder strBuilder = new StringBuilder(s);
164 strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
165 return strBuilder.toString();
166 }
167
168
169
170
171
172
173
174
175
176 @Override
177 public int getPriority(RequestHeader header, Message param) {
178 String methodName = header.getMethodName();
179 Integer priorityByAnnotation = annotatedQos.get(methodName);
180 if (priorityByAnnotation != null) {
181 return priorityByAnnotation;
182 }
183 if (param == null) {
184 return HConstants.NORMAL_QOS;
185 }
186
187
188 if (header.hasPriority()) {
189 return header.getPriority();
190 }
191 String cls = param.getClass().getName();
192 Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
193 RegionSpecifier regionSpecifier = null;
194
195 try {
196
197
198
199
200 Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
201 if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
202 Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
203 regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
204 Region region = rpcServices.getRegion(regionSpecifier);
205 if (region.getRegionInfo().isSystemTable()) {
206 if (LOG.isTraceEnabled()) {
207 LOG.trace("High priority because region=" +
208 region.getRegionInfo().getRegionNameAsString());
209 }
210 return HConstants.SYSTEMTABLE_QOS;
211 }
212 }
213 } catch (Exception ex) {
214
215
216 if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
217 return HConstants.NORMAL_QOS;
218 }
219
220 if (param instanceof ScanRequest) {
221 ScanRequest request = (ScanRequest)param;
222 if (!request.hasScannerId()) {
223 return HConstants.NORMAL_QOS;
224 }
225 RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
226 if (scanner != null && scanner.getRegionInfo().isSystemTable()) {
227 if (LOG.isTraceEnabled()) {
228
229 LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
230 }
231 return HConstants.SYSTEMTABLE_QOS;
232 }
233 }
234
235
236
237 if (param instanceof ReportRegionStateTransitionRequest) {
238 ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param;
239 for (RegionStateTransition transition : tRequest.getTransitionList()) {
240 if (transition.getRegionInfoList() != null) {
241 for (HBaseProtos.RegionInfo info : transition.getRegionInfoList()) {
242 TableName tn = ProtobufUtil.toTableName(info.getTableName());
243 if (tn.isSystemTable()) {
244 return HConstants.SYSTEMTABLE_QOS;
245 }
246 }
247 }
248 }
249 }
250 return HConstants.NORMAL_QOS;
251 }
252
253
254
255
256
257
258
259
260 @Override
261 public long getDeadline(RequestHeader header, Message param) {
262 if (param instanceof ScanRequest) {
263 ScanRequest request = (ScanRequest)param;
264 if (!request.hasScannerId()) {
265 return 0;
266 }
267
268
269
270
271 long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
272 return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
273 }
274 return 0;
275 }
276
277 @VisibleForTesting
278 void setRegionServer(final HRegionServer hrs) {
279 this.rpcServices = hrs.getRSRpcServices();
280 }
281 }