1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import static org.junit.Assert.assertTrue;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.commons.logging.impl.Log4JLogger;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.testclassification.MediumTests;
31 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.CoordinatedStateManager;
34 import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
35 import org.apache.hadoop.hbase.ipc.RpcServer;
36 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
37 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
38 import org.apache.hadoop.hbase.regionserver.HRegionServer;
39 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.log4j.Level;
42 import org.junit.AfterClass;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
46
47 import com.google.protobuf.RpcController;
48 import com.google.protobuf.ServiceException;
49
50
51
52
53
54 @Category(MediumTests.class)
55 public class TestClientScannerRPCTimeout {
56 final Log LOG = LogFactory.getLog(getClass());
57 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
58 private static final byte[] FAMILY = Bytes.toBytes("testFamily");
59 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
60 private static final byte[] VALUE = Bytes.toBytes("testValue");
61 private static final int rpcTimeout = 2 * 1000;
62 private static final int CLIENT_RETRIES_NUMBER = 3;
63
64 @BeforeClass
65 public static void setUpBeforeClass() throws Exception {
66 ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
67 ((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
68 ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
69 Configuration conf = TEST_UTIL.getConfiguration();
70
71 conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
72 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
73 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
74 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
75 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
76 TEST_UTIL.startMiniCluster(1);
77 }
78
79 @AfterClass
80 public static void tearDownAfterClass() throws Exception {
81 TEST_UTIL.shutdownMiniCluster();
82 }
83
84 @Test
85 public void testScannerNextRPCTimesout() throws Exception {
86 final TableName TABLE_NAME = TableName.valueOf("testScannerNextRPCTimesout");
87 Table ht = TEST_UTIL.createTable(TABLE_NAME, FAMILY);
88 byte[] r1 = Bytes.toBytes("row-1");
89 byte[] r2 = Bytes.toBytes("row-2");
90 byte[] r3 = Bytes.toBytes("row-3");
91 putToTable(ht, r1);
92 putToTable(ht, r2);
93 putToTable(ht, r3);
94 LOG.info("Wrote our three values");
95 RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
96 Scan scan = new Scan();
97 scan.setCaching(1);
98 ResultScanner scanner = ht.getScanner(scan);
99 Result result = scanner.next();
100 assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
101 LOG.info("Got expected first row");
102 long t1 = System.currentTimeMillis();
103 result = scanner.next();
104 assertTrue((System.currentTimeMillis() - t1) > rpcTimeout);
105 assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
106 RSRpcServicesWithScanTimeout.seqNoToSleepOn = -1;
107 result = scanner.next();
108 assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
109 scanner.close();
110
111
112 scanner = ht.getScanner(scan);
113 RSRpcServicesWithScanTimeout.sleepAlways = true;
114 RSRpcServicesWithScanTimeout.tryNumber = 0;
115 try {
116 result = scanner.next();
117 } catch (IOException ioe) {
118
119 LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
120 }
121 assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
122 + ", actual =" + RSRpcServicesWithScanTimeout.tryNumber,
123 RSRpcServicesWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
124 }
125
126 private void putToTable(Table ht, byte[] rowkey) throws IOException {
127 Put put = new Put(rowkey);
128 put.add(FAMILY, QUALIFIER, VALUE);
129 ht.put(put);
130 }
131
132 private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
133 public RegionServerWithScanTimeout(Configuration conf, CoordinatedStateManager cp)
134 throws IOException, InterruptedException {
135 super(conf, cp);
136 }
137
138 protected RSRpcServices createRpcServices() throws IOException {
139 return new RSRpcServicesWithScanTimeout(this);
140 }
141 }
142
143 private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
144 private long tableScannerId;
145 private boolean slept;
146 private static long seqNoToSleepOn = -1;
147 private static boolean sleepAlways = false;
148 private static int tryNumber = 0;
149
150 public RSRpcServicesWithScanTimeout(HRegionServer rs)
151 throws IOException {
152 super(rs);
153 }
154
155 @Override
156 public ScanResponse scan(final RpcController controller, final ScanRequest request)
157 throws ServiceException {
158 if (request.hasScannerId()) {
159 ScanResponse scanResponse = super.scan(controller, request);
160 if (this.tableScannerId == request.getScannerId() &&
161 (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
162 try {
163 LOG.info("SLEEPING " + (rpcTimeout + 500));
164 Thread.sleep(rpcTimeout + 500);
165 } catch (InterruptedException e) {
166 }
167 slept = true;
168 tryNumber++;
169 if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
170 sleepAlways = false;
171 }
172 }
173 return scanResponse;
174 } else {
175 ScanResponse scanRes = super.scan(controller, request);
176 String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
177 if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
178 tableScannerId = scanRes.getScannerId();
179 }
180 return scanRes;
181 }
182 }
183 }
184 }