View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import static org.junit.Assert.assertFalse;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.net.SocketAddress;
26  import java.net.SocketTimeoutException;
27  import java.util.Random;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.HBaseConfiguration;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.MasterNotRunningException;
37  import org.apache.hadoop.hbase.testclassification.MediumTests;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
40  import org.apache.hadoop.hbase.ipc.RpcClient;
41  import org.apache.hadoop.hbase.ipc.RpcClientFactory;
42  import org.apache.hadoop.hbase.ipc.RpcClientImpl;
43  import org.apache.hadoop.hbase.security.User;
44  import org.junit.AfterClass;
45  import org.junit.BeforeClass;
46  import org.junit.Test;
47  import org.junit.experimental.categories.Category;
48  
49  import com.google.protobuf.BlockingRpcChannel;
50  import com.google.protobuf.Descriptors.MethodDescriptor;
51  import com.google.protobuf.Message;
52  import com.google.protobuf.RpcController;
53  import com.google.protobuf.ServiceException;
54  
55  @Category(MediumTests.class)
56  public class TestClientTimeouts {
57    final Log LOG = LogFactory.getLog(getClass());
58    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
59    protected static int SLAVES = 1;
60  
61   /**
62     * @throws java.lang.Exception
63     */
64    @BeforeClass
65    public static void setUpBeforeClass() throws Exception {
66      TEST_UTIL.startMiniCluster(SLAVES);
67      // Set the custom RPC client with random timeouts as the client
68      TEST_UTIL.getConfiguration().set(
69          RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
70          RandomTimeoutRpcClient.class.getName());
71    }
72  
73    /**
74     * @throws java.lang.Exception
75     */
76    @AfterClass
77    public static void tearDownAfterClass() throws Exception {
78      TEST_UTIL.shutdownMiniCluster();
79    }
80  
81    /**
82     * Test that a client that fails an RPC to the master retries properly and
83     * doesn't throw any unexpected exceptions.
84     * @throws Exception
85     */
86    @Test
87    public void testAdminTimeout() throws Exception {
88      Connection lastConnection = null;
89      boolean lastFailed = false;
90      int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
91      RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
92          .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
93  
94      try {
95        for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
96          lastFailed = false;
97          // Ensure the HBaseAdmin uses a new connection by changing Configuration.
98          Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
99          conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
100         HBaseAdmin admin = null;
101         try {
102           admin = new HBaseAdmin(conf);
103           Connection connection = admin.getConnection();
104           assertFalse(connection == lastConnection);
105           lastConnection = connection;
106           // run some admin commands
107           HBaseAdmin.checkHBaseAvailable(conf);
108           admin.setBalancerRunning(false, false);
109         } catch (MasterNotRunningException ex) {
110           // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
111           // a MasterNotRunningException.  It's a bug if we get other exceptions.
112           lastFailed = true;
113         } finally {
114           admin.close();
115           if (admin.getConnection().isClosed()) {
116             rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
117                 .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
118           }
119         }
120       }
121       // Ensure the RandomTimeoutRpcEngine is actually being used.
122       assertFalse(lastFailed);
123       assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations);
124     } finally {
125       rpcClient.close();
126     }
127   }
128 
129   /**
130    * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
131    */
132   public static class RandomTimeoutRpcClient extends RpcClientImpl{
133     public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
134       super(conf, clusterId, localAddr);
135     }
136 
137     // Return my own instance, one that does random timeouts
138     @Override
139     public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
140         User ticket, int rpcTimeout) {
141       return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
142     }
143   }
144 
145   /**
146    * Blocking rpc channel that goes via hbase rpc.
147    */
148   static class RandomTimeoutBlockingRpcChannel
149       extends AbstractRpcClient.BlockingRpcChannelImplementation {
150     private static final Random RANDOM = new Random(System.currentTimeMillis());
151     public static final double CHANCE_OF_TIMEOUT = 0.3;
152     private static AtomicInteger invokations = new AtomicInteger();
153 
154     RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn,
155         final User ticket, final int rpcTimeout) {
156       super(rpcClient, sn, ticket, rpcTimeout);
157     }
158 
159     @Override
160     public Message callBlockingMethod(MethodDescriptor md,
161         RpcController controller, Message param, Message returnType)
162         throws ServiceException {
163       invokations.getAndIncrement();
164       if (RANDOM.nextFloat() < CHANCE_OF_TIMEOUT) {
165         // throw a ServiceException, becuase that is the only exception type that
166         // {@link ProtobufRpcEngine} throws.  If this RpcEngine is used with a different
167         // "actual" type, this may not properly mimic the underlying RpcEngine.
168         throw new ServiceException(new SocketTimeoutException("fake timeout"));
169       }
170       return super.callBlockingMethod(md, controller, param, returnType);
171     }
172   }
173 }