View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NavigableSet;
27  import java.util.UUID;
28  import java.util.concurrent.ConcurrentHashMap;
29  import java.util.concurrent.ConcurrentMap;
30  import java.util.regex.Matcher;
31  
32  import org.apache.commons.collections.map.AbstractReferenceMap;
33  import org.apache.commons.collections.map.ReferenceMap;
34  import org.apache.commons.lang.ClassUtils;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.classification.InterfaceStability;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.Cell;
43  import org.apache.hadoop.hbase.Coprocessor;
44  import org.apache.hadoop.hbase.CoprocessorEnvironment;
45  import org.apache.hadoop.hbase.HBaseConfiguration;
46  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HTableDescriptor;
50  import org.apache.hadoop.hbase.client.Append;
51  import org.apache.hadoop.hbase.client.Delete;
52  import org.apache.hadoop.hbase.client.Durability;
53  import org.apache.hadoop.hbase.client.Get;
54  import org.apache.hadoop.hbase.client.Increment;
55  import org.apache.hadoop.hbase.client.Mutation;
56  import org.apache.hadoop.hbase.client.Put;
57  import org.apache.hadoop.hbase.client.Result;
58  import org.apache.hadoop.hbase.client.Scan;
59  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
60  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
61  import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
62  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
63  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
64  import org.apache.hadoop.hbase.coprocessor.RegionObserver;
65  import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
66  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
67  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
68  import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
69  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
70  import org.apache.hadoop.hbase.io.Reference;
71  import org.apache.hadoop.hbase.io.hfile.CacheConfig;
72  import org.apache.hadoop.hbase.regionserver.Region.Operation;
73  import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
74  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
75  import org.apache.hadoop.hbase.wal.WALKey;
76  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
77  import org.apache.hadoop.hbase.util.Bytes;
78  import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
79  import org.apache.hadoop.hbase.util.Pair;
80  
81  import com.google.common.collect.ImmutableList;
82  import com.google.common.collect.Lists;
83  import com.google.protobuf.Message;
84  import com.google.protobuf.Service;
85  
86  /**
87   * Implements the coprocessor environment and runtime support for coprocessors
88   * loaded within a {@link Region}.
89   */
90  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
91  @InterfaceStability.Evolving
92  public class RegionCoprocessorHost
93      extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
94  
95    private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
96    // The shared data map
97    private static ReferenceMap sharedDataMap =
98        new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
99  
100   /**
101    * 
102    * Encapsulation of the environment of each coprocessor
103    */
104   static class RegionEnvironment extends CoprocessorHost.Environment
105       implements RegionCoprocessorEnvironment {
106 
107     private Region region;
108     private RegionServerServices rsServices;
109     ConcurrentMap<String, Object> sharedData;
110     private final boolean useLegacyPre;
111     private final boolean useLegacyPost;
112 
113     /**
114      * Constructor
115      * @param impl the coprocessor instance
116      * @param priority chaining priority
117      */
118     public RegionEnvironment(final Coprocessor impl, final int priority,
119         final int seq, final Configuration conf, final Region region,
120         final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
121       super(impl, priority, seq, conf);
122       this.region = region;
123       this.rsServices = services;
124       this.sharedData = sharedData;
125       // Pick which version of the WAL related events we'll call.
126       // This way we avoid calling the new version on older RegionObservers so
127       // we can maintain binary compatibility.
128       // See notes in javadoc for RegionObserver
129       useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
130           HRegionInfo.class, WALKey.class, WALEdit.class);
131       useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
132           HRegionInfo.class, WALKey.class, WALEdit.class);
133     }
134 
135     /** @return the region */
136     @Override
137     public Region getRegion() {
138       return region;
139     }
140 
141     /** @return reference to the region server services */
142     @Override
143     public RegionServerServices getRegionServerServices() {
144       return rsServices;
145     }
146 
147     public void shutdown() {
148       super.shutdown();
149     }
150 
151     @Override
152     public ConcurrentMap<String, Object> getSharedData() {
153       return sharedData;
154     }
155 
156     @Override
157     public HRegionInfo getRegionInfo() {
158       return region.getRegionInfo();
159     }
160 
161   }
162 
163   static class TableCoprocessorAttribute {
164     private Path path;
165     private String className;
166     private int priority;
167     private Configuration conf;
168 
169     public TableCoprocessorAttribute(Path path, String className, int priority,
170         Configuration conf) {
171       this.path = path;
172       this.className = className;
173       this.priority = priority;
174       this.conf = conf;
175     }
176 
177     public Path getPath() {
178       return path;
179     }
180 
181     public String getClassName() {
182       return className;
183     }
184 
185     public int getPriority() {
186       return priority;
187     }
188 
189     public Configuration getConf() {
190       return conf;
191     }
192   }
193 
194   /** The region server services */
195   RegionServerServices rsServices;
196   /** The region */
197   Region region;
198 
199   /**
200    * Constructor
201    * @param region the region
202    * @param rsServices interface to available region server functionality
203    * @param conf the configuration
204    */
205   public RegionCoprocessorHost(final Region region,
206       final RegionServerServices rsServices, final Configuration conf) {
207     super(rsServices);
208     this.conf = conf;
209     this.rsServices = rsServices;
210     this.region = region;
211     this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
212 
213     // load system default cp's from configuration.
214     loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
215 
216     // load system default cp's for user tables from configuration.
217     if (!region.getRegionInfo().getTable().isSystemTable()) {
218       loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
219     }
220 
221     // load Coprocessor From HDFS
222     loadTableCoprocessors(conf);
223   }
224 
225   static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
226       HTableDescriptor htd) {
227     List<TableCoprocessorAttribute> result = Lists.newArrayList();
228     for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e: htd.getValues().entrySet()) {
229       String key = Bytes.toString(e.getKey().get()).trim();
230       if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
231         String spec = Bytes.toString(e.getValue().get()).trim();
232         // found one
233         try {
234           Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
235           if (matcher.matches()) {
236             // jar file path can be empty if the cp class can be loaded
237             // from class loader.
238             Path path = matcher.group(1).trim().isEmpty() ?
239                 null : new Path(matcher.group(1).trim());
240             String className = matcher.group(2).trim();
241             if (className.isEmpty()) {
242               LOG.error("Malformed table coprocessor specification: key=" +
243                 key + ", spec: " + spec);
244               continue;
245             }
246             int priority = matcher.group(3).trim().isEmpty() ?
247                 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
248             String cfgSpec = null;
249             try {
250               cfgSpec = matcher.group(4);
251             } catch (IndexOutOfBoundsException ex) {
252               // ignore
253             }
254             Configuration ourConf;
255             if (cfgSpec != null) {
256               cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
257               // do an explicit deep copy of the passed configuration
258               ourConf = new Configuration(false);
259               HBaseConfiguration.merge(ourConf, conf);
260               Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
261               while (m.find()) {
262                 ourConf.set(m.group(1), m.group(2));
263               }
264             } else {
265               ourConf = conf;
266             }
267             result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
268           } else {
269             LOG.error("Malformed table coprocessor specification: key=" + key +
270               ", spec: " + spec);
271           }
272         } catch (Exception ioe) {
273           LOG.error("Malformed table coprocessor specification: key=" + key +
274             ", spec: " + spec);
275         }
276       }
277     }
278     return result;
279   }
280 
281   /**
282    * Sanity check the table coprocessor attributes of the supplied schema. Will
283    * throw an exception if there is a problem.
284    * @param conf
285    * @param htd
286    * @throws IOException
287    */
288   public static void testTableCoprocessorAttrs(final Configuration conf,
289       final HTableDescriptor htd) throws IOException {
290     String pathPrefix = UUID.randomUUID().toString();
291     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
292       if (attr.getPriority() < 0) {
293         throw new IOException("Priority for coprocessor " + attr.getClassName() +
294           " cannot be less than 0");
295       }
296       ClassLoader old = Thread.currentThread().getContextClassLoader();
297       try {
298         ClassLoader cl;
299         if (attr.getPath() != null) {
300           cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
301             CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
302         } else {
303           cl = CoprocessorHost.class.getClassLoader();
304         }
305         Thread.currentThread().setContextClassLoader(cl);
306         cl.loadClass(attr.getClassName());
307       } catch (ClassNotFoundException e) {
308         throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
309       } finally {
310         Thread.currentThread().setContextClassLoader(old);
311       }
312     }
313   }
314 
315   void loadTableCoprocessors(final Configuration conf) {
316     boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
317       DEFAULT_COPROCESSORS_ENABLED);
318     boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
319       DEFAULT_USER_COPROCESSORS_ENABLED);
320     if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
321       return;
322     }
323 
324     // scan the table attributes for coprocessor load specifications
325     // initialize the coprocessors
326     List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
327     for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, 
328         region.getTableDesc())) {
329       // Load encompasses classloading and coprocessor initialization
330       try {
331         RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
332           attr.getConf());
333         configured.add(env);
334         LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
335             region.getTableDesc().getTableName().getNameAsString() + " successfully.");
336       } catch (Throwable t) {
337         // Coprocessor failed to load, do we abort on error?
338         if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
339           abortServer(attr.getClassName(), t);
340         } else {
341           LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
342         }
343       }
344     }
345     // add together to coprocessor set for COW efficiency
346     coprocessors.addAll(configured);
347   }
348 
349   @Override
350   public RegionEnvironment createEnvironment(Class<?> implClass,
351       Coprocessor instance, int priority, int seq, Configuration conf) {
352     // Check if it's an Endpoint.
353     // Due to current dynamic protocol design, Endpoint
354     // uses a different way to be registered and executed.
355     // It uses a visitor pattern to invoke registered Endpoint
356     // method.
357     for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
358       Class<?> c = (Class<?>) itf;
359       if (CoprocessorService.class.isAssignableFrom(c)) {
360         region.registerService( ((CoprocessorService)instance).getService() );
361       }
362     }
363     ConcurrentMap<String, Object> classData;
364     // make sure only one thread can add maps
365     synchronized (sharedDataMap) {
366       // as long as at least one RegionEnvironment holds on to its classData it will
367       // remain in this map
368       classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
369       if (classData == null) {
370         classData = new ConcurrentHashMap<String, Object>();
371         sharedDataMap.put(implClass.getName(), classData);
372       }
373     }
374     return new RegionEnvironment(instance, priority, seq, conf, region,
375         rsServices, classData);
376   }
377 
378   /**
379    * HBASE-4014 : This is used by coprocessor hooks which are not declared to throw exceptions.
380    *
381    * For example, {@link
382    * org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#preOpen()} and
383    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
384    *
385    * See also
386    * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
387    *    CoprocessorEnvironment, Throwable)}
388    * @param env The coprocessor that threw the exception.
389    * @param e The exception that was thrown.
390    */
391   private void handleCoprocessorThrowableNoRethrow(
392       final CoprocessorEnvironment env, final Throwable e) {
393     try {
394       handleCoprocessorThrowable(env,e);
395     } catch (IOException ioe) {
396       // We cannot throw exceptions from the caller hook, so ignore.
397       LOG.warn(
398         "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
399         e + ". Ignoring.",e);
400     }
401   }
402 
403   /**
404    * Invoked before a region open.
405    *
406    * @throws IOException Signals that an I/O exception has occurred.
407    */
408   public void preOpen() throws IOException {
409     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
410       @Override
411       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
412           throws IOException {
413         oserver.preOpen(ctx);
414       }
415     });
416   }
417 
418   /**
419    * Invoked after a region open
420    */
421   public void postOpen() {
422     try {
423       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
424         @Override
425         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
426             throws IOException {
427           oserver.postOpen(ctx);
428         }
429       });
430     } catch (IOException e) {
431       LOG.warn(e);
432     }
433   }
434 
435   /**
436    * Invoked after log replay on region
437    */
438   public void postLogReplay() {
439     try {
440       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
441         @Override
442         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
443             throws IOException {
444           oserver.postLogReplay(ctx);
445         }
446       });
447     } catch (IOException e) {
448       LOG.warn(e);
449     }
450   }
451 
452   /**
453    * Invoked before a region is closed
454    * @param abortRequested true if the server is aborting
455    */
456   public void preClose(final boolean abortRequested) throws IOException {
457     execOperation(false, new RegionOperation() {
458       @Override
459       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
460           throws IOException {
461         oserver.preClose(ctx, abortRequested);
462       }
463     });
464   }
465 
466   /**
467    * Invoked after a region is closed
468    * @param abortRequested true if the server is aborting
469    */
470   public void postClose(final boolean abortRequested) {
471     try {
472       execOperation(false, new RegionOperation() {
473         @Override
474         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
475             throws IOException {
476           oserver.postClose(ctx, abortRequested);
477         }
478         public void postEnvCall(RegionEnvironment env) {
479           shutdown(env);
480         }
481       });
482     } catch (IOException e) {
483       LOG.warn(e);
484     }
485   }
486 
487   /**
488    * See
489    * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
490    */
491   public InternalScanner preCompactScannerOpen(final Store store,
492       final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
493       final CompactionRequest request) throws IOException {
494     return execOperationWithResult(null,
495         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
496       @Override
497       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
498           throws IOException {
499         setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
500           earliestPutTs, getResult(), request));
501       }
502     });
503   }
504 
505   /**
506    * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
507    * available candidates.
508    * @param store The store where compaction is being requested
509    * @param candidates The currently available store files
510    * @param request custom compaction request
511    * @return If {@code true}, skip the normal selection process and use the current list
512    * @throws IOException
513    */
514   public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
515       final CompactionRequest request) throws IOException {
516     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
517       @Override
518       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
519           throws IOException {
520         oserver.preCompactSelection(ctx, store, candidates, request);
521       }
522     });
523   }
524 
525   /**
526    * Called after the {@link StoreFile}s to be compacted have been selected from the available
527    * candidates.
528    * @param store The store where compaction is being requested
529    * @param selected The store files selected to compact
530    * @param request custom compaction
531    */
532   public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
533       final CompactionRequest request) {
534     try {
535       execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
536         @Override
537         public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
538             throws IOException {
539           oserver.postCompactSelection(ctx, store, selected, request);
540         }
541       });
542     } catch (IOException e) {
543       LOG.warn(e);
544     }
545   }
546 
547   /**
548    * Called prior to rewriting the store files selected for compaction
549    * @param store the store being compacted
550    * @param scanner the scanner used to read store data during compaction
551    * @param scanType type of Scan
552    * @param request the compaction that will be executed
553    * @throws IOException
554    */
555   public InternalScanner preCompact(final Store store, final InternalScanner scanner,
556       final ScanType scanType, final CompactionRequest request) throws IOException {
557     return execOperationWithResult(false, scanner,
558         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
559       @Override
560       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
561           throws IOException {
562         setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
563       }
564     });
565   }
566 
567   /**
568    * Called after the store compaction has completed.
569    * @param store the store being compacted
570    * @param resultFile the new store file written during compaction
571    * @param request the compaction that is being executed
572    * @throws IOException
573    */
574   public void postCompact(final Store store, final StoreFile resultFile,
575       final CompactionRequest request) throws IOException {
576     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
577       @Override
578       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
579           throws IOException {
580         oserver.postCompact(ctx, store, resultFile, request);
581       }
582     });
583   }
584 
585   /**
586    * Invoked before a memstore flush
587    * @throws IOException
588    */
589   public InternalScanner preFlush(final Store store, final InternalScanner scanner)
590       throws IOException {
591     return execOperationWithResult(false, scanner,
592         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
593       @Override
594       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
595           throws IOException {
596         setResult(oserver.preFlush(ctx, store, getResult()));
597       }
598     });
599   }
600 
601   /**
602    * Invoked before a memstore flush
603    * @throws IOException
604    */
605   public void preFlush() throws IOException {
606     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
607       @Override
608       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
609           throws IOException {
610         oserver.preFlush(ctx);
611       }
612     });
613   }
614 
615   /**
616    * See
617    * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
618    *    Store, KeyValueScanner, InternalScanner)}
619    */
620   public InternalScanner preFlushScannerOpen(final Store store,
621       final KeyValueScanner memstoreScanner) throws IOException {
622     return execOperationWithResult(null,
623         coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
624       @Override
625       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
626           throws IOException {
627         setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
628       }
629     });
630   }
631 
632   /**
633    * Invoked after a memstore flush
634    * @throws IOException
635    */
636   public void postFlush() throws IOException {
637     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
638       @Override
639       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
640           throws IOException {
641         oserver.postFlush(ctx);
642       }
643     });
644   }
645 
646   /**
647    * Invoked after a memstore flush
648    * @throws IOException
649    */
650   public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
651     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
652       @Override
653       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
654           throws IOException {
655         oserver.postFlush(ctx, store, storeFile);
656       }
657     });
658   }
659 
660   /**
661    * Invoked just before a split
662    * @throws IOException
663    */
664   // TODO: Deprecate this
665   public void preSplit() throws IOException {
666     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
667       @Override
668       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
669           throws IOException {
670         oserver.preSplit(ctx);
671       }
672     });
673   }
674 
675   /**
676    * Invoked just before a split
677    * @throws IOException
678    */
679   public void preSplit(final byte[] splitRow) throws IOException {
680     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
681       @Override
682       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
683           throws IOException {
684         oserver.preSplit(ctx, splitRow);
685       }
686     });
687   }
688 
689   /**
690    * Invoked just after a split
691    * @param l the new left-hand daughter region
692    * @param r the new right-hand daughter region
693    * @throws IOException
694    */
695   public void postSplit(final Region l, final Region r) throws IOException {
696     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
697       @Override
698       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
699           throws IOException {
700         oserver.postSplit(ctx, l, r);
701       }
702     });
703   }
704 
705   public boolean preSplitBeforePONR(final byte[] splitKey,
706       final List<Mutation> metaEntries) throws IOException {
707     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
708       @Override
709       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
710           throws IOException {
711         oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
712       }
713     });
714   }
715 
716   public void preSplitAfterPONR() throws IOException {
717     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
718       @Override
719       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
720           throws IOException {
721         oserver.preSplitAfterPONR(ctx);
722       }
723     });
724   }
725 
726   /**
727    * Invoked just before the rollback of a failed split is started
728    * @throws IOException
729    */
730   public void preRollBackSplit() throws IOException {
731     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
732       @Override
733       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
734           throws IOException {
735         oserver.preRollBackSplit(ctx);
736       }
737     });
738   }
739 
740   /**
741    * Invoked just after the rollback of a failed split is done
742    * @throws IOException
743    */
744   public void postRollBackSplit() throws IOException {
745     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
746       @Override
747       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
748           throws IOException {
749         oserver.postRollBackSplit(ctx);
750       }
751     });
752   }
753 
754   /**
755    * Invoked after a split is completed irrespective of a failure or success.
756    * @throws IOException
757    */
758   public void postCompleteSplit() throws IOException {
759     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
760       @Override
761       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
762           throws IOException {
763         oserver.postCompleteSplit(ctx);
764       }
765     });
766   }
767 
768   // RegionObserver support
769 
770   /**
771    * @param row the row key
772    * @param family the family
773    * @param result the result set from the region
774    * @return true if default processing should be bypassed
775    * @exception IOException Exception
776    */
777   public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
778       final Result result) throws IOException {
779     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
780       @Override
781       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
782           throws IOException {
783         oserver.preGetClosestRowBefore(ctx, row, family, result);
784       }
785     });
786   }
787 
788   /**
       * Wraps {@code RegionObserver.postGetClosestRowBefore} in a callback for
       * {@code execOperation}; {@code null} is passed in place of the callback when
       * {@code coprocessors} is empty.
789    * @param row the row key
790    * @param family the family
791    * @param result the result set from the region
792    * @exception IOException Exception
793    */
794   public void postGetClosestRowBefore(final byte[] row, final byte[] family,
795       final Result result) throws IOException {
796     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
797       @Override
798       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
799           throws IOException {
800         oserver.postGetClosestRowBefore(ctx, row, family, result);
801       }
802     });
803   }
804 
805   /**
       * Wraps {@code RegionObserver.preGetOp} in a callback for {@code execOperation} and
       * returns whatever {@code execOperation} returns. When {@code coprocessors} is empty,
       * {@code null} is passed in place of the callback.
806    * @param get the Get request
       * @param results the cell list handed to {@code preGetOp} together with {@code get}
807    * @return true if default processing should be bypassed
808    * @exception IOException Exception
809    */
810   public boolean preGet(final Get get, final List<Cell> results)
811       throws IOException {
812     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
813       @Override
814       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
815           throws IOException {
816         oserver.preGetOp(ctx, get, results);
817       }
818     });
819   }
820 
821   /**
       * Wraps {@code RegionObserver.postGetOp} in a callback for {@code execOperation};
       * {@code null} is passed in place of the callback when {@code coprocessors} is empty.
822    * @param get the Get request
823    * @param results the result set
824    * @exception IOException Exception
825    */
826   public void postGet(final Get get, final List<Cell> results)
827       throws IOException {
828     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
829       @Override
830       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
831           throws IOException {
832         oserver.postGetOp(ctx, get, results);
833       }
834     });
835   }
836 
837   /**
       * Wraps {@code RegionObserver.preExists} in a callback for
       * {@code execOperationWithResult}. The callback feeds {@code getResult()} to the hook and
       * stores the hook's return value with {@code setResult}. When {@code coprocessors} is
       * empty, {@code null} is passed in place of the callback.
       * NOTE(review): the leading {@code true, false} arguments are presumably the bypass flag
       * and the default result of {@code execOperationWithResult} -- confirm against its
       * declaration.
838    * @param get the Get request
839    * @return true or false to return to client if bypassing normal operation,
840    * or null otherwise
841    * @exception IOException Exception
842    */
843   public Boolean preExists(final Get get) throws IOException {
844     return execOperationWithResult(true, false,
845         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
846       @Override
847       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
848           throws IOException {
849         setResult(oserver.preExists(ctx, get, getResult()));
850       }
851     });
852   }
853 
854   /**
       * Wraps {@code RegionObserver.postExists} in a callback for
       * {@code execOperationWithResult}, which also receives {@code exists} as its first
       * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
       * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
       * is passed in place of the callback.
855    * @param get the Get request
856    * @param exists the result returned by the region server
857    * @return the result to return to the client
858    * @exception IOException Exception
859    */
860   public boolean postExists(final Get get, boolean exists)
861       throws IOException {
862     return execOperationWithResult(exists,
863         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
864       @Override
865       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
866           throws IOException {
867         setResult(oserver.postExists(ctx, get, getResult()));
868       }
869     });
870   }
871 
872   /**
       * Wraps {@code RegionObserver.prePut} in a callback for {@code execOperation} and returns
       * whatever {@code execOperation} returns. When {@code coprocessors} is empty,
       * {@code null} is passed in place of the callback.
873    * @param put The Put object
874    * @param edit The WALEdit object.
875    * @param durability The durability used
876    * @return true if default processing should be bypassed
877    * @exception IOException Exception
878    */
879   public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
880       throws IOException {
881     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
882       @Override
883       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
884           throws IOException {
885         oserver.prePut(ctx, put, edit, durability);
886       }
887     });
888   }
889 
890   /**
       * Wraps {@code RegionObserver.prePrepareTimeStampForDeleteVersion} in a callback for
       * {@code execOperation} and returns whatever {@code execOperation} returns. When
       * {@code coprocessors} is empty, {@code null} is passed in place of the callback.
891    * @param mutation - the current mutation
892    * @param kv - the current cell
893    * @param byteNow - current timestamp in bytes
894    * @param get - the get that could be used
895    * Note that the get does not specify the family and qualifier that should be used
896    * @return true if default processing should be bypassed
897    * @exception IOException
898    *              Exception
899    */
900   public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
901       final Cell kv, final byte[] byteNow, final Get get) throws IOException {
902     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
903       @Override
904       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
905           throws IOException {
906         oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
907       }
908     });
909   }
910 
911   /**
       * Wraps {@code RegionObserver.postPut} in a callback for {@code execOperation};
       * {@code null} is passed in place of the callback when {@code coprocessors} is empty.
912    * @param put The Put object
913    * @param edit The WALEdit object.
914    * @param durability The durability used
915    * @exception IOException Exception
916    */
917   public void postPut(final Put put, final WALEdit edit, final Durability durability)
918       throws IOException {
919     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
920       @Override
921       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
922           throws IOException {
923         oserver.postPut(ctx, put, edit, durability);
924       }
925     });
926   }
927 
928   /**
       * Wraps {@code RegionObserver.preDelete} in a callback for {@code execOperation} and
       * returns whatever {@code execOperation} returns. When {@code coprocessors} is empty,
       * {@code null} is passed in place of the callback.
929    * @param delete The Delete object
930    * @param edit The WALEdit object.
931    * @param durability The durability used
932    * @return true if default processing should be bypassed
933    * @exception IOException Exception
934    */
935   public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
936       throws IOException {
937     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
938       @Override
939       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
940           throws IOException {
941         oserver.preDelete(ctx, delete, edit, durability);
942       }
943     });
944   }
945 
946   /**
       * Wraps {@code RegionObserver.postDelete} in a callback for {@code execOperation};
       * {@code null} is passed in place of the callback when {@code coprocessors} is empty.
947    * @param delete The Delete object
948    * @param edit The WALEdit object.
949    * @param durability The durability used
950    * @exception IOException Exception
951    */
952   public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
953       throws IOException {
954     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
955       @Override
956       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
957           throws IOException {
958         oserver.postDelete(ctx, delete, edit, durability);
959       }
960     });
961   }
962 
963   /**
       * Wraps {@code RegionObserver.preBatchMutate} in a callback for {@code execOperation} and
       * returns whatever {@code execOperation} returns. When {@code coprocessors} is empty,
       * {@code null} is passed in place of the callback.
964    * @param miniBatchOp the in-progress mini batch handed unchanged to the observer hook
965    * @return true if default processing should be bypassed
966    * @throws IOException if {@code execOperation} throws one
967    */
968   public boolean preBatchMutate(
969       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
970     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
971       @Override
972       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
973           throws IOException {
974         oserver.preBatchMutate(ctx, miniBatchOp);
975       }
976     });
977   }
978 
979   /**
       * Wraps {@code RegionObserver.postBatchMutate} in a callback for {@code execOperation};
       * {@code null} is passed in place of the callback when {@code coprocessors} is empty.
980    * @param miniBatchOp the in-progress mini batch handed unchanged to the observer hook
981    * @throws IOException if {@code execOperation} throws one
982    */
983   public void postBatchMutate(
984       final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
985     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
986       @Override
987       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
988           throws IOException {
989         oserver.postBatchMutate(ctx, miniBatchOp);
990       }
991     });
992   }
993 
994   public void postBatchMutateIndispensably(
995       final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
996       throws IOException {
        // Wraps RegionObserver.postBatchMutateIndispensably in a callback for execOperation,
        // forwarding miniBatchOp and success unchanged. The callback is built only when
        // coprocessors is non-empty; otherwise execOperation gets null.
997     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
998       @Override
999       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1000           throws IOException {
1001         oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1002       }
1003     });
1004   }
1005 
1006   /**
        * Wraps {@code RegionObserver.preCheckAndPut} in a callback for
        * {@code execOperationWithResult}. The callback feeds {@code getResult()} to the hook
        * and stores the hook's return value with {@code setResult}. When {@code coprocessors}
        * is empty, {@code null} is passed in place of the callback.
        * NOTE(review): the leading {@code true, false} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1007    * @param row row to check
1008    * @param family column family
1009    * @param qualifier column qualifier
1010    * @param compareOp the comparison operation
1011    * @param comparator the comparator
1012    * @param put data to put if check succeeds
1013    * @return true or false to return to client if default processing should
1014    * be bypassed, or null otherwise
1015    * @throws IOException e
1016    */
1017   public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1018       final byte [] qualifier, final CompareOp compareOp,
1019       final ByteArrayComparable comparator, final Put put)
1020       throws IOException {
1021     return execOperationWithResult(true, false,
1022         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1023       @Override
1024       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1025           throws IOException {
1026         setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1027           compareOp, comparator, put, getResult()));
1028       }
1029     });
1030   }
1031 
1032   /**
        * Wraps {@code RegionObserver.preCheckAndPutAfterRowLock} in a callback for
        * {@code execOperationWithResult}. The callback feeds {@code getResult()} to the hook
        * and stores the hook's return value with {@code setResult}. When {@code coprocessors}
        * is empty, {@code null} is passed in place of the callback.
        * NOTE(review): the leading {@code true, false} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1033    * @param row row to check
1034    * @param family column family
1035    * @param qualifier column qualifier
1036    * @param compareOp the comparison operation
1037    * @param comparator the comparator
1038    * @param put data to put if check succeeds
1039    * @return true or false to return to client if default processing should
1040    * be bypassed, or null otherwise
1041    * @throws IOException e
1042    */
1043   public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1044       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1045       final Put put) throws IOException {
1046     return execOperationWithResult(true, false,
1047         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1048       @Override
1049       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1050           throws IOException {
1051         setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1052           compareOp, comparator, put, getResult()));
1053       }
1054     });
1055   }
1056 
1057   /**
        * Wraps {@code RegionObserver.postCheckAndPut} in a callback for
        * {@code execOperationWithResult}, which also receives {@code result} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1058    * @param row row to check
1059    * @param family column family
1060    * @param qualifier column qualifier
1061    * @param compareOp the comparison operation
1062    * @param comparator the comparator
1063    * @param put data to put if check succeeds
        * @param result the value passed as the first argument to
        * {@code execOperationWithResult}
        * @return the value returned by {@code execOperationWithResult}
1064    * @throws IOException e
1065    */
1066   public boolean postCheckAndPut(final byte [] row, final byte [] family,
1067       final byte [] qualifier, final CompareOp compareOp,
1068       final ByteArrayComparable comparator, final Put put,
1069       boolean result) throws IOException {
1070     return execOperationWithResult(result,
1071         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1072       @Override
1073       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1074           throws IOException {
1075         setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1076           compareOp, comparator, put, getResult()));
1077       }
1078     });
1079   }
1080 
1081   /**
        * Wraps {@code RegionObserver.preCheckAndDelete} in a callback for
        * {@code execOperationWithResult}. The callback feeds {@code getResult()} to the hook
        * and stores the hook's return value with {@code setResult}. When {@code coprocessors}
        * is empty, {@code null} is passed in place of the callback.
        * NOTE(review): the leading {@code true, false} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1082    * @param row row to check
1083    * @param family column family
1084    * @param qualifier column qualifier
1085    * @param compareOp the comparison operation
1086    * @param comparator the comparator
1087    * @param delete delete to commit if check succeeds
1088    * @return true or false to return to client if default processing should
1089    * be bypassed, or null otherwise
1090    * @throws IOException e
1091    */
1092   public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1093       final byte [] qualifier, final CompareOp compareOp,
1094       final ByteArrayComparable comparator, final Delete delete)
1095       throws IOException {
1096     return execOperationWithResult(true, false,
1097         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1098       @Override
1099       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1100           throws IOException {
1101         setResult(oserver.preCheckAndDelete(ctx, row, family,
1102             qualifier, compareOp, comparator, delete, getResult()));
1103       }
1104     });
1105   }
1106 
1107   /**
        * Wraps {@code RegionObserver.preCheckAndDeleteAfterRowLock} in a callback for
        * {@code execOperationWithResult}. The callback feeds {@code getResult()} to the hook
        * and stores the hook's return value with {@code setResult}. When {@code coprocessors}
        * is empty, {@code null} is passed in place of the callback.
        * NOTE(review): the leading {@code true, false} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1108    * @param row row to check
1109    * @param family column family
1110    * @param qualifier column qualifier
1111    * @param compareOp the comparison operation
1112    * @param comparator the comparator
1113    * @param delete delete to commit if check succeeds
1114    * @return true or false to return to client if default processing should
1115    * be bypassed, or null otherwise
1116    * @throws IOException e
1117    */
1118   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1119       final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1120       final Delete delete) throws IOException {
1121     return execOperationWithResult(true, false,
1122         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1123       @Override
1124       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1125           throws IOException {
1126         setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1127               family, qualifier, compareOp, comparator, delete, getResult()));
1128       }
1129     });
1130   }
1131 
1132   /**
        * Wraps {@code RegionObserver.postCheckAndDelete} in a callback for
        * {@code execOperationWithResult}, which also receives {@code result} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1133    * @param row row to check
1134    * @param family column family
1135    * @param qualifier column qualifier
1136    * @param compareOp the comparison operation
1137    * @param comparator the comparator
1138    * @param delete delete to commit if check succeeds
        * @param result the value passed as the first argument to
        * {@code execOperationWithResult}
        * @return the value returned by {@code execOperationWithResult}
1139    * @throws IOException e
1140    */
1141   public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1142       final byte [] qualifier, final CompareOp compareOp,
1143       final ByteArrayComparable comparator, final Delete delete,
1144       boolean result) throws IOException {
1145     return execOperationWithResult(result,
1146         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1147       @Override
1148       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1149           throws IOException {
1150         setResult(oserver.postCheckAndDelete(ctx, row, family,
1151             qualifier, compareOp, comparator, delete, getResult()));
1152       }
1153     });
1154   }
1155 
1156   /**
        * Wraps {@code RegionObserver.preAppend} in a callback for
        * {@code execOperationWithResult}; the callback stores the hook's return value with
        * {@code setResult}. When {@code coprocessors} is empty, {@code null} is passed in place
        * of the callback.
        * NOTE(review): the leading {@code true, null} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1157    * @param append append object
1158    * @return result to return to client if default operation should be
1159    * bypassed, null otherwise
1160    * @throws IOException if an error occurred on the coprocessor
1161    */
1162   public Result preAppend(final Append append) throws IOException {
1163     return execOperationWithResult(true, null,
1164         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1165       @Override
1166       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1167           throws IOException {
1168         setResult(oserver.preAppend(ctx, append));
1169       }
1170     });
1171   }
1172 
1173   /**
        * Wraps {@code RegionObserver.preAppendAfterRowLock} in a callback for
        * {@code execOperationWithResult}; the callback stores the hook's return value with
        * {@code setResult}. When {@code coprocessors} is empty, {@code null} is passed in place
        * of the callback.
        * NOTE(review): the leading {@code true, null} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1174    * @param append append object
1175    * @return result to return to client if default operation should be
1176    * bypassed, null otherwise
1177    * @throws IOException if an error occurred on the coprocessor
1178    */
1179   public Result preAppendAfterRowLock(final Append append) throws IOException {
1180     return execOperationWithResult(true, null,
1181         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1182       @Override
1183       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1184           throws IOException {
1185         setResult(oserver.preAppendAfterRowLock(ctx, append));
1186       }
1187     });
1188   }
1189 
1190   /**
        * Wraps {@code RegionObserver.preIncrement} in a callback for
        * {@code execOperationWithResult}; the callback stores the hook's return value with
        * {@code setResult}. When {@code coprocessors} is empty, {@code null} is passed in place
        * of the callback.
        * NOTE(review): the leading {@code true, null} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1191    * @param increment increment object
1192    * @return result to return to client if default operation should be
1193    * bypassed, null otherwise
1194    * @throws IOException if an error occurred on the coprocessor
1195    */
1196   public Result preIncrement(final Increment increment) throws IOException {
1197     return execOperationWithResult(true, null,
1198         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1199       @Override
1200       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1201           throws IOException {
1202         setResult(oserver.preIncrement(ctx, increment));
1203       }
1204     });
1205   }
1206 
1207   /**
        * Wraps {@code RegionObserver.preIncrementAfterRowLock} in a callback for
        * {@code execOperationWithResult}; the callback stores the hook's return value with
        * {@code setResult}. When {@code coprocessors} is empty, {@code null} is passed in place
        * of the callback.
        * NOTE(review): the leading {@code true, null} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1208    * @param increment increment object
1209    * @return result to return to client if default operation should be
1210    * bypassed, null otherwise
1211    * @throws IOException if an error occurred on the coprocessor
1212    */
1213   public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1214     return execOperationWithResult(true, null,
1215         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1216       @Override
1217       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1218           throws IOException {
1219         setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1220       }
1221     });
1222   }
1223 
1224   /**
        * Wraps {@code RegionObserver.postAppend} in a callback for {@code execOperation};
        * {@code null} is passed in place of the callback when {@code coprocessors} is empty.
        * Whatever the hook returns is discarded: the callback does not capture it and this
        * method is {@code void}.
1225    * @param append Append object
1226    * @param result the result returned by the append
1227    * @throws IOException if an error occurred on the coprocessor
1228    */
1229   public void postAppend(final Append append, final Result result) throws IOException {
1230     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1231       @Override
1232       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1233           throws IOException {
1234         oserver.postAppend(ctx, append, result);
1235       }
1236     });
1237   }
1238 
1239   /**
        * Wraps {@code RegionObserver.postIncrement} in a callback for
        * {@code execOperationWithResult}, which also receives {@code result} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1240    * @param increment increment object
1241    * @param result the result handed to the postIncrement hooks (presumably the outcome of
        * the increment itself -- confirm against the caller)
        * @return the value returned by {@code execOperationWithResult}
1242    * @throws IOException if an error occurred on the coprocessor
1243    */
1244   public Result postIncrement(final Increment increment, Result result) throws IOException {
1245     return execOperationWithResult(result,
1246         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1247       @Override
1248       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1249           throws IOException {
1250         setResult(oserver.postIncrement(ctx, increment, getResult()));
1251       }
1252     });
1253   }
1254 
1255   /**
        * Wraps {@code RegionObserver.preScannerOpen} in a callback for
        * {@code execOperationWithResult}. The callback feeds {@code getResult()} to the hook
        * and stores the hook's return value with {@code setResult}. When {@code coprocessors}
        * is empty, {@code null} is passed in place of the callback.
        * NOTE(review): the leading {@code true, null} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1256    * @param scan the Scan specification
1257    * @return scanner to return to client if default operation should be
1258    * bypassed, null otherwise
1259    * @exception IOException Exception
1260    */
1261   public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1262     return execOperationWithResult(true, null,
1263         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1264       @Override
1265       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1266           throws IOException {
1267         setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1268       }
1269     });
1270   }
1271 
1272   /**
        * Wraps the observer's {@code preStoreScannerOpen} hook in a callback for
        * {@code execOperationWithResult}, which also receives {@code null} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1273    * See
1274    * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
1275    *    Store, Scan, NavigableSet, KeyValueScanner)}
1276    */
1277   public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1278       final NavigableSet<byte[]> targetCols) throws IOException {
1279     return execOperationWithResult(null,
1280         coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1281       @Override
1282       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1283           throws IOException {
1284         setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1285       }
1286     });
1287   }
1288 
1289   /**
        * Wraps {@code RegionObserver.postScannerOpen} in a callback for
        * {@code execOperationWithResult}, which also receives {@code s} as its first argument.
        * The callback feeds {@code getResult()} to the hook and stores the hook's return value
        * with {@code setResult}. When {@code coprocessors} is empty, {@code null} is passed in
        * place of the callback.
1290    * @param scan the Scan specification
1291    * @param s the scanner
1292    * @return the scanner instance to use
1293    * @exception IOException Exception
1294    */
1295   public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1296     return execOperationWithResult(s,
1297         coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1298       @Override
1299       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1300           throws IOException {
1301         setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1302       }
1303     });
1304   }
1305 
1306   /**
        * Wraps {@code RegionObserver.preScannerNext} in a callback for
        * {@code execOperationWithResult}. The callback feeds {@code getResult()} to the hook
        * and stores the hook's return value with {@code setResult}. When {@code coprocessors}
        * is empty, {@code null} is passed in place of the callback.
        * NOTE(review): the leading {@code true, false} arguments are presumably the bypass flag
        * and the default result of {@code execOperationWithResult} -- confirm against its
        * declaration.
1307    * @param s the scanner
1308    * @param results the result set returned by the region server
1309    * @param limit the maximum number of results to return
1310    * @return 'has next' indication to client if bypassing default behavior, or
1311    * null otherwise
1312    * @exception IOException Exception
1313    */
1314   public Boolean preScannerNext(final InternalScanner s,
1315       final List<Result> results, final int limit) throws IOException {
1316     return execOperationWithResult(true, false,
1317         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1318       @Override
1319       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1320           throws IOException {
1321         setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1322       }
1323     });
1324   }
1325 
1326   /**
        * Wraps {@code RegionObserver.postScannerNext} in a callback for
        * {@code execOperationWithResult}, which also receives {@code hasMore} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1327    * @param s the scanner
1328    * @param results the result set returned by the region server
1329    * @param limit the maximum number of results to return
1330    * @param hasMore the value passed as the first argument to {@code execOperationWithResult}
1331    * @return 'has more' indication to give to client
1332    * @exception IOException Exception
1333    */
1334   public boolean postScannerNext(final InternalScanner s,
1335       final List<Result> results, final int limit, boolean hasMore)
1336       throws IOException {
1337     return execOperationWithResult(hasMore,
1338         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1339       @Override
1340       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1341           throws IOException {
1342         setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1343       }
1344     });
1345   }
1346 
1347   /**
1348    * This will be called by the scan flow when the current scanned row is being filtered out by the
1349    * filter.
        * Wraps {@code RegionObserver.postScannerFilterRow} in a callback for
        * {@code execOperationWithResult}, which also receives {@code true} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1350    * @param s the scanner
1351    * @param currentRow The current rowkey which got filtered out
1352    * @param offset offset to rowkey
1353    * @param length length of rowkey
1354    * @return whether more rows are available for the scanner or not
1355    * @throws IOException if {@code execOperationWithResult} throws one
1356    */
1357   public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1358       final int offset, final short length) throws IOException {
1359     return execOperationWithResult(true,
1360         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1361       @Override
1362       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1363           throws IOException {
1364         setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1365       }
1366     });
1367   }
1368 
1369   /**
        * Wraps {@code RegionObserver.preScannerClose} in a callback for {@code execOperation}
        * and returns whatever {@code execOperation} returns. When {@code coprocessors} is
        * empty, {@code null} is passed in place of the callback.
1370    * @param s the scanner
1371    * @return true if default behavior should be bypassed, false otherwise
1372    * @exception IOException Exception
1373    */
1374   public boolean preScannerClose(final InternalScanner s) throws IOException {
1375     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1376       @Override
1377       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1378           throws IOException {
1379         oserver.preScannerClose(ctx, s);
1380       }
1381     });
1382   }
1383 
1384   /**
        * Wraps {@code RegionObserver.postScannerClose} in a callback for {@code execOperation};
        * {@code null} is passed in place of the callback when {@code coprocessors} is empty.
        * @param s the scanner handed to the observer hook
1385    * @exception IOException Exception
1386    */
1387   public void postScannerClose(final InternalScanner s) throws IOException {
1388     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1389       @Override
1390       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1391           throws IOException {
1392         oserver.postScannerClose(ctx, s);
1393       }
1394     });
1395   }
1396 
1397   /**
        * Wraps the observer's {@code preWALRestore} hook in a callback for
        * {@code execOperation} and returns whatever {@code execOperation} returns. When
        * {@code coprocessors} is empty, {@code null} is passed in place of the callback.
        * <p>
        * The callback reads {@code useLegacyPre} from the context's environment. If it is set,
        * the {@code HLogKey} variant of the hook is invoked, but only when {@code logKey} is an
        * {@code HLogKey}; for any other key type the hook is skipped and
        * {@code legacyWarning} is called instead. If it is not set, the {@code WALKey} variant
        * is invoked.
1398    * @param info region info handed unchanged to the observer hook
1399    * @param logKey WAL key handed to the observer hook
1400    * @param logEdit WAL edit handed unchanged to the observer hook
1401    * @return true if default behavior should be bypassed, false otherwise
1402    * @throws IOException if {@code execOperation} throws one
1403    */
1404   public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1405       final WALEdit logEdit) throws IOException {
1406     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1407       @Override
1408       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1409           throws IOException {
1410         // Once we don't need to support the legacy call, replace RegionOperation with a version
1411         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1412         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1413         if (env.useLegacyPre) {
1414           if (logKey instanceof HLogKey) {
1415             oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1416           } else {
1417             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1418           }
1419         } else {
1420           oserver.preWALRestore(ctx, info, logKey, logEdit);
1421         }
1422       }
1423     });
1424   }
1425 
1426   /**
        * Delegates to {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}, upcasting
        * {@code logKey} to {@code WALKey} so that overload is selected.
1427    * @return true if default behavior should be bypassed, false otherwise
1428    * @deprecated use {@link #preWALRestore(HRegionInfo, WALKey, WALEdit)}
1429    */
1430   @Deprecated
1431   public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1432       final WALEdit logEdit) throws IOException {
1433     return preWALRestore(info, (WALKey)logKey, logEdit);
1434   }
1435 
1436   /**
        * Wraps the observer's {@code postWALRestore} hook in a callback for
        * {@code execOperation}; {@code null} is passed in place of the callback when
        * {@code coprocessors} is empty.
        * <p>
        * The callback reads {@code useLegacyPost} from the context's environment. If it is set,
        * the {@code HLogKey} variant of the hook is invoked, but only when {@code logKey} is an
        * {@code HLogKey}; for any other key type the hook is skipped and
        * {@code legacyWarning} is called instead. If it is not set, the {@code WALKey} variant
        * is invoked.
1437    * @param info region info handed unchanged to the observer hook
1438    * @param logKey WAL key handed to the observer hook
1439    * @param logEdit WAL edit handed unchanged to the observer hook
1440    * @throws IOException if {@code execOperation} throws one
1441    */
1442   public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1443       throws IOException {
1444     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1445       @Override
1446       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1447           throws IOException {
1448         // Once we don't need to support the legacy call, replace RegionOperation with a version
1449         // that's ObserverContext<RegionEnvironment> and avoid this cast.
1450         final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1451         if (env.useLegacyPost) {
1452           if (logKey instanceof HLogKey) {
1453             oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1454           } else {
1455             legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1456           }
1457         } else {
1458           oserver.postWALRestore(ctx, info, logKey, logEdit);
1459         }
1460       }
1461     });
1462   }
1463 
1464   /**
        * Delegates to {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}, upcasting
        * {@code logKey} to {@code WALKey} so that overload is selected.
1465    * @deprecated use {@link #postWALRestore(HRegionInfo, WALKey, WALEdit)}
1466    */
1467   @Deprecated
1468   public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1469       throws IOException {
1470     postWALRestore(info, (WALKey)logKey, logEdit);
1471   }
1472 
1473   /**
        * Wraps {@code RegionObserver.preBulkLoadHFile} in a callback for {@code execOperation}
        * and returns whatever {@code execOperation} returns. When {@code coprocessors} is
        * empty, {@code null} is passed in place of the callback.
1474    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1475    * @return true if the default operation should be bypassed
1476    * @throws IOException if {@code execOperation} throws one
1477    */
1478   public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1479     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1480       @Override
1481       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1482           throws IOException {
1483         oserver.preBulkLoadHFile(ctx, familyPaths);
1484       }
1485     });
1486   }
1487 
1488   public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1489       throws IOException {
         // Wraps RegionObserver.preCommitStoreFile in a callback for execOperation, forwarding
         // family and pairs unchanged, and returns whatever execOperation returns. The callback
         // is built only when coprocessors is non-empty; otherwise execOperation gets null.
1490     return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1491       @Override
1492       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1493           throws IOException {
1494         oserver.preCommitStoreFile(ctx, family, pairs);
1495       }
1496     });
1497   }
1498   public void postCommitStoreFile(final byte[] family, final Path srcPath, final Path dstPath)
1499       throws IOException {
         // Wraps RegionObserver.postCommitStoreFile in a callback for execOperation, forwarding
         // family, srcPath and dstPath unchanged. The callback is built only when coprocessors
         // is non-empty; otherwise execOperation gets null.
1500     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1501       @Override
1502       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1503           throws IOException {
1504         oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
1505       }
1506     });
1507   }
1508 
1509   /**
        * Wraps {@code RegionObserver.postBulkLoadHFile} in a callback for
        * {@code execOperationWithResult}, which also receives {@code hasLoaded} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1510    * @param familyPaths pairs of { CF, file path } submitted for bulk load
1511    * @param map Map of CF to List of file paths for the final loaded files
1512    * @param hasLoaded whether load was successful or not
1513    * @return the possibly modified value of hasLoaded
1514    * @throws IOException if {@code execOperationWithResult} throws one
1515    */
1516   public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1517       final Map<byte[], List<Path>> map, boolean hasLoaded) throws IOException {
1518     return execOperationWithResult(hasLoaded,
1519         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1520       @Override
1521       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1522           throws IOException {
1523         setResult(oserver.postBulkLoadHFile(ctx, familyPaths, map, getResult()));
1524       }
1525     });
1526   }
1527 
1528   public void postStartRegionOperation(final Operation op) throws IOException {
         // Wraps RegionObserver.postStartRegionOperation in a callback for execOperation,
         // forwarding op unchanged. The callback is built only when coprocessors is non-empty;
         // otherwise execOperation gets null.
1529     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1530       @Override
1531       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1532           throws IOException {
1533         oserver.postStartRegionOperation(ctx, op);
1534       }
1535     });
1536   }
1537 
1538   public void postCloseRegionOperation(final Operation op) throws IOException {
         // Wraps RegionObserver.postCloseRegionOperation in a callback for execOperation,
         // forwarding op unchanged. The callback is built only when coprocessors is non-empty;
         // otherwise execOperation gets null.
1539     execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1540       @Override
1541       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1542           throws IOException {
1543         oserver.postCloseRegionOperation(ctx, op);
1544       }
1545     });
1546   }
1547 
1548   /**
        * Wraps {@code RegionObserver.preStoreFileReaderOpen} in a callback for
        * {@code execOperationWithResult}, which also receives {@code null} as its first
        * argument. The callback feeds {@code getResult()} to the hook and stores the hook's
        * return value with {@code setResult}. When {@code coprocessors} is empty, {@code null}
        * is passed in place of the callback.
1549    * @param fs filesystem to read from
1550    * @param p path to the file
1551    * @param in {@link FSDataInputStreamWrapper}
1552    * @param size Full size of the file
1553    * @param cacheConf cache configuration handed unchanged to the observer hook
1554    * @param r original reference file. This will be not null only when reading a split file.
1555    * @return a Reader instance to use instead of the base reader if overriding
1556    * default behavior, null otherwise
1557    * @throws IOException if {@code execOperationWithResult} throws one
1558    */
1559   public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1560       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1561       final Reference r) throws IOException {
1562     return execOperationWithResult(null,
1563         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1564       @Override
1565       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1566           throws IOException {
1567         setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1568       }
1569     });
1570   }
1571 
1572   /**
1573    * @param fs fileystem to read from
1574    * @param p path to the file
1575    * @param in {@link FSDataInputStreamWrapper}
1576    * @param size Full size of the file
1577    * @param cacheConf
1578    * @param r original reference file. This will be not null only when reading a split file.
1579    * @param reader the base reader instance
1580    * @return The reader to use
1581    * @throws IOException
1582    */
1583   public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1584       final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1585       final Reference r, final StoreFile.Reader reader) throws IOException {
1586     return execOperationWithResult(reader,
1587         coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1588       @Override
1589       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1590           throws IOException {
1591         setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1592       }
1593     });
1594   }
1595 
1596   public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1597       final Cell oldCell, Cell newCell) throws IOException {
1598     return execOperationWithResult(newCell,
1599         coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1600       @Override
1601       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1602           throws IOException {
1603         setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1604       }
1605     });
1606   }
1607 
1608   public Message preEndpointInvocation(final Service service, final String methodName,
1609       Message request) throws IOException {
1610     return execOperationWithResult(request,
1611         coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1612       @Override
1613       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1614           throws IOException {
1615         setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1616       }
1617     });
1618   }
1619 
1620   public void postEndpointInvocation(final Service service, final String methodName,
1621       final Message request, final Message.Builder responseBuilder) throws IOException {
1622     execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1623       @Override
1624       public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1625           throws IOException {
1626         oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1627       }
1628     });
1629   }
1630 
1631   public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1632     return execOperationWithResult(tracker,
1633         coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1634       @Override
1635       public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1636           throws IOException {
1637         setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1638       }
1639     });
1640   }
1641 
1642   private static abstract class CoprocessorOperation
1643       extends ObserverContext<RegionCoprocessorEnvironment> {
1644     public abstract void call(Coprocessor observer,
1645         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1646     public abstract boolean hasCall(Coprocessor observer);
1647     public void postEnvCall(RegionEnvironment env) { }
1648   }
1649 
1650   private static abstract class RegionOperation extends CoprocessorOperation {
1651     public abstract void call(RegionObserver observer,
1652         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1653 
1654     public boolean hasCall(Coprocessor observer) {
1655       return observer instanceof RegionObserver;
1656     }
1657 
1658     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1659         throws IOException {
1660       call((RegionObserver)observer, ctx);
1661     }
1662   }
1663 
1664   private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1665     private T result = null;
1666     public void setResult(final T result) { this.result = result; }
1667     public T getResult() { return this.result; }
1668   }
1669 
1670   private static abstract class EndpointOperation extends CoprocessorOperation {
1671     public abstract void call(EndpointObserver observer,
1672         ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1673 
1674     public boolean hasCall(Coprocessor observer) {
1675       return observer instanceof EndpointObserver;
1676     }
1677 
1678     public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1679         throws IOException {
1680       call((EndpointObserver)observer, ctx);
1681     }
1682   }
1683 
1684   private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1685     private T result = null;
1686     public void setResult(final T result) { this.result = result; }
1687     public T getResult() { return this.result; }
1688   }
1689 
1690   private boolean execOperation(final CoprocessorOperation ctx)
1691       throws IOException {
1692     return execOperation(true, ctx);
1693   }
1694 
1695   private <T> T execOperationWithResult(final T defaultValue,
1696       final RegionOperationWithResult<T> ctx) throws IOException {
1697     if (ctx == null) return defaultValue;
1698     ctx.setResult(defaultValue);
1699     execOperation(true, ctx);
1700     return ctx.getResult();
1701   }
1702 
1703   private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1704       final RegionOperationWithResult<T> ctx) throws IOException {
1705     boolean bypass = false;
1706     T result = defaultValue;
1707     if (ctx != null) {
1708       ctx.setResult(defaultValue);
1709       bypass = execOperation(true, ctx);
1710       result = ctx.getResult();
1711     }
1712     return bypass == ifBypass ? result : null;
1713   }
1714 
1715   private <T> T execOperationWithResult(final T defaultValue,
1716       final EndpointOperationWithResult<T> ctx) throws IOException {
1717     if (ctx == null) return defaultValue;
1718     ctx.setResult(defaultValue);
1719     execOperation(true, ctx);
1720     return ctx.getResult();
1721   }
1722 
1723   private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1724       throws IOException {
1725     boolean bypass = false;
1726     for (RegionEnvironment env: coprocessors) {
1727       Coprocessor observer = env.getInstance();
1728       if (ctx.hasCall(observer)) {
1729         ctx.prepare(env);
1730         Thread currentThread = Thread.currentThread();
1731         ClassLoader cl = currentThread.getContextClassLoader();
1732         try {
1733           currentThread.setContextClassLoader(env.getClassLoader());
1734           ctx.call(observer, ctx);
1735         } catch (Throwable e) {
1736           handleCoprocessorThrowable(env, e);
1737         } finally {
1738           currentThread.setContextClassLoader(cl);
1739         }
1740         bypass |= ctx.shouldBypass();
1741         if (earlyExit && ctx.shouldComplete()) {
1742           break;
1743         }
1744       }
1745 
1746       ctx.postEnvCall(env);
1747     }
1748     return bypass;
1749   }
1750 }