1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import com.google.common.base.Objects;
22 import com.yammer.metrics.core.Histogram;
23 import org.apache.commons.cli.CommandLine;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
28 import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
29 import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
30 import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
31 import org.apache.hadoop.hbase.chaos.policies.Policy;
32 import org.apache.hadoop.hbase.client.Admin;
33 import org.apache.hadoop.hbase.ipc.RpcClient;
34 import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
35 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
36 import org.apache.hadoop.hbase.util.YammerHistogramUtils;
37 import org.apache.hadoop.mapreduce.Counters;
38 import org.apache.hadoop.mapreduce.Job;
39 import org.apache.hadoop.util.ToolRunner;
40 import org.junit.experimental.categories.Category;
41
42 import java.util.*;
43 import java.util.concurrent.Callable;
44
45 import static java.lang.String.format;
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertNotNull;
48 import static org.junit.Assert.assertTrue;
49
50
51
52
53
54
55
56
57 @Category(IntegrationTests.class)
58 public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {
59
60 private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);
61
62 private static final String SLEEP_TIME_KEY = "sleeptime";
63
64 private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
65 private static final String TABLE_NAME_KEY = "tableName";
66 private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
67 private static final String REPLICA_COUNT_KEY = "replicas";
68 private static final String REPLICA_COUNT_DEFAULT = "" + 3;
69 private static final String PRIMARY_TIMEOUT_KEY = "timeout";
70 private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000;
71 private static final String NUM_RS_KEY = "numRs";
72 private static final String NUM_RS_DEFAULT = "" + 3;
73
74
75 private enum Stat {
76 STDEV {
77 @Override
78 double apply(Histogram hist) {
79 return hist.stdDev();
80 }
81 },
82 FOUR_9S {
83 @Override
84 double apply(Histogram hist) {
85 return hist.getSnapshot().getValue(0.9999);
86 }
87 };
88
89 abstract double apply(Histogram hist);
90 }
91
92 private TableName tableName;
93 private long sleepTime;
94 private int replicaCount;
95 private int primaryTimeout;
96 private int clusterSize;
97
98
99
100
101 static class PerfEvalCallable implements Callable<TimingResult> {
102 private final Queue<String> argv = new LinkedList<String>();
103 private final Admin admin;
104
105 public PerfEvalCallable(Admin admin, String argv) {
106
107 this.admin = admin;
108 this.argv.addAll(Arrays.asList(argv.split(" ")));
109 LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
110 }
111
112 @Override
113 public TimingResult call() throws Exception {
114 PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
115 PerformanceEvaluation.checkTable(admin, opts);
116 PerformanceEvaluation.RunResult results[] = null;
117 long numRows = opts.totalRows;
118 long elapsedTime = 0;
119 if (opts.nomapred) {
120 results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
121 for (PerformanceEvaluation.RunResult r : results) {
122 elapsedTime = Math.max(elapsedTime, r.duration);
123 }
124 } else {
125 Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
126 Counters counters = job.getCounters();
127 numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
128 elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
129 }
130 return new TimingResult(numRows, elapsedTime, results);
131 }
132 }
133
134
135
136
137 static class TimingResult {
138 public final long numRows;
139 public final long elapsedTime;
140 public final PerformanceEvaluation.RunResult results[];
141
142 public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) {
143 this.numRows = numRows;
144 this.elapsedTime = elapsedTime;
145 this.results = results;
146 }
147
148 @Override
149 public String toString() {
150 return Objects.toStringHelper(this)
151 .add("numRows", numRows)
152 .add("elapsedTime", elapsedTime)
153 .toString();
154 }
155 }
156
157 @Override
158 public void setUp() throws Exception {
159 super.setUp();
160 Configuration conf = util.getConfiguration();
161
162
163
164 assertEquals("Master must be configured with StochasticLoadBalancer",
165 "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
166 conf.get("hbase.master.loadbalancer.class"));
167
168 assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
169 conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);
170
171
172 conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
173
174 conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
175 conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
176 }
177
178 @Override
179 public void setUpCluster() throws Exception {
180 util = getTestingUtil(getConf());
181 util.initializeCluster(clusterSize);
182 }
183
184 @Override
185 public void setUpMonkey() throws Exception {
186 Policy p = new PeriodicRandomActionPolicy(sleepTime,
187 new RestartRandomRsExceptMetaAction(sleepTime),
188 new MoveRandomRegionOfTableAction(tableName));
189 this.monkey = new PolicyBasedChaosMonkey(util, p);
190
191 }
192
193 @Override
194 protected void addOptions() {
195 addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
196 + TABLE_NAME_DEFAULT + "'");
197 addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
198 + SLEEP_TIME_DEFAULT);
199 addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
200 + REPLICA_COUNT_DEFAULT);
201 addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
202 + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
203 addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
204 + NUM_RS_DEFAULT);
205 }
206
207 @Override
208 protected void processOptions(CommandLine cmd) {
209 tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
210 sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
211 replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
212 primaryTimeout =
213 Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
214 clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
215 LOG.debug(Objects.toStringHelper("Parsed Options")
216 .add(TABLE_NAME_KEY, tableName)
217 .add(SLEEP_TIME_KEY, sleepTime)
218 .add(REPLICA_COUNT_KEY, replicaCount)
219 .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
220 .add(NUM_RS_KEY, clusterSize)
221 .toString());
222 }
223
224 @Override
225 public int runTestFromCommandLine() throws Exception {
226 test();
227 return 0;
228 }
229
230 @Override
231 public TableName getTablename() {
232 return tableName;
233 }
234
235 @Override
236 protected Set<String> getColumnFamilies() {
237 return null;
238 }
239
240
241 private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
242 double sum = 0;
243 int count = 0;
244
245 for (TimingResult tr : results) {
246 for (PerformanceEvaluation.RunResult r : tr.results) {
247 assertNotNull("One of the run results is missing detailed run data.", r.hist);
248 sum += stat.apply(r.hist);
249 count += 1;
250 LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
251 }
252 }
253 return sum / count;
254 }
255
256 public void test() throws Exception {
257 int maxIters = 3;
258 String replicas = "--replicas=" + replicaCount;
259
260 String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
261 String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4",
262 splitPolicy, tableName);
263 String readOpts =
264 format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
265 String replicaReadOpts = format("%s %s", replicas, readOpts);
266
267 ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<TimingResult>(maxIters);
268 ArrayList<TimingResult> resultsWithReplicas = new ArrayList<TimingResult>(maxIters);
269
270
271 LOG.debug("Populating table.");
272 new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();
273
274
275 assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
276 DisabledRegionSplitPolicy.class.getName(),
277 util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
278 startMonkey();
279
280
281 for (int i = 0; i < maxIters; i++) {
282 LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
283 resultsWithoutReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
284
285 Thread.sleep(5000l);
286 }
287
288
289 cleanUpMonkey("Altering table.");
290 LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
291 IntegrationTestingUtility.setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
292 setUpMonkey();
293 startMonkey();
294
295
296 for (int i = 0; i < maxIters; i++) {
297 LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
298 resultsWithReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
299
300 Thread.sleep(5000l);
301 }
302
303
304
305 double withoutReplicasStdevMean =
306 calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
307 double withoutReplicas9999Mean =
308 calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
309 double withReplicasStdevMean =
310 calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
311 double withReplicas9999Mean =
312 calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);
313
314 LOG.info(Objects.toStringHelper(this)
315 .add("withoutReplicas", resultsWithoutReplicas)
316 .add("withReplicas", resultsWithReplicas)
317 .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
318 .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
319 .add("withReplicasStdevMean", withReplicasStdevMean)
320 .add("withReplicas99.99Mean", withReplicas9999Mean)
321 .toString());
322
323 assertTrue(
324 "Running with region replicas under chaos should have less request variance than without. "
325 + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
326 + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
327 withReplicasStdevMean <= withoutReplicasStdevMean);
328 assertTrue(
329 "Running with region replicas under chaos should improve 99.99pct latency. "
330 + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
331 + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
332 withReplicas9999Mean <= withoutReplicas9999Mean);
333 }
334
335 public static void main(String[] args) throws Exception {
336 Configuration conf = HBaseConfiguration.create();
337 IntegrationTestingUtility.setUseDistributedCluster(conf);
338 int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
339 System.exit(status);
340 }
341 }