1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Set;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.hbase.util.LoadTestTool;
32 import org.apache.hadoop.hbase.util.Threads;
33 import org.apache.hadoop.util.StringUtils;
34 import org.apache.hadoop.util.ToolRunner;
35 import org.junit.Assert;
36 import org.junit.Test;
37 import org.junit.experimental.categories.Category;
38
39 import com.google.common.collect.Sets;
40
41
42
43
44
45 @Category(IntegrationTests.class)
46 public class IntegrationTestIngest extends IntegrationTestBase {
47 public static final char HIPHEN = '-';
48 private static final int SERVER_COUNT = 1;
49 protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
50 protected static final long JUNIT_RUN_TIME = 10 * 60 * 1000;
51
52
53 protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
54
55 protected static final String NUM_KEYS_PER_SERVER_KEY = "num_keys_per_server";
56 protected static final long DEFAULT_NUM_KEYS_PER_SERVER = 2500;
57
58 protected static final String NUM_WRITE_THREADS_KEY = "num_write_threads";
59 protected static final int DEFAULT_NUM_WRITE_THREADS = 20;
60
61 protected static final String NUM_READ_THREADS_KEY = "num_read_threads";
62 protected static final int DEFAULT_NUM_READ_THREADS = 20;
63
64 protected static final Log LOG = LogFactory.getLog(IntegrationTestIngest.class);
65 protected IntegrationTestingUtility util;
66 protected HBaseCluster cluster;
67 protected LoadTestTool loadTool;
68
69 protected String[] LOAD_TEST_TOOL_INIT_ARGS = {
70 LoadTestTool.OPT_COLUMN_FAMILIES,
71 LoadTestTool.OPT_COMPRESSION,
72 LoadTestTool.OPT_DATA_BLOCK_ENCODING,
73 LoadTestTool.OPT_INMEMORY,
74 LoadTestTool.OPT_ENCRYPTION,
75 LoadTestTool.OPT_NUM_REGIONS_PER_SERVER,
76 LoadTestTool.OPT_REGION_REPLICATION,
77 };
78
79 @Override
80 public void setUpCluster() throws Exception {
81 util = getTestingUtil(getConf());
82 LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
83 util.initializeCluster(getMinServerCount());
84 LOG.debug("Done initializing/checking cluster");
85 cluster = util.getHBaseClusterInterface();
86 deleteTableIfNecessary();
87 loadTool = new LoadTestTool();
88 loadTool.setConf(util.getConfiguration());
89
90
91 initTable();
92 }
93
94 protected int getMinServerCount() {
95 return SERVER_COUNT;
96 }
97
98 protected void initTable() throws IOException {
99 int ret = loadTool.run(getArgsForLoadTestToolInitTable());
100 Assert.assertEquals("Failed to initialize LoadTestTool", 0, ret);
101 }
102
103 @Override
104 public int runTestFromCommandLine() throws Exception {
105 internalRunIngestTest(DEFAULT_RUN_TIME);
106 return 0;
107 }
108
109 @Test
110 public void testIngest() throws Exception {
111 runIngestTest(JUNIT_RUN_TIME, 2500, 10, 1024, 10, 20);
112 }
113
114 protected void internalRunIngestTest(long runTime) throws Exception {
115 String clazz = this.getClass().getSimpleName();
116 long numKeysPerServer = conf.getLong(String.format("%s.%s", clazz, NUM_KEYS_PER_SERVER_KEY),
117 DEFAULT_NUM_KEYS_PER_SERVER);
118 int numWriteThreads = conf.getInt(
119 String.format("%s.%s", clazz, NUM_WRITE_THREADS_KEY), DEFAULT_NUM_WRITE_THREADS);
120 int numReadThreads = conf.getInt(
121 String.format("%s.%s", clazz, NUM_READ_THREADS_KEY), DEFAULT_NUM_READ_THREADS);
122 runIngestTest(runTime, numKeysPerServer, 10, 1024, numWriteThreads, numReadThreads);
123 }
124
125 @Override
126 public TableName getTablename() {
127 String clazz = this.getClass().getSimpleName();
128 return TableName.valueOf(
129 conf.get(String.format("%s.%s", clazz, LoadTestTool.OPT_TABLE_NAME), clazz));
130 }
131
132 @Override
133 protected Set<String> getColumnFamilies() {
134 Set<String> families = Sets.newHashSet();
135 String clazz = this.getClass().getSimpleName();
136
137 String familiesString = getConf().get(
138 String.format("%s.%s", clazz, LoadTestTool.OPT_COLUMN_FAMILIES));
139 if (familiesString == null) {
140 for (byte[] family : LoadTestTool.DEFAULT_COLUMN_FAMILIES) {
141 families.add(Bytes.toString(family));
142 }
143 } else {
144 for (String family : familiesString.split(",")) {
145 families.add(family);
146 }
147 }
148
149 return families;
150 }
151
152 private void deleteTableIfNecessary() throws IOException {
153 if (util.getHBaseAdmin().tableExists(getTablename())) {
154 util.deleteTable(getTablename());
155 }
156 }
157
158 protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
159 int recordSize, int writeThreads, int readThreads) throws Exception {
160
161 LOG.info("Running ingest");
162 LOG.info("Cluster size:" + util.getHBaseClusterInterface().getClusterStatus().getServersSize());
163
164 long start = System.currentTimeMillis();
165 String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
166 long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
167 long startKey = 0;
168
169 long numKeys = getNumKeys(keysPerServerPerIter);
170 while (System.currentTimeMillis() - start < 0.9 * runtime) {
171 LOG.info("Intended run time: " + (runtime/60000) + " min, left:" +
172 ((runtime - (System.currentTimeMillis() - start))/60000) + " min");
173
174 int ret = -1;
175 ret = loadTool.run(getArgsForLoadTestTool("-write",
176 String.format("%d:%d:%d", colsPerKey, recordSize, writeThreads), startKey, numKeys));
177 if (0 != ret) {
178 String errorMsg = "Load failed with error code " + ret;
179 LOG.error(errorMsg);
180 Assert.fail(errorMsg);
181 }
182
183 ret = loadTool.run(getArgsForLoadTestTool("-update", String.format("60:%d:1", writeThreads),
184 startKey, numKeys));
185 if (0 != ret) {
186 String errorMsg = "Update failed with error code " + ret;
187 LOG.error(errorMsg);
188 Assert.fail(errorMsg);
189 }
190
191 ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
192 , startKey, numKeys));
193 if (0 != ret) {
194 String errorMsg = "Verification failed with error code " + ret;
195 LOG.error(errorMsg + " Rerunning verification after 1 minute for debugging");
196 Threads.sleep(1000 * 60);
197 ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
198 , startKey, numKeys));
199 if (0 != ret) {
200 LOG.error("Rerun of Verification failed with error code " + ret);
201 }
202 Assert.fail(errorMsg);
203 }
204 startKey += numKeys;
205 }
206 }
207
208 protected String[] getArgsForLoadTestToolInitTable() {
209 List<String> args = new ArrayList<String>();
210 args.add("-tn");
211 args.add(getTablename().getNameAsString());
212
213 String clazz = this.getClass().getSimpleName();
214 for (String arg : LOAD_TEST_TOOL_INIT_ARGS) {
215 String val = conf.get(String.format("%s.%s", clazz, arg));
216 if (val != null) {
217 args.add("-" + arg);
218 args.add(val);
219 }
220 }
221 args.add("-init_only");
222 return args.toArray(new String[args.size()]);
223 }
224
225 protected String[] getArgsForLoadTestTool(String mode, String modeSpecificArg, long startKey,
226 long numKeys) {
227 List<String> args = new ArrayList<String>();
228 args.add("-tn");
229 args.add(getTablename().getNameAsString());
230 args.add("-families");
231 args.add(getColumnFamiliesAsString());
232 args.add(mode);
233 args.add(modeSpecificArg);
234 args.add("-start_key");
235 args.add(String.valueOf(startKey));
236 args.add("-num_keys");
237 args.add(String.valueOf(numKeys));
238 args.add("-skip_init");
239
240 return args.toArray(new String[args.size()]);
241 }
242
243 private String getColumnFamiliesAsString() {
244 return StringUtils.join(",", getColumnFamilies());
245 }
246
247
248 protected long getNumKeys(long keysPerServer)
249 throws IOException {
250 int numRegionServers = cluster.getClusterStatus().getServersSize();
251 return keysPerServer * numRegionServers;
252 }
253
254 public static void main(String[] args) throws Exception {
255 Configuration conf = HBaseConfiguration.create();
256 IntegrationTestingUtility.setUseDistributedCluster(conf);
257 int ret = ToolRunner.run(conf, new IntegrationTestIngest(), args);
258 System.exit(ret);
259 }
260 }