1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertTrue;
24
25 import java.net.SocketAddress;
26 import java.net.SocketTimeoutException;
27 import java.util.Random;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.HBaseConfiguration;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.MasterNotRunningException;
37 import org.apache.hadoop.hbase.testclassification.MediumTests;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
40 import org.apache.hadoop.hbase.ipc.RpcClient;
41 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
42 import org.apache.hadoop.hbase.ipc.RpcClientImpl;
43 import org.apache.hadoop.hbase.security.User;
44 import org.junit.AfterClass;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.experimental.categories.Category;
48
49 import com.google.protobuf.BlockingRpcChannel;
50 import com.google.protobuf.Descriptors.MethodDescriptor;
51 import com.google.protobuf.Message;
52 import com.google.protobuf.RpcController;
53 import com.google.protobuf.ServiceException;
54
55 @Category(MediumTests.class)
56 public class TestClientTimeouts {
57 final Log LOG = LogFactory.getLog(getClass());
58 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
59 protected static int SLAVES = 1;
60
61
62
63
64 @BeforeClass
65 public static void setUpBeforeClass() throws Exception {
66 TEST_UTIL.startMiniCluster(SLAVES);
67
68 TEST_UTIL.getConfiguration().set(
69 RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
70 RandomTimeoutRpcClient.class.getName());
71 }
72
73
74
75
76 @AfterClass
77 public static void tearDownAfterClass() throws Exception {
78 TEST_UTIL.shutdownMiniCluster();
79 }
80
81
82
83
84
85
86 @Test
87 public void testAdminTimeout() throws Exception {
88 Connection lastConnection = null;
89 boolean lastFailed = false;
90 int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
91 RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
92 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
93
94 try {
95 for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
96 lastFailed = false;
97
98 Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
99 conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
100 HBaseAdmin admin = null;
101 try {
102 admin = new HBaseAdmin(conf);
103 Connection connection = admin.getConnection();
104 assertFalse(connection == lastConnection);
105 lastConnection = connection;
106
107 HBaseAdmin.checkHBaseAvailable(conf);
108 admin.setBalancerRunning(false, false);
109 } catch (MasterNotRunningException ex) {
110
111
112 lastFailed = true;
113 } finally {
114 admin.close();
115 if (admin.getConnection().isClosed()) {
116 rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
117 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
118 }
119 }
120 }
121
122 assertFalse(lastFailed);
123 assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations);
124 } finally {
125 rpcClient.close();
126 }
127 }
128
129
130
131
132 public static class RandomTimeoutRpcClient extends RpcClientImpl{
133 public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
134 super(conf, clusterId, localAddr);
135 }
136
137
138 @Override
139 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
140 User ticket, int rpcTimeout) {
141 return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
142 }
143 }
144
145
146
147
148 static class RandomTimeoutBlockingRpcChannel
149 extends AbstractRpcClient.BlockingRpcChannelImplementation {
150 private static final Random RANDOM = new Random(System.currentTimeMillis());
151 public static final double CHANCE_OF_TIMEOUT = 0.3;
152 private static AtomicInteger invokations = new AtomicInteger();
153
154 RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn,
155 final User ticket, final int rpcTimeout) {
156 super(rpcClient, sn, ticket, rpcTimeout);
157 }
158
159 @Override
160 public Message callBlockingMethod(MethodDescriptor md,
161 RpcController controller, Message param, Message returnType)
162 throws ServiceException {
163 invokations.getAndIncrement();
164 if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) {
165
166
167
168 throw new ServiceException(new SocketTimeoutException("fake timeout"));
169 }
170 return super.callBlockingMethod(md, controller, param, returnType);
171 }
172 }
173 }