/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mob.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;

/**
 * The sweep job.
 * Runs a MapReduce job that merges the small mob files into bigger ones and archives the
 * mob files that are no longer referenced.
 */
@InterfaceAudience.Private
public class SweepJob {

  private final FileSystem fs;
  private final Configuration conf;
  private static final Log LOG = LogFactory.getLog(SweepJob.class);
  static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
  static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
  static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
  static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
  static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
  static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
  static final String WORKING_ALLNAMES_DIR = "all";
  static final String WORKING_VISITED_DIR = "visited";
  public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";

  public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
  protected static final long ONE_DAY = 24 * 60 * 60 * 1000;
  private long compactionStartTime = EnvironmentEdgeManager.currentTime();
  public final static String CREDENTIALS_LOCATION = "credentials_location";
  private CacheConfig cacheConfig;
  static final int SCAN_CACHING = 10000;
  private TableLockManager tableLockManager;

  public SweepJob(Configuration conf, FileSystem fs) {
    this.conf = conf;
    this.fs = fs;
    // Disable the block cache; the sweep tool reads each mob file only once.
    Configuration copyOfConf = new Configuration(conf);
    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
    cacheConfig = new CacheConfig(copyOfConf);
  }

  /**
   * Gets the server name of the current node, resolving the host name the same way a
   * region server resolves its own.
   * @param conf The current configuration.
   * @return The server name of the current node.
   * @throws IOException If the host name cannot be resolved.
   */
  static ServerName getCurrentServerName(Configuration conf) throws IOException {
    String hostname = conf.get(
        "hbase.regionserver.ipc.address",
        Strings.domainNamePointerToHostName(DNS.getDefaultHost(
            conf.get("hbase.regionserver.dns.interface", "default"),
            conf.get("hbase.regionserver.dns.nameserver", "default"))));
    int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);

    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed to resolve " + initialIsa);
    }
    return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(),
        EnvironmentEdgeManager.currentTime());
  }

  /**
   * Runs the MapReduce job that sweeps the mob files of the given column family.
   * The sweep is mutually exclusive with HBase major compactions and with other sweep jobs
   * on the same column family; the exclusion is enforced by a ZooKeeper-based table lock.
   * @param tn The current table name.
   * @param family The descriptor of the current column family.
   * @return 0 upon success, 3 when the table lock cannot be acquired (a major compaction
   *   or another sweep job is probably running), or 4 when the MapReduce job fails.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws KeeperException
   */
  public int sweep(TableName tn, HColumnDescriptor family) throws IOException,
      ClassNotFoundException, InterruptedException, KeeperException {
    Configuration conf = new Configuration(this.conf);

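    // The sweep tool rewrites files under the HBase root directory, so refuse to run
    // unless the current user owns the content of that directory.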
    String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
    FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
    if (hbaseRootFileStat.length > 0) {
      String owner = hbaseRootFileStat[0].getOwner();
      if (!owner.equals(currentUserName)) {
        String errorMsg = "The current user [" + currentUserName
            + "] doesn't have hbase root credentials."
            + " Please make sure the user owns the root directory of the target HBase.";
        LOG.error(errorMsg);
        throw new IOException(errorMsg);
      }
    } else {
      LOG.error("The target HBase doesn't exist");
      throw new IOException("The target HBase doesn't exist");
    }
    String familyName = family.getNameAsString();
    String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable());
    try {
      ServerName serverName = getCurrentServerName(conf);
      tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName);
      TableName lockName = MobUtils.getTableLockName(tn);
      TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
      String tableName = tn.getNameAsString();

      try {
        lock.acquire();
      } catch (Exception e) {
        LOG.warn("Cannot lock the table " + tableName
            + ". A major compaction or another sweep job may be in progress."
            + " Please re-run the job.", e);
        return 3;
      }
      Job job = null;
      try {
        Scan scan = new Scan();
        scan.addFamily(family.getName());
        // Do not retrieve the mob data when scanning; only the references are needed.
        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
        scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
        scan.setCaching(SCAN_CACHING);
        scan.setCacheBlocks(false);
        scan.setMaxVersions(family.getMaxVersions());
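        // Register JavaSerialization in addition to WritableSerialization so that plain
        // String keys and values can be written to the job's sequence files.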
        conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
            JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
        conf.set(SWEEP_JOB_ID, id);
        conf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
        String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
        conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
        job = prepareJob(tn, familyName, scan, conf);
        job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
        // Record the start time of the sweep; it is passed to the reducers through the
        // job configuration.
        job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
            compactionStartTime);

        job.setPartitionerClass(MobFilePathHashPartitioner.class);
        submit(job, tn, familyName);
        if (job.waitForCompletion(true)) {
          // Archive the mob files that are not referenced anymore.
          removeUnusedFiles(job, tn, family);
        } else {
          System.err.println("Job was not successful");
          return 4;
        }
      } finally {
        try {
          cleanup(job, tn, familyName);
        } finally {
          try {
            lock.release();
          } catch (IOException e) {
            LOG.error("Failed to release the table lock " + tableName, e);
          }
        }
      }
    } finally {
      zkw.close();
    }
    return 0;
  }

  /**
   * Prepares the MapReduce job.
   * @param tn The current table name.
   * @param familyName The current column family name.
   * @param scan The scan that selects the mob reference cells.
   * @param conf The current configuration.
   * @return A MapReduce job for the sweeping.
   * @throws IOException
   */
  private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
      throws IOException {
    Job job = Job.getInstance(conf);
    job.setJarByClass(SweepMapper.class);
    TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
        SweepMapper.class, Text.class, Writable.class, job);

    job.setInputFormatClass(TableInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setReducerClass(SweepReducer.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    String jobName = getCustomJobName(this.getClass().getSimpleName(), tn, familyName);
    job.setJobName(jobName);
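    // Pick up pre-fetched delegation tokens from an externally supplied credentials
    // file, if one is configured (useful when submitting from a secure cluster).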
    if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
      String fileLoc = conf.get(CREDENTIALS_LOCATION);
      Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
      job.getCredentials().addAll(cred);
    }
    return job;
  }

  /**
   * Builds a job name from the driver class, the mapper and reducer classes, the table
   * name and the column family name.
   * @param className The current class name.
   * @param tableName The current table name.
   * @param familyName The current column family name.
   * @return The customized job name.
   */
  private static String getCustomJobName(String className, TableName tableName,
      String familyName) {
    StringBuilder name = new StringBuilder();
    name.append(className);
    name.append('-').append(SweepMapper.class.getSimpleName());
    name.append('-').append(SweepReducer.class.getSimpleName());
    name.append('-').append(tableName.getNamespaceAsString());
    name.append('-').append(tableName.getQualifierAsString());
    name.append('-').append(familyName);
    return name.toString();
  }

  /**
   * Sets up the working directories of the job and writes the names of all the sweepable
   * mob files into a sequence file.
   * @param job The current job.
   * @param tn The current table name.
   * @param familyName The current column family name.
   * @throws IOException
   */
  private void submit(Job job, TableName tn, String familyName) throws IOException {
    // The working directory lives under the mob temp directory and is unique to this
    // job name.
    Path tempDir =
        new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
    Path mobCompactionTempDir =
        new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
    Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
    job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
    // Remove any leftovers from a previous run, then recreate the directory.
    fs.delete(workingPath, true);
    fs.mkdirs(workingPath);
    // Set up the directories for the sequence files that record the file names.
    Path workingPathOfFiles = new Path(workingPath, "files");
    Path workingPathOfNames = new Path(workingPath, "names");
    job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
    Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
    job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
    Path visitedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
    job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());

    fs.mkdirs(visitedFileNamesPath);
    Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
    // List the files currently in the mob store directory and collect the names of the
    // ones that are eligible for sweeping.
    FileStatus[] files = fs.listStatus(mobStorePath);
    Set<String> fileNames = new TreeSet<String>();
    long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY);
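    // Only plain mob files are candidates: HFileLinks are skipped, as are files newer
    // than the configured delay, which may still be in active use.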
    for (FileStatus fileStatus : files) {
      if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
        if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
          fileNames.add(fileStatus.getPath().getName());
        }
      }
    }
    FSDataOutputStream fout = null;
    SequenceFile.Writer writer = null;
    try {
      // Write the names of all the sweepable files, in sorted order, into one sequence
      // file. They are later diffed against the names of the visited files.
      fout = fs.create(allFileNamesPath, true);
      writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class,
          CompressionType.NONE, null);
      for (String fileName : fileNames) {
        writer.append(fileName, MobConstants.EMPTY_STRING);
      }
      writer.hflush();
    } finally {
      if (writer != null) {
        IOUtils.closeStream(writer);
      }
      if (fout != null) {
        IOUtils.closeStream(fout);
      }
    }
  }

  /**
   * Gets the unused mob files by comparing the sequence file that names all the sweepable
   * files with the sorted lists of the files that were visited by the reducers.
   * @param conf The current configuration.
   * @return The names of the mob files that are no longer referenced and can be archived.
   * @throws IOException
   */
  List<String> getUnusedFiles(Configuration conf) throws IOException {
    // Compare the names of all the mob files against the names of the visited files.
    Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
    SequenceFile.Reader allNamesReader = null;
    MergeSortReader visitedNamesReader = null;
    List<String> toBeArchived = new ArrayList<String>();
    try {
      allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
      visitedNamesReader = new MergeSortReader(fs, conf,
          new Path(conf.get(WORKING_VISITED_DIR_KEY)));
      String nextAll = (String) allNamesReader.next((String) null);
      String nextVisited = visitedNamesReader.next();
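      // Both lists are sorted, so a single merge pass finds every name that is present
      // in the "all" list but missing from the "visited" list.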
      // The loop can stop as soon as the "all" list is exhausted; any remaining visited
      // names have nothing to archive.
      while (nextAll != null) {
        if (nextVisited != null) {
          int compare = nextAll.compareTo(nextVisited);
          if (compare < 0) {
            toBeArchived.add(nextAll);
            nextAll = (String) allNamesReader.next((String) null);
          } else if (compare > 0) {
            nextVisited = visitedNamesReader.next();
          } else {
            nextAll = (String) allNamesReader.next((String) null);
            nextVisited = visitedNamesReader.next();
          }
        } else {
          toBeArchived.add(nextAll);
          nextAll = (String) allNamesReader.next((String) null);
        }
      }
    } finally {
      if (allNamesReader != null) {
        IOUtils.closeStream(allNamesReader);
      }
      if (visitedNamesReader != null) {
        visitedNamesReader.close();
      }
    }
    return toBeArchived;
  }

  /**
   * Archives the unused mob files.
   * @param job The current job.
   * @param tn The current table name.
   * @param hcd The descriptor of the current column family.
   * @throws IOException
   */
  private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
    // Archive the files that exist in the mob store directory but were not visited by
    // the reducers, meaning they are no longer referenced.
    List<StoreFile> storeFiles = new ArrayList<StoreFile>();
    List<String> toBeArchived = getUnusedFiles(job.getConfiguration());

    Path mobStorePath = MobUtils
        .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
    for (String archiveFileName : toBeArchived) {
      Path path = new Path(mobStorePath, archiveFileName);
      storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
    }
    if (!storeFiles.isEmpty()) {
      try {
        MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
            FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
        LOG.info(storeFiles.size() + " unused MOB files are removed");
      } catch (Exception e) {
        LOG.error("Failed to archive the store files " + storeFiles, e);
      }
    }
  }

  /**
   * Deletes the working directory of the sweep job.
   * @param job The current job.
   * @param tn The current table name.
   * @param familyName The current column family name.
   */
  private void cleanup(Job job, TableName tn, String familyName) {
    if (job != null) {
      // Delete the working directory of the sweep job.
      Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
      try {
        fs.delete(workingPath, true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the working directory after sweeping store " + familyName
            + " in the table " + tn.getNameAsString(), e);
      }
    }
  }

  /**
   * A string value paired with the index of the reader it came from.
   */
  private static class IndexedResult implements Comparable<IndexedResult> {
    private int index;
    private String value;

    public IndexedResult(int index, String value) {
      this.index = index;
      this.value = value;
    }

    public int getIndex() {
      return this.index;
    }

    public String getValue() {
      return this.value;
    }

    @Override
    public int compareTo(IndexedResult o) {
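      // A null value sorts before any non-null value; two nulls compare as equal.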
      if (this.value == null && o.getValue() == null) {
        return 0;
      } else if (o.value == null) {
        return 1;
      } else if (this.value == null) {
        return -1;
      } else {
        return this.value.compareTo(o.value);
      }
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null) {
        return false;
      }
      if (!(obj instanceof IndexedResult)) {
        return false;
      }
      return compareTo((IndexedResult) obj) == 0;
    }

    @Override
    public int hashCode() {
      // Guard against a null value; compareTo() and equals() both permit it.
      return value == null ? 0 : value.hashCode();
    }
  }

  /**
   * A reader that merges the entries of several sorted sequence files so that they are
   * returned as one sorted stream.
   */
  private static class MergeSortReader {

    private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
    private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();

    public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
      if (fs.exists(path)) {
        FileStatus[] files = fs.listStatus(path);
        int index = 0;
        for (FileStatus file : files) {
          if (file.isFile()) {
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
            String key = (String) reader.next((String) null);
            if (key != null) {
              results.add(new IndexedResult(index, key));
              readers.add(reader);
              index++;
            } else {
              // The file is empty; close the reader right away since it is not tracked.
              IOUtils.closeStream(reader);
            }
          }
        }
      }
    }
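
    /**
     * Returns the next name in sorted order across all the readers, or null when every
     * reader is exhausted. The queue holds at most one pending entry per reader: the
     * smallest key read from that reader but not yet returned.
     */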
    public String next() throws IOException {
      IndexedResult result = results.poll();
      if (result != null) {
        // Refill the queue with the following entry from the reader that supplied the
        // smallest result.
        SequenceFile.Reader reader = readers.get(result.getIndex());
        String key = (String) reader.next((String) null);
        if (key != null) {
          results.add(new IndexedResult(result.getIndex(), key));
        }
        return result.getValue();
      }
      return null;
    }

    public void close() {
      for (SequenceFile.Reader reader : readers) {
        if (reader != null) {
          IOUtils.closeStream(reader);
        }
      }
    }
  }

  /**
   * The counters used by the sweep job.
   */
  public enum SweepCounter {

    /**
     * The number of files that are read as input.
     */
    INPUT_FILE_COUNT,

    /**
     * The number of files to be merged or cleaned.
     */
    FILE_TO_BE_MERGE_OR_CLEAN,

    /**
     * The number of files that are left after the merge or clean.
     */
    FILE_AFTER_MERGE_OR_CLEAN,

    /**
     * The number of records that are updated.
     */
    RECORDS_UPDATED,
  }
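
  /**
   * An Abortable that merely records that an abort was requested. It backs the
   * short-lived ZooKeeper watcher created by the sweep tool.
   */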
  public static class DummyMobAbortable implements Abortable {

    private boolean abort = false;

    @Override
    public void abort(String why, Throwable e) {
      abort = true;
    }

    @Override
    public boolean isAborted() {
      return abort;
    }
  }
}