1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableSet;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ConcurrentMap;
30 import java.util.regex.Matcher;
31
32 import org.apache.commons.collections.map.AbstractReferenceMap;
33 import org.apache.commons.collections.map.ReferenceMap;
34 import org.apache.commons.lang.ClassUtils;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.hbase.classification.InterfaceAudience;
38 import org.apache.hadoop.hbase.classification.InterfaceStability;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.fs.FileSystem;
41 import org.apache.hadoop.fs.Path;
42 import org.apache.hadoop.hbase.Cell;
43 import org.apache.hadoop.hbase.Coprocessor;
44 import org.apache.hadoop.hbase.CoprocessorEnvironment;
45 import org.apache.hadoop.hbase.HBaseConfiguration;
46 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.HTableDescriptor;
50 import org.apache.hadoop.hbase.client.Append;
51 import org.apache.hadoop.hbase.client.Delete;
52 import org.apache.hadoop.hbase.client.Durability;
53 import org.apache.hadoop.hbase.client.Get;
54 import org.apache.hadoop.hbase.client.Increment;
55 import org.apache.hadoop.hbase.client.Mutation;
56 import org.apache.hadoop.hbase.client.Put;
57 import org.apache.hadoop.hbase.client.Result;
58 import org.apache.hadoop.hbase.client.Scan;
59 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
60 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
61 import org.apache.hadoop.hbase.coprocessor.EndpointObserver;
62 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
63 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
64 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
65 import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
66 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
67 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
68 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
69 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
70 import org.apache.hadoop.hbase.io.Reference;
71 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
72 import org.apache.hadoop.hbase.regionserver.Region.Operation;
73 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
74 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
75 import org.apache.hadoop.hbase.wal.WALKey;
76 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
77 import org.apache.hadoop.hbase.util.Bytes;
78 import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
79 import org.apache.hadoop.hbase.util.Pair;
80
81 import com.google.common.collect.ImmutableList;
82 import com.google.common.collect.Lists;
83 import com.google.protobuf.Message;
84 import com.google.protobuf.Service;
85
86
87
88
89
90 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
91 @InterfaceStability.Evolving
92 public class RegionCoprocessorHost
93 extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
94
  /** Logger shared by every instance of this host. */
  private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);

  // Per-class shared data handed to RegionEnvironment instances. createEnvironment
  // keys this map by the coprocessor implementation's class name and guards every
  // access with synchronized (sharedDataMap). Constructed with HARD keys and WEAK
  // values, so an entry presumably disappears once nothing else references its
  // value map — confirm against the commons-collections ReferenceMap contract.
  private static ReferenceMap sharedDataMap =
      new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
99
100
101
102
103
104 static class RegionEnvironment extends CoprocessorHost.Environment
105 implements RegionCoprocessorEnvironment {
106
107 private Region region;
108 private RegionServerServices rsServices;
109 ConcurrentMap<String, Object> sharedData;
110 private final boolean useLegacyPre;
111 private final boolean useLegacyPost;
112
113
114
115
116
117
118 public RegionEnvironment(final Coprocessor impl, final int priority,
119 final int seq, final Configuration conf, final Region region,
120 final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
121 super(impl, priority, seq, conf);
122 this.region = region;
123 this.rsServices = services;
124 this.sharedData = sharedData;
125
126
127
128
129 useLegacyPre = useLegacyMethod(impl.getClass(), "preWALRestore", ObserverContext.class,
130 HRegionInfo.class, WALKey.class, WALEdit.class);
131 useLegacyPost = useLegacyMethod(impl.getClass(), "postWALRestore", ObserverContext.class,
132 HRegionInfo.class, WALKey.class, WALEdit.class);
133 }
134
135
136 @Override
137 public Region getRegion() {
138 return region;
139 }
140
141
142 @Override
143 public RegionServerServices getRegionServerServices() {
144 return rsServices;
145 }
146
147 public void shutdown() {
148 super.shutdown();
149 }
150
151 @Override
152 public ConcurrentMap<String, Object> getSharedData() {
153 return sharedData;
154 }
155
156 @Override
157 public HRegionInfo getRegionInfo() {
158 return region.getRegionInfo();
159 }
160
161 }
162
163 static class TableCoprocessorAttribute {
164 private Path path;
165 private String className;
166 private int priority;
167 private Configuration conf;
168
169 public TableCoprocessorAttribute(Path path, String className, int priority,
170 Configuration conf) {
171 this.path = path;
172 this.className = className;
173 this.priority = priority;
174 this.conf = conf;
175 }
176
177 public Path getPath() {
178 return path;
179 }
180
181 public String getClassName() {
182 return className;
183 }
184
185 public int getPriority() {
186 return priority;
187 }
188
189 public Configuration getConf() {
190 return conf;
191 }
192 }
193
194
  // Region server services given to the constructor; forwarded to every
  // RegionEnvironment built by createEnvironment.
  RegionServerServices rsServices;

  // The region this host manages coprocessors for; set once in the constructor.
  Region region;
198
199
200
201
202
203
204
205 public RegionCoprocessorHost(final Region region,
206 final RegionServerServices rsServices, final Configuration conf) {
207 super(rsServices);
208 this.conf = conf;
209 this.rsServices = rsServices;
210 this.region = region;
211 this.pathPrefix = Integer.toString(this.region.getRegionInfo().hashCode());
212
213
214 loadSystemCoprocessors(conf, REGION_COPROCESSOR_CONF_KEY);
215
216
217 if (!region.getRegionInfo().getTable().isSystemTable()) {
218 loadSystemCoprocessors(conf, USER_REGION_COPROCESSOR_CONF_KEY);
219 }
220
221
222 loadTableCoprocessors(conf);
223 }
224
225 static List<TableCoprocessorAttribute> getTableCoprocessorAttrsFromSchema(Configuration conf,
226 HTableDescriptor htd) {
227 List<TableCoprocessorAttribute> result = Lists.newArrayList();
228 for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e: htd.getValues().entrySet()) {
229 String key = Bytes.toString(e.getKey().get()).trim();
230 if (HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(key).matches()) {
231 String spec = Bytes.toString(e.getValue().get()).trim();
232
233 try {
234 Matcher matcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(spec);
235 if (matcher.matches()) {
236
237
238 Path path = matcher.group(1).trim().isEmpty() ?
239 null : new Path(matcher.group(1).trim());
240 String className = matcher.group(2).trim();
241 if (className.isEmpty()) {
242 LOG.error("Malformed table coprocessor specification: key=" +
243 key + ", spec: " + spec);
244 continue;
245 }
246 int priority = matcher.group(3).trim().isEmpty() ?
247 Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
248 String cfgSpec = null;
249 try {
250 cfgSpec = matcher.group(4);
251 } catch (IndexOutOfBoundsException ex) {
252
253 }
254 Configuration ourConf;
255 if (cfgSpec != null) {
256 cfgSpec = cfgSpec.substring(cfgSpec.indexOf('|') + 1);
257
258 ourConf = new Configuration(false);
259 HBaseConfiguration.merge(ourConf, conf);
260 Matcher m = HConstants.CP_HTD_ATTR_VALUE_PARAM_PATTERN.matcher(cfgSpec);
261 while (m.find()) {
262 ourConf.set(m.group(1), m.group(2));
263 }
264 } else {
265 ourConf = conf;
266 }
267 result.add(new TableCoprocessorAttribute(path, className, priority, ourConf));
268 } else {
269 LOG.error("Malformed table coprocessor specification: key=" + key +
270 ", spec: " + spec);
271 }
272 } catch (Exception ioe) {
273 LOG.error("Malformed table coprocessor specification: key=" + key +
274 ", spec: " + spec);
275 }
276 }
277 }
278 return result;
279 }
280
281
282
283
284
285
286
287
288 public static void testTableCoprocessorAttrs(final Configuration conf,
289 final HTableDescriptor htd) throws IOException {
290 String pathPrefix = UUID.randomUUID().toString();
291 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf, htd)) {
292 if (attr.getPriority() < 0) {
293 throw new IOException("Priority for coprocessor " + attr.getClassName() +
294 " cannot be less than 0");
295 }
296 ClassLoader old = Thread.currentThread().getContextClassLoader();
297 try {
298 ClassLoader cl;
299 if (attr.getPath() != null) {
300 cl = CoprocessorClassLoader.getClassLoader(attr.getPath(),
301 CoprocessorHost.class.getClassLoader(), pathPrefix, conf);
302 } else {
303 cl = CoprocessorHost.class.getClassLoader();
304 }
305 Thread.currentThread().setContextClassLoader(cl);
306 cl.loadClass(attr.getClassName());
307 } catch (ClassNotFoundException e) {
308 throw new IOException("Class " + attr.getClassName() + " cannot be loaded", e);
309 } finally {
310 Thread.currentThread().setContextClassLoader(old);
311 }
312 }
313 }
314
315 void loadTableCoprocessors(final Configuration conf) {
316 boolean coprocessorsEnabled = conf.getBoolean(COPROCESSORS_ENABLED_CONF_KEY,
317 DEFAULT_COPROCESSORS_ENABLED);
318 boolean tableCoprocessorsEnabled = conf.getBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY,
319 DEFAULT_USER_COPROCESSORS_ENABLED);
320 if (!(coprocessorsEnabled && tableCoprocessorsEnabled)) {
321 return;
322 }
323
324
325
326 List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
327 for (TableCoprocessorAttribute attr: getTableCoprocessorAttrsFromSchema(conf,
328 region.getTableDesc())) {
329
330 try {
331 RegionEnvironment env = load(attr.getPath(), attr.getClassName(), attr.getPriority(),
332 attr.getConf());
333 configured.add(env);
334 LOG.info("Loaded coprocessor " + attr.getClassName() + " from HTD of " +
335 region.getTableDesc().getTableName().getNameAsString() + " successfully.");
336 } catch (Throwable t) {
337
338 if (conf.getBoolean(ABORT_ON_ERROR_KEY, DEFAULT_ABORT_ON_ERROR)) {
339 abortServer(attr.getClassName(), t);
340 } else {
341 LOG.error("Failed to load coprocessor " + attr.getClassName(), t);
342 }
343 }
344 }
345
346 coprocessors.addAll(configured);
347 }
348
349 @Override
350 public RegionEnvironment createEnvironment(Class<?> implClass,
351 Coprocessor instance, int priority, int seq, Configuration conf) {
352
353
354
355
356
357 for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
358 Class<?> c = (Class<?>) itf;
359 if (CoprocessorService.class.isAssignableFrom(c)) {
360 region.registerService( ((CoprocessorService)instance).getService() );
361 }
362 }
363 ConcurrentMap<String, Object> classData;
364
365 synchronized (sharedDataMap) {
366
367
368 classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
369 if (classData == null) {
370 classData = new ConcurrentHashMap<String, Object>();
371 sharedDataMap.put(implClass.getName(), classData);
372 }
373 }
374 return new RegionEnvironment(instance, priority, seq, conf, region,
375 rsServices, classData);
376 }
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391 private void handleCoprocessorThrowableNoRethrow(
392 final CoprocessorEnvironment env, final Throwable e) {
393 try {
394 handleCoprocessorThrowable(env,e);
395 } catch (IOException ioe) {
396
397 LOG.warn(
398 "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
399 e + ". Ignoring.",e);
400 }
401 }
402
403
404
405
406
407
408 public void preOpen() throws IOException {
409 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
410 @Override
411 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
412 throws IOException {
413 oserver.preOpen(ctx);
414 }
415 });
416 }
417
418
419
420
421 public void postOpen() {
422 try {
423 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
424 @Override
425 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
426 throws IOException {
427 oserver.postOpen(ctx);
428 }
429 });
430 } catch (IOException e) {
431 LOG.warn(e);
432 }
433 }
434
435
436
437
438 public void postLogReplay() {
439 try {
440 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
441 @Override
442 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
443 throws IOException {
444 oserver.postLogReplay(ctx);
445 }
446 });
447 } catch (IOException e) {
448 LOG.warn(e);
449 }
450 }
451
452
453
454
455
456 public void preClose(final boolean abortRequested) throws IOException {
457 execOperation(false, new RegionOperation() {
458 @Override
459 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
460 throws IOException {
461 oserver.preClose(ctx, abortRequested);
462 }
463 });
464 }
465
466
467
468
469
470 public void postClose(final boolean abortRequested) {
471 try {
472 execOperation(false, new RegionOperation() {
473 @Override
474 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
475 throws IOException {
476 oserver.postClose(ctx, abortRequested);
477 }
478 public void postEnvCall(RegionEnvironment env) {
479 shutdown(env);
480 }
481 });
482 } catch (IOException e) {
483 LOG.warn(e);
484 }
485 }
486
487
488
489
490
491 public InternalScanner preCompactScannerOpen(final Store store,
492 final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
493 final CompactionRequest request) throws IOException {
494 return execOperationWithResult(null,
495 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
496 @Override
497 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
498 throws IOException {
499 setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType,
500 earliestPutTs, getResult(), request));
501 }
502 });
503 }
504
505
506
507
508
509
510
511
512
513
514 public boolean preCompactSelection(final Store store, final List<StoreFile> candidates,
515 final CompactionRequest request) throws IOException {
516 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
517 @Override
518 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
519 throws IOException {
520 oserver.preCompactSelection(ctx, store, candidates, request);
521 }
522 });
523 }
524
525
526
527
528
529
530
531
532 public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected,
533 final CompactionRequest request) {
534 try {
535 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
536 @Override
537 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
538 throws IOException {
539 oserver.postCompactSelection(ctx, store, selected, request);
540 }
541 });
542 } catch (IOException e) {
543 LOG.warn(e);
544 }
545 }
546
547
548
549
550
551
552
553
554
555 public InternalScanner preCompact(final Store store, final InternalScanner scanner,
556 final ScanType scanType, final CompactionRequest request) throws IOException {
557 return execOperationWithResult(false, scanner,
558 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
559 @Override
560 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
561 throws IOException {
562 setResult(oserver.preCompact(ctx, store, getResult(), scanType, request));
563 }
564 });
565 }
566
567
568
569
570
571
572
573
574 public void postCompact(final Store store, final StoreFile resultFile,
575 final CompactionRequest request) throws IOException {
576 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
577 @Override
578 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
579 throws IOException {
580 oserver.postCompact(ctx, store, resultFile, request);
581 }
582 });
583 }
584
585
586
587
588
589 public InternalScanner preFlush(final Store store, final InternalScanner scanner)
590 throws IOException {
591 return execOperationWithResult(false, scanner,
592 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
593 @Override
594 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
595 throws IOException {
596 setResult(oserver.preFlush(ctx, store, getResult()));
597 }
598 });
599 }
600
601
602
603
604
605 public void preFlush() throws IOException {
606 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
607 @Override
608 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
609 throws IOException {
610 oserver.preFlush(ctx);
611 }
612 });
613 }
614
615
616
617
618
619
620 public InternalScanner preFlushScannerOpen(final Store store,
621 final KeyValueScanner memstoreScanner) throws IOException {
622 return execOperationWithResult(null,
623 coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() {
624 @Override
625 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
626 throws IOException {
627 setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult()));
628 }
629 });
630 }
631
632
633
634
635
636 public void postFlush() throws IOException {
637 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
638 @Override
639 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
640 throws IOException {
641 oserver.postFlush(ctx);
642 }
643 });
644 }
645
646
647
648
649
650 public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
651 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
652 @Override
653 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
654 throws IOException {
655 oserver.postFlush(ctx, store, storeFile);
656 }
657 });
658 }
659
660
661
662
663
664
665 public void preSplit() throws IOException {
666 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
667 @Override
668 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
669 throws IOException {
670 oserver.preSplit(ctx);
671 }
672 });
673 }
674
675
676
677
678
679 public void preSplit(final byte[] splitRow) throws IOException {
680 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
681 @Override
682 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
683 throws IOException {
684 oserver.preSplit(ctx, splitRow);
685 }
686 });
687 }
688
689
690
691
692
693
694
695 public void postSplit(final Region l, final Region r) throws IOException {
696 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
697 @Override
698 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
699 throws IOException {
700 oserver.postSplit(ctx, l, r);
701 }
702 });
703 }
704
705 public boolean preSplitBeforePONR(final byte[] splitKey,
706 final List<Mutation> metaEntries) throws IOException {
707 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
708 @Override
709 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
710 throws IOException {
711 oserver.preSplitBeforePONR(ctx, splitKey, metaEntries);
712 }
713 });
714 }
715
716 public void preSplitAfterPONR() throws IOException {
717 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
718 @Override
719 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
720 throws IOException {
721 oserver.preSplitAfterPONR(ctx);
722 }
723 });
724 }
725
726
727
728
729
730 public void preRollBackSplit() throws IOException {
731 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
732 @Override
733 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
734 throws IOException {
735 oserver.preRollBackSplit(ctx);
736 }
737 });
738 }
739
740
741
742
743
744 public void postRollBackSplit() throws IOException {
745 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
746 @Override
747 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
748 throws IOException {
749 oserver.postRollBackSplit(ctx);
750 }
751 });
752 }
753
754
755
756
757
758 public void postCompleteSplit() throws IOException {
759 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
760 @Override
761 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
762 throws IOException {
763 oserver.postCompleteSplit(ctx);
764 }
765 });
766 }
767
768
769
770
771
772
773
774
775
776
777 public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
778 final Result result) throws IOException {
779 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
780 @Override
781 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
782 throws IOException {
783 oserver.preGetClosestRowBefore(ctx, row, family, result);
784 }
785 });
786 }
787
788
789
790
791
792
793
794 public void postGetClosestRowBefore(final byte[] row, final byte[] family,
795 final Result result) throws IOException {
796 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
797 @Override
798 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
799 throws IOException {
800 oserver.postGetClosestRowBefore(ctx, row, family, result);
801 }
802 });
803 }
804
805
806
807
808
809
810 public boolean preGet(final Get get, final List<Cell> results)
811 throws IOException {
812 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
813 @Override
814 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
815 throws IOException {
816 oserver.preGetOp(ctx, get, results);
817 }
818 });
819 }
820
821
822
823
824
825
826 public void postGet(final Get get, final List<Cell> results)
827 throws IOException {
828 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
829 @Override
830 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
831 throws IOException {
832 oserver.postGetOp(ctx, get, results);
833 }
834 });
835 }
836
837
838
839
840
841
842
843 public Boolean preExists(final Get get) throws IOException {
844 return execOperationWithResult(true, false,
845 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
846 @Override
847 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
848 throws IOException {
849 setResult(oserver.preExists(ctx, get, getResult()));
850 }
851 });
852 }
853
854
855
856
857
858
859
860 public boolean postExists(final Get get, boolean exists)
861 throws IOException {
862 return execOperationWithResult(exists,
863 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
864 @Override
865 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
866 throws IOException {
867 setResult(oserver.postExists(ctx, get, getResult()));
868 }
869 });
870 }
871
872
873
874
875
876
877
878
879 public boolean prePut(final Put put, final WALEdit edit, final Durability durability)
880 throws IOException {
881 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
882 @Override
883 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
884 throws IOException {
885 oserver.prePut(ctx, put, edit, durability);
886 }
887 });
888 }
889
890
891
892
893
894
895
896
897
898
899
900 public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation,
901 final Cell kv, final byte[] byteNow, final Get get) throws IOException {
902 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
903 @Override
904 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
905 throws IOException {
906 oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get);
907 }
908 });
909 }
910
911
912
913
914
915
916
917 public void postPut(final Put put, final WALEdit edit, final Durability durability)
918 throws IOException {
919 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
920 @Override
921 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
922 throws IOException {
923 oserver.postPut(ctx, put, edit, durability);
924 }
925 });
926 }
927
928
929
930
931
932
933
934
935 public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability)
936 throws IOException {
937 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
938 @Override
939 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
940 throws IOException {
941 oserver.preDelete(ctx, delete, edit, durability);
942 }
943 });
944 }
945
946
947
948
949
950
951
952 public void postDelete(final Delete delete, final WALEdit edit, final Durability durability)
953 throws IOException {
954 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
955 @Override
956 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
957 throws IOException {
958 oserver.postDelete(ctx, delete, edit, durability);
959 }
960 });
961 }
962
963
964
965
966
967
968 public boolean preBatchMutate(
969 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
970 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
971 @Override
972 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
973 throws IOException {
974 oserver.preBatchMutate(ctx, miniBatchOp);
975 }
976 });
977 }
978
979
980
981
982
983 public void postBatchMutate(
984 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
985 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
986 @Override
987 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
988 throws IOException {
989 oserver.postBatchMutate(ctx, miniBatchOp);
990 }
991 });
992 }
993
994 public void postBatchMutateIndispensably(
995 final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success)
996 throws IOException {
997 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
998 @Override
999 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1000 throws IOException {
1001 oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success);
1002 }
1003 });
1004 }
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017 public Boolean preCheckAndPut(final byte [] row, final byte [] family,
1018 final byte [] qualifier, final CompareOp compareOp,
1019 final ByteArrayComparable comparator, final Put put)
1020 throws IOException {
1021 return execOperationWithResult(true, false,
1022 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1023 @Override
1024 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1025 throws IOException {
1026 setResult(oserver.preCheckAndPut(ctx, row, family, qualifier,
1027 compareOp, comparator, put, getResult()));
1028 }
1029 });
1030 }
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043 public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family,
1044 final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1045 final Put put) throws IOException {
1046 return execOperationWithResult(true, false,
1047 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1048 @Override
1049 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1050 throws IOException {
1051 setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier,
1052 compareOp, comparator, put, getResult()));
1053 }
1054 });
1055 }
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066 public boolean postCheckAndPut(final byte [] row, final byte [] family,
1067 final byte [] qualifier, final CompareOp compareOp,
1068 final ByteArrayComparable comparator, final Put put,
1069 boolean result) throws IOException {
1070 return execOperationWithResult(result,
1071 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1072 @Override
1073 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1074 throws IOException {
1075 setResult(oserver.postCheckAndPut(ctx, row, family, qualifier,
1076 compareOp, comparator, put, getResult()));
1077 }
1078 });
1079 }
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092 public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
1093 final byte [] qualifier, final CompareOp compareOp,
1094 final ByteArrayComparable comparator, final Delete delete)
1095 throws IOException {
1096 return execOperationWithResult(true, false,
1097 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1098 @Override
1099 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1100 throws IOException {
1101 setResult(oserver.preCheckAndDelete(ctx, row, family,
1102 qualifier, compareOp, comparator, delete, getResult()));
1103 }
1104 });
1105 }
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118 public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
1119 final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator,
1120 final Delete delete) throws IOException {
1121 return execOperationWithResult(true, false,
1122 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1123 @Override
1124 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1125 throws IOException {
1126 setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row,
1127 family, qualifier, compareOp, comparator, delete, getResult()));
1128 }
1129 });
1130 }
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141 public boolean postCheckAndDelete(final byte [] row, final byte [] family,
1142 final byte [] qualifier, final CompareOp compareOp,
1143 final ByteArrayComparable comparator, final Delete delete,
1144 boolean result) throws IOException {
1145 return execOperationWithResult(result,
1146 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1147 @Override
1148 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1149 throws IOException {
1150 setResult(oserver.postCheckAndDelete(ctx, row, family,
1151 qualifier, compareOp, comparator, delete, getResult()));
1152 }
1153 });
1154 }
1155
1156
1157
1158
1159
1160
1161
1162 public Result preAppend(final Append append) throws IOException {
1163 return execOperationWithResult(true, null,
1164 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1165 @Override
1166 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1167 throws IOException {
1168 setResult(oserver.preAppend(ctx, append));
1169 }
1170 });
1171 }
1172
1173
1174
1175
1176
1177
1178
1179 public Result preAppendAfterRowLock(final Append append) throws IOException {
1180 return execOperationWithResult(true, null,
1181 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1182 @Override
1183 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1184 throws IOException {
1185 setResult(oserver.preAppendAfterRowLock(ctx, append));
1186 }
1187 });
1188 }
1189
1190
1191
1192
1193
1194
1195
1196 public Result preIncrement(final Increment increment) throws IOException {
1197 return execOperationWithResult(true, null,
1198 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1199 @Override
1200 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1201 throws IOException {
1202 setResult(oserver.preIncrement(ctx, increment));
1203 }
1204 });
1205 }
1206
1207
1208
1209
1210
1211
1212
1213 public Result preIncrementAfterRowLock(final Increment increment) throws IOException {
1214 return execOperationWithResult(true, null,
1215 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1216 @Override
1217 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1218 throws IOException {
1219 setResult(oserver.preIncrementAfterRowLock(ctx, increment));
1220 }
1221 });
1222 }
1223
1224
1225
1226
1227
1228
1229 public void postAppend(final Append append, final Result result) throws IOException {
1230 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1231 @Override
1232 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1233 throws IOException {
1234 oserver.postAppend(ctx, append, result);
1235 }
1236 });
1237 }
1238
1239
1240
1241
1242
1243
1244 public Result postIncrement(final Increment increment, Result result) throws IOException {
1245 return execOperationWithResult(result,
1246 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() {
1247 @Override
1248 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1249 throws IOException {
1250 setResult(oserver.postIncrement(ctx, increment, getResult()));
1251 }
1252 });
1253 }
1254
1255
1256
1257
1258
1259
1260
1261 public RegionScanner preScannerOpen(final Scan scan) throws IOException {
1262 return execOperationWithResult(true, null,
1263 coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1264 @Override
1265 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1266 throws IOException {
1267 setResult(oserver.preScannerOpen(ctx, scan, getResult()));
1268 }
1269 });
1270 }
1271
1272
1273
1274
1275
1276
1277 public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan,
1278 final NavigableSet<byte[]> targetCols) throws IOException {
1279 return execOperationWithResult(null,
1280 coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() {
1281 @Override
1282 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1283 throws IOException {
1284 setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult()));
1285 }
1286 });
1287 }
1288
1289
1290
1291
1292
1293
1294
1295 public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException {
1296 return execOperationWithResult(s,
1297 coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() {
1298 @Override
1299 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1300 throws IOException {
1301 setResult(oserver.postScannerOpen(ctx, scan, getResult()));
1302 }
1303 });
1304 }
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314 public Boolean preScannerNext(final InternalScanner s,
1315 final List<Result> results, final int limit) throws IOException {
1316 return execOperationWithResult(true, false,
1317 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1318 @Override
1319 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1320 throws IOException {
1321 setResult(oserver.preScannerNext(ctx, s, results, limit, getResult()));
1322 }
1323 });
1324 }
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334 public boolean postScannerNext(final InternalScanner s,
1335 final List<Result> results, final int limit, boolean hasMore)
1336 throws IOException {
1337 return execOperationWithResult(hasMore,
1338 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1339 @Override
1340 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1341 throws IOException {
1342 setResult(oserver.postScannerNext(ctx, s, results, limit, getResult()));
1343 }
1344 });
1345 }
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357 public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow,
1358 final int offset, final short length) throws IOException {
1359 return execOperationWithResult(true,
1360 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1361 @Override
1362 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1363 throws IOException {
1364 setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult()));
1365 }
1366 });
1367 }
1368
1369
1370
1371
1372
1373
1374 public boolean preScannerClose(final InternalScanner s) throws IOException {
1375 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1376 @Override
1377 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1378 throws IOException {
1379 oserver.preScannerClose(ctx, s);
1380 }
1381 });
1382 }
1383
1384
1385
1386
1387 public void postScannerClose(final InternalScanner s) throws IOException {
1388 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1389 @Override
1390 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1391 throws IOException {
1392 oserver.postScannerClose(ctx, s);
1393 }
1394 });
1395 }
1396
1397
1398
1399
1400
1401
1402
1403
1404 public boolean preWALRestore(final HRegionInfo info, final WALKey logKey,
1405 final WALEdit logEdit) throws IOException {
1406 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1407 @Override
1408 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1409 throws IOException {
1410
1411
1412 final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1413 if (env.useLegacyPre) {
1414 if (logKey instanceof HLogKey) {
1415 oserver.preWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1416 } else {
1417 legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1418 }
1419 } else {
1420 oserver.preWALRestore(ctx, info, logKey, logEdit);
1421 }
1422 }
1423 });
1424 }
1425
1426
1427
1428
1429
1430 @Deprecated
1431 public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey,
1432 final WALEdit logEdit) throws IOException {
1433 return preWALRestore(info, (WALKey)logKey, logEdit);
1434 }
1435
1436
1437
1438
1439
1440
1441
1442 public void postWALRestore(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
1443 throws IOException {
1444 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1445 @Override
1446 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1447 throws IOException {
1448
1449
1450 final RegionEnvironment env = (RegionEnvironment)ctx.getEnvironment();
1451 if (env.useLegacyPost) {
1452 if (logKey instanceof HLogKey) {
1453 oserver.postWALRestore(ctx, info, (HLogKey)logKey, logEdit);
1454 } else {
1455 legacyWarning(oserver.getClass(), "There are wal keys present that are not HLogKey.");
1456 }
1457 } else {
1458 oserver.postWALRestore(ctx, info, logKey, logEdit);
1459 }
1460 }
1461 });
1462 }
1463
1464
1465
1466
1467 @Deprecated
1468 public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit)
1469 throws IOException {
1470 postWALRestore(info, (WALKey)logKey, logEdit);
1471 }
1472
1473
1474
1475
1476
1477
1478 public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException {
1479 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1480 @Override
1481 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1482 throws IOException {
1483 oserver.preBulkLoadHFile(ctx, familyPaths);
1484 }
1485 });
1486 }
1487
1488 public boolean preCommitStoreFile(final byte[] family, final List<Pair<Path, Path>> pairs)
1489 throws IOException {
1490 return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1491 @Override
1492 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1493 throws IOException {
1494 oserver.preCommitStoreFile(ctx, family, pairs);
1495 }
1496 });
1497 }
1498 public void postCommitStoreFile(final byte[] family, final Path srcPath, final Path dstPath)
1499 throws IOException {
1500 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1501 @Override
1502 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1503 throws IOException {
1504 oserver.postCommitStoreFile(ctx, family, srcPath, dstPath);
1505 }
1506 });
1507 }
1508
1509
1510
1511
1512
1513
1514
1515
1516 public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths,
1517 final Map<byte[], List<Path>> map, boolean hasLoaded) throws IOException {
1518 return execOperationWithResult(hasLoaded,
1519 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() {
1520 @Override
1521 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1522 throws IOException {
1523 setResult(oserver.postBulkLoadHFile(ctx, familyPaths, map, getResult()));
1524 }
1525 });
1526 }
1527
1528 public void postStartRegionOperation(final Operation op) throws IOException {
1529 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1530 @Override
1531 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1532 throws IOException {
1533 oserver.postStartRegionOperation(ctx, op);
1534 }
1535 });
1536 }
1537
1538 public void postCloseRegionOperation(final Operation op) throws IOException {
1539 execOperation(coprocessors.isEmpty() ? null : new RegionOperation() {
1540 @Override
1541 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1542 throws IOException {
1543 oserver.postCloseRegionOperation(ctx, op);
1544 }
1545 });
1546 }
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559 public StoreFile.Reader preStoreFileReaderOpen(final FileSystem fs, final Path p,
1560 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1561 final Reference r) throws IOException {
1562 return execOperationWithResult(null,
1563 coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1564 @Override
1565 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1566 throws IOException {
1567 setResult(oserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1568 }
1569 });
1570 }
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583 public StoreFile.Reader postStoreFileReaderOpen(final FileSystem fs, final Path p,
1584 final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
1585 final Reference r, final StoreFile.Reader reader) throws IOException {
1586 return execOperationWithResult(reader,
1587 coprocessors.isEmpty() ? null : new RegionOperationWithResult<StoreFile.Reader>() {
1588 @Override
1589 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1590 throws IOException {
1591 setResult(oserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, getResult()));
1592 }
1593 });
1594 }
1595
1596 public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
1597 final Cell oldCell, Cell newCell) throws IOException {
1598 return execOperationWithResult(newCell,
1599 coprocessors.isEmpty() ? null : new RegionOperationWithResult<Cell>() {
1600 @Override
1601 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1602 throws IOException {
1603 setResult(oserver.postMutationBeforeWAL(ctx, opType, mutation, oldCell, getResult()));
1604 }
1605 });
1606 }
1607
1608 public Message preEndpointInvocation(final Service service, final String methodName,
1609 Message request) throws IOException {
1610 return execOperationWithResult(request,
1611 coprocessors.isEmpty() ? null : new EndpointOperationWithResult<Message>() {
1612 @Override
1613 public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1614 throws IOException {
1615 setResult(oserver.preEndpointInvocation(ctx, service, methodName, getResult()));
1616 }
1617 });
1618 }
1619
1620 public void postEndpointInvocation(final Service service, final String methodName,
1621 final Message request, final Message.Builder responseBuilder) throws IOException {
1622 execOperation(coprocessors.isEmpty() ? null : new EndpointOperation() {
1623 @Override
1624 public void call(EndpointObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1625 throws IOException {
1626 oserver.postEndpointInvocation(ctx, service, methodName, request, responseBuilder);
1627 }
1628 });
1629 }
1630
1631 public DeleteTracker postInstantiateDeleteTracker(DeleteTracker tracker) throws IOException {
1632 return execOperationWithResult(tracker,
1633 coprocessors.isEmpty() ? null : new RegionOperationWithResult<DeleteTracker>() {
1634 @Override
1635 public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx)
1636 throws IOException {
1637 setResult(oserver.postInstantiateDeleteTracker(ctx, getResult()));
1638 }
1639 });
1640 }
1641
1642 private static abstract class CoprocessorOperation
1643 extends ObserverContext<RegionCoprocessorEnvironment> {
1644 public abstract void call(Coprocessor observer,
1645 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1646 public abstract boolean hasCall(Coprocessor observer);
1647 public void postEnvCall(RegionEnvironment env) { }
1648 }
1649
1650 private static abstract class RegionOperation extends CoprocessorOperation {
1651 public abstract void call(RegionObserver observer,
1652 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1653
1654 public boolean hasCall(Coprocessor observer) {
1655 return observer instanceof RegionObserver;
1656 }
1657
1658 public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1659 throws IOException {
1660 call((RegionObserver)observer, ctx);
1661 }
1662 }
1663
1664 private static abstract class RegionOperationWithResult<T> extends RegionOperation {
1665 private T result = null;
1666 public void setResult(final T result) { this.result = result; }
1667 public T getResult() { return this.result; }
1668 }
1669
1670 private static abstract class EndpointOperation extends CoprocessorOperation {
1671 public abstract void call(EndpointObserver observer,
1672 ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
1673
1674 public boolean hasCall(Coprocessor observer) {
1675 return observer instanceof EndpointObserver;
1676 }
1677
1678 public void call(Coprocessor observer, ObserverContext<RegionCoprocessorEnvironment> ctx)
1679 throws IOException {
1680 call((EndpointObserver)observer, ctx);
1681 }
1682 }
1683
1684 private static abstract class EndpointOperationWithResult<T> extends EndpointOperation {
1685 private T result = null;
1686 public void setResult(final T result) { this.result = result; }
1687 public T getResult() { return this.result; }
1688 }
1689
1690 private boolean execOperation(final CoprocessorOperation ctx)
1691 throws IOException {
1692 return execOperation(true, ctx);
1693 }
1694
1695 private <T> T execOperationWithResult(final T defaultValue,
1696 final RegionOperationWithResult<T> ctx) throws IOException {
1697 if (ctx == null) return defaultValue;
1698 ctx.setResult(defaultValue);
1699 execOperation(true, ctx);
1700 return ctx.getResult();
1701 }
1702
1703 private <T> T execOperationWithResult(final boolean ifBypass, final T defaultValue,
1704 final RegionOperationWithResult<T> ctx) throws IOException {
1705 boolean bypass = false;
1706 T result = defaultValue;
1707 if (ctx != null) {
1708 ctx.setResult(defaultValue);
1709 bypass = execOperation(true, ctx);
1710 result = ctx.getResult();
1711 }
1712 return bypass == ifBypass ? result : null;
1713 }
1714
1715 private <T> T execOperationWithResult(final T defaultValue,
1716 final EndpointOperationWithResult<T> ctx) throws IOException {
1717 if (ctx == null) return defaultValue;
1718 ctx.setResult(defaultValue);
1719 execOperation(true, ctx);
1720 return ctx.getResult();
1721 }
1722
1723 private boolean execOperation(final boolean earlyExit, final CoprocessorOperation ctx)
1724 throws IOException {
1725 boolean bypass = false;
1726 for (RegionEnvironment env: coprocessors) {
1727 Coprocessor observer = env.getInstance();
1728 if (ctx.hasCall(observer)) {
1729 ctx.prepare(env);
1730 Thread currentThread = Thread.currentThread();
1731 ClassLoader cl = currentThread.getContextClassLoader();
1732 try {
1733 currentThread.setContextClassLoader(env.getClassLoader());
1734 ctx.call(observer, ctx);
1735 } catch (Throwable e) {
1736 handleCoprocessorThrowable(env, e);
1737 } finally {
1738 currentThread.setContextClassLoader(cl);
1739 }
1740 bypass |= ctx.shouldBypass();
1741 if (earlyExit && ctx.shouldComplete()) {
1742 break;
1743 }
1744 }
1745
1746 ctx.postEnvCall(env);
1747 }
1748 return bypass;
1749 }
1750 }