View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertTrue;
22  import static org.junit.Assert.fail;
23  import static org.mockito.Matchers.any;
24  import static org.mockito.Mockito.doAnswer;
25  import static org.mockito.Mockito.mock;
26  import static org.mockito.Mockito.when;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.IOException;
30  import java.io.PrintStream;
31  import java.util.ArrayList;
32  
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.Cell;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseTestingUtility;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.testclassification.LargeTests;
42  import org.apache.hadoop.hbase.MiniHBaseCluster;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.client.Delete;
45  import org.apache.hadoop.hbase.client.Get;
46  import org.apache.hadoop.hbase.client.Put;
47  import org.apache.hadoop.hbase.client.Result;
48  import org.apache.hadoop.hbase.client.Table;
49  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
50  import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
51  import org.apache.hadoop.hbase.wal.WAL;
52  import org.apache.hadoop.hbase.wal.WALKey;
53  import org.apache.hadoop.hbase.util.FSUtils;
54  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
57  import org.apache.hadoop.mapreduce.Mapper;
58  import org.apache.hadoop.mapreduce.Mapper.Context;
59  import org.junit.AfterClass;
60  import org.junit.BeforeClass;
61  import org.junit.Test;
62  import org.junit.experimental.categories.Category;
63  import org.mockito.invocation.InvocationOnMock;
64  import org.mockito.stubbing.Answer;
65  import org.mortbay.log.Log;
66  
67  /**
68   * Basic test for the WALPlayer M/R tool
69   */
70  @Category(LargeTests.class)
71  public class TestWALPlayer {
72    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
73    private static MiniHBaseCluster cluster;
74    private static Path rootDir;
75    private static Path walRootDir;
76    private static FileSystem fs;
77    private static FileSystem walFs;
78    private static Configuration conf;
79  
80    @BeforeClass
81    public static void beforeClass() throws Exception {
82      cluster = TEST_UTIL.startMiniCluster();
83      conf= TEST_UTIL.getConfiguration();
84      rootDir = TEST_UTIL.createRootDir();
85      walRootDir = TEST_UTIL.createWALRootDir();
86      fs = FSUtils.getRootDirFileSystem(conf);
87      walFs = FSUtils.getWALFileSystem(conf);
88      TEST_UTIL.startMiniMapReduceCluster();
89    }
90  
91    @AfterClass
92    public static void afterClass() throws Exception {
93      TEST_UTIL.shutdownMiniMapReduceCluster();
94      TEST_UTIL.shutdownMiniCluster();
95      try {
96        fs.delete(rootDir, true);
97      } catch (IOException ioe) {
98        Log.debug("Got " + ioe.getMessage() + " deleting " + rootDir);
99      }
100     try {
101       walFs.delete(walRootDir, true);
102     } catch (IOException ioe) {
103       Log.debug("Got " + ioe.getMessage() + " deleting " + walRootDir);
104     }
105   }
106 
107   /**
108    * Simple end-to-end test
109    * @throws Exception
110    */
111   @Test
112   public void testWALPlayer() throws Exception {
113     final TableName TABLENAME1 = TableName.valueOf("testWALPlayer1");
114     final TableName TABLENAME2 = TableName.valueOf("testWALPlayer2");
115     final byte[] FAMILY = Bytes.toBytes("family");
116     final byte[] COLUMN1 = Bytes.toBytes("c1");
117     final byte[] COLUMN2 = Bytes.toBytes("c2");
118     final byte[] ROW = Bytes.toBytes("row");
119     Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
120     Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
121 
122     // put a row into the first table
123     Put p = new Put(ROW);
124     p.add(FAMILY, COLUMN1, COLUMN1);
125     p.add(FAMILY, COLUMN2, COLUMN2);
126     t1.put(p);
127     // delete one column
128     Delete d = new Delete(ROW);
129     d.deleteColumns(FAMILY, COLUMN1);
130     t1.delete(d);
131 
132     // replay the WAL, map table 1 to table 2
133     WAL log = cluster.getRegionServer(0).getWAL(null);
134     log.rollWriter();
135     String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
136         .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
137 
138     Configuration configuration= TEST_UTIL.getConfiguration();
139     WALPlayer player = new WALPlayer(configuration);
140     String optionName="_test_.name";
141     configuration.set(optionName, "1000");
142     player.setupTime(configuration, optionName);
143     assertEquals(1000,configuration.getLong(optionName,0));
144     assertEquals(0, player.run(new String[] {walInputDir, TABLENAME1.getNameAsString(),
145         TABLENAME2.getNameAsString() }));
146 
147     
148     // verify the WAL was player into table 2
149     Get g = new Get(ROW);
150     Result r = t2.get(g);
151     assertEquals(1, r.size());
152     assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
153   }
154 
155   /**
156    * Test WALKeyValueMapper setup and map
157    */
158   @Test
159   public void testWALKeyValueMapper() throws Exception {
160     testWALKeyValueMapper(WALPlayer.TABLES_KEY);
161   }
162 
163   @Test
164   public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
165     testWALKeyValueMapper("hlog.input.tables");
166   }
167 
168   private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
169     Configuration configuration = new Configuration();
170     configuration.set(tableConfigKey, "table");
171     WALKeyValueMapper mapper = new WALKeyValueMapper();
172     WALKey key = mock(WALKey.class);
173     when(key.getTablename()).thenReturn(TableName.valueOf("table"));
174     @SuppressWarnings("unchecked")
175     Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
176         mock(Context.class);
177     when(context.getConfiguration()).thenReturn(configuration);
178 
179     WALEdit value = mock(WALEdit.class);
180     ArrayList<Cell> values = new ArrayList<Cell>();
181     KeyValue kv1 = mock(KeyValue.class);
182     when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
183     when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
184     values.add(kv1);
185     when(value.getCells()).thenReturn(values);
186     mapper.setup(context);
187 
188     doAnswer(new Answer<Void>() {
189 
190       @Override
191       public Void answer(InvocationOnMock invocation) throws Throwable {
192         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
193         KeyValue key = (KeyValue) invocation.getArguments()[1];
194         assertEquals("row", Bytes.toString(writer.get()));
195         assertEquals("row", Bytes.toString(key.getRow()));
196         return null;
197       }
198     }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
199 
200     mapper.map(key, value, context);
201 
202   }
203 
204   /**
205    * Test main method
206    */
207   @Test
208   public void testMainMethod() throws Exception {
209 
210     PrintStream oldPrintStream = System.err;
211     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
212     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
213     System.setSecurityManager(newSecurityManager);
214     ByteArrayOutputStream data = new ByteArrayOutputStream();
215     String[] args = {};
216     System.setErr(new PrintStream(data));
217     try {
218       System.setErr(new PrintStream(data));
219       try {
220         WALPlayer.main(args);
221         fail("should be SecurityException");
222       } catch (SecurityException e) {
223         assertEquals(-1, newSecurityManager.getExitCode());
224         assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
225         assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
226             " <tables> [<tableMappings>]"));
227         assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
228       }
229 
230     } finally {
231       System.setErr(oldPrintStream);
232       System.setSecurityManager(SECURITY_MANAGER);
233     }
234 
235   }
236 
237 }