1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.mapreduce;
19
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 import static org.mockito.Matchers.any;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.when;
27
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30 import java.io.PrintStream;
31 import java.util.ArrayList;
32
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.CellUtil;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.testclassification.LargeTests;
42 import org.apache.hadoop.hbase.MiniHBaseCluster;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.client.Delete;
45 import org.apache.hadoop.hbase.client.Get;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Result;
48 import org.apache.hadoop.hbase.client.Table;
49 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
50 import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
51 import org.apache.hadoop.hbase.wal.WAL;
52 import org.apache.hadoop.hbase.wal.WALKey;
53 import org.apache.hadoop.hbase.util.FSUtils;
54 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
55 import org.apache.hadoop.hbase.util.Bytes;
56 import org.apache.hadoop.hbase.util.LauncherSecurityManager;
57 import org.apache.hadoop.mapreduce.Mapper;
58 import org.apache.hadoop.mapreduce.Mapper.Context;
59 import org.junit.AfterClass;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.junit.experimental.categories.Category;
63 import org.mockito.invocation.InvocationOnMock;
64 import org.mockito.stubbing.Answer;
65 import org.mortbay.log.Log;
66
67
68
69
70 @Category(LargeTests.class)
71 public class TestWALPlayer {
72 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
73 private static MiniHBaseCluster cluster;
74 private static Path rootDir;
75 private static Path walRootDir;
76 private static FileSystem fs;
77 private static FileSystem walFs;
78 private static Configuration conf;
79
80 @BeforeClass
81 public static void beforeClass() throws Exception {
82 cluster = TEST_UTIL.startMiniCluster();
83 conf= TEST_UTIL.getConfiguration();
84 rootDir = TEST_UTIL.createRootDir();
85 walRootDir = TEST_UTIL.createWALRootDir();
86 fs = FSUtils.getRootDirFileSystem(conf);
87 walFs = FSUtils.getWALFileSystem(conf);
88 TEST_UTIL.startMiniMapReduceCluster();
89 }
90
91 @AfterClass
92 public static void afterClass() throws Exception {
93 TEST_UTIL.shutdownMiniMapReduceCluster();
94 TEST_UTIL.shutdownMiniCluster();
95 try {
96 fs.delete(rootDir, true);
97 } catch (IOException ioe) {
98 Log.debug("Got " + ioe.getMessage() + " deleting " + rootDir);
99 }
100 try {
101 walFs.delete(walRootDir, true);
102 } catch (IOException ioe) {
103 Log.debug("Got " + ioe.getMessage() + " deleting " + walRootDir);
104 }
105 }
106
107
108
109
110
111 @Test
112 public void testWALPlayer() throws Exception {
113 final TableName TABLENAME1 = TableName.valueOf("testWALPlayer1");
114 final TableName TABLENAME2 = TableName.valueOf("testWALPlayer2");
115 final byte[] FAMILY = Bytes.toBytes("family");
116 final byte[] COLUMN1 = Bytes.toBytes("c1");
117 final byte[] COLUMN2 = Bytes.toBytes("c2");
118 final byte[] ROW = Bytes.toBytes("row");
119 Table t1 = TEST_UTIL.createTable(TABLENAME1, FAMILY);
120 Table t2 = TEST_UTIL.createTable(TABLENAME2, FAMILY);
121
122
123 Put p = new Put(ROW);
124 p.add(FAMILY, COLUMN1, COLUMN1);
125 p.add(FAMILY, COLUMN2, COLUMN2);
126 t1.put(p);
127
128 Delete d = new Delete(ROW);
129 d.deleteColumns(FAMILY, COLUMN1);
130 t1.delete(d);
131
132
133 WAL log = cluster.getRegionServer(0).getWAL(null);
134 log.rollWriter();
135 String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
136 .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
137
138 Configuration configuration= TEST_UTIL.getConfiguration();
139 WALPlayer player = new WALPlayer(configuration);
140 String optionName="_test_.name";
141 configuration.set(optionName, "1000");
142 player.setupTime(configuration, optionName);
143 assertEquals(1000,configuration.getLong(optionName,0));
144 assertEquals(0, player.run(new String[] {walInputDir, TABLENAME1.getNameAsString(),
145 TABLENAME2.getNameAsString() }));
146
147
148
149 Get g = new Get(ROW);
150 Result r = t2.get(g);
151 assertEquals(1, r.size());
152 assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
153 }
154
155
156
157
158 @Test
159 public void testWALKeyValueMapper() throws Exception {
160 testWALKeyValueMapper(WALPlayer.TABLES_KEY);
161 }
162
163 @Test
164 public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
165 testWALKeyValueMapper("hlog.input.tables");
166 }
167
168 private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
169 Configuration configuration = new Configuration();
170 configuration.set(tableConfigKey, "table");
171 WALKeyValueMapper mapper = new WALKeyValueMapper();
172 WALKey key = mock(WALKey.class);
173 when(key.getTablename()).thenReturn(TableName.valueOf("table"));
174 @SuppressWarnings("unchecked")
175 Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
176 mock(Context.class);
177 when(context.getConfiguration()).thenReturn(configuration);
178
179 WALEdit value = mock(WALEdit.class);
180 ArrayList<Cell> values = new ArrayList<Cell>();
181 KeyValue kv1 = mock(KeyValue.class);
182 when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
183 when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
184 values.add(kv1);
185 when(value.getCells()).thenReturn(values);
186 mapper.setup(context);
187
188 doAnswer(new Answer<Void>() {
189
190 @Override
191 public Void answer(InvocationOnMock invocation) throws Throwable {
192 ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
193 KeyValue key = (KeyValue) invocation.getArguments()[1];
194 assertEquals("row", Bytes.toString(writer.get()));
195 assertEquals("row", Bytes.toString(key.getRow()));
196 return null;
197 }
198 }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
199
200 mapper.map(key, value, context);
201
202 }
203
204
205
206
207 @Test
208 public void testMainMethod() throws Exception {
209
210 PrintStream oldPrintStream = System.err;
211 SecurityManager SECURITY_MANAGER = System.getSecurityManager();
212 LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
213 System.setSecurityManager(newSecurityManager);
214 ByteArrayOutputStream data = new ByteArrayOutputStream();
215 String[] args = {};
216 System.setErr(new PrintStream(data));
217 try {
218 System.setErr(new PrintStream(data));
219 try {
220 WALPlayer.main(args);
221 fail("should be SecurityException");
222 } catch (SecurityException e) {
223 assertEquals(-1, newSecurityManager.getExitCode());
224 assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
225 assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
226 " <tables> [<tableMappings>]"));
227 assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
228 }
229
230 } finally {
231 System.setErr(oldPrintStream);
232 System.setSecurityManager(SECURITY_MANAGER);
233 }
234
235 }
236
237 }