1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertTrue;
23 import static org.junit.Assert.fail;
24
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.PrintStream;
28 import java.util.ArrayList;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.HBaseTestingUtility;
34 import org.apache.hadoop.hbase.testclassification.MediumTests;
35 import org.apache.hadoop.hbase.TableName;
36 import org.apache.hadoop.hbase.client.HTable;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
42 import org.apache.hadoop.mapreduce.Counter;
43 import org.apache.hadoop.mapreduce.Job;
44 import org.apache.hadoop.util.GenericOptionsParser;
45 import org.junit.AfterClass;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.junit.experimental.categories.Category;
49
50
51
52
53 @Category(MediumTests.class)
54 public class TestRowCounter {
55 final Log LOG = LogFactory.getLog(getClass());
56
57 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
58
59 private final static String TABLE_NAME = "testRowCounter";
60
61 private final static String COL_FAM = "col_fam";
62
63 private final static String COL1 = "c1";
64
65 private final static String COL2 = "c2";
66
67 private final static String COMPOSITE_COLUMN = "C:A:A";
68
69 private final static int TOTAL_ROWS = 10;
70
71 private final static int ROWS_WITH_ONE_COL = 2;
72
73
74
75
76 @BeforeClass
77 public static void setUpBeforeClass()
78 throws Exception {
79 TEST_UTIL.startMiniCluster();
80 TEST_UTIL.startMiniMapReduceCluster();
81 Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
82 writeRows(table);
83 table.close();
84 }
85
86
87
88
89 @AfterClass
90 public static void tearDownAfterClass()
91 throws Exception {
92 TEST_UTIL.shutdownMiniCluster();
93 TEST_UTIL.shutdownMiniMapReduceCluster();
94 }
95
96
97
98
99
100
101 @Test
102 public void testRowCounterNoColumn()
103 throws Exception {
104 String[] args = new String[] {TABLE_NAME};
105 runRowCount(args, 10);
106 }
107
108
109
110
111
112
113
114 @Test
115 public void testRowCounterExclusiveColumn()
116 throws Exception {
117 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1};
118 runRowCount(args, 8);
119 }
120
121
122
123
124
125
126
127 @Test
128 public void testRowCounterColumnWithColonInQualifier()
129 throws Exception {
130 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN};
131 runRowCount(args, 8);
132 }
133
134
135
136
137
138
139
140 @Test
141 public void testRowCounterHiddenColumn()
142 throws Exception {
143 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL2};
144 runRowCount(args, 10);
145 }
146
147
148
149
150
151
152 @Test
153 public void testRowCounterTimeRange()
154 throws Exception {
155 final byte[] family = Bytes.toBytes(COL_FAM);
156 final byte[] col1 = Bytes.toBytes(COL1);
157 Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
158 Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
159 Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
160
161 long ts;
162
163
164 HTable table = TEST_UTIL.deleteTableData(TableName.valueOf(TABLE_NAME));
165 ts = System.currentTimeMillis();
166 put1.add(family, col1, ts, Bytes.toBytes("val1"));
167 table.put(put1);
168 Thread.sleep(100);
169
170 ts = System.currentTimeMillis();
171 put2.add(family, col1, ts, Bytes.toBytes("val2"));
172 put3.add(family, col1, ts, Bytes.toBytes("val3"));
173 table.put(put2);
174 table.put(put3);
175 table.close();
176
177 String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
178 "--endtime=" + ts};
179 runRowCount(args, 1);
180
181 args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
182 "--endtime=" + (ts - 10)};
183 runRowCount(args, 1);
184
185 args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + ts,
186 "--endtime=" + (ts + 1000)};
187 runRowCount(args, 2);
188
189 args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000),
190 "--endtime=" + (ts + 30 * 1000),};
191 runRowCount(args, 3);
192 }
193
194
195
196
197
198
199
200
201 private void runRowCount(String[] args, int expectedCount)
202 throws Exception {
203 GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args);
204 Configuration conf = opts.getConfiguration();
205 args = opts.getRemainingArgs();
206 Job job = RowCounter.createSubmittableJob(conf, args);
207 job.waitForCompletion(true);
208 assertTrue(job.isSuccessful());
209 Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
210 assertEquals(expectedCount, counter.getValue());
211 }
212
213
214
215
216
217
218
219
220 private static void writeRows(Table table)
221 throws IOException {
222 final byte[] family = Bytes.toBytes(COL_FAM);
223 final byte[] value = Bytes.toBytes("abcd");
224 final byte[] col1 = Bytes.toBytes(COL1);
225 final byte[] col2 = Bytes.toBytes(COL2);
226 final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
227 ArrayList<Put> rowsUpdate = new ArrayList<Put>();
228
229 int i = 0;
230 for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
231 byte[] row = Bytes.toBytes("row" + i);
232 Put put = new Put(row);
233 put.add(family, col1, value);
234 put.add(family, col2, value);
235 put.add(family, col3, value);
236 rowsUpdate.add(put);
237 }
238
239
240 for (; i < TOTAL_ROWS; i++) {
241 byte[] row = Bytes.toBytes("row" + i);
242 Put put = new Put(row);
243 put.add(family, col2, value);
244 rowsUpdate.add(put);
245 }
246 table.put(rowsUpdate);
247 }
248
249
250
251
252 @Test
253 public void testImportMain()
254 throws Exception {
255 PrintStream oldPrintStream = System.err;
256 SecurityManager SECURITY_MANAGER = System.getSecurityManager();
257 LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
258 System.setSecurityManager(newSecurityManager);
259 ByteArrayOutputStream data = new ByteArrayOutputStream();
260 String[] args = {};
261 System.setErr(new PrintStream(data));
262 try {
263 System.setErr(new PrintStream(data));
264
265 try {
266 RowCounter.main(args);
267 fail("should be SecurityException");
268 } catch (SecurityException e) {
269 assertEquals(-1, newSecurityManager.getExitCode());
270 assertTrue(data.toString().contains("Wrong number of parameters:"));
271 assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
272 "[--starttime=[start] --endtime=[end] " +
273 "[--range=[startKey],[endKey]] " +
274 "[<column1> <column2>...]"));
275 assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
276 assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
277 }
278 data.reset();
279 try {
280 args = new String[2];
281 args[0] = "table";
282 args[1] = "--range=1";
283 RowCounter.main(args);
284 fail("should be SecurityException");
285 } catch (SecurityException e) {
286 assertEquals(-1, newSecurityManager.getExitCode());
287 assertTrue(data.toString().contains("Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
288
289 " \"--range=,b\" or \"--range=a,\""));
290 assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
291 "[--starttime=[start] --endtime=[end] " +
292 "[--range=[startKey],[endKey]] " +
293 "[<column1> <column2>...]"));
294 }
295
296 } finally {
297 System.setErr(oldPrintStream);
298 System.setSecurityManager(SECURITY_MANAGER);
299 }
300
301 }
302 }