1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.ipc;
20
21 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.ServerSocket;
31 import java.net.Socket;
32 import java.net.SocketException;
33 import java.net.UnknownHostException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.Channels;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.GatheringByteChannel;
39 import java.nio.channels.ReadableByteChannel;
40 import java.nio.channels.SelectionKey;
41 import java.nio.channels.Selector;
42 import java.nio.channels.ServerSocketChannel;
43 import java.nio.channels.SocketChannel;
44 import java.nio.channels.WritableByteChannel;
45 import java.security.PrivilegedExceptionAction;
46 import java.util.ArrayList;
47 import java.util.Arrays;
48 import java.util.Collections;
49 import java.util.HashMap;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Random;
55 import java.util.Set;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.ConcurrentLinkedDeque;
58 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.locks.Lock;
62 import java.util.concurrent.locks.ReentrantLock;
63
64 import javax.security.sasl.Sasl;
65 import javax.security.sasl.SaslException;
66 import javax.security.sasl.SaslServer;
67
68 import org.apache.commons.logging.Log;
69 import org.apache.commons.logging.LogFactory;
70 import org.apache.hadoop.hbase.CallQueueTooBigException;
71 import org.apache.hadoop.hbase.classification.InterfaceAudience;
72 import org.apache.hadoop.hbase.classification.InterfaceStability;
73 import org.apache.hadoop.conf.Configuration;
74 import org.apache.hadoop.hbase.CellScanner;
75 import org.apache.hadoop.hbase.DoNotRetryIOException;
76 import org.apache.hadoop.hbase.HBaseIOException;
77 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
78 import org.apache.hadoop.hbase.HConstants;
79 import org.apache.hadoop.hbase.HRegionInfo;
80 import org.apache.hadoop.hbase.Server;
81 import org.apache.hadoop.hbase.TableName;
82 import org.apache.hadoop.hbase.client.Operation;
83 import org.apache.hadoop.hbase.codec.Codec;
84 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
85 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
86 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
87 import org.apache.hadoop.hbase.io.ByteBufferInputStream;
88 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
89 import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
90 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
91 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
92 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
93 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.VersionInfo;
94 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
95 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
96 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
97 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
98 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
99 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
100 import org.apache.hadoop.hbase.regionserver.HRegionServer;
101 import org.apache.hadoop.hbase.security.AccessDeniedException;
102 import org.apache.hadoop.hbase.security.AuthMethod;
103 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
104 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
105 import org.apache.hadoop.hbase.security.User;
106 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
107 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
108 import org.apache.hadoop.hbase.security.SaslStatus;
109 import org.apache.hadoop.hbase.security.SaslUtil;
110 import org.apache.hadoop.hbase.security.UserProvider;
111 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
112 import org.apache.hadoop.hbase.util.Bytes;
113 import org.apache.hadoop.hbase.util.Counter;
114 import org.apache.hadoop.hbase.util.Pair;
115 import org.apache.hadoop.hbase.util.Threads;
116 import org.apache.hadoop.io.BytesWritable;
117 import org.apache.hadoop.io.IntWritable;
118 import org.apache.hadoop.io.Writable;
119 import org.apache.hadoop.io.WritableUtils;
120 import org.apache.hadoop.io.compress.CompressionCodec;
121 import org.apache.hadoop.security.UserGroupInformation;
122 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
123 import org.apache.hadoop.security.authorize.AuthorizationException;
124 import org.apache.hadoop.security.authorize.PolicyProvider;
125 import org.apache.hadoop.security.authorize.ProxyUsers;
126 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
127 import org.apache.hadoop.security.token.SecretManager;
128 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
129 import org.apache.hadoop.security.token.TokenIdentifier;
130 import org.apache.hadoop.util.StringUtils;
131 import org.codehaus.jackson.map.ObjectMapper;
132 import org.apache.htrace.TraceInfo;
133
134 import com.google.common.util.concurrent.ThreadFactoryBuilder;
135 import com.google.protobuf.BlockingService;
136 import com.google.protobuf.CodedInputStream;
137 import com.google.protobuf.Descriptors.MethodDescriptor;
138 import com.google.protobuf.Message;
139 import com.google.protobuf.ServiceException;
140 import com.google.protobuf.TextFormat;
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
163 @InterfaceStability.Evolving
164 public class RpcServer implements RpcServerInterface, ConfigurationObserver {
/** Logger shared by the server and its nested Listener/Reader/Responder/Call classes. */
public static final Log LOG = LogFactory.getLog(RpcServer.class);
// Single pre-built exception instance.
// NOTE(review): presumably reused for every rejected call; the throw site is not in this part
// of the file -- confirm before relying on its (shared) stack trace.
private static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
    = new CallQueueTooBigException();

private final boolean authorize;
private boolean isSecurityEnabled;

public static final byte CURRENT_VERSION = 0;

// Default call-queue length per handler (10).
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;

// Default call-queue size: 1024 * 1024 * 1024 (1 GiB if the unit is bytes -- TODO confirm).
private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;

// Configuration key and default for the delayed-call warning threshold.
private static final String WARN_DELAYED_CALLS = "hbase.ipc.warn.delayedrpc.number";

private static final int DEFAULT_WARN_DELAYED_CALLS = 1000;

// Call.startDelay() logs a warning once delayedCalls exceeds this value.
private final int warnDelayedCalls;

// Number of calls currently delayed: incremented in Call.startDelay(), decremented in
// Call.endDelay(Object).
private AtomicInteger delayedCalls;
// Used by Call.setResponse() to build the cell block of a response.
private final IPCUtil ipcUtil;

private static final String AUTH_FAILED_FOR = "Auth failed for ";
private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
// Separate logger named "SecurityLogger." + Server.class.getName().
private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." +
    Server.class.getName());
protected SecretManager<TokenIdentifier> secretManager;
protected ServiceAuthorizationManager authManager;

// Per-thread Call slot.
// NOTE(review): presumably the call being handled by the current thread -- set elsewhere.
protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();

// Per-thread MonitoredRPCHandler slot.
static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
    = new ThreadLocal<MonitoredRPCHandler>();

// Address the Listener binds its accept socket to.
protected final InetSocketAddress bindAddress;
// Overwritten by the Listener constructor with the local port actually bound.
protected int port;
// Number of Reader instances (and reader pool threads) the Listener creates.
private int readThreads;
protected int maxIdleTime;

// Non-forced idle cleanup only runs while numConnections is above this value.
protected int thresholdIdleConnections;

// Upper bound on connections closed by one non-forced cleanup pass.
int maxConnectionsToNuke;

protected MetricsHBaseServer metrics;

protected final Configuration conf;

private int maxQueueSize;
protected int socketSendBufferSize;
// Socket options applied to every accepted channel in Listener.doAccept().
protected final boolean tcpNoDelay;
protected final boolean tcpKeepAlive;
// Used by the Responder both as its select() timeout and as the age after which a queued
// response makes its connection eligible for closing (see Responder.purge()).
protected final long purgeTimeout;

// Loop condition of the Listener, Reader and Responder threads.
volatile boolean running = true;

volatile boolean started = false;

protected final Counter callQueueSize = new Counter();

// Open connections; Listener.doAccept() appends here and Listener.run() drains it on exit.
protected final List<Connection> connectionList =
    Collections.synchronizedList(new LinkedList<Connection>());

private Listener listener = null;
protected Responder responder = null;
protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
// Incremented in Listener.doAccept() while holding the connectionList monitor.
protected int numConnections = 0;

// Optional; when non-null the Listener and Responder consult checkOOME() on OutOfMemoryError.
protected HBaseRPCErrorHandler errorHandler = null;

// Configuration keys for the slow/large response warning thresholds.
private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";

// Defaults: 10000 and 100 * 1024 * 1024 (units not shown here -- presumably ms and bytes).
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000;
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;

private static final ObjectMapper MAPPER = new ObjectMapper();

private final int warnResponseTime;
private final int warnResponseSize;
private final Server server;
private final List<BlockingServiceAndInterface> services;

private final RpcScheduler scheduler;

// Used by the Call constructor to wrap the connection's UserGroupInformation in a User.
private UserProvider userProvider;

// Buffer pool: handed to IPCUtil.buildCellBlock() and given buffers back in Call.done().
private final BoundedByteBufferPool reservoir;
281
282
283
284
285
286
287 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
288 @InterfaceStability.Evolving
289 public class Call implements RpcCallContext {
// Call id; echoed back in the response header by setResponse().
protected int id;
protected BlockingService service;
// Descriptor of the invoked method; may be null (toShortString() prints "n/a" then).
protected MethodDescriptor md;
protected RequestHeader header;
// Request message; may be null (toString() prints an empty string then).
protected Message param;

protected CellScanner cellScanner;
// Connection this call arrived on.
protected Connection connection;
// Set to the current time at construction and reset by Responder.doRespond() when the
// response is queued; Responder.purge() and disconnectSince() measure against it.
protected long timestamp;

// Serialized response buffers, or null until setResponse()/setSaslTokenResponse() runs.
protected BufferChain response;
// True between startDelay() and endDelay()/endDelayThrowing().
protected boolean delayResponse;
protected Responder responder;
// Argument of the last startDelay() call.
protected boolean delayReturnValue;

protected long size;
// Latched to true by setResponse() once it is given a Throwable.
protected boolean isError;
protected TraceInfo tinfo;
// Cell block built by setResponse(); handed back to the reservoir in done().
private ByteBuffer cellBlock = null;

// Built from connection.user in the constructor; null when the connection has no user.
private User user;
private InetAddress remoteAddress;
315
/**
 * Records one request received on {@code connection}.
 *
 * The call starts with no response, not delayed and not in error; its timestamp is the
 * current wall-clock time. The request user is derived from {@code connection.user} through
 * the server's {@code userProvider}, and stays null when the connection has no user.
 */
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
    Message param, CellScanner cellScanner, Connection connection, Responder responder,
    long size, TraceInfo tinfo, final InetAddress remoteAddress) {
  // What was asked.
  this.id = id;
  this.header = header;
  this.service = service;
  this.md = md;
  this.param = param;
  this.cellScanner = cellScanner;
  this.size = size;
  this.tinfo = tinfo;

  // Who asked, and who will answer.
  this.connection = connection;
  this.remoteAddress = remoteAddress;
  this.responder = responder;

  // Initial response state.
  this.timestamp = System.currentTimeMillis();
  this.response = null;
  this.delayResponse = false;
  this.isError = false;

  if (connection.user == null) {
    this.user = null;
  } else {
    this.user = userProvider.create(connection.user);
  }
}
336
337
338
339
340
341 void done() {
342 if (this.cellBlock != null) {
343
344 reservoir.putBuffer(this.cellBlock);
345 this.cellBlock = null;
346 }
347 this.connection.decRpcCount();
348 }
349
350 @Override
351 public String toString() {
352 return toShortString() + " param: " +
353 (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
354 " connection: " + connection.toString();
355 }
356
357 protected RequestHeader getHeader() {
358 return this.header;
359 }
360
361 public boolean hasPriority() {
362 return this.header.hasPriority();
363 }
364
365 public int getPriority() {
366 return this.header.getPriority();
367 }
368
369
370
371
372
373 String toShortString() {
374 String serviceName = this.connection.service != null ?
375 this.connection.service.getDescriptorForType().getName() : "null";
376 return "callId: " + this.id + " service: " + serviceName +
377 " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
378 " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
379 " connection: " + connection.toString();
380 }
381
382 String toTraceString() {
383 String serviceName = this.connection.service != null ?
384 this.connection.service.getDescriptorForType().getName() : "";
385 String methodName = (this.md != null) ? this.md.getName() : "";
386 return serviceName + "." + methodName;
387 }
388
389 protected synchronized void setSaslTokenResponse(ByteBuffer response) {
390 this.response = new BufferChain(response);
391 }
392
393 protected synchronized void setResponse(Object m, final CellScanner cells,
394 Throwable t, String errorMsg) {
395 if (this.isError) return;
396 if (t != null) this.isError = true;
397 BufferChain bc = null;
398 try {
399 ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
400
401 Message result = (Message)m;
402
403 headerBuilder.setCallId(this.id);
404 if (t != null) {
405 ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
406 exceptionBuilder.setExceptionClassName(t.getClass().getName());
407 exceptionBuilder.setStackTrace(errorMsg);
408 exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
409 if (t instanceof RegionMovedException) {
410
411
412
413 RegionMovedException rme = (RegionMovedException)t;
414 exceptionBuilder.setHostname(rme.getHostname());
415 exceptionBuilder.setPort(rme.getPort());
416 }
417
418 headerBuilder.setException(exceptionBuilder.build());
419 }
420
421
422
423 this.cellBlock = ipcUtil.buildCellBlock(this.connection.codec,
424 this.connection.compressionCodec, cells, reservoir);
425 if (this.cellBlock != null) {
426 CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
427
428 cellBlockBuilder.setLength(this.cellBlock.limit());
429 headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
430 }
431 Message header = headerBuilder.build();
432
433
434
435 ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
436 ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
437 int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
438 (this.cellBlock == null? 0: this.cellBlock.limit());
439 ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
440 bc = new BufferChain(bbTotalSize, bbHeader, bbResult, this.cellBlock);
441 if (connection.useWrap) {
442 bc = wrapWithSasl(bc);
443 }
444 } catch (IOException e) {
445 LOG.warn("Exception while creating response " + e);
446 }
447 this.response = bc;
448 }
449
450 private BufferChain wrapWithSasl(BufferChain bc)
451 throws IOException {
452 if (!this.connection.useSasl) return bc;
453
454
455 byte [] responseBytes = bc.getBytes();
456 byte [] token;
457
458
459 synchronized (connection.saslServer) {
460 token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
461 }
462 if (LOG.isTraceEnabled()) {
463 LOG.trace("Adding saslServer wrapped token of size " + token.length
464 + " as call response.");
465 }
466
467 ByteBuffer bbTokenLength = ByteBuffer.wrap(Bytes.toBytes(token.length));
468 ByteBuffer bbTokenBytes = ByteBuffer.wrap(token);
469 return new BufferChain(bbTokenLength, bbTokenBytes);
470 }
471
472 @Override
473 public synchronized void endDelay(Object result) throws IOException {
474 assert this.delayResponse;
475 assert this.delayReturnValue || result == null;
476 this.delayResponse = false;
477 delayedCalls.decrementAndGet();
478 if (this.delayReturnValue) {
479 this.setResponse(result, null, null, null);
480 }
481 this.responder.doRespond(this);
482 }
483
484 @Override
485 public synchronized void endDelay() throws IOException {
486 this.endDelay(null);
487 }
488
489 @Override
490 public synchronized void startDelay(boolean delayReturnValue) {
491 assert !this.delayResponse;
492 this.delayResponse = true;
493 this.delayReturnValue = delayReturnValue;
494 int numDelayed = delayedCalls.incrementAndGet();
495 if (numDelayed > warnDelayedCalls) {
496 LOG.warn("Too many delayed calls: limit " + warnDelayedCalls + " current " + numDelayed);
497 }
498 }
499
500 @Override
501 public synchronized void endDelayThrowing(Throwable t) throws IOException {
502 this.setResponse(null, null, t, StringUtils.stringifyException(t));
503 this.delayResponse = false;
504 this.sendResponseIfReady();
505 }
506
507 @Override
508 public synchronized boolean isDelayed() {
509 return this.delayResponse;
510 }
511
512 @Override
513 public synchronized boolean isReturnValueDelayed() {
514 return this.delayReturnValue;
515 }
516
517 @Override
518 public boolean isClientCellBlockSupport() {
519 return this.connection != null && this.connection.codec != null;
520 }
521
522 @Override
523 public long disconnectSince() {
524 if (!connection.channel.isOpen()) {
525 return System.currentTimeMillis() - timestamp;
526 } else {
527 return -1L;
528 }
529 }
530
531 public long getSize() {
532 return this.size;
533 }
534
535
536
537
538
539
540 public synchronized void sendResponseIfReady() throws IOException {
541 if (!this.delayResponse) {
542 this.responder.doRespond(this);
543 }
544 }
545
546 public UserGroupInformation getRemoteUser() {
547 return connection.user;
548 }
549
550 @Override
551 public User getRequestUser() {
552 return user;
553 }
554
555 @Override
556 public String getRequestUserName() {
557 User user = getRequestUser();
558 return user == null? null: user.getShortName();
559 }
560
561 @Override
562 public InetAddress getRemoteAddress() {
563 return remoteAddress;
564 }
565
566 @Override
567 public VersionInfo getClientVersionInfo() {
568 return connection.getVersionInfo();
569 }
570 }
571
572
573 private class Listener extends Thread {
574
// Non-blocking accept channel; opened in the constructor, nulled when run() exits.
private ServerSocketChannel acceptChannel = null;
// Selector watching acceptChannel for OP_ACCEPT; nulled when run() exits.
private Selector selector = null;
// One Reader per read thread; getReader() hands them out in rotation.
private Reader[] readers = null;
// Index of the Reader returned by the last getReader() call.
private int currentReader = 0;
// Picks the window of connectionList scanned by a non-forced cleanupConnections().
private Random rand = new Random();
// Wall-clock time (ms) at which the last cleanup pass finished.
private long lastCleanupRunTime = 0;

// Minimum gap (ms) between two non-forced cleanup passes.
private long cleanupInterval = 10000;

// Listen backlog, read from "hbase.ipc.server.listen.queue.size" in the constructor.
private int backlogLength;

// Fixed-size pool running the Reader instances; shut down in doStop().
private ExecutorService readPool;
587
588 public Listener(final String name) throws IOException {
589 super(name);
590 backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
591
592 acceptChannel = ServerSocketChannel.open();
593 acceptChannel.configureBlocking(false);
594
595
596 bind(acceptChannel.socket(), bindAddress, backlogLength);
597 port = acceptChannel.socket().getLocalPort();
598
599 selector= Selector.open();
600
601 readers = new Reader[readThreads];
602 readPool = Executors.newFixedThreadPool(readThreads,
603 new ThreadFactoryBuilder().setNameFormat(
604 "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
605 ",port=" + port).setDaemon(true)
606 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
607 for (int i = 0; i < readThreads; ++i) {
608 Reader reader = new Reader();
609 readers[i] = reader;
610 readPool.execute(reader);
611 }
612 LOG.info(getName() + ": started " + readThreads + " reader(s).");
613
614
615 acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
616 this.setName("RpcServer.listener,port=" + port);
617 this.setDaemon(true);
618 }
619
620
621 private class Reader implements Runnable {
622 private volatile boolean adding = false;
623 private final Selector readSelector;
624
625 Reader() throws IOException {
626 this.readSelector = Selector.open();
627 }
628 @Override
629 public void run() {
630 try {
631 doRunLoop();
632 } finally {
633 try {
634 readSelector.close();
635 } catch (IOException ioe) {
636 LOG.error(getName() + ": error closing read selector in " + getName(), ioe);
637 }
638 }
639 }
640
641 private synchronized void doRunLoop() {
642 while (running) {
643 try {
644 readSelector.select();
645 while (adding) {
646 this.wait(1000);
647 }
648
649 Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
650 while (iter.hasNext()) {
651 SelectionKey key = iter.next();
652 iter.remove();
653 if (key.isValid()) {
654 if (key.isReadable()) {
655 doRead(key);
656 }
657 }
658 }
659 } catch (InterruptedException e) {
660 LOG.debug("Interrupted while sleeping");
661 return;
662 } catch (IOException ex) {
663 LOG.info(getName() + ": IOException in Reader", ex);
664 }
665 }
666 }
667
668
669
670
671
672
673
674
675 public void startAdd() {
676 adding = true;
677 readSelector.wakeup();
678 }
679
680 public synchronized SelectionKey registerChannel(SocketChannel channel)
681 throws IOException {
682 return channel.register(readSelector, SelectionKey.OP_READ);
683 }
684
685 public synchronized void finishAdd() {
686 adding = false;
687 this.notify();
688 }
689 }
690
691
692
693
694
695
696
697
698 private void cleanupConnections(boolean force) {
699 if (force || numConnections > thresholdIdleConnections) {
700 long currentTime = System.currentTimeMillis();
701 if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
702 return;
703 }
704 int start = 0;
705 int end = numConnections - 1;
706 if (!force) {
707 start = rand.nextInt() % numConnections;
708 end = rand.nextInt() % numConnections;
709 int temp;
710 if (end < start) {
711 temp = start;
712 start = end;
713 end = temp;
714 }
715 }
716 int i = start;
717 int numNuked = 0;
718 while (i <= end) {
719 Connection c;
720 synchronized (connectionList) {
721 try {
722 c = connectionList.get(i);
723 } catch (Exception e) {return;}
724 }
725 if (c.timedOut(currentTime)) {
726 if (LOG.isDebugEnabled())
727 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
728 closeConnection(c);
729 numNuked++;
730 end--;
731
732 c = null;
733 if (!force && numNuked == maxConnectionsToNuke) break;
734 }
735 else i++;
736 }
737 lastCleanupRunTime = System.currentTimeMillis();
738 }
739 }
740
741 @Override
742 public void run() {
743 LOG.info(getName() + ": starting");
744 while (running) {
745 SelectionKey key = null;
746 try {
747 selector.select();
748 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
749 while (iter.hasNext()) {
750 key = iter.next();
751 iter.remove();
752 try {
753 if (key.isValid()) {
754 if (key.isAcceptable())
755 doAccept(key);
756 }
757 } catch (IOException ignored) {
758 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
759 }
760 key = null;
761 }
762 } catch (OutOfMemoryError e) {
763 if (errorHandler != null) {
764 if (errorHandler.checkOOME(e)) {
765 LOG.info(getName() + ": exiting on OutOfMemoryError");
766 closeCurrentConnection(key, e);
767 cleanupConnections(true);
768 return;
769 }
770 } else {
771
772
773
774 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
775 closeCurrentConnection(key, e);
776 cleanupConnections(true);
777 try {
778 Thread.sleep(60000);
779 } catch (InterruptedException ex) {
780 LOG.debug("Interrupted while sleeping");
781 return;
782 }
783 }
784 } catch (Exception e) {
785 closeCurrentConnection(key, e);
786 }
787 cleanupConnections(false);
788 }
789
790 LOG.info(getName() + ": stopping");
791
792 synchronized (this) {
793 try {
794 acceptChannel.close();
795 selector.close();
796 } catch (IOException ignored) {
797 if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
798 }
799
800 selector= null;
801 acceptChannel= null;
802
803
804 while (!connectionList.isEmpty()) {
805 closeConnection(connectionList.remove(0));
806 }
807 }
808 }
809
810 private void closeCurrentConnection(SelectionKey key, Throwable e) {
811 if (key != null) {
812 Connection c = (Connection)key.attachment();
813 if (c != null) {
814 if (LOG.isDebugEnabled()) {
815 LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
816 (e != null ? " on error " + e.getMessage() : ""));
817 }
818 closeConnection(c);
819 key.attach(null);
820 }
821 }
822 }
823
824 InetSocketAddress getAddress() {
825 return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
826 }
827
828 void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
829 Connection c;
830 ServerSocketChannel server = (ServerSocketChannel) key.channel();
831
832 SocketChannel channel;
833 while ((channel = server.accept()) != null) {
834 try {
835 channel.configureBlocking(false);
836 channel.socket().setTcpNoDelay(tcpNoDelay);
837 channel.socket().setKeepAlive(tcpKeepAlive);
838 } catch (IOException ioe) {
839 channel.close();
840 throw ioe;
841 }
842
843 Reader reader = getReader();
844 try {
845 reader.startAdd();
846 SelectionKey readKey = reader.registerChannel(channel);
847 c = getConnection(channel, System.currentTimeMillis());
848 readKey.attach(c);
849 synchronized (connectionList) {
850 connectionList.add(numConnections, c);
851 numConnections++;
852 }
853 if (LOG.isDebugEnabled())
854 LOG.debug(getName() + ": connection from " + c.toString() +
855 "; # active connections: " + numConnections);
856 } finally {
857 reader.finishAdd();
858 }
859 }
860 }
861
862 void doRead(SelectionKey key) throws InterruptedException {
863 int count;
864 Connection c = (Connection) key.attachment();
865 if (c == null) {
866 return;
867 }
868 c.setLastContact(System.currentTimeMillis());
869 try {
870 count = c.readAndProcess();
871
872 if (count > 0) {
873 c.setLastContact(System.currentTimeMillis());
874 }
875
876 } catch (InterruptedException ieo) {
877 throw ieo;
878 } catch (Exception e) {
879 if (LOG.isDebugEnabled()) {
880 LOG.debug(getName() + ": Caught exception while reading:", e);
881 }
882 count = -1;
883 }
884 if (count < 0) {
885 if (LOG.isDebugEnabled()) {
886 LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
887 " because read count=" + count +
888 ". Number of active connections: " + numConnections);
889 }
890 closeConnection(c);
891 }
892 }
893
894 synchronized void doStop() {
895 if (selector != null) {
896 selector.wakeup();
897 Thread.yield();
898 }
899 if (acceptChannel != null) {
900 try {
901 acceptChannel.socket().close();
902 } catch (IOException e) {
903 LOG.info(getName() + ": exception in closing listener socket. " + e);
904 }
905 }
906 readPool.shutdownNow();
907 }
908
909
910
911 Reader getReader() {
912 currentReader = (currentReader + 1) % readers.length;
913 return readers[currentReader];
914 }
915 }
916
917
918 protected class Responder extends Thread {
// Selector on which connections with pending responses are registered for OP_WRITE.
private final Selector writeSelector;
// Connections waiting to be registered on writeSelector; filled by registerForWrite() and
// drained by registerWrites(). Backed by a ConcurrentHashMap.
private final Set<Connection> writingCons =
    Collections.newSetFromMap(new ConcurrentHashMap<Connection, Boolean>());
922
/**
 * Creates the responder as a daemon thread named "RpcServer.responder" with the logging
 * uncaught-exception handler, and opens its write selector.
 *
 * @throws IOException if the selector cannot be opened
 */
Responder() throws IOException {
  setName("RpcServer.responder");
  setDaemon(true);
  setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER);
  this.writeSelector = Selector.open();
}
929
930 @Override
931 public void run() {
932 LOG.info(getName() + ": starting");
933 try {
934 doRunLoop();
935 } finally {
936 LOG.info(getName() + ": stopping");
937 try {
938 writeSelector.close();
939 } catch (IOException ioe) {
940 LOG.error(getName() + ": couldn't close write selector", ioe);
941 }
942 }
943 }
944
945
946
947
948
949 private void registerWrites() {
950 Iterator<Connection> it = writingCons.iterator();
951 while (it.hasNext()) {
952 Connection c = it.next();
953 it.remove();
954 SelectionKey sk = c.channel.keyFor(writeSelector);
955 try {
956 if (sk == null) {
957 try {
958 c.channel.register(writeSelector, SelectionKey.OP_WRITE, c);
959 } catch (ClosedChannelException e) {
960
961 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
962 }
963 } else {
964 sk.interestOps(SelectionKey.OP_WRITE);
965 }
966 } catch (CancelledKeyException e) {
967
968 if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
969 }
970 }
971 }
972
973
974
975
976 public void registerForWrite(Connection c) {
977 if (writingCons.add(c)) {
978 writeSelector.wakeup();
979 }
980 }
981
982 private void doRunLoop() {
983 long lastPurgeTime = 0;
984 while (running) {
985 try {
986 registerWrites();
987 int keyCt = writeSelector.select(purgeTimeout);
988 if (keyCt == 0) {
989 continue;
990 }
991
992 Set<SelectionKey> keys = writeSelector.selectedKeys();
993 Iterator<SelectionKey> iter = keys.iterator();
994 while (iter.hasNext()) {
995 SelectionKey key = iter.next();
996 iter.remove();
997 try {
998 if (key.isValid() && key.isWritable()) {
999 doAsyncWrite(key);
1000 }
1001 } catch (IOException e) {
1002 LOG.debug(getName() + ": asyncWrite", e);
1003 }
1004 }
1005
1006 lastPurgeTime = purge(lastPurgeTime);
1007
1008 } catch (OutOfMemoryError e) {
1009 if (errorHandler != null) {
1010 if (errorHandler.checkOOME(e)) {
1011 LOG.info(getName() + ": exiting on OutOfMemoryError");
1012 return;
1013 }
1014 } else {
1015
1016
1017
1018
1019
1020 LOG.warn(getName() + ": OutOfMemoryError in server select", e);
1021 try {
1022 Thread.sleep(60000);
1023 } catch (InterruptedException ex) {
1024 LOG.debug("Interrupted while sleeping");
1025 return;
1026 }
1027 }
1028 } catch (Exception e) {
1029 LOG.warn(getName() + ": exception in Responder " +
1030 StringUtils.stringifyException(e), e);
1031 }
1032 }
1033 LOG.info(getName() + ": stopped");
1034 }
1035
1036
1037
1038
1039
1040
1041 private long purge(long lastPurgeTime) {
1042 long now = System.currentTimeMillis();
1043 if (now < lastPurgeTime + purgeTimeout) {
1044 return lastPurgeTime;
1045 }
1046
1047 ArrayList<Connection> conWithOldCalls = new ArrayList<Connection>();
1048
1049 synchronized (writeSelector.keys()) {
1050 for (SelectionKey key : writeSelector.keys()) {
1051 Connection connection = (Connection) key.attachment();
1052 if (connection == null) {
1053 throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
1054 }
1055 Call call = connection.responseQueue.peekFirst();
1056 if (call != null && now > call.timestamp + purgeTimeout) {
1057 conWithOldCalls.add(call.connection);
1058 }
1059 }
1060 }
1061
1062
1063 for (Connection connection : conWithOldCalls) {
1064 closeConnection(connection);
1065 }
1066
1067 return now;
1068 }
1069
1070 private void doAsyncWrite(SelectionKey key) throws IOException {
1071 Connection connection = (Connection) key.attachment();
1072 if (connection == null) {
1073 throw new IOException("doAsyncWrite: no connection");
1074 }
1075 if (key.channel() != connection.channel) {
1076 throw new IOException("doAsyncWrite: bad channel");
1077 }
1078
1079 if (processAllResponses(connection)) {
1080 try {
1081
1082
1083 key.interestOps(0);
1084 } catch (CancelledKeyException e) {
1085
1086
1087
1088
1089
1090 LOG.warn("Exception while changing ops : " + e);
1091 }
1092 }
1093 }
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103 private boolean processResponse(final Call call) throws IOException {
1104 boolean error = true;
1105 try {
1106
1107 long numBytes = channelWrite(call.connection.channel, call.response);
1108 if (numBytes < 0) {
1109 throw new HBaseIOException("Error writing on the socket " +
1110 "for the call:" + call.toShortString());
1111 }
1112 error = false;
1113 } finally {
1114 if (error) {
1115 LOG.debug(getName() + call.toShortString() + ": output error -- closing");
1116 closeConnection(call.connection);
1117 }
1118 }
1119
1120 if (!call.response.hasRemaining()) {
1121 call.done();
1122 return true;
1123 } else {
1124 return false;
1125 }
1126 }
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136 private boolean processAllResponses(final Connection connection) throws IOException {
1137
1138 connection.responseWriteLock.lock();
1139 try {
1140 for (int i = 0; i < 20; i++) {
1141
1142 Call call = connection.responseQueue.pollFirst();
1143 if (call == null) {
1144 return true;
1145 }
1146 if (!processResponse(call)) {
1147 connection.responseQueue.addFirst(call);
1148 return false;
1149 }
1150 }
1151 } finally {
1152 connection.responseWriteLock.unlock();
1153 }
1154
1155 return connection.responseQueue.isEmpty();
1156 }
1157
1158
1159
1160
/**
 * Sends the response of {@code call}, inline when possible and through the write selector
 * otherwise.
 *
 * @param call call whose {@code response} is ready to be written
 * @throws IOException if the inline write fails
 */
void doRespond(Call call) throws IOException {
  boolean added = false;

  // Fast path: nothing queued for this connection and nobody else is writing to it, so try
  // to write from the calling thread.
  if (call.connection.responseQueue.isEmpty() && call.connection.responseWriteLock.tryLock()) {
    try {
      // Check again now that the lock is held.
      if (call.connection.responseQueue.isEmpty()) {
        // Fully written: nothing to queue or register.
        if (processResponse(call)) {
          return;
        }
        // Partly written: the remainder goes to the head of the queue.
        call.connection.responseQueue.addFirst(call);
        added = true;
      }
    } finally {
      call.connection.responseWriteLock.unlock();
    }
  }

  // Slow path: append behind the responses already queued.
  if (!added) {
    call.connection.responseQueue.addLast(call);
  }
  call.responder.registerForWrite(call.connection);

  // Restart the clock that purge() compares against purgeTimeout.
  call.timestamp = System.currentTimeMillis();
}
1191 }
1192
1193
1194 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
1195 value="VO_VOLATILE_INCREMENT",
1196 justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
1197 public class Connection {
1198
// Set to true by readPreamble() once the preamble has been accepted.
private boolean connectionPreambleRead = false;

// Set to true by processOneRpc() after the connection header has been processed.
private boolean connectionHeaderRead = false;
protected SocketChannel channel;
// Payload buffer of the message currently being read; allocated in readAndProcess()
// and reset to null by process() and close().
private ByteBuffer data;
// Four-byte buffer: first compared against HConstants.RPC_HEADER by readPreamble(),
// afterwards read as the int length prefix of each message.
private ByteBuffer dataLengthBuffer;
// Calls whose responses still have to be written to the channel.
protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<Call>();
// Held while responses are written to the channel.
private final Lock responseWriteLock = new ReentrantLock();
// Incremented in readAndProcess() when a message buffer is allocated; isIdle() checks for 0.
private Counter rpcCount = new Counter();
// Compared against the current time by timedOut().
private long lastContact;
private InetAddress addr;
protected Socket socket;


// "*Unknown*" when the socket reports no InetAddress (see the constructor).
protected String hostAddress;
protected int remotePort;
// Parsed by processConnectionHeader(); null until then.
ConnectionHeader connectionHeader;



// Set by setupCellBlockCodecs() when the connection header names a codec class.
private Codec codec;



// Set by setupCellBlockCodecs() when the connection header names a compressor class.
private CompressionCodec compressionCodec;
// Resolved from the connection header's service name in processConnectionHeader().
BlockingService service;
protected UserGroupInformation user = null;
// Decoded from the preamble's auth byte in readPreamble().
private AuthMethod authMethod;
// Set to true in saslReadAndProcess() once saslServer.isComplete().
private boolean saslContextEstablished;
// Set in readPreamble() when the client asked for non-SIMPLE auth but security is
// disabled; process() then drops the next message and clears the flag.
private boolean skipInitialSaslHandshake;
// Buffers used by processUnwrappedData() to reassemble length-prefixed messages.
private ByteBuffer unwrappedData;

private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
// True when the (possibly downgraded) auth method is not SIMPLE (see readPreamble()).
boolean useSasl;
SaslServer saslServer;
// True when the negotiated QOP is non-null and not "auth" (see saslReadAndProcess()).
private boolean useWrap = false;

private static final int AUTHORIZATION_FAILED_CALLID = -1;
// Call object reused to report authentication/authorization failures to the client.
private final Call authFailedCall =
    new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null,
        null);
private ByteArrayOutputStream authFailedResponse =
    new ByteArrayOutputStream();

private static final int SASL_CALLID = -33;
// Call object reused by doRawSaslReply(). NOTE(review): this.service has not been assigned
// when this initializer runs, so the service argument is null here.
private final Call saslCall =
    new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);

// Logged on SASL authentication failure; presumably assigned by a SASL callback handler
// outside this class -- TODO confirm.
public UserGroupInformation attemptingUser = null;
1248
/**
 * Creates the per-client connection state for {@code channel}.
 *
 * @param channel the accepted socket channel
 * @param lastContact initial last-contact time, later compared against the current time
 *        by {@code timedOut}
 */
public Connection(SocketChannel channel, long lastContact) {
  this.channel = channel;
  this.lastContact = lastContact;
  this.data = null;
  this.dataLengthBuffer = ByteBuffer.allocate(4);

  this.socket = channel.socket();
  this.addr = socket.getInetAddress();
  this.hostAddress = (addr != null) ? addr.getHostAddress() : "*Unknown*";
  this.remotePort = socket.getPort();

  // A configured size of 0 means "leave the socket's send buffer alone".
  if (socketSendBufferSize == 0) {
    return;
  }
  try {
    socket.setSendBufferSize(socketSendBufferSize);
  } catch (IOException e) {
    LOG.warn("Connection: unable to set socket send buffer size to " +
        socketSendBufferSize);
  }
}
1271
1272 @Override
1273 public String toString() {
1274 return getHostAddress() + ":" + remotePort;
1275 }
1276
1277 public String getHostAddress() {
1278 return hostAddress;
1279 }
1280
1281 public InetAddress getHostInetAddress() {
1282 return addr;
1283 }
1284
1285 public int getRemotePort() {
1286 return remotePort;
1287 }
1288
1289 public void setLastContact(long lastContact) {
1290 this.lastContact = lastContact;
1291 }
1292
1293 public VersionInfo getVersionInfo() {
1294 if (connectionHeader.hasVersionInfo()) {
1295 return connectionHeader.getVersionInfo();
1296 }
1297 return null;
1298 }
1299
1300
1301 private boolean isIdle() {
1302 return rpcCount.get() == 0;
1303 }
1304
1305
1306 protected void decRpcCount() {
1307 rpcCount.decrement();
1308 }
1309
1310
1311 protected void incRpcCount() {
1312 rpcCount.increment();
1313 }
1314
1315 protected boolean timedOut(long currentTime) {
1316 return isIdle() && currentTime - lastContact > maxIdleTime;
1317 }
1318
1319 private UserGroupInformation getAuthorizedUgi(String authorizedId)
1320 throws IOException {
1321 if (authMethod == AuthMethod.DIGEST) {
1322 TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
1323 secretManager);
1324 UserGroupInformation ugi = tokenId.getUser();
1325 if (ugi == null) {
1326 throw new AccessDeniedException(
1327 "Can't retrieve username from tokenIdentifier.");
1328 }
1329 ugi.addTokenIdentifier(tokenId);
1330 return ugi;
1331 } else {
1332 return UserGroupInformation.createRemoteUser(authorizedId);
1333 }
1334 }
1335
/**
 * Handles one message received on a SASL connection.
 * <p>
 * Once the SASL context is established, the message is either processed directly
 * (no wrapping negotiated) or unwrapped first. Before that, the message is treated as a
 * SASL token: the SASL server is created on first use, the token is evaluated, any reply
 * token is sent back, and on completion the authenticated user is recorded.
 *
 * @param saslToken buffer holding the received message; its backing array is accessed
 *        directly
 * @throws IOException if SASL negotiation or request processing fails; negotiation
 *         failures are reported to the client and audited before being rethrown
 * @throws InterruptedException propagated from request processing
 */
private void saslReadAndProcess(ByteBuffer saslToken) throws IOException,
    InterruptedException {
  if (saslContextEstablished) {
    if (LOG.isTraceEnabled())
      LOG.trace("Have read input token of size " + saslToken.limit()
          + " for processing by saslServer.unwrap()");

    if (!useWrap) {
      processOneRpc(saslToken);
    } else {
      byte[] b = saslToken.array();
      // NOTE(review): the third argument of SaslServer.unwrap is a length; passing limit()
      // equals the readable byte count only while position() is 0 -- confirm that holds
      // for every caller.
      byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit());
      processUnwrappedData(plaintextData);
    }
  } else {
    byte[] replyToken;
    try {
      // Lazily create the SASL server on the first token of the handshake.
      if (saslServer == null) {
        switch (authMethod) {
        case DIGEST:
          if (secretManager == null) {
            throw new AccessDeniedException(
                "Server is not configured to do DIGEST authentication.");
          }
          saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
              .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
              SaslUtil.SASL_PROPS, new SaslDigestCallbackHandler(
                  secretManager, this));
          break;
        default:
          // Every other auth method is set up as KERBEROS, using the current user's
          // principal split into its parts.
          UserGroupInformation current = UserGroupInformation.getCurrentUser();
          String fullName = current.getUserName();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Kerberos principal name is " + fullName);
          }
          final String names[] = SaslUtil.splitKerberosName(fullName);
          if (names.length != 3) {
            throw new AccessDeniedException(
                "Kerberos principal name does NOT have the expected "
                    + "hostname part: " + fullName);
          }
          // Create the server while running as the current user.
          current.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws SaslException {
              saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
                  .getMechanismName(), names[0], names[1],
                  SaslUtil.SASL_PROPS, new SaslGssCallbackHandler());
              return null;
            }
          });
        }
        // createSaslServer returns null when no implementation supports the mechanism.
        if (saslServer == null)
          throw new AccessDeniedException(
              "Unable to find SASL server implementation for "
                  + authMethod.getMechanismName());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Have read input token of size " + saslToken.limit()
            + " for processing by saslServer.evaluateResponse()");
      }
      replyToken = saslServer.evaluateResponse(saslToken.array());
    } catch (IOException e) {
      // Prefer reporting an InvalidToken found anywhere in the cause chain over the
      // outermost exception.
      IOException sendToClient = e;
      Throwable cause = e;
      while (cause != null) {
        if (cause instanceof InvalidToken) {
          sendToClient = (InvalidToken) cause;
          break;
        }
        cause = cause.getCause();
      }
      doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
          sendToClient.getLocalizedMessage());
      metrics.authenticationFailure();
      String clientIP = this.toString();

      AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
      // The original exception (not the unwrapped one) is what propagates to the caller.
      throw e;
    }
    if (replyToken != null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Will send token of size " + replyToken.length
            + " from saslServer.");
      }
      doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
          null);
    }
    if (saslServer.isComplete()) {
      // Wrapping is needed for any negotiated QOP other than plain "auth".
      String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
      useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
      user = getAuthorizedUgi(saslServer.getAuthorizationID());
      if (LOG.isDebugEnabled()) {
        LOG.debug("SASL server context established. Authenticated client: "
            + user + ". Negotiated QoP is "
            + saslServer.getNegotiatedProperty(Sasl.QOP));
      }
      metrics.authenticationSuccess();
      AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
      saslContextEstablished = true;
    }
  }
}
1441
1442
1443
1444
1445 private void doRawSaslReply(SaslStatus status, Writable rv,
1446 String errorClass, String error) throws IOException {
1447 ByteBufferOutputStream saslResponse = null;
1448 DataOutputStream out = null;
1449 try {
1450
1451
1452 saslResponse = new ByteBufferOutputStream(256);
1453 out = new DataOutputStream(saslResponse);
1454 out.writeInt(status.state);
1455 if (status == SaslStatus.SUCCESS) {
1456 rv.write(out);
1457 } else {
1458 WritableUtils.writeString(out, errorClass);
1459 WritableUtils.writeString(out, error);
1460 }
1461 saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
1462 saslCall.responder = responder;
1463 saslCall.sendResponseIfReady();
1464 } finally {
1465 if (saslResponse != null) {
1466 saslResponse.close();
1467 }
1468 if (out != null) {
1469 out.close();
1470 }
1471 }
1472 }
1473
1474 private void disposeSasl() {
1475 if (saslServer != null) {
1476 try {
1477 saslServer.dispose();
1478 saslServer = null;
1479 } catch (SaslException ignored) {
1480
1481 }
1482 }
1483 }
1484
/**
 * Validates the connection preamble: the four header bytes already held in
 * {@code dataLengthBuffer}, followed by one version byte and one auth-method byte read
 * from the channel. On success {@code connectionPreambleRead} is set and
 * {@code dataLengthBuffer} is cleared for the first length prefix.
 *
 * @return the byte count of the version/auth read, or -1 after a bad preamble has been
 *         reported to the client
 * @throws IOException on read failure, or (as {@code AccessDeniedException}) when
 *         security is enabled and the client requested SIMPLE auth
 */
private int readPreamble() throws IOException {
  int count;

  // The caller has filled dataLengthBuffer; compare its four bytes with the RPC header.
  this.dataLengthBuffer.flip();
  if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
    return doBadPreambleHandling("Expected HEADER=" +
        Bytes.toStringBinary(HConstants.RPC_HEADER) +
        " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
        " from " + toString());
  }

  // Byte 0 is the protocol version, byte 1 the auth-method code.
  ByteBuffer versionAndAuthBytes = ByteBuffer.allocate(2);
  count = channelRead(channel, versionAndAuthBytes);
  // NOTE(review): versionAndAuthBytes is a local, so a partial read here is not carried
  // over to the next invocation -- confirm partial reads cannot occur or are acceptable.
  if (count < 0 || versionAndAuthBytes.remaining() > 0) {
    return count;
  }
  int version = versionAndAuthBytes.get(0);
  byte authbyte = versionAndAuthBytes.get(1);
  this.authMethod = AuthMethod.valueOf(authbyte);
  if (version != CURRENT_VERSION) {
    String msg = getFatalConnectionString(version, authbyte);
    return doBadPreambleHandling(msg, new WrongVersionException(msg));
  }
  // A null authMethod means the auth byte was not recognised.
  if (authMethod == null) {
    String msg = getFatalConnectionString(version, authbyte);
    return doBadPreambleHandling(msg, new BadAuthException(msg));
  }
  // Secure server, client asked for SIMPLE: tell the client and fail the connection.
  if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
    AccessDeniedException ae = new AccessDeniedException("Authentication is required");
    setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
    responder.doRespond(authFailedCall);
    throw ae;
  }
  // Insecure server, client asked for SASL: tell the client to switch to SIMPLE and
  // arrange for process() to drop the next message it receives.
  if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
    doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(
        SaslUtil.SWITCH_TO_SIMPLE_AUTH), null, null);
    authMethod = AuthMethod.SIMPLE;



    skipInitialSaslHandshake = true;
  }
  if (authMethod != AuthMethod.SIMPLE) {
    useSasl = true;
  }

  // Make dataLengthBuffer ready to receive the first length prefix.
  dataLengthBuffer.clear();
  connectionPreambleRead = true;
  return count;
}
1535
1536 private int read4Bytes() throws IOException {
1537 if (this.dataLengthBuffer.remaining() > 0) {
1538 return channelRead(channel, this.dataLengthBuffer);
1539 } else {
1540 return 0;
1541 }
1542 }
1543
1544
1545
1546
1547
1548
1549
1550
1551
/**
 * Reads whatever is available on the channel and advances this connection's state:
 * preamble first, then length-prefixed messages. A message is handed to
 * {@code process()} as soon as its payload buffer is full.
 *
 * @return the byte count of the last channel read performed (negative when a read
 *         reported end of stream or the preamble was rejected); 0 for a ping
 * @throws IOException on read or processing failure
 * @throws InterruptedException propagated from message processing
 * @throws IllegalArgumentException if a negative message length other than the ping
 *         marker is received
 */
public int readAndProcess() throws IOException, InterruptedException {




  // Fill the 4-byte buffer (RPC header bytes before the preamble is read, a length
  // prefix afterwards); come back later if it is not complete yet.
  int count = read4Bytes();
  if (count < 0 || dataLengthBuffer.remaining() > 0) {
    return count;
  }


  if (!connectionPreambleRead) {
    count = readPreamble();
    // readPreamble() sets the flag only on success.
    if (!connectionPreambleRead) {
      return count;
    }

    // The preamble consumed the buffer; now read the first real length prefix.
    count = read4Bytes();
    if (count < 0 || dataLengthBuffer.remaining() > 0) {
      return count;
    }
  }



  // data == null means a new message starts: decode its length and allocate the buffer.
  if (data == null) {
    dataLengthBuffer.flip();
    int dataLength = dataLengthBuffer.getInt();
    if (dataLength == RpcClient.PING_CALL_ID) {
      // When wrapping is in effect the marker is not handled here; it falls through to
      // the length check below.
      if (!useWrap) {
        dataLengthBuffer.clear();
        return 0;
      }
    }
    if (dataLength < 0) {
      throw new IllegalArgumentException("Unexpected data length "
          + dataLength + "!! from " + getHostAddress());
    }


    // NOTE(review): no upper bound is applied to dataLength before allocating.
    data = ByteBuffer.allocate(dataLength);




    // One increment per message whose buffer was allocated.
    incRpcCount();
  }

  count = channelRead(channel, data);

  // Process only once the whole payload has arrived.
  if (count >= 0 && data.remaining() == 0) {
    process();
  }

  return count;
}
1608
1609
1610
1611
1612 private void process() throws IOException, InterruptedException {
1613 data.flip();
1614 try {
1615 if (skipInitialSaslHandshake) {
1616 skipInitialSaslHandshake = false;
1617 return;
1618 }
1619
1620 if (useSasl) {
1621 saslReadAndProcess(data);
1622 } else {
1623 processOneRpc(data);
1624 }
1625
1626 } finally {
1627 dataLengthBuffer.clear();
1628 data = null;
1629 }
1630 }
1631
1632 private String getFatalConnectionString(final int version, final byte authByte) {
1633 return "serverVersion=" + CURRENT_VERSION +
1634 ", clientVersion=" + version + ", authMethod=" + authByte +
1635 ", authSupported=" + (authMethod != null) + " from " + toString();
1636 }
1637
1638 private int doBadPreambleHandling(final String msg) throws IOException {
1639 return doBadPreambleHandling(msg, new FatalConnectionException(msg));
1640 }
1641
1642 private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
1643 LOG.warn(msg);
1644 Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null);
1645 setupResponse(null, fakeCall, e, msg);
1646 responder.doRespond(fakeCall);
1647
1648 return -1;
1649 }
1650
1651
1652 private void processConnectionHeader(ByteBuffer buf) throws IOException {
1653 this.connectionHeader = ConnectionHeader.parseFrom(
1654 new ByteBufferInputStream(buf));
1655 String serviceName = connectionHeader.getServiceName();
1656 if (serviceName == null) throw new EmptyServiceNameException();
1657 this.service = getService(services, serviceName);
1658 if (this.service == null) throw new UnknownServiceException(serviceName);
1659 setupCellBlockCodecs(this.connectionHeader);
1660 UserGroupInformation protocolUser = createUser(connectionHeader);
1661 if (!useSasl) {
1662 user = protocolUser;
1663 if (user != null) {
1664 user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
1665 }
1666 } else {
1667
1668 user.setAuthenticationMethod(authMethod.authenticationMethod);
1669
1670
1671
1672 if ((protocolUser != null)
1673 && (!protocolUser.getUserName().equals(user.getUserName()))) {
1674 if (authMethod == AuthMethod.DIGEST) {
1675
1676 throw new AccessDeniedException("Authenticated user (" + user
1677 + ") doesn't match what the client claims to be ("
1678 + protocolUser + ")");
1679 } else {
1680
1681
1682
1683 UserGroupInformation realUser = user;
1684 user = UserGroupInformation.createProxyUser(protocolUser
1685 .getUserName(), realUser);
1686
1687 user.setAuthenticationMethod(AuthenticationMethod.PROXY);
1688 }
1689 }
1690 }
1691 if (connectionHeader.hasVersionInfo()) {
1692 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1693 + " with version info: "
1694 + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
1695 } else {
1696 AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
1697 + " with unknown version info");
1698 }
1699 }
1700
1701
1702
1703
1704
1705 private void setupCellBlockCodecs(final ConnectionHeader header)
1706 throws FatalConnectionException {
1707
1708 if (!header.hasCellBlockCodecClass()) return;
1709 String className = header.getCellBlockCodecClass();
1710 if (className == null || className.length() == 0) return;
1711 try {
1712 this.codec = (Codec)Class.forName(className).newInstance();
1713 } catch (Exception e) {
1714 throw new UnsupportedCellCodecException(className, e);
1715 }
1716 if (!header.hasCellBlockCompressorClass()) return;
1717 className = header.getCellBlockCompressorClass();
1718 try {
1719 this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
1720 } catch (Exception e) {
1721 throw new UnsupportedCompressionCodecException(className, e);
1722 }
1723 }
1724
1725 private void processUnwrappedData(byte[] inBuf) throws IOException,
1726 InterruptedException {
1727 ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
1728
1729 while (true) {
1730 int count;
1731 if (unwrappedDataLengthBuffer.remaining() > 0) {
1732 count = channelRead(ch, unwrappedDataLengthBuffer);
1733 if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
1734 return;
1735 }
1736
1737 if (unwrappedData == null) {
1738 unwrappedDataLengthBuffer.flip();
1739 int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
1740
1741 if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
1742 if (LOG.isDebugEnabled())
1743 LOG.debug("Received ping message");
1744 unwrappedDataLengthBuffer.clear();
1745 continue;
1746 }
1747 unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
1748 }
1749
1750 count = channelRead(ch, unwrappedData);
1751 if (count <= 0 || unwrappedData.remaining() > 0)
1752 return;
1753
1754 if (unwrappedData.remaining() == 0) {
1755 unwrappedDataLengthBuffer.clear();
1756 unwrappedData.flip();
1757 processOneRpc(unwrappedData);
1758 unwrappedData = null;
1759 }
1760 }
1761 }
1762
1763
1764 private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException {
1765 if (connectionHeaderRead) {
1766 processRequest(buf);
1767 } else {
1768 processConnectionHeader(buf);
1769 this.connectionHeaderRead = true;
1770 if (!authorizeConnection()) {
1771
1772
1773 throw new AccessDeniedException("Connection from " + this + " for service " +
1774 connectionHeader.getServiceName() + " is unauthorized for user: " + user);
1775 }
1776 }
1777 rpcCount.destroy();
1778 }
1779
1780
1781
1782
1783
1784
1785
/**
 * Decodes one request from {@code buf} and hands it to the scheduler.
 * <p>
 * The buffer holds a varint-length-prefixed {@code RequestHeader}, optionally followed
 * by a varint-length-prefixed request parameter and, if the header says so, a cell block.
 * If the call queue is over its size limit, decoding fails, or the scheduler refuses the
 * call, an error response is sent to the client instead.
 *
 * @param buf buffer holding the request; its backing array is accessed directly
 * @throws IOException if the header cannot be read or an error response cannot be sent
 * @throws InterruptedException propagated from the scheduler
 */
protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
  long totalRequestSize = buf.limit();
  // offset tracks how many bytes of buf have been consumed so far.
  int offset = 0;


  CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit());
  int headerSize = cis.readRawVarint32();
  offset = cis.getTotalBytesRead();
  Message.Builder builder = RequestHeader.newBuilder();
  ProtobufUtil.mergeFrom(builder, cis, headerSize);
  RequestHeader header = (RequestHeader) builder.build();
  offset += headerSize;
  int id = header.getCallId();
  if (LOG.isTraceEnabled()) {
    LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
        " totalRequestSize: " + totalRequestSize + " bytes");
  }


  // Reject the request up front if accepting it would push the queued bytes over the
  // configured maximum.
  if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
    final Call callTooBig =
        new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null);
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
    InetSocketAddress address = getListenerAddress();
    setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
        "Call queue is full on " + (address != null ? address : "(channel closed)") +
            ", is hbase.ipc.server.max.callqueue.size too small?");
    responder.doRespond(callTooBig);
    return;
  }
  MethodDescriptor md = null;
  Message param = null;
  CellScanner cellScanner = null;
  try {
    if (header.hasRequestParam() && header.getRequestParam()) {
      md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
      if (md == null) throw new UnsupportedOperationException(header.getMethodName());
      builder = this.service.getRequestPrototype(md).newBuilderForType();
      // Restart the stream's byte counter so that the next getTotalBytesRead() covers
      // only the parameter's varint length prefix.
      cis.resetSizeCounter();
      int paramSize = cis.readRawVarint32();
      offset += cis.getTotalBytesRead();
      if (builder != null) {
        ProtobufUtil.mergeFrom(builder, cis, paramSize);
        param = builder.build();
      }
      offset += paramSize;
    }
    if (header.hasCellBlockMeta()) {
      // Whatever follows the header and parameter is the cell block.
      buf.position(offset);
      cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
    }
  } catch (Throwable t) {
    InetSocketAddress address = getListenerAddress();
    String msg = (address != null ? address : "(channel closed)") +
        " is unable to read call parameter from client " + getHostAddress();
    LOG.warn(msg, t);

    metrics.exception(t);


    // LinkageError and UnsupportedOperationException are reported to the client wrapped
    // in a DoNotRetryIOException.
    if (t instanceof LinkageError) {
      t = new DoNotRetryIOException(t);
    }

    if (t instanceof UnsupportedOperationException) {
      t = new DoNotRetryIOException(t);
    }

    final Call readParamsFailedCall =
        new Call(id, this.service, null, null, null, null, this,
            responder, totalRequestSize, null, null);
    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    setupResponse(responseBuffer, readParamsFailedCall, t,
        msg + "; " + t.getMessage());
    responder.doRespond(readParamsFailedCall);
    return;
  }

  TraceInfo traceInfo = header.hasTraceInfo()
      ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
      : null;
  Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
      totalRequestSize, traceInfo, RpcServer.getRemoteIp());
  if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
    // The scheduler refused the call: take its size back out of the queue accounting
    // (presumably added when the CallRunner was created -- TODO confirm) and tell the
    // client.
    callQueueSize.add(-1 * call.getSize());

    ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
    metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
    InetSocketAddress address = getListenerAddress();
    setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
        "Call queue is full on " + (address != null ? address : "(channel closed)") +
            ", too many items queued ?");
    responder.doRespond(call);
  }
}
1883
1884 private boolean authorizeConnection() throws IOException {
1885 try {
1886
1887
1888
1889
1890 if (user != null && user.getRealUser() != null
1891 && (authMethod != AuthMethod.DIGEST)) {
1892 ProxyUsers.authorize(user, this.getHostAddress(), conf);
1893 }
1894 authorize(user, connectionHeader, getHostInetAddress());
1895 metrics.authorizationSuccess();
1896 } catch (AuthorizationException ae) {
1897 if (LOG.isDebugEnabled()) {
1898 LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
1899 }
1900 metrics.authorizationFailure();
1901 setupResponse(authFailedResponse, authFailedCall,
1902 new AccessDeniedException(ae), ae.getMessage());
1903 responder.doRespond(authFailedCall);
1904 return false;
1905 }
1906 return true;
1907 }
1908
1909 protected synchronized void close() {
1910 disposeSasl();
1911 data = null;
1912 if (!channel.isOpen())
1913 return;
1914 try {socket.shutdownOutput();} catch(Exception ignored) {}
1915 if (channel.isOpen()) {
1916 try {channel.close();} catch(Exception ignored) {}
1917 }
1918 try {socket.close();} catch(Exception ignored) {}
1919 }
1920
1921 private UserGroupInformation createUser(ConnectionHeader head) {
1922 UserGroupInformation ugi = null;
1923
1924 if (!head.hasUserInfo()) {
1925 return null;
1926 }
1927 UserInformation userInfoProto = head.getUserInfo();
1928 String effectiveUser = null;
1929 if (userInfoProto.hasEffectiveUser()) {
1930 effectiveUser = userInfoProto.getEffectiveUser();
1931 }
1932 String realUser = null;
1933 if (userInfoProto.hasRealUser()) {
1934 realUser = userInfoProto.getRealUser();
1935 }
1936 if (effectiveUser != null) {
1937 if (realUser != null) {
1938 UserGroupInformation realUserUgi =
1939 UserGroupInformation.createRemoteUser(realUser);
1940 ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
1941 } else {
1942 ugi = UserGroupInformation.createRemoteUser(effectiveUser);
1943 }
1944 }
1945 return ugi;
1946 }
1947 }
1948
1949
1950
1951
1952
1953
1954
1955 public static class BlockingServiceAndInterface {
1956 private final BlockingService service;
1957 private final Class<?> serviceInterface;
1958 public BlockingServiceAndInterface(final BlockingService service,
1959 final Class<?> serviceInterface) {
1960 this.service = service;
1961 this.serviceInterface = serviceInterface;
1962 }
1963 public Class<?> getServiceInterface() {
1964 return this.serviceInterface;
1965 }
1966 public BlockingService getBlockingService() {
1967 return this.service;
1968 }
1969 }
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
/**
 * Constructs the RPC server: reads its tunables from {@code conf}, creates the listener
 * and the responder, and initialises the scheduler. Nothing is started here.
 *
 * @param server hosting server; {@code logResponse} and {@code createSecretManager}
 *        in this class both null-check it
 * @param name name passed to the listener and to the metrics source
 * @param services the services this server exposes
 * @param bindAddress address stored for the listener to bind to
 * @param conf configuration the tunables are read from
 * @param scheduler scheduler that will run the calls
 * @throws IOException propagated from creating the listener or other components
 */
public RpcServer(final Server server, final String name,
    final List<BlockingServiceAndInterface> services,
    final InetSocketAddress bindAddress, Configuration conf,
    RpcScheduler scheduler)
    throws IOException {
  // Buffer pool; when not configured, its third argument defaults to twice the
  // region server handler count.
  this.reservoir = new BoundedByteBufferPool(
      conf.getInt("hbase.ipc.server.reservoir.max.buffer.size", 1024 * 1024),
      conf.getInt("hbase.ipc.server.reservoir.initial.buffer.size", 16 * 1024),

      conf.getInt("hbase.ipc.server.reservoir.initial.max",
          conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
              HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * 2));
  this.server = server;
  this.services = services;
  this.bindAddress = bindAddress;
  this.conf = conf;
  // 0 makes Connection's constructor skip setting the socket send buffer size.
  this.socketSendBufferSize = 0;
  this.maxQueueSize =
      this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
  this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
  // The server-side idle limit is twice the client-side setting.
  this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
  this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
  this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
  this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
  this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
  this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE, DEFAULT_WARN_RESPONSE_SIZE);


  // The listener is created here; the port is then taken from the listener's address.
  listener = new Listener(name);
  this.port = listener.getAddress().getPort();

  this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this));
  this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
  this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);

  this.warnDelayedCalls = conf.getInt(WARN_DELAYED_CALLS, DEFAULT_WARN_DELAYED_CALLS);
  this.delayedCalls = new AtomicInteger(0);
  this.ipcUtil = new IPCUtil(conf);



  responder = new Responder();
  this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
  this.userProvider = UserProvider.instantiate(conf);
  this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
  // SASL support is only initialised when HBase security is enabled.
  if (isSecurityEnabled) {
    HBaseSaslRpcServer.init(conf);
  }
  this.scheduler = scheduler;
  this.scheduler.init(new RpcSchedulerContext(this));
}
2033
2034
2035
2036
2037
2038 protected Connection getConnection(SocketChannel channel, long time) {
2039 return new Connection(channel, time);
2040 }
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050 private void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
2051 throws IOException {
2052 if (response != null) response.reset();
2053 call.setResponse(null, null, t, error);
2054 }
2055
2056 protected void closeConnection(Connection connection) {
2057 synchronized (connectionList) {
2058 if (connectionList.remove(connection)) {
2059 numConnections--;
2060 }
2061 }
2062 connection.close();
2063 }
2064
2065 Configuration getConf() {
2066 return conf;
2067 }
2068
2069
2070
2071
2072 @Override
2073 public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
2074
2075 @Override
2076 public boolean isStarted() {
2077 return this.started;
2078 }
2079
2080
2081 @Override
2082 public synchronized void start() {
2083 if (started) return;
2084 authTokenSecretMgr = createSecretManager();
2085 if (authTokenSecretMgr != null) {
2086 setSecretManager(authTokenSecretMgr);
2087 authTokenSecretMgr.start();
2088 }
2089 this.authManager = new ServiceAuthorizationManager();
2090 HBasePolicyProvider.init(conf, authManager);
2091 responder.start();
2092 listener.start();
2093 scheduler.start();
2094 started = true;
2095 }
2096
2097 @Override
2098 public void onConfigurationChange(Configuration newConf) {
2099
2100 if (scheduler instanceof ConfigurationObserver) {
2101 ((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
2102 }
2103 }
2104
2105 @Override
2106 public void refreshAuthManager(PolicyProvider pp) {
2107
2108
2109 this.authManager.refresh(this.conf, pp);
2110 }
2111
2112 private AuthenticationTokenSecretManager createSecretManager() {
2113 if (!isSecurityEnabled) return null;
2114 if (server == null) return null;
2115 Configuration conf = server.getConfiguration();
2116 long keyUpdateInterval =
2117 conf.getLong("hbase.auth.key.update.interval", 24*60*60*1000);
2118 long maxAge =
2119 conf.getLong("hbase.auth.token.max.lifetime", 7*24*60*60*1000);
2120 return new AuthenticationTokenSecretManager(conf, server.getZooKeeper(),
2121 server.getServerName().toString(), keyUpdateInterval, maxAge);
2122 }
2123
2124 public SecretManager<? extends TokenIdentifier> getSecretManager() {
2125 return this.secretManager;
2126 }
2127
2128 @SuppressWarnings("unchecked")
2129 public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
2130 this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
2131 }
2132
2133
2134
2135
2136
2137
2138 @Override
2139 public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
2140 Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
2141 throws IOException {
2142 try {
2143 status.setRPC(md.getName(), new Object[]{param}, receiveTime);
2144
2145 status.setRPCPacket(param);
2146 status.resume("Servicing call");
2147
2148 long startTime = System.currentTimeMillis();
2149 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
2150 Message result = service.callBlockingMethod(md, controller, param);
2151 long endTime = System.currentTimeMillis();
2152 int processingTime = (int) (endTime - startTime);
2153 int qTime = (int) (startTime - receiveTime);
2154 int totalTime = (int) (endTime - receiveTime);
2155 if (LOG.isTraceEnabled()) {
2156 LOG.trace(CurCall.get().toString() +
2157 ", response " + TextFormat.shortDebugString(result) +
2158 " queueTime: " + qTime +
2159 " processingTime: " + processingTime +
2160 " totalTime: " + totalTime);
2161 }
2162 long requestSize = param.getSerializedSize();
2163 long responseSize = result.getSerializedSize();
2164 metrics.dequeuedCall(qTime);
2165 metrics.processedCall(processingTime);
2166 metrics.totalCall(totalTime);
2167 metrics.receivedRequest(requestSize);
2168 metrics.sentResponse(responseSize);
2169
2170
2171 boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
2172 boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
2173 if (tooSlow || tooLarge) {
2174
2175
2176 logResponse(new Object[]{param},
2177 md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
2178 (tooLarge ? "TooLarge" : "TooSlow"),
2179 status.getClient(), startTime, processingTime, qTime,
2180 responseSize);
2181 }
2182 return new Pair<Message, CellScanner>(result, controller.cellScanner());
2183 } catch (Throwable e) {
2184
2185
2186
2187 if (e instanceof ServiceException) e = e.getCause();
2188
2189
2190 metrics.exception(e);
2191
2192 if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
2193 if (e instanceof IOException) throw (IOException)e;
2194 LOG.error("Unexpected throwable object ", e);
2195 throw new IOException(e.getMessage(), e);
2196 }
2197 }
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213 void logResponse(Object[] params, String methodName, String call, String tag,
2214 String clientAddress, long startTime, int processingTime, int qTime,
2215 long responseSize)
2216 throws IOException {
2217
2218 Map<String, Object> responseInfo = new HashMap<String, Object>();
2219 responseInfo.put("starttimems", startTime);
2220 responseInfo.put("processingtimems", processingTime);
2221 responseInfo.put("queuetimems", qTime);
2222 responseInfo.put("responsesize", responseSize);
2223 responseInfo.put("client", clientAddress);
2224 responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
2225 responseInfo.put("method", methodName);
2226 if (params.length == 2 && server instanceof HRegionServer &&
2227 params[0] instanceof byte[] &&
2228 params[1] instanceof Operation) {
2229
2230
2231 TableName tableName = TableName.valueOf(
2232 HRegionInfo.parseRegionName((byte[]) params[0])[0]);
2233 responseInfo.put("table", tableName.getNameAsString());
2234
2235 responseInfo.putAll(((Operation) params[1]).toMap());
2236
2237 LOG.warn("(operation" + tag + "): " +
2238 MAPPER.writeValueAsString(responseInfo));
2239 } else if (params.length == 1 && server instanceof HRegionServer &&
2240 params[0] instanceof Operation) {
2241
2242 responseInfo.putAll(((Operation) params[0]).toMap());
2243
2244 LOG.warn("(operation" + tag + "): " +
2245 MAPPER.writeValueAsString(responseInfo));
2246 } else {
2247
2248
2249 responseInfo.put("call", call);
2250 LOG.warn("(response" + tag + "): " + MAPPER.writeValueAsString(responseInfo));
2251 }
2252 }
2253
2254
2255 @Override
2256 public synchronized void stop() {
2257 LOG.info("Stopping server on " + port);
2258 running = false;
2259 if (authTokenSecretMgr != null) {
2260 authTokenSecretMgr.stop();
2261 authTokenSecretMgr = null;
2262 }
2263 listener.interrupt();
2264 listener.doStop();
2265 responder.interrupt();
2266 scheduler.stop();
2267 notifyAll();
2268 }
2269
2270
2271
2272
2273
2274
2275 @Override
2276 public synchronized void join() throws InterruptedException {
2277 while (running) {
2278 wait();
2279 }
2280 }
2281
2282
2283
2284
2285
2286
2287
2288 @Override
2289 public synchronized InetSocketAddress getListenerAddress() {
2290 if (listener == null) {
2291 return null;
2292 }
2293 return listener.getAddress();
2294 }
2295
2296
2297
2298
2299
2300 @Override
2301 public void setErrorHandler(HBaseRPCErrorHandler handler) {
2302 this.errorHandler = handler;
2303 }
2304
2305 @Override
2306 public HBaseRPCErrorHandler getErrorHandler() {
2307 return this.errorHandler;
2308 }
2309
2310
2311
2312
2313 @Override
2314 public MetricsHBaseServer getMetrics() {
2315 return metrics;
2316 }
2317
2318 @Override
2319 public void addCallSize(final long diff) {
2320 this.callQueueSize.add(diff);
2321 }
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332 public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
2333 throws AuthorizationException {
2334 if (authorize) {
2335 Class<?> c = getServiceInterface(services, connection.getServiceName());
2336 this.authManager.authorize(user != null ? user : null, c, getConf(), addr);
2337 }
2338 }
2339
2340
2341
2342
2343
2344
/**
 * Size limit, in bytes (64 KB), used by the channel helpers below: {@code channelRead}
 * switches to chunked reads once a buffer has more than this much room, {@code channelIO}
 * caps each individual read or write at this size, and {@code channelWrite} passes it to
 * {@code BufferChain.write}.
 */
private static int NIO_BUFFER_LIMIT = 64 * 1024;
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361 protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
2362 throws IOException {
2363 long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
2364 if (count > 0) this.metrics.sentBytes(count);
2365 return count;
2366 }
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380 protected int channelRead(ReadableByteChannel channel,
2381 ByteBuffer buffer) throws IOException {
2382
2383 int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
2384 channel.read(buffer) : channelIO(channel, null, buffer);
2385 if (count > 0) {
2386 metrics.receivedBytes(count);
2387 }
2388 return count;
2389 }
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404 private static int channelIO(ReadableByteChannel readCh,
2405 WritableByteChannel writeCh,
2406 ByteBuffer buf) throws IOException {
2407
2408 int originalLimit = buf.limit();
2409 int initialRemaining = buf.remaining();
2410 int ret = 0;
2411
2412 while (buf.remaining() > 0) {
2413 try {
2414 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
2415 buf.limit(buf.position() + ioSize);
2416
2417 ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
2418
2419 if (ret < ioSize) {
2420 break;
2421 }
2422
2423 } finally {
2424 buf.limit(originalLimit);
2425 }
2426 }
2427
2428 int nBytes = initialRemaining - buf.remaining();
2429 return (nBytes > 0) ? nBytes : ret;
2430 }
2431
2432
2433
2434
2435
2436
2437
2438 public static RpcCallContext getCurrentCall() {
2439 return CurCall.get();
2440 }
2441
2442 public static boolean isInRpcCallContext() {
2443 return CurCall.get() != null;
2444 }
2445
2446
2447
2448
2449
2450
2451 public static User getRequestUser() {
2452 RpcCallContext ctx = getCurrentCall();
2453 return ctx == null? null: ctx.getRequestUser();
2454 }
2455
2456
2457
2458
2459
2460 public static String getRequestUserName() {
2461 User user = getRequestUser();
2462 return user == null? null: user.getShortName();
2463 }
2464
2465
2466
2467
2468 public static InetAddress getRemoteAddress() {
2469 RpcCallContext ctx = getCurrentCall();
2470 return ctx == null? null: ctx.getRemoteAddress();
2471 }
2472
2473
2474
2475
2476
2477
2478 static BlockingServiceAndInterface getServiceAndInterface(
2479 final List<BlockingServiceAndInterface> services, final String serviceName) {
2480 for (BlockingServiceAndInterface bs : services) {
2481 if (bs.getBlockingService().getDescriptorForType().getName().equals(serviceName)) {
2482 return bs;
2483 }
2484 }
2485 return null;
2486 }
2487
2488
2489
2490
2491
2492
2493 static Class<?> getServiceInterface(
2494 final List<BlockingServiceAndInterface> services,
2495 final String serviceName) {
2496 BlockingServiceAndInterface bsasi =
2497 getServiceAndInterface(services, serviceName);
2498 return bsasi == null? null: bsasi.getServiceInterface();
2499 }
2500
2501
2502
2503
2504
2505
2506 static BlockingService getService(
2507 final List<BlockingServiceAndInterface> services,
2508 final String serviceName) {
2509 BlockingServiceAndInterface bsasi =
2510 getServiceAndInterface(services, serviceName);
2511 return bsasi == null? null: bsasi.getBlockingService();
2512 }
2513
2514 static MonitoredRPCHandler getStatus() {
2515
2516 MonitoredRPCHandler status = RpcServer.MONITORED_RPC.get();
2517 if (status != null) {
2518 return status;
2519 }
2520 status = TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
2521 status.pause("Waiting for a call");
2522 RpcServer.MONITORED_RPC.set(status);
2523 return status;
2524 }
2525
2526
2527
2528
2529
2530 public static InetAddress getRemoteIp() {
2531 Call call = CurCall.get();
2532 if (call != null && call.connection.socket != null) {
2533 return call.connection.socket.getInetAddress();
2534 }
2535 return null;
2536 }
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549 public static void bind(ServerSocket socket, InetSocketAddress address,
2550 int backlog) throws IOException {
2551 try {
2552 socket.bind(address, backlog);
2553 } catch (BindException e) {
2554 BindException bindException =
2555 new BindException("Problem binding to " + address + " : " +
2556 e.getMessage());
2557 bindException.initCause(e);
2558 throw bindException;
2559 } catch (SocketException e) {
2560
2561
2562 if ("Unresolved address".equals(e.getMessage())) {
2563 throw new UnknownHostException("Invalid hostname for server: " +
2564 address.getHostName());
2565 }
2566 throw e;
2567 }
2568 }
2569
2570 @Override
2571 public RpcScheduler getScheduler() {
2572 return scheduler;
2573 }
2574 }