1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.List;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
28 import org.apache.hadoop.hbase.util.ConstantDelayQueue;
29 import org.apache.hadoop.hbase.util.LoadTestTool;
30 import org.apache.hadoop.hbase.util.MultiThreadedUpdater;
31 import org.apache.hadoop.hbase.util.MultiThreadedWriter;
32 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
33 import org.apache.hadoop.hbase.util.Threads;
34 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
35 import org.apache.hadoop.util.StringUtils;
36 import org.apache.hadoop.util.ToolRunner;
37 import org.junit.Assert;
38 import org.junit.Test;
39 import org.junit.experimental.categories.Category;
40
41 import com.google.common.collect.Lists;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 @Category(IntegrationTests.class)
71 public class IntegrationTestRegionReplicaReplication extends IntegrationTestIngest {
72
73 private static final String TEST_NAME
74 = IntegrationTestRegionReplicaReplication.class.getSimpleName();
75
76 private static final String OPT_READ_DELAY_MS = "read_delay_ms";
77
78 private static final int DEFAULT_REGION_REPLICATION = 2;
79 private static final int SERVER_COUNT = 1;
80 private static final String[] DEFAULT_COLUMN_FAMILIES = new String[] {"f1", "f2", "f3"};
81
82 @Override
83 protected int getMinServerCount() {
84 return SERVER_COUNT;
85 }
86
87 @Override
88 public void setConf(Configuration conf) {
89 conf.setIfUnset(
90 String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_REGION_REPLICATION),
91 String.valueOf(DEFAULT_REGION_REPLICATION));
92
93 conf.setIfUnset(
94 String.format("%s.%s", TEST_NAME, LoadTestTool.OPT_COLUMN_FAMILIES),
95 StringUtils.join(",", DEFAULT_COLUMN_FAMILIES));
96
97 conf.setBoolean("hbase.table.sanity.checks", true);
98
99
100 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
101 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
102
103 conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024L * 1024 * 4);
104 conf.setInt("hbase.hstore.blockingStoreFiles", 100);
105
106 super.setConf(conf);
107 }
108
109 @Override
110 @Test
111 public void testIngest() throws Exception {
112 runIngestTest(JUNIT_RUN_TIME, 25000, 10, 1024, 10, 20);
113 }
114
115
116
117
118
119
120 public static class DelayingMultiThreadedWriter extends MultiThreadedWriter {
121 private long delayMs;
122 public DelayingMultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
123 TableName tableName) throws IOException {
124 super(dataGen, conf, tableName);
125 }
126 @Override
127 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
128 this.delayMs = conf.getLong(String.format("%s.%s",
129 IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
130 return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
131 }
132 }
133
134
135
136
137
138
139 public static class DelayingMultiThreadedUpdater extends MultiThreadedUpdater {
140 private long delayMs;
141 public DelayingMultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
142 TableName tableName, double updatePercent) throws IOException {
143 super(dataGen, conf, tableName, updatePercent);
144 }
145 @Override
146 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
147 this.delayMs = conf.getLong(String.format("%s.%s",
148 IntegrationTestRegionReplicaReplication.class.getSimpleName(), OPT_READ_DELAY_MS), 5000);
149 return new ConstantDelayQueue<Long>(TimeUnit.MILLISECONDS, delayMs);
150 }
151 }
152
153 @Override
154 protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
155 int recordSize, int writeThreads, int readThreads) throws Exception {
156
157 LOG.info("Running ingest");
158 LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
159
160
161 Threads.sleep(
162 getConf().getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs",
163 5000) + 1000);
164
165 long start = System.currentTimeMillis();
166 String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
167 long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
168 long startKey = 0;
169
170 long numKeys = getNumKeys(keysPerServerPerIter);
171 while (System.currentTimeMillis() - start < 0.9 * runtime) {
172 LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
173 ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
174
175 int verifyPercent = 100;
176 int updatePercent = 20;
177 int ret = -1;
178 int regionReplicaId = conf.getInt(String.format("%s.%s"
179 , TEST_NAME, LoadTestTool.OPT_REGION_REPLICA_ID), 1);
180
181
182 List<String> args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
183 args.add("-write");
184 args.add(String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads));
185 args.add("-" + LoadTestTool.OPT_MULTIPUT);
186 args.add("-writer");
187 args.add(DelayingMultiThreadedWriter.class.getName());
188 args.add("-read");
189 args.add(String.format("%d:%d", verifyPercent, readThreads));
190 args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
191 args.add(String.valueOf(regionReplicaId));
192
193 ret = loadTool.run(args.toArray(new String[args.size()]));
194 if (0 != ret) {
195 String errorMsg = "Load failed with error code " + ret;
196 LOG.error(errorMsg);
197 Assert.fail(errorMsg);
198 }
199
200 args = Lists.newArrayList(getArgsForLoadTestTool("", "", startKey, numKeys));
201 args.add("-update");
202 args.add(String.format("%s:%s:1", updatePercent, writeThreads));
203 args.add("-updater");
204 args.add(DelayingMultiThreadedUpdater.class.getName());
205 args.add("-read");
206 args.add(String.format("%d:%d", verifyPercent, readThreads));
207 args.add("-" + LoadTestTool.OPT_REGION_REPLICA_ID);
208 args.add(String.valueOf(regionReplicaId));
209
210 ret = loadTool.run(args.toArray(new String[args.size()]));
211 if (0 != ret) {
212 String errorMsg = "Load failed with error code " + ret;
213 LOG.error(errorMsg);
214 Assert.fail(errorMsg);
215 }
216 startKey += numKeys;
217 }
218 }
219
220 public static void main(String[] args) throws Exception {
221 Configuration conf = HBaseConfiguration.create();
222 IntegrationTestingUtility.setUseDistributedCluster(conf);
223 int ret = ToolRunner.run(conf, new IntegrationTestRegionReplicaReplication(), args);
224 System.exit(ret);
225 }
226 }