View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  
25  import java.io.ByteArrayOutputStream;
26  import java.io.IOException;
27  import java.io.PrintStream;
28  import java.util.ArrayList;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.hbase.HBaseTestingUtility;
34  import org.apache.hadoop.hbase.testclassification.MediumTests;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.mapreduce.RowCounter.RowCounterMapper;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
42  import org.apache.hadoop.mapreduce.Counter;
43  import org.apache.hadoop.mapreduce.Job;
44  import org.apache.hadoop.util.GenericOptionsParser;
45  import org.junit.AfterClass;
46  import org.junit.BeforeClass;
47  import org.junit.Test;
48  import org.junit.experimental.categories.Category;
49  
50  /**
51   * Test the rowcounter map reduce job.
52   */
53  @Category(MediumTests.class)
54  public class TestRowCounter {
55    final Log LOG = LogFactory.getLog(getClass());
56  
57    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
58  
59    private final static String TABLE_NAME = "testRowCounter";
60  
61    private final static String COL_FAM = "col_fam";
62  
63    private final static String COL1 = "c1";
64  
65    private final static String COL2 = "c2";
66  
67    private final static String COMPOSITE_COLUMN = "C:A:A";
68  
69    private final static int TOTAL_ROWS = 10;
70  
71    private final static int ROWS_WITH_ONE_COL = 2;
72  
73    /**
74     * @throws java.lang.Exception
75     */
76    @BeforeClass
77    public static void setUpBeforeClass()
78        throws Exception {
79      TEST_UTIL.startMiniCluster();
80      TEST_UTIL.startMiniMapReduceCluster();
81      Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
82      writeRows(table);
83      table.close();
84    }
85  
86    /**
87     * @throws java.lang.Exception
88     */
89    @AfterClass
90    public static void tearDownAfterClass()
91        throws Exception {
92      TEST_UTIL.shutdownMiniCluster();
93      TEST_UTIL.shutdownMiniMapReduceCluster();
94    }
95  
96    /**
97     * Test a case when no column was specified in command line arguments.
98     *
99     * @throws Exception
100    */
101   @Test
102   public void testRowCounterNoColumn()
103       throws Exception {
104     String[] args = new String[] {TABLE_NAME};
105     runRowCount(args, 10);
106   }
107 
108   /**
109    * Test a case when the column specified in command line arguments is
110    * exclusive for few rows.
111    *
112    * @throws Exception
113    */
114   @Test
115   public void testRowCounterExclusiveColumn()
116       throws Exception {
117     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1};
118     runRowCount(args, 8);
119   }
120 
121   /**
122    * Test a case when the column specified in command line arguments is
123    * one for which the qualifier contains colons.
124    *
125    * @throws Exception
126    */
127   @Test
128   public void testRowCounterColumnWithColonInQualifier()
129       throws Exception {
130     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN};
131     runRowCount(args, 8);
132   }
133 
134   /**
135    * Test a case when the column specified in command line arguments is not part
136    * of first KV for a row.
137    *
138    * @throws Exception
139    */
140   @Test
141   public void testRowCounterHiddenColumn()
142       throws Exception {
143     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL2};
144     runRowCount(args, 10);
145   }
146 
147   /**
148    * Test a case when the timerange is specified with --starttime and --endtime options
149    *
150    * @throws Exception
151    */
152   @Test
153   public void testRowCounterTimeRange()
154       throws Exception {
155     final byte[] family = Bytes.toBytes(COL_FAM);
156     final byte[] col1 = Bytes.toBytes(COL1);
157     Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
158     Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
159     Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
160 
161     long ts;
162 
163     // clean up content of TABLE_NAME
164     HTable table = TEST_UTIL.deleteTableData(TableName.valueOf(TABLE_NAME));
165     ts = System.currentTimeMillis();
166     put1.add(family, col1, ts, Bytes.toBytes("val1"));
167     table.put(put1);
168     Thread.sleep(100);
169 
170     ts = System.currentTimeMillis();
171     put2.add(family, col1, ts, Bytes.toBytes("val2"));
172     put3.add(family, col1, ts, Bytes.toBytes("val3"));
173     table.put(put2);
174     table.put(put3);
175     table.close();
176 
177     String[] args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
178                                      "--endtime=" + ts};
179     runRowCount(args, 1);
180 
181     args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + 0,
182                             "--endtime=" + (ts - 10)};
183     runRowCount(args, 1);
184 
185     args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + ts,
186                             "--endtime=" + (ts + 1000)};
187     runRowCount(args, 2);
188 
189     args = new String[] {TABLE_NAME, COL_FAM + ":" + COL1, "--starttime=" + (ts - 30 * 1000),
190                             "--endtime=" + (ts + 30 * 1000),};
191     runRowCount(args, 3);
192   }
193 
194   /**
195    * Run the RowCounter map reduce job and verify the row count.
196    *
197    * @param args the command line arguments to be used for rowcounter job.
198    * @param expectedCount the expected row count (result of map reduce job).
199    * @throws Exception
200    */
201   private void runRowCount(String[] args, int expectedCount)
202       throws Exception {
203     GenericOptionsParser opts = new GenericOptionsParser(TEST_UTIL.getConfiguration(), args);
204     Configuration conf = opts.getConfiguration();
205     args = opts.getRemainingArgs();
206     Job job = RowCounter.createSubmittableJob(conf, args);
207     job.waitForCompletion(true);
208     assertTrue(job.isSuccessful());
209     Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS);
210     assertEquals(expectedCount, counter.getValue());
211   }
212 
213   /**
214    * Writes TOTAL_ROWS number of distinct rows in to the table. Few rows have
215    * two columns, Few have one.
216    *
217    * @param table
218    * @throws IOException
219    */
220   private static void writeRows(Table table)
221       throws IOException {
222     final byte[] family = Bytes.toBytes(COL_FAM);
223     final byte[] value = Bytes.toBytes("abcd");
224     final byte[] col1 = Bytes.toBytes(COL1);
225     final byte[] col2 = Bytes.toBytes(COL2);
226     final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
227     ArrayList<Put> rowsUpdate = new ArrayList<Put>();
228     // write few rows with two columns
229     int i = 0;
230     for (; i < TOTAL_ROWS - ROWS_WITH_ONE_COL; i++) {
231       byte[] row = Bytes.toBytes("row" + i);
232       Put put = new Put(row);
233       put.add(family, col1, value);
234       put.add(family, col2, value);
235       put.add(family, col3, value);
236       rowsUpdate.add(put);
237     }
238 
239     // write few rows with only one column
240     for (; i < TOTAL_ROWS; i++) {
241       byte[] row = Bytes.toBytes("row" + i);
242       Put put = new Put(row);
243       put.add(family, col2, value);
244       rowsUpdate.add(put);
245     }
246     table.put(rowsUpdate);
247   }
248 
249   /**
250    * test main method. Import should print help and call System.exit
251    */
252   @Test
253   public void testImportMain()
254       throws Exception {
255     PrintStream oldPrintStream = System.err;
256     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
257     LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
258     System.setSecurityManager(newSecurityManager);
259     ByteArrayOutputStream data = new ByteArrayOutputStream();
260     String[] args = {};
261     System.setErr(new PrintStream(data));
262     try {
263       System.setErr(new PrintStream(data));
264 
265       try {
266         RowCounter.main(args);
267         fail("should be SecurityException");
268       } catch (SecurityException e) {
269         assertEquals(-1, newSecurityManager.getExitCode());
270         assertTrue(data.toString().contains("Wrong number of parameters:"));
271         assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
272                                                 "[--starttime=[start] --endtime=[end] " +
273                                                 "[--range=[startKey],[endKey]] " +
274                                                 "[<column1> <column2>...]"));
275         assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
276         assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
277       }
278       data.reset();
279       try {
280         args = new String[2];
281         args[0] = "table";
282         args[1] = "--range=1";
283         RowCounter.main(args);
284         fail("should be SecurityException");
285       } catch (SecurityException e) {
286         assertEquals(-1, newSecurityManager.getExitCode());
287         assertTrue(data.toString().contains("Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
288 
289                                                 " \"--range=,b\" or \"--range=a,\""));
290         assertTrue(data.toString().contains("Usage: RowCounter [options] <tablename> " +
291                                                 "[--starttime=[start] --endtime=[end] " +
292                                                 "[--range=[startKey],[endKey]] " +
293                                                 "[<column1> <column2>...]"));
294       }
295 
296     } finally {
297       System.setErr(oldPrintStream);
298       System.setSecurityManager(SECURITY_MANAGER);
299     }
300 
301   }
302 }