View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.ipc;
20  
21  import static org.mockito.Matchers.anyInt;
22  import static org.mockito.Mockito.doThrow;
23  import static org.mockito.Mockito.spy;
24  
25  import java.io.IOException;
26  import java.net.InetSocketAddress;
27  import java.net.Socket;
28  import java.util.ArrayList;
29  import java.util.List;
30  
31  import javax.net.SocketFactory;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.CellScannable;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.HRegionInfo;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.KeyValueUtil;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.Waiter;
45  import org.apache.hadoop.hbase.testclassification.SmallTests;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.RowMutations;
48  import org.apache.hadoop.hbase.codec.Codec;
49  import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
50  import org.apache.hadoop.hbase.protobuf.RequestConverter;
51  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
52  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
53  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
54  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
55  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
56  import org.apache.hadoop.hbase.security.User;
57  import org.apache.hadoop.hbase.testclassification.SmallTests;
58  import org.apache.hadoop.net.NetUtils;
59  import org.junit.experimental.categories.Category;
60  import org.mockito.Mockito;
61  import org.mockito.invocation.InvocationOnMock;
62  import org.mockito.stubbing.Answer;
63  
64  import com.google.protobuf.ByteString;
65  import com.google.protobuf.Descriptors.MethodDescriptor;
66  
67  @Category({ SmallTests.class })
68  public class TestIPC extends AbstractTestIPC {
69  
70    private static final Log LOG = LogFactory.getLog(TestIPC.class);
71  
72    @Override
73    protected RpcClientImpl createRpcClientNoCodec(Configuration conf) {
74      return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
75        @Override
76        Codec getCodec() {
77          return null;
78        }
79      };
80    }
81  
82    @Override
83    protected RpcClientImpl createRpcClient(Configuration conf) {
84      return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
85    }
86  
87    @Override
88    protected RpcClientImpl createRpcClientRTEDuringConnectionSetup(Configuration conf)
89        throws IOException {
90      SocketFactory spyFactory = spy(NetUtils.getDefaultSocketFactory(conf));
91      Mockito.doAnswer(new Answer<Socket>() {
92        @Override
93        public Socket answer(InvocationOnMock invocation) throws Throwable {
94          Socket s = spy((Socket) invocation.callRealMethod());
95          doThrow(new RuntimeException("Injected fault")).when(s).setSoTimeout(anyInt());
96          return s;
97        }
98      }).when(spyFactory).createSocket();
99  
100     return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
101   }
102 
103   public static void main(String[] args) throws IOException, SecurityException,
104       NoSuchMethodException, InterruptedException {
105     if (args.length != 2) {
106       System.out.println("Usage: TestIPC <CYCLES> <CELLS_PER_CYCLE>");
107       return;
108     }
109     // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO);
110     // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO);
111     int cycles = Integer.parseInt(args[0]);
112     int cellcount = Integer.parseInt(args[1]);
113     Configuration conf = HBaseConfiguration.create();
114     TestRpcServer rpcServer = new TestRpcServer();
115     MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
116     EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
117     RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
118     KeyValue kv = BIG_CELL;
119     Put p = new Put(CellUtil.cloneRow(kv));
120     for (int i = 0; i < cellcount; i++) {
121       p.add(kv);
122     }
123     RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
124     rm.add(p);
125     try {
126       rpcServer.start();
127       long startTime = System.currentTimeMillis();
128       User user = User.getCurrent();
129       InetSocketAddress address = rpcServer.getListenerAddress();
130       if (address == null) {
131         throw new IOException("Listener channel is closed");
132       }
133       for (int i = 0; i < cycles; i++) {
134         List<CellScannable> cells = new ArrayList<CellScannable>();
135         // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
136         ClientProtos.RegionAction.Builder builder =
137             RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells,
138               RegionAction.newBuilder(), ClientProtos.Action.newBuilder(),
139               MutationProto.newBuilder());
140         builder.setRegion(RegionSpecifier
141             .newBuilder()
142             .setType(RegionSpecifierType.REGION_NAME)
143             .setValue(
144               ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
145         if (i % 100000 == 0) {
146           LOG.info("" + i);
147           // Uncomment this for a thread dump every so often.
148           // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
149           // "Thread dump " + Thread.currentThread().getName());
150         }
151         PayloadCarryingRpcController pcrc =
152             new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
153         // Pair<Message, CellScanner> response =
154         client.call(pcrc, md, builder.build(), param, user, address);
155         /*
156          * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
157          * count);
158          */
159       }
160       LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in "
161           + (System.currentTimeMillis() - startTime) + "ms");
162     } finally {
163       client.close();
164       rpcServer.stop();
165     }
166   }
167 
168 }