/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import com.google.common.base.Objects;
import com.yammer.metrics.core.Histogram;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Test for comparing the performance impact of region replicas. Uses
 * components of {@link PerformanceEvaluation}. Does not run from
 * {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
 * with the JUnit runner. Hence no @Test annotations either. See {@code -help}
 * for the full list of options.
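 *
 * <p>A sketch of one possible invocation against a distributed cluster. The option names come
 * from {@code addOptions()} below; the exact command-line syntax is an assumption and may vary
 * between HBase versions:
 * <pre>
 * hbase org.apache.hadoop.hbase.IntegrationTestRegionReplicaPerf \
 *   -tableName IntegrationTestRegionReplicaPerf -replicas 3 -numRs 3 -sleeptime 10000
 * </pre>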
 */
@Category(IntegrationTests.class)
public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);

  private static final String SLEEP_TIME_KEY = "sleeptime";
  // short default interval because tests don't run very long.
  private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000L);
  private static final String TABLE_NAME_KEY = "tableName";
  private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
  private static final String REPLICA_COUNT_KEY = "replicas";
  private static final String REPLICA_COUNT_DEFAULT = "" + 3;
  private static final String PRIMARY_TIMEOUT_KEY = "timeout";
  private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms, in microseconds
  private static final String NUM_RS_KEY = "numRs";
  private static final String NUM_RS_DEFAULT = "" + 3;

  /** Extract a descriptive statistic from a {@link com.yammer.metrics.core.Histogram}. */
  private enum Stat {
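    /** Standard deviation of the recorded latencies. */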
    STDEV {
      @Override
      double apply(Histogram hist) {
        return hist.stdDev();
      }
    },
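    /** 99.99th percentile ("four nines") latency, read from the histogram snapshot. */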
    FOUR_9S {
      @Override
      double apply(Histogram hist) {
        return hist.getSnapshot().getValue(0.9999);
      }
    };

    abstract double apply(Histogram hist);
  }

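  // Test parameters, populated from the command line in processOptions().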
  private TableName tableName;
  private long sleepTime;
  private int replicaCount;
  private int primaryTimeout;
  private int clusterSize;

  /**
   * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
   */
  static class PerfEvalCallable implements Callable<TimingResult> {
    private final Queue<String> argv = new LinkedList<String>();
    private final Admin admin;

    public PerfEvalCallable(Admin admin, String argv) {
      // TODO: this API is awkward, should take HConnection, not HBaseAdmin
      this.admin = admin;
      this.argv.addAll(Arrays.asList(argv.split(" ")));
      LOG.debug("Created PerfEvalCallable with args: " + argv);
    }

    @Override
    public TimingResult call() throws Exception {
      PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
      PerformanceEvaluation.checkTable(admin, opts);
      PerformanceEvaluation.RunResult[] results = null;
      long numRows = opts.totalRows;
      long elapsedTime = 0;
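      // --nomapred runs the PerformanceEvaluation clients in this JVM; otherwise a MapReduce
      // job is submitted and the row and elapsed-time counters are read back from the job.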
      if (opts.nomapred) {
        results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
        for (PerformanceEvaluation.RunResult r : results) {
          elapsedTime = Math.max(elapsedTime, r.duration);
        }
      } else {
        Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
        Counters counters = job.getCounters();
        numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
        elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
      }
      return new TimingResult(numRows, elapsedTime, results);
    }
  }

  /**
   * Record the results from a single {@link PerformanceEvaluation} job run.
   */
  static class TimingResult {
    public final long numRows;
    public final long elapsedTime;
    public final PerformanceEvaluation.RunResult[] results;

    public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult[] results) {
      this.numRows = numRows;
      this.elapsedTime = elapsedTime;
      this.results = results;
    }

    @Override
    public String toString() {
      return Objects.toStringHelper(this)
        .add("numRows", numRows)
        .add("elapsedTime", elapsedTime)
        .toString();
    }
  }

  @Override
  public void setUp() throws Exception {
    super.setUp();
    Configuration conf = util.getConfiguration();

    // sanity check cluster
    // TODO: this should reach out to master and verify online state instead
    assertEquals("Master must be configured with StochasticLoadBalancer",
      "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
      conf.get("hbase.master.loadbalancer.class"));
    // TODO: this should reach out to master and verify online state instead
    assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
      conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);

    // enable client-side settings
    conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
    // TODO: expose these settings to CLI override
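    // How long a read waits on the primary replica before also going to the secondaries;
    // the primaryCallTimeout settings are expressed in microseconds.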
    conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
    conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(clusterSize);
  }

  @Override
  public void setUpMonkey() throws Exception {
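    // Every sleepTime ms the policy runs one randomly chosen action: restart a random
    // RegionServer (excluding the one hosting meta) or move a random region of the test table.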
    Policy p = new PeriodicRandomActionPolicy(sleepTime,
      new RestartRandomRsExceptMetaAction(sleepTime),
      new MoveRandomRegionOfTableAction(tableName));
    this.monkey = new PolicyBasedChaosMonkey(util, p);
    // don't start monkey right away
  }

  @Override
  protected void addOptions() {
    addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
      + TABLE_NAME_DEFAULT + "'");
    addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
      + SLEEP_TIME_DEFAULT);
    addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
      + REPLICA_COUNT_DEFAULT);
    addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
      + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
    addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
      + NUM_RS_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
    sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
    replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
    primaryTimeout =
      Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
    clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
    LOG.debug(Objects.toStringHelper("Parsed Options")
      .add(TABLE_NAME_KEY, tableName)
      .add(SLEEP_TIME_KEY, sleepTime)
      .add(REPLICA_COUNT_KEY, replicaCount)
      .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
      .add(NUM_RS_KEY, clusterSize)
      .toString());
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    test();
    return 0;
  }

  @Override
  public TableName getTablename() {
    return tableName;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return null;
  }

  /** Compute the mean of the given {@code stat} across the given timing results. */
  private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
    double sum = 0;
    int count = 0;

    for (TimingResult tr : results) {
      for (PerformanceEvaluation.RunResult r : tr.results) {
        assertNotNull("One of the run results is missing detailed run data.", r.hist);
        sum += stat.apply(r.hist);
        count += 1;
        LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
      }
    }
    return sum / count;
  }

  public void test() throws Exception {
    int maxIters = 3;
    String replicas = "--replicas=" + replicaCount;
    // TODO: splits disabled until "phase 2" is complete.
    String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
    String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4",
      splitPolicy, tableName);
    String readOpts =
      format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
    String replicaReadOpts = format("%s %s", replicas, readOpts);

    ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<TimingResult>(maxIters);
    ArrayList<TimingResult> resultsWithReplicas = new ArrayList<TimingResult>(maxIters);

    // create/populate the table, replicas disabled
    LOG.debug("Populating table.");
    new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();

    // one last sanity check, then send in the clowns!
    assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
        DisabledRegionSplitPolicy.class.getName(),
        util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
    startMonkey();

    // collect a baseline without region replicas.
    for (int i = 0; i < maxIters; i++) {
      LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
      resultsWithoutReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
      Thread.sleep(5000L);
    }

    // disable monkey, enable region replicas, enable monkey
    cleanUpMonkey("Altering table.");
    LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
    IntegrationTestingUtility.setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
    setUpMonkey();
    startMonkey();

    // run test with region replicas.
    for (int i = 0; i < maxIters; i++) {
      LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
      resultsWithReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
      Thread.sleep(5000L);
    }

    // Compare the mean stdev and mean 99.99th-percentile latency across runs to determine
    // whether region replicas reduce the response-time variance and tail latency seen by clients.
    double withoutReplicasStdevMean =
        calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
    double withoutReplicas9999Mean =
        calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
    double withReplicasStdevMean =
        calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
    double withReplicas9999Mean =
        calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);

    LOG.info(Objects.toStringHelper(this)
      .add("withoutReplicas", resultsWithoutReplicas)
      .add("withReplicas", resultsWithReplicas)
      .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
      .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
      .add("withReplicasStdevMean", withReplicasStdevMean)
      .add("withReplicas99.99Mean", withReplicas9999Mean)
      .toString());

    assertTrue(
      "Running with region replicas under chaos should have less request variance than without. "
      + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
      + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
      withReplicasStdevMean <= withoutReplicasStdevMean);
    assertTrue(
        "Running with region replicas under chaos should improve 99.99pct latency. "
            + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
            + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
        withReplicas9999Mean <= withoutReplicas9999Mean);
  }

  public static void main(String[] args) throws Exception {
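    // Entry point for running directly against a distributed cluster via ToolRunner.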
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
    System.exit(status);
  }
}