1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.FileNotFoundException;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.InetSocketAddress;
25 import java.net.UnknownHostException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableMap;
33 import java.util.Set;
34 import java.util.TreeSet;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.atomic.AtomicLong;
37
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.CellScannable;
44 import org.apache.hadoop.hbase.CellScanner;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.DoNotRetryIOException;
47 import org.apache.hadoop.hbase.DroppedSnapshotException;
48 import org.apache.hadoop.hbase.HBaseIOException;
49 import org.apache.hadoop.hbase.HConstants;
50 import org.apache.hadoop.hbase.HRegionInfo;
51 import org.apache.hadoop.hbase.HTableDescriptor;
52 import org.apache.hadoop.hbase.MetaTableAccessor;
53 import org.apache.hadoop.hbase.NotServingRegionException;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.TableName;
56 import org.apache.hadoop.hbase.UnknownScannerException;
57 import org.apache.hadoop.hbase.classification.InterfaceAudience;
58 import org.apache.hadoop.hbase.client.Append;
59 import org.apache.hadoop.hbase.client.ConnectionUtils;
60 import org.apache.hadoop.hbase.client.Delete;
61 import org.apache.hadoop.hbase.client.Durability;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.client.Increment;
64 import org.apache.hadoop.hbase.client.Mutation;
65 import org.apache.hadoop.hbase.client.Put;
66 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
67 import org.apache.hadoop.hbase.client.Result;
68 import org.apache.hadoop.hbase.client.RowMutations;
69 import org.apache.hadoop.hbase.client.Scan;
70 import org.apache.hadoop.hbase.client.VersionInfoUtil;
71 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
72 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
73 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
74 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
75 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
76 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
77 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
78 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
79 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
80 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
81 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
82 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
83 import org.apache.hadoop.hbase.ipc.PriorityFunction;
84 import org.apache.hadoop.hbase.ipc.QosPriority;
85 import org.apache.hadoop.hbase.ipc.RpcCallContext;
86 import org.apache.hadoop.hbase.ipc.RpcServer;
87 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
88 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
89 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
90 import org.apache.hadoop.hbase.ipc.ServerRpcController;
91 import org.apache.hadoop.hbase.master.MasterRpcServices;
92 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93 import org.apache.hadoop.hbase.protobuf.RequestConverter;
94 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
95 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
96 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
97 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
98 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
99 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
100 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
101 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
102 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
103 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
104 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
105 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
106 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
107 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
108 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
109 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
110 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsRequest;
111 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.MergeRegionsResponse;
112 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
113 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
114 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
116 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
117 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
118 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
119 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
120 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
121 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
122 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
125 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
126 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
127 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
128 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
129 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
132 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
133 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
138 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
139 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
140 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
142 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
143 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
144 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
146 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
147 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
148 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
149 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
150 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
151 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
154 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
155 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
156 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
157 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
158 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
159 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.SpaceViolationPolicy;
160 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
161 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
162 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
163 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
164 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
165 import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
166 import org.apache.hadoop.hbase.quotas.OperationQuota;
167 import org.apache.hadoop.hbase.quotas.QuotaUtil;
168 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
169 import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
170 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
171 import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
172 import org.apache.hadoop.hbase.quotas.SpaceViolationPolicyEnforcement;
173 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
174 import org.apache.hadoop.hbase.regionserver.Leases.Lease;
175 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
176 import org.apache.hadoop.hbase.regionserver.Region.Operation;
177 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
178 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
179 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
180 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
181 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
182 import org.apache.hadoop.hbase.security.Superusers;
183 import org.apache.hadoop.hbase.security.User;
184 import org.apache.hadoop.hbase.util.Bytes;
185 import org.apache.hadoop.hbase.util.Counter;
186 import org.apache.hadoop.hbase.util.DNS;
187 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
188 import org.apache.hadoop.hbase.util.Pair;
189 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
190 import org.apache.hadoop.hbase.util.Strings;
191 import org.apache.hadoop.hbase.wal.WAL;
192 import org.apache.hadoop.hbase.wal.WALKey;
193 import org.apache.hadoop.hbase.wal.WALSplitter;
194 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
195 import org.apache.zookeeper.KeeperException;
196
197 import com.google.common.annotations.VisibleForTesting;
198 import com.google.protobuf.ByteString;
199 import com.google.protobuf.Message;
200 import com.google.protobuf.RpcController;
201 import com.google.protobuf.ServiceException;
202 import com.google.protobuf.TextFormat;
203
204
205
206
207 @InterfaceAudience.Private
208 @SuppressWarnings("deprecation")
209 public class RSRpcServices implements HBaseRPCErrorHandler,
210 AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction {
211 protected static final Log LOG = LogFactory.getLog(RSRpcServices.class);
212
213
214 public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
215 "hbase.region.server.rpc.scheduler.factory.class";
216
217
218
219
220
221
222 private static final String REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA =
223 "hbase.region.server.rpc.minimum.scan.time.limit.delta";
224
225
226
227 private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
228
229
230 final Counter requestCount = new Counter();
231
232
233 final Counter rpcGetRequestCount = new Counter();
234
235
236 final Counter rpcScanRequestCount = new Counter();
237
238
239 final Counter rpcMultiRequestCount = new Counter();
240
241
242 final Counter rpcMutateRequestCount = new Counter();
243
244
245 final RpcServerInterface rpcServer;
246 final InetSocketAddress isa;
247
248 private final HRegionServer regionServer;
249 private final long maxScannerResultSize;
250
251
252 private final PriorityFunction priority;
253
254 private final AtomicLong scannerIdGen = new AtomicLong(0L);
255 private final ConcurrentHashMap<String, RegionScannerHolder> scanners =
256 new ConcurrentHashMap<String, RegionScannerHolder>();
257
258
259
260
261 private final int scannerLeaseTimeoutPeriod;
262
263
264
265
266 private final int rpcTimeout;
267
268
269
270
271 private final long minimumScanTimeLimitDelta;
272
273
274
275
276 private static class RegionScannerHolder {
277 private AtomicLong nextCallSeq = new AtomicLong(0);
278 private RegionScanner s;
279 private Region r;
280
281 public RegionScannerHolder(RegionScanner s, Region r) {
282 this.s = s;
283 this.r = r;
284 }
285
286 private long getNextCallSeq() {
287 return nextCallSeq.get();
288 }
289
290 private void incNextCallSeq() {
291 nextCallSeq.incrementAndGet();
292 }
293
294 private void rollbackNextCallSeq() {
295 nextCallSeq.decrementAndGet();
296 }
297 }
298
299
300
301
302
303 private class ScannerListener implements LeaseListener {
304 private final String scannerName;
305
306 ScannerListener(final String n) {
307 this.scannerName = n;
308 }
309
310 @Override
311 public void leaseExpired() {
312 RegionScannerHolder rsh = scanners.remove(this.scannerName);
313 if (rsh != null) {
314 RegionScanner s = rsh.s;
315 LOG.info("Scanner " + this.scannerName + " lease expired on region "
316 + s.getRegionInfo().getRegionNameAsString());
317 Region region = null;
318 try {
319 region = regionServer.getRegion(s.getRegionInfo().getRegionName());
320 if (region != null && region.getCoprocessorHost() != null) {
321 region.getCoprocessorHost().preScannerClose(s);
322 }
323
324 s.close();
325 if (region != null && region.getCoprocessorHost() != null) {
326 region.getCoprocessorHost().postScannerClose(s);
327 }
328 } catch (IOException e) {
329 LOG.error("Closing scanner for "
330 + s.getRegionInfo().getRegionNameAsString(), e);
331 } finally {
332 try {
333 s.close();
334 if (region != null && region.getCoprocessorHost() != null) {
335 region.getCoprocessorHost().postScannerClose(s);
336 }
337 } catch (IOException e) {
338 LOG.error("Closing scanner for " + s.getRegionInfo().getRegionNameAsString(), e);
339 }
340 }
341 } else {
342 LOG.warn("Scanner " + this.scannerName + " lease expired, but no related" +
343 " scanner found, hence no chance to close that related scanner!");
344 }
345 }
346 }
347
348 private static ResultOrException getResultOrException(
349 final ClientProtos.Result r, final int index, final ClientProtos.RegionLoadStats stats) {
350 return getResultOrException(ResponseConverter.buildActionResult(r, stats), index);
351 }
352
353 private static ResultOrException getResultOrException(final Exception e, final int index) {
354 return getResultOrException(ResponseConverter.buildActionResult(e), index);
355 }
356
357 private static ResultOrException getResultOrException(
358 final ResultOrException.Builder builder, final int index) {
359 return builder.setIndex(index).build();
360 }
361
362
363
364
365
366
367
368 private long startNonceOperation(final MutationProto mutation, long nonceGroup)
369 throws IOException, OperationConflictException {
370 if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
371 boolean canProceed = false;
372 try {
373 canProceed = regionServer.nonceManager.startOperation(
374 nonceGroup, mutation.getNonce(), regionServer);
375 } catch (InterruptedException ex) {
376 throw new InterruptedIOException("Nonce start operation interrupted");
377 }
378 if (!canProceed) {
379
380 String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
381 + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
382 + "] may have already completed";
383 throw new OperationConflictException(message);
384 }
385 return mutation.getNonce();
386 }
387
388
389
390
391
392
393
394 private void endNonceOperation(final MutationProto mutation,
395 long nonceGroup, boolean success) {
396 if (regionServer.nonceManager != null && mutation.hasNonce()) {
397 regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
398 }
399 }
400
401
402
403
404 private boolean isClientCellBlockSupport() {
405 RpcCallContext context = RpcServer.getCurrentCall();
406 return context != null && context.isClientCellBlockSupport();
407 }
408
409 private void addResult(final MutateResponse.Builder builder,
410 final Result result, final PayloadCarryingRpcController rpcc) {
411 if (result == null) return;
412 if (isClientCellBlockSupport()) {
413 builder.setResult(ProtobufUtil.toResultNoData(result));
414 rpcc.setCellScanner(result.cellScanner());
415 } else {
416 ClientProtos.Result pbr = ProtobufUtil.toResult(result);
417 builder.setResult(pbr);
418 }
419 }
420
421 private void addResults(final ScanResponse.Builder builder, final List<Result> results,
422 final RpcController controller, boolean isDefaultRegion) {
423 builder.setStale(!isDefaultRegion);
424 if (results == null || results.isEmpty()) return;
425 if (isClientCellBlockSupport()) {
426 for (Result res : results) {
427 builder.addCellsPerResult(res.size());
428 builder.addPartialFlagPerResult(res.isPartial());
429 }
430 ((PayloadCarryingRpcController)controller).
431 setCellScanner(CellUtil.createCellScanner(results));
432 } else {
433 for (Result res: results) {
434 ClientProtos.Result pbr = ProtobufUtil.toResult(res);
435 builder.addResults(pbr);
436 }
437 }
438 }
439
440
441
442
443
444
445
446
447
448 private ClientProtos.RegionLoadStats mutateRows(final Region region,
449 final List<ClientProtos.Action> actions,
450 final CellScanner cellScanner) throws IOException {
451 if (!region.getRegionInfo().isMetaTable()) {
452 regionServer.cacheFlusher.reclaimMemStoreMemory();
453 }
454 RowMutations rm = null;
455 for (ClientProtos.Action action: actions) {
456 if (action.hasGet()) {
457 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
458 action.getGet());
459 }
460 MutationType type = action.getMutation().getMutateType();
461 if (rm == null) {
462 rm = new RowMutations(action.getMutation().getRow().toByteArray());
463 }
464 switch (type) {
465 case PUT:
466 rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
467 break;
468 case DELETE:
469 rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
470 break;
471 default:
472 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
473 }
474 }
475 region.mutateRow(rm);
476 return ((HRegion)region).getRegionStats();
477 }
478
479
480
481
482
483
484
485
486
487
488
489
490
491 private boolean checkAndRowMutate(final Region region, final List<ClientProtos.Action> actions,
492 final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
493 CompareOp compareOp, ByteArrayComparable comparator,
494 ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
495 if (!region.getRegionInfo().isMetaTable()) {
496 regionServer.cacheFlusher.reclaimMemStoreMemory();
497 }
498 RowMutations rm = null;
499 for (ClientProtos.Action action: actions) {
500 if (action.hasGet()) {
501 throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
502 action.getGet());
503 }
504 MutationType type = action.getMutation().getMutateType();
505 if (rm == null) {
506 rm = new RowMutations(action.getMutation().getRow().toByteArray());
507 }
508 switch (type) {
509 case PUT:
510 Put put = ProtobufUtil.toPut(action.getMutation(), cellScanner);
511 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
512 rm.add(put);
513 break;
514 case DELETE:
515 Delete del = ProtobufUtil.toDelete(action.getMutation(), cellScanner);
516 spaceQuotaEnforcement.getPolicyEnforcement(region).check(del);
517 rm.add(del);
518 break;
519 default:
520 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
521 }
522 }
523 return region.checkAndRowMutate(row, family, qualifier, compareOp, comparator, rm, Boolean.TRUE);
524 }
525
526
527
528
529
530
531
532
533
534
535
536 private Result append(final Region region, final OperationQuota quota,
537 final MutationProto m, final CellScanner cellScanner, long nonceGroup,
538 ActivePolicyEnforcement spaceQuota)
539 throws IOException {
540 long before = EnvironmentEdgeManager.currentTime();
541 Append append = ProtobufUtil.toAppend(m, cellScanner);
542 spaceQuota.getPolicyEnforcement(region).check(append);
543 quota.addMutation(append);
544 Result r = null;
545 if (region.getCoprocessorHost() != null) {
546 r = region.getCoprocessorHost().preAppend(append);
547 }
548 if (r == null) {
549 long nonce = startNonceOperation(m, nonceGroup);
550 boolean success = false;
551 try {
552 r = region.append(append, nonceGroup, nonce);
553 success = true;
554 } finally {
555 endNonceOperation(m, nonceGroup, success);
556 }
557 if (region.getCoprocessorHost() != null) {
558 region.getCoprocessorHost().postAppend(append, r);
559 }
560 }
561 if (regionServer.metricsRegionServer != null) {
562 regionServer.metricsRegionServer.updateAppend(
563 region.getTableDesc().getTableName(),
564 EnvironmentEdgeManager.currentTime() - before);
565 }
566 return r;
567 }
568
569
570
571
572
573
574
575
576
577 private Result increment(final Region region, final OperationQuota quota,
578 final MutationProto mutation, final CellScanner cells, long nonceGroup,
579 ActivePolicyEnforcement spaceQuota) throws IOException {
580 long before = EnvironmentEdgeManager.currentTime();
581 Increment increment = ProtobufUtil.toIncrement(mutation, cells);
582 spaceQuota.getPolicyEnforcement(region).check(increment);
583 quota.addMutation(increment);
584 Result r = null;
585 if (region.getCoprocessorHost() != null) {
586 r = region.getCoprocessorHost().preIncrement(increment);
587 }
588 if (r == null) {
589 long nonce = startNonceOperation(mutation, nonceGroup);
590 boolean success = false;
591 try {
592 r = region.increment(increment, nonceGroup, nonce);
593 success = true;
594 } finally {
595 endNonceOperation(mutation, nonceGroup, success);
596 }
597 if (region.getCoprocessorHost() != null) {
598 r = region.getCoprocessorHost().postIncrement(increment, r);
599 }
600 }
601 if (regionServer.metricsRegionServer != null) {
602 regionServer.metricsRegionServer.updateIncrement(
603 region.getTableDesc().getTableName(),
604 EnvironmentEdgeManager.currentTime() - before);
605 }
606 return r;
607 }
608
609
610
611
612
613
614
615
616
617
618
619
620 private List<CellScannable> doNonAtomicRegionMutation(final Region region,
621 final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
622 final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup,
623 ActivePolicyEnforcement spaceQuotaEnforcement) {
624
625
626
627
628 List<ClientProtos.Action> mutations = null;
629 ClientProtos.ResultOrException.Builder resultOrExceptionBuilder = ResultOrException.newBuilder();
630 boolean hasResultOrException = false;
631
632 for (ClientProtos.Action action: actions.getActionList()) {
633 hasResultOrException = false;
634 resultOrExceptionBuilder.clear();
635 try {
636 Result r = null;
637 if (action.hasGet()) {
638 long before = EnvironmentEdgeManager.currentTime();
639 try {
640 Get get = ProtobufUtil.toGet(action.getGet());
641 r = region.get(get);
642 } finally {
643 if (regionServer.metricsRegionServer != null) {
644 regionServer.metricsRegionServer.updateGet(
645 region.getTableDesc().getTableName(),
646 EnvironmentEdgeManager.currentTime() - before);
647 }
648 }
649 } else if (action.hasServiceCall()) {
650 hasResultOrException = true;
651 try {
652 Message result = execServiceOnRegion(region, action.getServiceCall());
653 ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
654 ClientProtos.CoprocessorServiceResult.newBuilder();
655 resultOrExceptionBuilder.setServiceResult(
656 serviceResultBuilder.setValue(
657 serviceResultBuilder.getValueBuilder()
658 .setName(result.getClass().getName())
659 .setValue(result.toByteString())));
660 } catch (IOException ioe) {
661 rpcServer.getMetrics().exception(ioe);
662 resultOrExceptionBuilder.setException(ResponseConverter.buildException(ioe));
663 }
664 } else if (action.hasMutation()) {
665 MutationType type = action.getMutation().getMutateType();
666 if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
667 !mutations.isEmpty()) {
668
669 doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
670 mutations.clear();
671 }
672 switch (type) {
673 case APPEND:
674 r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
675 spaceQuotaEnforcement);
676 break;
677 case INCREMENT:
678 r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
679 spaceQuotaEnforcement);
680 break;
681 case PUT:
682 case DELETE:
683
684 if (mutations == null) {
685 mutations = new ArrayList<ClientProtos.Action>(actions.getActionCount());
686 }
687 mutations.add(action);
688 break;
689 default:
690 throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
691 }
692 } else {
693 throw new HBaseIOException("Unexpected Action type");
694 }
695 if (r != null) {
696 ClientProtos.Result pbResult = null;
697 if (isClientCellBlockSupport()) {
698 pbResult = ProtobufUtil.toResultNoData(r);
699
700 if (cellsToReturn == null) cellsToReturn = new ArrayList<CellScannable>();
701 cellsToReturn.add(r);
702 } else {
703 pbResult = ProtobufUtil.toResult(r);
704 }
705 hasResultOrException = true;
706 resultOrExceptionBuilder.setResult(pbResult);
707 }
708
709
710
711
712 } catch (IOException ie) {
713 rpcServer.getMetrics().exception(ie);
714 hasResultOrException = true;
715 resultOrExceptionBuilder.setException(ResponseConverter.buildException(ie));
716 }
717 if (hasResultOrException) {
718
719 resultOrExceptionBuilder.setIndex(action.getIndex());
720 builder.addResultOrException(resultOrExceptionBuilder.build());
721 }
722 }
723
724 if (mutations != null && !mutations.isEmpty()) {
725 doBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
726 }
727 return cellsToReturn;
728 }
729
730
731
732
733
734
735
736
737 private void doBatchOp(final RegionActionResult.Builder builder, final Region region,
738 final OperationQuota quota, final List<ClientProtos.Action> mutations,
739 final CellScanner cells, ActivePolicyEnforcement spaceQuotaEnforcement) {
740 Mutation[] mArray = new Mutation[mutations.size()];
741 long before = EnvironmentEdgeManager.currentTime();
742 boolean batchContainsPuts = false, batchContainsDelete = false;
743 try {
744 int i = 0;
745 for (ClientProtos.Action action: mutations) {
746 MutationProto m = action.getMutation();
747 Mutation mutation;
748 if (m.getMutateType() == MutationType.PUT) {
749 mutation = ProtobufUtil.toPut(m, cells);
750 batchContainsPuts = true;
751 } else {
752 mutation = ProtobufUtil.toDelete(m, cells);
753 batchContainsDelete = true;
754 }
755 mArray[i++] = mutation;
756
757 spaceQuotaEnforcement.getPolicyEnforcement(region).check(mutation);
758 quota.addMutation(mutation);
759 }
760
761 if (!region.getRegionInfo().isMetaTable()) {
762 regionServer.cacheFlusher.reclaimMemStoreMemory();
763 }
764
765 OperationStatus codes[] = region.batchMutate(mArray, HConstants.NO_NONCE,
766 HConstants.NO_NONCE);
767 for (i = 0; i < codes.length; i++) {
768 int index = mutations.get(i).getIndex();
769 Exception e = null;
770 switch (codes[i].getOperationStatusCode()) {
771 case BAD_FAMILY:
772 e = new NoSuchColumnFamilyException(codes[i].getExceptionMsg());
773 builder.addResultOrException(getResultOrException(e, index));
774 break;
775
776 case SANITY_CHECK_FAILURE:
777 e = new FailedSanityCheckException(codes[i].getExceptionMsg());
778 builder.addResultOrException(getResultOrException(e, index));
779 break;
780
781 default:
782 e = new DoNotRetryIOException(codes[i].getExceptionMsg());
783 builder.addResultOrException(getResultOrException(e, index));
784 break;
785
786 case SUCCESS:
787 builder.addResultOrException(getResultOrException(
788 ClientProtos.Result.getDefaultInstance(), index,
789 ((HRegion)region).getRegionStats()));
790 break;
791 }
792 }
793 } catch (IOException ie) {
794 for (int i = 0; i < mutations.size(); i++) {
795 builder.addResultOrException(getResultOrException(ie, mutations.get(i).getIndex()));
796 }
797 }
798 if (regionServer.metricsRegionServer != null) {
799 long after = EnvironmentEdgeManager.currentTime();
800 if (batchContainsPuts) {
801 regionServer.metricsRegionServer.updatePut(
802 region.getTableDesc().getTableName(), after - before);
803 }
804 if (batchContainsDelete) {
805 regionServer.metricsRegionServer.updateDelete(
806 region.getTableDesc().getTableName(), after - before);
807 }
808 }
809 }
810
811
812
813
814
815
816
817
818
819
820
821 private OperationStatus [] doReplayBatchOp(final Region region,
822 final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
823 long before = EnvironmentEdgeManager.currentTime();
824 boolean batchContainsPuts = false, batchContainsDelete = false;
825 try {
826 for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
827 WALSplitter.MutationReplay m = it.next();
828
829 if (m.type == MutationType.PUT) {
830 batchContainsPuts = true;
831 } else {
832 batchContainsDelete = true;
833 }
834
835 NavigableMap<byte[], List<Cell>> map = m.mutation.getFamilyCellMap();
836 List<Cell> metaCells = map.get(WALEdit.METAFAMILY);
837 if (metaCells != null && !metaCells.isEmpty()) {
838 for (Cell metaCell : metaCells) {
839 CompactionDescriptor compactionDesc = WALEdit.getCompaction(metaCell);
840 boolean isDefaultReplica = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
841 HRegion hRegion = (HRegion)region;
842 if (compactionDesc != null) {
843
844
845 hRegion.replayWALCompactionMarker(compactionDesc, !isDefaultReplica, isDefaultReplica,
846 replaySeqId);
847 continue;
848 }
849 FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(metaCell);
850 if (flushDesc != null && !isDefaultReplica) {
851 hRegion.replayWALFlushMarker(flushDesc, replaySeqId);
852 continue;
853 }
854 RegionEventDescriptor regionEvent = WALEdit.getRegionEventDescriptor(metaCell);
855 if (regionEvent != null && !isDefaultReplica) {
856 hRegion.replayWALRegionEventMarker(regionEvent);
857 continue;
858 }
859 BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
860 if (bulkLoadEvent != null) {
861 hRegion.replayWALBulkLoadEventMarker(bulkLoadEvent);
862 continue;
863 }
864 }
865 it.remove();
866 }
867 }
868 requestCount.add(mutations.size());
869 if (!region.getRegionInfo().isMetaTable()) {
870 regionServer.cacheFlusher.reclaimMemStoreMemory();
871 }
872 return region.batchReplay(mutations.toArray(
873 new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
874 } finally {
875 if (regionServer.metricsRegionServer != null) {
876 long after = EnvironmentEdgeManager.currentTime();
877 if (batchContainsPuts) {
878 regionServer.metricsRegionServer.updatePut(
879 region.getTableDesc().getTableName(), after - before);
880 }
881 if (batchContainsDelete) {
882 regionServer.metricsRegionServer.updateDelete(
883 region.getTableDesc().getTableName(), after - before);
884 }
885 }
886 }
887 }
888
889 private void closeAllScanners() {
890
891
892 for (Map.Entry<String, RegionScannerHolder> e : scanners.entrySet()) {
893 try {
894 e.getValue().s.close();
895 } catch (IOException ioe) {
896 LOG.warn("Closing scanner " + e.getKey(), ioe);
897 }
898 }
899 }
900
901 public RSRpcServices(HRegionServer rs) throws IOException {
902 regionServer = rs;
903
904 RpcSchedulerFactory rpcSchedulerFactory;
905 try {
906 Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
907 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
908 SimpleRpcSchedulerFactory.class);
909 rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
910 } catch (InstantiationException e) {
911 throw new IllegalArgumentException(e);
912 } catch (IllegalAccessException e) {
913 throw new IllegalArgumentException(e);
914 }
915
916 InetSocketAddress initialIsa;
917 InetSocketAddress bindAddress;
918 if(this instanceof MasterRpcServices) {
919 String hostname = getHostname(rs.conf, true);
920 int port = rs.conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
921
922 initialIsa = new InetSocketAddress(hostname, port);
923 bindAddress = new InetSocketAddress(rs.conf.get("hbase.master.ipc.address", hostname), port);
924 } else {
925 String hostname = getHostname(rs.conf, false);
926 int port = rs.conf.getInt(HConstants.REGIONSERVER_PORT,
927 HConstants.DEFAULT_REGIONSERVER_PORT);
928
929 initialIsa = new InetSocketAddress(hostname, port);
930 bindAddress = new InetSocketAddress(
931 rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
932 }
933 if (initialIsa.getAddress() == null) {
934 throw new IllegalArgumentException("Failed resolve of " + initialIsa);
935 }
936 priority = new AnnotationReadingPriorityFunction(this);
937 String name = rs.getProcessName() + "/" + initialIsa.toString();
938
939 ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
940 rpcServer = new RpcServer(rs, name, getServices(),
941 bindAddress,
942 rs.conf,
943 rpcSchedulerFactory.create(rs.conf, this, rs));
944
945 scannerLeaseTimeoutPeriod = rs.conf.getInt(
946 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
947 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
948 maxScannerResultSize = rs.conf.getLong(
949 HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY,
950 HConstants.DEFAULT_HBASE_SERVER_SCANNER_MAX_RESULT_SIZE);
951 rpcTimeout = rs.conf.getInt(
952 HConstants.HBASE_RPC_TIMEOUT_KEY,
953 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
954 minimumScanTimeLimitDelta = rs.conf.getLong(
955 REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
956 DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA);
957
958 InetSocketAddress address = rpcServer.getListenerAddress();
959 if (address == null) {
960 throw new IOException("Listener channel is closed");
961 }
962
963 isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort());
964 rpcServer.setErrorHandler(this);
965 rs.setName(name);
966 }
967
968 public static String getHostname(Configuration conf, boolean isMaster)
969 throws UnknownHostException {
970 String hostname = conf.get(isMaster? HRegionServer.MASTER_HOSTNAME_KEY :
971 HRegionServer.RS_HOSTNAME_KEY);
972 if (hostname == null || hostname.isEmpty()) {
973 String masterOrRS = isMaster ? "master" : "regionserver";
974 return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
975 conf.get("hbase." + masterOrRS + ".dns.interface", "default"),
976 conf.get("hbase." + masterOrRS + ".dns.nameserver", "default")));
977 } else {
978 LOG.info("hostname is configured to be " + hostname);
979 return hostname;
980 }
981 }
982
983 RegionScanner getScanner(long scannerId) {
984 String scannerIdString = Long.toString(scannerId);
985 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
986 if (scannerHolder != null) {
987 return scannerHolder.s;
988 }
989 return null;
990 }
991
992
993
994
995
996 long getScannerVirtualTime(long scannerId) {
997 String scannerIdString = Long.toString(scannerId);
998 RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
999 if (scannerHolder != null) {
1000 return scannerHolder.getNextCallSeq();
1001 }
1002 return 0L;
1003 }
1004
1005 long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
1006 long scannerId = this.scannerIdGen.incrementAndGet();
1007 String scannerName = String.valueOf(scannerId);
1008
1009 RegionScannerHolder existing =
1010 scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
1011 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
1012
1013 regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
1014 new ScannerListener(scannerName));
1015 return scannerId;
1016 }
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026 Region getRegion(
1027 final RegionSpecifier regionSpecifier) throws IOException {
1028 ByteString value = regionSpecifier.getValue();
1029 RegionSpecifierType type = regionSpecifier.getType();
1030 switch (type) {
1031 case REGION_NAME:
1032 byte[] regionName = value.toByteArray();
1033 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
1034 return regionServer.getRegionByEncodedName(regionName, encodedRegionName);
1035 case ENCODED_REGION_NAME:
1036 return regionServer.getRegionByEncodedName(value.toStringUtf8());
1037 default:
1038 throw new DoNotRetryIOException(
1039 "Unsupported region specifier type: " + type);
1040 }
1041 }
1042
1043 @VisibleForTesting
1044 public PriorityFunction getPriority() {
1045 return priority;
1046 }
1047
1048 Configuration getConfiguration() {
1049 return regionServer.getConfiguration();
1050 }
1051
1052 private RegionServerRpcQuotaManager getRpcQuotaManager() {
1053 return regionServer.getRegionServerRpcQuotaManager();
1054 }
1055
1056 private RegionServerSpaceQuotaManager getSpaceQuotaManager() {
1057 return regionServer.getRegionServerSpaceQuotaManager();
1058 }
1059
1060 void start() {
1061 rpcServer.start();
1062 }
1063
1064 void stop() {
1065 closeAllScanners();
1066 rpcServer.stop();
1067 }
1068
1069
1070
1071
1072
1073
1074 protected void checkOpen() throws IOException {
1075 if (regionServer.isAborted()) {
1076 throw new RegionServerAbortedException("Server " + regionServer.serverName + " aborting");
1077 }
1078 if (regionServer.isStopped()) {
1079 throw new RegionServerStoppedException("Server " + regionServer.serverName + " stopping");
1080 }
1081 if (!regionServer.fsOk) {
1082 throw new RegionServerStoppedException("File system not available");
1083 }
1084 if (!regionServer.isOnline()) {
1085 throw new ServerNotRunningYetException("Server " + regionServer.serverName
1086 + " is not running yet");
1087 }
1088 }
1089
1090
1091
1092
1093 protected List<BlockingServiceAndInterface> getServices() {
1094 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(2);
1095 bssi.add(new BlockingServiceAndInterface(
1096 ClientService.newReflectiveBlockingService(this),
1097 ClientService.BlockingInterface.class));
1098 bssi.add(new BlockingServiceAndInterface(
1099 AdminService.newReflectiveBlockingService(this),
1100 AdminService.BlockingInterface.class));
1101 return bssi;
1102 }
1103
1104 public InetSocketAddress getSocketAddress() {
1105 return isa;
1106 }
1107
1108 @Override
1109 public int getPriority(RequestHeader header, Message param) {
1110 return priority.getPriority(header, param);
1111 }
1112
1113 @Override
1114 public long getDeadline(RequestHeader header, Message param) {
1115 return priority.getDeadline(header, param);
1116 }
1117
1118
1119
1120
1121
1122
1123
1124
1125 @Override
1126 public boolean checkOOME(final Throwable e) {
1127 boolean stop = false;
1128 try {
1129 if (e instanceof OutOfMemoryError
1130 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1131 || (e.getMessage() != null && e.getMessage().contains(
1132 "java.lang.OutOfMemoryError"))) {
1133 stop = true;
1134 LOG.fatal("Run out of memory; " + getClass().getSimpleName()
1135 + " will abort itself immediately", e);
1136 }
1137 } finally {
1138 if (stop) {
1139 Runtime.getRuntime().halt(1);
1140 }
1141 }
1142 return stop;
1143 }
1144
1145
1146
1147
1148
1149
1150
1151
1152 @Override
1153 @QosPriority(priority=HConstants.ADMIN_QOS)
1154 public CloseRegionResponse closeRegion(final RpcController controller,
1155 final CloseRegionRequest request) throws ServiceException {
1156 final ServerName sn = (request.hasDestinationServer() ?
1157 ProtobufUtil.toServerName(request.getDestinationServer()) : null);
1158
1159 try {
1160 checkOpen();
1161 if (request.hasServerStartCode()) {
1162
1163 long serverStartCode = request.getServerStartCode();
1164 if (regionServer.serverName.getStartcode() != serverStartCode) {
1165 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1166 "different server with startCode: " + serverStartCode + ", this server is: "
1167 + regionServer.serverName));
1168 }
1169 }
1170 final String encodedRegionName = ProtobufUtil.getRegionEncodedName(request.getRegion());
1171
1172
1173 final Region region = regionServer.getFromOnlineRegions(encodedRegionName);
1174 if ((region != null) && (region .getCoprocessorHost() != null)) {
1175 region.getCoprocessorHost().preClose(false);
1176 }
1177
1178 requestCount.increment();
1179 LOG.info("Close " + encodedRegionName + ", moving to " + sn);
1180 CloseRegionCoordination.CloseRegionDetails crd = regionServer.getCoordinatedStateManager()
1181 .getCloseRegionCoordination().parseFromProtoRequest(request);
1182
1183 boolean closed = regionServer.closeRegion(encodedRegionName, false, crd, sn);
1184 CloseRegionResponse.Builder builder = CloseRegionResponse.newBuilder().setClosed(closed);
1185 return builder.build();
1186 } catch (IOException ie) {
1187 throw new ServiceException(ie);
1188 }
1189 }
1190
1191
1192
1193
1194
1195
1196
1197
1198 @Override
1199 @QosPriority(priority=HConstants.ADMIN_QOS)
1200 public CompactRegionResponse compactRegion(final RpcController controller,
1201 final CompactRegionRequest request) throws ServiceException {
1202 try {
1203 checkOpen();
1204 requestCount.increment();
1205 Region region = getRegion(request.getRegion());
1206
1207
1208 if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
1209 !Superusers.isSuperUser(RpcServer.getRequestUser()) &&
1210 this.regionServer.getRegionServerSpaceQuotaManager().areCompactionsDisabled(
1211 region.getTableDesc().getTableName())) {
1212 throw new DoNotRetryIOException("Compactions on this region are "
1213 + "disabled due to a space quota violation.");
1214 }
1215 region.startRegionOperation(Operation.COMPACT_REGION);
1216 LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
1217 boolean major = false;
1218 byte [] family = null;
1219 Store store = null;
1220 if (request.hasFamily()) {
1221 family = request.getFamily().toByteArray();
1222 store = region.getStore(family);
1223 if (store == null) {
1224 throw new ServiceException(new IOException("column family " + Bytes.toString(family)
1225 + " does not exist in region " + region.getRegionInfo().getRegionNameAsString()));
1226 }
1227 }
1228 if (request.hasMajor()) {
1229 major = request.getMajor();
1230 }
1231 if (major) {
1232 if (family != null) {
1233 store.triggerMajorCompaction();
1234 } else {
1235 region.triggerMajorCompaction();
1236 }
1237 }
1238
1239 String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):"";
1240 if (LOG.isTraceEnabled()) {
1241 LOG.trace("User-triggered compaction requested for region "
1242 + region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
1243 }
1244 String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
1245 if(family != null) {
1246 regionServer.compactSplitThread.requestCompaction(region, store, log,
1247 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1248 } else {
1249 regionServer.compactSplitThread.requestCompaction(region, log,
1250 Store.PRIORITY_USER, null, RpcServer.getRequestUser());
1251 }
1252 return CompactRegionResponse.newBuilder().build();
1253 } catch (IOException ie) {
1254 throw new ServiceException(ie);
1255 }
1256 }
1257
1258
1259
1260
1261
1262
1263
1264
1265 @Override
1266 @QosPriority(priority=HConstants.ADMIN_QOS)
1267 public FlushRegionResponse flushRegion(final RpcController controller,
1268 final FlushRegionRequest request) throws ServiceException {
1269 try {
1270 checkOpen();
1271 requestCount.increment();
1272 Region region = getRegion(request.getRegion());
1273 LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
1274 boolean shouldFlush = true;
1275 if (request.hasIfOlderThanTs()) {
1276 shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
1277 }
1278 FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
1279 if (shouldFlush) {
1280 boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
1281 request.getWriteFlushWalMarker() : false;
1282
1283 HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
1284 ((HRegion)region).flushcache(true, writeFlushWalMarker);
1285 boolean compactionNeeded = flushResult.isCompactionNeeded();
1286 if (compactionNeeded) {
1287 regionServer.compactSplitThread.requestSystemCompaction(region,
1288 "Compaction through user triggered flush");
1289 }
1290 builder.setFlushed(flushResult.isFlushSucceeded());
1291 builder.setWroteFlushWalMarker(flushResult.wroteFlushWalMarker);
1292 }
1293 builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
1294 return builder.build();
1295 } catch (DroppedSnapshotException ex) {
1296
1297
1298
1299
1300 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1301 throw new ServiceException(ex);
1302 } catch (IOException ie) {
1303 throw new ServiceException(ie);
1304 }
1305 }
1306
1307 @Override
1308 @QosPriority(priority=HConstants.ADMIN_QOS)
1309 public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
1310 final GetOnlineRegionRequest request) throws ServiceException {
1311 try {
1312 checkOpen();
1313 requestCount.increment();
1314 Map<String, Region> onlineRegions = regionServer.onlineRegions;
1315 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
1316 for (Region region: onlineRegions.values()) {
1317 list.add(region.getRegionInfo());
1318 }
1319 Collections.sort(list);
1320 return ResponseConverter.buildGetOnlineRegionResponse(list);
1321 } catch (IOException ie) {
1322 throw new ServiceException(ie);
1323 }
1324 }
1325
1326 @Override
1327 @QosPriority(priority=HConstants.ADMIN_QOS)
1328 public GetRegionInfoResponse getRegionInfo(final RpcController controller,
1329 final GetRegionInfoRequest request) throws ServiceException {
1330 try {
1331 checkOpen();
1332 requestCount.increment();
1333 Region region = getRegion(request.getRegion());
1334 HRegionInfo info = region.getRegionInfo();
1335 GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
1336 builder.setRegionInfo(HRegionInfo.convert(info));
1337 if (request.hasCompactionState() && request.getCompactionState()) {
1338 builder.setCompactionState(region.getCompactionState());
1339 }
1340 builder.setIsRecovering(region.isRecovering());
1341 return builder.build();
1342 } catch (IOException ie) {
1343 throw new ServiceException(ie);
1344 }
1345 }
1346
1347
1348
1349
1350
1351
1352
1353
1354 @Override
1355 @QosPriority(priority=HConstants.ADMIN_QOS)
1356 public GetServerInfoResponse getServerInfo(final RpcController controller,
1357 final GetServerInfoRequest request) throws ServiceException {
1358 try {
1359 checkOpen();
1360 } catch (IOException ie) {
1361 throw new ServiceException(ie);
1362 }
1363 requestCount.increment();
1364 int infoPort = regionServer.infoServer != null ? regionServer.infoServer.getPort() : -1;
1365 return ResponseConverter.buildGetServerInfoResponse(regionServer.serverName, infoPort);
1366 }
1367
1368 @Override
1369 @QosPriority(priority=HConstants.ADMIN_QOS)
1370 public GetStoreFileResponse getStoreFile(final RpcController controller,
1371 final GetStoreFileRequest request) throws ServiceException {
1372 try {
1373 checkOpen();
1374 Region region = getRegion(request.getRegion());
1375 requestCount.increment();
1376 Set<byte[]> columnFamilies;
1377 if (request.getFamilyCount() == 0) {
1378 columnFamilies = region.getTableDesc().getFamiliesKeys();
1379 } else {
1380 columnFamilies = new TreeSet<byte[]>(Bytes.BYTES_RAWCOMPARATOR);
1381 for (ByteString cf: request.getFamilyList()) {
1382 columnFamilies.add(cf.toByteArray());
1383 }
1384 }
1385 int nCF = columnFamilies.size();
1386 List<String> fileList = region.getStoreFileList(
1387 columnFamilies.toArray(new byte[nCF][]));
1388 GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
1389 builder.addAllStoreFile(fileList);
1390 return builder.build();
1391 } catch (IOException ie) {
1392 throw new ServiceException(ie);
1393 }
1394 }
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404 @Override
1405 @QosPriority(priority = HConstants.ADMIN_QOS)
1406 public MergeRegionsResponse mergeRegions(final RpcController controller,
1407 final MergeRegionsRequest request) throws ServiceException {
1408 try {
1409 checkOpen();
1410 requestCount.increment();
1411 Region regionA = getRegion(request.getRegionA());
1412 Region regionB = getRegion(request.getRegionB());
1413 boolean forcible = request.getForcible();
1414 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1415 regionA.startRegionOperation(Operation.MERGE_REGION);
1416 regionB.startRegionOperation(Operation.MERGE_REGION);
1417 if (regionA.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID ||
1418 regionB.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1419 throw new ServiceException(new MergeRegionException("Can't merge non-default replicas"));
1420 }
1421 LOG.info("Receiving merging request for " + regionA + ", " + regionB
1422 + ",forcible=" + forcible);
1423 regionA.flush(true);
1424 regionB.flush(true);
1425 regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
1426 masterSystemTime, RpcServer.getRequestUser());
1427 return MergeRegionsResponse.newBuilder().build();
1428 } catch (DroppedSnapshotException ex) {
1429 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1430 throw new ServiceException(ex);
1431 } catch (IOException ie) {
1432 throw new ServiceException(ie);
1433 }
1434 }
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459 @Override
1460 @QosPriority(priority=HConstants.ADMIN_QOS)
1461 public OpenRegionResponse openRegion(final RpcController controller,
1462 final OpenRegionRequest request) throws ServiceException {
1463 requestCount.increment();
1464 if (request.hasServerStartCode()) {
1465
1466 long serverStartCode = request.getServerStartCode();
1467 if (regionServer.serverName.getStartcode() != serverStartCode) {
1468 throw new ServiceException(new DoNotRetryIOException("This RPC was intended for a " +
1469 "different server with startCode: " + serverStartCode + ", this server is: "
1470 + regionServer.serverName));
1471 }
1472 }
1473
1474 OpenRegionResponse.Builder builder = OpenRegionResponse.newBuilder();
1475 final int regionCount = request.getOpenInfoCount();
1476 final Map<TableName, HTableDescriptor> htds =
1477 new HashMap<TableName, HTableDescriptor>(regionCount);
1478 final boolean isBulkAssign = regionCount > 1;
1479 try {
1480 checkOpen();
1481 } catch (IOException ie) {
1482 TableName tableName = null;
1483 if (regionCount == 1) {
1484 RegionInfo ri = request.getOpenInfo(0).getRegion();
1485 if (ri != null) {
1486 tableName = ProtobufUtil.toTableName(ri.getTableName());
1487 }
1488 }
1489 if (!TableName.META_TABLE_NAME.equals(tableName)) {
1490 throw new ServiceException(ie);
1491 }
1492
1493 int timeout = regionServer.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
1494 HConstants.DEFAULT_HBASE_RPC_TIMEOUT) >> 2;
1495 long endTime = System.currentTimeMillis() + timeout;
1496 synchronized (regionServer.online) {
1497 try {
1498 while (System.currentTimeMillis() <= endTime
1499 && !regionServer.isStopped() && !regionServer.isOnline()) {
1500 regionServer.online.wait(regionServer.msgInterval);
1501 }
1502 checkOpen();
1503 } catch (InterruptedException t) {
1504 Thread.currentThread().interrupt();
1505 throw new ServiceException(t);
1506 } catch (IOException e) {
1507 throw new ServiceException(e);
1508 }
1509 }
1510 }
1511
1512 long masterSystemTime = request.hasMasterSystemTime() ? request.getMasterSystemTime() : -1;
1513
1514 for (RegionOpenInfo regionOpenInfo : request.getOpenInfoList()) {
1515 final HRegionInfo region = HRegionInfo.convert(regionOpenInfo.getRegion());
1516 OpenRegionCoordination coordination = regionServer.getCoordinatedStateManager().
1517 getOpenRegionCoordination();
1518 OpenRegionCoordination.OpenRegionDetails ord =
1519 coordination.parseFromProtoRequest(regionOpenInfo);
1520
1521 HTableDescriptor htd;
1522 try {
1523 final Region onlineRegion = regionServer.getFromOnlineRegions(region.getEncodedName());
1524 if (onlineRegion != null) {
1525
1526 if (onlineRegion.getCoprocessorHost() != null) {
1527 onlineRegion.getCoprocessorHost().preOpen();
1528 }
1529
1530
1531 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1532 regionServer.getConnection(), region.getRegionName());
1533 if (p != null && regionServer.serverName.equals(p.getSecond())) {
1534 Boolean closing = regionServer.regionsInTransitionInRS.get(region.getEncodedNameAsBytes());
1535
1536
1537
1538
1539
1540 if (!Boolean.FALSE.equals(closing)
1541 && regionServer.getFromOnlineRegions(region.getEncodedName()) != null) {
1542 LOG.warn("Attempted open of " + region.getEncodedName()
1543 + " but already online on this server");
1544 builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
1545 continue;
1546 }
1547 } else {
1548 LOG.warn("The region " + region.getEncodedName() + " is online on this server"
1549 + " but hbase:meta does not have this server - continue opening.");
1550 regionServer.removeFromOnlineRegions(onlineRegion, null);
1551 }
1552 }
1553 LOG.info("Open " + region.getRegionNameAsString());
1554 htd = htds.get(region.getTable());
1555 if (htd == null) {
1556 htd = regionServer.tableDescriptors.get(region.getTable());
1557 htds.put(region.getTable(), htd);
1558 }
1559
1560 final Boolean previous = regionServer.regionsInTransitionInRS.putIfAbsent(
1561 region.getEncodedNameAsBytes(), Boolean.TRUE);
1562
1563 if (Boolean.FALSE.equals(previous)) {
1564
1565
1566 coordination.tryTransitionFromOfflineToFailedOpen(regionServer, region, ord);
1567
1568 throw new RegionAlreadyInTransitionException("Received OPEN for the region:"
1569 + region.getRegionNameAsString() + " , which we are already trying to CLOSE ");
1570 }
1571
1572 if (Boolean.TRUE.equals(previous)) {
1573
1574 LOG.info("Receiving OPEN for the region:" +
1575 region.getRegionNameAsString() + " , which we are already trying to OPEN"
1576 + " - ignoring this new request for this region.");
1577 }
1578
1579
1580
1581 regionServer.removeFromMovedRegions(region.getEncodedName());
1582
1583 if (previous == null) {
1584
1585 if (ZKSplitLog.isRegionMarkedRecoveringInZK(regionServer.getZooKeeper(),
1586 region.getEncodedName())) {
1587
1588
1589 if (!regionOpenInfo.hasOpenForDistributedLogReplay()
1590 || regionOpenInfo.getOpenForDistributedLogReplay()) {
1591 regionServer.recoveringRegions.put(region.getEncodedName(), null);
1592 } else {
1593
1594
1595 List<String> tmpRegions = new ArrayList<String>();
1596 tmpRegions.add(region.getEncodedName());
1597 ZKSplitLog.deleteRecoveringRegionZNodes(regionServer.getZooKeeper(),
1598 tmpRegions);
1599 }
1600 }
1601 if (htd == null) {
1602 throw new IOException("Missing table descriptor for " + region.getEncodedName());
1603 }
1604
1605
1606 if (region.isMetaRegion()) {
1607 regionServer.service.submit(new OpenMetaHandler(
1608 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1609 } else {
1610 regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
1611 regionOpenInfo.getFavoredNodesList());
1612 if (htd.getPriority() >= HConstants.ADMIN_QOS || region.getTable().isSystemTable()) {
1613 regionServer.service.submit(new OpenPriorityRegionHandler(
1614 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1615 } else {
1616 regionServer.service.submit(new OpenRegionHandler(
1617 regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
1618 }
1619 }
1620 }
1621
1622 builder.addOpeningState(RegionOpeningState.OPENED);
1623
1624 } catch (KeeperException zooKeeperEx) {
1625 LOG.error("Can't retrieve recovering state from zookeeper", zooKeeperEx);
1626 throw new ServiceException(zooKeeperEx);
1627 } catch (IOException ie) {
1628 LOG.warn("Failed opening region " + region.getRegionNameAsString(), ie);
1629 if (isBulkAssign) {
1630 builder.addOpeningState(RegionOpeningState.FAILED_OPENING);
1631 } else {
1632 throw new ServiceException(ie);
1633 }
1634 }
1635 }
1636 return builder.build();
1637 }
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650 @Override
1651 public WarmupRegionResponse warmupRegion(final RpcController controller,
1652 final WarmupRegionRequest request) throws ServiceException {
1653
1654 RegionInfo regionInfo = request.getRegionInfo();
1655 final HRegionInfo region = HRegionInfo.convert(regionInfo);
1656 HTableDescriptor htd;
1657 WarmupRegionResponse response = WarmupRegionResponse.getDefaultInstance();
1658
1659 try {
1660 checkOpen();
1661 String encodedName = region.getEncodedName();
1662 byte[] encodedNameBytes = region.getEncodedNameAsBytes();
1663 final Region onlineRegion = regionServer.getFromOnlineRegions(encodedName);
1664
1665 if (onlineRegion != null) {
1666 LOG.info("Region already online. Skipping warming up " + region);
1667 return response;
1668 }
1669
1670 if (LOG.isDebugEnabled()) {
1671 LOG.debug("Warming up Region " + region.getRegionNameAsString());
1672 }
1673
1674 htd = regionServer.tableDescriptors.get(region.getTable());
1675
1676 if (regionServer.getRegionsInTransitionInRS().containsKey(encodedNameBytes)) {
1677 LOG.info("Region is in transition. Skipping warmup " + region);
1678 return response;
1679 }
1680
1681 HRegion.warmupHRegion(region, htd, regionServer.getWAL(region),
1682 regionServer.getConfiguration(), regionServer, null);
1683
1684 } catch (IOException ie) {
1685 LOG.error("Failed warming up region " + region.getRegionNameAsString(), ie);
1686 throw new ServiceException(ie);
1687 }
1688
1689 return response;
1690 }
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700 @Override
1701 @QosPriority(priority = HConstants.REPLAY_QOS)
1702 public ReplicateWALEntryResponse replay(final RpcController controller,
1703 final ReplicateWALEntryRequest request) throws ServiceException {
1704 long before = EnvironmentEdgeManager.currentTime();
1705 CellScanner cells = ((PayloadCarryingRpcController) controller).cellScanner();
1706 try {
1707 checkOpen();
1708 List<WALEntry> entries = request.getEntryList();
1709 if (entries == null || entries.isEmpty()) {
1710
1711 return ReplicateWALEntryResponse.newBuilder().build();
1712 }
1713 ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
1714 Region region = regionServer.getRegionByEncodedName(regionName.toStringUtf8());
1715 RegionCoprocessorHost coprocessorHost =
1716 ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())
1717 ? region.getCoprocessorHost()
1718 : null;
1719 List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
1720
1721
1722 boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
1723 Durability durability = isPrimary ? Durability.USE_DEFAULT : Durability.SKIP_WAL;
1724
1725 for (WALEntry entry : entries) {
1726 if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
1727 throw new NotServingRegionException("Replay request contains entries from multiple " +
1728 "regions. First region:" + regionName.toStringUtf8() + " , other region:"
1729 + entry.getKey().getEncodedRegionName());
1730 }
1731 if (regionServer.nonceManager != null && isPrimary) {
1732 long nonceGroup = entry.getKey().hasNonceGroup()
1733 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
1734 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
1735 regionServer.nonceManager.reportOperationFromWal(nonceGroup, nonce, entry.getKey().getWriteTime());
1736 }
1737 Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
1738 new Pair<WALKey, WALEdit>();
1739 List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
1740 cells, walEntry, durability);
1741 if (coprocessorHost != null) {
1742
1743
1744 if (coprocessorHost.preWALRestore(region.getRegionInfo(), walEntry.getFirst(),
1745 walEntry.getSecond())) {
1746
1747 continue;
1748 }
1749 walEntries.add(walEntry);
1750 }
1751 if(edits!=null && !edits.isEmpty()) {
1752 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
1753 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
1754 OperationStatus[] result = doReplayBatchOp(region, edits, replaySeqId);
1755
1756 for (int i = 0; result != null && i < result.length; i++) {
1757 if (result[i] != OperationStatus.SUCCESS) {
1758 throw new IOException(result[i].getExceptionMsg());
1759 }
1760 }
1761 }
1762 }
1763
1764
1765 WAL wal = getWAL(region);
1766 if (wal != null) {
1767 wal.sync();
1768 }
1769
1770 if (coprocessorHost != null) {
1771 for (Pair<WALKey, WALEdit> entry : walEntries) {
1772 coprocessorHost.postWALRestore(region.getRegionInfo(), entry.getFirst(),
1773 entry.getSecond());
1774 }
1775 }
1776 return ReplicateWALEntryResponse.newBuilder().build();
1777 } catch (IOException ie) {
1778 throw new ServiceException(ie);
1779 } finally {
1780 if (regionServer.metricsRegionServer != null) {
1781 regionServer.metricsRegionServer.updateReplay(
1782 EnvironmentEdgeManager.currentTime() - before);
1783 }
1784 }
1785 }
1786
1787 WAL getWAL(Region region) {
1788 return ((HRegion)region).getWAL();
1789 }
1790
1791
1792
1793
1794
1795
1796
1797
1798 @Override
1799 @QosPriority(priority=HConstants.REPLICATION_QOS)
1800 public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
1801 final ReplicateWALEntryRequest request) throws ServiceException {
1802 try {
1803 checkOpen();
1804 if (regionServer.replicationSinkHandler != null) {
1805 requestCount.increment();
1806 List<WALEntry> entries = request.getEntryList();
1807 CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
1808 regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
1809 regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
1810 request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
1811 request.getSourceHFileArchiveDirPath());
1812 regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
1813 return ReplicateWALEntryResponse.newBuilder().build();
1814 } else {
1815 throw new ServiceException("Replication services are not initialized yet");
1816 }
1817 } catch (IOException ie) {
1818 throw new ServiceException(ie);
1819 }
1820 }
1821
1822
1823
1824
1825
1826
1827
1828 @Override
1829 public RollWALWriterResponse rollWALWriter(final RpcController controller,
1830 final RollWALWriterRequest request) throws ServiceException {
1831 try {
1832 checkOpen();
1833 requestCount.increment();
1834 regionServer.getRegionServerCoprocessorHost().preRollWALWriterRequest();
1835 regionServer.walRoller.requestRollAll();
1836 regionServer.getRegionServerCoprocessorHost().postRollWALWriterRequest();
1837 RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
1838 return builder.build();
1839 } catch (IOException ie) {
1840 throw new ServiceException(ie);
1841 }
1842 }
1843
1844
1845
1846
1847
1848
1849
1850
1851 @Override
1852 @QosPriority(priority=HConstants.ADMIN_QOS)
1853 public SplitRegionResponse splitRegion(final RpcController controller,
1854 final SplitRegionRequest request) throws ServiceException {
1855 try {
1856 checkOpen();
1857 requestCount.increment();
1858 Region region = getRegion(request.getRegion());
1859 region.startRegionOperation(Operation.SPLIT_REGION);
1860 if (region.getRegionInfo().getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
1861 throw new IOException("Can't split replicas directly. "
1862 + "Replicas are auto-split when their primary is split.");
1863 }
1864 LOG.info("Splitting " + region.getRegionInfo().getRegionNameAsString());
1865 region.flush(true);
1866 byte[] splitPoint = null;
1867 if (request.hasSplitPoint()) {
1868 splitPoint = request.getSplitPoint().toByteArray();
1869 }
1870 ((HRegion)region).forceSplit(splitPoint);
1871 regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
1872 RpcServer.getRequestUser());
1873 return SplitRegionResponse.newBuilder().build();
1874 } catch (DroppedSnapshotException ex) {
1875 regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
1876 throw new ServiceException(ex);
1877 } catch (IOException ie) {
1878 throw new ServiceException(ie);
1879 }
1880 }
1881
1882
1883
1884
1885
1886
1887
1888
1889 @Override
1890 @QosPriority(priority=HConstants.ADMIN_QOS)
1891 public StopServerResponse stopServer(final RpcController controller,
1892 final StopServerRequest request) throws ServiceException {
1893 requestCount.increment();
1894 String reason = request.getReason();
1895 regionServer.stop(reason);
1896 return StopServerResponse.newBuilder().build();
1897 }
1898
1899 @Override
1900 public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
1901 UpdateFavoredNodesRequest request) throws ServiceException {
1902 List<UpdateFavoredNodesRequest.RegionUpdateInfo> openInfoList = request.getUpdateInfoList();
1903 UpdateFavoredNodesResponse.Builder respBuilder = UpdateFavoredNodesResponse.newBuilder();
1904 for (UpdateFavoredNodesRequest.RegionUpdateInfo regionUpdateInfo : openInfoList) {
1905 HRegionInfo hri = HRegionInfo.convert(regionUpdateInfo.getRegion());
1906 regionServer.updateRegionFavoredNodesMapping(hri.getEncodedName(),
1907 regionUpdateInfo.getFavoredNodesList());
1908 }
1909 respBuilder.setResponse(openInfoList.size());
1910 return respBuilder.build();
1911 }
1912
1913
1914
1915
1916
1917
1918 @Override
1919 public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
1920 final BulkLoadHFileRequest request) throws ServiceException {
1921 try {
1922 checkOpen();
1923 requestCount.increment();
1924 Region region = getRegion(request.getRegion());
1925 List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
1926 for (FamilyPath familyPath: request.getFamilyPathList()) {
1927 familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(),
1928 familyPath.getPath()));
1929 }
1930 boolean bypass = false;
1931 if (region.getCoprocessorHost() != null) {
1932 bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
1933 }
1934 boolean loaded = false;
1935
1936
1937 if (QuotaUtil.isQuotaEnabled(getConfiguration())) {
1938 ActivePolicyEnforcement activeSpaceQuotas = getSpaceQuotaManager().getActiveEnforcements();
1939 SpaceViolationPolicyEnforcement enforcement = activeSpaceQuotas.getPolicyEnforcement(
1940 region);
1941 if (enforcement != null) {
1942
1943 List<String> filePaths = new ArrayList<>(request.getFamilyPathCount());
1944 for (FamilyPath familyPath : request.getFamilyPathList()) {
1945 filePaths.add(familyPath.getPath());
1946 }
1947
1948 enforcement.checkBulkLoad(regionServer.getFileSystem(), filePaths);
1949 }
1950 }
1951
1952 Map<byte[], List<Path>> map = null;
1953 if (!bypass) {
1954 map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
1955 request.getCopyFile());
1956 if (map != null) {
1957 loaded = true;
1958 }
1959 }
1960 if (region.getCoprocessorHost() != null) {
1961 loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map, loaded);
1962 }
1963 BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
1964 if (map != null) {
1965 loaded = true;
1966 }
1967 builder.setLoaded(loaded);
1968 return builder.build();
1969 } catch (IOException ie) {
1970 throw new ServiceException(ie);
1971 }
1972 }
1973
1974 @Override
1975 public CoprocessorServiceResponse execService(final RpcController controller,
1976 final CoprocessorServiceRequest request) throws ServiceException {
1977 try {
1978 checkOpen();
1979 requestCount.increment();
1980 Region region = getRegion(request.getRegion());
1981 Message result = execServiceOnRegion(region, request.getCall());
1982 CoprocessorServiceResponse.Builder builder =
1983 CoprocessorServiceResponse.newBuilder();
1984 builder.setRegion(RequestConverter.buildRegionSpecifier(
1985 RegionSpecifierType.REGION_NAME, region.getRegionInfo().getRegionName()));
1986 builder.setValue(
1987 builder.getValueBuilder().setName(result.getClass().getName())
1988 .setValue(result.toByteString()));
1989 return builder.build();
1990 } catch (IOException ie) {
1991 throw new ServiceException(ie);
1992 }
1993 }
1994
1995 private Message execServiceOnRegion(Region region,
1996 final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
1997
1998 ServerRpcController execController = new ServerRpcController();
1999 Message result = region.execService(execController, serviceCall);
2000 if (execController.getFailedOn() != null) {
2001 throw execController.getFailedOn();
2002 }
2003 return result;
2004 }
2005
2006
2007
2008
2009
2010
2011
2012
2013 @Override
2014 public GetResponse get(final RpcController controller,
2015 final GetRequest request) throws ServiceException {
2016 long before = EnvironmentEdgeManager.currentTime();
2017 OperationQuota quota = null;
2018 Region region = null;
2019 try {
2020 checkOpen();
2021 requestCount.increment();
2022 rpcGetRequestCount.increment();
2023 region = getRegion(request.getRegion());
2024
2025 GetResponse.Builder builder = GetResponse.newBuilder();
2026 ClientProtos.Get get = request.getGet();
2027 Boolean existence = null;
2028 Result r = null;
2029 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
2030
2031 if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
2032 if (get.getColumnCount() != 1) {
2033 throw new DoNotRetryIOException(
2034 "get ClosestRowBefore supports one and only one family now, not "
2035 + get.getColumnCount() + " families");
2036 }
2037 byte[] row = get.getRow().toByteArray();
2038 byte[] family = get.getColumn(0).getFamily().toByteArray();
2039 r = region.getClosestRowBefore(row, family);
2040 } else {
2041 Get clientGet = ProtobufUtil.toGet(get);
2042 if (get.getExistenceOnly() && region.getCoprocessorHost() != null) {
2043 existence = region.getCoprocessorHost().preExists(clientGet);
2044 }
2045 if (existence == null) {
2046 r = region.get(clientGet);
2047 if (get.getExistenceOnly()) {
2048 boolean exists = r.getExists();
2049 if (region.getCoprocessorHost() != null) {
2050 exists = region.getCoprocessorHost().postExists(clientGet, exists);
2051 }
2052 existence = exists;
2053 }
2054 }
2055 }
2056 if (existence != null){
2057 ClientProtos.Result pbr =
2058 ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
2059 builder.setResult(pbr);
2060 } else if (r != null) {
2061 ClientProtos.Result pbr = ProtobufUtil.toResult(r);
2062 builder.setResult(pbr);
2063 }
2064 if (r != null) {
2065 quota.addGetResult(r);
2066 }
2067 return builder.build();
2068 } catch (IOException ie) {
2069 throw new ServiceException(ie);
2070 } finally {
2071 if (regionServer.metricsRegionServer != null && region != null) {
2072 regionServer.metricsRegionServer.updateGet(
2073 region.getTableDesc().getTableName(), EnvironmentEdgeManager.currentTime() - before);
2074 }
2075 if (quota != null) {
2076 quota.close();
2077 }
2078 }
2079 }
2080
2081
2082
2083
2084
2085
2086
2087
2088 @Override
2089 public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
2090 throws ServiceException {
2091 try {
2092 checkOpen();
2093 } catch (IOException ie) {
2094 throw new ServiceException(ie);
2095 }
2096
2097
2098
2099 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2100 CellScanner cellScanner = controller != null ? controller.cellScanner(): null;
2101 if (controller != null) controller.setCellScanner(null);
2102
2103 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2104
2105
2106 List<CellScannable> cellsToReturn = null;
2107 MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
2108 RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
2109 Boolean processed = null;
2110
2111 RpcCallContext context = RpcServer.getCurrentCall();
2112 this.rpcMultiRequestCount.increment();
2113 Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats = new HashMap<>(request
2114 .getRegionActionCount());
2115 ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2116 for (RegionAction regionAction : request.getRegionActionList()) {
2117 this.requestCount.add(regionAction.getActionCount());
2118 OperationQuota quota;
2119 Region region;
2120 regionActionResultBuilder.clear();
2121 try {
2122 region = getRegion(regionAction.getRegion());
2123 quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList());
2124 } catch (IOException e) {
2125 rpcServer.getMetrics().exception(e);
2126 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2127 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2128 if (cellScanner != null) {
2129 skipCellsForMutations(regionAction.getActionList(), cellScanner);
2130 }
2131 continue;
2132 }
2133
2134 if (regionAction.hasAtomic() && regionAction.getAtomic()) {
2135
2136
2137 try {
2138 if (request.hasCondition()) {
2139 Condition condition = request.getCondition();
2140 byte[] row = condition.getRow().toByteArray();
2141 byte[] family = condition.getFamily().toByteArray();
2142 byte[] qualifier = condition.getQualifier().toByteArray();
2143 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2144 ByteArrayComparable comparator =
2145 ProtobufUtil.toComparator(condition.getComparator());
2146 processed = checkAndRowMutate(region, regionAction.getActionList(),
2147 cellScanner, row, family, qualifier, compareOp,
2148 comparator, spaceQuotaEnforcement);
2149 } else {
2150 ClientProtos.RegionLoadStats stats = mutateRows(region, regionAction.getActionList(),
2151 cellScanner);
2152
2153 if(stats != null) {
2154 responseBuilder.addRegionActionResult(RegionActionResult.newBuilder()
2155 .addResultOrException(ResultOrException.newBuilder().setLoadStats(stats)));
2156 }
2157 processed = Boolean.TRUE;
2158 }
2159 } catch (IOException e) {
2160 rpcServer.getMetrics().exception(e);
2161
2162 regionActionResultBuilder.setException(ResponseConverter.buildException(e));
2163 }
2164 } else {
2165
2166 cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
2167 regionActionResultBuilder, cellsToReturn, nonceGroup, spaceQuotaEnforcement);
2168 }
2169 responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
2170 quota.close();
2171 }
2172
2173 if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
2174 controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
2175 }
2176 if (processed != null) responseBuilder.setProcessed(processed);
2177 return responseBuilder.build();
2178 }
2179
2180 private void skipCellsForMutations(List<Action> actions, CellScanner cellScanner) {
2181 for (Action action : actions) {
2182 skipCellsForMutation(action, cellScanner);
2183 }
2184 }
2185
2186 private void skipCellsForMutation(Action action, CellScanner cellScanner) {
2187 try {
2188 if (action.hasMutation()) {
2189 MutationProto m = action.getMutation();
2190 if (m.hasAssociatedCellCount()) {
2191 for (int i = 0; i < m.getAssociatedCellCount(); i++) {
2192 cellScanner.advance();
2193 }
2194 }
2195 }
2196 } catch (IOException e) {
2197
2198
2199
2200 LOG.error("Error while skipping Cells in CellScanner for invalid Region Mutations", e);
2201 }
2202 }
2203
2204
2205
2206
2207
2208
2209
2210
2211 @Override
2212 public MutateResponse mutate(final RpcController rpcc,
2213 final MutateRequest request) throws ServiceException {
2214
2215
2216 PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
2217 CellScanner cellScanner = controller != null? controller.cellScanner(): null;
2218 OperationQuota quota = null;
2219 ActivePolicyEnforcement spaceQuotaEnforcement = null;
2220
2221 if (controller != null) controller.setCellScanner(null);
2222 try {
2223 checkOpen();
2224 requestCount.increment();
2225 rpcMutateRequestCount.increment();
2226 Region region = getRegion(request.getRegion());
2227 MutateResponse.Builder builder = MutateResponse.newBuilder();
2228 MutationProto mutation = request.getMutation();
2229 if (!region.getRegionInfo().isMetaTable()) {
2230 regionServer.cacheFlusher.reclaimMemStoreMemory();
2231 }
2232 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
2233 Result r = null;
2234 Boolean processed = null;
2235 MutationType type = mutation.getMutateType();
2236 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
2237 spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
2238
2239 switch (type) {
2240 case APPEND:
2241
2242 r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2243 break;
2244 case INCREMENT:
2245
2246 r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
2247 break;
2248 case PUT:
2249 Put put = ProtobufUtil.toPut(mutation, cellScanner);
2250
2251 spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
2252 quota.addMutation(put);
2253 if (request.hasCondition()) {
2254 Condition condition = request.getCondition();
2255 byte[] row = condition.getRow().toByteArray();
2256 byte[] family = condition.getFamily().toByteArray();
2257 byte[] qualifier = condition.getQualifier().toByteArray();
2258 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2259 ByteArrayComparable comparator =
2260 ProtobufUtil.toComparator(condition.getComparator());
2261 if (region.getCoprocessorHost() != null) {
2262 processed = region.getCoprocessorHost().preCheckAndPut(
2263 row, family, qualifier, compareOp, comparator, put);
2264 }
2265 if (processed == null) {
2266 boolean result = region.checkAndMutate(row, family,
2267 qualifier, compareOp, comparator, put, true);
2268 if (region.getCoprocessorHost() != null) {
2269 result = region.getCoprocessorHost().postCheckAndPut(row, family,
2270 qualifier, compareOp, comparator, put, result);
2271 }
2272 processed = result;
2273 }
2274 } else {
2275 region.put(put);
2276 processed = Boolean.TRUE;
2277 }
2278 break;
2279 case DELETE:
2280 Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
2281 spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
2282 quota.addMutation(delete);
2283 if (request.hasCondition()) {
2284 Condition condition = request.getCondition();
2285 byte[] row = condition.getRow().toByteArray();
2286 byte[] family = condition.getFamily().toByteArray();
2287 byte[] qualifier = condition.getQualifier().toByteArray();
2288 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
2289 ByteArrayComparable comparator =
2290 ProtobufUtil.toComparator(condition.getComparator());
2291 if (region.getCoprocessorHost() != null) {
2292 processed = region.getCoprocessorHost().preCheckAndDelete(
2293 row, family, qualifier, compareOp, comparator, delete);
2294 }
2295 if (processed == null) {
2296 boolean result = region.checkAndMutate(row, family,
2297 qualifier, compareOp, comparator, delete, true);
2298 if (region.getCoprocessorHost() != null) {
2299 result = region.getCoprocessorHost().postCheckAndDelete(row, family,
2300 qualifier, compareOp, comparator, delete, result);
2301 }
2302 processed = result;
2303 }
2304 } else {
2305 region.delete(delete);
2306 processed = Boolean.TRUE;
2307 }
2308 break;
2309 default:
2310 throw new DoNotRetryIOException(
2311 "Unsupported mutate type: " + type.name());
2312 }
2313 if (processed != null) builder.setProcessed(processed.booleanValue());
2314 addResult(builder, r, controller);
2315 return builder.build();
2316 } catch (IOException ie) {
2317 regionServer.checkFileSystem();
2318 throw new ServiceException(ie);
2319 } finally {
2320 if (quota != null) {
2321 quota.close();
2322 }
2323 }
2324 }
2325
2326
2327
2328
2329
2330
2331
2332
2333 @Override
2334 public ScanResponse scan(final RpcController controller, final ScanRequest request)
2335 throws ServiceException {
2336 OperationQuota quota = null;
2337 Leases.Lease lease = null;
2338 String scannerName = null;
2339 try {
2340 if (!request.hasScannerId() && !request.hasScan()) {
2341 throw new DoNotRetryIOException(
2342 "Missing required input: scannerId or scan");
2343 }
2344 long scannerId = -1;
2345 if (request.hasScannerId()) {
2346 scannerId = request.getScannerId();
2347 scannerName = String.valueOf(scannerId);
2348 }
2349 try {
2350 checkOpen();
2351 } catch (IOException e) {
2352
2353
2354 if (scannerName != null) {
2355 LOG.debug("Server shutting down and client tried to access missing scanner "
2356 + scannerName);
2357 if (regionServer.leases != null) {
2358 try {
2359 regionServer.leases.cancelLease(scannerName);
2360 } catch (LeaseException le) {
2361
2362 }
2363 }
2364 }
2365 throw e;
2366 }
2367 requestCount.increment();
2368 rpcScanRequestCount.increment();
2369
2370 int ttl = 0;
2371 Region region = null;
2372 RegionScanner scanner = null;
2373 RegionScannerHolder rsh = null;
2374 boolean moreResults = true;
2375 boolean closeScanner = false;
2376 boolean isSmallScan = false;
2377 ScanResponse.Builder builder = ScanResponse.newBuilder();
2378 if (request.hasCloseScanner()) {
2379 closeScanner = request.getCloseScanner();
2380 }
2381 int rows = closeScanner ? 0 : 1;
2382 if (request.hasNumberOfRows()) {
2383 rows = request.getNumberOfRows();
2384 }
2385 if (request.hasScannerId()) {
2386 rsh = scanners.get(scannerName);
2387 if (rsh == null) {
2388 LOG.info("Client tried to access missing scanner " + scannerName);
2389 throw new UnknownScannerException(
2390 "Name: " + scannerName + ", already closed?");
2391 }
2392 scanner = rsh.s;
2393 HRegionInfo hri = scanner.getRegionInfo();
2394 region = regionServer.getRegion(hri.getRegionName());
2395 if (region != rsh.r) {
2396 throw new NotServingRegionException("Region was re-opened after the scanner"
2397 + scannerName + " was created: " + hri.getRegionNameAsString());
2398 }
2399 } else {
2400 region = getRegion(request.getRegion());
2401 ClientProtos.Scan protoScan = request.getScan();
2402 boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
2403 Scan scan = ProtobufUtil.toScan(protoScan);
2404
2405 if (!isLoadingCfsOnDemandSet) {
2406 scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
2407 }
2408
2409 isSmallScan = scan.isSmall();
2410 if (!scan.hasFamilies()) {
2411
2412 for (byte[] family: region.getTableDesc().getFamiliesKeys()) {
2413 scan.addFamily(family);
2414 }
2415 }
2416
2417 if (region.getCoprocessorHost() != null) {
2418 scanner = region.getCoprocessorHost().preScannerOpen(scan);
2419 }
2420 if (scanner == null) {
2421 scanner = region.getScanner(scan);
2422 }
2423 if (region.getCoprocessorHost() != null) {
2424 scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
2425 }
2426 scannerId = addScanner(scanner, region);
2427 scannerName = String.valueOf(scannerId);
2428 ttl = this.scannerLeaseTimeoutPeriod;
2429 }
2430 if (request.hasRenew() && request.getRenew()) {
2431 rsh = scanners.get(scannerName);
2432 lease = regionServer.leases.removeLease(scannerName);
2433 if (lease != null && rsh != null) {
2434 regionServer.leases.addLease(lease);
2435
2436 rsh.incNextCallSeq();
2437 }
2438 return builder.build();
2439 }
2440
2441 quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
2442 long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
2443 if (rows > 0) {
2444
2445
2446
2447 if (request.hasNextCallSeq()) {
2448 if (rsh == null) {
2449 rsh = scanners.get(scannerName);
2450 }
2451 if (rsh != null) {
2452 if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
2453 throw new OutOfOrderScannerNextException(
2454 "Expected nextCallSeq: " + rsh.getNextCallSeq()
2455 + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
2456 "; request=" + TextFormat.shortDebugString(request));
2457 }
2458
2459 rsh.incNextCallSeq();
2460 }
2461 }
2462 try {
2463
2464
2465 lease = regionServer.leases.removeLease(scannerName);
2466 List<Result> results = new ArrayList<Result>();
2467 long totalCellSize = 0;
2468 long currentScanResultSize = 0;
2469
2470 boolean done = false;
2471
2472 if (region != null && region.getCoprocessorHost() != null) {
2473 Boolean bypass = region.getCoprocessorHost().preScannerNext(
2474 scanner, results, rows);
2475 if (!results.isEmpty()) {
2476 for (Result r : results) {
2477 for (Cell cell : r.rawCells()) {
2478 totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2479 currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell);
2480 }
2481 }
2482 }
2483 if (bypass != null && bypass.booleanValue()) {
2484 done = true;
2485 }
2486 }
2487
2488 if (!done) {
2489 long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
2490 if (maxResultSize <= 0) {
2491 maxResultSize = maxQuotaResultSize;
2492 }
2493
2494
2495
2496 List<Cell> values = new ArrayList<Cell>(32);
2497 region.startRegionOperation(Operation.SCAN);
2498 try {
2499 int i = 0;
2500 long before = EnvironmentEdgeManager.currentTime();
2501 synchronized(scanner) {
2502 boolean stale = (region.getRegionInfo().getReplicaId() != 0);
2503 boolean clientHandlesPartials =
2504 request.hasClientHandlesPartials() && request.getClientHandlesPartials();
2505 boolean clientHandlesHeartbeats =
2506 request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
2507
2508
2509
2510
2511
2512
2513 boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
2514 boolean allowPartialResults =
2515 clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
2516 boolean moreRows = false;
2517
2518
2519
2520
2521
2522
2523
2524
2525 boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
2526
2527
2528
2529 long timeLimit = -1;
2530
2531
2532
2533
2534 if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
2535 long timeLimitDelta;
2536 if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
2537 timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
2538 } else {
2539 timeLimitDelta =
2540 scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
2541 }
2542
2543
2544
2545 timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
2546 timeLimit = System.currentTimeMillis() + timeLimitDelta;
2547 }
2548
2549 final LimitScope sizeScope =
2550 allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2551 final LimitScope timeScope =
2552 allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
2553
2554
2555
2556 ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
2557 contextBuilder.setSizeLimit(sizeScope, maxResultSize);
2558 contextBuilder.setBatchLimit(scanner.getBatch());
2559 contextBuilder.setTimeLimit(timeScope, timeLimit);
2560 ScannerContext scannerContext = contextBuilder.build();
2561
2562 boolean limitReached = false;
2563 while (i < rows) {
2564
2565
2566
2567
2568
2569 scannerContext.setBatchProgress(0);
2570
2571
2572 moreRows = scanner.nextRaw(values, scannerContext);
2573
2574 if (!values.isEmpty()) {
2575 for (Cell cell : values) {
2576 totalCellSize += CellUtil.estimatedSerializedSizeOf(cell);
2577 }
2578 final boolean partial = scannerContext.partialResultFormed();
2579 results.add(Result.create(values, null, stale, partial));
2580 i++;
2581 }
2582
2583 boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
2584 boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
2585 boolean rowLimitReached = i >= rows;
2586 limitReached = sizeLimitReached || timeLimitReached || rowLimitReached;
2587
2588 if (limitReached || !moreRows) {
2589 if (LOG.isTraceEnabled()) {
2590 LOG.trace("Done scanning. limitReached: " + limitReached + " moreRows: "
2591 + moreRows + " scannerContext: " + scannerContext);
2592 }
2593
2594
2595
2596
2597 if (moreRows) {
2598
2599 builder.setHeartbeatMessage(timeLimitReached);
2600 }
2601 break;
2602 }
2603 values.clear();
2604 }
2605
2606 if (limitReached || moreRows) {
2607
2608 builder.setMoreResultsInRegion(true);
2609 } else {
2610
2611 builder.setMoreResultsInRegion(false);
2612 }
2613 }
2614 region.updateReadRequestsCount(i);
2615 long end = EnvironmentEdgeManager.currentTime();
2616 region.getMetrics().updateScanTime(end - before);
2617 if (regionServer.metricsRegionServer != null) {
2618 regionServer.metricsRegionServer.updateScanSize(
2619 region.getTableDesc().getTableName(), totalCellSize);
2620 regionServer.metricsRegionServer.updateScanTime(
2621 region.getTableDesc().getTableName(), end - before);
2622 }
2623 } finally {
2624 region.closeRegionOperation();
2625 }
2626
2627
2628 if (region != null && region.getCoprocessorHost() != null) {
2629 region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
2630 }
2631 }
2632
2633 quota.addScanResult(results);
2634
2635
2636
2637
2638 if (scanner.isFilterDone() && results.isEmpty()) {
2639 moreResults = false;
2640 results = null;
2641 } else {
2642 addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
2643 }
2644 } catch (IOException e) {
2645
2646
2647
2648 closeScanner(region, scanner, scannerName);
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659
2660
2661 if (e instanceof DoNotRetryIOException) {
2662 throw e;
2663 }
2664
2665
2666
2667 if (e instanceof CorruptHFileException || e instanceof FileNotFoundException) {
2668 throw new DoNotRetryIOException(e);
2669 }
2670
2671
2672
2673
2674 RpcCallContext context = RpcServer.getCurrentCall();
2675 if (context != null && VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
2676
2677 throw new ScannerResetException("Scanner is closed on the server-side", e);
2678 } else {
2679
2680 throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
2681 + " scanner state for clients older than 1.3.", e);
2682 }
2683 } finally {
2684
2685
2686 if (scanners.containsKey(scannerName)) {
2687 if (lease != null) regionServer.leases.addLease(lease);
2688 ttl = this.scannerLeaseTimeoutPeriod;
2689 }
2690 }
2691 }
2692
2693 if (!moreResults || closeScanner) {
2694 ttl = 0;
2695 moreResults = false;
2696 if (closeScanner(region, scanner, scannerName)) {
2697 return builder.build();
2698 }
2699 }
2700
2701 if (ttl > 0) {
2702 builder.setTtl(ttl);
2703 }
2704 builder.setScannerId(scannerId);
2705 builder.setMoreResults(moreResults);
2706 return builder.build();
2707 } catch (IOException ie) {
2708 if (scannerName != null && ie instanceof NotServingRegionException) {
2709 RegionScannerHolder rsh = scanners.remove(scannerName);
2710 if (rsh != null) {
2711 try {
2712 RegionScanner scanner = rsh.s;
2713 LOG.warn(scannerName + " encountered " + ie.getMessage() + ", closing ...");
2714 scanner.close();
2715 regionServer.leases.cancelLease(scannerName);
2716 } catch (IOException e) {
2717 LOG.warn("Getting exception closing " + scannerName, e);
2718 }
2719 }
2720 }
2721 throw new ServiceException(ie);
2722 } finally {
2723 if (quota != null) {
2724 quota.close();
2725 }
2726 }
2727 }
2728
2729 private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
2730 throws IOException {
2731 if (region != null && region.getCoprocessorHost() != null) {
2732 if (region.getCoprocessorHost().preScannerClose(scanner)) {
2733 return true;
2734 }
2735 }
2736 RegionScannerHolder rsh = scanners.remove(scannerName);
2737 if (rsh != null) {
2738 scanner = rsh.s;
2739 scanner.close();
2740 try {
2741 regionServer.leases.cancelLease(scannerName);
2742 } catch (LeaseException le) {
2743
2744 if (LOG.isTraceEnabled()) {
2745 LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
2746 }
2747 }
2748 if (region != null && region.getCoprocessorHost() != null) {
2749 region.getCoprocessorHost().postScannerClose(scanner);
2750 }
2751 }
2752 return false;
2753 }
2754
2755 @Override
2756 public CoprocessorServiceResponse execRegionServerService(RpcController controller,
2757 CoprocessorServiceRequest request) throws ServiceException {
2758 return regionServer.execRegionServerService(controller, request);
2759 }
2760
2761 @Override
2762 public UpdateConfigurationResponse updateConfiguration(
2763 RpcController controller, UpdateConfigurationRequest request)
2764 throws ServiceException {
2765 try {
2766 this.regionServer.updateConfiguration();
2767 } catch (Exception e) {
2768 throw new ServiceException(e);
2769 }
2770 return UpdateConfigurationResponse.getDefaultInstance();
2771 }
2772
2773 @Override
2774 public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
2775 RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
2776 try {
2777 final RegionServerSpaceQuotaManager manager =
2778 regionServer.getRegionServerSpaceQuotaManager();
2779 final GetSpaceQuotaSnapshotsResponse.Builder builder =
2780 GetSpaceQuotaSnapshotsResponse.newBuilder();
2781 if (manager != null) {
2782 final Map<TableName,SpaceQuotaSnapshot> snapshots = manager.copyQuotaSnapshots();
2783 for (Map.Entry<TableName,SpaceQuotaSnapshot> snapshot : snapshots.entrySet()) {
2784 builder.addSnapshots(TableQuotaSnapshot.newBuilder()
2785 .setTableName(ProtobufUtil.toProtoTableName(snapshot.getKey()))
2786 .setSnapshot(SpaceQuotaSnapshot.toProtoSnapshot(snapshot.getValue()))
2787 .build());
2788 }
2789 }
2790 return builder.build();
2791 } catch (Exception e) {
2792 throw new ServiceException(e);
2793 }
2794 }
2795 }