1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.client;
19
20 import java.io.IOException;
21 import java.util.Random;
22 import java.util.concurrent.ExecutorService;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HConstants;
28 import org.apache.hadoop.hbase.RegionLocations;
29 import org.apache.hadoop.hbase.ServerName;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
32 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
33 import org.apache.hadoop.hbase.security.User;
34
35 import com.google.common.annotations.VisibleForTesting;
36
37
38
39
40 @InterfaceAudience.Private
41 public class ConnectionUtils {
42
43 private static final Random RANDOM = new Random();
44
45
46
47
48
49
50
51 public static long getPauseTime(final long pause, final int tries) {
52 int ntries = tries;
53 if (ntries >= HConstants.RETRY_BACKOFF.length) {
54 ntries = HConstants.RETRY_BACKOFF.length - 1;
55 }
56 if (ntries < 0) {
57 ntries = 0;
58 }
59
60 long normalPause = pause * HConstants.RETRY_BACKOFF[ntries];
61 long jitter = (long)(normalPause * RANDOM.nextFloat() * 0.01f);
62 return normalPause + jitter;
63 }
64
65
66
67
68
69
70
71 public static long addJitter(final long pause, final float jitter) {
72 float lag = pause * (RANDOM.nextFloat() - 0.5f) * jitter;
73 long newPause = pause + (long) lag;
74 if (newPause <= 0) {
75 return 1;
76 }
77 return newPause;
78 }
79
80
81
82
83
84
85 public static NonceGenerator injectNonceGeneratorForTesting(
86 ClusterConnection conn, NonceGenerator cnm) {
87 return ConnectionManager.injectNonceGeneratorForTesting(conn, cnm);
88 }
89
90
91
92
93
94
95
96
97 public static void setServerSideHConnectionRetriesConfig(
98 final Configuration c, final String sn, final Log log) {
99
100 int hcRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
101 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
102
103
104 int serversideMultiplier = c.getInt("hbase.client.serverside.retries.multiplier", 10);
105 int retries = hcRetries * serversideMultiplier;
106 c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
107 log.info(sn + " server-side HConnection retries=" + retries);
108 }
109
110
111
112
113
114
115
116
117
118
119 public static ClusterConnection createShortCircuitHConnection(final Connection conn,
120 final ServerName serverName, final AdminService.BlockingInterface admin,
121 final ClientService.BlockingInterface client) {
122 return new ConnectionAdapter(conn) {
123 @Override
124 public AdminService.BlockingInterface getAdmin(
125 ServerName sn, boolean getMaster) throws IOException {
126 return serverName.equals(sn) ? admin : super.getAdmin(sn, getMaster);
127 }
128
129 @Override
130 public ClientService.BlockingInterface getClient(
131 ServerName sn) throws IOException {
132 return serverName.equals(sn) ? client : super.getClient(sn);
133 }
134 };
135 }
136
137
138
139
140
141 @VisibleForTesting
142 public static void setupMasterlessConnection(Configuration conf) {
143 conf.set(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
144 MasterlessConnection.class.getName());
145 }
146
147
148
149
150
151 static class MasterlessConnection extends ConnectionManager.HConnectionImplementation {
152 MasterlessConnection(Configuration conf, boolean managed,
153 ExecutorService pool, User user) throws IOException {
154 super(conf, managed, pool, user);
155 }
156
157 @Override
158 public boolean isTableDisabled(TableName tableName) throws IOException {
159
160 return false;
161 }
162 }
163 }