View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.mob.mapreduce;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.mockito.Mockito.mock;
22  import static org.mockito.Mockito.when;
23  
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.TreeSet;
28  
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.CommonConfigurationKeys;
31  import org.apache.hadoop.fs.FileStatus;
32  import org.apache.hadoop.fs.FileSystem;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.HBaseTestingUtility;
35  import org.apache.hadoop.hbase.HColumnDescriptor;
36  import org.apache.hadoop.hbase.HTableDescriptor;
37  import org.apache.hadoop.hbase.KeyValue;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.TableName;
40  import org.apache.hadoop.hbase.client.*;
41  import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
42  import org.apache.hadoop.hbase.master.TableLockManager;
43  import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
44  import org.apache.hadoop.hbase.mob.MobConstants;
45  import org.apache.hadoop.hbase.mob.MobUtils;
46  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.DummyMobAbortable;
47  import org.apache.hadoop.hbase.mob.mapreduce.SweepJob.SweepCounter;
48  import org.apache.hadoop.hbase.testclassification.MediumTests;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
51  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
52  import org.apache.hadoop.io.SequenceFile;
53  import org.apache.hadoop.io.Text;
54  import org.apache.hadoop.io.Writable;
55  import org.apache.hadoop.io.serializer.JavaSerialization;
56  import org.apache.hadoop.mapreduce.Counter;
57  import org.apache.hadoop.mapreduce.Reducer;
58  import org.apache.hadoop.mapreduce.counters.GenericCounter;
59  import org.junit.After;
60  import org.junit.AfterClass;
61  import org.junit.Before;
62  import org.junit.BeforeClass;
63  import org.junit.Test;
64  import org.junit.experimental.categories.Category;
65  import org.mockito.Matchers;
66  
67  @Category(MediumTests.class)
68  public class TestMobSweepReducer {
69  
70    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
71    private final static String tableName = "testSweepReducer";
72    private final static String row = "row";
73    private final static String family = "family";
74    private final static String qf = "qf";
75    private static BufferedMutator table;
76    private static Admin admin;
77  
78    @BeforeClass
79    public static void setUpBeforeClass() throws Exception {
80      TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
81      TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
82  
83      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
84  
85      TEST_UTIL.startMiniCluster(1);
86    }
87  
88    @AfterClass
89    public static void tearDownAfterClass() throws Exception {
90      TEST_UTIL.shutdownMiniCluster();
91    }
92  
93    @SuppressWarnings("deprecation")
94    @Before
95    public void setUp() throws Exception {
96      HTableDescriptor desc = new HTableDescriptor(tableName);
97      HColumnDescriptor hcd = new HColumnDescriptor(family);
98      hcd.setMobEnabled(true);
99      hcd.setMobThreshold(3L);
100     hcd.setMaxVersions(4);
101     desc.addFamily(hcd);
102 
103     admin = TEST_UTIL.getHBaseAdmin();
104     admin.createTable(desc);
105     table = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())
106             .getBufferedMutator(TableName.valueOf(tableName));
107   }
108 
109   @After
110   public void tearDown() throws Exception {
111     admin.disableTable(TableName.valueOf(tableName));
112     admin.deleteTable(TableName.valueOf(tableName));
113     admin.close();
114   }
115 
116   private List<String> getKeyFromSequenceFile(FileSystem fs, Path path,
117                                               Configuration conf) throws Exception {
118     List<String> list = new ArrayList<String>();
119     SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
120 
121     String next = (String) reader.next((String) null);
122     while (next != null) {
123       list.add(next);
124       next = (String) reader.next((String) null);
125     }
126     reader.close();
127     return list;
128   }
129 
130   @Test
131   public void testRun() throws Exception {
132 
133     TableName tn = TableName.valueOf(tableName);
134     byte[] mobValueBytes = new byte[100];
135 
136     //get the path where mob files lie in
137     Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tn, family);
138 
139     Put put = new Put(Bytes.toBytes(row));
140     put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
141     Put put2 = new Put(Bytes.toBytes(row + "ignore"));
142     put2.addColumn(Bytes.toBytes(family), Bytes.toBytes(qf), 1, mobValueBytes);
143     table.mutate(put);
144     table.mutate(put2);
145     table.flush();
146     admin.flush(tn);
147 
148     FileStatus[] fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
149     //check the generation of a mob file
150     assertEquals(1, fileStatuses.length);
151 
152     String mobFile1 = fileStatuses[0].getPath().getName();
153 
154     Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
155     configuration.setFloat(MobConstants.MOB_SWEEP_TOOL_COMPACTION_RATIO, 0.6f);
156     configuration.setStrings(TableInputFormat.INPUT_TABLE, tableName);
157     configuration.setStrings(TableInputFormat.SCAN_COLUMN_FAMILY, family);
158     configuration.setStrings(SweepJob.WORKING_VISITED_DIR_KEY, "jobWorkingNamesDir");
159     configuration.setStrings(SweepJob.WORKING_FILES_DIR_KEY, "compactionFileDir");
160     configuration.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
161             JavaSerialization.class.getName());
162     configuration.set(SweepJob.WORKING_VISITED_DIR_KEY, "compactionVisitedDir");
163     configuration.setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
164         System.currentTimeMillis() + 24 * 3600 * 1000);
165 
166     ZooKeeperWatcher zkw = new ZooKeeperWatcher(configuration, "1", new DummyMobAbortable());
167     TableName lockName = MobUtils.getTableLockName(tn);
168     String znode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
169     configuration.set(SweepJob.SWEEP_JOB_ID, "1");
170     configuration.set(SweepJob.SWEEP_JOB_TABLE_NODE, znode);
171     ServerName serverName = SweepJob.getCurrentServerName(configuration);
172     configuration.set(SweepJob.SWEEP_JOB_SERVERNAME, serverName.toString());
173 
174     TableLockManager tableLockManager = TableLockManager.createTableLockManager(configuration, zkw,
175         serverName);
176     TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
177     lock.acquire();
178     try {
179       // use the same counter when mocking
180       Counter counter = new GenericCounter();
181       Reducer<Text, KeyValue, Writable, Writable>.Context ctx = mock(Reducer.Context.class);
182       when(ctx.getConfiguration()).thenReturn(configuration);
183       when(ctx.getCounter(Matchers.any(SweepCounter.class))).thenReturn(counter);
184       when(ctx.nextKey()).thenReturn(true).thenReturn(false);
185       when(ctx.getCurrentKey()).thenReturn(new Text(mobFile1));
186 
187       byte[] refBytes = Bytes.toBytes(mobFile1);
188       long valueLength = refBytes.length;
189       byte[] newValue = Bytes.add(Bytes.toBytes(valueLength), refBytes);
190       KeyValue kv2 = new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qf), 1,
191         KeyValue.Type.Put, newValue);
192       List<KeyValue> list = new ArrayList<KeyValue>();
193       list.add(kv2);
194 
195       when(ctx.getValues()).thenReturn(list);
196 
197       SweepReducer reducer = new SweepReducer();
198       reducer.run(ctx);
199     } finally {
200       lock.release();
201     }
202     FileStatus[] filsStatuses2 = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
203     String mobFile2 = filsStatuses2[0].getPath().getName();
204     //new mob file is generated, old one has been archived
205     assertEquals(1, filsStatuses2.length);
206     assertEquals(false, mobFile2.equalsIgnoreCase(mobFile1));
207 
208     //test sequence file
209     String workingPath = configuration.get(SweepJob.WORKING_VISITED_DIR_KEY);
210     FileStatus[] statuses = TEST_UTIL.getTestFileSystem().listStatus(new Path(workingPath));
211     Set<String> files = new TreeSet<String>();
212     for (FileStatus st : statuses) {
213       files.addAll(getKeyFromSequenceFile(TEST_UTIL.getTestFileSystem(),
214               st.getPath(), configuration));
215     }
216     assertEquals(1, files.size());
217     assertEquals(true, files.contains(mobFile1));
218   }
219 }