View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertFalse;
22  import static org.junit.Assert.assertTrue;
23  import static org.junit.Assert.fail;
24  import static org.mockito.Matchers.any;
25  import static org.mockito.Mockito.doAnswer;
26  import static org.mockito.Mockito.mock;
27  import static org.mockito.Mockito.when;
28  
29  import java.io.ByteArrayOutputStream;
30  import java.io.IOException;
31  import java.io.PrintStream;
32  import java.net.URL;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.List;
36  
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.KeyValue;
46  import org.apache.hadoop.hbase.testclassification.MediumTests;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.client.Delete;
49  import org.apache.hadoop.hbase.client.Durability;
50  import org.apache.hadoop.hbase.client.Get;
51  import org.apache.hadoop.hbase.client.HTable;
52  import org.apache.hadoop.hbase.client.Put;
53  import org.apache.hadoop.hbase.client.Result;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.client.Table;
57  import org.apache.hadoop.hbase.filter.Filter;
58  import org.apache.hadoop.hbase.filter.FilterBase;
59  import org.apache.hadoop.hbase.filter.PrefixFilter;
60  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
61  import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
62  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
63  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
64  import org.apache.hadoop.hbase.wal.WAL;
65  import org.apache.hadoop.hbase.wal.WALKey;
66  import org.apache.hadoop.hbase.util.Bytes;
67  import org.apache.hadoop.hbase.util.LauncherSecurityManager;
68  import org.apache.hadoop.mapreduce.Job;
69  import org.apache.hadoop.mapreduce.Mapper.Context;
70  import org.apache.hadoop.util.GenericOptionsParser;
71  import org.junit.After;
72  import org.junit.AfterClass;
73  import org.junit.Assert;
74  import org.junit.Before;
75  import org.junit.BeforeClass;
76  import org.junit.Test;
77  import org.junit.experimental.categories.Category;
78  import org.mockito.invocation.InvocationOnMock;
79  import org.mockito.stubbing.Answer;
80  
81  /**
82   * Tests the table import and table export MR job functionality
83   */
84  @Category(MediumTests.class)
85  public class TestImportExport {
86    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
87    private static final byte[] ROW1 = Bytes.toBytes("row1");
88    private static final byte[] ROW2 = Bytes.toBytes("row2");
89    private static final String FAMILYA_STRING = "a";
90    private static final String FAMILYB_STRING = "b";
91    private static final byte[] FAMILYA = Bytes.toBytes(FAMILYA_STRING);
92    private static final byte[] FAMILYB = Bytes.toBytes(FAMILYB_STRING);
93    private static final byte[] QUAL = Bytes.toBytes("q");
94    private static final String OUTPUT_DIR = "outputdir";
95    private static String FQ_OUTPUT_DIR;
96    private static final String EXPORT_BATCH_SIZE = "100";
97  
98    private static long now = System.currentTimeMillis();
99  
100   @BeforeClass
101   public static void beforeClass() throws Exception {
102     UTIL.startMiniCluster();
103     UTIL.startMiniMapReduceCluster();
104     FQ_OUTPUT_DIR =  new Path(OUTPUT_DIR).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
105   }
106 
107   @AfterClass
108   public static void afterClass() throws Exception {
109     UTIL.shutdownMiniMapReduceCluster();
110     UTIL.shutdownMiniCluster();
111   }
112 
113   @Before
114   @After
115   public void cleanup() throws Exception {
116     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
117     fs.delete(new Path(OUTPUT_DIR), true);
118   }
119 
120   /**
121    * Runs an export job with the specified command line args
122    * @param args
123    * @return true if job completed successfully
124    * @throws IOException
125    * @throws InterruptedException
126    * @throws ClassNotFoundException
127    */
128   boolean runExport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
129     // need to make a copy of the configuration because to make sure different temp dirs are used.
130     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
131     Configuration conf = opts.getConfiguration();
132     args = opts.getRemainingArgs();
133     Job job = Export.createSubmittableJob(conf, args);
134     job.waitForCompletion(false);
135     return job.isSuccessful();
136   }
137 
138   /**
139    * Runs an import job with the specified command line args
140    * @param args
141    * @return true if job completed successfully
142    * @throws IOException
143    * @throws InterruptedException
144    * @throws ClassNotFoundException
145    */
146   boolean runImport(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
147     // need to make a copy of the configuration because to make sure different temp dirs are used.
148     GenericOptionsParser opts = new GenericOptionsParser(new Configuration(UTIL.getConfiguration()), args);
149     Configuration conf = opts.getConfiguration();
150     args = opts.getRemainingArgs();
151     Job job = Import.createSubmittableJob(conf, args);
152     job.waitForCompletion(false);
153     return job.isSuccessful();
154   }
155 
156   /**
157    * Test simple replication case with column mapping
158    * @throws Exception
159    */
160   @Test
161   public void testSimpleCase() throws Exception {
162     String EXPORT_TABLE = "exportSimpleCase";
163     Table t = UTIL.createTable(TableName.valueOf(EXPORT_TABLE), FAMILYA, 3);
164     Put p = new Put(ROW1);
165     p.add(FAMILYA, QUAL, now, QUAL);
166     p.add(FAMILYA, QUAL, now+1, QUAL);
167     p.add(FAMILYA, QUAL, now+2, QUAL);
168     t.put(p);
169     p = new Put(ROW2);
170     p.add(FAMILYA, QUAL, now, QUAL);
171     p.add(FAMILYA, QUAL, now+1, QUAL);
172     p.add(FAMILYA, QUAL, now+2, QUAL);
173     t.put(p);
174 
175     String[] args = new String[] {
176         EXPORT_TABLE,
177         FQ_OUTPUT_DIR,
178         "1000", // max number of key versions per key to export
179     };
180     assertTrue(runExport(args));
181 
182     String IMPORT_TABLE = "importTableSimpleCase";
183     t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), FAMILYB, 3);
184     args = new String[] {
185         "-D" + Import.CF_RENAME_PROP + "="+FAMILYA_STRING+":"+FAMILYB_STRING,
186         IMPORT_TABLE,
187         FQ_OUTPUT_DIR
188     };
189     assertTrue(runImport(args));
190 
191     Get g = new Get(ROW1);
192     g.setMaxVersions();
193     Result r = t.get(g);
194     assertEquals(3, r.size());
195     g = new Get(ROW2);
196     g.setMaxVersions();
197     r = t.get(g);
198     assertEquals(3, r.size());
199   }
200 
201   /**
202    * Test export hbase:meta table
203    *
204    * @throws Exception
205    */
206   @Test
207   public void testMetaExport() throws Exception {
208     String EXPORT_TABLE = TableName.META_TABLE_NAME.getNameAsString();
209     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1", "0", "0" };
210     assertTrue(runExport(args));
211   }
212 
213   /**
214    * Test import data from 0.94 exported file
215    * @throws Exception
216    */
217   @Test
218   public void testImport94Table() throws Exception {
219     URL url = TestImportExport.class.getResource(
220         "exportedTableIn94Format");
221     Path importPath = new Path(url.getPath());
222     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
223     fs.copyFromLocalFile(importPath, new Path(FQ_OUTPUT_DIR + Path.SEPARATOR
224         + "exportedTableIn94Format"));
225     String IMPORT_TABLE = "importTableExportedFrom94";
226     Table t = UTIL.createTable(TableName.valueOf(IMPORT_TABLE), Bytes.toBytes("f1"), 3);
227     String[] args = new String[] {
228         "-Dhbase.import.version=0.94" ,
229         IMPORT_TABLE, FQ_OUTPUT_DIR
230     };
231     assertTrue(runImport(args));
232 
233     /* exportedTableIn94Format contains 5 rows
234      ROW         COLUMN+CELL
235      r1          column=f1:c1, timestamp=1383766761171, value=val1
236      r2          column=f1:c1, timestamp=1383766771642, value=val2
237      r3          column=f1:c1, timestamp=1383766777615, value=val3
238      r4          column=f1:c1, timestamp=1383766785146, value=val4
239      r5          column=f1:c1, timestamp=1383766791506, value=val5
240      */
241     assertEquals(5, UTIL.countRows(t));
242     t.close();
243   }
244 
245   /**
246    * Test export scanner batching
247    */
248    @Test
249    public void testExportScannerBatching() throws Exception {
250     String BATCH_TABLE = "exportWithBatch";
251     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(BATCH_TABLE));
252     desc.addFamily(new HColumnDescriptor(FAMILYA)
253         .setMaxVersions(1)
254     );
255     UTIL.getHBaseAdmin().createTable(desc);
256     Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());
257 
258     Put p = new Put(ROW1);
259     p.add(FAMILYA, QUAL, now, QUAL);
260     p.add(FAMILYA, QUAL, now+1, QUAL);
261     p.add(FAMILYA, QUAL, now+2, QUAL);
262     p.add(FAMILYA, QUAL, now+3, QUAL);
263     p.add(FAMILYA, QUAL, now+4, QUAL);
264     t.put(p);
265 
266     String[] args = new String[] {
267         "-D" + Export.EXPORT_BATCHING + "=" + EXPORT_BATCH_SIZE,  // added scanner batching arg.
268         BATCH_TABLE,
269         FQ_OUTPUT_DIR
270     };
271     assertTrue(runExport(args));
272 
273     FileSystem fs = FileSystem.get(UTIL.getConfiguration());
274     fs.delete(new Path(FQ_OUTPUT_DIR), true);
275     t.close();
276   }
277 
278   @Test
279   public void testWithDeletes() throws Exception {
280     String EXPORT_TABLE = "exportWithDeletes";
281     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
282     desc.addFamily(new HColumnDescriptor(FAMILYA)
283         .setMaxVersions(5)
284         .setKeepDeletedCells(true)
285     );
286     UTIL.getHBaseAdmin().createTable(desc);
287     Table t = new HTable(UTIL.getConfiguration(), desc.getTableName());
288 
289     Put p = new Put(ROW1);
290     p.add(FAMILYA, QUAL, now, QUAL);
291     p.add(FAMILYA, QUAL, now+1, QUAL);
292     p.add(FAMILYA, QUAL, now+2, QUAL);
293     p.add(FAMILYA, QUAL, now+3, QUAL);
294     p.add(FAMILYA, QUAL, now+4, QUAL);
295     t.put(p);
296 
297     Delete d = new Delete(ROW1, now+3);
298     t.delete(d);
299     d = new Delete(ROW1);
300     d.deleteColumns(FAMILYA, QUAL, now+2);
301     t.delete(d);
302 
303     String[] args = new String[] {
304         "-D" + Export.RAW_SCAN + "=true",
305         EXPORT_TABLE,
306         FQ_OUTPUT_DIR,
307         "1000", // max number of key versions per key to export
308     };
309     assertTrue(runExport(args));
310 
311     String IMPORT_TABLE = "importWithDeletes";
312     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
313     desc.addFamily(new HColumnDescriptor(FAMILYA)
314         .setMaxVersions(5)
315         .setKeepDeletedCells(true)
316     );
317     UTIL.getHBaseAdmin().createTable(desc);
318     t.close();
319     t = new HTable(UTIL.getConfiguration(), desc.getTableName());
320     args = new String[] {
321         IMPORT_TABLE,
322         FQ_OUTPUT_DIR
323     };
324     assertTrue(runImport(args));
325 
326     Scan s = new Scan();
327     s.setMaxVersions();
328     s.setRaw(true);
329     ResultScanner scanner = t.getScanner(s);
330     Result r = scanner.next();
331     Cell[] res = r.rawCells();
332     assertTrue(CellUtil.isDeleteFamily(res[0]));
333     assertEquals(now+4, res[1].getTimestamp());
334     assertEquals(now+3, res[2].getTimestamp());
335     assertTrue(CellUtil.isDelete(res[3]));
336     assertEquals(now+2, res[4].getTimestamp());
337     assertEquals(now+1, res[5].getTimestamp());
338     assertEquals(now, res[6].getTimestamp());
339     t.close();
340   }
341   
342   
343   @Test
344   public void testWithMultipleDeleteFamilyMarkersOfSameRowSameFamily() throws Exception {
345     String EXPORT_TABLE = "exportWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
346     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
347     desc.addFamily(new HColumnDescriptor(FAMILYA)
348         .setMaxVersions(5)
349         .setKeepDeletedCells(true)
350     );
351     UTIL.getHBaseAdmin().createTable(desc);
352     HTable exportT = new HTable(UTIL.getConfiguration(), EXPORT_TABLE);
353 
354     //Add first version of QUAL
355     Put p = new Put(ROW1);
356     p.add(FAMILYA, QUAL, now, QUAL);
357     exportT.put(p);
358 
359     //Add Delete family marker
360     Delete d = new Delete(ROW1, now+3);
361     exportT.delete(d);
362 
363     //Add second version of QUAL
364     p = new Put(ROW1);
365     p.add(FAMILYA, QUAL, now+5, "s".getBytes());
366     exportT.put(p);
367 
368     //Add second Delete family marker
369     d = new Delete(ROW1, now+7);
370     exportT.delete(d);
371     
372     
373     String[] args = new String[] {
374         "-D" + Export.RAW_SCAN + "=true",
375         EXPORT_TABLE,
376         FQ_OUTPUT_DIR,
377         "1000", // max number of key versions per key to export
378     };
379     assertTrue(runExport(args));
380 
381     String IMPORT_TABLE = "importWithMultipleDeleteFamilyMarkersOfSameRowSameFamily";
382     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
383     desc.addFamily(new HColumnDescriptor(FAMILYA)
384         .setMaxVersions(5)
385         .setKeepDeletedCells(true)
386     );
387     UTIL.getHBaseAdmin().createTable(desc);
388     
389     HTable importT = new HTable(UTIL.getConfiguration(), IMPORT_TABLE);
390     args = new String[] {
391         IMPORT_TABLE,
392         FQ_OUTPUT_DIR
393     };
394     assertTrue(runImport(args));
395 
396     Scan s = new Scan();
397     s.setMaxVersions();
398     s.setRaw(true);
399     
400     ResultScanner importedTScanner = importT.getScanner(s);
401     Result importedTResult = importedTScanner.next();
402     
403     ResultScanner exportedTScanner = exportT.getScanner(s);
404     Result  exportedTResult =  exportedTScanner.next();
405     try
406     {
407       Result.compareResults(exportedTResult, importedTResult);
408     }
409     catch (Exception e) {
410       fail("Original and imported tables data comparision failed with error:"+e.getMessage());
411     }
412     finally
413     {
414       exportT.close();
415       importT.close();
416     }
417   }
418 
419   /**
420    * Create a simple table, run an Export Job on it, Import with filtering on,  verify counts,
421    * attempt with invalid values.
422    */
423   @Test
424   public void testWithFilter() throws Exception {
425     // Create simple table to export
426     String EXPORT_TABLE = "exportSimpleCase_ImportWithFilter";
427     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(EXPORT_TABLE));
428     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
429     UTIL.getHBaseAdmin().createTable(desc);
430     Table exportTable = new HTable(UTIL.getConfiguration(), desc.getTableName());
431 
432     Put p1 = new Put(ROW1);
433     p1.add(FAMILYA, QUAL, now, QUAL);
434     p1.add(FAMILYA, QUAL, now + 1, QUAL);
435     p1.add(FAMILYA, QUAL, now + 2, QUAL);
436     p1.add(FAMILYA, QUAL, now + 3, QUAL);
437     p1.add(FAMILYA, QUAL, now + 4, QUAL);
438 
439     // Having another row would actually test the filter.
440     Put p2 = new Put(ROW2);
441     p2.add(FAMILYA, QUAL, now, QUAL);
442 
443     exportTable.put(Arrays.asList(p1, p2));
444 
445     // Export the simple table
446     String[] args = new String[] { EXPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
447     assertTrue(runExport(args));
448 
449     // Import to a new table
450     String IMPORT_TABLE = "importWithFilter";
451     desc = new HTableDescriptor(TableName.valueOf(IMPORT_TABLE));
452     desc.addFamily(new HColumnDescriptor(FAMILYA).setMaxVersions(5));
453     UTIL.getHBaseAdmin().createTable(desc);
454 
455     Table importTable = new HTable(UTIL.getConfiguration(), desc.getTableName());
456     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + PrefixFilter.class.getName(),
457         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1), IMPORT_TABLE, FQ_OUTPUT_DIR,
458         "1000" };
459     assertTrue(runImport(args));
460 
461     // get the count of the source table for that time range
462     PrefixFilter filter = new PrefixFilter(ROW1);
463     int count = getCount(exportTable, filter);
464 
465     Assert.assertEquals("Unexpected row count between export and import tables", count,
466       getCount(importTable, null));
467 
468     // and then test that a broken command doesn't bork everything - easier here because we don't
469     // need to re-run the export job
470 
471     args = new String[] { "-D" + Import.FILTER_CLASS_CONF_KEY + "=" + Filter.class.getName(),
472         "-D" + Import.FILTER_ARGS_CONF_KEY + "=" + Bytes.toString(ROW1) + "", EXPORT_TABLE,
473         FQ_OUTPUT_DIR, "1000" };
474     assertFalse(runImport(args));
475 
476     // cleanup
477     exportTable.close();
478     importTable.close();
479   }
480 
481   /**
482    * Count the number of keyvalues in the specified table for the given timerange
483    * @param start
484    * @param end
485    * @param table
486    * @return
487    * @throws IOException
488    */
489   private int getCount(Table table, Filter filter) throws IOException {
490     Scan scan = new Scan();
491     scan.setFilter(filter);
492     ResultScanner results = table.getScanner(scan);
493     int count = 0;
494     for (Result res : results) {
495       count += res.size();
496     }
497     results.close();
498     return count;
499   }
500   
501   /**
502    * test main method. Import should print help and call System.exit
503    */
504   @Test
505   public void testImportMain() throws Exception {
506     PrintStream oldPrintStream = System.err;
507     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
508     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
509     System.setSecurityManager(newSecurityManager);
510     ByteArrayOutputStream data = new ByteArrayOutputStream();
511     String[] args = {};
512     System.setErr(new PrintStream(data));
513     try {
514       System.setErr(new PrintStream(data));
515       Import.main(args);
516       fail("should be SecurityException");
517     } catch (SecurityException e) {
518       assertEquals(-1, newSecurityManager.getExitCode());
519       assertTrue(data.toString().contains("Wrong number of arguments:"));
520       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
521       assertTrue(data.toString().contains("-Dimport.filter.class=<name of filter class>"));
522       assertTrue(data.toString().contains("-Dimport.bulk.output=/path/for/output"));
523       assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
524     } finally {
525       System.setErr(oldPrintStream);
526       System.setSecurityManager(SECURITY_MANAGER);
527     }
528   }
529 
530   /**
531    * test main method. Export should print help and call System.exit
532    */
533   @Test
534   public void testExportMain() throws Exception {
535     PrintStream oldPrintStream = System.err;
536     SecurityManager SECURITY_MANAGER = System.getSecurityManager();
537     LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
538     System.setSecurityManager(newSecurityManager);
539     ByteArrayOutputStream data = new ByteArrayOutputStream();
540     String[] args = {};
541     System.setErr(new PrintStream(data));
542     try {
543       System.setErr(new PrintStream(data));
544       Export.main(args);
545       fail("should be SecurityException");
546     } catch (SecurityException e) {
547       assertEquals(-1, newSecurityManager.getExitCode());
548       assertTrue(data.toString().contains("Wrong number of arguments:"));
549       assertTrue(data.toString().contains(
550               "Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
551               "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]"));
552       assertTrue(data.toString().contains("-D hbase.mapreduce.scan.column.family=<familyName>"));
553       assertTrue(data.toString().contains("-D hbase.mapreduce.include.deleted.rows=true"));
554       assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
555       assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
556       assertTrue(data.toString().contains("-Dmapreduce.reduce.speculative=false"));
557       assertTrue(data.toString().contains("-Dhbase.export.scanner.batch=10"));
558     } finally {
559       System.setErr(oldPrintStream);
560       System.setSecurityManager(SECURITY_MANAGER);
561     }
562   }
563 
564   /**
565    * Test map method of Importer
566    */
567   @SuppressWarnings({ "unchecked", "rawtypes" })
568   @Test
569   public void testKeyValueImporter() throws Exception {
570     KeyValueImporter importer = new KeyValueImporter();
571     Configuration configuration = new Configuration();
572     Context ctx = mock(Context.class);
573     when(ctx.getConfiguration()).thenReturn(configuration);
574 
575     doAnswer(new Answer<Void>() {
576 
577       @Override
578       public Void answer(InvocationOnMock invocation) throws Throwable {
579         ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
580         KeyValue key = (KeyValue) invocation.getArguments()[1];
581         assertEquals("Key", Bytes.toString(writer.get()));
582         assertEquals("row", Bytes.toString(key.getRow()));
583         return null;
584       }
585     }).when(ctx).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
586 
587     importer.setup(ctx);
588     Result value = mock(Result.class);
589     KeyValue[] keys = {
590         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
591             Bytes.toBytes("value")),
592         new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
593             Bytes.toBytes("value1")) };
594     when(value.rawCells()).thenReturn(keys);
595     importer.map(new ImmutableBytesWritable(Bytes.toBytes("Key")), value, ctx);
596 
597   }
598 
599   /**
600    * Test addFilterAndArguments method of Import This method set couple
601    * parameters into Configuration
602    */
603   @Test
604   public void testAddFilterAndArguments() throws IOException {
605     Configuration configuration = new Configuration();
606 
607     List<String> args = new ArrayList<String>();
608     args.add("param1");
609     args.add("param2");
610 
611     Import.addFilterAndArguments(configuration, FilterBase.class, args);
612     assertEquals("org.apache.hadoop.hbase.filter.FilterBase", 
613         configuration.get(Import.FILTER_CLASS_CONF_KEY));
614     assertEquals("param1,param2", configuration.get(Import.FILTER_ARGS_CONF_KEY));
615   }
616 
617   @Test
618   public void testDurability() throws IOException, InterruptedException, ClassNotFoundException {
619     // Create an export table.
620     String exportTableName = "exporttestDurability";
621     Table exportTable = UTIL.createTable(TableName.valueOf(exportTableName), FAMILYA, 3);
622 
623     // Insert some data
624     Put put = new Put(ROW1);
625     put.add(FAMILYA, QUAL, now, QUAL);
626     put.add(FAMILYA, QUAL, now + 1, QUAL);
627     put.add(FAMILYA, QUAL, now + 2, QUAL);
628     exportTable.put(put);
629 
630     put = new Put(ROW2);
631     put.add(FAMILYA, QUAL, now, QUAL);
632     put.add(FAMILYA, QUAL, now + 1, QUAL);
633     put.add(FAMILYA, QUAL, now + 2, QUAL);
634     exportTable.put(put);
635 
636     // Run the export
637     String[] args = new String[] { exportTableName, FQ_OUTPUT_DIR, "1000"};
638     assertTrue(runExport(args));
639 
640     // Create the table for import
641     String importTableName = "importTestDurability1";
642     Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
643 
644     // Register the wal listener for the import table
645     TableWALActionListener walListener = new TableWALActionListener(importTableName);
646     WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(null);
647     wal.registerWALActionsListener(walListener);
648 
649     // Run the import with SKIP_WAL
650     args =
651         new String[] { "-D" + Import.WAL_DURABILITY + "=" + Durability.SKIP_WAL.name(),
652             importTableName, FQ_OUTPUT_DIR };
653     assertTrue(runImport(args));
654     //Assert that the wal is not visisted
655     assertTrue(!walListener.isWALVisited());
656     //Ensure that the count is 2 (only one version of key value is obtained)
657     assertTrue(getCount(importTable, null) == 2);
658 
659     // Run the import with the default durability option
660     importTableName = "importTestDurability2";
661     importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
662     wal.unregisterWALActionsListener(walListener);
663     walListener = new TableWALActionListener(importTableName);
664     wal.registerWALActionsListener(walListener);
665     args = new String[] { importTableName, FQ_OUTPUT_DIR };
666     assertTrue(runImport(args));
667     //Assert that the wal is visisted
668     assertTrue(walListener.isWALVisited());
669     //Ensure that the count is 2 (only one version of key value is obtained)
670     assertTrue(getCount(importTable, null) == 2);
671   }
672 
673   /**
674    * This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, WALKey, WALEdit)} to
675    * identify that an entry is written to the Write Ahead Log for the given table.
676    */
677   private static class TableWALActionListener extends WALActionsListener.Base {
678 
679     private String tableName;
680     private boolean isVisited = false;
681 
682     public TableWALActionListener(String tableName) {
683       this.tableName = tableName;
684     }
685 
686     @Override
687     public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
688       if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
689         isVisited = true;
690       }
691     }
692 
693     public boolean isWALVisited() {
694       return isVisited;
695     }
696   }  
697 }