/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob.mapreduce;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;

/**
 * The sweep job.
 * Runs a MapReduce job that merges the smaller mob files into bigger ones and cleans up the
 * unused ones.
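 *
 * <p>A minimal usage sketch (illustrative only; the table and family names below are
 * hypothetical):</p>
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * FileSystem fs = FileSystem.get(conf);
 * SweepJob sweepJob = new SweepJob(conf, fs);
 * // 0 on success, 3 if the table lock cannot be acquired, 4 if the MR job fails
 * int ret = sweepJob.sweep(TableName.valueOf("testtable"), new HColumnDescriptor("mobfamily"));
 * </pre>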
 */
@InterfaceAudience.Private
public class SweepJob {

  private final FileSystem fs;
  private final Configuration conf;
  private static final Log LOG = LogFactory.getLog(SweepJob.class);
  static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
  static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
  static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
  static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
  static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
  static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
  static final String WORKING_ALLNAMES_DIR = "all";
  static final String WORKING_VISITED_DIR = "visited";
  public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
  // MOB_SWEEP_JOB_DELAY is ONE_DAY by default; its value is only changed in tests.
  public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
  protected static final long ONE_DAY = 24 * 60 * 60 * 1000;
  private long compactionStartTime = EnvironmentEdgeManager.currentTime();
  public final static String CREDENTIALS_LOCATION = "credentials_location";
  private CacheConfig cacheConfig;
  static final int SCAN_CACHING = 10000;
  private TableLockManager tableLockManager;

  public SweepJob(Configuration conf, FileSystem fs) {
    this.conf = conf;
    this.fs = fs;
    // disable the block cache.
    Configuration copyOfConf = new Configuration(conf);
    copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
    cacheConfig = new CacheConfig(copyOfConf);
  }

  static ServerName getCurrentServerName(Configuration conf) throws IOException {
    String hostname = conf.get(
        "hbase.regionserver.ipc.address",
        Strings.domainNamePointerToHostName(DNS.getDefaultHost(
            conf.get("hbase.regionserver.dns.interface", "default"),
            conf.get("hbase.regionserver.dns.nameserver", "default"))));
    int port = conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT);
    // Creating the InetSocketAddress forces a DNS resolve.
    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
    if (initialIsa.getAddress() == null) {
      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
    }
    return ServerName.valueOf(initialIsa.getHostName(), initialIsa.getPort(),
        EnvironmentEdgeManager.currentTime());
  }

  /**
   * Runs MapReduce to do the sweeping on the mob files.
   * A MobReferenceOnlyFilter is used so that the mappers only get the cells that hold mob
   * references from the 'normal' regions' rows.
   * Runs of the sweep tool on the same column family are mutually exclusive, and so are a run
   * of the sweep tool and an HBase major compaction on that column family.
   * The synchronization is done through ZooKeeper, so at the beginning of the run we must make
   * sure this sweep tool is the only one running on the column family, and that no major
   * compaction of the column family is in progress.
   * @param tn The current table name.
   * @param family The descriptor of the current column family.
   * @return 0 upon success, 3 when bailing out because another compaction is currently
   *   happening, or 4 when the MapReduce job was unsuccessful.
   *
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws KeeperException
   */
  public int sweep(TableName tn, HColumnDescriptor family) throws IOException,
      ClassNotFoundException, InterruptedException, KeeperException {
    Configuration conf = new Configuration(this.conf);
    // check whether the current user is the owner of the HBase root directory
    String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
    FileStatus[] hbaseRootFileStat = fs.listStatus(new Path(conf.get(HConstants.HBASE_DIR)));
    if (hbaseRootFileStat.length > 0) {
      String owner = hbaseRootFileStat[0].getOwner();
      if (!owner.equals(currentUserName)) {
        String errorMsg = "The current user[" + currentUserName
            + "] doesn't have hbase root credentials."
            + " Please make sure the user is the root of the target HBase";
        LOG.error(errorMsg);
        throw new IOException(errorMsg);
      }
    } else {
      LOG.error("The target HBase doesn't exist");
      throw new IOException("The target HBase doesn't exist");
    }
    String familyName = family.getNameAsString();
    String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, id, new DummyMobAbortable());
    try {
      ServerName serverName = getCurrentServerName(conf);
      tableLockManager = TableLockManager.createTableLockManager(conf, zkw, serverName);
      TableName lockName = MobUtils.getTableLockName(tn);
      TableLock lock = tableLockManager.writeLock(lockName, "Run sweep tool");
      String tableName = tn.getNameAsString();
      // Try to obtain the lock. Use this lock to synchronize all the queries.
      try {
        lock.acquire();
      } catch (Exception e) {
        LOG.warn("Can not lock the table " + tableName
            + ". The major compaction in HBase may be in-progress or another sweep job is running."
            + " Please re-run the job.");
        return 3;
      }
      Job job = null;
      try {
        Scan scan = new Scan();
        scan.addFamily(family.getName());
        // Do not retrieve the mob data when scanning
        scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
        scan.setAttribute(MobConstants.MOB_SCAN_REF_ONLY, Bytes.toBytes(Boolean.TRUE));
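        // With MOB_SCAN_RAW set, the scan returns the raw reference cells instead of
        // resolving the mob data; MOB_SCAN_REF_ONLY further restricts it to cells that
        // actually hold mob references (the MobReferenceOnlyFilter mentioned in the
        // method javadoc).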
        scan.setCaching(SCAN_CACHING);
        scan.setCacheBlocks(false);
        scan.setMaxVersions(family.getMaxVersions());
        conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
            JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
        conf.set(SWEEP_JOB_ID, id);
        conf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
        String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
        conf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
        job = prepareJob(tn, familyName, scan, conf);
        job.getConfiguration().set(TableInputFormat.SCAN_COLUMN_FAMILY, familyName);
        // Record the compaction start time.
        // In the sweep tool, only mob files whose modification time is older than
        // (startTime - delay) are handled by this tool.
        // The delay is one day by default; it is configurable, but only changed in tests.
        job.getConfiguration().setLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_START_DATE,
            compactionStartTime);

        job.setPartitionerClass(MobFilePathHashPartitioner.class);
        submit(job, tn, familyName);
        if (job.waitForCompletion(true)) {
          // Archive the unused mob files.
          removeUnusedFiles(job, tn, family);
        } else {
          System.err.println("Job was not successful");
          return 4;
        }
      } finally {
        try {
          cleanup(job, tn, familyName);
        } finally {
          try {
            lock.release();
          } catch (IOException e) {
            LOG.error("Failed to release the table lock " + tableName, e);
          }
        }
      }
    } finally {
      zkw.close();
    }
    return 0;
  }

  /**
   * Prepares a MapReduce job.
   * @param tn The current table name.
   * @param familyName The current family name.
   * @param scan The current scan.
   * @param conf The current configuration.
   * @return A MapReduce job.
   * @throws IOException
   */
  private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf)
      throws IOException {
    Job job = Job.getInstance(conf);
    job.setJarByClass(SweepMapper.class);
    TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan,
        SweepMapper.class, Text.class, Writable.class, job);

    job.setInputFormatClass(TableInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setReducerClass(SweepReducer.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    String jobName = getCustomJobName(this.getClass().getSimpleName(), tn, familyName);
    job.setJobName(jobName);
    if (StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
      String fileLoc = conf.get(CREDENTIALS_LOCATION);
      Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
      job.getCredentials().addAll(cred);
    }
    return job;
  }

  /**
   * Gets a customized job name.
   * It's className-mapperClassName-reducerClassName-namespace-tableQualifier-familyName.
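   * For example (with a hypothetical table and family), a sweep of table
   * {@code default:testtable} and family {@code mobfamily} would be named
   * {@code SweepJob-SweepMapper-SweepReducer-default-testtable-mobfamily}.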
   * @param className The current class name.
   * @param tableName The current table name.
   * @param familyName The current family name.
   * @return The customized job name.
   */
  private static String getCustomJobName(String className, TableName tableName, String familyName) {
    StringBuilder name = new StringBuilder();
    name.append(className);
    name.append('-').append(SweepMapper.class.getSimpleName());
    name.append('-').append(SweepReducer.class.getSimpleName());
    name.append('-').append(tableName.getNamespaceAsString());
    name.append('-').append(tableName.getQualifierAsString());
    name.append('-').append(familyName);
    return name.toString();
  }

  /**
   * Submits a job.
   * @param job The current job.
   * @param tn The current table name.
   * @param familyName The current family name.
   * @throws IOException
   */
  private void submit(Job job, TableName tn, String familyName) throws IOException {
    // delete the temp directory of the mob files in case of a failure in a previous
    // execution.
    Path tempDir =
        new Path(MobUtils.getMobHome(job.getConfiguration()), MobConstants.TEMP_DIR_NAME);
    Path mobCompactionTempDir =
        new Path(tempDir, MobConstants.MOB_SWEEP_TOOL_COMPACTION_TEMP_DIR_NAME);
    Path workingPath = MobUtils.getCompactionWorkingPath(mobCompactionTempDir, job.getJobName());
    job.getConfiguration().set(WORKING_DIR_KEY, workingPath.toString());
    // delete the working directory in case it was not deleted by the last run.
    fs.delete(workingPath, true);
    // create the working directory.
    fs.mkdirs(workingPath);
    // create a sequence file which contains the names of all the existing files.
    Path workingPathOfFiles = new Path(workingPath, "files");
    Path workingPathOfNames = new Path(workingPath, "names");
    job.getConfiguration().set(WORKING_FILES_DIR_KEY, workingPathOfFiles.toString());
    Path allFileNamesPath = new Path(workingPathOfNames, WORKING_ALLNAMES_DIR);
    job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allFileNamesPath.toString());
    Path visitedFileNamesPath = new Path(workingPathOfNames, WORKING_VISITED_DIR);
    job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedFileNamesPath.toString());
    // create a directory where the files containing the names of visited mob files are saved.
    fs.mkdirs(visitedFileNamesPath);
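    // Resulting layout under the working directory (for reference):
    //   <workingPath>/files         - handed to the job via WORKING_FILES_DIR_KEY
    //   <workingPath>/names/all     - sequence file listing every candidate mob file name
    //   <workingPath>/names/visited - per-reducer files listing the visited mob file names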
    Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
    // Find all the files whose creation time is older than one day, and write those
    // file names to a file.
    // Each reducer has a writer that writes the visited file names to a file saved
    // in WORKING_VISITED_DIR.
    // After the job is finished, compare those files to find the unused mob files, then
    // archive them.
    FileStatus[] files = fs.listStatus(mobStorePath);
    Set<String> fileNames = new TreeSet<String>();
    long mobCompactionDelay = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, ONE_DAY);
    for (FileStatus fileStatus : files) {
      if (fileStatus.isFile() && !HFileLink.isHFileLink(fileStatus.getPath())) {
        if (compactionStartTime - fileStatus.getModificationTime() > mobCompactionDelay) {
          // only record the potentially unused files older than one day.
          fileNames.add(fileStatus.getPath().getName());
        }
      }
    }
    FSDataOutputStream fout = null;
    SequenceFile.Writer writer = null;
    try {
      // create a file that includes all the existing mob files whose creation time is
      // older than (now - oneDay)
      fout = fs.create(allFileNamesPath, true);
      // write the names to a sequence file
      writer = SequenceFile.createWriter(job.getConfiguration(), fout, String.class, String.class,
          CompressionType.NONE, null);
      for (String fileName : fileNames) {
        writer.append(fileName, MobConstants.EMPTY_STRING);
      }
      writer.hflush();
    } finally {
      if (writer != null) {
        IOUtils.closeStream(writer);
      }
      if (fout != null) {
        IOUtils.closeStream(fout);
      }
    }
  }

  /**
   * Gets the unused mob files.
   * Compares the file that contains all the existing mob file names against the visited
   * file names to find the unused mob files, which the caller then archives.
   * @param conf The current configuration.
   * @return The unused mob files.
   * @throws IOException
   */
  List<String> getUnusedFiles(Configuration conf) throws IOException {
    // find out the unused files
    Path allFileNamesPath = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
    SequenceFile.Reader allNamesReader = null;
    MergeSortReader visitedNamesReader = null;
    List<String> toBeArchived = new ArrayList<String>();
    try {
      allNamesReader = new SequenceFile.Reader(fs, allFileNamesPath, conf);
      visitedNamesReader = new MergeSortReader(fs, conf,
          new Path(conf.get(WORKING_VISITED_DIR_KEY)));
      String nextAll = (String) allNamesReader.next((String) null);
      String nextVisited = visitedNamesReader.next();
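      // Both inputs come out sorted (the all-names file was written from a TreeSet and
      // MergeSortReader merge-sorts the visited files), so a standard sorted
      // set-difference walk finds every name present in "all" but never visited.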
      while (nextAll != null) {
        if (nextVisited != null) {
          int compare = nextAll.compareTo(nextVisited);
          if (compare < 0) {
            toBeArchived.add(nextAll);
            nextAll = (String) allNamesReader.next((String) null);
          } else if (compare > 0) {
            nextVisited = visitedNamesReader.next();
          } else {
            nextAll = (String) allNamesReader.next((String) null);
            nextVisited = visitedNamesReader.next();
          }
        } else {
          toBeArchived.add(nextAll);
          nextAll = (String) allNamesReader.next((String) null);
        }
      }
    } finally {
      if (allNamesReader != null) {
        IOUtils.closeStream(allNamesReader);
      }
      if (visitedNamesReader != null) {
        visitedNamesReader.close();
      }
    }
    return toBeArchived;
  }

  /**
   * Archives unused mob files.
   * @param job The current job.
   * @param tn The current table name.
   * @param hcd The descriptor of the current column family.
   * @throws IOException
   */
  private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
    // find out the unused files and archive them
    List<StoreFile> storeFiles = new ArrayList<StoreFile>();
    List<String> toBeArchived = getUnusedFiles(job.getConfiguration());
    // archive them
    Path mobStorePath = MobUtils
        .getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
    for (String archiveFileName : toBeArchived) {
      Path path = new Path(mobStorePath, archiveFileName);
      storeFiles.add(new StoreFile(fs, path, job.getConfiguration(), cacheConfig, BloomType.NONE));
    }
    if (!storeFiles.isEmpty()) {
      try {
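        // Note: despite the "remove" in the name, the files are archived (see the
        // javadoc above and the error message below), not deleted in place.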
        MobUtils.removeMobFiles(job.getConfiguration(), fs, tn,
            FSUtils.getTableDir(MobUtils.getMobHome(conf), tn), hcd.getName(), storeFiles);
        LOG.info(storeFiles.size() + " unused MOB files are removed");
      } catch (Exception e) {
        LOG.error("Failed to archive the store files " + storeFiles, e);
      }
    }
  }

  /**
   * Deletes the working directory.
   * @param job The current job.
   * @param tn The current table name.
   * @param familyName The family to clean up.
   */
  private void cleanup(Job job, TableName tn, String familyName) {
    if (job != null) {
      // delete the working directory
      Path workingPath = new Path(job.getConfiguration().get(WORKING_DIR_KEY));
      try {
        fs.delete(workingPath, true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the working directory after sweeping store " + familyName
            + " in the table " + tn.getNameAsString(), e);
      }
    }
  }

  /**
   * A result with an index.
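   * The index records which underlying reader produced the value, so the merge in
   * {@link MergeSortReader} can advance the correct reader once this result is consumed.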
   */
  private static class IndexedResult implements Comparable<IndexedResult> {
    private int index;
    private String value;

    public IndexedResult(int index, String value) {
      this.index = index;
      this.value = value;
    }

    public int getIndex() {
      return this.index;
    }

    public String getValue() {
      return this.value;
    }

    @Override
    public int compareTo(IndexedResult o) {
      if (this.value == null && o.getValue() == null) {
        return 0;
      } else if (o.value == null) {
        return 1;
      } else if (this.value == null) {
        return -1;
      } else {
        return this.value.compareTo(o.value);
      }
    }

    @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null) {
        return false;
      }
      if (!(obj instanceof IndexedResult)) {
        return false;
      }
      return compareTo((IndexedResult) obj) == 0;
    }

    @Override
    public int hashCode() {
      // null-safe, for consistency with compareTo/equals which treat a null value as valid
      return value == null ? 0 : value.hashCode();
    }
  }

  /**
   * Merge sort reader.
   * It merges and sorts the readers over different sequence files into a single stream
   * whose results are read in sorted order.
   */
  private static class MergeSortReader {

    private List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
    private PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();

    public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
      if (fs.exists(path)) {
        FileStatus[] files = fs.listStatus(path);
        int index = 0;
        for (FileStatus file : files) {
          if (file.isFile()) {
            SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
            String key = (String) reader.next((String) null);
            if (key != null) {
              results.add(new IndexedResult(index, key));
              readers.add(reader);
              index++;
            }
          }
        }
      }
    }

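    /**
     * Returns the smallest head element across all readers (a standard k-way merge):
     * the priority queue holds one candidate per reader, and the reader that supplied
     * the polled result advances by one record to refill the queue.
     */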
    public String next() throws IOException {
      IndexedResult result = results.poll();
      if (result != null) {
        SequenceFile.Reader reader = readers.get(result.getIndex());
        String key = (String) reader.next((String) null);
        if (key != null) {
          results.add(new IndexedResult(result.getIndex(), key));
        }
        return result.getValue();
      }
      return null;
    }

    public void close() {
      for (SequenceFile.Reader reader : readers) {
        if (reader != null) {
          IOUtils.closeStream(reader);
        }
      }
    }
  }

  /**
   * The counters used in the sweep job.
   */
  public enum SweepCounter {

    /**
     * How many files are read.
     */
    INPUT_FILE_COUNT,

    /**
     * How many files need to be merged or cleaned.
     */
    FILE_TO_BE_MERGE_OR_CLEAN,

    /**
     * How many files are left after merging.
     */
    FILE_AFTER_MERGE_OR_CLEAN,

    /**
     * How many records are updated.
     */
    RECORDS_UPDATED,
  }

  public static class DummyMobAbortable implements Abortable {

    private boolean abort = false;

    @Override
    public void abort(String why, Throwable e) {
      abort = true;
    }

    @Override
    public boolean isAborted() {
      return abort;
    }

  }
}