View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.FileNotFoundException;
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.net.InetSocketAddress;
25  import java.net.UnknownHostException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Set;
34  import java.util.TreeSet;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.atomic.AtomicLong;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.CellScannable;
44  import org.apache.hadoop.hbase.CellScanner;
45  import org.apache.hadoop.hbase.CellUtil;
46  import org.apache.hadoop.hbase.DoNotRetryIOException;
47  import org.apache.hadoop.hbase.DroppedSnapshotException;
48  import org.apache.hadoop.hbase.HBaseIOException;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HTableDescriptor;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.NotServingRegionException;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.TableName;
56  import org.apache.hadoop.hbase.UnknownScannerException;
57  import org.apache.hadoop.hbase.classification.InterfaceAudience;
58  import org.apache.hadoop.hbase.client.Append;
59  import org.apache.hadoop.hbase.client.ConnectionUtils;
60  import org.apache.hadoop.hbase.client.Delete;
61  import org.apache.hadoop.hbase.client.Durability;
62  import org.apache.hadoop.hbase.client.Get;
63  import org.apache.hadoop.hbase.client.Increment;
64  import org.apache.hadoop.hbase.client.Mutation;
65  import org.apache.hadoop.hbase.client.Put;
66  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
67  import org.apache.hadoop.hbase.client.Result;
68  import org.apache.hadoop.hbase.client.RowMutations;
69  import org.apache.hadoop.hbase.client.Scan;
70  import org.apache.hadoop.hbase.client.VersionInfoUtil;
71  import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
72  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
73  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
74  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
75  import org.apache.hadoop.hbase.exceptions.OperationConflictException;
76  import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
77  import org.apache.hadoop.hbase.exceptions.ScannerResetException;
78  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
80  import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
81  import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
82  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
83  import org.apache.hadoop.hbase.ipc.PriorityFunction;
84  import org.apache.hadoop.hbase.ipc.QosPriority;
85  import org.apache.hadoop.hbase.ipc.RpcCallContext;
86  import org.apache.hadoop.hbase.ipc.RpcServer;
87  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
88  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
89  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
90  import org.apache.hadoop.hbase.ipc.ServerRpcController;
91  import org.apache.hadoop.hbase.master.MasterRpcServices;
92  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93  import org.apache.hadoop.hbase.protobuf.RequestConverter;
94  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
95  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
96  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
97  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
98  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
99  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
154 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
156 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
159 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
160 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
162 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
163 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
164 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
165 import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
166 import org.apache.hadoop.hbase.quotas.OperationQuota;
167 import org.apache.hadoop.hbase.quotas.QuotaUtil;
168 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
169 import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
170 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
171 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
172 import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
173 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
174 import org.apache.hadoop.hbase.regionserver.Leases.Lease;
175 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
176 import org.apache.hadoop.hbase.regionserver.Region.Operation;
177 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
178 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
179 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
180 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
181 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
182 import org.apache.hadoop.hbase.security.Superusers;
183 import org.apache.hadoop.hbase.security.User;
184 import org.apache.hadoop.hbase.util.Bytes;
185 import org.apache.hadoop.hbase.util.Counter;
186 import org.apache.hadoop.hbase.util.DNS;
187 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
188 import org.apache.hadoop.hbase.util.Pair;
189 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
190 import org.apache.hadoop.hbase.util.Strings;
191 import org.apache.hadoop.hbase.wal.WAL;
192 import org.apache.hadoop.hbase.wal.WALKey;
193 import org.apache.hadoop.hbase.wal.WALSplitter;
194 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
195 import org.apache.zookeeper.KeeperException;
196 
197 import com.google.common.annotations.VisibleForTesting;
198 import com.google.protobuf.ByteString;
199 import com.google.protobuf.Message;
200 import com.google.protobuf.RpcController;
201 import com.google.protobuf.ServiceException;
202 import com.google.protobuf.TextFormat;
203 
204 /**
205  * Implements the regionserver RPC services.
206  */
207 @InterfaceAudience.Private
208 @SuppressWarnings("deprecation")
209 public class RSRpcServices implements HBaseRPCErrorHandler,
210     AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction {
211   protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
212 
213   /** RPC scheduler to use for the region server. */
214   public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
215     "hbase.region.server.rpc.scheduler.factory.class";
216 
217   /**
218    * Minimum allowable time limit delta (in milliseconds) that can be enforced during scans. This
219    * configuration exists to prevent the scenario where a time limit is specified to be so
220    * restrictive that the time limit is reached immediately (before any cells are scanned).
221    */
222   private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
223       "hbase.region.server.rpc.minimum.scan.time.limit.delta";
224   /**
225    * Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
226    */
227   private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
228 
229   // Request counter. (Includes requests that are not serviced by regions.)
230   final Counter requestCount = new Counter();
231 
232   // Request counter for rpc get
233   final Counter rpcGetRequestCount = new Counter();
234 
235   // Request counter for rpc scan
236   final Counter rpcScanRequestCount = new Counter();
237 
238   // Request counter for rpc multi
239   final Counter rpcMultiRequestCount = new Counter();
240 
241   // Request counter for rpc mutate
242   final Counter rpcMutateRequestCount = new Counter();
243 
244   // Server to handle client requests.
245   final RpcServerInterface rpcServer;
246   final InetSocketAddress isa;
247 
248   private final HRegionServer regionServer;
249   private final long maxScannerResultSize;
250 
251   // The reference to the priority extraction function
252   private final PriorityFunction priority;
253 
254   private final AtomicLong scannerIdGen = new AtomicLong(0L);
255   private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
256     new ConcurrentHashMap<String, RegionScannerHolder>();
257 
258   /**
259    * The lease timeout period for client scanners (milliseconds).
260    */
261   private final int scannerLeaseTimeoutPeriod;
262 
263   /**
264    * The RPC timeout period (milliseconds)
265    */
266   private final int rpcTimeout;
267 
268   /**
269    * The minimum allowable delta to use for the scan limit
270    */
271   private final long minimumScanTimeLimitDelta;
272 
273   /**
274    * Holder class which holds the RegionScanner and nextCallSeq together.
275    */
276   private static class RegionScannerHolder {
277     private AtomicLong nextCallSeq = new AtomicLong(0);
278     private RegionScanner s;
279     private Region r;
280 
281     public RegionScannerHolder(RegionScanner s, Region r) {
282       this.s = s;
283       this.r = r;
284     }
285 
286     private long getNextCallSeq() {
287       return nextCallSeq.get();
288     }
289 
290     private void incNextCallSeq() {
291       nextCallSeq.incrementAndGet();
292     }
293 
294     private void rollbackNextCallSeq() {
295       nextCallSeq.decrementAndGet();
296     }
297   }
298 
299   /**
300    * Instantiated as a scanner lease. If the lease times out, the scanner is
301    * closed
302    */
303   private class ScannerListener implements LeaseListener {
304     private final String scannerName;
305 
306     ScannerListener(final String n) {
307       this.scannerName = n;
308     }
309 
310     @Override
311     public void leaseExpired() {
312       RegionScannerHolder rsh = scanners.remove(this.scannerName);
313       if (rsh != null) {
314         RegionScanner s = rsh.s;
315         LOG.info("Scanner " + this.scannerName + " lease expired on region "
316           + s.getRegionInfo().getRegionNameAsString());
317         Region region = null;
318         try {
319           region = regionServer.getRegion(s.getRegionInfo().getRegionName());
320           if (region != null && region.getCoprocessorHost() != null) {
321             region.getCoprocessorHost().preScannerClose(s);
322           }
323 
324           s.close();
325           if (region != null && region.getCoprocessorHost() != null) {
326             region.getCoprocessorHost().postScannerClose(s);
327           }
328         } catch (IOException e) {
329           LOG.error("Closing scanner for "
330               + s.getRegionInfo().getRegionNameAsString(), e);
331         } finally {
332           try {
333             s.close();
334             if (region != null && region.getCoprocessorHost() != null) {
335               region.getCoprocessorHost().postScannerClose(s);
336             }
337           } catch (IOException e) {
338             LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
339           }
340         }
341       } else {
342         LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
343           " scanner found, hence no chance to close that related scanner!");
344       }
345     }
346   }
347 
348   private static ResultOrException getResultOrException(
349       final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
350     return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
351   }
352 
353   private static ResultOrException getResultOrException(final Exception e, final int index) {
354     return getResultOrException(ResponseConverter.buildActionResult(e), index);
355   }
356 
357   private static ResultOrException getResultOrException(
358       final ResultOrException.Builder builder, final int index) {
359     return builder.setIndex(index).build();
360   }
361 
362   /**
363    * Starts the nonce operation for a mutation, if needed.
364    * @param mutation Mutation.
365    * @param nonceGroup Nonce group from the request.
366    * @returns Nonce used (can be NO_NONCE).
367    */
368   private long startNonceOperation(final MutationProto mutation, long nonceGroup)
369       throws IOException, OperationConflictException {
370     if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
371     boolean canProceed = false;
372     try {
373       canProceed = regionServer.nonceManager.startOperation(
374         nonceGroup, mutation.getNonce(), regionServer);
375     } catch (InterruptedException ex) {
376       throw new InterruptedIOException("Nonce start operation interrupted");
377     }
378     if (!canProceed) {
379       // TODO: instead, we could convert append/increment to get w/mvcc
380       String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
381         + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
382         + "] may have already completed";
383       throw new OperationConflictException(message);
384     }
385     return mutation.getNonce();
386   }
387 
388   /**
389    * Ends nonce operation for a mutation, if needed.
390    * @param mutation Mutation.
391    * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
392    * @param success Whether the operation for this nonce has succeeded.
393    */
394   private void endNonceOperation(final MutationProto mutation,
395       long nonceGroup, boolean success) {
396     if (regionServer.nonceManager != null && mutation.hasNonce()) {
397       regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
398     }
399   }
400 
401   /**
402    * @return True if current call supports cellblocks
403    */
404   private boolean isClientCellBlockSupport() {
405     RpcCallContext context = RpcServer.getCurrentCall();
406     return context != null && context.isClientCellBlockSupport();
407   }
408 
409   private void addResult(final MutateResponse.Builder builder,
410       final Result result, final PayloadCarryingRpcController rpcc) {
411     if (result == null) return;
412     if (isClientCellBlockSupport()) {
413       builder.setResult(ProtobufUtil.toResultNoData(result));
414       rpcc.setCellScanner(result.cellScanner());
415     } else {
416       ClientProtos.Result pbr = ProtobufUtil.toResult(result);
417       builder.setResult(pbr);
418     }
419   }
420 
421   private void addResults(final ScanResponse.Builder builder, final List<Result> results,
422       final RpcController controller, boolean isDefaultRegion) {
423     builder.setStale(!isDefaultRegion);
424     if (results == null || results.isEmpty()) return;
425     if (isClientCellBlockSupport()) {
426       for (Result res : results) {
427         builder.addCellsPerResult(res.size());
428         builder.addPartialFlagPerResult(res.isPartial());
429       }
430       ((PayloadCarryingRpcController)controller).
431         setCellScanner(CellUtil.createCellScanner(results));
432     } else {
433       for (Result res: results) {
434         ClientProtos.Result pbr = ProtobufUtil.toResult(res);
435         builder.addResults(pbr);
436       }
437     }
438   }
439 
440   /**
441    * Mutate a list of rows atomically.
442    *
443    * @param region
444    * @param actions
445    * @param cellScanner if non-null, the mutation data -- the Cell content.
446    * @throws IOException
447    */
448   private ClientProtos.RegionLoadStats mutateRows(final Region region,
449       final List<ClientProtos.Action> actions,
450       final CellScanner cellScanner) throws IOException {
451     if (!region.getRegionInfo().isMetaTable()) {
452       regionServer.cacheFlusher.reclaimMemStoreMemory();
453     }
454     RowMutations rm = null;
455     for (ClientProtos.Action action: actions) {
456       if (action.hasGet()) {
457         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
458           action.getGet());
459       }
460       MutationType type = action.getMutation().getMutateType();
461       if (rm == null) {
462         rm = new RowMutations(action.getMutation().getRow().toByteArray());
463       }
464       switch (type) {
465       case PUT:
466         rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
467         break;
468       case DELETE:
469         rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
470         break;
471       default:
472           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
473       }
474     }
475     region.mutateRow(rm);
476     return ((HRegion)region).getRegionStats();
477   }
478 
479   /**
480    * Mutate a list of rows atomically.
481    *
482    * @param region
483    * @param actions
484    * @param cellScanner if non-null, the mutation data -- the Cell content.
485    * @param row
486    * @param family
487    * @param qualifier
488    * @param compareOp
489    * @param comparator @throws IOException
490    */
491   private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
492       final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
493       CompareOp compareOp, ByteArrayComparable comparator,
494       ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
495     if (!region.getRegionInfo().isMetaTable()) {
496       regionServer.cacheFlusher.reclaimMemStoreMemory();
497     }
498     RowMutations rm = null;
499     for (ClientProtos.Action action: actions) {
500       if (action.hasGet()) {
501         throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
502             action.getGet());
503       }
504       MutationType type = action.getMutation().getMutateType();
505       if (rm == null) {
506         rm = new RowMutations(action.getMutation().getRow().toByteArray());
507       }
508       switch (type) {
509         case PUT:
510           Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
511           spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
512           rm.add(put);
513           break;
514         case DELETE:
515           Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
516           spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
517           rm.add(del);
518           break;
519         default:
520           throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
521       }
522     }
523     return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
524   }
525 
526   /**
527    * Execute an append mutation.
528    *
529    * @param region
530    * @param m
531    * @param cellScanner
532    * @return result to return to client if default operation should be
533    * bypassed as indicated by RegionObserver, null otherwise
534    * @throws IOException
535    */
536   private Result append(final Region region, final OperationQuota quota,
537       final MutationProto m, final CellScanner cellScanner, long nonceGroup,
538       ActivePolicyEnforcement spaceQuota)
539       throws IOException {
540     long before = EnvironmentEdgeManager.currentTime();
541     Append append = ProtobufUtil.toAppend(m, cellScanner);
542     spaceQuota.getPolicyEnforcement(region).check(append);
543     quota.addMutation(append);
544     Result r = null;
545     if (region.getCoprocessorHost() != null) {
546       r = region.getCoprocessorHost().preAppend(append);
547     }
548     if (r == null) {
549       long nonce = startNonceOperation(m, nonceGroup);
550       boolean success = false;
551       try {
552         r = region.append(append, nonceGroup, nonce);
553         success = true;
554       } finally {
555         endNonceOperation(m, nonceGroup, success);
556       }
557       if (region.getCoprocessorHost() != null) {
558         region.getCoprocessorHost().postAppend(append, r);
559       }
560     }
561     if (regionServer.metricsRegionServer != null) {
562       regionServer.metricsRegionServer.updateAppend(
563           region.getTableDesc().getTableName(),
564         EnvironmentEdgeManager.currentTime() - before);
565     }
566     return r;
567   }
568 
569   /**
570    * Execute an increment mutation.
571    *
572    * @param region
573    * @param mutation
574    * @return the Result
575    * @throws IOException
576    */
577   private Result increment(final Region region, final OperationQuota quota,
578       final MutationProto mutation, final CellScanner cells, long nonceGroup,
579       ActivePolicyEnforcement spaceQuota) throws IOException {
580     long before = EnvironmentEdgeManager.currentTime();
581     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
582     spaceQuota.getPolicyEnforcement(region).check(increment);
583     quota.addMutation(increment);
584     Result r = null;
585     if (region.getCoprocessorHost() != null) {
586       r = region.getCoprocessorHost().preIncrement(increment);
587     }
588     if (r == null) {
589       long nonce = startNonceOperation(mutation, nonceGroup);
590       boolean success = false;
591       try {
592         r = region.increment(increment, nonceGroup, nonce);
593         success = true;
594       } finally {
595         endNonceOperation(mutation, nonceGroup, success);
596       }
597       if (region.getCoprocessorHost() != null) {
598         r = region.getCoprocessorHost().postIncrement(increment, r);
599       }
600     }
601     if (regionServer.metricsRegionServer != null) {
602       regionServer.metricsRegionServer.updateIncrement(
603           region.getTableDesc().getTableName(),
604           EnvironmentEdgeManager.currentTime() - before);
605     }
606     return r;
607   }
608 
609   /**
610    * Run through the regionMutation <code>rm</code> and per Mutation, do the work, and then when
611    * done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
612    * @param region
613    * @param actions
614    * @param cellScanner
615    * @param builder
616    * @param cellsToReturn  Could be null. May be allocated in this method.  This is what this
617    * method returns as a 'result'.
618    * @return Return the <code>cellScanner</code> passed
619    */
620   private List<CellScannable> doNonAtomicRegionMutation(final Region region,
621       final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
622       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
623       ActivePolicyEnforcement spaceQuotaEnforcement) {
624     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
625     // one at a time, we instead pass them in batch.  Be aware that the corresponding
626     // ResultOrException instance that matches each Put or Delete is then added down in the
627     // doBatchOp call.  We should be staying aligned though the Put and Delete are deferred/batched
628     List<ClientProtos.Action> mutations = null;
629     ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder();
630     boolean hasResultOrException = false;
631 
632     for (ClientProtos.Action action: actions.getActionList()) {
633       hasResultOrException = false;
634       resultOrExceptionBuilder.clear();
635       try {
636         Result r = null;
637         if (action.hasGet()) {
638           long before = EnvironmentEdgeManager.currentTime();
639           try {
640             Get get = ProtobufUtil.toGet(action.getGet());
641             r = region.get(get);
642           } finally {
643             if (regionServer.metricsRegionServer != null) {
644               regionServer.metricsRegionServer.updateGet(
645                   region.getTableDesc().getTableName(),
646                   EnvironmentEdgeManager.currentTime() - before);
647             }
648           }
649         } else if (action.hasServiceCall()) {
650           hasResultOrException = true;
651           try {
652             Message result = execServiceOnRegion(region, action.getServiceCall());
653             ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
654                 ClientProtos.CoprocessorServiceResult.newBuilder();
655             resultOrExceptionBuilder.setServiceResult(
656                 serviceResultBuilder.setValue(
657                   serviceResultBuilder.getValueBuilder()
658                     .setName(result.getClass().getName())
659                     .setValue(result.toByteString())));
660           } catch (IOException ioe) {
661             rpcServer.getMetrics().exception(ioe);
662             resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
663           }
664         } else if (action.hasMutation()) {
665           MutationType type = action.getMutation().getMutateType();
666           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
667               !mutations.isEmpty()) {
668             // Flush out any Puts or Deletes already collected.
669             doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
670             mutations.clear();
671           }
672           switch (type) {
673             case APPEND:
674               r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
675                   spaceQuotaEnforcement);
676               break;
677             case INCREMENT:
678               r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
679                   spaceQuotaEnforcement);
680               break;
681             case PUT:
682             case DELETE:
683               // Collect the individual mutations and apply in a batch
684               if (mutations == null) {
685                 mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
686               }
687               mutations.add(action);
688               break;
689             default:
690               throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
691           }
692         } else {
693           throw new HBaseIOException("Unexpected Action type");
694         }
695         if (r != null) {
696           ClientProtos.Result pbResult = null;
697           if (isClientCellBlockSupport()) {
698             pbResult = ProtobufUtil.toResultNoData(r);
699             //  Hard to guess the size here.  Just make a rough guess.
700             if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
701             cellsToReturn.add(r);
702           } else {
703             pbResult = ProtobufUtil.toResult(r);
704           }
705           hasResultOrException = true;
706           resultOrExceptionBuilder.setResult(pbResult);
707         }
708         // Could get to here and there was no result and no exception.  Presumes we added
709         // a Put or Delete to the collecting Mutations List for adding later.  In this
710         // case the corresponding ResultOrException instance for the Put or Delete will be added
711         // down in the doBatchOp method call rather than up here.
712       } catch (IOException ie) {
713         rpcServer.getMetrics().exception(ie);
714         hasResultOrException = true;
715         resultOrExceptionBuilder.setException(ResponseConverter.buildException(ie));
716       }
717       if (hasResultOrException) {
718         // Propagate index.
719         resultOrExceptionBuilder.setIndex(action.getIndex());
720         builder.addResultOrException(resultOrExceptionBuilder.build());
721       }
722     }
723     // Finish up any outstanding mutations
724     if (mutations != null && !mutations.isEmpty()) {
725       doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
726     }
727     return cellsToReturn;
728   }
729 
730   /**
731    * Execute a list of Put/Delete mutations.
732    *
733    * @param builder
734    * @param region
735    * @param mutations
736    */
737   private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
738       final OperationQuota quota, final List<ClientProtos.Action> mutations,
739       final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
740     Mutation[] mArray = new Mutation[mutations.size()];
741     long before = EnvironmentEdgeManager.currentTime();
742     boolean batchContainsPuts = false, batchContainsDelete = false;
743     try {
744       int i = 0;
745       for (ClientProtos.Action action: mutations) {
746         MutationProto m = action.getMutation();
747         Mutation mutation;
748         if (m.getMutateType() == MutationType.PUT) {
749           mutation = ProtobufUtil.toPut(m, cells);
750           batchContainsPuts = true;
751         } else {
752           mutation = ProtobufUtil.toDelete(m, cells);
753           batchContainsDelete = true;
754         }
755         mArray[i++] = mutation;
756         // Check if a space quota disallows this mutation
757         spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
758         quota.addMutation(mutation);
759       }
760 
761       if (!region.getRegionInfo().isMetaTable()) {
762         regionServer.cacheFlusher.reclaimMemStoreMemory();
763       }
764 
765       OperationStatus codes[] = region.batchMutate(mArray, HConstants.NO_NONCE,
766         HConstants.NO_NONCE);
767       for (i = 0; i < codes.length; i++) {
768         int index = mutations.get(i).getIndex();
769         Exception e = null;
770         switch (codes[i].getOperationStatusCode()) {
771           case BAD_FAMILY:
772             e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
773             builder.addResultOrException(getResultOrException(e, index));
774             break;
775 
776           case SANITY_CHECK_FAILURE:
777             e = new FailedSanityCheckException(codes[i].getExceptionMsg());
778             builder.addResultOrException(getResultOrException(e, index));
779             break;
780 
781           default:
782             e = new DoNotRetryIOException(codes[i].getExceptionMsg());
783             builder.addResultOrException(getResultOrException(e, index));
784             break;
785 
786           case SUCCESS:
787             builder.addResultOrException(getResultOrException(
788               ClientProtos.Result.getDefaultInstance(), index,
789                 ((HRegion)region).getRegionStats()));
790             break;
791         }
792       }
793     } catch (IOException ie) {
794       for (int i = 0; i < mutations.size(); i++) {
795         builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
796       }
797     }
798     if (regionServer.metricsRegionServer != null) {
799       long after = EnvironmentEdgeManager.currentTime();
800       if (batchContainsPuts) {
801         regionServer.metricsRegionServer.updatePut(
802             region.getTableDesc().getTableName(), after - before);
803       }
804       if (batchContainsDelete) {
805         regionServer.metricsRegionServer.updateDelete(
806             region.getTableDesc().getTableName(), after - before);
807       }
808     }
809   }
810 
811   /**
812    * Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
813    * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
814    * @param region
815    * @param mutations
816    * @param replaySeqId
817    * @return an array of OperationStatus which internally contains the OperationStatusCode and the
818    *         exceptionMessage if any
819    * @throws IOException
820    */
821   private OperationStatus [] doReplayBatchOp(final Region region,
822       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
823     long before = EnvironmentEdgeManager.currentTime();
824     boolean batchContainsPuts = false, batchContainsDelete = false;
825     try {
826       for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
827         WALSplitter.MutationReplay m = it.next();
828 
829         if (m.type == MutationType.PUT) {
830           batchContainsPuts = true;
831         } else {
832           batchContainsDelete = true;
833         }
834 
835         NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
836         List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
837         if (metaCells != null && !metaCells.isEmpty()) {
838           for (Cell metaCell : metaCells) {
839             CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
840             boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
841             HRegion hRegion = (HRegion)region;
842             if (compactionDesc != null) {
843               // replay the compaction. Remove the files from stores only if we are the primary
844               // region replica (thus own the files)
845               hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
846                 replaySeqId);
847               continue;
848             }
849             FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
850             if (flushDesc != null && !isDefaultReplica) {
851               hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
852               continue;
853             }
854             RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
855             if (regionEvent != null && !isDefaultReplica) {
856               hRegion.replayWALRegionEventMarker(regionEvent);
857               continue;
858             }
859             BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
860             if (bulkLoadEvent != null) {
861               hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
862               continue;
863             }
864           }
865           it.remove();
866         }
867       }
868       requestCount.add(mutations.size());
869       if (!region.getRegionInfo().isMetaTable()) {
870         regionServer.cacheFlusher.reclaimMemStoreMemory();
871       }
872       return region.batchReplay(mutations.toArray(
873         new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
874     } finally {
875       if (regionServer.metricsRegionServer != null) {
876         long after = EnvironmentEdgeManager.currentTime();
877           if (batchContainsPuts) {
878           regionServer.metricsRegionServer.updatePut(
879               region.getTableDesc().getTableName(), after - before);
880         }
881         if (batchContainsDelete) {
882           regionServer.metricsRegionServer.updateDelete(
883               region.getTableDesc().getTableName(), after - before);
884         }
885       }
886     }
887   }
888 
889   private void closeAllScanners() {
890     // Close any outstanding scanners. Means they'll get an UnknownScanner
891     // exception next time they come in.
892     for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
893       try {
894         e.getValue().s.close();
895       } catch (IOException ioe) {
896         LOG.warn("Closing scanner " + e.getKey(), ioe);
897       }
898     }
899   }
900 
901   public RSRpcServices(HRegionServer rs) throws IOException {
902     regionServer = rs;
903 
904     RpcSchedulerFactory rpcSchedulerFactory;
905     try {
906       Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
907           REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
908           SimpleRpcSchedulerFactory.class);
909       rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
910     } catch (InstantiationException e) {
911       throw new IllegalArgumentException(e);
912     } catch (IllegalAccessException e) {
913       throw new IllegalArgumentException(e);
914     }
915     // Server to handle client requests.
916     InetSocketAddress initialIsa;
917     InetSocketAddress bindAddress;
918     if(this instanceof MasterRpcServices) {
919       String hostname = getHostname(rs.conf, true);
920       int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
921       // Creation of a HSA will force a resolve.
922       initialIsa = new InetSocketAddress(hostname, port);
923       bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
924     } else {
925       String hostname = getHostname(rs.conf, false);
926       int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
927         HConstants.DEFAULT_REGIONSERVER_PORT);
928       // Creation of a HSA will force a resolve.
929       initialIsa = new InetSocketAddress(hostname, port);
930       bindAddress = new InetSocketAddress(
931         rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
932     }
933     if (initialIsa.getAddress() == null) {
934       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
935     }
936     priority = new AnnotationReadingPriorityFunction(this);
937     String name = rs.getProcessName() + "/" + initialIsa.toString();
938     // Set how many times to retry talking to another server over HConnection.
939     ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
940     rpcServer = new RpcServer(rs, name, getServices(),
941       bindAddress, // use final bindAddress for this server.
942       rs.conf,
943       rpcSchedulerFactory.create(rs.conf, this, rs));
944 
945     scannerLeaseTimeoutPeriod = rs.conf.getInt(
946       HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
947       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
948     maxScannerResultSize = rs.conf.getLong(
949       HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
950       HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
951     rpcTimeout = rs.conf.getInt(
952       HConstants.HBASE_RPC_TIMEOUT_KEY,
953       HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
954     minimumScanTimeLimitDelta = rs.conf.getLong(
955       REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
956       DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
957 
958     InetSocketAddress address = rpcServer.getListenerAddress();
959     if (address == null) {
960       throw new IOException("Listener channel is closed");
961     }
962     // Set our address, however we need the final port that was given to rpcServer
963     isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
964     rpcServer.setErrorHandler(this);
965     rs.setName(name);
966   }
967 
968   public static String getHostname(Configuration conf, boolean isMaster)
969       throws UnknownHostException {
970     String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
971       HRegionServer.RS_HOSTNAME_KEY);
972     if (hostname == null || hostname.isEmpty()) {
973       String masterOrRS = isMaster ? "master" : "regionserver";
974       return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
975         conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
976         conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
977     } else {
978       LOG.info("hostname is configured to be " + hostname);
979       return hostname;
980     }
981   }
982 
983   RegionScanner getScanner(long scannerId) {
984     String scannerIdString = Long.toString(scannerId);
985     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
986     if (scannerHolder != null) {
987       return scannerHolder.s;
988     }
989     return null;
990   }
991 
992   /**
993    * Get the vtime associated with the scanner.
994    * Currently the vtime is the number of "next" calls.
995    */
996   long getScannerVirtualTime(long scannerId) {
997     String scannerIdString = Long.toString(scannerId);
998     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
999     if (scannerHolder != null) {
1000       return scannerHolder.getNextCallSeq();
1001     }
1002     return 0L;
1003   }
1004 
1005   long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1006     long scannerId = this.scannerIdGen.incrementAndGet();
1007     String scannerName = String.valueOf(scannerId);
1008 
1009     RegionScannerHolder existing =
1010       scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1011     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1012 
1013     regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1014         new ScannerListener(scannerName));
1015     return scannerId;
1016   }
1017 
1018   /**
1019    * Find the HRegion based on a region specifier
1020    *
1021    * @param regionSpecifier the region specifier
1022    * @return the corresponding region
1023    * @throws IOException if the specifier is not null,
1024    *    but failed to find the region
1025    */
1026   Region getRegion(
1027       final RegionSpecifier regionSpecifier) throws IOException {
1028     ByteString value = regionSpecifier.getValue();
1029     RegionSpecifierType type = regionSpecifier.getType();
1030     switch (type) {
1031       case REGION_NAME:
1032         byte[] regionName = value.toByteArray();
1033         String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
1034         return regionServer.getRegionByEncodedName(regionName, encodedRegionName);
1035       case ENCODED_REGION_NAME:
1036         return regionServer.getRegionByEncodedName(value.toStringUtf8());
1037       default:
1038         throw new DoNotRetryIOException(
1039           "Unsupported region specifier type: " + type);
1040     }
1041   }
1042 
1043   @VisibleForTesting
1044   public PriorityFunction getPriority() {
1045     return priority;
1046   }
1047 
1048   Configuration getConfiguration() {
1049     return regionServer.getConfiguration();
1050   }
1051 
1052   private RegionServerRpcQuotaManager getRpcQuotaManager() {
1053     return regionServer.getRegionServerRpcQuotaManager();
1054   }
1055 
1056   private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
1057     return regionServer.getRegionServerSpaceQuotaManager();
1058   }
1059 
1060   void start() {
1061     rpcServer.start();
1062   }
1063 
1064   void stop() {
1065     closeAllScanners();
1066     rpcServer.stop();
1067   }
1068 
1069   /**
1070    * Called to verify that this server is up and running.
1071    *
1072    * @throws IOException
1073    */
1074   protected void checkOpen() throws IOException {
1075     if (regionServer.isAborted()) {
1076       throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1077     }
1078     if (regionServer.isStopped()) {
1079       throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1080     }
1081     if (!regionServer.fsOk) {
1082       throw new RegionServerStoppedException("File system not available");
1083     }
1084     if (!regionServer.isOnline()) {
1085       throw new ServerNotRunningYetException("Server " + regionServer.serverName
1086           + " is not running yet");
1087     }
1088   }
1089 
1090   /**
1091    * @return list of blocking services and their security info classes that this server supports
1092    */
1093   protected List<BlockingServiceAndInterface> getServices() {
1094     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1095     bssi.add(new BlockingServiceAndInterface(
1096       ClientService.newReflectiveBlockingService(this),
1097       ClientService.BlockingInterface.class));
1098     bssi.add(new BlockingServiceAndInterface(
1099       AdminService.newReflectiveBlockingService(this),
1100       AdminService.BlockingInterface.class));
1101     return bssi;
1102   }
1103 
1104   public InetSocketAddress getSocketAddress() {
1105     return isa;
1106   }
1107 
1108   @Override
1109   public int getPriority(RequestHeader header, Message param) {
1110     return priority.getPriority(header, param);
1111   }
1112 
1113   @Override
1114   public long getDeadline(RequestHeader header, Message param) {
1115     return priority.getDeadline(header, param);
1116   }
1117 
1118   /*
1119    * Check if an OOME and, if so, abort immediately to avoid creating more objects.
1120    *
1121    * @param e
1122    *
1123    * @return True if we OOME'd and are aborting.
1124    */
1125   @Override
1126   public boolean checkOOME(final Throwable e) {
1127     boolean stop = false;
1128     try {
1129       if (e instanceof OutOfMemoryError
1130           || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1131           || (e.getMessage() != null && e.getMessage().contains(
1132               "java.lang.OutOfMemoryError"))) {
1133         stop = true;
1134         LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1135           + " will abort itself immediately", e);
1136       }
1137     } finally {
1138       if (stop) {
1139         Runtime.getRuntime().halt(1);
1140       }
1141     }
1142     return stop;
1143   }
1144 
1145   /**
1146    * Close a region on the region server.
1147    *
1148    * @param controller the RPC controller
1149    * @param request the request
1150    * @throws ServiceException
1151    */
1152   @Override
1153   @QosPriority(priority=HConstants.ADMIN_QOS)
1154   public CloseRegionResponse closeRegion(final RpcController controller,
1155       final CloseRegionRequest request) throws ServiceException {
1156     final ServerName sn = (request.hasDestinationServer() ?
1157       ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1158 
1159     try {
1160       checkOpen();
1161       if (request.hasServerStartCode()) {
1162         // check that we are the same server that this RPC is intended for.
1163         long serverStartCode = request.getServerStartCode();
1164         if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1165           throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1166               "different server with startCode: " + serverStartCode + ", this server is: "
1167               + regionServer.serverName));
1168         }
1169       }
1170       final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1171 
1172       // Can be null if we're calling close on a region that's not online
1173       final Region region = regionServer.getFromOnlineRegions(encodedRegionName);
1174       if ((region  != null) && (region .getCoprocessorHost() != null)) {
1175         region.getCoprocessorHost().preClose(false);
1176       }
1177 
1178       requestCount.increment();
1179       LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1180       CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1181         .getCloseRegionCoordination().parseFromProtoRequest(request);
1182 
1183       boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1184       CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1185       return builder.build();
1186     } catch (IOException ie) {
1187       throw new ServiceException(ie);
1188     }
1189   }
1190 
1191   /**
1192    * Compact a region on the region server.
1193    *
1194    * @param controller the RPC controller
1195    * @param request the request
1196    * @throws ServiceException
1197    */
1198   @Override
1199   @QosPriority(priority=HConstants.ADMIN_QOS)
1200   public CompactRegionResponse compactRegion(final RpcController controller,
1201       final CompactRegionRequest request) throws ServiceException {
1202     try {
1203       checkOpen();
1204       requestCount.increment();
1205       Region region = getRegion(request.getRegion());
1206       // Quota support is enabled, the requesting user is not system/super user
1207       // and a quota policy is enforced that disables compactions.
1208       if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
1209           !Superusers.isSuperUser(RpcServer.getRequestUser()) &&
1210           this.regionServer.getRegionServerSpaceQuotaManager().areCompactionsDisabled(
1211               region.getTableDesc().getTableName())) {
1212         throw new DoNotRetryIOException("Compactions on this region are "
1213             + "disabled due to a space quota violation.");
1214       }
1215       region.startRegionOperation(Operation.COMPACT_REGION);
1216       LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1217       boolean major = false;
1218       byte [] family = null;
1219       Store store = null;
1220       if (request.hasFamily()) {
1221         family = request.getFamily().toByteArray();
1222         store = region.getStore(family);
1223         if (store == null) {
1224           throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1225             + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1226         }
1227       }
1228       if (request.hasMajor()) {
1229         major = request.getMajor();
1230       }
1231       if (major) {
1232         if (family != null) {
1233           store.triggerMajorCompaction();
1234         } else {
1235           region.triggerMajorCompaction();
1236         }
1237       }
1238 
1239       String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1240       if (LOG.isTraceEnabled()) {
1241         LOG.trace("User-triggered compaction requested for region "
1242           + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1243       }
1244       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1245       if(family != null) {
1246         regionServer.compactSplitThread.requestCompaction(region, store, log,
1247           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1248       } else {
1249         regionServer.compactSplitThread.requestCompaction(region, log,
1250           Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1251       }
1252       return CompactRegionResponse.newBuilder().build();
1253     } catch (IOException ie) {
1254       throw new ServiceException(ie);
1255     }
1256   }
1257 
1258   /**
1259    * Flush a region on the region server.
1260    *
1261    * @param controller the RPC controller
1262    * @param request the request
1263    * @throws ServiceException
1264    */
1265   @Override
1266   @QosPriority(priority=HConstants.ADMIN_QOS)
1267   public FlushRegionResponse flushRegion(final RpcController controller,
1268       final FlushRegionRequest request) throws ServiceException {
1269     try {
1270       checkOpen();
1271       requestCount.increment();
1272       Region region = getRegion(request.getRegion());
1273       LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1274       boolean shouldFlush = true;
1275       if (request.hasIfOlderThanTs()) {
1276         shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1277       }
1278       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1279       if (shouldFlush) {
1280         boolean writeFlushWalMarker =  request.hasWriteFlushWalMarker() ?
1281             request.getWriteFlushWalMarker() : false;
1282         // Go behind the curtain so we can manage writing of the flush WAL marker
1283         HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1284             ((HRegion)region).flushcache(true, writeFlushWalMarker);
1285         boolean compactionNeeded = flushResult.isCompactionNeeded();
1286         if (compactionNeeded) {
1287           regionServer.compactSplitThread.requestSystemCompaction(region,
1288             "Compaction through user triggered flush");
1289         }
1290         builder.setFlushed(flushResult.isFlushSucceeded());
1291         builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1292       }
1293       builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1294       return builder.build();
1295     } catch (DroppedSnapshotException ex) {
1296       // Cache flush can fail in a few places. If it fails in a critical
1297       // section, we get a DroppedSnapshotException and a replay of wal
1298       // is required. Currently the only way to do this is a restart of
1299       // the server.
1300       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1301       throw new ServiceException(ex);
1302     } catch (IOException ie) {
1303       throw new ServiceException(ie);
1304     }
1305   }
1306 
1307   @Override
1308   @QosPriority(priority=HConstants.ADMIN_QOS)
1309   public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1310       final GetOnlineRegionRequest request) throws ServiceException {
1311     try {
1312       checkOpen();
1313       requestCount.increment();
1314       Map<String, Region> onlineRegions = regionServer.onlineRegions;
1315       List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1316       for (Region region: onlineRegions.values()) {
1317         list.add(region.getRegionInfo());
1318       }
1319       Collections.sort(list);
1320       return ResponseConverter.buildGetOnlineRegionResponse(list);
1321     } catch (IOException ie) {
1322       throw new ServiceException(ie);
1323     }
1324   }
1325 
1326   @Override
1327   @QosPriority(priority=HConstants.ADMIN_QOS)
1328   public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1329       final GetRegionInfoRequest request) throws ServiceException {
1330     try {
1331       checkOpen();
1332       requestCount.increment();
1333       Region region = getRegion(request.getRegion());
1334       HRegionInfo info = region.getRegionInfo();
1335       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1336       builder.setRegionInfo(HRegionInfo.convert(info));
1337       if (request.hasCompactionState() && request.getCompactionState()) {
1338         builder.setCompactionState(region.getCompactionState());
1339       }
1340       builder.setIsRecovering(region.isRecovering());
1341       return builder.build();
1342     } catch (IOException ie) {
1343       throw new ServiceException(ie);
1344     }
1345   }
1346 
1347   /**
1348    * Get some information of the region server.
1349    *
1350    * @param controller the RPC controller
1351    * @param request the request
1352    * @throws ServiceException
1353    */
1354   @Override
1355   @QosPriority(priority=HConstants.ADMIN_QOS)
1356   public GetServerInfoResponse getServerInfo(final RpcController controller,
1357       final GetServerInfoRequest request) throws ServiceException {
1358     try {
1359       checkOpen();
1360     } catch (IOException ie) {
1361       throw new ServiceException(ie);
1362     }
1363     requestCount.increment();
1364     int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1365     return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1366   }
1367 
1368   @Override
1369   @QosPriority(priority=HConstants.ADMIN_QOS)
1370   public GetStoreFileResponse getStoreFile(final RpcController controller,
1371       final GetStoreFileRequest request) throws ServiceException {
1372     try {
1373       checkOpen();
1374       Region region = getRegion(request.getRegion());
1375       requestCount.increment();
1376       Set<byte[]> columnFamilies;
1377       if (request.getFamilyCount() == 0) {
1378         columnFamilies = region.getTableDesc().getFamiliesKeys();
1379       } else {
1380         columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1381         for (ByteString cf: request.getFamilyList()) {
1382           columnFamilies.add(cf.toByteArray());
1383         }
1384       }
1385       int nCF = columnFamilies.size();
1386       List<String>  fileList = region.getStoreFileList(
1387         columnFamilies.toArray(new byte[nCF][]));
1388       GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1389       builder.addAllStoreFile(fileList);
1390       return builder.build();
1391     } catch (IOException ie) {
1392       throw new ServiceException(ie);
1393     }
1394   }
1395 
1396   /**
1397    * Merge regions on the region server.
1398    *
1399    * @param controller the RPC controller
1400    * @param request the request
1401    * @return merge regions response
1402    * @throws ServiceException
1403    */
1404   @Override
1405   @QosPriority(priority = HConstants.ADMIN_QOS)
1406   public MergeRegionsResponse mergeRegions(final RpcController controller,
1407       final MergeRegionsRequest request) throws ServiceException {
1408     try {
1409       checkOpen();
1410       requestCount.increment();
1411       Region regionA = getRegion(request.getRegionA());
1412       Region regionB = getRegion(request.getRegionB());
1413       boolean forcible = request.getForcible();
1414       long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1415       regionA.startRegionOperation(Operation.MERGE_REGION);
1416       regionB.startRegionOperation(Operation.MERGE_REGION);
1417       if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1418           regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1419         throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1420       }
1421       LOG.info("Receiving merging request for  " + regionA + ", " + regionB
1422           + ",forcible=" + forcible);
1423       regionA.flush(true);
1424       regionB.flush(true);
1425       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1426           masterSystemTime, RpcServer.getRequestUser());
1427       return MergeRegionsResponse.newBuilder().build();
1428     } catch (DroppedSnapshotException ex) {
1429       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1430       throw new ServiceException(ex);
1431     } catch (IOException ie) {
1432       throw new ServiceException(ie);
1433     }
1434   }
1435 
1436   /**
1437    * Open asynchronously a region or a set of regions on the region server.
1438    *
1439    * The opening is coordinated by ZooKeeper, and this method requires the znode to be created
1440    *  before being called. As a consequence, this method should be called only from the master.
1441    * <p>
1442    * Different manages states for the region are:<ul>
1443    *  <li>region not opened: the region opening will start asynchronously.</li>
1444    *  <li>a close is already in progress: this is considered as an error.</li>
1445    *  <li>an open is already in progress: this new open request will be ignored. This is important
1446    *  because the Master can do multiple requests if it crashes.</li>
1447    *  <li>the region is already opened:  this new open request will be ignored./li>
1448    *  </ul>
1449    * </p>
1450    * <p>
1451    * Bulk assign: If there are more than 1 region to open, it will be considered as a bulk assign.
1452    * For a single region opening, errors are sent through a ServiceException. For bulk assign,
1453    * errors are put in the response as FAILED_OPENING.
1454    * </p>
1455    * @param controller the RPC controller
1456    * @param request the request
1457    * @throws ServiceException
1458    */
1459   @Override
1460   @QosPriority(priority=HConstants.ADMIN_QOS)
1461   public OpenRegionResponse openRegion(final RpcController controller,
1462       final OpenRegionRequest request) throws ServiceException {
1463     requestCount.increment();
1464     if (request.hasServerStartCode()) {
1465       // check that we are the same server that this RPC is intended for.
1466       long serverStartCode = request.getServerStartCode();
1467       if (regionServer.serverName.getStartcode() !=  serverStartCode) {
1468         throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1469             "different server with startCode: " + serverStartCode + ", this server is: "
1470             + regionServer.serverName));
1471       }
1472     }
1473 
1474     OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1475     final int regionCount = request.getOpenInfoCount();
1476     final Map<TableName, HTableDescriptor> htds =
1477         new HashMap<TableName, HTableDescriptor>(regionCount);
1478     final boolean isBulkAssign = regionCount > 1;
1479     try {
1480       checkOpen();
1481     } catch (IOException ie) {
1482       TableName tableName = null;
1483       if (regionCount == 1) {
1484         RegionInfo ri = request.getOpenInfo(0).getRegion();
1485         if (ri != null) {
1486           tableName = ProtobufUtil.toTableName(ri.getTableName());
1487         }
1488       }
1489       if (!TableName.META_TABLE_NAME.equals(tableName)) {
1490         throw new ServiceException(ie);
1491       }
1492       // We are assigning meta, wait a little for regionserver to finish initialization.
1493       int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1494         HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2; // Quarter of RPC timeout
1495       long endTime = System.currentTimeMillis() + timeout;
1496       synchronized (regionServer.online) {
1497         try {
1498           while (System.currentTimeMillis() <= endTime
1499               && !regionServer.isStopped() && !regionServer.isOnline()) {
1500             regionServer.online.wait(regionServer.msgInterval);
1501           }
1502           checkOpen();
1503         } catch (InterruptedException t) {
1504           Thread.currentThread().interrupt();
1505           throw new ServiceException(t);
1506         } catch (IOException e) {
1507           throw new ServiceException(e);
1508         }
1509       }
1510     }
1511 
1512     long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1513 
1514     for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1515       final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1516       OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1517         getOpenRegionCoordination();
1518       OpenRegionCoordination.OpenRegionDetails ord =
1519         coordination.parseFromProtoRequest(regionOpenInfo);
1520 
1521       HTableDescriptor htd;
1522       try {
1523         final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1524         if (onlineRegion != null) {
1525           //Check if the region can actually be opened.
1526           if (onlineRegion.getCoprocessorHost() != null) {
1527             onlineRegion.getCoprocessorHost().preOpen();
1528           }
1529           // See HBASE-5094. Cross check with hbase:meta if still this RS is owning
1530           // the region.
1531           Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1532             regionServer.getConnection(), region.getRegionName());
1533           if (p != null && regionServer.serverName.equals(p.getSecond())) {
1534             Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1535             // Map regionsInTransitionInRSOnly has an entry for a region only if the region
1536             // is in transition on this RS, so here closing can be null. If not null, it can
1537             // be true or false. True means the region is opening on this RS; while false
1538             // means the region is closing. Only return ALREADY_OPENED if not closing (i.e.
1539             // not in transition any more, or still transition to open.
1540             if (!Boolean.FALSE.equals(closing)
1541                 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1542               LOG.warn("Attempted open of " + region.getEncodedName()
1543                 + " but already online on this server");
1544               builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1545               continue;
1546             }
1547           } else {
1548             LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1549               + " but hbase:meta does not have this server - continue opening.");
1550             regionServer.removeFromOnlineRegions(onlineRegion, null);
1551           }
1552         }
1553         LOG.info("Open " + region.getRegionNameAsString());
1554         htd = htds.get(region.getTable());
1555         if (htd == null) {
1556           htd = regionServer.tableDescriptors.get(region.getTable());
1557           htds.put(region.getTable(), htd);
1558         }
1559 
1560         final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1561           region.getEncodedNameAsBytes(), Boolean.TRUE);
1562 
1563         if (Boolean.FALSE.equals(previous)) {
1564           // There is a close in progress. We need to mark this open as failed in ZK.
1565 
1566           coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1567 
1568           throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1569             + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1570         }
1571 
1572         if (Boolean.TRUE.equals(previous)) {
1573           // An open is in progress. This is supported, but let's log this.
1574           LOG.info("Receiving OPEN for the region:" +
1575             region.getRegionNameAsString() + " , which we are already trying to OPEN"
1576               + " - ignoring this new request for this region.");
1577         }
1578 
1579         // We are opening this region. If it moves back and forth for whatever reason, we don't
1580         // want to keep returning the stale moved record while we are opening/if we close again.
1581         regionServer.removeFromMovedRegions(region.getEncodedName());
1582 
1583         if (previous == null) {
1584           // check if the region to be opened is marked in recovering state in ZK
1585           if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1586               region.getEncodedName())) {
1587             // Check if current region open is for distributedLogReplay. This check is to support
1588             // rolling restart/upgrade where we want to Master/RS see same configuration
1589             if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1590                   || regionOpenInfo.getOpenForDistributedLogReplay()) {
1591               regionServer.recoveringRegions.put(region.getEncodedName(), null);
1592             } else {
1593               // Remove stale recovery region from ZK when we open region not for recovering which
1594               // could happen when turn distributedLogReplay off from on.
1595               List<String> tmpRegions = new ArrayList<String>();
1596               tmpRegions.add(region.getEncodedName());
1597               ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1598                 tmpRegions);
1599             }
1600           }
1601           if (htd == null) {
1602             throw new IOException("Missing table descriptor for " + region.getEncodedName());
1603           }
1604           // If there is no action in progress, we can submit a specific handler.
1605           // Need to pass the expected version in the constructor.
1606           if (region.isMetaRegion()) {
1607             regionServer.service.submit(new OpenMetaHandler(
1608               regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1609           } else {
1610             regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1611               regionOpenInfo.getFavoredNodesList());
1612             if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
1613               regionServer.service.submit(new OpenPriorityRegionHandler(
1614                 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1615             } else {
1616               regionServer.service.submit(new OpenRegionHandler(
1617                 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1618             }
1619           }
1620         }
1621 
1622         builder.addOpeningState(RegionOpeningState.OPENED);
1623 
1624       } catch (KeeperException zooKeeperEx) {
1625         LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1626         throw new ServiceException(zooKeeperEx);
1627       } catch (IOException ie) {
1628         LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1629         if (isBulkAssign) {
1630           builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1631         } else {
1632           throw new ServiceException(ie);
1633         }
1634       }
1635     }
1636     return builder.build();
1637   }
1638 
1639   /**
1640    *  Wamrmup a region on this server.
1641    *
1642    * This method should only be called by Master. It synchrnously opens the region and
1643    * closes the region bringing the most important pages in cache.
1644    * <p>
1645    *
1646    * @param controller the RPC controller
1647    * @param request the request
1648    * @throws ServiceException
1649    */
1650   @Override
1651   public WarmupRegionResponse warmupRegion(final RpcController controller,
1652       final WarmupRegionRequest request) throws ServiceException {
1653 
1654     RegionInfo regionInfo = request.getRegionInfo();
1655     final HRegionInfo region = HRegionInfo.convert(regionInfo);
1656     HTableDescriptor htd;
1657     WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1658 
1659     try {
1660       checkOpen();
1661       String encodedName = region.getEncodedName();
1662       byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1663       final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1664 
1665       if (onlineRegion != null) {
1666         LOG.info("Region already online. Skipping warming up " + region);
1667         return response;
1668       }
1669 
1670       if (LOG.isDebugEnabled()) {
1671         LOG.debug("Warming up Region " + region.getRegionNameAsString());
1672       }
1673 
1674       htd = regionServer.tableDescriptors.get(region.getTable());
1675 
1676       if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1677         LOG.info("Region is in transition. Skipping warmup " + region);
1678         return response;
1679       }
1680 
1681       HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1682           regionServer.getConfiguration(), regionServer, null);
1683 
1684     } catch (IOException ie) {
1685       LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1686       throw new ServiceException(ie);
1687     }
1688 
1689     return response;
1690   }
1691 
1692   /**
1693    * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
1694    * that the given mutations will be durable on the receiving RS if this method returns without any
1695    * exception.
1696    * @param controller the RPC controller
1697    * @param request the request
1698    * @throws ServiceException
1699    */
1700   @Override
1701   @QosPriority(priority = HConstants.REPLAY_QOS)
1702   public ReplicateWALEntryResponse replay(final RpcController controller,
1703       final ReplicateWALEntryRequest request) throws ServiceException {
1704     long before = EnvironmentEdgeManager.currentTime();
1705     CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1706     try {
1707       checkOpen();
1708       List<WALEntry> entries = request.getEntryList();
1709       if (entries == null || entries.isEmpty()) {
1710         // empty input
1711         return ReplicateWALEntryResponse.newBuilder().build();
1712       }
1713       ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1714       Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1715       RegionCoprocessorHost coprocessorHost =
1716           ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1717             ? region.getCoprocessorHost()
1718             : null; // do not invoke coprocessors if this is a secondary region replica
1719       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1720 
1721       // Skip adding the edits to WAL if this is a secondary region replica
1722       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1723       Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1724 
1725       for (WALEntry entry : entries) {
1726         if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1727           throw new NotServingRegionException("Replay request contains entries from multiple " +
1728               "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1729               + entry.getKey().getEncodedRegionName());
1730         }
1731         if (regionServer.nonceManager != null && isPrimary) {
1732           long nonceGroup = entry.getKey().hasNonceGroup()
1733             ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1734           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1735           regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
1736         }
1737         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1738           new Pair<WALKey, WALEdit>();
1739         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1740           cells, walEntry, durability);
1741         if (coprocessorHost != null) {
1742           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
1743           // KeyValue.
1744           if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1745             walEntry.getSecond())) {
1746             // if bypass this log entry, ignore it ...
1747             continue;
1748           }
1749           walEntries.add(walEntry);
1750         }
1751         if(edits!=null && !edits.isEmpty()) {
1752           long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1753             entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1754           OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1755           // check if it's a partial success
1756           for (int i = 0; result != null && i < result.length; i++) {
1757             if (result[i] != OperationStatus.SUCCESS) {
1758               throw new IOException(result[i].getExceptionMsg());
1759             }
1760           }
1761         }
1762       }
1763 
1764       //sync wal at the end because ASYNC_WAL is used above
1765       WAL wal = getWAL(region);
1766       if (wal != null) {
1767         wal.sync();
1768       }
1769 
1770       if (coprocessorHost != null) {
1771         for (Pair<WALKey, WALEdit> entry : walEntries) {
1772           coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1773             entry.getSecond());
1774         }
1775       }
1776       return ReplicateWALEntryResponse.newBuilder().build();
1777     } catch (IOException ie) {
1778       throw new ServiceException(ie);
1779     } finally {
1780       if (regionServer.metricsRegionServer != null) {
1781         regionServer.metricsRegionServer.updateReplay(
1782           EnvironmentEdgeManager.currentTime() - before);
1783       }
1784     }
1785   }
1786 
1787   WAL getWAL(Region region) {
1788     return ((HRegion)region).getWAL();
1789   }
1790 
1791   /**
1792    * Replicate WAL entries on the region server.
1793    *
1794    * @param controller the RPC controller
1795    * @param request the request
1796    * @throws ServiceException
1797    */
1798   @Override
1799   @QosPriority(priority=HConstants.REPLICATION_QOS)
1800   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1801       final ReplicateWALEntryRequest request) throws ServiceException {
1802     try {
1803       checkOpen();
1804       if (regionServer.replicationSinkHandler != null) {
1805         requestCount.increment();
1806         List<WALEntry> entries = request.getEntryList();
1807         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1808         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1809         regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
1810           request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
1811           request.getSourceHFileArchiveDirPath());
1812         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1813         return ReplicateWALEntryResponse.newBuilder().build();
1814       } else {
1815         throw new ServiceException("Replication services are not initialized yet");
1816       }
1817     } catch (IOException ie) {
1818       throw new ServiceException(ie);
1819     }
1820   }
1821 
1822   /**
1823    * Roll the WAL writer of the region server.
1824    * @param controller the RPC controller
1825    * @param request the request
1826    * @throws ServiceException
1827    */
1828   @Override
1829   public RollWALWriterResponse rollWALWriter(final RpcController controller,
1830       final RollWALWriterRequest request) throws ServiceException {
1831     try {
1832       checkOpen();
1833       requestCount.increment();
1834       regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1835       regionServer.walRoller.requestRollAll();
1836       regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1837       RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1838       return builder.build();
1839     } catch (IOException ie) {
1840       throw new ServiceException(ie);
1841     }
1842   }
1843 
1844   /**
1845    * Split a region on the region server.
1846    *
1847    * @param controller the RPC controller
1848    * @param request the request
1849    * @throws ServiceException
1850    */
1851   @Override
1852   @QosPriority(priority=HConstants.ADMIN_QOS)
1853   public SplitRegionResponse splitRegion(final RpcController controller,
1854       final SplitRegionRequest request) throws ServiceException {
1855     try {
1856       checkOpen();
1857       requestCount.increment();
1858       Region region = getRegion(request.getRegion());
1859       region.startRegionOperation(Operation.SPLIT_REGION);
1860       if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1861         throw new IOException("Can't split replicas directly. "
1862             + "Replicas are auto-split when their primary is split.");
1863       }
1864       LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1865       region.flush(true);
1866       byte[] splitPoint = null;
1867       if (request.hasSplitPoint()) {
1868         splitPoint = request.getSplitPoint().toByteArray();
1869       }
1870       ((HRegion)region).forceSplit(splitPoint);
1871       regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1872         RpcServer.getRequestUser());
1873       return SplitRegionResponse.newBuilder().build();
1874     } catch (DroppedSnapshotException ex) {
1875       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1876       throw new ServiceException(ex);
1877     } catch (IOException ie) {
1878       throw new ServiceException(ie);
1879     }
1880   }
1881 
1882   /**
1883    * Stop the region server.
1884    *
1885    * @param controller the RPC controller
1886    * @param request the request
1887    * @throws ServiceException
1888    */
1889   @Override
1890   @QosPriority(priority=HConstants.ADMIN_QOS)
1891   public StopServerResponse stopServer(final RpcController controller,
1892       final StopServerRequest request) throws ServiceException {
1893     requestCount.increment();
1894     String reason = request.getReason();
1895     regionServer.stop(reason);
1896     return StopServerResponse.newBuilder().build();
1897   }
1898 
1899   @Override
1900   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1901       UpdateFavoredNodesRequest request) throws ServiceException {
1902     List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1903     UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1904     for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1905       HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1906       regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1907         regionUpdateInfo.getFavoredNodesList());
1908     }
1909     respBuilder.setResponse(openInfoList.size());
1910     return respBuilder.build();
1911   }
1912 
1913   /**
1914    * Atomically bulk load several HFiles into an open region
1915    * @return true if successful, false is failed but recoverably (no action)
1916    * @throws IOException if failed unrecoverably
1917    */
1918   @Override
1919   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1920       final BulkLoadHFileRequest request) throws ServiceException {
1921     try {
1922       checkOpen();
1923       requestCount.increment();
1924       Region region = getRegion(request.getRegion());
1925       List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1926       for (FamilyPath familyPath: request.getFamilyPathList()) {
1927         familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1928           familyPath.getPath()));
1929       }
1930       boolean bypass = false;
1931       if (region.getCoprocessorHost() != null) {
1932         bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1933       }
1934       boolean loaded = false;
1935 
1936       // Check to see if this bulk load would exceed the space quota for this table
1937       if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
1938         ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
1939         SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
1940             region);
1941         if (enforcement != null) {
1942           // Bulk loads must still be atomic. We must enact all or none.
1943           List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
1944           for (FamilyPath familyPath : request.getFamilyPathList()) {
1945             filePaths.add(familyPath.getPath());
1946           }
1947           // Check if the batch of files exceeds the current quota
1948           enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
1949         }
1950       }
1951 
1952       Map<byte[], List<Path>> map = null;
1953       if (!bypass) {
1954         map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
1955             request.getCopyFile());
1956         if (map != null) {
1957           loaded = true;
1958         }
1959       }
1960       if (region.getCoprocessorHost() != null) {
1961         loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
1962       }
1963       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1964       if (map != null) {
1965         loaded = true;
1966       }
1967       builder.setLoaded(loaded);
1968       return builder.build();
1969     } catch (IOException ie) {
1970       throw new ServiceException(ie);
1971     }
1972   }
1973 
1974   @Override
1975   public CoprocessorServiceResponse execService(final RpcController controller,
1976       final CoprocessorServiceRequest request) throws ServiceException {
1977     try {
1978       checkOpen();
1979       requestCount.increment();
1980       Region region = getRegion(request.getRegion());
1981       Message result = execServiceOnRegion(region, request.getCall());
1982       CoprocessorServiceResponse.Builder builder =
1983         CoprocessorServiceResponse.newBuilder();
1984       builder.setRegion(RequestConverter.buildRegionSpecifier(
1985         RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1986       builder.setValue(
1987         builder.getValueBuilder().setName(result.getClass().getName())
1988           .setValue(result.toByteString()));
1989       return builder.build();
1990     } catch (IOException ie) {
1991       throw new ServiceException(ie);
1992     }
1993   }
1994 
1995   private Message execServiceOnRegion(Region region,
1996       final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1997     // ignore the passed in controller (from the serialized call)
1998     ServerRpcController execController = new ServerRpcController();
1999     Message result = region.execService(execController, serviceCall);
2000     if (execController.getFailedOn() != null) {
2001       throw execController.getFailedOn();
2002     }
2003     return result;
2004   }
2005 
2006   /**
2007    * Get data from a table.
2008    *
2009    * @param controller the RPC controller
2010    * @param request the get request
2011    * @throws ServiceException
2012    */
2013   @Override
2014   public GetResponse get(final RpcController controller,
2015       final GetRequest request) throws ServiceException {
2016     long before = EnvironmentEdgeManager.currentTime();
2017     OperationQuota quota = null;
2018     Region region = null;
2019     try {
2020       checkOpen();
2021       requestCount.increment();
2022       rpcGetRequestCount.increment();
2023       region = getRegion(request.getRegion());
2024 
2025       GetResponse.Builder builder = GetResponse.newBuilder();
2026       ClientProtos.Get get = request.getGet();
2027       Boolean existence = null;
2028       Result r = null;
2029       quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2030 
2031       if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2032         if (get.getColumnCount() != 1) {
2033           throw new DoNotRetryIOException(
2034             "get ClosestRowBefore supports one and only one family now, not "
2035               + get.getColumnCount() + " families");
2036         }
2037         byte[] row = get.getRow().toByteArray();
2038         byte[] family = get.getColumn(0).getFamily().toByteArray();
2039         r = region.getClosestRowBefore(row, family);
2040       } else {
2041         Get clientGet = ProtobufUtil.toGet(get);
2042         if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2043           existence = region.getCoprocessorHost().preExists(clientGet);
2044         }
2045         if (existence == null) {
2046           r = region.get(clientGet);
2047           if (get.getExistenceOnly()) {
2048             boolean exists = r.getExists();
2049             if (region.getCoprocessorHost() != null) {
2050               exists = region.getCoprocessorHost().postExists(clientGet, exists);
2051             }
2052             existence = exists;
2053           }
2054         }
2055       }
2056       if (existence != null){
2057         ClientProtos.Result pbr =
2058             ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2059         builder.setResult(pbr);
2060       } else  if (r != null) {
2061         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2062         builder.setResult(pbr);
2063       }
2064       if (r != null) {
2065         quota.addGetResult(r);
2066       }
2067       return builder.build();
2068     } catch (IOException ie) {
2069       throw new ServiceException(ie);
2070     } finally {
2071       if (regionServer.metricsRegionServer != null && region != null) {
2072         regionServer.metricsRegionServer.updateGet(
2073             region.getTableDesc().getTableName(), EnvironmentEdgeManager.currentTime() - before);
2074       }
2075       if (quota != null) {
2076         quota.close();
2077       }
2078     }
2079   }
2080 
2081   /**
2082    * Execute multiple actions on a table: get, mutate, and/or execCoprocessor
2083    *
2084    * @param rpcc the RPC controller
2085    * @param request the multi request
2086    * @throws ServiceException
2087    */
2088   @Override
2089   public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2090   throws ServiceException {
2091     try {
2092       checkOpen();
2093     } catch (IOException ie) {
2094       throw new ServiceException(ie);
2095     }
2096 
2097     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2098     // It is also the conduit via which we pass back data.
2099     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2100     CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2101     if (controller != null) controller.setCellScanner(null);
2102 
2103     long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2104 
2105     // this will contain all the cells that we need to return. It's created later, if needed.
2106     List<CellScannable> cellsToReturn = null;
2107     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2108     RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2109     Boolean processed = null;
2110 
2111     RpcCallContext context = RpcServer.getCurrentCall();
2112     this.rpcMultiRequestCount.increment();
2113     Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
2114       .getRegionActionCount());
2115     ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2116     for (RegionAction regionAction : request.getRegionActionList()) {
2117       this.requestCount.add(regionAction.getActionCount());
2118       OperationQuota quota;
2119       Region region;
2120       regionActionResultBuilder.clear();
2121       try {
2122         region = getRegion(regionAction.getRegion());
2123         quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2124       } catch (IOException e) {
2125         rpcServer.getMetrics().exception(e);
2126         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2127         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2128         if (cellScanner != null) {
2129           skipCellsForMutations(regionAction.getActionList(), cellScanner);
2130         }
2131         continue;  // For this region it's a failure.
2132       }
2133 
2134       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2135         // How does this call happen?  It may need some work to play well w/ the surroundings.
2136         // Need to return an item per Action along w/ Action index.  TODO.
2137         try {
2138           if (request.hasCondition()) {
2139             Condition condition = request.getCondition();
2140             byte[] row = condition.getRow().toByteArray();
2141             byte[] family = condition.getFamily().toByteArray();
2142             byte[] qualifier = condition.getQualifier().toByteArray();
2143             CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2144             ByteArrayComparable comparator =
2145                 ProtobufUtil.toComparator(condition.getComparator());
2146             processed = checkAndRowMutate(region, regionAction.getActionList(),
2147                   cellScanner, row, family, qualifier, compareOp,
2148                   comparator, spaceQuotaEnforcement);
2149           } else {
2150             ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2151                 cellScanner);
2152             // add the stats to the request
2153             if(stats != null) {
2154               responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2155                   .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2156             }
2157             processed = Boolean.TRUE;
2158           }
2159         } catch (IOException e) {
2160           rpcServer.getMetrics().exception(e);
2161           // As it's atomic, we may expect it's a global failure.
2162           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2163         }
2164       } else {
2165         // doNonAtomicRegionMutation manages the exception internally
2166         cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2167             regionActionResultBuilder, cellsToReturn, nonceGroup, spaceQuotaEnforcement);
2168       }
2169       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2170       quota.close();
2171     }
2172     // Load the controller with the Cells to return.
2173     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2174       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2175     }
2176     if (processed != null) responseBuilder.setProcessed(processed);
2177     return responseBuilder.build();
2178   }
2179 
2180   private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2181     for (Action action : actions) {
2182       skipCellsForMutation(action, cellScanner);
2183     }
2184   }
2185 
2186   private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2187     try {
2188       if (action.hasMutation()) {
2189         MutationProto m = action.getMutation();
2190         if (m.hasAssociatedCellCount()) {
2191           for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2192             cellScanner.advance();
2193           }
2194         }
2195       }
2196     } catch (IOException e) {
2197       // No need to handle these Individual Muatation level issue. Any way this entire RegionAction
2198       // marked as failed as we could not see the Region here. At client side the top level
2199       // RegionAction exception will be considered first.
2200       LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2201     }
2202   }
2203 
2204   /**
2205    * Mutate data in a table.
2206    *
2207    * @param rpcc the RPC controller
2208    * @param request the mutate request
2209    * @throws ServiceException
2210    */
2211   @Override
2212   public MutateResponse mutate(final RpcController rpcc,
2213       final MutateRequest request) throws ServiceException {
2214     // rpc controller is how we bring in data via the back door;  it is unprotobuf'ed data.
2215     // It is also the conduit via which we pass back data.
2216     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2217     CellScanner cellScanner = controller != null? controller.cellScanner(): null;
2218     OperationQuota quota = null;
2219     ActivePolicyEnforcement spaceQuotaEnforcement = null;
2220     // Clear scanner so we are not holding on to reference across call.
2221     if (controller != null) controller.setCellScanner(null);
2222     try {
2223       checkOpen();
2224       requestCount.increment();
2225       rpcMutateRequestCount.increment();
2226       Region region = getRegion(request.getRegion());
2227       MutateResponse.Builder builder = MutateResponse.newBuilder();
2228       MutationProto mutation = request.getMutation();
2229       if (!region.getRegionInfo().isMetaTable()) {
2230         regionServer.cacheFlusher.reclaimMemStoreMemory();
2231       }
2232       long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2233       Result r = null;
2234       Boolean processed = null;
2235       MutationType type = mutation.getMutateType();
2236       quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2237       spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2238 
2239       switch (type) {
2240       case APPEND:
2241         // TODO: this doesn't actually check anything.
2242         r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2243         break;
2244       case INCREMENT:
2245         // TODO: this doesn't actually check anything.
2246         r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2247         break;
2248       case PUT:
2249         Put put = ProtobufUtil.toPut(mutation, cellScanner);
2250         // Throws an exception when violated
2251         spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
2252         quota.addMutation(put);
2253         if (request.hasCondition()) {
2254           Condition condition = request.getCondition();
2255           byte[] row = condition.getRow().toByteArray();
2256           byte[] family = condition.getFamily().toByteArray();
2257           byte[] qualifier = condition.getQualifier().toByteArray();
2258           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2259           ByteArrayComparable comparator =
2260             ProtobufUtil.toComparator(condition.getComparator());
2261           if (region.getCoprocessorHost() != null) {
2262             processed = region.getCoprocessorHost().preCheckAndPut(
2263               row, family, qualifier, compareOp, comparator, put);
2264           }
2265           if (processed == null) {
2266             boolean result = region.checkAndMutate(row, family,
2267               qualifier, compareOp, comparator, put, true);
2268             if (region.getCoprocessorHost() != null) {
2269               result = region.getCoprocessorHost().postCheckAndPut(row, family,
2270                 qualifier, compareOp, comparator, put, result);
2271             }
2272             processed = result;
2273           }
2274         } else {
2275           region.put(put);
2276           processed = Boolean.TRUE;
2277         }
2278         break;
2279       case DELETE:
2280         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2281         spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
2282         quota.addMutation(delete);
2283         if (request.hasCondition()) {
2284           Condition condition = request.getCondition();
2285           byte[] row = condition.getRow().toByteArray();
2286           byte[] family = condition.getFamily().toByteArray();
2287           byte[] qualifier = condition.getQualifier().toByteArray();
2288           CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2289           ByteArrayComparable comparator =
2290             ProtobufUtil.toComparator(condition.getComparator());
2291           if (region.getCoprocessorHost() != null) {
2292             processed = region.getCoprocessorHost().preCheckAndDelete(
2293               row, family, qualifier, compareOp, comparator, delete);
2294           }
2295           if (processed == null) {
2296             boolean result = region.checkAndMutate(row, family,
2297               qualifier, compareOp, comparator, delete, true);
2298             if (region.getCoprocessorHost() != null) {
2299               result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2300                 qualifier, compareOp, comparator, delete, result);
2301             }
2302             processed = result;
2303           }
2304         } else {
2305           region.delete(delete);
2306           processed = Boolean.TRUE;
2307         }
2308         break;
2309       default:
2310           throw new DoNotRetryIOException(
2311             "Unsupported mutate type: " + type.name());
2312       }
2313       if (processed != null) builder.setProcessed(processed.booleanValue());
2314       addResult(builder, r, controller);
2315       return builder.build();
2316     } catch (IOException ie) {
2317       regionServer.checkFileSystem();
2318       throw new ServiceException(ie);
2319     } finally {
2320       if (quota != null) {
2321         quota.close();
2322       }
2323     }
2324   }
2325 
2326   /**
2327    * Scan data in a table.
2328    *
2329    * @param controller the RPC controller
2330    * @param request the scan request
2331    * @throws ServiceException
2332    */
2333   @Override
2334   public ScanResponse scan(final RpcController controller, final ScanRequest request)
2335   throws ServiceException {
2336     OperationQuota quota = null;
2337     Leases.Lease lease = null;
2338     String scannerName = null;
2339     try {
2340       if (!request.hasScannerId() && !request.hasScan()) {
2341         throw new DoNotRetryIOException(
2342           "Missing required input: scannerId or scan");
2343       }
2344       long scannerId = -1;
2345       if (request.hasScannerId()) {
2346         scannerId = request.getScannerId();
2347         scannerName = String.valueOf(scannerId);
2348       }
2349       try {
2350         checkOpen();
2351       } catch (IOException e) {
2352         // If checkOpen failed, server not running or filesystem gone,
2353         // cancel this lease; filesystem is gone or we're closing or something.
2354         if (scannerName != null) {
2355           LOG.debug("Server shutting down and client tried to access missing scanner "
2356             + scannerName);
2357           if (regionServer.leases != null) {
2358             try {
2359               regionServer.leases.cancelLease(scannerName);
2360             } catch (LeaseException le) {
2361               // No problem, ignore
2362             }
2363           }
2364         }
2365         throw e;
2366       }
2367       requestCount.increment();
2368       rpcScanRequestCount.increment();
2369 
2370       int ttl = 0;
2371       Region region = null;
2372       RegionScanner scanner = null;
2373       RegionScannerHolder rsh = null;
2374       boolean moreResults = true;
2375       boolean closeScanner = false;
2376       boolean isSmallScan = false;
2377       ScanResponse.Builder builder = ScanResponse.newBuilder();
2378       if (request.hasCloseScanner()) {
2379         closeScanner = request.getCloseScanner();
2380       }
2381       int rows = closeScanner ? 0 : 1;
2382       if (request.hasNumberOfRows()) {
2383         rows = request.getNumberOfRows();
2384       }
2385       if (request.hasScannerId()) {
2386         rsh = scanners.get(scannerName);
2387         if (rsh == null) {
2388           LOG.info("Client tried to access missing scanner " + scannerName);
2389           throw new UnknownScannerException(
2390             "Name: " + scannerName + ", already closed?");
2391         }
2392         scanner = rsh.s;
2393         HRegionInfo hri = scanner.getRegionInfo();
2394         region = regionServer.getRegion(hri.getRegionName());
2395         if (region != rsh.r) { // Yes, should be the same instance
2396           throw new NotServingRegionException("Region was re-opened after the scanner"
2397             + scannerName + " was created: " + hri.getRegionNameAsString());
2398         }
2399       } else {
2400         region = getRegion(request.getRegion());
2401         ClientProtos.Scan protoScan = request.getScan();
2402         boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2403         Scan scan = ProtobufUtil.toScan(protoScan);
2404         // if the request doesn't set this, get the default region setting.
2405         if (!isLoadingCfsOnDemandSet) {
2406           scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2407         }
2408 
2409         isSmallScan = scan.isSmall();
2410         if (!scan.hasFamilies()) {
2411           // Adding all families to scanner
2412           for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2413             scan.addFamily(family);
2414           }
2415         }
2416 
2417         if (region.getCoprocessorHost() != null) {
2418           scanner = region.getCoprocessorHost().preScannerOpen(scan);
2419         }
2420         if (scanner == null) {
2421           scanner = region.getScanner(scan);
2422         }
2423         if (region.getCoprocessorHost() != null) {
2424           scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2425         }
2426         scannerId = addScanner(scanner, region);
2427         scannerName = String.valueOf(scannerId);
2428         ttl = this.scannerLeaseTimeoutPeriod;
2429       }
2430       if (request.hasRenew() && request.getRenew()) {
2431         rsh = scanners.get(scannerName);
2432         lease = regionServer.leases.removeLease(scannerName);
2433         if (lease != null && rsh != null) {
2434           regionServer.leases.addLease(lease);
2435           // Increment the nextCallSeq value which is the next expected from client.
2436           rsh.incNextCallSeq();
2437         }
2438         return builder.build();
2439       }
2440 
2441       quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2442       long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2443       if (rows > 0) {
2444         // if nextCallSeq does not match throw Exception straight away. This needs to be
2445         // performed even before checking of Lease.
2446         // See HBASE-5974
2447         if (request.hasNextCallSeq()) {
2448           if (rsh == null) {
2449             rsh = scanners.get(scannerName);
2450           }
2451           if (rsh != null) {
2452             if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2453               throw new OutOfOrderScannerNextException(
2454                 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2455                 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2456                 "; request=" + TextFormat.shortDebugString(request));
2457             }
2458             // Increment the nextCallSeq value which is the next expected from client.
2459             rsh.incNextCallSeq();
2460           }
2461         }
2462         try {
2463           // Remove lease while its being processed in server; protects against case
2464           // where processing of request takes > lease expiration time.
2465           lease = regionServer.leases.removeLease(scannerName);
2466           List<Result> results = new ArrayList<Result>();
2467           long totalCellSize = 0;
2468           long currentScanResultSize = 0;
2469 
2470           boolean done = false;
2471           // Call coprocessor. Get region info from scanner.
2472           if (region != null && region.getCoprocessorHost() != null) {
2473             Boolean bypass = region.getCoprocessorHost().preScannerNext(
2474               scanner, results, rows);
2475             if (!results.isEmpty()) {
2476               for (Result r : results) {
2477                 for (Cell cell : r.rawCells()) {
2478                   totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2479                   currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
2480                 }
2481               }
2482             }
2483             if (bypass != null && bypass.booleanValue()) {
2484               done = true;
2485             }
2486           }
2487 
2488           if (!done) {
2489             long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2490             if (maxResultSize <= 0) {
2491               maxResultSize = maxQuotaResultSize;
2492             }
2493             // This is cells inside a row. Default size is 10 so if many versions or many cfs,
2494             // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
2495             // arbitrary 32. TODO: keep record of general size of results being returned.
2496             List<Cell> values = new ArrayList<Cell>(32);
2497             region.startRegionOperation(Operation.SCAN);
2498             try {
2499               int i = 0;
2500               long before = EnvironmentEdgeManager.currentTime();
2501               synchronized(scanner) {
2502                 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2503                 boolean clientHandlesPartials =
2504                     request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2505                 boolean clientHandlesHeartbeats =
2506                     request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2507 
2508                 // On the server side we must ensure that the correct ordering of partial results is
2509                 // returned to the client to allow them to properly reconstruct the partial results.
2510                 // If the coprocessor host is adding to the result list, we cannot guarantee the
2511                 // correct ordering of partial results and so we prevent partial results from being
2512                 // formed.
2513                 boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
2514                 boolean allowPartialResults =
2515                     clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2516                 boolean moreRows = false;
2517 
2518                 // Heartbeat messages occur when the processing of the ScanRequest is exceeds a
2519                 // certain time threshold on the server. When the time threshold is exceeded, the
2520                 // server stops the scan and sends back whatever Results it has accumulated within
2521                 // that time period (may be empty). Since heartbeat messages have the potential to
2522                 // create partial Results (in the event that the timeout occurs in the middle of a
2523                 // row), we must only generate heartbeat messages when the client can handle both
2524                 // heartbeats AND partials
2525                 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2526 
2527                 // Default value of timeLimit is negative to indicate no timeLimit should be
2528                 // enforced.
2529                 long timeLimit = -1;
2530 
2531                 // Set the time limit to be half of the more restrictive timeout value (one of the
2532                 // timeout values must be positive). In the event that both values are positive, the
2533                 // more restrictive of the two is used to calculate the limit.
2534                 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2535                   long timeLimitDelta;
2536                   if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2537                     timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2538                   } else {
2539                     timeLimitDelta =
2540                         scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2541                   }
2542                   // Use half of whichever timeout value was more restrictive... But don't allow
2543                   // the time limit to be less than the allowable minimum (could cause an
2544                   // immediatate timeout before scanning any data).
2545                   timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2546                   timeLimit = System.currentTimeMillis() + timeLimitDelta;
2547                 }
2548 
2549                 final LimitScope sizeScope =
2550                     allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2551                 final LimitScope timeScope =
2552                     allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2553 
2554                 // Configure with limits for this RPC. Set keep progress true since size progress
2555                 // towards size limit should be kept between calls to nextRaw
2556                 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2557                 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2558                 contextBuilder.setBatchLimit(scanner.getBatch());
2559                 contextBuilder.setTimeLimit(timeScope, timeLimit);
2560                 ScannerContext scannerContext = contextBuilder.build();
2561 
2562                 boolean limitReached = false;
2563                 while (i < rows) {
2564                   // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
2565                   // batch limit is a limit on the number of cells per Result. Thus, if progress is
2566                   // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
2567                   // reset the batch progress between nextRaw invocations since we don't want the
2568                   // batch progress from previous calls to affect future calls
2569                   scannerContext.setBatchProgress(0);
2570 
2571                   // Collect values to be returned here
2572                   moreRows = scanner.nextRaw(values, scannerContext);
2573 
2574                   if (!values.isEmpty()) {
2575                     for (Cell cell : values) {
2576                       totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2577                     }
2578                     final boolean partial = scannerContext.partialResultFormed();
2579                     results.add(Result.create(values, null, stale, partial));
2580                     i++;
2581                   }
2582 
2583                   boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2584                   boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2585                   boolean rowLimitReached = i >= rows;
2586                   limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2587 
2588                   if (limitReached || !moreRows) {
2589                     if (LOG.isTraceEnabled()) {
2590                       LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2591                           + moreRows + " scannerContext: " + scannerContext);
2592                     }
2593                     // We only want to mark a ScanResponse as a heartbeat message in the event that
2594                     // there are more values to be read server side. If there aren't more values,
2595                     // marking it as a heartbeat is wasteful because the client will need to issue
2596                     // another ScanRequest only to realize that they already have all the values
2597                     if (moreRows) {
2598                       // Heartbeat messages occur when the time limit has been reached.
2599                       builder.setHeartbeatMessage(timeLimitReached);
2600                     }
2601                     break;
2602                   }
2603                   values.clear();
2604                 }
2605 
2606                 if (limitReached || moreRows) {
2607                   // We stopped prematurely
2608                   builder.setMoreResultsInRegion(true);
2609                 } else {
2610                   // We didn't get a single batch
2611                   builder.setMoreResultsInRegion(false);
2612                 }
2613               }
2614               region.updateReadRequestsCount(i);
2615               long end = EnvironmentEdgeManager.currentTime();
2616               region.getMetrics().updateScanTime(end - before);
2617               if (regionServer.metricsRegionServer != null) {
2618                 regionServer.metricsRegionServer.updateScanSize(
2619                     region.getTableDesc().getTableName(), totalCellSize);
2620                 regionServer.metricsRegionServer.updateScanTime(
2621                     region.getTableDesc().getTableName(), end - before);
2622               }
2623             } finally {
2624               region.closeRegionOperation();
2625             }
2626 
2627             // coprocessor postNext hook
2628             if (region != null && region.getCoprocessorHost() != null) {
2629               region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2630             }
2631           }
2632 
2633           quota.addScanResult(results);
2634 
2635           // If the scanner's filter - if any - is done with the scan
2636           // and wants to tell the client to stop the scan. This is done by passing
2637           // a null result, and setting moreResults to false.
2638           if (scanner.isFilterDone() && results.isEmpty()) {
2639             moreResults = false;
2640             results = null;
2641           } else {
2642             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2643           }
2644         } catch (IOException e) {
2645           // The scanner state might be left in a dirty state, so we will tell the Client to
2646           // fail this RPC and close the scanner while opening up another one from the start of
2647           // row that the client has last seen.
2648           closeScanner(region, scanner, scannerName);
2649 
2650           // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
2651           // used in two different semantics.
2652           // (1) The first is to close the client scanner and bubble up the exception all the way
2653           // to the application. This is preferred when the exception is really un-recoverable
2654           // (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
2655           // bucket usually.
2656           // (2) Second semantics is to close the current region scanner only, but continue the
2657           // client scanner by overriding the exception. This is usually UnknownScannerException,
2658           // OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
2659           // application-level ClientScanner has to continue without bubbling up the exception to
2660           // the client. See ClientScanner code to see how it deals with these special exceptions.
2661           if (e instanceof DoNotRetryIOException) {
2662             throw e;
2663           }
2664 
2665           // If it is a CorruptHFileException or a FileNotFoundException, throw the
2666           // DoNotRetryIOException. This can avoid the retry in ClientScanner.
2667           if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
2668             throw new DoNotRetryIOException(e);
2669           }
2670 
2671           // We closed the scanner already. Instead of throwing the IOException, and client
2672           // retrying with the same scannerId only to get USE on the next RPC, we directly throw
2673           // a special exception to save an RPC.
2674           RpcCallContext context = RpcServer.getCurrentCall();
2675           if (context != null && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2676             // 1.4.0+ clients know how to handle
2677             throw new ScannerResetException("Scanner is closed on the server-side", e);
2678           } else {
2679             // older clients do not know about SRE. Just throw USE, which they will handle
2680             throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2681                 + " scanner state for clients older than 1.3.", e);
2682           }
2683         } finally {
2684           // We're done. On way out re-add the above removed lease.
2685           // Adding resets expiration time on lease.
2686           if (scanners.containsKey(scannerName)) {
2687             if (lease != null) regionServer.leases.addLease(lease);
2688             ttl = this.scannerLeaseTimeoutPeriod;
2689           }
2690         }
2691       }
2692 
2693       if (!moreResults || closeScanner) {
2694         ttl = 0;
2695         moreResults = false;
2696         if (closeScanner(region, scanner, scannerName)) {
2697           return builder.build(); // bypass
2698         }
2699       }
2700 
2701       if (ttl > 0) {
2702         builder.setTtl(ttl);
2703       }
2704       builder.setScannerId(scannerId);
2705       builder.setMoreResults(moreResults);
2706       return builder.build();
2707     } catch (IOException ie) {
2708       if (scannerName != null && ie instanceof NotServingRegionException) {
2709         RegionScannerHolder rsh = scanners.remove(scannerName);
2710         if (rsh != null) {
2711           try {
2712             RegionScanner scanner = rsh.s;
2713             LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2714             scanner.close();
2715             regionServer.leases.cancelLease(scannerName);
2716           } catch (IOException e) {
2717            LOG.warn("Getting exception closing " + scannerName, e);
2718           }
2719         }
2720       }
2721       throw new ServiceException(ie);
2722     } finally {
2723       if (quota != null) {
2724         quota.close();
2725       }
2726     }
2727   }
2728 
2729   private boolean closeScanner(Region region, RegionScanner scanner, String scannerName) 
2730     throws IOException {
2731     if (region != null && region.getCoprocessorHost() != null) {
2732       if (region.getCoprocessorHost().preScannerClose(scanner)) {
2733         return true; // bypass
2734       }
2735     }
2736     RegionScannerHolder rsh = scanners.remove(scannerName);
2737     if (rsh != null) {
2738       scanner = rsh.s;
2739       scanner.close();
2740       try {
2741         regionServer.leases.cancelLease(scannerName);
2742       } catch (LeaseException le) {
2743         // No problem, ignore
2744         if (LOG.isTraceEnabled()) {
2745           LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2746         }
2747       }
2748       if (region != null && region.getCoprocessorHost() != null) {
2749         region.getCoprocessorHost().postScannerClose(scanner);
2750       }
2751     }
2752     return false;
2753   }
2754 
2755   @Override
2756   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2757       CoprocessorServiceRequest request) throws ServiceException {
2758     return regionServer.execRegionServerService(controller, request);
2759   }
2760 
2761   @Override
2762   public UpdateConfigurationResponse updateConfiguration(
2763       RpcController controller, UpdateConfigurationRequest request)
2764       throws ServiceException {
2765     try {
2766       this.regionServer.updateConfiguration();
2767     } catch (Exception e) {
2768       throw new ServiceException(e);
2769     }
2770     return UpdateConfigurationResponse.getDefaultInstance();
2771   }
2772 
2773   @Override
2774   public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
2775       RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
2776     try {
2777       final RegionServerSpaceQuotaManager manager =
2778           regionServer.getRegionServerSpaceQuotaManager();
2779       final GetSpaceQuotaSnapshotsResponse.Builder builder =
2780           GetSpaceQuotaSnapshotsResponse.newBuilder();
2781       if (manager != null) {
2782         final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
2783         for (Map.Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
2784           builder.addSnapshots(TableQuotaSnapshot.newBuilder()
2785               .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
2786               .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue()))
2787               .build());
2788         }
2789       }
2790       return builder.build();
2791     } catch (Exception e) {
2792       throw new ServiceException(e);
2793     }
2794   }
2795 }