1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.io.UnsupportedEncodingException;
26 import java.lang.reflect.Constructor;
27 import java.text.ParseException;
28 import java.util.AbstractList;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Comparator;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.NavigableMap;
41 import java.util.NavigableSet;
42 import java.util.RandomAccess;
43 import java.util.Set;
44 import java.util.TreeMap;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.CompletionService;
47 import java.util.concurrent.ConcurrentHashMap;
48 import java.util.concurrent.ConcurrentMap;
49 import java.util.concurrent.ConcurrentSkipListMap;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.ExecutionException;
52 import java.util.concurrent.ExecutorCompletionService;
53 import java.util.concurrent.ExecutorService;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.Future;
56 import java.util.concurrent.FutureTask;
57 import java.util.concurrent.ThreadFactory;
58 import java.util.concurrent.ThreadPoolExecutor;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicInteger;
63 import java.util.concurrent.atomic.AtomicLong;
64 import java.util.concurrent.locks.Lock;
65 import java.util.concurrent.locks.ReentrantReadWriteLock;
66
67 import org.apache.commons.lang.RandomStringUtils;
68 import org.apache.commons.logging.Log;
69 import org.apache.commons.logging.LogFactory;
70 import org.apache.hadoop.conf.Configuration;
71 import org.apache.hadoop.fs.FileStatus;
72 import org.apache.hadoop.fs.FileSystem;
73 import org.apache.hadoop.fs.Path;
74 import org.apache.hadoop.hbase.Cell;
75 import org.apache.hadoop.hbase.CellScanner;
76 import org.apache.hadoop.hbase.CellUtil;
77 import org.apache.hadoop.hbase.CompoundConfiguration;
78 import org.apache.hadoop.hbase.DoNotRetryIOException;
79 import org.apache.hadoop.hbase.DroppedSnapshotException;
80 import org.apache.hadoop.hbase.HBaseConfiguration;
81 import org.apache.hadoop.hbase.HBaseIOException;
82 import org.apache.hadoop.hbase.HColumnDescriptor;
83 import org.apache.hadoop.hbase.HConstants;
84 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
85 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
86 import org.apache.hadoop.hbase.HRegionInfo;
87 import org.apache.hadoop.hbase.HTableDescriptor;
88 import org.apache.hadoop.hbase.KeyValue;
89 import org.apache.hadoop.hbase.KeyValueUtil;
90 import org.apache.hadoop.hbase.NamespaceDescriptor;
91 import org.apache.hadoop.hbase.NotServingRegionException;
92 import org.apache.hadoop.hbase.RegionTooBusyException;
93 import org.apache.hadoop.hbase.TableName;
94 import org.apache.hadoop.hbase.Tag;
95 import org.apache.hadoop.hbase.TagType;
96 import org.apache.hadoop.hbase.UnknownScannerException;
97 import org.apache.hadoop.hbase.backup.HFileArchiver;
98 import org.apache.hadoop.hbase.classification.InterfaceAudience;
99 import org.apache.hadoop.hbase.client.Append;
100 import org.apache.hadoop.hbase.client.Delete;
101 import org.apache.hadoop.hbase.client.Durability;
102 import org.apache.hadoop.hbase.client.Get;
103 import org.apache.hadoop.hbase.client.Increment;
104 import org.apache.hadoop.hbase.client.IsolationLevel;
105 import org.apache.hadoop.hbase.client.Mutation;
106 import org.apache.hadoop.hbase.client.Put;
107 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
108 import org.apache.hadoop.hbase.client.Result;
109 import org.apache.hadoop.hbase.client.RowMutations;
110 import org.apache.hadoop.hbase.client.Scan;
111 import org.apache.hadoop.hbase.conf.ConfigurationManager;
112 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
113 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
114 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
115 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
116 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
117 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
118 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
119 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
120 import org.apache.hadoop.hbase.filter.FilterWrapper;
121 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
122 import org.apache.hadoop.hbase.io.HeapSize;
123 import org.apache.hadoop.hbase.io.TimeRange;
124 import org.apache.hadoop.hbase.io.hfile.BlockCache;
125 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
126 import org.apache.hadoop.hbase.io.hfile.HFile;
127 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
128 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
129 import org.apache.hadoop.hbase.ipc.RpcCallContext;
130 import org.apache.hadoop.hbase.ipc.RpcServer;
131 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
132 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
133 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
134 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
135 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
137 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
138 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
139 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
140 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
141 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
142 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
143 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
144 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
145 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
146 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
147 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
148 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
149 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
150 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
151 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
152 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
153 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
154 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
155 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
156 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
157 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
158 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
159 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
160 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
161 import org.apache.hadoop.hbase.security.User;
162 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
163 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
164 import org.apache.hadoop.hbase.util.ByteStringer;
165 import org.apache.hadoop.hbase.util.Bytes;
166 import org.apache.hadoop.hbase.util.CancelableProgressable;
167 import org.apache.hadoop.hbase.util.ClassSize;
168 import org.apache.hadoop.hbase.util.CompressionTest;
169 import org.apache.hadoop.hbase.util.Counter;
170 import org.apache.hadoop.hbase.util.EncryptionTest;
171 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
172 import org.apache.hadoop.hbase.util.FSTableDescriptors;
173 import org.apache.hadoop.hbase.util.FSUtils;
174 import org.apache.hadoop.hbase.util.HashedBytes;
175 import org.apache.hadoop.hbase.util.Pair;
176 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
177 import org.apache.hadoop.hbase.util.Threads;
178 import org.apache.hadoop.hbase.wal.WAL;
179 import org.apache.hadoop.hbase.wal.WALFactory;
180 import org.apache.hadoop.hbase.wal.WALKey;
181 import org.apache.hadoop.hbase.wal.WALSplitter;
182 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
183 import org.apache.hadoop.io.MultipleIOException;
184 import org.apache.hadoop.util.StringUtils;
185 import org.apache.htrace.Trace;
186 import org.apache.htrace.TraceScope;
187
188 import com.google.common.annotations.VisibleForTesting;
189 import com.google.common.base.Optional;
190 import com.google.common.base.Preconditions;
191 import com.google.common.collect.Lists;
192 import com.google.common.collect.Maps;
193 import com.google.common.io.Closeables;
194 import com.google.protobuf.ByteString;
195 import com.google.protobuf.Descriptors;
196 import com.google.protobuf.Message;
197 import com.google.protobuf.RpcCallback;
198 import com.google.protobuf.RpcController;
199 import com.google.protobuf.Service;
200 import com.google.protobuf.TextFormat;
201
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
  public static final Log LOG = LogFactory.getLog(HRegion.class);

  /** Config key: whether column families are loaded on demand during scans. */
  public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
      "hbase.hregion.scan.loadColumnFamiliesOnDemand";

  // Longest time, in ms, to wait for a sequence id to become available.
  private final int maxWaitForSeqId;
  private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
  private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;

  /** Durability used when the table descriptor specifies USE_DEFAULT. */
  private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;

  // Set once the region has completed closing.
  final AtomicBoolean closed = new AtomicBoolean(false);

  // Set while the region is in the process of closing; distinct from
  // 'closed', which only flips when the close has finished.
  final AtomicBoolean closing = new AtomicBoolean(false);

  // Max sequence id that has been flushed to store files; edits at or below
  // this id are durable in HFiles.
  private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;

  // Sequence id assigned to the most recent flush operation.
  private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;

  // Region sequence id source; stays -1 until initialized during region open.
  private final AtomicLong sequenceId = new AtomicLong(-1L);

  // Highest sequence ids seen while replaying region-open / compaction WAL
  // events. NOTE(review): presumably used by replica/recovery replay paths —
  // confirm at the replay call sites (not visible in this chunk).
  protected volatile long lastReplayedOpenRegionSeqId = -1L;
  protected volatile long lastReplayedCompactionSeqId = -1L;

  // Row key -> context of the row lock currently held for that row.
  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
      new ConcurrentHashMap<HashedBytes, RowLockContext>();

  // Column family name -> Store, ordered by raw byte comparison of the name.
  protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
      Bytes.BYTES_RAWCOMPARATOR);

  // Registered coprocessor Service endpoints, keyed by service name.
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // Total size in bytes currently held across this region's memstores.
  public final AtomicLong memstoreSize = new AtomicLong(0);

  // Counters for mutations applied with the WAL skipped, and their data size.
  final Counter numMutationsWithoutWAL = new Counter();
  final Counter dataInMemoryWithoutWAL = new Counter();

  // Outcomes of checkAndMutate operations.
  final Counter checkAndMutateChecksPassed = new Counter();
  final Counter checkAndMutateChecksFailed = new Counter();

  // Per-region request counters.
  final Counter readRequestsCount = new Counter();
  final Counter writeRequestsCount = new Counter();

  // Number of requests blocked (e.g. while the region cannot accept updates).
  private final Counter blockedRequestsCount = new Counter();

  // Compaction metrics.
  final AtomicLong compactionsFinished = new AtomicLong(0L);
  final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
  final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);

  private final WAL wal;
  private final HRegionFileSystem fs;
  protected final Configuration conf;
  // Original (non-compound) configuration passed at construction time.
  private final Configuration baseConf;
  private final KeyValue.KVComparator comparator;
  private final int rowLockWaitDuration;
  static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

  // Base time, in ms, a caller waits on a busy region before retrying.
  final long busyWaitDuration;
  static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

  // Max multiple of busyWaitDuration a caller can be made to wait.
  final int maxBusyWaitMultiplier;

  // Absolute cap on busy-wait time, from hbase.ipc.client.call.purge.timeout.
  final long maxBusyWaitDuration;

  // Timeout and executor used for RowProcessor invocations.
  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

  // Read points of currently open scanners; consulted by getSmallestReadPoint().
  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

  // Sequence id recorded at region open; NO_SEQNUM until assigned.
  private long openSeqNum = HConstants.NO_SEQNUM;

  // Default for on-demand column family loading; the constructor reads
  // LOAD_CFS_ON_DEMAND_CONFIG_KEY (default true) into this field.
  private boolean isLoadingCfsOnDemandDefault = false;

  // Counts of major/minor compactions currently running on this region.
  private final AtomicInteger majorInProgress = new AtomicInteger(0);
  private final AtomicInteger minorInProgress = new AtomicInteger(0);

  // Max sequence id present in each store, keyed by column family name.
  Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  // State captured by the prepare phase of an in-progress flush, if any.
  private PrepareFlushResult prepareFlushResult = null;

  // Whether writes are rejected while the region is recovering; set from
  // HConstants.DISALLOW_WRITES_IN_RECOVERING in the constructor.
  private boolean disallowWritesInRecovering = false;

  // True while this region is recovering (listed in the RS's recovering map).
  private volatile boolean isRecovering = false;

  // Manager used to propagate configuration changes; absent until registered.
  private volatile Optional<ConfigurationManager> configurationManager;
374
375
376
377
378
379 public long getSmallestReadPoint() {
380 long minimumReadPoint;
381
382
383
384 synchronized(scannerReadPoints) {
385 minimumReadPoint = mvcc.memstoreReadPoint();
386
387 for (Long readPoint: this.scannerReadPoints.values()) {
388 if (readPoint < minimumReadPoint) {
389 minimumReadPoint = readPoint;
390 }
391 }
392 }
393 return minimumReadPoint;
394 }
395
396
397
398
399
  /**
   * Mutable flags describing the write/flush/compaction state of the region.
   * Fields are volatile so single-flag reads need no lock; the compound update
   * in {@link #setReadOnly(boolean)} synchronizes on this object.
   */
  static class WriteState {
    // True while a flush is in progress.
    volatile boolean flushing = false;
    // True when a flush has been requested but has not started yet.
    volatile boolean flushRequested = false;
    // Number of compactions currently running on this region.
    volatile int compacting = 0;
    // Gate for writes; flipped to the inverse of readOnly by setReadOnly().
    volatile boolean writesEnabled = true;
    // True when the region is read-only.
    volatile boolean readOnly = false;
    // Gate for reads, independent of readOnly.
    volatile boolean readsEnabled = true;

    /**
     * Set flags that make this region read-only.
     *
     * @param onOff flip value for region read-only setting; writesEnabled is
     *          set to the inverse
     */
    synchronized void setReadOnly(final boolean onOff) {
      this.writesEnabled = !onOff;
      this.readOnly = onOff;
    }

    boolean isReadOnly() {
      return this.readOnly;
    }

    boolean isFlushRequested() {
      return this.flushRequested;
    }

    void setReadsEnabled(boolean readsEnabled) {
      this.readsEnabled = readsEnabled;
    }

    // Heap estimate: object header plus the five boolean flags.
    // NOTE(review): the 'compacting' int is not included in this estimate —
    // confirm whether that is intentional or absorbed by alignment.
    static final long HEAP_SIZE = ClassSize.align(
        ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
  }
440
441
442
443
444
445
446
447 public static class FlushResultImpl implements FlushResult {
448 final Result result;
449 final String failureReason;
450 final long flushSequenceId;
451 final boolean wroteFlushWalMarker;
452
453
454
455
456
457
458
459
460 FlushResultImpl(Result result, long flushSequenceId) {
461 this(result, flushSequenceId, null, false);
462 assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
463 .FLUSHED_COMPACTION_NEEDED;
464 }
465
466
467
468
469
470
471 FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) {
472 this(result, -1, failureReason, wroteFlushMarker);
473 assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
474 }
475
476
477
478
479
480
481
482 FlushResultImpl(Result result, long flushSequenceId, String failureReason,
483 boolean wroteFlushMarker) {
484 this.result = result;
485 this.flushSequenceId = flushSequenceId;
486 this.failureReason = failureReason;
487 this.wroteFlushWalMarker = wroteFlushMarker;
488 }
489
490
491
492
493
494
495 @Override
496 public boolean isFlushSucceeded() {
497 return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
498 .FLUSHED_COMPACTION_NEEDED;
499 }
500
501
502
503
504
505 @Override
506 public boolean isCompactionNeeded() {
507 return result == Result.FLUSHED_COMPACTION_NEEDED;
508 }
509
510 @Override
511 public String toString() {
512 return new StringBuilder()
513 .append("flush result:").append(result).append(", ")
514 .append("failureReason:").append(failureReason).append(",")
515 .append("flush seq id").append(flushSequenceId).toString();
516 }
517
518 @Override
519 public Result getResult() {
520 return result;
521 }
522 }
523
524
  /**
   * Holds everything computed by the prepare phase of a flush, before the
   * flush is committed or aborted.
   */
  @VisibleForTesting
  static class PrepareFlushResult {
    // Non-null only for the early-exit case; carries the reason prepare stopped.
    final FlushResult result;
    // Per-family flush contexts; null in the early-exit case.
    final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
    // Per-family files committed so far; null in the early-exit case.
    final TreeMap<byte[], List<Path>> committedFiles;
    // Per-family flushable sizes; null in the early-exit case.
    final TreeMap<byte[], Long> storeFlushableSize;
    final long startTime;
    final long flushOpSeqId;
    final long flushedSeqId;
    final long totalFlushableSize;

    /** Constructs an early exit case: only a result and (clamped) flush seq id. */
    PrepareFlushResult(FlushResult result, long flushSeqId) {
      this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
    }

    /** Constructs a successful prepare flush result (no FlushResult yet). */
    PrepareFlushResult(
        TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
        TreeMap<byte[], List<Path>> committedFiles,
        TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
        long flushedSeqId, long totalFlushableSize) {
      this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
          flushSeqId, flushedSeqId, totalFlushableSize);
    }

    private PrepareFlushResult(
        FlushResult result,
        TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
        TreeMap<byte[], List<Path>> committedFiles,
        TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
        long flushedSeqId, long totalFlushableSize) {
      this.result = result;
      this.storeFlushCtxs = storeFlushCtxs;
      this.committedFiles = committedFiles;
      this.storeFlushableSize = storeFlushableSize;
      this.startTime = startTime;
      this.flushOpSeqId = flushSeqId;
      this.flushedSeqId = flushedSeqId;
      this.totalFlushableSize = totalFlushableSize;
    }

    public FlushResult getResult() {
      return this.result;
    }
  }
571
572
573
574
575 static class ObservedExceptionsInBatch {
576 private boolean wrongRegion = false;
577 private boolean failedSanityCheck = false;
578 private boolean wrongFamily = false;
579
580
581
582
583 boolean hasSeenWrongRegion() {
584 return wrongRegion;
585 }
586
587
588
589
590 void sawWrongRegion() {
591 wrongRegion = true;
592 }
593
594
595
596
597 boolean hasSeenFailedSanityCheck() {
598 return failedSanityCheck;
599 }
600
601
602
603
604 void sawFailedSanityCheck() {
605 failedSanityCheck = true;
606 }
607
608
609
610
611 boolean hasSeenNoSuchFamily() {
612 return wrongFamily;
613 }
614
615
616
617
618 void sawNoSuchFamily() {
619 wrongFamily = true;
620 }
621 }
622
  final WriteState writestate = new WriteState();

  // Memstore size (bytes) that triggers a flush; from the table descriptor or
  // hbase.hregion.memstore.flush.size (see setHTableSpecificConf()).
  long memstoreFlushSize;
  // Max ms a cell timestamp may differ from server time; from
  // hbase.hregion.keyvalue.timestamp.slop.millisecs.
  final long timestampSlop;
  // Timeout, in ms, applied to RowProcessor executions.
  final long rowProcessorTimeout;

  // Last flush time per store. NOTE(review): presumably consulted by the
  // periodic flush check — confirm at the flushCheckInterval usage site.
  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
      new ConcurrentHashMap<Store, Long>();

  final RegionServerServices rsServices;
  private RegionServerAccounting rsAccounting;
  // Interval, in ms, between periodic memstore flush checks.
  private long flushCheckInterval;

  // Flush the region after this many edits, regardless of memstore size.
  private long flushPerChanges;
  // Memstore size at which updates are blocked:
  // memstoreFlushSize * hbase.hregion.memstore.block.multiplier.
  private long blockingMemStoreSize;
  final long threadWakeFrequency;

  // Region-scope lock; shared vs. exclusive usage is defined at the call
  // sites (not visible in this chunk).
  final ReentrantReadWriteLock lock =
      new ReentrantReadWriteLock();

  // Lock coordinating updates with operations that must block them.
  private final ReentrantReadWriteLock updatesLock =
      new ReentrantReadWriteLock();
  private boolean splitRequest;
  private byte[] explicitSplitPoint = null;

  // MVCC controlling read/write visibility for this region.
  private final MultiVersionConsistencyControl mvcc =
      new MultiVersionConsistencyControl();

  // Coprocessor host; only created when RegionServerServices is supplied.
  private RegionCoprocessorHost coprocessorHost;

  private HTableDescriptor htableDescriptor = null;
  private RegionSplitPolicy splitPolicy;
  private FlushPolicy flushPolicy;

  // Region metrics; null when constructed without RegionServerServices.
  private final MetricsRegion metricsRegion;
  private final MetricsRegionWrapperImpl metricsRegionWrapper;
  private final Durability durability;
  // Region stats reporting; always false for system-namespace tables.
  private final boolean regionStatsEnabled;
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
  /**
   * HRegion constructor taking the region directory layout pieces explicitly.
   * Wraps them in an {@link HRegionFileSystem} and delegates to the primary
   * constructor.
   *
   * @param tableDir directory of the table this region belongs to
   * @param wal the WAL for this region, or null
   * @param fs filesystem the region lives on
   * @param confParam original base configuration (must not be a CompoundConfiguration)
   * @param regionInfo info describing this region
   * @param htd the table descriptor
   * @param rsServices hosting region server services, or null (e.g. in tests)
   * @deprecated Use the {@link HRegionFileSystem}-based constructor instead.
   */
  @Deprecated
  public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
      final Configuration confParam, final HRegionInfo regionInfo,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    this(new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
        wal, confParam, htd, rsServices);
  }
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
  /**
   * Primary HRegion constructor. Builds the region's effective configuration
   * (base conf overlaid with table-level settings), reads all region tuning
   * parameters, and wires up metrics/coprocessors when running inside a
   * region server. Does NOT open stores or replay edits — call initialize()
   * for that.
   *
   * @param fs filesystem abstraction for this region's directory
   * @param wal the WAL for this region, or null
   * @param confParam original base configuration; must not already be a
   *          CompoundConfiguration
   * @param htd the table descriptor; must not be null
   * @param rsServices hosting region server services, or null (e.g. in tests)
   * @throws IllegalArgumentException if htd is null, confParam is a
   *           CompoundConfiguration, flushPerChanges exceeds its maximum, or
   *           the busy-wait settings are non-positive
   */
  public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
      final HTableDescriptor htd, final RegionServerServices rsServices) {
    if (htd == null) {
      throw new IllegalArgumentException("Need table descriptor");
    }

    if (confParam instanceof CompoundConfiguration) {
      throw new IllegalArgumentException("Need original base configuration");
    }

    this.comparator = fs.getRegionInfo().getComparator();
    this.wal = wal;
    this.fs = fs;

    // 'conf' renamed to 'confParam' because this.conf is the compound config:
    // base conf overlaid with the table's configuration and values.
    this.baseConf = confParam;
    this.conf = new CompoundConfiguration()
        .add(confParam)
        .addStringMap(htd.getConfiguration())
        .addWritableMap(htd.getValues());
    this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
        DEFAULT_CACHE_FLUSH_INTERVAL);
    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
          + MAX_FLUSH_PER_CHANGES);
    }
    this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
        DEFAULT_ROWLOCK_WAIT_DURATION);

    this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
    this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
    this.htableDescriptor = htd;
    this.rsServices = rsServices;
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    // Derive memstoreFlushSize / blockingMemStoreSize from the descriptor+conf.
    setHTableSpecificConf();
    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

    this.busyWaitDuration = conf.getLong(
        "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
    this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
    if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
      throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
          + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
          + maxBusyWaitMultiplier + "). Their product should be positive");
    }
    this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
        2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

    // Default slop is LATEST_TIMESTAMP, i.e. effectively no timestamp check.
    this.timestampSlop = conf.getLong(
        "hbase.hregion.keyvalue.timestamp.slop.millisecs",
        HConstants.LATEST_TIMESTAMP);

    this.rowProcessorTimeout = conf.getLong(
        "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
    this.durability = htd.getDurability() == Durability.USE_DEFAULT
        ? DEFAULT_DURABILITY
        : htd.getDurability();
    if (rsServices != null) {
      this.rsAccounting = this.rsServices.getRegionServerAccounting();
      // Metrics and coprocessors only exist when hosted by a region server.
      this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
      this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

      // If the RS lists this region as recovering, flag it and register this
      // instance in the recovering map so the RS can track it.
      Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions();
      String encodedName = getRegionInfo().getEncodedName();
      if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
        this.isRecovering = true;
        recoveringRegions.put(encodedName, this);
      }
    } else {
      this.metricsRegionWrapper = null;
      this.metricsRegion = null;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Instantiated " + this);
    }

    // Whether writes should be rejected while in recovering state.
    this.disallowWritesInRecovering =
        conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
            HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
    configurationManager = Optional.absent();

    // Region stats (client backpressure) are never enabled for system tables.
    this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
        NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
            false :
            conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
                HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
  }
813
814 void setHTableSpecificConf() {
815 if (this.htableDescriptor == null) return;
816 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
817
818 if (flushSize <= 0) {
819 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
820 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
821 }
822 this.memstoreFlushSize = flushSize;
823 this.blockingMemStoreSize = this.memstoreFlushSize *
824 conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
825 HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
826 }
827
828
829
830
831
832
833
834
835
  /**
   * Initialize this region without a progress reporter.
   *
   * @return the sequence id the region should continue from
   * @throws IOException if initialization fails
   * @deprecated use HRegion.createHRegion() or HRegion.openHRegion()
   */
  @Deprecated
  public long initialize() throws IOException {
    return initialize(null);
  }
840
841
842
843
844
845
846
847
848 private long initialize(final CancelableProgressable reporter) throws IOException {
849 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
850 long nextSeqId = -1;
851 try {
852 nextSeqId = initializeRegionInternals(reporter, status);
853 return nextSeqId;
854 } finally {
855
856
857 if (nextSeqId == -1) {
858 status.abort("Exception during region " + getRegionInfo().getRegionNameAsString() +
859 " initialization.");
860 }
861 }
862 }
863
  /**
   * Performs the ordered steps of region initialization: coprocessor pre-open
   * hook, .regioninfo write (primary replica only), store initialization and
   * recovered-edits replay, cleanup of temp/split/merge leftovers, policy
   * creation, sequence-id file bump, and the coprocessor post-open hook.
   *
   * @param reporter progress reporter passed to recovered-edits replay; may be null
   * @param status monitored task updated at each step
   * @return the next sequence id this region should use
   * @throws IOException if any step fails
   */
  private long initializeRegionInternals(final CancelableProgressable reporter,
      final MonitoredTask status) throws IOException {
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-open hook");
      coprocessorHost.preOpen();
    }

    // Only the default (primary) replica writes the .regioninfo file;
    // secondary replicas share the primary's region directory.
    if (this.getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
      status.setStatus("Writing region info on filesystem");
      fs.checkRegionInfoOnFilesystem();
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skipping creation of .regioninfo file for " + this.getRegionInfo());
      }
    }

    // Open stores (and replay recovered edits unless read-only/replica).
    status.setStatus("Initializing all the Stores");
    long maxSeqId = initializeRegionStores(reporter, status, false);
    this.lastReplayedOpenRegionSeqId = maxSeqId;

    this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
    this.writestate.flushRequested = false;
    this.writestate.compacting = 0;

    if (this.writestate.writesEnabled) {
      // Remove temporary data left over from crashed servers.
      status.setStatus("Cleaning up temporary data from old regions");
      fs.cleanupTempDir();
    }

    if (this.writestate.writesEnabled) {
      status.setStatus("Cleaning up detritus from prior splits");
      // Get rid of any splits or merges that were lost in-progress. Clean out
      // these directories here on open. We may be opening a region that was
      // being split but we crashed in the middle of it all.
      fs.cleanupAnySplitDetritus();
      fs.cleanupMergesDir();
    }

    // Initialize split policy
    this.splitPolicy = RegionSplitPolicy.create(this, conf);

    // Initialize flush policy
    this.flushPolicy = FlushPolicyFactory.create(this, conf);

    long lastFlushTime = EnvironmentEdgeManager.currentTime();
    for (Store store: stores.values()) {
      this.lastStoreFlushTimeMap.put(store, lastFlushTime);
    }

    // Use maximum of log sequenceid or that which was found in stores
    // (particularly if no recovered edits, seqid will be -1).
    long nextSeqid = maxSeqId;

    // In distributedLogReplay mode, we don't know the last change sequence
    // number because region is opened before recovery completes. So we add a
    // safety bumper to avoid new sequence number overlapping with old ones.
    if (this.writestate.writesEnabled) {
      nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
          .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
    } else {
      nextSeqid++;
    }

    LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
        "; next sequenceid=" + nextSeqid);

    // A region can be reopened if failed a split; reset flags.
    this.closing.set(false);
    this.closed.set(false);

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-open hooks");
      coprocessorHost.postOpen();
    }

    status.markComplete("Region opened successfully");
    return nextSeqid;
  }
946
947 private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status,
948 boolean warmupOnly)
949 throws IOException {
950
951
952
953 long maxSeqId = -1;
954
955 long maxMemstoreTS = -1;
956
957 if (!htableDescriptor.getFamilies().isEmpty()) {
958
959 ThreadPoolExecutor storeOpenerThreadPool =
960 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
961 CompletionService<HStore> completionService =
962 new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
963
964
965 for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
966 status.setStatus("Instantiating store for column family " + family);
967 completionService.submit(new Callable<HStore>() {
968 @Override
969 public HStore call() throws IOException {
970 return instantiateHStore(family);
971 }
972 });
973 }
974 boolean allStoresOpened = false;
975 try {
976 for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
977 Future<HStore> future = completionService.take();
978 HStore store = future.get();
979 this.stores.put(store.getFamily().getName(), store);
980
981 long storeMaxSequenceId = store.getMaxSequenceId();
982 maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
983 storeMaxSequenceId);
984 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
985 maxSeqId = storeMaxSequenceId;
986 }
987 long maxStoreMemstoreTS = store.getMaxMemstoreTS();
988 if (maxStoreMemstoreTS > maxMemstoreTS) {
989 maxMemstoreTS = maxStoreMemstoreTS;
990 }
991 }
992 allStoresOpened = true;
993 } catch (InterruptedException e) {
994 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
995 } catch (ExecutionException e) {
996 throw new IOException(e.getCause());
997 } finally {
998 storeOpenerThreadPool.shutdownNow();
999 if (!allStoresOpened) {
1000
1001 LOG.error("Could not initialize all stores for the region=" + this);
1002 for (Store store : this.stores.values()) {
1003 try {
1004 store.close();
1005 } catch (IOException e) {
1006 LOG.warn(e.getMessage());
1007 }
1008 }
1009 }
1010 }
1011 }
1012 if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this) && !warmupOnly) {
1013
1014 maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
1015 this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
1016 }
1017 maxSeqId = Math.max(maxSeqId, maxMemstoreTS + 1);
1018 mvcc.initialize(maxSeqId);
1019 return maxSeqId;
1020 }
1021
1022 private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
1023 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
1024
1025
1026 status.setStatus("Warming up all the Stores");
1027 initializeRegionStores(reporter, status, true);
1028 }
1029
1030
1031
1032
1033 private NavigableMap<byte[], List<Path>> getStoreFiles() {
1034 NavigableMap<byte[], List<Path>> allStoreFiles =
1035 new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
1036 for (Store store: getStores()) {
1037 Collection<StoreFile> storeFiles = store.getStorefiles();
1038 if (storeFiles == null) continue;
1039 List<Path> storeFileNames = new ArrayList<Path>();
1040 for (StoreFile storeFile: storeFiles) {
1041 storeFileNames.add(storeFile.getPath());
1042 }
1043 allStoreFiles.put(store.getFamily().getName(), storeFileNames);
1044 }
1045 return allStoreFiles;
1046 }
1047
1048 private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
1049 Map<byte[], List<Path>> storeFiles = getStoreFiles();
1050 RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
1051 RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
1052 getRegionServerServices().getServerName(), storeFiles);
1053 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc,
1054 getSequenceId());
1055 }
1056
1057 private void writeRegionCloseMarker(WAL wal) throws IOException {
1058 Map<byte[], List<Path>> storeFiles = getStoreFiles();
1059 RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
1060 RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), getSequenceId().get(),
1061 getRegionServerServices().getServerName(), storeFiles);
1062 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
1063 getSequenceId());
1064
1065
1066
1067
1068 if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
1069 WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
1070 getSequenceId().get(), 0);
1071 }
1072 }
1073
1074
1075
1076
1077 public boolean hasReferences() {
1078 for (Store store : this.stores.values()) {
1079 if (store.hasReferences()) return true;
1080 }
1081 return false;
1082 }
1083
1084 @Override
1085 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
1086 HDFSBlocksDistribution hdfsBlocksDistribution =
1087 new HDFSBlocksDistribution();
1088 synchronized (this.stores) {
1089 for (Store store : this.stores.values()) {
1090 Collection<StoreFile> storeFiles = store.getStorefiles();
1091 if (storeFiles == null) continue;
1092 for (StoreFile sf : storeFiles) {
1093 HDFSBlocksDistribution storeFileBlocksDistribution =
1094 sf.getHDFSBlockDistribution();
1095 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
1096 }
1097 }
1098 }
1099 return hdfsBlocksDistribution;
1100 }
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1111 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
1112 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
1113 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
1114 }
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1126 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
1127 throws IOException {
1128 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1129 FileSystem fs = tablePath.getFileSystem(conf);
1130
1131 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1132 for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1133 Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1134 if (storeFiles == null) continue;
1135 for (StoreFileInfo storeFileInfo : storeFiles) {
1136 hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1137 }
1138 }
1139 return hdfsBlocksDistribution;
1140 }
1141
1142
1143
1144
1145
1146
1147 public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1148 if (this.rsAccounting != null) {
1149 rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1150 }
1151 long size = this.memstoreSize.addAndGet(memStoreSize);
1152
1153
1154
1155 if (HRegionInfo.DEFAULT_REPLICA_ID == this.getRegionInfo().getReplicaId() && size < 0) {
1156 LOG.error("Asked to modify this region's (" + this.toString()
1157 + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
1158 + (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
1159 }
1160 return size;
1161 }
1162
1163 @Override
1164 public HRegionInfo getRegionInfo() {
1165 return this.fs.getRegionInfo();
1166 }
1167
1168
1169
1170
1171
1172 RegionServerServices getRegionServerServices() {
1173 return this.rsServices;
1174 }
1175
1176 @Override
1177 public long getReadRequestsCount() {
1178 return readRequestsCount.get();
1179 }
1180
1181 @Override
1182 public void updateReadRequestsCount(long i) {
1183 readRequestsCount.add(i);
1184 }
1185
1186 @Override
1187 public long getWriteRequestsCount() {
1188 return writeRequestsCount.get();
1189 }
1190
1191 @Override
1192 public void updateWriteRequestsCount(long i) {
1193 writeRequestsCount.add(i);
1194 }
1195
1196 @Override
1197 public long getMemstoreSize() {
1198 return memstoreSize.get();
1199 }
1200
1201 @Override
1202 public long getNumMutationsWithoutWAL() {
1203 return numMutationsWithoutWAL.get();
1204 }
1205
1206 @Override
1207 public long getDataInMemoryWithoutWAL() {
1208 return dataInMemoryWithoutWAL.get();
1209 }
1210
1211 @Override
1212 public long getBlockedRequestsCount() {
1213 return blockedRequestsCount.get();
1214 }
1215
1216 @Override
1217 public long getCheckAndMutateChecksPassed() {
1218 return checkAndMutateChecksPassed.get();
1219 }
1220
1221 @Override
1222 public long getCheckAndMutateChecksFailed() {
1223 return checkAndMutateChecksFailed.get();
1224 }
1225
1226 @Override
1227 public MetricsRegion getMetrics() {
1228 return metricsRegion;
1229 }
1230
1231 @Override
1232 public boolean isClosed() {
1233 return this.closed.get();
1234 }
1235
1236 @Override
1237 public boolean isClosing() {
1238 return this.closing.get();
1239 }
1240
1241 @Override
1242 public boolean isReadOnly() {
1243 return this.writestate.isReadOnly();
1244 }
1245
1246
1247
1248
  /**
   * Switch the region's recovering state. When leaving recovery (true -> false) on a
   * writable region that has a WAL and a hosting server, first optionally flush (only
   * when region replication is on) and write a REGION_OPEN marker to the WAL; failures
   * in either step are logged and swallowed so recovery completion is never blocked.
   * Finally fires the postLogReplay coprocessor hook.
   */
  public void setRecovering(boolean newState) {
    boolean wasRecovering = this.isRecovering;
    // Before transitioning out of recovery, persist state so replicas/readers see it.
    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
        && wasRecovering && !newState) {

      // Force a flush only when there are secondary replicas that need the data
      // to be in store files rather than only in the (replayed) memstore.
      boolean forceFlush = getTableDesc().getRegionReplication() > 1;

      MonitoredTask status = TaskMonitor.get().createStatus(
        "Flushing region " + this + " because recovery is finished");
      try {
        if (forceFlush) {
          internalFlushcache(status);
        }

        status.setStatus("Writing region open event marker to WAL because recovery is finished");
        try {
          long seqId = openSeqNum;
          // obtain a new seq id from the WAL when we have one; fall back to openSeqNum
          if (wal != null) {
            seqId = getNextSequenceId(wal);
          }
          writeRegionOpenMarker(wal, seqId);
        } catch (IOException e) {
          // best-effort: recovery completion must not fail on a marker write error
          LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
              + "event to WAL, continueing", e);
        }
      } catch (IOException ioe) {
        // best-effort: a failed flush is logged and ignored here as well
        LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
            + "event to WAL, continueing", ioe);
      } finally {
        status.cleanup();
      }
    }

    this.isRecovering = newState;
    if (wasRecovering && !isRecovering) {
      // notify coprocessors that log replay has finished
      coprocessorHost.postLogReplay();
    }
  }
1297
1298 @Override
1299 public boolean isRecovering() {
1300 return this.isRecovering;
1301 }
1302
1303 @Override
1304 public boolean isAvailable() {
1305 return !isClosed() && !isClosing();
1306 }
1307
1308
1309 public boolean isSplittable() {
1310 return isAvailable() && !hasReferences();
1311 }
1312
1313
1314
1315
1316 public boolean isMergeable() {
1317 if (!isAvailable()) {
1318 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1319 + " is not mergeable because it is closing or closed");
1320 return false;
1321 }
1322 if (hasReferences()) {
1323 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1324 + " is not mergeable because it has references");
1325 return false;
1326 }
1327
1328 return true;
1329 }
1330
1331 public boolean areWritesEnabled() {
1332 synchronized(this.writestate) {
1333 return this.writestate.writesEnabled;
1334 }
1335 }
1336
1337 public MultiVersionConsistencyControl getMVCC() {
1338 return mvcc;
1339 }
1340
1341 @Override
1342 public long getMaxFlushedSeqId() {
1343 return maxFlushedSeqId;
1344 }
1345
1346 @Override
1347 public long getReadpoint(IsolationLevel isolationLevel) {
1348 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1349
1350 return Long.MAX_VALUE;
1351 }
1352 return mvcc.memstoreReadPoint();
1353 }
1354
1355 @Override
1356 public boolean isLoadingCfsOnDemandDefault() {
1357 return this.isLoadingCfsOnDemandDefault;
1358 }
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
  /**
   * Close the region without an abort: delegates to {@link #close(boolean)} with
   * abort=false.
   * @return map of closed StoreFiles per column family, or null when already closed
   */
  public Map<byte[], List<StoreFile>> close() throws IOException {
    return close(false);
  }

  // Serializes concurrent close() callers: only one thread may run doClose() at a time.
  private final Object closeLock = new Object();

  /** Conf key for the interval of the periodic memstore flush check. */
  public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
      "hbase.regionserver.optionalcacheflushinterval";
  /** Default flush-check interval for user regions: one hour (ms). */
  public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
  /** Flush-check interval for the meta region: five minutes (ms); see shouldFlush(). */
  public static final int META_CACHE_FLUSH_INTERVAL = 300000;

  /** Conf key: flush when this many changes accumulated since the last flush. */
  public static final String MEMSTORE_FLUSH_PER_CHANGES =
      "hbase.regionserver.flush.per.changes";
  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million edits

  /** Upper bound allowed for the MEMSTORE_FLUSH_PER_CHANGES setting. */
  public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1 billion edits
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416 public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1417
1418
1419 MonitoredTask status = TaskMonitor.get().createStatus(
1420 "Closing region " + this +
1421 (abort ? " due to abort" : ""));
1422
1423 status.setStatus("Waiting for close lock");
1424 try {
1425 synchronized (closeLock) {
1426 return doClose(abort, status);
1427 }
1428 } finally {
1429 status.cleanup();
1430 }
1431 }
1432
1433
1434
1435
1436 @VisibleForTesting
1437 public void setClosing(boolean closing) {
1438 this.closing.set(closing);
1439 }
1440
  /**
   * Perform the actual close: disable compactions/flushes, flush the memstore
   * (unless aborting or read-only), close every store in parallel, write the
   * REGION_CLOSE WAL marker and run coprocessor hooks.
   * Caller must hold {@code closeLock}; the region write lock is taken here.
   * @return the closed StoreFiles per family, or null when already closed
   */
  private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
      throws IOException {
    if (isClosed()) {
      LOG.warn("Region " + this + " already closed");
      return null;
    }

    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-close hooks");
      this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    boolean canFlush = true;
    synchronized (writestate) {
      // A read-only region has no memstore content to flush.
      canFlush = !writestate.readOnly;
      writestate.writesEnabled = false;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      waitForFlushesAndCompactions();
    }
    // Pre-flush before taking the region write lock so reads/writes continue while the
    // bulk of the memstore is persisted; the final flush under the lock is then small.
    if (!abort && worthPreFlushing() && canFlush) {
      status.setStatus("Pre-flushing region before close");
      LOG.info("Running close preflush of " + getRegionInfo().getRegionNameAsString());
      try {
        internalFlushcache(status);
      } catch (IOException ioe) {
        // failed pre-flush is non-fatal; the final flush below will retry
        status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
      }
    }

    // From here on no new reads or writes are admitted.
    lock.writeLock().lock();
    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    try {
      if (this.isClosed()) {
        status.abort("Already got closed by another process");
        // lost the race to another close; nothing to report
        return null;
      }
      LOG.debug("Updates disabled for region " + this);
      // Final flush: loop until the memstore is empty (a carried snapshot may need
      // more than one pass); give up after 5 extra attempts to avoid losing data.
      if (!abort && canFlush) {
        int flushCount = 0;
        while (this.memstoreSize.get() > 0) {
          try {
            if (flushCount++ > 0) {
              int actualFlushes = flushCount - 1;
              if (actualFlushes > 5) {
                // memory cannot be cleared; throw so the caller can handle it
                // rather than silently dropping the snapshot
                throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " +
                  Bytes.toStringBinary(getRegionInfo().getRegionName()));
              }
              LOG.info("Running extra flush, " + actualFlushes +
                " (carrying snapshot?) " + this);
            }
            internalFlushcache(status);
          } catch (IOException ioe) {
            status.setStatus("Failed flush " + this + ", putting online again");
            synchronized (writestate) {
              // re-enable writes: the region stays online after a failed close flush
              writestate.writesEnabled = true;
            }
            // propagate so the caller knows the close failed
            throw ioe;
          }
        }
      }

      Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
      if (!stores.isEmpty()) {
        // close each store in parallel on a bounded pool
        ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" +
            getRegionInfo().getRegionNameAsString());
        CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

        // sanity check: after the flush loop every store's flushable size must be 0
        for (final Store store : stores.values()) {
          long flushableSize = store.getFlushableSize();
          if (!(abort || flushableSize == 0 || writestate.readOnly)) {
            getRegionServerServices().abort("Assertion failed while closing store "
              + getRegionInfo().getRegionNameAsString() + " " + store
              + ". flushableSize expected=0, actual= " + flushableSize
              + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
              + "operation failed and left the memstore in a partially updated state.", null);
          }
          completionService
              .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                @Override
                public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                  return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
                }
              });
        }
        try {
          // collect results; completion order, not submission order
          for (int i = 0; i < stores.size(); i++) {
            Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
            Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
            List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
            if (familyFiles == null) {
              familyFiles = new ArrayList<StoreFile>();
              result.put(storeFiles.getFirst(), familyFiles);
            }
            familyFiles.addAll(storeFiles.getSecond());
          }
        } catch (InterruptedException e) {
          // convert to an IOException subtype, preserving the cause
          throw (InterruptedIOException)new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause());
        } finally {
          storeCloserThreadPool.shutdownNow();
        }
      }

      status.setStatus("Writing region close event to WAL");
      if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
        writeRegionCloseMarker(wal);
      }

      this.closed.set(true);
      if (!canFlush) {
        // read-only region never flushed: back its size out of the global accounting
        addAndGetGlobalMemstoreSize(-memstoreSize.get());
      } else if (memstoreSize.get() != 0) {
        // flushed regions should end with an empty memstore; log the discrepancy
        LOG.error("Memstore size is " + memstoreSize.get());
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor post-close hooks");
        this.coprocessorHost.postClose(abort);
      }
      if (this.metricsRegion != null) {
        this.metricsRegion.close();
      }
      if (this.metricsRegionWrapper != null) {
        Closeables.closeQuietly(this.metricsRegionWrapper);
      }
      status.markComplete("Closed");
      LOG.info("Closed " + this);
      return result;
    } finally {
      lock.writeLock().unlock();
    }
  }
1594
1595 @Override
1596 public void waitForFlushesAndCompactions() {
1597 synchronized (writestate) {
1598 if (this.writestate.readOnly) {
1599
1600
1601 return;
1602 }
1603 boolean interrupted = false;
1604 try {
1605 while (writestate.compacting > 0 || writestate.flushing) {
1606 LOG.debug("waiting for " + writestate.compacting + " compactions"
1607 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1608 try {
1609 writestate.wait();
1610 } catch (InterruptedException iex) {
1611
1612 LOG.warn("Interrupted while waiting");
1613 interrupted = true;
1614 }
1615 }
1616 } finally {
1617 if (interrupted) {
1618 Thread.currentThread().interrupt();
1619 }
1620 }
1621 }
1622 }
1623
1624 public void waitForFlushes() {
1625 synchronized (writestate) {
1626 if (this.writestate.readOnly) {
1627
1628
1629 return;
1630 }
1631 if (!writestate.flushing) return;
1632 long start = System.currentTimeMillis();
1633 boolean interrupted = false;
1634 try {
1635 while (writestate.flushing) {
1636 LOG.debug("waiting for cache flush to complete for region " + this);
1637 try {
1638 writestate.wait();
1639 } catch (InterruptedException iex) {
1640
1641 LOG.warn("Interrupted while waiting");
1642 interrupted = true;
1643 }
1644 }
1645 } finally {
1646 if (interrupted) {
1647 Thread.currentThread().interrupt();
1648 }
1649 }
1650 long duration = System.currentTimeMillis() - start;
1651 LOG.debug("Waited " + duration + " ms for flush to complete");
1652 }
1653 }
1654 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1655 final String threadNamePrefix) {
1656 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1657 int maxThreads = Math.min(numStores,
1658 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1659 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1660 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1661 }
1662
1663 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1664 final String threadNamePrefix) {
1665 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1666 int maxThreads = Math.max(1,
1667 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1668 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1669 / numStores);
1670 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1671 }
1672
1673 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1674 final String threadNamePrefix) {
1675 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1676 new ThreadFactory() {
1677 private int count = 1;
1678
1679 @Override
1680 public Thread newThread(Runnable r) {
1681 return new Thread(r, threadNamePrefix + "-" + count++);
1682 }
1683 });
1684 }
1685
1686
1687
1688
1689 private boolean worthPreFlushing() {
1690 return this.memstoreSize.get() >
1691 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1692 }
1693
1694
1695
1696
1697
1698 @Override
1699 public HTableDescriptor getTableDesc() {
1700 return this.htableDescriptor;
1701 }
1702
1703
1704 public WAL getWAL() {
1705 return this.wal;
1706 }
1707
1708
1709
1710
1711
1712
1713
1714
1715 Configuration getBaseConf() {
1716 return this.baseConf;
1717 }
1718
1719
1720 public FileSystem getFilesystem() {
1721 return fs.getFileSystem();
1722 }
1723
1724
1725 public HRegionFileSystem getRegionFileSystem() {
1726 return this.fs;
1727 }
1728
1729 @Override
1730 public long getEarliestFlushTimeForAllStores() {
1731 return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE : Collections.min(lastStoreFlushTimeMap
1732 .values());
1733 }
1734
1735 @Override
1736 public long getOldestHfileTs(boolean majorCompactioOnly) throws IOException {
1737 long result = Long.MAX_VALUE;
1738 for (Store store : getStores()) {
1739 Collection<StoreFile> storeFiles = store.getStorefiles();
1740 if (storeFiles == null) continue;
1741 for (StoreFile file : storeFiles) {
1742 StoreFile.Reader sfReader = file.getReader();
1743 if (sfReader == null) continue;
1744 HFile.Reader reader = sfReader.getHFileReader();
1745 if (reader == null) continue;
1746 if (majorCompactioOnly) {
1747 byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
1748 if (val == null) continue;
1749 if (val == null || !Bytes.toBoolean(val)) {
1750 continue;
1751 }
1752 }
1753 result = Math.min(result, reader.getFileContext().getFileCreateTime());
1754 }
1755 }
1756 return result == Long.MAX_VALUE ? 0 : result;
1757 }
1758
  /**
   * Populate the per-store complete-sequence-id entries of a RegionLoad report.
   * For every family, reports (earliest un-flushed seq id in the WAL) - 1, i.e. the
   * highest seq id known flushed for that family; when the WAL has no entry for the
   * family (negative value) it falls back to the region's last flush-op seq id.
   * Also sets the region-wide complete sequence id from getMaxFlushedSeqId().
   */
  RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) {
    long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId;
    byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
    regionLoadBldr.clearStoreCompleteSequenceId();
    for (byte[] familyName : this.stores.keySet()) {
      long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
      // NOTE(review): a negative oldestUnflushedSeqId appears to mean the WAL tracks no
      // un-flushed edits for this family -- confirm against the WAL contract.
      regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId
        .newBuilder()
        .setFamilyName(ByteString.copyFrom(familyName))
        .setSequenceId(
          oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
    }
    return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
  }
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784 public long getLargestHStoreSize() {
1785 long size = 0;
1786 for (Store h : stores.values()) {
1787 long storeSize = h.getSize();
1788 if (storeSize > size) {
1789 size = storeSize;
1790 }
1791 }
1792 return size;
1793 }
1794
1795
1796
1797
1798 public KeyValue.KVComparator getComparator() {
1799 return this.comparator;
1800 }
1801
1802
1803
1804
1805
1806 protected void doRegionCompactionPrep() throws IOException {
1807 }
1808
1809 @Override
1810 public void triggerMajorCompaction() throws IOException {
1811 for (Store s : getStores()) {
1812 s.triggerMajorCompaction();
1813 }
1814 }
1815
1816 @Override
1817 public void compact(final boolean majorCompaction) throws IOException {
1818 if (majorCompaction) {
1819 triggerMajorCompaction();
1820 }
1821 for (Store s : getStores()) {
1822 CompactionContext compaction = s.requestCompaction();
1823 if (compaction != null) {
1824 CompactionThroughputController controller = null;
1825 if (rsServices != null) {
1826 controller = CompactionThroughputControllerFactory.create(rsServices, conf);
1827 }
1828 if (controller == null) {
1829 controller = NoLimitCompactionThroughputController.INSTANCE;
1830 }
1831 compact(compaction, s, controller, null);
1832 }
1833 }
1834 }
1835
1836
1837
1838
1839
1840
1841
1842 public void compactStores() throws IOException {
1843 for (Store s : getStores()) {
1844 CompactionContext compaction = s.requestCompaction();
1845 if (compaction != null) {
1846 compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
1847 }
1848 }
1849 }
1850
1851
1852
1853
1854
1855
1856
1857 @VisibleForTesting
1858 void compactStore(byte[] family, CompactionThroughputController throughputController)
1859 throws IOException {
1860 Store s = getStore(family);
1861 CompactionContext compaction = s.requestCompaction();
1862 if (compaction != null) {
1863 compact(compaction, s, throughputController, null);
1864 }
1865 }
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882 public boolean compact(CompactionContext compaction, Store store,
1883 CompactionThroughputController throughputController) throws IOException {
1884 return compact(compaction, store, throughputController, null);
1885 }
1886
  /**
   * Execute an already-selected compaction on the given store.
   * Returns false (after cancelling the request on the store) whenever the region is
   * closing/closed, the store was re-instantiated, writes are disabled, or the
   * compaction is interrupted; returns true on success.
   * @param compaction a selected compaction (must have a non-empty file selection)
   * @param store the store to compact; must still be the registered instance
   * @param throughputController limiter applied during the compaction
   * @param user user context passed through to the store, may be null
   */
  public boolean compact(CompactionContext compaction, Store store,
      CompactionThroughputController throughputController, User user) throws IOException {
    assert compaction != null && compaction.hasSelection();
    assert !compaction.getRequest().getFiles().isEmpty();
    if (this.closing.get() || this.closed.get()) {
      LOG.debug("Skipping compaction on " + this + " because closing/closed");
      store.cancelRequestedCompaction(compaction);
      return false;
    }
    MonitoredTask status = null;
    boolean requestNeedsCancellation = true;
    // NOTE: no region-level lock is taken here. closing/closed are re-checked below,
    // and writestate.compacting is incremented under the writestate monitor so that
    // close() can wait for in-flight compactions via waitForFlushesAndCompactions().
    try {
      byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
      if (stores.get(cf) != store) {
        // store identity changed underneath us (e.g. split rollback); bail out
        LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
            + " has been re-instantiated, cancel this compaction request. "
            + " It may be caused by the roll back of split transaction");
        return false;
      }

      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
      if (this.closed.get()) {
        String msg = "Skipping compaction on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return false;
      }
      boolean wasStateSet = false;
      try {
        synchronized (writestate) {
          if (writestate.writesEnabled) {
            // register this compaction so close() waits for it
            wasStateSet = true;
            ++writestate.compacting;
          } else {
            String msg = "NOT compacting region " + this + ". Writes disabled.";
            LOG.info(msg);
            status.abort(msg);
            return false;
          }
        }
        LOG.info("Starting compaction on " + store + " in region " + this
            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
        doRegionCompactionPrep();
        try {
          status.setStatus("Compacting store " + store);
          // From this point the store owns the request; do not cancel it in the
          // finally block even if compact() throws.
          requestNeedsCancellation = false;
          store.compact(compaction, throughputController, user);
        } catch (InterruptedIOException iioe) {
          String msg = "compaction interrupted";
          LOG.info(msg, iioe);
          status.abort(msg);
          return false;
        }
      } finally {
        if (wasStateSet) {
          synchronized (writestate) {
            --writestate.compacting;
            if (writestate.compacting <= 0) {
              // wake up close()/waiters once the last compaction finishes
              writestate.notifyAll();
            }
          }
        }
      }
      status.markComplete("Compaction complete");
      return true;
    } finally {
      if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
      if (status != null) status.cleanup();
    }
  }
2033
2034 @Override
2035 public FlushResult flush(boolean force) throws IOException {
2036 return flushcache(force, false);
2037 }
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
  /**
   * Flush the region's memstore(s) to store files.
   * Takes the region read lock for the duration; returns CANNOT_FLUSH (without
   * throwing) when the region is closing/closed, a flush is already running, or
   * writes are disabled.
   * @param forceFlushAllStores flush every store instead of letting the flush
   *          policy select a subset
   * @param writeFlushRequestWalMarker whether to write a flush-request marker to the WAL
   * @return the flush result
   */
  public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
      throws IOException {
    // fail fast before acquiring any locks
    if (this.closing.get()) {
      String msg = "Skipping flush on " + this + " because closing";
      LOG.debug(msg);
      return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
    }
    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
    status.setStatus("Acquiring readlock on region");
    // region read lock: flush may run concurrently with reads/writes but not close
    lock.readLock().lock();
    try {
      if (this.closed.get()) {
        String msg = "Skipping flush on " + this + " because closed";
        LOG.debug(msg);
        status.abort(msg);
        return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
      }
      if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-flush hooks");
        coprocessorHost.preFlush();
      }
      // reset the without-WAL counters; their data is made durable by this flush
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }
      synchronized (writestate) {
        if (!writestate.flushing && writestate.writesEnabled) {
          this.writestate.flushing = true;
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("NOT flushing memstore for region " + this
                + ", flushing=" + writestate.flushing + ", writesEnabled="
                + writestate.writesEnabled);
          }
          String msg = "Not flushing since "
              + (writestate.flushing ? "already flushing"
                  : "writes not enabled");
          status.abort(msg);
          return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
        }
      }

      try {
        // flush policy decides which stores need flushing unless forced
        Collection<Store> specificStoresToFlush =
            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
        FlushResult fs = internalFlushcache(specificStoresToFlush,
          status, writeFlushRequestWalMarker);

        if (coprocessorHost != null) {
          status.setStatus("Running post-flush coprocessor hooks");
          coprocessorHost.postFlush();
        }

        status.markComplete("Flush successful");
        return fs;
      } finally {
        synchronized (writestate) {
          // clear the flags and wake anyone blocked in waitForFlushes*()
          writestate.flushing = false;
          this.writestate.flushRequested = false;
          writestate.notifyAll();
        }
      }
    } finally {
      lock.readLock().unlock();
      status.cleanup();
    }
  }
2132
2133
2134
2135
2136
2137
2138
2139
2140 boolean shouldFlushStore(Store store) {
2141 long maxFlushedSeqId =
2142 this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
2143 .getFamily().getName()) - 1;
2144 if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
2145 if (LOG.isDebugEnabled()) {
2146 LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
2147 + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
2148 + ") is far away from current(" + sequenceId.get() + "), max allowed is "
2149 + flushPerChanges);
2150 }
2151 return true;
2152 }
2153 if (flushCheckInterval <= 0) {
2154 return false;
2155 }
2156 long now = EnvironmentEdgeManager.currentTime();
2157 if (store.timeOfOldestEdit() < now - flushCheckInterval) {
2158 if (LOG.isDebugEnabled()) {
2159 LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
2160 + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
2161 + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
2162 }
2163 return true;
2164 }
2165 return false;
2166 }
2167
2168
2169
2170
2171 boolean shouldFlush() {
2172
2173 if (this.maxFlushedSeqId > 0
2174 && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
2175 return true;
2176 }
2177 long modifiedFlushCheckInterval = flushCheckInterval;
2178 if (getRegionInfo().isMetaRegion() &&
2179 getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
2180 modifiedFlushCheckInterval = META_CACHE_FLUSH_INTERVAL;
2181 }
2182 if (modifiedFlushCheckInterval <= 0) {
2183 return false;
2184 }
2185 long now = EnvironmentEdgeManager.currentTime();
2186
2187 if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
2188 return false;
2189 }
2190
2191
2192 for (Store s : getStores()) {
2193 if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
2194
2195 return true;
2196 }
2197 }
2198 return false;
2199 }
2200
2201
2202
2203
2204
2205
2206 private FlushResult internalFlushcache(MonitoredTask status)
2207 throws IOException {
2208 return internalFlushcache(stores.values(), status, false);
2209 }
2210
2211
2212
2213
2214
2215
2216 private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
2217 MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
2218 return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
2219 status, writeFlushWalMarker);
2220 }
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250 protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
2251 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2252 throws IOException {
2253 PrepareFlushResult result
2254 = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
2255 if (result.result == null) {
2256 return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
2257 } else {
2258 return result.result;
2259 }
2260 }
2261
/**
 * Prepare-phase of a memstore flush: under the region updates write-lock,
 * snapshot the memstores of the stores to flush, write a START_FLUSH marker
 * to the WAL and compute the flush-op / flushed sequence ids.  May
 * short-circuit with an embedded FlushResult (e.g. empty memstore, WAL
 * closing) in which case the commit phase must be skipped.
 *
 * @param wal WAL to write markers to; null when replaying edits
 * @param myseqid sequence id to use when {@code wal} is null
 * @param storesToFlush stores whose memstores are snapshotted
 * @param status task monitor updated as we go
 * @param writeFlushWalMarker whether a CANNOT_FLUSH marker may be written
 *   when the memstore turns out to be empty
 * @return the prepared flush state, or a terminal result wrapped in a
 *   PrepareFlushResult
 * @throws IOException if marker writes, WAL sync or sequence-id assignment fail
 */
protected PrepareFlushResult internalPrepareFlushCache(
    final WAL wal, final long myseqid, final Collection<Store> storesToFlush,
    MonitoredTask status, boolean writeFlushWalMarker)
    throws IOException {
  // Refuse to flush on a dying server; the WAL will be replayed anyway.
  if (this.rsServices != null && this.rsServices.isAborted()) {
    // Don't flush when server aborting, it's unsafe
    throw new IOException("Aborting flush because server is aborted...");
  }
  final long startTime = EnvironmentEdgeManager.currentTime();
  // If nothing in memstore there is nothing to flush, but we still need to
  // safely advance the region sequence id under the update lock so the
  // resulting id sits at the border of the empty memstore.
  if (this.memstoreSize.get() <= 0) {
    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
    this.updatesLock.writeLock().lock();
    try {
      // Re-check under the lock: a concurrent write may have landed.
      if (this.memstoreSize.get() <= 0) {
        // Still empty: hand back a CANNOT_FLUSH_MEMSTORE_EMPTY result.  When
        // there is a WAL we first bump the sequence id (and wait on mvcc) so
        // the returned flush id is strictly beyond the last region edit;
        // wal can be null when replaying edits.
        if (wal != null) {
          writeEntry = mvcc.beginMemstoreInsert();
          long flushOpSeqId = getNextSequenceId(wal);
          FlushResult flushResult = new FlushResultImpl(
              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush",
              writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
          writeEntry.setWriteNumber(flushOpSeqId);
          mvcc.waitForPreviousTransactionsComplete(writeEntry);
          // Completed above; null it out so the finally block does not
          // advance the memstore a second time.
          writeEntry = null;
          return new PrepareFlushResult(flushResult, myseqid);
        } else {
          return new PrepareFlushResult(
              new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
                  "Nothing to flush", false),
              myseqid);
        }
      }
    } finally {
      this.updatesLock.writeLock().unlock();
      if (writeEntry != null) {
        // We began an mvcc transaction but bailed before completing it.
        mvcc.advanceMemstore(writeEntry);
      }
    }
  }

  if (LOG.isInfoEnabled()) {
    LOG.info("Started memstore flush for " + this + ", current region memstore size "
        + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
        + stores.size() + " column families' memstores are being flushed."
        + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
    // Only log per-store sizes when this is a partial (per-family) flush.
    if (this.stores.size() > storesToFlush.size()) {
      for (Store store: storesToFlush) {
        LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
            + " which was occupying "
            + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
      }
    }
  }

  // Stop updates while we snapshot the memstore of all of these regions'
  // stores.  We only have to do this for a moment: the snapshot itself is
  // fast.
  MultiVersionConsistencyControl.WriteEntry writeEntry = null;

  status.setStatus("Obtaining lock to block concurrent updates");
  // Blocks all further writes to the region until unlocked below.
  this.updatesLock.writeLock().lock();
  status.setStatus("Preparing to flush by snapshotting stores in " +
    getRegionInfo().getEncodedName());
  long totalFlushableSizeOfFlushableStores = 0;

  // Family names are handed to the WAL so it knows which families' edits
  // this flush covers.
  Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
  for (Store store: storesToFlush) {
    flushedFamilyNames.add(store.getFamily().getName());
  }

  // Per-family flush contexts / committed files / snapshot sizes, keyed and
  // sorted by family name so iteration order is deterministic.
  TreeMap<byte[], StoreFlushContext> storeFlushCtxs
    = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
  TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
      Bytes.BYTES_COMPARATOR);
  TreeMap<byte[], Long> storeFlushableSize
      = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  // Sequence id assigned to this flush operation.
  long flushOpSeqId = HConstants.NO_SEQNUM;
  // Highest sequence id guaranteed to be on disk after this flush (may be
  // less than flushOpSeqId when some stores are not being flushed).
  long flushedSeqId = HConstants.NO_SEQNUM;
  byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();

  // Transaction id of the START_FLUSH marker append, if one was written.
  long trxId = 0;
  try {
    try {
      // Drain in-flight mvcc transactions, then open our own.
      mvcc.waitForPreviousTransactionsComplete();
      writeEntry = mvcc.beginMemstoreInsert();
      if (wal != null) {
        Long earliestUnflushedSequenceIdForTheRegion =
          wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
        if (earliestUnflushedSequenceIdForTheRegion == null) {
          // A null return means the WAL is closing; abort this flush.
          String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
          status.setStatus(msg);
          return new PrepareFlushResult(
              new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
              myseqid);
        }
        flushOpSeqId = getNextSequenceId(wal);
        // If nothing was unflushed, everything up to the flush op id is safe.
        flushedSeqId =
            earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
              flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
      } else {
        // wal is null during WAL replay; use the caller-supplied sequence id.
        flushedSeqId = flushOpSeqId = myseqid;
      }

      for (Store s : storesToFlush) {
        totalFlushableSizeOfFlushableStores += s.getFlushableSize();
        storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
        // Committed-file lists are filled in by the commit phase; null for now.
        committedFiles.put(s.getFamily().getName(), null);
        storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
      }

      // Write the START_FLUSH marker (unless read-only / no WAL).
      if (wal != null && !writestate.readOnly) {
        FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
          getRegionInfo(), flushOpSeqId, committedFiles);
        // No sync. Sync is below where we do not hold the updates lock.
        trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
          desc, sequenceId, false);
      }

      // Snapshot every store's memstore.
      for (StoreFlushContext flush : storeFlushCtxs.values()) {
        flush.prepare();
      }
    } catch (IOException ex) {
      if (wal != null) {
        if (trxId > 0) {
          // We wrote a START_FLUSH marker; try to cancel it with ABORT_FLUSH.
          try {
            FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
              getRegionInfo(), flushOpSeqId, committedFiles);
            WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
              desc, sequenceId, false);
          } catch (Throwable t) {
            LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
                StringUtils.stringifyException(t));
            // best effort only; the original IOException is rethrown below
          }
        }
        // Tell the WAL the flush it was told about above is off.
        wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
        throw ex;
      }
    } finally {
      this.updatesLock.writeLock().unlock();
    }
    String s = "Finished memstore snapshotting " + this +
      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
    status.setStatus(s);
    if (LOG.isTraceEnabled()) LOG.trace(s);

    // Sync the START_FLUSH marker now that the updates lock is released.
    if (wal != null) {
      try {
        wal.sync();
      } catch (IOException ioe) {
        wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
        throw ioe;
      }
    }

    // Wait for all in-progress transactions to commit to the memstore before
    // we start the flush; this keeps the memstore snapshot consistent at the
    // flush sequence id boundary.
    writeEntry.setWriteNumber(flushOpSeqId);
    mvcc.waitForPreviousTransactionsComplete(writeEntry);
    // writeEntry is now complete; clear it so the finally block below does
    // not advance the memstore twice.
    writeEntry = null;
  } finally {
    if (writeEntry != null) {
      // An exception path left our mvcc transaction open; close it out.
      mvcc.advanceMemstore(writeEntry);
    }
  }
  return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId,
    flushedSeqId, totalFlushableSizeOfFlushableStores);
}
2458
2459
2460
2461
2462
2463
2464
2465 private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
2466 if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
2467 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
2468 getRegionInfo(), -1, new TreeMap<byte[], List<Path>>());
2469 try {
2470 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2471 desc, sequenceId, true);
2472 return true;
2473 } catch (IOException e) {
2474 LOG.warn(getRegionInfo().getEncodedName() + " : "
2475 + "Received exception while trying to write the flush request to wal", e);
2476 }
2477 }
2478 return false;
2479 }
2480
2481 protected FlushResult internalFlushCacheAndCommit(
2482 final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
2483 final Collection<Store> storesToFlush)
2484 throws IOException {
2485
2486
2487 TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
2488 TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
2489 long startTime = prepareResult.startTime;
2490 long flushOpSeqId = prepareResult.flushOpSeqId;
2491 long flushedSeqId = prepareResult.flushedSeqId;
2492 long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
2493
2494 String s = "Flushing stores of " + this;
2495 status.setStatus(s);
2496 if (LOG.isTraceEnabled()) LOG.trace(s);
2497
2498
2499
2500
2501
2502 boolean compactionRequested = false;
2503 long flushedOutputFileSize = 0;
2504 try {
2505
2506
2507
2508
2509
2510 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2511 flush.flushCache(status);
2512 }
2513
2514
2515
2516 Iterator<Store> it = storesToFlush.iterator();
2517
2518 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2519 boolean needsCompaction = flush.commit(status);
2520 if (needsCompaction) {
2521 compactionRequested = true;
2522 }
2523 byte[] storeName = it.next().getFamily().getName();
2524 List<Path> storeCommittedFiles = flush.getCommittedFiles();
2525 committedFiles.put(storeName, storeCommittedFiles);
2526
2527 if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
2528 totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
2529 }
2530 flushedOutputFileSize += flush.getOutputFileSize();
2531 }
2532 storeFlushCtxs.clear();
2533
2534
2535 this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2536
2537 if (wal != null) {
2538
2539 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2540 getRegionInfo(), flushOpSeqId, committedFiles);
2541 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2542 desc, sequenceId, true);
2543 }
2544 } catch (Throwable t) {
2545
2546
2547
2548
2549
2550
2551 if (wal != null) {
2552 try {
2553 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2554 getRegionInfo(), flushOpSeqId, committedFiles);
2555 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2556 desc, sequenceId, false);
2557 } catch (Throwable ex) {
2558 LOG.warn(getRegionInfo().getEncodedName() + " : "
2559 + "Received unexpected exception trying to write ABORT_FLUSH marker to WAL:"
2560 + StringUtils.stringifyException(ex));
2561
2562 }
2563 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2564 }
2565 DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2566 Bytes.toStringBinary(getRegionInfo().getRegionName()));
2567 dse.initCause(t);
2568 status.abort("Flush failed: " + StringUtils.stringifyException(t));
2569
2570
2571
2572
2573
2574 this.closing.set(true);
2575
2576 if (rsServices != null) {
2577
2578 rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
2579 }
2580
2581 throw dse;
2582 }
2583
2584
2585 if (wal != null) {
2586 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2587 }
2588
2589
2590 for (Store store: storesToFlush) {
2591 this.lastStoreFlushTimeMap.put(store, startTime);
2592 }
2593
2594
2595 this.maxFlushedSeqId = flushedSeqId;
2596
2597
2598 this.lastFlushOpSeqId = flushOpSeqId;
2599
2600
2601
2602 synchronized (this) {
2603 notifyAll();
2604 }
2605
2606 long time = EnvironmentEdgeManager.currentTime() - startTime;
2607 long memstoresize = this.memstoreSize.get();
2608 String msg = "Finished memstore flush of ~"
2609 + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2610 + totalFlushableSizeOfFlushableStores + ", currentsize="
2611 + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2612 + " for region " + this + " in " + time + "ms, sequenceid="
2613 + flushOpSeqId + ", compaction requested=" + compactionRequested
2614 + ((wal == null) ? "; wal=null" : "");
2615 LOG.info(msg);
2616 status.setStatus(msg);
2617
2618 if (rsServices != null && rsServices.getMetrics() != null) {
2619 rsServices.getMetrics().updateFlush(
2620 getTableDesc().getTableName().getNameAsString(),
2621 time - startTime,
2622 totalFlushableSizeOfFlushableStores, flushedOutputFileSize);
2623 }
2624
2625 return new FlushResultImpl(compactionRequested ?
2626 FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2627 FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
2628 }
2629
2630
2631
2632
2633
2634
2635 @VisibleForTesting
2636 protected long getNextSequenceId(final WAL wal) throws IOException {
2637
2638
2639
2640
2641
2642 WALKey key = this.appendEmptyEdit(wal, null);
2643 return key.getSequenceId(maxWaitForSeqId);
2644 }
2645
2646
2647
2648
2649
/**
 * Return the Result for the row at or just before the given row in the
 * given family.  Coprocessor pre/post hooks are honored: the pre-hook may
 * short-circuit the whole operation with its own (possibly empty) result.
 *
 * @param row row to search at-or-before
 * @param family column family to search within
 * @return the closest-before Result, or null if no such row exists
 * @throws IOException on store access failure or invalid row
 */
@Override
public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException {
  if (coprocessorHost != null) {
    Result result = new Result();
    // A pre-hook returning true bypasses the region lookup entirely and the
    // (hook-populated) result is returned as-is.
    if (coprocessorHost.preGetClosestRowBefore(row, family, result)) {
      return result;
    }
  }
  // Validate the row belongs to this region before taking the region op.
  checkRow(row, "getClosestRowBefore");
  startRegionOperation(Operation.GET);
  this.readRequestsCount.increment();
  try {
    Store store = getStore(family);
    // Ask the store for the closest key at or before the given row.
    Cell key = store.getRowKeyAtOrBefore(row);
    Result result = null;
    if (key != null) {
      // Found a candidate row; fetch its full family content via a Get.
      Get get = new Get(CellUtil.cloneRow(key));
      get.addFamily(family);
      result = get(get);
    }
    if (coprocessorHost != null) {
      // Post-hook observes (and may mutate) the outgoing result.
      coprocessorHost.postGetClosestRowBefore(row, family, result);
    }
    return result;
  } finally {
    closeRegionOperation(Operation.GET);
  }
}
2681
/**
 * Open a RegionScanner for the given scan with no additional scanners.
 * Delegates to {@link #getScanner(Scan, List)}.
 *
 * @param scan configured scan to execute
 * @return a scanner over this region
 * @throws IOException if a requested family is invalid or the region is closing
 */
@Override
public RegionScanner getScanner(Scan scan) throws IOException {
  return getScanner(scan, null);
}
2686
2687 protected RegionScanner getScanner(Scan scan,
2688 List<KeyValueScanner> additionalScanners) throws IOException {
2689 startRegionOperation(Operation.SCAN);
2690 try {
2691
2692 if (!scan.hasFamilies()) {
2693
2694 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
2695 scan.addFamily(family);
2696 }
2697 } else {
2698 for (byte [] family : scan.getFamilyMap().keySet()) {
2699 checkFamily(family);
2700 }
2701 }
2702 return instantiateRegionScanner(scan, additionalScanners);
2703 } finally {
2704 closeRegionOperation(Operation.SCAN);
2705 }
2706 }
2707
2708 protected RegionScanner instantiateRegionScanner(Scan scan,
2709 List<KeyValueScanner> additionalScanners) throws IOException {
2710 if (scan.isReversed()) {
2711 if (scan.getFilter() != null) {
2712 scan.getFilter().setReversed(true);
2713 }
2714 return new ReversedRegionScannerImpl(scan, additionalScanners, this);
2715 }
2716 return new RegionScannerImpl(scan, additionalScanners, this);
2717 }
2718
2719 @Override
2720 public void prepareDelete(Delete delete) throws IOException {
2721
2722 if(delete.getFamilyCellMap().isEmpty()){
2723 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2724
2725 delete.addFamily(family, delete.getTimeStamp());
2726 }
2727 } else {
2728 for(byte [] family : delete.getFamilyCellMap().keySet()) {
2729 if(family == null) {
2730 throw new NoSuchColumnFamilyException("Empty family is invalid");
2731 }
2732 checkFamily(family);
2733 }
2734 }
2735 }
2736
/**
 * Apply the given Delete to this region via the batch-mutate path.
 *
 * @param delete the delete to apply
 * @throws IOException if the region is read-only, resources are exhausted,
 *   or the underlying batch mutate fails
 */
@Override
public void delete(Delete delete) throws IOException {
  checkReadOnly();
  checkResources();
  startRegionOperation(Operation.DELETE);
  try {
    // NOTE(review): this bare getRow() call discards its result and looks
    // like a leftover no-op — confirm before removing.
    delete.getRow();
    // doBatchMutate handles row locking, WAL append and memstore apply.
    doBatchMutate(delete);
  } finally {
    closeRegionOperation(Operation.DELETE);
  }
}
2750
2751
2752
2753
// Placeholder row key used only by the test-only delete(familyMap, durability)
// method below.
private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");

/**
 * Test-only delete: apply the given family-to-cells map as a Delete with the
 * requested durability.  Uses the FOR_UNIT_TESTS_ONLY placeholder row key;
 * the actual cells come from the supplied family map.
 *
 * @param familyMap map of family to the cells to delete
 * @param durability durability to apply to the delete
 * @throws IOException if the batch mutate fails
 */
void delete(NavigableMap<byte[], List<Cell>> familyMap,
    Durability durability) throws IOException {
  Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
  delete.setFamilyCellMap(familyMap);
  delete.setDurability(durability);
  doBatchMutate(delete);
}
2768
2769 @Override
2770 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2771 byte[] byteNow) throws IOException {
2772 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2773
2774 byte[] family = e.getKey();
2775 List<Cell> cells = e.getValue();
2776 assert cells instanceof RandomAccess;
2777
2778 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2779 int listSize = cells.size();
2780 for (int i=0; i < listSize; i++) {
2781 Cell cell = cells.get(i);
2782
2783
2784 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2785 byte[] qual = CellUtil.cloneQualifier(cell);
2786 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2787
2788 Integer count = kvCount.get(qual);
2789 if (count == null) {
2790 kvCount.put(qual, 1);
2791 } else {
2792 kvCount.put(qual, count + 1);
2793 }
2794 count = kvCount.get(qual);
2795
2796 Get get = new Get(CellUtil.cloneRow(cell));
2797 get.setMaxVersions(count);
2798 get.addColumn(family, qual);
2799 if (coprocessorHost != null) {
2800 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2801 byteNow, get)) {
2802 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2803 }
2804 } else {
2805 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2806 }
2807 } else {
2808 CellUtil.updateLatestStamp(cell, byteNow, 0);
2809 }
2810 }
2811 }
2812 }
2813
2814 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2815 throws IOException {
2816 List<Cell> result = get(get, false);
2817
2818 if (result.size() < count) {
2819
2820 CellUtil.updateLatestStamp(cell, byteNow, 0);
2821 return;
2822 }
2823 if (result.size() > count) {
2824 throw new RuntimeException("Unexpected size: " + result.size());
2825 }
2826 Cell getCell = result.get(count - 1);
2827 CellUtil.setTimestamp(cell, getCell.getTimestamp());
2828 }
2829
/**
 * Apply the given Put to this region via the batch-mutate path.
 *
 * @param put the put to apply
 * @throws IOException if the region is read-only, resources are exhausted,
 *   or the underlying batch mutate fails
 */
@Override
public void put(Put put) throws IOException {
  checkReadOnly();

  // Do a rough check that we have resources to accept a write.  The check is
  // 'rough' in that between the resource check and the call to obtain a
  // read lock, resources may run out.  This is expected to be rare.
  checkResources();
  startRegionOperation(Operation.PUT);
  try {
    // doBatchMutate handles row locking, WAL append and memstore apply.
    doBatchMutate(put);
  } finally {
    closeRegionOperation(Operation.PUT);
  }
}
2847
2848
2849
2850
2851
2852
/**
 * Struct-like holder tracking the progress of a batch of operations as it is
 * processed in mini-batches: the operations themselves, the index the next
 * mini-batch should start from, per-operation status codes, and per-operation
 * WAL edits contributed by coprocessors.
 *
 * @param <T> the operation representation (e.g. Mutation or MutationReplay)
 */
private abstract static class BatchOperationInProgress<T> {
  // The operations making up this batch.
  T[] operations;
  // Index of the next operation for doMiniBatchMutation to process.
  int nextIndexToProcess = 0;
  // Per-operation outcome; every slot starts as NOT_RUN.
  OperationStatus[] retCodeDetails;
  // Per-operation WAL edits added by coprocessor pre-hooks (slots may be null).
  WALEdit[] walEditsFromCoprocessors;

  public BatchOperationInProgress(T[] operations) {
    this.operations = operations;
    this.retCodeDetails = new OperationStatus[operations.length];
    this.walEditsFromCoprocessors = new WALEdit[operations.length];
    Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
  }

  /** @return the mutation at the given index. */
  public abstract Mutation getMutation(int index);
  /** @return the nonce group for the operation at the given index. */
  public abstract long getNonceGroup(int index);
  /** @return the nonce for the operation at the given index. */
  public abstract long getNonce(int index);
  /** @return the underlying mutations, for handing to coprocessor hooks. */
  public abstract Mutation[] getMutationsForCoprocs();
  /** @return true when this batch replays WAL edits rather than client writes. */
  public abstract boolean isInReplay();
  /** @return the replay sequence id for this batch. */
  public abstract long getReplaySequenceId();

  /** @return true when every operation in the batch has been processed. */
  public boolean isDone() {
    return nextIndexToProcess == operations.length;
  }
}
2878
2879 private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2880 private long nonceGroup;
2881 private long nonce;
2882 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2883 super(operations);
2884 this.nonceGroup = nonceGroup;
2885 this.nonce = nonce;
2886 }
2887
2888 @Override
2889 public Mutation getMutation(int index) {
2890 return this.operations[index];
2891 }
2892
2893 @Override
2894 public long getNonceGroup(int index) {
2895 return nonceGroup;
2896 }
2897
2898 @Override
2899 public long getNonce(int index) {
2900 return nonce;
2901 }
2902
2903 @Override
2904 public Mutation[] getMutationsForCoprocs() {
2905 return this.operations;
2906 }
2907
2908 @Override
2909 public boolean isInReplay() {
2910 return false;
2911 }
2912
2913 @Override
2914 public long getReplaySequenceId() {
2915 return 0;
2916 }
2917 }
2918
2919 private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2920 private long replaySeqId = 0;
2921 public ReplayBatch(MutationReplay[] operations, long seqId) {
2922 super(operations);
2923 this.replaySeqId = seqId;
2924 }
2925
2926 @Override
2927 public Mutation getMutation(int index) {
2928 return this.operations[index].mutation;
2929 }
2930
2931 @Override
2932 public long getNonceGroup(int index) {
2933 return this.operations[index].nonceGroup;
2934 }
2935
2936 @Override
2937 public long getNonce(int index) {
2938 return this.operations[index].nonce;
2939 }
2940
2941 @Override
2942 public Mutation[] getMutationsForCoprocs() {
2943 assert false;
2944 throw new RuntimeException("Should not be called for replay batch");
2945 }
2946
2947 @Override
2948 public boolean isInReplay() {
2949 return true;
2950 }
2951
2952 @Override
2953 public long getReplaySequenceId() {
2954 return this.replaySeqId;
2955 }
2956 }
2957
/**
 * Perform a batch of mutations that all share the given nonce group and
 * nonce.  Wraps them in a MutationBatch and runs the common batch path.
 *
 * @param mutations mutations to apply
 * @param nonceGroup nonce group shared by every mutation in the batch
 * @param nonce nonce shared by every mutation in the batch
 * @return per-mutation operation statuses, in input order
 * @throws IOException if the batch fails
 */
@Override
public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
    throws IOException {
  return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
}
2967
/**
 * Perform a batch of mutations with no nonces.
 *
 * @param mutations mutations to apply
 * @return per-mutation operation statuses, in input order
 * @throws IOException if the batch fails
 */
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
  return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
2971
2972 @Override
2973 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2974 throws IOException {
2975 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
2976 && replaySeqId < lastReplayedOpenRegionSeqId) {
2977
2978
2979 if (LOG.isTraceEnabled()) {
2980 LOG.trace(getRegionInfo().getEncodedName() + " : "
2981 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
2982 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
2983 for (MutationReplay mut : mutations) {
2984 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
2985 }
2986 }
2987
2988 OperationStatus[] statuses = new OperationStatus[mutations.length];
2989 for (int i = 0; i < statuses.length; i++) {
2990 statuses[i] = OperationStatus.SUCCESS;
2991 }
2992 return statuses;
2993 }
2994 return batchMutate(new ReplayBatch(mutations, replaySeqId));
2995 }
2996
2997
2998
2999
3000
3001
3002
3003
3004
/**
 * Drive a batch to completion by repeatedly running doMiniBatchMutation
 * until every operation has been processed, requesting a flush whenever the
 * memstore crosses the flush threshold.  Coprocessor pre-hooks run once,
 * before the first mini-batch, and only for non-replay batches.
 *
 * @param batchOp the batch (client or replay) to process
 * @return the batch's per-operation status array
 * @throws IOException if read-only/resource checks or a mini-batch fail
 */
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
  boolean initialized = false;
  // Replay batches use a distinct operation type so region-state checks can
  // treat them differently from client mutations.
  Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
  startRegionOperation(op);
  try {
    while (!batchOp.isDone()) {
      // Replays must proceed even when the region is read-only.
      if (!batchOp.isInReplay()) {
        checkReadOnly();
      }
      checkResources();

      if (!initialized) {
        // One-time setup: count the writes and run coprocessor pre-hooks
        // (client batches only).
        this.writeRequestsCount.add(batchOp.operations.length);
        if (!batchOp.isInReplay()) {
          doPreMutationHook(batchOp);
        }
        initialized = true;
      }
      doMiniBatchMutation(batchOp);
      // Flush between mini-batches if the memstore has grown past the limit.
      long newSize = this.getMemstoreSize();
      if (isFlushSize(newSize)) {
        requestFlush();
      }
    }
  } finally {
    closeRegionOperation(op);
  }
  return batchOp.retCodeDetails;
}
3034
3035
/**
 * Run coprocessor pre-hooks (prePut / preDelete) for every operation in the
 * batch before any locks are taken.  A hook returning true means the hook
 * handled the operation itself, so it is marked SUCCESS and skipped by
 * doMiniBatchMutation.  WAL edits the hooks contribute are stashed
 * per-operation in walEditsFromCoprocessors.
 *
 * @param batchOp the batch whose operations get the pre-hook treatment
 * @throws IOException if a hook throws
 */
private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
    throws IOException {
  // Runs outside of locks on purpose: hooks may do arbitrary work.
  WALEdit walEdit = new WALEdit();
  if (coprocessorHost != null) {
    for (int i = 0 ; i < batchOp.operations.length; i++) {
      Mutation m = batchOp.getMutation(i);
      if (m instanceof Put) {
        if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
          // pre-hook says skip this Put: mark as success so
          // doMiniBatchMutation passes over it.
          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
        }
      } else if (m instanceof Delete) {
        Delete curDel = (Delete) m;
        if (curDel.getFamilyCellMap().isEmpty()) {
          // Whole-row delete: expand to all families before the hook sees it.
          prepareDelete(curDel);
        }
        if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
          // pre-hook says skip this Delete: mark as success so
          // doMiniBatchMutation passes over it.
          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
        }
      } else {
        // Only Put and Delete are supported here; fail anything else so it
        // is not considered by doMiniBatchMutation.
        batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
            "Put/Delete mutations only supported in batchMutate() now");
      }
      if (!walEdit.isEmpty()) {
        // The hook added edits: stash them against this operation and start
        // a fresh WALEdit for the next one.
        batchOp.walEditsFromCoprocessors[i] = walEdit;
        walEdit = new WALEdit();
      }
    }
  }
}
3074
3075 @SuppressWarnings("unchecked")
3076 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
3077 boolean isInReplay = batchOp.isInReplay();
3078
3079 boolean putsCfSetConsistent = true;
3080
3081 Set<byte[]> putsCfSet = null;
3082
3083 boolean deletesCfSetConsistent = true;
3084
3085 Set<byte[]> deletesCfSet = null;
3086
3087 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
3088 WALEdit walEdit = new WALEdit(isInReplay);
3089 MultiVersionConsistencyControl.WriteEntry writeEntry = null;
3090 long txid = 0;
3091 boolean doRollBackMemstore = false;
3092 boolean locked = false;
3093
3094
3095 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
3096
3097 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
3098 List<Cell> memstoreCells = new ArrayList<Cell>();
3099
3100 int firstIndex = batchOp.nextIndexToProcess;
3101 int lastIndexExclusive = firstIndex;
3102 boolean success = false;
3103 int noOfPuts = 0, noOfDeletes = 0;
3104 WALKey walKey = null;
3105 long mvccNum = 0;
3106 long addedSize = 0;
3107 final ObservedExceptionsInBatch observedExceptions = new ObservedExceptionsInBatch();
3108 try {
3109
3110
3111
3112
3113 int numReadyToWrite = 0;
3114 long now = EnvironmentEdgeManager.currentTime();
3115 while (lastIndexExclusive < batchOp.operations.length) {
3116 Mutation mutation = batchOp.getMutation(lastIndexExclusive);
3117 boolean isPutMutation = mutation instanceof Put;
3118
3119 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
3120
3121 familyMaps[lastIndexExclusive] = familyMap;
3122
3123
3124 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
3125 != OperationStatusCode.NOT_RUN) {
3126 lastIndexExclusive++;
3127 continue;
3128 }
3129
3130 try {
3131 checkAndPrepareMutation(mutation, batchOp.isInReplay(), familyMap, now);
3132 } catch (NoSuchColumnFamilyException nscf) {
3133 final String msg = "No such column family in batch mutation. ";
3134 if (observedExceptions.hasSeenNoSuchFamily()) {
3135 LOG.warn(msg + nscf.getMessage());
3136 } else {
3137 LOG.warn(msg, nscf);
3138 observedExceptions.sawNoSuchFamily();
3139 }
3140 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
3141 OperationStatusCode.BAD_FAMILY, nscf.getMessage());
3142 lastIndexExclusive++;
3143 continue;
3144 } catch (FailedSanityCheckException fsce) {
3145 final String msg = "Batch Mutation did not pass sanity check. ";
3146 if (observedExceptions.hasSeenFailedSanityCheck()) {
3147 LOG.warn(msg + fsce.getMessage());
3148 } else {
3149 LOG.warn(msg, fsce);
3150 observedExceptions.sawFailedSanityCheck();
3151 }
3152 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
3153 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
3154 lastIndexExclusive++;
3155 continue;
3156 } catch (WrongRegionException we) {
3157 final String msg = "Batch mutation had a row that does not belong to this region. ";
3158 if (observedExceptions.hasSeenWrongRegion()) {
3159 LOG.warn(msg + we.getMessage());
3160 } else {
3161 LOG.warn(msg, we);
3162 observedExceptions.sawWrongRegion();
3163 }
3164 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
3165 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
3166 lastIndexExclusive++;
3167 continue;
3168 }
3169
3170
3171
3172 boolean shouldBlock = numReadyToWrite == 0;
3173 RowLock rowLock = null;
3174 try {
3175 rowLock = getRowLockInternal(mutation.getRow(), shouldBlock);
3176 } catch (IOException ioe) {
3177 LOG.warn("Failed getting lock in batch put, row="
3178 + Bytes.toStringBinary(mutation.getRow()), ioe);
3179 }
3180 if (rowLock == null) {
3181
3182 break;
3183 } else {
3184 acquiredRowLocks.add(rowLock);
3185 }
3186
3187 lastIndexExclusive++;
3188 numReadyToWrite++;
3189
3190 if (isPutMutation) {
3191
3192
3193
3194 if (putsCfSet == null) {
3195 putsCfSet = mutation.getFamilyCellMap().keySet();
3196 } else {
3197 putsCfSetConsistent = putsCfSetConsistent
3198 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
3199 }
3200 } else {
3201 if (deletesCfSet == null) {
3202 deletesCfSet = mutation.getFamilyCellMap().keySet();
3203 } else {
3204 deletesCfSetConsistent = deletesCfSetConsistent
3205 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
3206 }
3207 }
3208 }
3209
3210
3211
3212 now = EnvironmentEdgeManager.currentTime();
3213 byte[] byteNow = Bytes.toBytes(now);
3214
3215
3216 if (numReadyToWrite <= 0) return 0L;
3217
3218
3219
3220
3221
3222
3223 for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
3224
3225 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3226 != OperationStatusCode.NOT_RUN) continue;
3227
3228 Mutation mutation = batchOp.getMutation(i);
3229 if (mutation instanceof Put) {
3230 updateCellTimestamps(familyMaps[i].values(), byteNow);
3231 noOfPuts++;
3232 } else {
3233 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
3234 noOfDeletes++;
3235 }
3236 rewriteCellTags(familyMaps[i], mutation);
3237 }
3238
3239 lock(this.updatesLock.readLock(), numReadyToWrite);
3240 locked = true;
3241 if(isInReplay) {
3242 mvccNum = batchOp.getReplaySequenceId();
3243 } else {
3244 mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
3245 }
3246
3247
3248
3249
3250 writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
3251
3252
3253 if (!isInReplay && coprocessorHost != null) {
3254 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3255 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3256 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3257 if (coprocessorHost.preBatchMutate(miniBatchOp)) {
3258 return 0L;
3259 } else {
3260 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3261 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
3262
3263 continue;
3264 }
3265
3266 Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
3267 if (cpMutations == null) {
3268 continue;
3269 }
3270
3271 for (int j = 0; j < cpMutations.length; j++) {
3272 Mutation cpMutation = miniBatchOp.getOperationsFromCoprocessors(i)[j];
3273 Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
3274 checkAndPrepareMutation(cpMutation, isInReplay, cpFamilyMap, now);
3275
3276
3277 acquiredRowLocks.add(getRowLock(cpMutation.getRow(), true));
3278
3279 if (cpMutation.getDurability() == Durability.SKIP_WAL) {
3280 recordMutationWithoutWal(cpFamilyMap);
3281 }
3282
3283
3284
3285 mergeFamilyMaps(familyMaps[i], cpFamilyMap);
3286 }
3287 }
3288 }
3289 }
3290
3291
3292
3293
3294
3295
3296
3297
3298
3299
3300 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3301 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3302 != OperationStatusCode.NOT_RUN) {
3303 continue;
3304 }
3305 doRollBackMemstore = true;
3306 addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
3307 }
3308
3309
3310
3311
3312 Durability durability = Durability.USE_DEFAULT;
3313 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3314
3315 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3316 != OperationStatusCode.NOT_RUN) {
3317 continue;
3318 }
3319 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
3320
3321 Mutation m = batchOp.getMutation(i);
3322 Durability tmpDur = getEffectiveDurability(m.getDurability());
3323 if (tmpDur.ordinal() > durability.ordinal()) {
3324 durability = tmpDur;
3325 }
3326 if (tmpDur == Durability.SKIP_WAL) {
3327 recordMutationWithoutWal(m.getFamilyCellMap());
3328 continue;
3329 }
3330
3331 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
3332
3333
3334
3335 if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
3336 if (walEdit.size() > 0) {
3337 assert isInReplay;
3338 if (!isInReplay) {
3339 throw new IOException("Multiple nonces per batch and not in replay");
3340 }
3341
3342
3343 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3344 this.htableDescriptor.getTableName(), now, m.getClusterIds(),
3345 currentNonceGroup, currentNonce);
3346 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
3347 walEdit, getSequenceId(), true, null);
3348 walEdit = new WALEdit(isInReplay);
3349 walKey = null;
3350 }
3351 currentNonceGroup = nonceGroup;
3352 currentNonce = nonce;
3353 }
3354
3355
3356 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
3357 if (fromCP != null) {
3358 for (Cell cell : fromCP.getCells()) {
3359 walEdit.add(cell);
3360 }
3361 }
3362 addFamilyMapToWALEdit(familyMaps[i], walEdit);
3363 }
3364
3365
3366
3367
3368 Mutation mutation = batchOp.getMutation(firstIndex);
3369 if (isInReplay) {
3370
3371 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3372 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3373 mutation.getClusterIds(), currentNonceGroup, currentNonce);
3374 long replaySeqId = batchOp.getReplaySequenceId();
3375 walKey.setOrigLogSeqNum(replaySeqId);
3376
3377
3378 while (true) {
3379 long seqId = getSequenceId().get();
3380 if (seqId >= replaySeqId) break;
3381 if (getSequenceId().compareAndSet(seqId, replaySeqId)) break;
3382 }
3383 }
3384 if (walEdit.size() > 0) {
3385 if (!isInReplay) {
3386
3387 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3388 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3389 mutation.getClusterIds(), currentNonceGroup, currentNonce);
3390 }
3391
3392 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit,
3393 getSequenceId(), true, memstoreCells);
3394 }
3395 if (walKey == null){
3396
3397 walKey = this.appendEmptyEdit(this.wal, memstoreCells);
3398 }
3399
3400
3401
3402
3403 if (locked) {
3404 this.updatesLock.readLock().unlock();
3405 locked = false;
3406 }
3407 releaseRowLocks(acquiredRowLocks);
3408
3409
3410
3411
3412 if (txid != 0) {
3413 syncOrDefer(txid, durability);
3414 }
3415
3416 doRollBackMemstore = false;
3417
3418 this.addAndGetGlobalMemstoreSize(addedSize);
3419
3420
3421 if (!isInReplay && coprocessorHost != null) {
3422 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3423 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3424 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3425 coprocessorHost.postBatchMutate(miniBatchOp);
3426 }
3427
3428
3429
3430
3431
3432 if (writeEntry != null) {
3433 mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
3434 writeEntry = null;
3435 }
3436
3437
3438
3439
3440
3441 if (!isInReplay && coprocessorHost != null) {
3442 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3443
3444 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3445 != OperationStatusCode.SUCCESS) {
3446 continue;
3447 }
3448 Mutation m = batchOp.getMutation(i);
3449 if (m instanceof Put) {
3450 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
3451 } else {
3452 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
3453 }
3454 }
3455 }
3456
3457 success = true;
3458 return addedSize;
3459 } finally {
3460
3461 if (doRollBackMemstore) {
3462 rollbackMemstore(memstoreCells);
3463 if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
3464 } else {
3465 if (writeEntry != null) {
3466 mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
3467 }
3468 }
3469
3470 if (locked) {
3471 this.updatesLock.readLock().unlock();
3472 }
3473 releaseRowLocks(acquiredRowLocks);
3474
3475
3476
3477
3478
3479
3480
3481 if (noOfPuts > 0) {
3482
3483 if (this.metricsRegion != null) {
3484 this.metricsRegion.updatePut();
3485 }
3486 }
3487 if (noOfDeletes > 0) {
3488
3489 if (this.metricsRegion != null) {
3490 this.metricsRegion.updateDelete();
3491 }
3492 }
3493 if (!success) {
3494 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3495 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
3496 batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
3497 }
3498 }
3499 }
3500 if (coprocessorHost != null && !batchOp.isInReplay()) {
3501
3502
3503 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3504 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3505 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3506 coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
3507 }
3508
3509 batchOp.nextIndexToProcess = lastIndexExclusive;
3510 }
3511 }
3512
3513 private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap,
3514 Map<byte[], List<Cell>> toBeMerged) {
3515 for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) {
3516 List<Cell> cells = familyMap.get(entry.getKey());
3517 if (cells == null) {
3518 familyMap.put(entry.getKey(), entry.getValue());
3519 } else {
3520 cells.addAll(entry.getValue());
3521 }
3522 }
3523 }
3524
3525
3526
3527
3528
3529 protected Durability getEffectiveDurability(Durability d) {
3530 return d == Durability.USE_DEFAULT ? this.durability : d;
3531 }
3532
3533
3534
3535
3536
3537
3538 @Override
3539 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3540 CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3541 boolean writeToWAL)
3542 throws IOException{
3543 checkReadOnly();
3544
3545
3546 checkResources();
3547 boolean isPut = w instanceof Put;
3548 if (!isPut && !(w instanceof Delete))
3549 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3550 "be Put or Delete");
3551 if (!Bytes.equals(row, w.getRow())) {
3552 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3553 "getRow must match the passed row");
3554 }
3555
3556 startRegionOperation();
3557 try {
3558 Get get = new Get(row);
3559 checkFamily(family);
3560 get.addColumn(family, qualifier);
3561
3562
3563 RowLock rowLock = getRowLock(get.getRow());
3564
3565 mvcc.waitForPreviousTransactionsComplete();
3566 try {
3567 if (this.getCoprocessorHost() != null) {
3568 Boolean processed = null;
3569 if (w instanceof Put) {
3570 processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3571 qualifier, compareOp, comparator, (Put) w);
3572 } else if (w instanceof Delete) {
3573 processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3574 qualifier, compareOp, comparator, (Delete) w);
3575 }
3576 if (processed != null) {
3577 return processed;
3578 }
3579 }
3580 List<Cell> result = get(get, false);
3581
3582 boolean valueIsNull = comparator.getValue() == null ||
3583 comparator.getValue().length == 0;
3584 boolean matches = false;
3585 long cellTs = 0;
3586 if (result.size() == 0 && valueIsNull) {
3587 matches = true;
3588 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3589 valueIsNull) {
3590 matches = true;
3591 cellTs = result.get(0).getTimestamp();
3592 } else if (result.size() == 1 && !valueIsNull) {
3593 Cell kv = result.get(0);
3594 cellTs = kv.getTimestamp();
3595 int compareResult = comparator.compareTo(kv.getValueArray(),
3596 kv.getValueOffset(), kv.getValueLength());
3597 switch (compareOp) {
3598 case LESS:
3599 matches = compareResult < 0;
3600 break;
3601 case LESS_OR_EQUAL:
3602 matches = compareResult <= 0;
3603 break;
3604 case EQUAL:
3605 matches = compareResult == 0;
3606 break;
3607 case NOT_EQUAL:
3608 matches = compareResult != 0;
3609 break;
3610 case GREATER_OR_EQUAL:
3611 matches = compareResult >= 0;
3612 break;
3613 case GREATER:
3614 matches = compareResult > 0;
3615 break;
3616 default:
3617 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3618 }
3619 }
3620
3621 if (matches) {
3622
3623
3624
3625
3626 long now = EnvironmentEdgeManager.currentTime();
3627 long ts = Math.max(now, cellTs);
3628 byte[] byteTs = Bytes.toBytes(ts);
3629
3630 if (w instanceof Put) {
3631 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3632 }
3633
3634
3635
3636
3637
3638 doBatchMutate(w);
3639 this.checkAndMutateChecksPassed.increment();
3640 return true;
3641 }
3642 this.checkAndMutateChecksFailed.increment();
3643 return false;
3644 } finally {
3645 rowLock.release();
3646 }
3647 } finally {
3648 closeRegionOperation();
3649 }
3650 }
3651
3652
3653
3654
3655
3656
3657 @Override
3658 public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3659 CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3660 boolean writeToWAL) throws IOException {
3661 checkReadOnly();
3662
3663
3664 checkResources();
3665
3666 startRegionOperation();
3667 try {
3668 Get get = new Get(row);
3669 checkFamily(family);
3670 get.addColumn(family, qualifier);
3671
3672
3673 RowLock rowLock = getRowLock(get.getRow());
3674
3675 mvcc.waitForPreviousTransactionsComplete();
3676 try {
3677 List<Cell> result = get(get, false);
3678
3679 boolean valueIsNull = comparator.getValue() == null ||
3680 comparator.getValue().length == 0;
3681 boolean matches = false;
3682 long cellTs = 0;
3683 if (result.size() == 0 && valueIsNull) {
3684 matches = true;
3685 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3686 valueIsNull) {
3687 matches = true;
3688 cellTs = result.get(0).getTimestamp();
3689 } else if (result.size() == 1 && !valueIsNull) {
3690 Cell kv = result.get(0);
3691 cellTs = kv.getTimestamp();
3692 int compareResult = comparator.compareTo(kv.getValueArray(),
3693 kv.getValueOffset(), kv.getValueLength());
3694 switch (compareOp) {
3695 case LESS:
3696 matches = compareResult < 0;
3697 break;
3698 case LESS_OR_EQUAL:
3699 matches = compareResult <= 0;
3700 break;
3701 case EQUAL:
3702 matches = compareResult == 0;
3703 break;
3704 case NOT_EQUAL:
3705 matches = compareResult != 0;
3706 break;
3707 case GREATER_OR_EQUAL:
3708 matches = compareResult >= 0;
3709 break;
3710 case GREATER:
3711 matches = compareResult > 0;
3712 break;
3713 default:
3714 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3715 }
3716 }
3717
3718 if (matches) {
3719
3720
3721
3722
3723 long now = EnvironmentEdgeManager.currentTime();
3724 long ts = Math.max(now, cellTs);
3725 byte[] byteTs = Bytes.toBytes(ts);
3726
3727 for (Mutation w : rm.getMutations()) {
3728 if (w instanceof Put) {
3729 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3730 }
3731
3732
3733 }
3734
3735
3736
3737 mutateRow(rm);
3738 this.checkAndMutateChecksPassed.increment();
3739 return true;
3740 }
3741 this.checkAndMutateChecksFailed.increment();
3742 return false;
3743 } finally {
3744 rowLock.release();
3745 }
3746 } finally {
3747 closeRegionOperation();
3748 }
3749 }
3750
3751 private void doBatchMutate(Mutation mutation) throws IOException {
3752
3753 OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation });
3754 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3755 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3756 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3757 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3758 }
3759 }
3760
3761
3762
3763
3764
3765
3766
3767
3768
3769
3770
3771
3772
3773
3774 public void addRegionToSnapshot(SnapshotDescription desc,
3775 ForeignExceptionSnare exnSnare) throws IOException {
3776 Path rootDir = FSUtils.getRootDir(conf);
3777 Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3778
3779 SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3780 snapshotDir, desc, exnSnare);
3781 manifest.addRegion(this);
3782 }
3783
3784 @Override
3785 public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3786 throws IOException {
3787 for (List<Cell> cells: cellItr) {
3788 if (cells == null) continue;
3789 assert cells instanceof RandomAccess;
3790 int listSize = cells.size();
3791 for (int i = 0; i < listSize; i++) {
3792 CellUtil.updateLatestStamp(cells.get(i), now, 0);
3793 }
3794 }
3795 }
3796
3797
3798
3799
3800 void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3801
3802
3803
3804 if (m.getTTL() == Long.MAX_VALUE) {
3805 return;
3806 }
3807
3808
3809
3810 for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3811 List<Cell> cells = e.getValue();
3812 assert cells instanceof RandomAccess;
3813 int listSize = cells.size();
3814 for (int i = 0; i < listSize; i++) {
3815 Cell cell = cells.get(i);
3816 List<Tag> newTags = Tag.carryForwardTags(null, cell);
3817 newTags = carryForwardTTLTag(newTags, m);
3818
3819
3820 cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
3821 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
3822 cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
3823 cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
3824 cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
3825 newTags));
3826 }
3827 }
3828 }
3829
3830
3831
3832
3833
3834
3835
3836 private void checkResources() throws RegionTooBusyException {
3837
3838 if (this.getRegionInfo().isMetaRegion()) return;
3839
3840 if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3841 blockedRequestsCount.increment();
3842 requestFlush();
3843 throw new RegionTooBusyException("Above memstore limit, " +
3844 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3845 this.getRegionInfo().getRegionNameAsString()) +
3846 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3847 this.getRegionServerServices().getServerName()) +
3848 ", memstoreSize=" + memstoreSize.get() +
3849 ", blockingMemStoreSize=" + blockingMemStoreSize);
3850 }
3851 }
3852
3853
3854
3855
3856 protected void checkReadOnly() throws IOException {
3857 if (isReadOnly()) {
3858 throw new DoNotRetryIOException("region is read only");
3859 }
3860 }
3861
3862 protected void checkReadsEnabled() throws IOException {
3863 if (!this.writestate.readsEnabled) {
3864 throw new IOException(getRegionInfo().getEncodedName()
3865 + ": The region's reads are disabled. Cannot serve the request");
3866 }
3867 }
3868
3869 public void setReadsEnabled(boolean readsEnabled) {
3870 if (readsEnabled && !this.writestate.readsEnabled) {
3871 LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
3872 }
3873 this.writestate.setReadsEnabled(readsEnabled);
3874 }
3875
3876
3877
3878
3879
3880
3881
3882 private void put(final byte [] row, byte [] family, List<Cell> edits)
3883 throws IOException {
3884 NavigableMap<byte[], List<Cell>> familyMap;
3885 familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3886
3887 familyMap.put(family, edits);
3888 Put p = new Put(row);
3889 p.setFamilyCellMap(familyMap);
3890 doBatchMutate(p);
3891 }
3892
3893
3894
3895
3896
3897
3898
3899
3900
3901
3902
3903
3904
3905
3906
3907
3908 private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3909 long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
3910 long size = 0;
3911
3912 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3913 byte[] family = e.getKey();
3914 List<Cell> cells = e.getValue();
3915 assert cells instanceof RandomAccess;
3916 Store store = getStore(family);
3917 int listSize = cells.size();
3918 for (int i=0; i < listSize; i++) {
3919 Cell cell = cells.get(i);
3920 CellUtil.setSequenceId(cell, mvccNum);
3921 Pair<Long, Cell> ret = store.add(cell);
3922 size += ret.getFirst();
3923 memstoreCells.add(ret.getSecond());
3924 if(isInReplay) {
3925
3926 CellUtil.setSequenceId(ret.getSecond(), mvccNum);
3927 }
3928 }
3929 }
3930
3931 return size;
3932 }
3933
3934
3935
3936
3937
3938
3939 private void rollbackMemstore(List<Cell> memstoreCells) {
3940 int kvsRolledback = 0;
3941
3942 for (Cell cell : memstoreCells) {
3943 byte[] family = CellUtil.cloneFamily(cell);
3944 Store store = getStore(family);
3945 store.rollback(cell);
3946 kvsRolledback++;
3947 }
3948 LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3949 }
3950
3951 @Override
3952 public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
3953 for (byte[] family : families) {
3954 checkFamily(family);
3955 }
3956 }
3957
3958 private void checkAndPrepareMutation(Mutation mutation, boolean replay,
3959 final Map<byte[], List<Cell>> familyMap, final long now)
3960 throws IOException {
3961 if (mutation instanceof Put) {
3962
3963 if (replay) {
3964 removeNonExistentColumnFamilyForReplay(familyMap);
3965 } else {
3966 checkFamilies(familyMap.keySet());
3967 }
3968 checkTimestamps(mutation.getFamilyCellMap(), now);
3969 } else {
3970 prepareDelete((Delete)mutation);
3971 }
3972 checkRow(mutation.getRow(), "doMiniBatchMutation");
3973 }
3974
3975
3976
3977
3978
3979 private void removeNonExistentColumnFamilyForReplay(
3980 final Map<byte[], List<Cell>> familyMap) {
3981 List<byte[]> nonExistentList = null;
3982 for (byte[] family : familyMap.keySet()) {
3983 if (!this.htableDescriptor.hasFamily(family)) {
3984 if (nonExistentList == null) {
3985 nonExistentList = new ArrayList<byte[]>();
3986 }
3987 nonExistentList.add(family);
3988 }
3989 }
3990 if (nonExistentList != null) {
3991 for (byte[] family : nonExistentList) {
3992
3993 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3994 familyMap.remove(family);
3995 }
3996 }
3997 }
3998
3999 @Override
4000 public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
4001 throws FailedSanityCheckException {
4002 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
4003 return;
4004 }
4005 long maxTs = now + timestampSlop;
4006 for (List<Cell> kvs : familyMap.values()) {
4007 assert kvs instanceof RandomAccess;
4008 int listSize = kvs.size();
4009 for (int i=0; i < listSize; i++) {
4010 Cell cell = kvs.get(i);
4011
4012 long ts = cell.getTimestamp();
4013 if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
4014 throw new FailedSanityCheckException("Timestamp for KV out of range "
4015 + cell + " (too.new=" + timestampSlop + ")");
4016 }
4017 }
4018 }
4019 }
4020
4021
4022
4023
4024
4025
4026
4027 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
4028 WALEdit walEdit) {
4029 for (List<Cell> edits : familyMap.values()) {
4030 assert edits instanceof RandomAccess;
4031 int listSize = edits.size();
4032 for (int i=0; i < listSize; i++) {
4033 Cell cell = edits.get(i);
4034 walEdit.add(cell);
4035 }
4036 }
4037 }
4038
4039 private void requestFlush() {
4040 if (this.rsServices == null) {
4041 return;
4042 }
4043 synchronized (writestate) {
4044 if (this.writestate.isFlushRequested()) {
4045 return;
4046 }
4047 writestate.flushRequested = true;
4048 }
4049
4050 this.rsServices.getFlushRequester().requestFlush(this, false);
4051 if (LOG.isDebugEnabled()) {
4052 LOG.debug("Flush requested on " + this);
4053 }
4054 }
4055
4056
4057
4058
4059
4060 private boolean isFlushSize(final long size) {
4061 return size > this.memstoreFlushSize;
4062 }
4063
4064
4065
4066
4067
4068
4069
4070
4071
4072
4073
4074
4075
4076
4077
4078
4079
4080
4081
4082
4083
4084
4085
4086
4087
4088
4089
4090
4091
4092
4093
4094
4095
4096
4097
/**
 * Replays any recovered.edits files left under this region's directory by the
 * WAL-splitting process, flushes the replayed data, then archives or deletes
 * the edit files.
 *
 * @param regiondir region directory possibly containing recovered edits
 * @param maxSeqIdInStores max sequence id already persisted, per store family
 * @param reporter progress callback; may be null
 * @param status monitored task used when flushing replayed data
 * @return the highest sequence id replayed, or the minimum persisted seq id
 *   when nothing needed replaying
 * @throws IOException on unrecoverable replay errors (unless skip-errors is on)
 */
protected long replayRecoveredEditsIfAny(final Path regiondir,
    Map<byte[], Long> maxSeqIdInStores,
    final CancelableProgressable reporter, final MonitoredTask status)
    throws IOException {
  // Only edits newer than the smallest per-store max seq id matter: anything
  // at or below that is already persisted in every store.
  long minSeqIdForTheRegion = -1;
  for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
    if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
      minSeqIdForTheRegion = maxSeqIdInStore;
    }
  }
  long seqid = minSeqIdForTheRegion;

  FileSystem fs = this.fs.getFileSystem();
  NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
  }

  if (files == null || files.isEmpty()) return seqid;

  for (Path edits: files) {
    if (edits == null || !fs.exists(edits)) {
      LOG.warn("Null or non-existent edits file: " + edits);
      continue;
    }
    // Zero-length files carry nothing; delete them and move on.
    if (isZeroLengthThenDelete(fs, edits)) continue;

    long maxSeqId;
    String fileName = edits.getName();
    // The file name encodes the maximum sequence id contained in the file.
    maxSeqId = Math.abs(Long.parseLong(fileName));
    if (maxSeqId <= minSeqIdForTheRegion) {
      // Every edit in this file is already persisted; skip it wholesale.
      if (LOG.isDebugEnabled()) {
        String msg = "Maximum sequenceid for this wal is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
        LOG.debug(msg);
      }
      continue;
    }

    try {
      // Keep the running maximum of replayed sequence ids.
      seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
    } catch (IOException e) {
      boolean skipErrors = conf.getBoolean(
          HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
          conf.getBoolean(
              "hbase.skip.errors",
              HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
      if (conf.get("hbase.skip.errors") != null) {
        LOG.warn(
            "The property 'hbase.skip.errors' has been deprecated. Please use " +
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
      }
      if (skipErrors) {
        // Move the bad file aside so it does not block future region opens.
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
            + "=true so continuing. Renamed " + edits +
            " as " + p, e);
      } else {
        throw e;
      }
    }
  }
  // Replay-edits size accounting is no longer needed once replay has finished.
  if (this.rsAccounting != null) {
    this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
  }
  if (seqid > minSeqIdForTheRegion) {
    // Something was replayed into the memstore; flush it so the edit files
    // below can be safely removed.
    internalFlushcache(null, seqid, stores.values(), status, false);
  }
  // Clean up the edit files: either archive them (as store files of a fake
  // family named after the recovered-edits dir) or delete them outright.
  if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
    String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
    Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
    for (Path file: files) {
      fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
          null, null));
    }
    getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
  } else {
    for (Path file: files) {
      if (!fs.delete(file, false)) {
        LOG.error("Failed delete of " + file);
      } else {
        LOG.debug("Deleted recovered.edits file=" + file);
      }
    }
  }
  return seqid;
}
4196
4197
4198
4199
4200
4201
4202
4203
4204
4205
/**
 * Replays the edits of a single recovered.edits file into this region's
 * memstores, skipping entries already persisted, belonging to unknown
 * families, or outside this region's row range.
 *
 * @param edits the recovered.edits file to replay
 * @param maxSeqIdInStores max sequence id already persisted, per store family
 * @param reporter progress callback; may be null
 * @return the highest sequence id read from the file
 * @throws IOException when the reporter aborts or the file fails in a
 *   non-recoverable way (EOF and parse corruption are handled in place)
 */
private long replayRecoveredEdits(final Path edits,
    Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
    throws IOException {
  String msg = "Replaying edits from " + edits;
  LOG.info(msg);
  MonitoredTask status = TaskMonitor.get().createStatus(msg);
  FileSystem fs = this.fs.getFileSystem();

  status.setStatus("Opening recovered edits");
  WAL.Reader reader = null;
  try {
    reader = WALFactory.createReader(fs, edits, conf);
    long currentEditSeqId = -1;
    long currentReplaySeqId = -1;
    long firstSeqIdInLog = -1;
    long skippedEdits = 0;
    long editsCount = 0;
    long intervalEdits = 0;
    WAL.Entry entry;
    Store store = null;
    boolean reported_once = false;
    ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

    try {
      // Number of edits to apply between progress reports.
      int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
      // Minimum time (ms) between progress reports.
      int period = this.conf.getInt("hbase.hstore.report.period", 300000);
      long lastReport = EnvironmentEdgeManager.currentTime();

      while ((entry = reader.next()) != null) {
        WALKey key = entry.getKey();
        WALEdit val = entry.getEdit();

        // Let the nonce manager know about operations found in the WAL.
        if (ng != null) {
          ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
        }

        if (reporter != null) {
          intervalEdits += val.size();
          if (intervalEdits >= interval) {
            // Edit-count interval reached; report at most once per period.
            intervalEdits = 0;
            long cur = EnvironmentEdgeManager.currentTime();
            if (lastReport + period <= cur) {
              status.setStatus("Replaying edits..." +
                  " skipped=" + skippedEdits +
                  " edits=" + editsCount);
              // The reporter can cancel the replay by returning false.
              if(!reporter.progress()) {
                msg = "Progressable reporter failed, stopping replay";
                LOG.warn(msg);
                status.abort(msg);
                throw new IOException(msg);
              }
              reported_once = true;
              lastReport = cur;
            }
          }
        }

        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
        if (currentEditSeqId > key.getLogSeqNum()) {
          // Sequence ids should not decrease within a file; log it but keep the
          // larger value rather than moving backwards.
          LOG.error(getRegionInfo().getEncodedName() + " : "
              + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
              + "; edit=" + val);
        } else {
          currentEditSeqId = key.getLogSeqNum();
        }
        // Prefer the original log sequence number when the key carries one.
        currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
            key.getOrigLogSeqNum() : currentEditSeqId;

        // Coprocessors may bypass individual entries.
        if (coprocessorHost != null) {
          status.setStatus("Running pre-WAL-restore hook in coprocessors");
          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
            // Entry was handled/bypassed by a coprocessor.
            continue;
          }
        }
        boolean checkRowWithinBoundary = false;
        // Entries recorded under a different encoded region name need their
        // rows checked against this region's boundaries before applying.
        if (!Bytes.equals(key.getEncodedRegionName(),
            this.getRegionInfo().getEncodedNameAsBytes())) {
          checkRowWithinBoundary = true;
        }

        boolean flush = false;
        for (Cell cell: val.getCells()) {
          // Metadata entries are not data edits; the only one acted on here is
          // a compaction marker for this region.
          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            if (!checkRowWithinBoundary) {
              CompactionDescriptor compaction = WALEdit.getCompaction(cell);
              if (compaction != null) {
                replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
              }
            }
            skippedEdits++;
            continue;
          }
          // Resolve the store lazily; consecutive cells often share a family.
          if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
            store = getStore(cell);
          }
          if (store == null) {
            // The family may have been dropped since the edit was written.
            LOG.warn("No family for " + cell);
            skippedEdits++;
            continue;
          }
          if (checkRowWithinBoundary && !rowIsInRange(this.getRegionInfo(),
              cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())) {
            LOG.warn("Row of " + cell + " is not within region boundary");
            skippedEdits++;
            continue;
          }
          // Edits at or below the store's persisted max seq id are on disk already.
          if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
              .getName())) {
            skippedEdits++;
            continue;
          }
          CellUtil.setSequenceId(cell, currentReplaySeqId);
          // Apply the edit; a true return from restoreEdit triggers the flush
          // below.
          flush |= restoreEdit(store, cell);
          editsCount++;
        }
        if (flush) {
          internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
        }

        if (coprocessorHost != null) {
          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
        }
      }
    } catch (EOFException eof) {
      // Truncated file: the missing tail is expected in another edits file.
      // Keep what was read and move the file aside.
      Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
      msg = "Encountered EOF. Most likely due to Master failure during " +
          "wal splitting, so we have this data in another edit. " +
          "Continuing, but renaming " + edits + " as " + p;
      LOG.warn(msg, eof);
      status.abort(msg);
    } catch (IOException ioe) {
      // A ParseException cause indicates file corruption: move the file aside
      // and continue; any other IOException aborts the replay.
      if (ioe.getCause() instanceof ParseException) {
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        msg = "File corruption encountered! " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, ioe);
        status.setStatus(msg);
      } else {
        status.abort(StringUtils.stringifyException(ioe));
        throw ioe;
      }
    }
    if (reporter != null && !reported_once) {
      reporter.progress();
    }
    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
        ", firstSequenceIdInLog=" + firstSeqIdInLog +
        ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
    status.markComplete(msg);
    LOG.debug(msg);
    return currentEditSeqId;
  } finally {
    status.cleanup();
    if (reader != null) {
      reader.close();
    }
  }
}
4393
4394
4395
4396
4397
4398
/**
 * Replays a compaction marker read from the WAL: after sequence-id gating,
 * delegates to the matching store's {@code replayCompactionMarker}.
 *
 * @param compaction the compaction descriptor from the WAL
 * @param pickCompactionFiles passed through to the store's replay
 * @param removeFiles passed through to the store's replay
 * @param replaySeqId sequence id of the marker; markers older than the last
 *   replayed open/compaction events are skipped
 * @throws IOException on store replay failure or wrong-region markers on
 *   non-default replicas
 */
void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles, long replaySeqId)
    throws IOException {
  try {
    checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
        "Compaction marker from WAL ", compaction);
  } catch (WrongRegionException wre) {
    if (RegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      // On the default replica a marker for another region is simply ignored.
      return;
    }
    throw wre;
  }

  synchronized (writestate) {
    // Markers older than the region-open event were already accounted for
    // when the region opened.
    if (replaySeqId < lastReplayedOpenRegionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
      return;
    }
    // Compaction markers must be replayed in increasing seq-id order.
    if (replaySeqId < lastReplayedCompactionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
      return;
    } else {
      lastReplayedCompactionSeqId = replaySeqId;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
          + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
          + lastReplayedOpenRegionSeqId);
    }

    startRegionOperation(Operation.REPLAY_EVENT);
    try {
      Store store = this.getStore(compaction.getFamilyName().toByteArray());
      if (store == null) {
        // The family may have been removed since the marker was written.
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Found Compaction WAL edit for deleted family:"
            + Bytes.toString(compaction.getFamilyName().toByteArray()));
        return;
      }
      store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
      logRegionFiles();
    } catch (FileNotFoundException ex) {
      // Input files may already be gone (e.g. cleaned up); not fatal.
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in compaction: "
          + TextFormat.shortDebugString(compaction)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    } finally {
      closeRegionOperation(Operation.REPLAY_EVENT);
    }
  }
}
4459
4460 void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
4461 checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
4462 "Flush marker from WAL ", flush);
4463
4464 if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
4465 return;
4466 }
4467
4468 if (LOG.isDebugEnabled()) {
4469 LOG.debug(getRegionInfo().getEncodedName() + " : "
4470 + "Replaying flush marker " + TextFormat.shortDebugString(flush));
4471 }
4472
4473 startRegionOperation(Operation.REPLAY_EVENT);
4474 try {
4475 FlushAction action = flush.getAction();
4476 switch (action) {
4477 case START_FLUSH:
4478 replayWALFlushStartMarker(flush);
4479 break;
4480 case COMMIT_FLUSH:
4481 replayWALFlushCommitMarker(flush);
4482 break;
4483 case ABORT_FLUSH:
4484 replayWALFlushAbortMarker(flush);
4485 break;
4486 case CANNOT_FLUSH:
4487 replayWALFlushCannotFlushMarker(flush, replaySeqId);
4488 break;
4489 default:
4490 LOG.warn(getRegionInfo().getEncodedName() + " : " +
4491 "Received a flush event with unknown action, ignoring. " +
4492 TextFormat.shortDebugString(flush));
4493 break;
4494 }
4495
4496 logRegionFiles();
4497 } finally {
4498 closeRegionOperation(Operation.REPLAY_EVENT);
4499 }
4500 }
4501
4502
4503
4504
4505
/**
 * Replays a START_FLUSH marker: prepares a flush for the stores named in the
 * descriptor, recording the prepare result so a later COMMIT/ABORT marker can
 * complete it. Returns the prepare result, or null when the marker is stale or
 * a flush is already in progress.
 *
 * @param flush the flush descriptor carrying the flush seq id and store list
 * @return the PrepareFlushResult when a flush was prepared here, else null
 * @throws IOException from the flush-prepare machinery
 */
@VisibleForTesting
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
  long flushSeqId = flush.getFlushSequenceNumber();

  // Resolve the stores named in the marker; unknown families are skipped.
  HashSet<Store> storesToFlush = new HashSet<Store>();
  for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
    byte[] family = storeFlush.getFamilyName().toByteArray();
    Store store = getStore(family);
    if (store == null) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Received a flush start marker from primary, but the family is not found. Ignoring"
          + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
      continue;
    }
    storesToFlush.add(store);
  }

  MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);

  // All flush-state decisions happen under the writestate lock.
  synchronized (writestate) {
    try {
      // Markers older than the region-open event are stale; ignore them.
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
        return null;
      }
      // Reset the without-WAL counters alongside the flush.
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }

      if (!writestate.flushing) {
        // No flush in progress: prepare one at the marker's seq id for the
        // stores it names.
        PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
            flushSeqId, storesToFlush, status, false);
        if (prepareResult.result == null) {
          // Prepare succeeded; remember it so the commit marker can finish it.
          this.writestate.flushing = true;
          this.prepareFlushResult = prepareResult;
          status.markComplete("Flush prepare successful");
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
          }
        } else {
          // Prepare reported a result (i.e. did not fully prepare). An empty
          // memstore still counts as a prepared (empty) flush.
          if (prepareResult.getResult().getResult() ==
              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
            this.writestate.flushing = true;
            this.prepareFlushResult = prepareResult;
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                  + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
            }
          }
          status.abort("Flush prepare failed with " + prepareResult.result);
        }
        return prepareResult;
      } else {
        // A flush is already in progress; compare the marker's seq id with
        // the outstanding prepare.
        if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
          // Duplicate of the outstanding prepare.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with the same seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
          // ignore
        } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
          // Older than the outstanding prepare.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a smaller seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
          // ignore
        } else {
          // Newer than the outstanding prepare; still ignored here.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a larger seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        }
      }
    } finally {
      status.cleanup();
      // Wake anyone waiting on flush-state transitions.
      writestate.notifyAll();
    }
  }
  return null;
}
4616
  /**
   * Replays a flush COMMIT marker received from the primary region replica. Picks up the
   * flushed store files and, depending on how the marker's seqId compares to the prepared
   * flush (if any), drops the prepared memstore snapshot and/or current memstore contents.
   *
   * @param flush the flush descriptor carried in the replayed WAL entry
   * @throws IOException if picking up the flushed files fails
   */
  @VisibleForTesting
  void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);

    // writestate serializes flush bookkeeping against concurrent flushes and replays.
    synchronized (writestate) {
      try {
        // Markers older than the last replayed region-open event are stale; skip them.
        if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
          return;
        }

        if (writestate.flushing) {
          PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
          if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
                + " and a previous prepared snapshot was found");
            }
            // Exact match: pick up the new files and drop the prepared snapshot.
            replayFlushInStores(flush, prepareFlushResult, true);

            // The snapshot's heap footprint is now represented by files on disk.
            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

            this.prepareFlushResult = null;
            writestate.flushing = false;
          } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
            // Commit marker for an OLDER flush than the one prepared: pick up the files
            // but keep the newer prepared snapshot — its own commit is still expected.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with smaller seqId: "
                + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
                + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
                +" prepared memstore snapshot");
            replayFlushInStores(flush, prepareFlushResult, false);

            // Snapshot kept, so memstore accounting is left untouched here.
          } else {
            // Commit marker for a NEWER flush than the one prepared: pick up the files
            // and drop the (older, now covered) prepared snapshot as well.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with larger seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
              prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
              +" memstore snapshot");

            replayFlushInStores(flush, prepareFlushResult, true);

            // Reclaim the dropped snapshot's heap size.
            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

            // Edits between the prepared seqId and this marker's seqId are in the new
            // files too; drop them from the memstores.
            dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);

            this.prepareFlushResult = null;
            writestate.flushing = false;
          }

          // The replica is caught up with the primary's flush: safe to serve reads.
          this.setReadsEnabled(true);
        } else {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
            + ", but no previous prepared snapshot was found");
          // Commit with no prepared snapshot (e.g. this replica opened after the prepare
          // marker was written). Pick up the files; there is no snapshot to drop.
          replayFlushInStores(flush, null, false);

          // Memstore contents covered by the flush seqId are in the files; drop them.
          dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
        }

        status.markComplete("Flush commit successful");

        // Record the flushed seqId for region bookkeeping.
        this.maxFlushedSeqId = flush.getFlushSequenceNumber();

        // Advance the mvcc read point so scanners see the data from the new files.
        getMVCC().advanceMemstoreReadPointIfNeeded(flush.getFlushSequenceNumber());

      } catch (FileNotFoundException ex) {
        // Best effort: the flushed files may already have been compacted away/archived.
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
          + " doesn't exist any more. Skip loading the file(s)", ex);
      }
      finally {
        status.cleanup();
        // Wake anyone waiting on writestate transitions.
        writestate.notifyAll();
      }
    }

    // Wake up anyone waiting on this region object (e.g. for reads to be enabled).
    synchronized (this) {
      notifyAll();
    }
  }
4739
4740
4741
4742
4743
4744
4745
4746
4747
4748 private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
4749 boolean dropMemstoreSnapshot)
4750 throws IOException {
4751 for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
4752 byte[] family = storeFlush.getFamilyName().toByteArray();
4753 Store store = getStore(family);
4754 if (store == null) {
4755 LOG.warn(getRegionInfo().getEncodedName() + " : "
4756 + "Received a flush commit marker from primary, but the family is not found."
4757 + "Ignoring StoreFlushDescriptor:" + storeFlush);
4758 continue;
4759 }
4760 List<String> flushFiles = storeFlush.getFlushOutputList();
4761 StoreFlushContext ctx = null;
4762 long startTime = EnvironmentEdgeManager.currentTime();
4763 if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
4764 ctx = store.createFlushContext(flush.getFlushSequenceNumber());
4765 } else {
4766 ctx = prepareFlushResult.storeFlushCtxs.get(family);
4767 startTime = prepareFlushResult.startTime;
4768 }
4769
4770 if (ctx == null) {
4771 LOG.warn(getRegionInfo().getEncodedName() + " : "
4772 + "Unexpected: flush commit marker received from store "
4773 + Bytes.toString(family) + " but no associated flush context. Ignoring");
4774 continue;
4775 }
4776
4777 ctx.replayFlush(flushFiles, dropMemstoreSnapshot);
4778
4779
4780 this.lastStoreFlushTimeMap.put(store, startTime);
4781 }
4782 }
4783
4784
4785
4786
4787
4788
4789
4790 private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
4791 long totalFreedSize = 0;
4792 this.updatesLock.writeLock().lock();
4793 try {
4794 mvcc.waitForPreviousTransactionsComplete();
4795 long currentSeqId = getSequenceId().get();
4796 if (seqId >= currentSeqId) {
4797
4798 LOG.info(getRegionInfo().getEncodedName() + " : "
4799 + "Dropping memstore contents as well since replayed flush seqId: "
4800 + seqId + " is greater than current seqId:" + currentSeqId);
4801
4802
4803 if (store == null ) {
4804 for (Store s : stores.values()) {
4805 totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
4806 }
4807 } else {
4808 totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
4809 }
4810 } else {
4811 LOG.info(getRegionInfo().getEncodedName() + " : "
4812 + "Not dropping memstore contents since replayed flush seqId: "
4813 + seqId + " is smaller than current seqId:" + currentSeqId);
4814 }
4815 } finally {
4816 this.updatesLock.writeLock().unlock();
4817 }
4818 return totalFreedSize;
4819 }
4820
4821 private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
4822 long snapshotSize = s.getFlushableSize();
4823 this.addAndGetGlobalMemstoreSize(-snapshotSize);
4824 StoreFlushContext ctx = s.createFlushContext(currentSeqId);
4825 ctx.prepare();
4826 ctx.abort();
4827 return snapshotSize;
4828 }
4829
  /**
   * Replays a flush ABORT marker from the primary. Intentionally a no-op: this method's
   * body is empty. Presumably any prepared snapshot is reconciled when a later flush
   * start/commit or region-open marker is replayed — confirm against upstream.
   *
   * @param flush the flush descriptor from the replayed WAL entry (unused)
   */
  private void replayWALFlushAbortMarker(FlushDescriptor flush) {
  }
4835
4836 private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
4837 synchronized (writestate) {
4838 if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
4839 LOG.warn(getRegionInfo().getEncodedName() + " : "
4840 + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
4841 + " because its sequence id " + replaySeqId + " is smaller than this regions "
4842 + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
4843 return;
4844 }
4845
4846
4847
4848
4849
4850
4851 this.setReadsEnabled(true);
4852 }
4853 }
4854
  /** Returns the currently recorded prepared-flush state, or null if none. Test hook. */
  @VisibleForTesting
  PrepareFlushResult getPrepareFlushResult() {
    return prepareFlushResult;
  }
4859
  /**
   * Replays a region event marker written by the primary replica. REGION_CLOSE and
   * unknown events are ignored; for REGION_OPEN this (secondary) replica refreshes its
   * store files, aborts any prepared flush snapshot the new files cover, drops covered
   * memstore contents, advances mvcc, and re-enables reads.
   *
   * @param regionEvent the region event descriptor from the replayed WAL entry
   * @throws IOException on store-file refresh or memstore-drop failure
   */
  void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
    checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
      "RegionEvent marker from WAL ", regionEvent);

    startRegionOperation(Operation.REPLAY_EVENT);
    try {
      // Only secondary replicas replay these markers.
      if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
        return;
      }

      if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
        // Nothing to do for a close event.
        return;
      }
      if (regionEvent.getEventType() != EventType.REGION_OPEN) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Unknown region event received, ignoring :"
            + TextFormat.shortDebugString(regionEvent));
        return;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
      }

      // Serialize against flushes and other replay events.
      synchronized (writestate) {
        // Only apply events newer than the last replayed open event; also record this
        // event's seqId as the new staleness threshold for subsequent markers.
        if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
          this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
        } else {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
          return;
        }

        // Refresh every store named in the event so this replica picks up the files the
        // primary now has.
        for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
          byte[] family = storeDescriptor.getFamilyName().toByteArray();
          Store store = getStore(family);
          if (store == null) {
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a region open marker from primary, but the family is not found. "
                + "Ignoring. StoreDescriptor:" + storeDescriptor);
            continue;
          }

          long storeSeqId = store.getMaxSequenceId();
          List<String> storeFiles = storeDescriptor.getStoreFileList();
          try {
            // Replace the store's file set with the one named by the event.
            store.refreshStoreFiles(storeFiles);
          } catch (FileNotFoundException ex) {
            // Best effort: files may already be compacted away/archived on the primary.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                    + "At least one of the store files: " + storeFiles
                    + " doesn't exist any more. Skip loading the file(s)", ex);
            continue;
          }
          if (store.getMaxSequenceId() != storeSeqId) {
            // The refresh picked up new files; count this as the store's last flush time.
            lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
          }

          if (writestate.flushing) {
            // If the prepared flush's seqId is covered by this open event, the new files
            // supersede the per-store snapshot: abort it and reclaim its heap size.
            if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                  null : this.prepareFlushResult.storeFlushCtxs.get(family);
              if (ctx != null) {
                long snapshotSize = store.getFlushableSize();
                ctx.abort();
                this.addAndGetGlobalMemstoreSize(-snapshotSize);
                this.prepareFlushResult.storeFlushCtxs.remove(family);
              }
            }
          }

          // Drop memstore contents now covered by the refreshed files.
          dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
          if (storeSeqId > this.maxFlushedSeqId) {
            this.maxFlushedSeqId = storeSeqId;
          }
        }

        // If every store's prepared snapshot is gone (or empty), clear the prepared
        // flush state entirely.
        dropPrepareFlushIfPossible();

        // Advance mvcc so scanners can read up to the flushed seqId.
        getMVCC().advanceMemstoreReadPointIfNeeded(this.maxFlushedSeqId);

        // The replica is now consistent with the primary up to this event: allow reads.
        this.setReadsEnabled(true);

        // Wake up anyone waiting on this region object.
        synchronized (this) {
          notifyAll();
        }
      }
      logRegionFiles();
    } finally {
      closeRegionOperation(Operation.REPLAY_EVENT);
    }
  }
4977
  /**
   * Replays a bulk-load marker written by the primary replica: loads the same HFiles
   * into this secondary replica's stores, skipping files that no longer exist.
   *
   * @param bulkLoadEvent the bulk load descriptor from the replayed WAL entry
   * @throws IOException if loading a file fails for a reason other than the file
   *           having disappeared
   */
  void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
    checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
      "BulkLoad marker from WAL ", bulkLoadEvent);

    // Only secondary replicas replay these markers.
    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
    }

    // Determine whether more than one family is involved, to take the right lock mode.
    boolean multipleFamilies = false;
    byte[] family = null;
    for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
      byte[] fam = storeDescriptor.getFamilyName().toByteArray();
      if (family == null) {
        family = fam;
      } else if (!Bytes.equals(family, fam)) {
        multipleFamilies = true;
        break;
      }
    }

    startBulkRegionOperation(multipleFamilies);
    try {
      // Serialize against flushes and other replay events.
      synchronized (writestate) {
        // A bulk load at or below the last replayed open-region seqId is already
        // reflected in the files picked up at open; skip it. A negative seqNum means
        // no sequence id was assigned, so the staleness check cannot apply.
        if (bulkLoadEvent.getBulkloadSeqNum() >= 0
            && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Skipping replaying bulkload event :"
              + TextFormat.shortDebugString(bulkLoadEvent)
              + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
              + " =" + lastReplayedOpenRegionSeqId);

          return;
        }

        for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
          // The primary's families may differ from this replica's; skip unknown ones.
          family = storeDescriptor.getFamilyName().toByteArray();
          Store store = getStore(family);
          if (store == null) {
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                + "Received a bulk load marker from primary, but the family is not found. "
                + "Ignoring. StoreDescriptor:" + storeDescriptor);
            continue;
          }

          List<String> storeFiles = storeDescriptor.getStoreFileList();
          for (String storeFile : storeFiles) {
            StoreFileInfo storeFileInfo = null;
            try {
              storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
              store.bulkLoadHFile(storeFileInfo);
            } catch(FileNotFoundException ex) {
              // Best effort: the file may already have been archived; skip it.
              LOG.warn(getRegionInfo().getEncodedName() + " : "
                      + ((storeFileInfo != null) ? storeFileInfo.toString() :
                            (new Path(Bytes.toString(family), storeFile)).toString())
                      + " doesn't exist any more. Skip loading the file");
            }
          }
        }
      }
      if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
        // Make the bulk-loaded cells visible to scanners.
        getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
      }
    } finally {
      closeBulkRegionOperation();
    }
  }
5057
5058
5059
5060
5061 private void dropPrepareFlushIfPossible() {
5062 if (writestate.flushing) {
5063 boolean canDrop = true;
5064 if (prepareFlushResult.storeFlushCtxs != null) {
5065 for (Entry<byte[], StoreFlushContext> entry
5066 : prepareFlushResult.storeFlushCtxs.entrySet()) {
5067 Store store = getStore(entry.getKey());
5068 if (store == null) {
5069 continue;
5070 }
5071 if (store.getSnapshotSize() > 0) {
5072 canDrop = false;
5073 break;
5074 }
5075 }
5076 }
5077
5078
5079
5080 if (canDrop) {
5081 writestate.flushing = false;
5082 this.prepareFlushResult = null;
5083 }
5084 }
5085 }
5086
  /**
   * Refreshes the store files seen by this (secondary) region replica from the file
   * system and frees any memstore data the newly picked-up files make redundant.
   * No-op on the primary replica.
   *
   * @return true if any memstore heap size was freed
   * @throws IOException on store-file refresh or memstore-drop failure
   */
  @Override
  public boolean refreshStoreFiles() throws IOException {
    // Only secondary replicas refresh this way; the primary owns its own flushes.
    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return false;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Refreshing store files to see whether we can free up memstore");
    }

    long totalFreedSize = 0;

    // Smallest max-seqId across all stores after refresh; used to advance the
    // staleness threshold for replayed markers.
    long smallestSeqIdInStores = Long.MAX_VALUE;

    startRegionOperation();
    try {
      synchronized (writestate) {
        for (Store store : getStores()) {
          // Track whether the refresh actually picked up new files for this store.
          long maxSeqIdBefore = store.getMaxSequenceId();

          // Re-list the store's files from the file system.
          store.refreshStoreFiles();

          long storeSeqId = store.getMaxSequenceId();
          if (storeSeqId < smallestSeqIdInStores) {
            smallestSeqIdInStores = storeSeqId;
          }

          // New files were picked up: data up to storeSeqId is durable on disk.
          if (storeSeqId > maxSeqIdBefore) {

            if (writestate.flushing) {
              // If a prepared flush is now covered by the new files, abort the per-store
              // snapshot and reclaim its heap size.
              if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
                StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                    null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
                if (ctx != null) {
                  long snapshotSize = store.getFlushableSize();
                  ctx.abort();
                  this.addAndGetGlobalMemstoreSize(-snapshotSize);
                  this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
                  totalFreedSize += snapshotSize;
                }
              }
            }

            // Drop the store's memstore contents now covered by the new files.
            totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store);
          }
        }

        // If every store's prepared snapshot is gone (or empty), clear the prepared
        // flush state entirely.
        dropPrepareFlushIfPossible();

        // Advance mvcc so scanners can see the data in the new files.
        for (Store s : getStores()) {
          getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS());
        }

        // Everything below smallestSeqIdInStores is now in files across all stores;
        // raise the staleness threshold so older replayed markers get skipped.
        if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
          this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
        }
      }

      // Wake up anyone waiting on this region object (e.g. for memstore to shrink).
      synchronized (this) {
        notifyAll();
      }
      return totalFreedSize > 0;
    } finally {
      closeRegionOperation();
    }
  }
5170
5171 private void logRegionFiles() {
5172 if (LOG.isTraceEnabled()) {
5173 LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
5174 for (Store s : stores.values()) {
5175 Collection<StoreFile> storeFiles = s.getStorefiles();
5176 if (storeFiles == null) continue;
5177 for (StoreFile sf : storeFiles) {
5178 LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
5179 }
5180 }
5181 }
5182 }
5183
5184
5185
5186
5187 private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
5188 throws WrongRegionException {
5189 if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
5190 return;
5191 }
5192
5193 if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
5194 Bytes.equals(encodedRegionName,
5195 this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
5196 return;
5197 }
5198
5199 throw new WrongRegionException(exceptionMsg + payload
5200 + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
5201 + " does not match this region: " + this.getRegionInfo());
5202 }
5203
5204
5205
5206
5207
5208
5209
5210 protected boolean restoreEdit(final Store s, final Cell cell) {
5211 long kvSize = s.add(cell).getFirst();
5212 if (this.rsAccounting != null) {
5213 rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
5214 }
5215 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
5216 }
5217
5218
5219
5220
5221
5222
5223
5224 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
5225 throws IOException {
5226 FileStatus stat = fs.getFileStatus(p);
5227 if (stat.getLen() > 0) return false;
5228 LOG.warn("File " + p + " is zero-length, deleting.");
5229 fs.delete(p, false);
5230 return true;
5231 }
5232
5233 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
5234 if (family.isMobEnabled()) {
5235 if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
5236 throw new IOException("A minimum HFile version of "
5237 + HFile.MIN_FORMAT_VERSION_WITH_TAGS
5238 + " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
5239 + " accordingly.");
5240 }
5241 return new HMobStore(this, family, this.conf);
5242 }
5243 return new HStore(this, family, this.conf);
5244 }
5245
  /** Returns the store for the given column family name, or null if there is none. */
  @Override
  public Store getStore(final byte[] column) {
    return this.stores.get(column);
  }
5250
5251
5252
5253
5254
5255 private Store getStore(Cell cell) {
5256 for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
5257 if (Bytes.equals(
5258 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
5259 famStore.getKey(), 0, famStore.getKey().length)) {
5260 return famStore.getValue();
5261 }
5262 }
5263
5264 return null;
5265 }
5266
5267 @Override
5268 public List<Store> getStores() {
5269 List<Store> list = new ArrayList<Store>(stores.size());
5270 list.addAll(stores.values());
5271 return list;
5272 }
5273
5274 @Override
5275 public List<String> getStoreFileList(final byte [][] columns)
5276 throws IllegalArgumentException {
5277 List<String> storeFileNames = new ArrayList<String>();
5278 synchronized(closeLock) {
5279 for(byte[] column : columns) {
5280 Store store = this.stores.get(column);
5281 if (store == null) {
5282 throw new IllegalArgumentException("No column family : " +
5283 new String(column) + " available");
5284 }
5285 Collection<StoreFile> storeFiles = store.getStorefiles();
5286 if (storeFiles == null) continue;
5287 for (StoreFile storeFile: storeFiles) {
5288 storeFileNames.add(storeFile.getPath().toString());
5289 }
5290
5291 logRegionFiles();
5292 }
5293 }
5294 return storeFileNames;
5295 }
5296
5297
5298
5299
5300
5301
5302 void checkRow(final byte [] row, String op) throws IOException {
5303 if (!rowIsInRange(getRegionInfo(), row)) {
5304 throw new WrongRegionException("Requested row out of range for " +
5305 op + " on HRegion " + this + ", startKey='" +
5306 Bytes.toStringBinary(getRegionInfo().getStartKey()) + "', getEndKey()='" +
5307 Bytes.toStringBinary(getRegionInfo().getEndKey()) + "', row='" +
5308 Bytes.toStringBinary(row) + "'");
5309 }
5310 }
5311
  /**
   * Acquires a lock on the given row, bracketed by region-operation accounting.
   *
   * @param row the row to lock
   * @param waitForLock whether to block when the row is already locked
   * @return the acquired lock, or null when waitForLock is false and the row is held
   * @throws IOException on timeout or interrupt while waiting
   */
  @Override
  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
    startRegionOperation();
    try {
      return getRowLockInternal(row, waitForLock);
    } finally {
      closeRegionOperation();
    }
  }
5321
5322
5323
5324
5325
  /**
   * Acquires a lock on the given row, waiting up to rowLockWaitDuration per contention
   * round if another thread holds it. Re-acquisition by the owning thread is reentrant.
   *
   * @param row the row to lock
   * @param waitForLock when false, returns null immediately if another thread holds it
   * @return the acquired RowLock, or null (only when waitForLock is false)
   * @throws IOException if the wait times out, or InterruptedIOException on interrupt
   */
  protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
    HashedBytes rowKey = new HashedBytes(row);
    RowLockContext rowLockContext = new RowLockContext(rowKey);

    // Loop until a context owned by this thread is registered for the row.
    while (true) {
      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
      if (existingContext == null) {
        // Row was unlocked; our new context is now registered.
        break;
      } else if (existingContext.ownedByCurrentThread()) {
        // Reentrant acquisition: reuse the context this thread already owns.
        rowLockContext = existingContext;
        break;
      } else {
        // Row is held by another thread.
        if (!waitForLock) {
          return null;
        }
        TraceScope traceScope = null;
        try {
          if (Trace.isTracing()) {
            traceScope = Trace.startSpan("HRegion.getRowLockInternal");
          }
          // Wait (bounded) for the holder to release, then retry the putIfAbsent loop.
          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
            if(traceScope != null) {
              traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
            }
            throw new IOException("Timed out waiting for lock for row: " + rowKey);
          }
          rowLockContext.setThreadName(Thread.currentThread().getName());
          if (traceScope != null) traceScope.close();
          traceScope = null;
        } catch (InterruptedException ie) {
          // Convert to InterruptedIOException, preserving the cause.
          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        } finally {
          if (traceScope != null) traceScope.close();
        }
      }
    }

    // Hand out a new lock handle on the (now owned) context.
    return rowLockContext.newLock();
  }
5373
5374
5375
5376
5377
5378
5379
  /**
   * Acquires a lock on the given row, blocking until it is available.
   *
   * @param row the row to lock
   * @return the acquired lock
   * @throws IOException on timeout or interrupt while waiting
   */
  public RowLock getRowLock(byte[] row) throws IOException {
    return getRowLock(row, true);
  }
5383
5384 @Override
5385 public void releaseRowLocks(List<RowLock> rowLocks) {
5386 if (rowLocks != null) {
5387 for (RowLock rowLock : rowLocks) {
5388 rowLock.release();
5389 }
5390 rowLocks.clear();
5391 }
5392 }
5393
5394
5395
5396
5397
5398
5399
5400 private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {
5401 boolean multipleFamilies = false;
5402 byte[] family = null;
5403 for (Pair<byte[], String> pair : familyPaths) {
5404 byte[] fam = pair.getFirst();
5405 if (family == null) {
5406 family = fam;
5407 } else if (!Bytes.equals(family, fam)) {
5408 multipleFamilies = true;
5409 break;
5410 }
5411 }
5412 return multipleFamilies;
5413 }
5414
  /**
   * Bulk loads the given hfiles without copying them (copyFile = false).
   * Delegates to the four-argument overload.
   */
  @Override
  public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
      BulkLoadListener bulkLoadListener) throws IOException {
    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
  }
5420
  /**
   * Bulk loads the given HFiles into this region's stores. Runs in four phases:
   * validate every (family, hfile) pair; optionally flush so a fresh sequence id can be
   * assigned to the files; stage each file into place (rename or copy); then commit the
   * staged files into the stores. On success, a bulk-load marker is written to the WAL
   * so secondary replicas can replay the load.
   *
   * @param familyPaths pairs of column family name and hfile path to load
   * @param assignSeqId whether to flush and assign the files a fresh sequence id
   * @param bulkLoadListener callbacks around staging/commit/failure; may be null
   * @param copyFile whether the listener should copy (rather than move) source files
   * @return map of family to committed store-file paths on success; null on a
   *         recoverable validation failure (e.g. region split) or WAL-marker failure
   * @throws IOException on validation I/O errors or staging/commit failures
   */
  @Override
  public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
      boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
    long seqId = -1;
    Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
    Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
    Preconditions.checkNotNull(familyPaths);

    startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
    boolean isSuccessful = false;
    try {
      this.writeRequestsCount.increment();

      // Phase 1: validate every pair before touching anything, separating fatal I/O
      // errors from recoverable region-boundary mismatches.
      List<IOException> ioes = new ArrayList<IOException>();
      List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();

        Store store = getStore(familyName);
        if (store == null) {
          IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
              "No such column family " + Bytes.toStringBinary(familyName));
          ioes.add(ioe);
        } else {
          try {
            store.assertBulkLoadHFileOk(new Path(path));
          } catch (WrongRegionException wre) {
            // Recoverable: the region has likely split since the files were created.
            failures.add(p);
          } catch (IOException ioe) {
            // Unrecoverable (hfile unreadable, wrong format, etc).
            ioes.add(ioe);
          }
        }
      }

      // Validation I/O failures are fatal for the whole bulk load.
      if (ioes.size() != 0) {
        IOException e = MultipleIOException.createIOException(ioes);
        LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
        throw e;
      }

      // Region-boundary failures are recoverable: report them and return null.
      if (failures.size() != 0) {
        StringBuilder list = new StringBuilder();
        for (Pair<byte[], String> p : failures) {
          list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
              .append(p.getSecond());
        }

        LOG.warn("There was a recoverable bulk load failure likely due to a" +
            " split. These (family, HFile) pairs were not loaded: " + list);
        return null;
      }

      // Phase 2: optionally flush so the bulk-loaded files get a sequence id newer
      // than anything currently in the memstores.
      if (assignSeqId) {
        FlushResult fs = flushcache(true, false);
        if (fs.isFlushSucceeded()) {
          seqId = ((FlushResultImpl)fs).flushSequenceId;
        } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
          // Empty memstore: flushSequenceId is still usable as the assigned seqId.
          seqId = ((FlushResultImpl)fs).flushSequenceId;
        } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH) {
          // A flush is presumably already in flight; wait for it so the data we could
          // not flush ourselves still ends up on disk before the load.
          waitForFlushes();
        } else {
          throw new IOException("Could not bulk load with an assigned sequential ID because the "+
            "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
        }
      }

      // Phase 3: stage every hfile into place (via the listener and the store's
      // pre-bulk-load hook), keyed by family.
      Map<byte[], List<Pair<Path, Path>>> familyWithFinalPath =
          new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (Pair<byte[], String> p : familyPaths) {
        byte[] familyName = p.getFirst();
        String path = p.getSecond();
        Store store = getStore(familyName);
        if (!familyWithFinalPath.containsKey(familyName)) {
          familyWithFinalPath.put(familyName, new ArrayList<Pair<Path, Path>>());
        }
        List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
        try {
          String finalPath = path;
          if (bulkLoadListener != null) {
            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
          }
          Pair<Path, Path> pair = ((HStore)store).preBulkLoadHFile(finalPath, seqId);
          lst.add(pair);
        } catch (IOException ioe) {
          // Staging failure aborts the whole load; let the listener clean up its staged
          // copy before rethrowing.
          LOG.error("There was a partial failure due to IO when attempting to" +
              " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
          if (bulkLoadListener != null) {
            try {
              bulkLoadListener.failedBulkLoad(familyName, path);
            } catch (Exception ex) {
              LOG.error("Error while calling failedBulkLoad for family " +
                  Bytes.toString(familyName) + " with path " + path, ex);
            }
          }
          throw ioe;
        }
      }

      if (this.getCoprocessorHost() != null) {
        for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
          this.getCoprocessorHost().preCommitStoreFile(entry.getKey(), entry.getValue());
        }
      }
      // Phase 4: commit the staged files into the stores.
      for (Map.Entry<byte[], List<Pair<Path, Path>>> entry : familyWithFinalPath.entrySet()) {
        byte[] familyName = entry.getKey();
        for (Pair<Path, Path> p : entry.getValue()) {
          String path = p.getFirst().toString();
          Path commitedStoreFile = p.getSecond();
          Store store = getStore(familyName);
          try {
            store.bulkLoadHFile(familyName, path, commitedStoreFile);

            // Record the committed file's size for the WAL bulk-load descriptor;
            // fall back to 0 when the size cannot be determined.
            try {
              FileSystem fs = commitedStoreFile.getFileSystem(baseConf);
              storeFilesSizes.put(commitedStoreFile.getName(), fs.getFileStatus(commitedStoreFile)
                  .getLen());
            } catch (IOException e) {
              LOG.warn("Failed to find the size of hfile " + commitedStoreFile);
              storeFilesSizes.put(commitedStoreFile.getName(), 0L);
            }

            // Accumulate committed paths per family for the return value / WAL marker.
            if(storeFiles.containsKey(familyName)) {
              storeFiles.get(familyName).add(commitedStoreFile);
            } else {
              List<Path> storeFileNames = new ArrayList<Path>();
              storeFileNames.add(commitedStoreFile);
              storeFiles.put(familyName, storeFileNames);
            }
            if (bulkLoadListener != null) {
              bulkLoadListener.doneBulkLoad(familyName, path);
            }
          } catch (IOException ioe) {
            // Commit failure aborts the load even though earlier families may already
            // have been committed; the listener is notified before rethrowing.
            LOG.error("There was a partial failure due to IO when attempting to" +
                " load " + p.getFirst() + " : " + p.getSecond(), ioe);
            if (bulkLoadListener != null) {
              try {
                bulkLoadListener.failedBulkLoad(familyName, path);
              } catch (Exception ex) {
                LOG.error("Error while calling failedBulkLoad for family " +
                    Bytes.toString(familyName) + " with path " + path, ex);
              }
            }
            throw ioe;
          }
        }

      }

      isSuccessful = true;
    } finally {
      if (wal != null && !storeFiles.isEmpty()) {
        // Write a bulk-load marker to the WAL so secondary replicas can replay it.
        try {
          WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
              this.getRegionInfo().getTable(),
              ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles,
              storeFilesSizes, seqId);
          WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
              loadDescriptor, sequenceId);
        } catch (IOException ioe) {
          if (this.rsServices != null) {
            // Failing to write the marker aborts the server (replicas would otherwise
            // silently miss the load); when there is no server context, only the
            // return value is affected.
            isSuccessful = false;
            this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
          }
        }
      }

      closeBulkRegionOperation();
    }
    return isSuccessful ? storeFiles : null;
  }
5616
5617 @Override
5618 public boolean equals(Object o) {
5619 return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),
5620 ((HRegion) o).getRegionInfo().getRegionName());
5621 }
5622
  /** Hash of the full region name; consistent with {@link #equals(Object)}. */
  @Override
  public int hashCode() {
    return Bytes.hashCode(getRegionInfo().getRegionName());
  }
5627
  /** Returns the region name as a printable string. */
  @Override
  public String toString() {
    return getRegionInfo().getRegionNameAsString();
  }
5632
5633
5634
5635
  /**
   * RegionScannerImpl is used to combine scanners from multiple Stores (i.e. column families)
   * into a single merged view of the region. Cells from "essential" families flow through
   * {@link #storeHeap}; when the filter marks some families non-essential (lazy, on-demand
   * loading), their scanners are parked in {@link #joinedHeap} and only consulted for rows the
   * filter has already accepted.
   */
  class RegionScannerImpl implements RegionScanner {
    // Heap over the essential column-family scanners. Package-private for testing
    // (see getStoreHeapForTesting()).
    KeyValueHeap storeHeap = null;

    // Heap over the non-essential ("joined") column-family scanners; null unless the filter
    // declared some families non-essential via isFamilyEssential().
    KeyValueHeap joinedHeap = null;

    // When population from the joined heap was interrupted by a limit, this holds the row we
    // were mid-way through so the next call can resume it; null when no continuation pending.
    protected Cell joinedContinuationRow = null;
    // null means "no stop row" (scan to the end of the region).
    protected final byte[] stopRow;
    // Wrapped scan filter, or null when the scan has no filter.
    private final FilterWrapper filter;
    // Carries the scan's batch limit; used when callers invoke next() without their own context.
    private ScannerContext defaultScannerContext;
    // -1 for a Get-style scan, 0 otherwise; biases the stop-row comparison in isStopRow() so a
    // Get includes the row equal to the stop row.
    protected int isScan;
    // Set true on close(); subsequent next() calls throw UnknownScannerException.
    private boolean filterClosed = false;
    // MVCC read point this scanner observes for its whole lifetime.
    private long readPt;
    private long maxResultSize;
    protected HRegion region;

    @Override
    public HRegionInfo getRegionInfo() {
      return region.getRegionInfo();
    }

    /**
     * @param scan the client scan specification
     * @param additionalScanners extra scanners to merge in ahead of the store scanners; may be null
     * @param region the region being scanned
     * @throws IOException if a store scanner cannot be created; any scanners already
     *   instantiated are closed via handleException before rethrowing
     */
    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
        throws IOException {

      this.region = region;
      this.maxResultSize = scan.getMaxResultSize();
      if (scan.hasFilter()) {
        this.filter = new FilterWrapper(scan.getFilter());
      } else {
        this.filter = null;
      }

      // Per-scanner default context carrying only the batch limit; size/time limits come from
      // the caller-supplied context when one is passed to next()/nextRaw().
      defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();

      // An empty stop row means "no stop row" for a regular scan, but a Get keeps its
      // (single-row) stop row so isStopRow() terminates after that row.
      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
        this.stopRow = null;
      } else {
        this.stopRow = scan.getStopRow();
      }

      // Bias for the stop-row comparison: -1 makes the stop row itself inclusive (Get case).
      this.isScan = scan.isGetScan() ? -1 : 0;

      // Acquire the read point and register it under the scannerReadPoints lock so flushes and
      // compactions observe a consistent set of outstanding read points.
      IsolationLevel isolationLevel = scan.getIsolationLevel();
      synchronized(scannerReadPoints) {
        this.readPt = getReadpoint(isolationLevel);
        scannerReadPoints.put(this, this.readPt);
      }

      // Split the per-family scanners into essential (storeHeap) and non-essential (joinedHeap)
      // based on the filter's isFamilyEssential() answer.
      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
      List<KeyValueScanner> joinedScanners
        = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
      // Track everything we created so handleException can close them on failure.
      List<KeyValueScanner> instantiatedScanners = new ArrayList<KeyValueScanner>();
      if (additionalScanners != null && !additionalScanners.isEmpty()) {
        scanners.addAll(additionalScanners);
        instantiatedScanners.addAll(additionalScanners);
      }

      try {
        for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
            scan.getFamilyMap().entrySet()) {
          Store store = stores.get(entry.getKey());
          KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
          if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
            || this.filter.isFamilyEssential(entry.getKey())) {
            scanners.add(scanner);
          } else {
            joinedScanners.add(scanner);
          }
        }
        initializeKVHeap(scanners, joinedScanners, region);
      } catch (Throwable t) {
        throw handleException(instantiatedScanners, t);
      }
    }

    /**
     * Build the merge heaps from the partitioned scanner lists. The joined heap is only
     * created when there is at least one non-essential family scanner.
     */
    protected void initializeKVHeap(List<KeyValueScanner> scanners,
        List<KeyValueScanner> joinedScanners, HRegion region)
        throws IOException {
      this.storeHeap = new KeyValueHeap(scanners, region.comparator);
      if (!joinedScanners.isEmpty()) {
        this.joinedHeap = new KeyValueHeap(joinedScanners, region.comparator);
      }
    }

    /**
     * Clean up after a construction-time failure: deregister the read point and close either
     * the heaps (which own the scanners once built) or the raw scanners if the heaps were
     * never created. Returns the throwable wrapped as an IOException for rethrow.
     */
    private IOException handleException(List<KeyValueScanner> instantiatedScanners,
        Throwable t) {
      // Remove scanner read point before throwing the exception.
      scannerReadPoints.remove(this);
      if (storeHeap != null) {
        // Once the heaps exist they own the scanners, so closing the heaps closes everything.
        storeHeap.close();
        storeHeap = null;
        if (joinedHeap != null) {
          joinedHeap.close();
          joinedHeap = null;
        }
      } else {
        // Heaps were never built; close the scanners we instantiated directly.
        for (KeyValueScanner scanner : instantiatedScanners) {
          scanner.close();
        }
      }
      return t instanceof IOException ? (IOException) t : new IOException(t);
    }

    @Override
    public long getMaxResultSize() {
      return maxResultSize;
    }

    @Override
    public long getMvccReadPoint() {
      return this.readPt;
    }

    @Override
    public int getBatch() {
      return this.defaultScannerContext.getBatchLimit();
    }

    /**
     * Reset both the filter and any filter wrapped inside it.
     * @throws IOException in case a filter raises an I/O exception.
     */
    protected void resetFilters() throws IOException {
      if (filter != null) {
        filter.reset();
      }
    }

    @Override
    public boolean next(List<Cell> outResults)
        throws IOException {
      // apply the batching limit by default
      return next(outResults, defaultScannerContext);
    }

    @Override
    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
      if (this.filterClosed) {
        throw new UnknownScannerException("Scanner was closed (timed out?) " +
            "after we renewed it. Could be caused by a very slow scanner " +
            "or a lengthy garbage collection");
      }
      // Bracket the read with region-operation accounting so a concurrent close waits for us.
      startRegionOperation(Operation.SCAN);
      readRequestsCount.increment();
      try {
        return nextRaw(outResults, scannerContext);
      } finally {
        closeRegionOperation(Operation.SCAN);
      }
    }

    @Override
    public boolean nextRaw(List<Cell> outResults) throws IOException {
      // Use the RegionScannerImpl's context by default
      return nextRaw(outResults, defaultScannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
        throws IOException {
      if (storeHeap == null) {
        // scanner is closed
        throw new UnknownScannerException("Scanner was closed");
      }
      boolean moreValues;
      if (outResults.isEmpty()) {
        // Usual case: the caller passed an empty list; fill it directly to avoid a copy.
        moreValues = nextInternal(outResults, scannerContext);
      } else {
        // Caller's list already holds cells; gather into a temp list so a row's cells arrive
        // together, then append.
        List<Cell> tmpList = new ArrayList<Cell>();
        moreValues = nextInternal(tmpList, scannerContext);
        outResults.addAll(tmpList);
      }

      // Only reset filter state when a complete row was returned; a partial result must keep
      // the filter state so the rest of the row is filtered consistently.
      if (!scannerContext.partialResultFormed()) resetFilters();

      if (isFilterDoneInternal()) {
        moreValues = false;
      }
      return moreValues;
    }

    /**
     * Resume (or start) draining the joined heap for {@link #joinedContinuationRow}.
     * @return true when more cells exist within the current region.
     */
    private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
        throws IOException {
      assert joinedContinuationRow != null;
      boolean moreValues =
          populateResult(results, this.joinedHeap, scannerContext,
          joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
          joinedContinuationRow.getRowLength());

      if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        // The row was finished (no limit interrupted us); clear the continuation marker.
        joinedContinuationRow = null;
      }

      // Cells from the two heaps arrive out of order relative to each other; restore KV order.
      Collections.sort(results, comparator);
      return moreValues;
    }

    /**
     * Fetch cells for the current row from the given heap until the row is exhausted or a
     * batch/size/time limit fires, recording the reason in the scanner context.
     * @param results output list of cells
     * @param heap heap to drain (store heap or joined heap)
     * @param scannerContext limit tracker; its state is set when a limit is reached
     * @param currentRow row being populated (array/offset/length form)
     * @return true when the heap still has a next cell (possibly of the next row)
     */
    private boolean populateResult(List<Cell> results, KeyValueHeap heap,
        ScannerContext scannerContext, byte[] currentRow, int offset, short length)
        throws IOException {
      Cell nextKv;
      boolean moreCellsInRow = false;
      boolean tmpKeepProgress = scannerContext.getKeepProgress();
      // Progress is tracked inside the loop between heap.next() calls, so limits can fire
      // mid-row at cell granularity.
      LimitScope limitScope = LimitScope.BETWEEN_CELLS;
      do {
        // Force keepProgress=true for the inner call so progress accumulates across calls to
        // heap.next(), then restore the caller's setting.
        scannerContext.setKeepProgress(true);
        heap.next(results, scannerContext);
        scannerContext.setKeepProgress(tmpKeepProgress);

        nextKv = heap.peek();
        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);

        if (scannerContext.checkBatchLimit(limitScope)) {
          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
        } else if (scannerContext.checkSizeLimit(limitScope)) {
          // MID_ROW states record that the row was cut short so a partial result is formed.
          ScannerContext.NextState state =
              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
          return scannerContext.setScannerState(state).hasMoreValues();
        } else if (scannerContext.checkTimeLimit(limitScope)) {
          ScannerContext.NextState state =
              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
          return scannerContext.setScannerState(state).hasMoreValues();
        }
      } while (moreCellsInRow);

      return nextKv != null;
    }

    /**
     * @return true when nextKv is non-null and belongs to the same row as currentRow, meaning
     *   the row has not been fully drained yet.
     */
    private boolean moreCellsInRow(final Cell nextKv, byte[] currentRow, int offset,
        short length) {
      return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
    }

    /**
     * @return True if a filter rules the scanner is over, done.
     */
    @Override
    public synchronized boolean isFilterDone() throws IOException {
      return isFilterDoneInternal();
    }

    private boolean isFilterDoneInternal() throws IOException {
      return this.filter != null && this.filter.filterAllRemaining();
    }

    /**
     * Core row-at-a-time scan loop. Populates {@code results} with (at most) one row's worth
     * of cells, honoring the limits in {@code scannerContext}, applying the filter, and
     * merging in non-essential families from the joined heap for accepted rows.
     * @return true when the scan may yield further values; false at the stop row / region end
     *   or when the filter ends the scan.
     */
    private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
        throws IOException {
      if (!results.isEmpty()) {
        throw new IllegalArgumentException("First parameter should be an empty list");
      }
      if (scannerContext == null) {
        throw new IllegalArgumentException("Scanner context cannot be null");
      }
      RpcCallContext rpcCall = RpcServer.getCurrentCall();

      // Snapshot the incoming progress so we can either restore it (keepProgress, i.e. caller
      // accumulates across rows) or clear it at the top of each row.
      int initialBatchProgress = scannerContext.getBatchProgress();
      long initialSizeProgress = scannerContext.getSizeProgress();
      long initialTimeProgress = scannerContext.getTimeProgress();

      // The loop advances one row per iteration; it only repeats when a row is filtered out
      // or empty and we continue on to the next row.
      while (true) {
        // Reset progress for the new row, per the keepProgress contract.
        if (scannerContext.getKeepProgress()) {
          // Progress should be kept. Reset to initial values seen at start of method invocation.
          scannerContext
              .setProgress(initialBatchProgress, initialSizeProgress, initialTimeProgress);
        } else {
          scannerContext.clearProgress();
        }

        if (rpcCall != null) {
          // Abort cheaply if the client went away: there is no point scanning for a caller
          // that will never read the response.
          long afterTime = rpcCall.disconnectSince();
          if (afterTime >= 0) {
            throw new CallerDisconnectedException(
                "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +
                    this + " after " + afterTime + " ms, since " +
                    "caller disconnected");
          }
        }

        // Let's see what we have in the storeHeap.
        Cell current = this.storeHeap.peek();

        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getRowArray();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }

        boolean stopRow = isStopRow(currentRow, offset, length);
        // When a filter implements filterRow (whole-row hook), partial results are not
        // allowed: the filter must see the complete row at once.
        boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

        if (hasFilterRow) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
                + " formed. Changing scope of limits that may create partials");
          }
          // Widen mid-row limits to row boundaries so no partial row is ever returned.
          scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
          scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
        }

        // Branch 1: starting a fresh row (no joined-heap continuation pending).
        if (joinedContinuationRow == null) {
          // First, check if we are at a stop row. If so, there are no more results.
          if (stopRow) {
            if (hasFilterRow) {
              filter.filterRowCells(results);
            }
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // Check if rowkey filter wants to exclude this row. If so, loop to next row.
          if (filterRowKey(currentRow, offset, length)) {
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            results.clear();
            continue;
          }

          // Drain the essential families for this row (may stop early on a limit).
          populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);

          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
            // A mid-row limit fired; with a filterRow filter that is a contract violation.
            if (hasFilterRow) {
              throw new IncompatibleFilterException(
                  "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                      + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
            }
            return true;
          }

          Cell nextKv = this.storeHeap.peek();
          stopRow = nextKv == null ||
              isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());
          // save that the row was empty before filters applied to it.
          final boolean isEmptyRow = results.isEmpty();

          // We have the part of the row necessary for filtering (all of it, usually).
          // First filter with the filterRow(List).
          FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
          if (hasFilterRow) {
            ret = filter.filterRowCellsWithRet(results);

            // filterRowCells may have changed the cells; recompute progress from scratch
            // (preserving only the elapsed-time progress, which is monotonic).
            long timeProgress = scannerContext.getTimeProgress();
            if (scannerContext.getKeepProgress()) {
              scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
                  initialTimeProgress);
            } else {
              scannerContext.clearProgress();
            }
            scannerContext.setTimeProgress(timeProgress);
            scannerContext.incrementBatchProgress(results.size());
            for (Cell cell : results) {
              scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
            }
          }

          // Row excluded (empty, filterRow veto, or legacy filterRow()): skip to next row.
          if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
            results.clear();
            boolean moreRows = nextRow(currentRow, offset, length);
            if (!moreRows) {
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }

            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on. Otherwise, nothing else to do.
            if (!stopRow) continue;
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }

          // Row accepted: pull in the non-essential families for it, if any.
          if (this.joinedHeap != null) {
            boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);
            if (mayHaveData) {
              joinedContinuationRow = current;
              populateFromJoinedHeap(results, scannerContext);

              if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
                return true;
              }
            }
          }
        } else {
          // Branch 2: resuming a joined-heap row interrupted by a limit on a previous call.
          populateFromJoinedHeap(results, scannerContext);
          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
            return true;
          }
        }

        // We may have just called populateFromJoinedMap and hit the limits. If that is
        // the case, we need to call it again on the next next() invocation.
        if (joinedContinuationRow != null) {
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
        }

        // Finally, we are done with both joinedHeap and storeHeap.
        // Double check to prevent empty rows from appearing in result. It could be
        // the case when SingleColumnValueExcludeFilter is used.
        if (results.isEmpty()) {
          boolean moreRows = nextRow(currentRow, offset, length);
          if (!moreRows) {
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
          }
          if (!stopRow) continue;
        }

        // We are done. Return the result.
        if (stopRow) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        } else {
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
        }
      }
    }

    /**
     * @return true when the joined heap may hold cells for currentRow: either its next cell
     *   already matches the row, or a forward seek to the row's first key lands on it.
     */
    private boolean joinedHeapMayHaveData(byte[] currentRow, int offset, short length)
        throws IOException {
      Cell nextJoinedKv = joinedHeap.peek();
      boolean matchCurrentRow =
          nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRow, offset, length);
      boolean matchAfterSeek = false;

      // If the next value in the joined heap does not match the current row, try to seek to the
      // correct row
      if (!matchCurrentRow) {
        Cell firstOnCurrentRow = KeyValueUtil.createFirstOnRow(currentRow, offset, length);
        boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
        matchAfterSeek =
            seekSuccessful && joinedHeap.peek() != null
                && CellUtil.matchingRow(joinedHeap.peek(), currentRow, offset, length);
      }

      return matchCurrentRow || matchAfterSeek;
    }

    /**
     * This function is to maintain backward compatibility for 0.94 filters. HBASE-6429
     * combines both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or
     * older, it may not implement hasFilterRow as HBase-6429 expects because 0.94
     * filterRow() will be skipped.
     */
    private boolean filterRow() throws IOException {
      // when hasFilterRow returns true, filter.filterRow() will be called automatically inside
      // filterRowCells(List<Cell> kvs) so we skip that scenario here.
      return filter != null && (!filter.hasFilterRow())
          && filter.filterRow();
    }

    private boolean filterRowKey(byte[] row, int offset, short length) throws IOException {
      return filter != null
          && filter.filterRowKey(row, offset, length);
    }

    /**
     * Skip the remaining cells of currentRow in the store heap, reset filter state, and give
     * the coprocessor a chance to terminate the scan.
     * @return false when a coprocessor's postScannerFilterRow ends the scan.
     */
    protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {
      assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
      Cell next;
      // Drain the rest of the row into MOCKED_LIST (a sink that discards cells).
      while ((next = this.storeHeap.peek()) != null &&
             CellUtil.matchingRow(next, currentRow, offset, length)) {
        this.storeHeap.next(MOCKED_LIST);
      }
      resetFilters();
      // Calling the hook in CP which allows it to do a fast forward
      return this.region.getCoprocessorHost() == null
          || this.region.getCoprocessorHost()
              .postScannerFilterRow(this, currentRow, offset, length);
    }

    /**
     * @return true when currentRow is null (heap exhausted) or at/past the stop row. The
     *   isScan bias (-1 for Gets) makes the stop row itself inclusive for Get scans.
     */
    protected boolean isStopRow(byte[] currentRow, int offset, short length) {
      return currentRow == null ||
          (stopRow != null &&
          comparator.compareRows(stopRow, 0, stopRow.length,
            currentRow, offset, length) <= isScan);
    }

    @Override
    public synchronized void close() {
      if (storeHeap != null) {
        storeHeap.close();
        storeHeap = null;
      }
      if (joinedHeap != null) {
        joinedHeap.close();
        joinedHeap = null;
      }
      // no need to synchronize here.
      scannerReadPoints.remove(this);
      this.filterClosed = true;
    }

    KeyValueHeap getStoreHeapForTesting() {
      return storeHeap;
    }

    @Override
    public synchronized boolean reseek(byte[] row) throws IOException {
      if (row == null) {
        throw new IllegalArgumentException("Row cannot be null.");
      }
      boolean result = false;
      startRegionOperation();
      try {
        KeyValue kv = KeyValueUtil.createFirstOnRow(row);
        // use request seek to make use of the lazy seek option. See HBASE-5520
        result = this.storeHeap.requestSeek(kv, true, true);
        if (this.joinedHeap != null) {
          result = this.joinedHeap.requestSeek(kv, true, true) || result;
        }
      } finally {
        closeRegionOperation();
      }
      return result;
    }
  }
6238
6239
6240
6241
6242
6243
6244
6245
6246
6247
6248
6249
6250
6251
6252
6253
6254
6255
6256
6257
6258 static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
6259 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
6260 RegionServerServices rsServices) {
6261 try {
6262 @SuppressWarnings("unchecked")
6263 Class<? extends HRegion> regionClass =
6264 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
6265
6266 Constructor<? extends HRegion> c =
6267 regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
6268 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
6269 RegionServerServices.class);
6270
6271 return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
6272 } catch (Throwable e) {
6273
6274 throw new IllegalStateException("Could not instantiate a region instance.", e);
6275 }
6276 }
6277
6278
6279
6280
6281
6282
6283
6284
6285
6286
6287
6288
6289
6290
6291
6292
6293
  /**
   * Convenience overload: create and initialize an HRegion with no explicit WAL
   * (delegates to the WAL-taking overload with a null WAL).
   * @throws IOException if the region cannot be created on the filesystem
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
      final Configuration conf, final HTableDescriptor hTableDescriptor)
  throws IOException {
    return createHRegion(info, rootDir, conf, hTableDescriptor, null);
  }
6299
6300
6301
6302
6303
6304
6305
6306
6307
6308
6309
6310 public static void closeHRegion(final HRegion r) throws IOException {
6311 if (r == null) return;
6312 r.close();
6313 if (r.getWAL() == null) return;
6314 r.getWAL().close();
6315 }
6316
6317
6318
6319
6320
6321
6322
6323
6324
6325
6326
6327
6328
6329
  /**
   * Delegate overload: create an HRegion with {@code ignoreWAL} defaulted to false
   * (i.e. a WAL is created when none is supplied).
   * @param initialize whether to initialize the region after creation
   * @throws IOException if the region cannot be created on the filesystem
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                      final Configuration conf,
                                      final HTableDescriptor hTableDescriptor,
                                      final WAL wal,
                                      final boolean initialize)
  throws IOException {
    return createHRegion(info, rootDir, conf, hTableDescriptor,
        wal, initialize, false);
  }
6339
6340
6341
6342
6343
6344
6345
6346
6347
6348
6349
6350
6351
6352
6353
6354 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6355 final Configuration conf,
6356 final HTableDescriptor hTableDescriptor,
6357 final WAL wal,
6358 final boolean initialize, final boolean ignoreWAL)
6359 throws IOException {
6360 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6361 return createHRegion(info, rootDir, tableDir, conf, hTableDescriptor, wal, initialize,
6362 ignoreWAL);
6363 }
6364
6365
6366
6367
6368
6369
6370
6371
6372
6373
6374
6375
6376
6377
6378
6379
  /**
   * Core region-creation routine: creates the region's directory layout on the filesystem,
   * optionally spins up a private WAL, constructs the region object, and optionally
   * initializes it.
   * @param wal WAL to use; when null and {@code ignoreWAL} is false, a new WAL rooted at
   *   {@code rootDir} is created for this region
   * @param initialize when true, run region initialization and seed its sequence id
   * @param ignoreWAL when true, do not create a WAL even if {@code wal} is null
   * @throws IOException on filesystem or WAL creation failure
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
                                      final Configuration conf,
                                      final HTableDescriptor hTableDescriptor,
                                      final WAL wal,
                                      final boolean initialize, final boolean ignoreWAL)
  throws IOException {
    LOG.info("creating HRegion " + info.getTable().getNameAsString()
        + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
        " Table name == " + info.getTable().getNameAsString());
    FileSystem fs = FileSystem.get(conf);
    HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
    WAL effectiveWAL = wal;
    if (wal == null && !ignoreWAL) {
      // The WAL subsystem will use the default rootDir rather than the passed in rootDir
      // unless we pass it a config that has the adjusted root set; make a copy of conf
      // with the root overridden so the WAL lands under rootDir.
      Configuration confForWAL = new Configuration(conf);
      FSUtils.setRootDir(confForWAL, rootDir);
      effectiveWAL = (new WALFactory(confForWAL,
          Collections.<WALActionsListener>singletonList(new MetricsWAL()),
          "hregion-" + RandomStringUtils.randomNumeric(8))).
            getWAL(info.getEncodedNameAsBytes());
    }
    HRegion region = HRegion.newHRegion(tableDir,
        effectiveWAL, fs, conf, info, hTableDescriptor, null);
    if (initialize) {
      // Initialize and seed the region's sequence id from the result of initialization.
      region.setSequenceId(region.initialize(null));
    }
    return region;
  }
6412
  /**
   * Delegate overload: create an HRegion with initialization defaulted to true.
   * @throws IOException if the region cannot be created on the filesystem
   */
  public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
                                      final Configuration conf,
                                      final HTableDescriptor hTableDescriptor,
                                      final WAL wal)
  throws IOException {
    return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
  }
6420
6421
6422
6423
6424
6425
6426
6427
6428
6429
6430
6431
6432
  /**
   * Open a region without region-server services or a progress reporter
   * (delegates with nulls for both).
   * @throws IOException if the region cannot be opened
   */
  public static HRegion openHRegion(final HRegionInfo info,
      final HTableDescriptor htd, final WAL wal,
      final Configuration conf)
  throws IOException {
    return openHRegion(info, htd, wal, conf, null, null);
  }
6439
6440
6441
6442
6443
6444
6445
6446
6447
6448
6449
6450
6451
6452
6453
6454
  /**
   * Open a region, resolving the HBase root directory from the configuration before
   * delegating to the rootDir-taking overload.
   * @throws IOException if the root dir cannot be resolved or the region cannot be opened
   */
  public static HRegion openHRegion(final HRegionInfo info,
      final HTableDescriptor htd, final WAL wal, final Configuration conf,
      final RegionServerServices rsServices,
      final CancelableProgressable reporter)
  throws IOException {
    return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
  }
6462
6463
6464
6465
6466
6467
6468
6469
6470
6471
6472
6473
6474
6475
  /**
   * Open a region from an explicit root directory, with no region-server services or
   * progress reporter (delegates with nulls for both).
   * @throws IOException if the region cannot be opened
   */
  public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
      final HTableDescriptor htd, final WAL wal, final Configuration conf)
  throws IOException {
    return openHRegion(rootDir, info, htd, wal, conf, null, null);
  }
6481
6482
6483
6484
6485
6486
6487
6488
6489
6490
6491
6492
6493
6494
6495
6496
6497 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
6498 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6499 final RegionServerServices rsServices,
6500 final CancelableProgressable reporter)
6501 throws IOException {
6502 FileSystem fs = null;
6503 if (rsServices != null) {
6504 fs = rsServices.getFileSystem();
6505 }
6506 if (fs == null) {
6507 fs = FileSystem.get(conf);
6508 }
6509 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
6510 }
6511
6512
6513
6514
6515
6516
6517
6518
6519
6520
6521
6522
6523
6524
6525
  /**
   * Open a region using an explicit FileSystem, with no region-server services or progress
   * reporter (delegates with nulls for both).
   * @throws IOException if the region cannot be opened
   */
  public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
      final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
      throws IOException {
    return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
  }
6531
6532
6533
6534
6535
6536
6537
6538
6539
6540
6541
6542
6543
6544
6545
6546
6547
6548 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6549 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
6550 final RegionServerServices rsServices, final CancelableProgressable reporter)
6551 throws IOException {
6552 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6553 return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
6554 }
6555
6556
6557
6558
6559
6560
6561
6562
6563
6564
6565
6566
6567
6568
6569
6570
6571
6572 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6573 final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
6574 final WAL wal, final RegionServerServices rsServices,
6575 final CancelableProgressable reporter)
6576 throws IOException {
6577 if (info == null) throw new NullPointerException("Passed region info is null");
6578 if (LOG.isDebugEnabled()) {
6579 LOG.debug("Opening region: " + info);
6580 }
6581 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6582 return r.openHRegion(reporter);
6583 }
6584
6585
6586
6587
6588
6589
6590
6591
6592
6593 public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
6594 throws IOException {
6595 HRegionFileSystem regionFs = other.getRegionFileSystem();
6596 HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
6597 other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
6598 return r.openHRegion(reporter);
6599 }
6600
  /**
   * Region-interface adapter for {@link #openHRegion(HRegion, CancelableProgressable)};
   * assumes the passed Region is an HRegion (ClassCastException otherwise).
   * @throws IOException if the new instance cannot be opened
   */
  public static Region openHRegion(final Region other, final CancelableProgressable reporter)
        throws IOException {
    return openHRegion((HRegion)other, reporter);
  }
6605
6606
6607
6608
6609
6610
6611
  /**
   * Open this instance: validate compression, encryption, and coprocessor/split-policy
   * class loading, initialize the region, seed its sequence id from the open sequence
   * number, and write a region-open marker to the WAL when appropriate.
   * @param reporter progress callback for long-running initialization; may be null
   * @return this (now open) instance
   * @throws IOException if any validation or initialization step fails
   */
  protected HRegion openHRegion(final CancelableProgressable reporter)
  throws IOException {
    // Refuse to open the region if there is no valid codec for any configured family.
    checkCompressionCodecs();
    // Refuse to open the region if encryption configuration is incorrect or
    // codec support is missing.
    checkEncryption();
    // Refuse to open the region if a required class cannot be loaded.
    checkClassLoading();
    this.openSeqNum = initialize(reporter);
    this.setSequenceId(openSeqNum);
    // Only persist the open marker when we have a WAL, are attached to a region server,
    // are writable, and are not mid-recovery.
    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
        && !isRecovering) {
      writeRegionOpenMarker(wal, openSeqNum);
    }
    return this;
  }
6632
6633 public static void warmupHRegion(final HRegionInfo info,
6634 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6635 final RegionServerServices rsServices,
6636 final CancelableProgressable reporter)
6637 throws IOException {
6638
6639 if (info == null) throw new NullPointerException("Passed region info is null");
6640
6641 if (LOG.isDebugEnabled()) {
6642 LOG.debug("HRegion.Warming up region: " + info);
6643 }
6644
6645 Path rootDir = FSUtils.getRootDir(conf);
6646 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6647
6648 FileSystem fs = null;
6649 if (rsServices != null) {
6650 fs = rsServices.getFileSystem();
6651 }
6652 if (fs == null) {
6653 fs = FileSystem.get(conf);
6654 }
6655
6656 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6657 r.initializeWarmup(reporter);
6658 r.close();
6659 }
6660
6661
6662 private void checkCompressionCodecs() throws IOException {
6663 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6664 CompressionTest.testCompression(fam.getCompression());
6665 CompressionTest.testCompression(fam.getCompactionCompression());
6666 }
6667 }
6668
6669 private void checkEncryption() throws IOException {
6670 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6671 EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
6672 }
6673 }
6674
  /**
   * Verify that the split-policy class and all table coprocessor classes configured for
   * this table can be loaded before the region opens.
   * @throws IOException when a configured class cannot be loaded
   */
  private void checkClassLoading() throws IOException {
    RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
    RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
  }
6679
6680
6681
6682
6683
6684
  /**
   * Create an HRegion for a daughter produced by a split: commit the daughter's files on
   * the filesystem, then build the region object, carrying over half of this region's
   * read/write request counts as an estimate for the daughter.
   * @param hri region info of the daughter
   * @throws IOException if committing the daughter region fails
   */
  HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
    // Move the files from the temporary .splits to the final /table/region directory
    fs.commitDaughterRegion(hri);

    // Create the daughter HRegion instance
    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
        this.getBaseConf(), hri, this.getTableDesc(), rsServices);
    // Seed the daughter's request counters with half the parent's (rough even split).
    r.readRequestsCount.set(this.getReadRequestsCount() / 2);
    r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
    return r;
  }
6696
6697
6698
6699
6700
6701
6702
  /**
   * Create an HRegion for the result of merging this region with {@code region_b}: build
   * the merged region object (request counters seeded with the sum of both parents'),
   * then commit the merged region's files on the filesystem.
   * @param mergedRegionInfo region info of the merged region
   * @param region_b the other parent region
   * @throws IOException if committing the merged region fails
   */
  HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
      final HRegion region_b) throws IOException {
    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
        fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
        this.getTableDesc(), this.rsServices);
    r.readRequestsCount.set(this.getReadRequestsCount()
        + region_b.getReadRequestsCount());
    r.writeRequestsCount.set(this.getWriteRequestsCount()
        // Sum of both parents' write counts.
        + region_b.getWriteRequestsCount());
    this.fs.commitMergedRegion(mergedRegionInfo);
    return r;
  }
6716
6717
6718
6719
6720
6721
6722
6723
6724
6725
6726
6727
6728 public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
6729 meta.checkResources();
6730
6731 byte[] row = r.getRegionInfo().getRegionName();
6732 final long now = EnvironmentEdgeManager.currentTime();
6733 final List<Cell> cells = new ArrayList<Cell>(2);
6734 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6735 HConstants.REGIONINFO_QUALIFIER, now,
6736 r.getRegionInfo().toByteArray()));
6737
6738 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6739 HConstants.META_VERSION_QUALIFIER, now,
6740 Bytes.toBytes(HConstants.META_VERSION)));
6741 meta.put(row, HConstants.CATALOG_FAMILY, cells);
6742 }
6743
6744
6745
6746
6747
6748
6749
6750
  /**
   * Compute the region directory as {@code tabledir/name}.
   * @deprecated retained for compatibility; prefer the HRegionFileSystem-based layout
   *   helpers — TODO confirm designated replacement.
   */
  @Deprecated
  public static Path getRegionDir(final Path tabledir, final String name) {
    return new Path(tabledir, name);
  }
6755
6756
6757
6758
6759
6760
6761
6762
6763 @Deprecated
6764 @VisibleForTesting
6765 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
6766 return new Path(
6767 FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
6768 }
6769
6770
6771
6772
6773
6774
6775
6776
6777
6778 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
6779 return ((info.getStartKey().length == 0) ||
6780 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
6781 ((info.getEndKey().length == 0) ||
6782 (Bytes.compareTo(info.getEndKey(), row) > 0));
6783 }
6784
6785 public static boolean rowIsInRange(HRegionInfo info, final byte [] row, final int offset,
6786 final short length) {
6787 return ((info.getStartKey().length == 0) ||
6788 (Bytes.compareTo(info.getStartKey(), 0, info.getStartKey().length,
6789 row, offset, length) <= 0)) &&
6790 ((info.getEndKey().length == 0) ||
6791 (Bytes.compareTo(info.getEndKey(), 0, info.getEndKey().length, row, offset, length) > 0));
6792 }
6793
6794
6795
6796
6797
6798
6799
  /**
   * Merge two adjacent regions, ordering them so the region with the lower start key is
   * merged first, then delegating to {@link #merge(HRegion, HRegion)}.
   * @throws IOException when both start keys are null, or the regions are not adjacent
   *   (a's end key must equal b's start key after ordering)
   */
  public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
  throws IOException {
    HRegion a = srcA;
    HRegion b = srcB;

    // Make sure that srcA comes first; important for key-ordering during
    // write of the merged file.
    if (srcA.getRegionInfo().getStartKey() == null) {
      if (srcB.getRegionInfo().getStartKey() == null) {
        throw new IOException("Cannot merge two regions with null start key");
      }
      // A's start key is null but B's isn't. Assume A comes before B
    } else if ((srcB.getRegionInfo().getStartKey() == null) ||
      (Bytes.compareTo(srcA.getRegionInfo().getStartKey(),
        srcB.getRegionInfo().getStartKey()) > 0)) {
      // Swap so the region with the smaller start key is merged first.
      a = srcB;
      b = srcA;
    }

    if (!(Bytes.compareTo(a.getRegionInfo().getEndKey(),
        b.getRegionInfo().getStartKey()) == 0)) {
      throw new IOException("Cannot merge non-adjacent regions");
    }
    return merge(a, b);
  }
6825
6826
6827
6828
6829
6830
6831
6832
6833
6834 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
6835 if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
6836 throw new IOException("Regions do not belong to the same table");
6837 }
6838
6839 FileSystem fs = a.getRegionFileSystem().getFileSystem();
6840
6841 a.flush(true);
6842 b.flush(true);
6843
6844
6845 a.compact(true);
6846 if (LOG.isDebugEnabled()) {
6847 LOG.debug("Files for region: " + a);
6848 a.getRegionFileSystem().logFileSystemState(LOG);
6849 }
6850 b.compact(true);
6851 if (LOG.isDebugEnabled()) {
6852 LOG.debug("Files for region: " + b);
6853 b.getRegionFileSystem().logFileSystemState(LOG);
6854 }
6855
6856 RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true);
6857 if (!rmt.prepare(null)) {
6858 throw new IOException("Unable to merge regions " + a + " and " + b);
6859 }
6860 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
6861 LOG.info("starting merge of regions: " + a + " and " + b
6862 + " into new region " + mergedRegionInfo.getRegionNameAsString()
6863 + " with start key <"
6864 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
6865 + "> and end key <"
6866 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
6867 HRegion dstRegion;
6868 try {
6869 dstRegion = (HRegion)rmt.execute(null, null);
6870 } catch (IOException ioe) {
6871 rmt.rollback(null, null);
6872 throw new IOException("Failed merging region " + a + " and " + b
6873 + ", and successfully rolled back");
6874 }
6875 dstRegion.compact(true);
6876
6877 if (LOG.isDebugEnabled()) {
6878 LOG.debug("Files for new region");
6879 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
6880 }
6881
6882 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
6883 throw new IOException("Merged region " + dstRegion
6884 + " still has references after the compaction, is compaction canceled?");
6885 }
6886
6887
6888 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
6889
6890 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
6891
6892 LOG.info("merge completed. New region is " + dstRegion);
6893 return dstRegion;
6894 }
6895
6896 @Override
6897 public Result get(final Get get) throws IOException {
6898 checkRow(get.getRow(), "Get");
6899
6900 if (get.hasFamilies()) {
6901 for (byte [] family: get.familySet()) {
6902 checkFamily(family);
6903 }
6904 } else {
6905 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
6906 get.addFamily(family);
6907 }
6908 }
6909 List<Cell> results = get(get, true);
6910 boolean stale = this.getRegionInfo().getReplicaId() != 0;
6911 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
6912 }
6913
6914 @Override
6915 public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
6916
6917 List<Cell> results = new ArrayList<Cell>();
6918
6919
6920 if (withCoprocessor && (coprocessorHost != null)) {
6921 if (coprocessorHost.preGet(get, results)) {
6922 return results;
6923 }
6924 }
6925 long before = EnvironmentEdgeManager.currentTime();
6926 Scan scan = new Scan(get);
6927
6928 RegionScanner scanner = null;
6929 try {
6930 scanner = getScanner(scan);
6931 scanner.next(results);
6932 } finally {
6933 if (scanner != null)
6934 scanner.close();
6935 }
6936
6937
6938 if (withCoprocessor && (coprocessorHost != null)) {
6939 coprocessorHost.postGet(get, results);
6940 }
6941
6942 metricsUpdateForGet(results, before);
6943
6944 return results;
6945 }
6946
6947 void metricsUpdateForGet(List<Cell> results, long before) {
6948 if (this.metricsRegion != null) {
6949 this.metricsRegion.updateGet(EnvironmentEdgeManager.currentTime() - before);
6950 }
6951
6952 }
6953
6954 @Override
6955 public void mutateRow(RowMutations rm) throws IOException {
6956
6957 mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
6958 }
6959
6960
6961
6962
6963
6964 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6965 Collection<byte[]> rowsToLock) throws IOException {
6966 mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
6967 }
6968
6969
6970
6971
6972
6973
6974
6975
6976
6977
6978
6979
6980
6981 @Override
6982 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6983 Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
6984 MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
6985 processRowsWithLocks(proc, -1, nonceGroup, nonce);
6986 }
6987
6988
6989
6990
6991 public ClientProtos.RegionLoadStats getRegionStats() {
6992 if (!regionStatsEnabled) {
6993 return null;
6994 }
6995 ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
6996 stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
6997 .memstoreFlushSize)));
6998 stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
6999 return stats.build();
7000 }
7001
7002 @Override
7003 public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
7004 processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,
7005 HConstants.NO_NONCE);
7006 }
7007
7008 @Override
7009 public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
7010 throws IOException {
7011 processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
7012 }
7013
  /**
   * Runs a RowProcessor atomically over a set of rows: validates the rows,
   * acquires row locks plus the region updates lock, executes the processor
   * (optionally under a timeout), applies its mutations to the memstore,
   * writes/syncs the WAL, and rolls the memstore back if the WAL sync fails.
   * Read-only processors skip all locking and WAL work.
   */
  @Override
  public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
      long nonceGroup, long nonce) throws IOException {
    // Every row the processor wants must belong to this region.
    for (byte[] row : processor.getRowsToLock()) {
      checkRow(row, "processRowsWithLocks");
    }
    if (!processor.readOnly()) {
      checkReadOnly();
    }
    checkResources();

    startRegionOperation();
    WALEdit walEdit = new WALEdit();

    // Pre-hook failures must still close the region operation.
    try {
      processor.preProcess(this, walEdit);
    } catch (IOException e) {
      closeRegionOperation();
      throw e;
    }

    // Read-only path: no locks, no MVCC write entry, no WAL.
    if (processor.readOnly()) {
      try {
        long now = EnvironmentEdgeManager.currentTime();
        doProcessRowWithTimeout(
            processor, now, this, null, null, timeout);
        processor.postProcess(this, walEdit, true);
      } finally {
        closeRegionOperation();
      }
      return;
    }

    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
    boolean locked;
    boolean walSyncSuccessful = false;
    List<RowLock> acquiredRowLocks;
    long addedSize = 0;
    List<Mutation> mutations = new ArrayList<Mutation>();
    List<Cell> memstoreCells = new ArrayList<Cell>();
    Collection<byte[]> rowsToLock = processor.getRowsToLock();
    long mvccNum = 0;
    WALKey walKey = null;
    try {
      // 1. Row locks first, then the shared updates lock.
      acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
      for (byte[] row : rowsToLock) {
        acquiredRowLocks.add(getRowLock(row));
      }
      // lock() takes a "waiters" hint; never pass 0.
      lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
      locked = true;
      // 2. Reserve the MVCC write number up front.
      mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);

      long now = EnvironmentEdgeManager.currentTime();
      try {
        // 3. Let the processor compute its mutations and WAL edit.
        doProcessRowWithTimeout(
            processor, now, this, mutations, walEdit, timeout);

        if (!mutations.isEmpty()) {
          // 4. Open the MVCC write entry; readers won't see these edits
          //    until it is completed below.
          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);

          processor.preBatchMutate(this, walEdit);

          // 5. Apply every cell of every mutation to the memstore.
          for (Mutation m : mutations) {
            rewriteCellTags(m.getFamilyCellMap(), m);

            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
              Cell cell = cellScanner.current();
              CellUtil.setSequenceId(cell, mvccNum);
              Store store = getStore(cell);
              if (store == null) {
                // NOTE(review): checkFamily throws for an unknown family, so
                // control presumably never reaches store.add() with a null
                // store -- TODO confirm checkFamily always throws here.
                checkFamily(CellUtil.cloneFamily(cell));
              }
              Pair<Long, Cell> ret = store.add(cell);
              addedSize += ret.getFirst();
              memstoreCells.add(ret.getSecond());
            }
          }

          long txid = 0;
          // 6. Append the edit to the WAL (async; synced in step 8).
          if (!walEdit.isEmpty()) {
            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
              processor.getClusterIds(), nonceGroup, nonce);
            txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
              walKey, walEdit, getSequenceId(), true, memstoreCells);
          }
          if(walKey == null){
            // An empty WALEdit still needs a key so the MVCC completion
            // below has a sequence id to wait on.
            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
          }
          // 7. Release locks before the (possibly slow) WAL sync.
          if (locked) {
            this.updatesLock.readLock().unlock();
            locked = false;
          }

          // NOTE(review): also called in the finally block below; presumably
          // releaseRowLocks tolerates/clears already-released locks -- TODO
          // confirm it empties the list on first call.
          releaseRowLocks(acquiredRowLocks);

          // 8. Sync the WAL per the processor's durability.
          if (txid != 0) {
            syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
          }
          walSyncSuccessful = true;

          processor.postBatchMutate(this);
        }
      } finally {
        // 9. If the WAL sync did not complete, undo the memstore inserts
        //    and cancel the MVCC write entry.
        if (!mutations.isEmpty() && !walSyncSuccessful) {
          String row = processor.getRowsToLock().isEmpty() ? "" :
            " for row(s):" + StringUtils.byteToHexString(processor.getRowsToLock().iterator().next())
            + "...";
          LOG.warn("Wal sync failed. Roll back " + mutations.size() +
              " memstore keyvalues" + row);
          for (Mutation m : mutations) {
            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
              Cell cell = cellScanner.current();
              getStore(cell).rollback(cell);
            }
          }
          if (writeEntry != null) {
            mvcc.cancelMemstoreInsert(writeEntry);
            writeEntry = null;
          }
        }
        // 10. Publish the edits to readers (no-op if cancelled above).
        if (writeEntry != null) {
          mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
        }
        if (locked) {
          this.updatesLock.readLock().unlock();
        }

        releaseRowLocks(acquiredRowLocks);
      }

      // 11. Post-hook runs for both the success and the rolled-back case.
      processor.postProcess(this, walEdit, walSyncSuccessful);

    } finally {
      closeRegionOperation();
      // Trigger a flush if this batch pushed the memstore over the limit.
      if (!mutations.isEmpty() &&
          isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
        requestFlush();
      }
    }
  }
7178
7179 private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
7180 final long now,
7181 final HRegion region,
7182 final List<Mutation> mutations,
7183 final WALEdit walEdit,
7184 final long timeout) throws IOException {
7185
7186 if (timeout < 0) {
7187 try {
7188 processor.process(now, region, mutations, walEdit);
7189 } catch (IOException e) {
7190 String row = processor.getRowsToLock().isEmpty() ? "" :
7191 " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
7192 LOG.warn("RowProcessor:" + processor.getClass().getName() +
7193 " throws Exception" + row, e);
7194 throw e;
7195 }
7196 return;
7197 }
7198
7199
7200 FutureTask<Void> task =
7201 new FutureTask<Void>(new Callable<Void>() {
7202 @Override
7203 public Void call() throws IOException {
7204 try {
7205 processor.process(now, region, mutations, walEdit);
7206 return null;
7207 } catch (IOException e) {
7208 String row = processor.getRowsToLock().isEmpty() ? "" :
7209 " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
7210 LOG.warn("RowProcessor:" + processor.getClass().getName() +
7211 " throws Exception" + row, e);
7212 throw e;
7213 }
7214 }
7215 });
7216 rowProcessorExecutor.execute(task);
7217 try {
7218 task.get(timeout, TimeUnit.MILLISECONDS);
7219 } catch (TimeoutException te) {
7220 String row = processor.getRowsToLock().isEmpty() ? "" :
7221 " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
7222 LOG.error("RowProcessor timeout:" + timeout + " ms" + row);
7223 throw new IOException(te);
7224 } catch (Exception e) {
7225 throw new IOException(e);
7226 }
7227 }
7228
7229 public Result append(Append append) throws IOException {
7230 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
7231 }
7232
7233
7234
7235
7236
  /**
   * Performs an Append atomically: reads the current values of the targeted
   * columns under the row lock, concatenates the appended bytes onto them,
   * writes the resulting cells to the memstore and the WAL, and publishes
   * them via MVCC. Rolls the memstore back if the WAL sync fails.
   */
  @Override
  public Result append(Append append, long nonceGroup, long nonce) throws IOException {
    byte[] row = append.getRow();
    checkRow(row, "append");
    boolean flush = false;
    // SKIP_WAL durability disables WAL writes entirely for this op.
    Durability durability = getEffectiveDurability(append.getDurability());
    boolean writeToWAL = durability != Durability.SKIP_WAL;
    WALEdit walEdits = null;
    List<Cell> allKVs = new ArrayList<Cell>(append.size());
    Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
    long size = 0;
    long txid = 0;

    checkReadOnly();
    checkResources();

    startRegionOperation(Operation.APPEND);
    this.writeRequestsCount.increment();
    long mvccNum = 0;
    WriteEntry writeEntry = null;
    WALKey walKey = null;
    RowLock rowLock = null;
    List<Cell> memstoreCells = new ArrayList<Cell>();
    boolean doRollBackMemstore = false;
    try {
      rowLock = getRowLock(row);
      try {
        lock(this.updatesLock.readLock());
        try {
          // Wait for earlier transactions while holding the row lock so the
          // Get below sees the latest committed state.
          mvcc.waitForPreviousTransactionsComplete();
          if (this.coprocessorHost != null) {
            Result r = this.coprocessorHost.preAppendAfterRowLock(append);
            if(r!= null) {
              // Coprocessor short-circuited the append.
              return r;
            }
          }

          mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
          writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
          long now = EnvironmentEdgeManager.currentTime();
          // Build the new cells, one family at a time.
          for (Map.Entry<byte[], List<Cell>> family : append.getFamilyCellMap().entrySet()) {
            // NOTE(review): stores.get() returns null for an unknown family;
            // presumably families were validated upstream -- TODO confirm,
            // else store.getComparator() below NPEs.
            Store store = stores.get(family.getKey());
            List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

            // Sort the incoming cells so they can be matched positionally
            // against the sorted Get results below.
            Collections.sort(family.getValue(), store.getComparator());
            // Fetch the current values of exactly the targeted columns.
            Get get = new Get(row);
            for (Cell cell : family.getValue()) {
              get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
            }
            List<Cell> results = get(get, false);

            // Walk the sorted appends and the sorted existing values in
            // lockstep; idx tracks the next unconsumed existing cell.
            int idx = 0;
            for (Cell cell : family.getValue()) {
              Cell newCell;
              Cell oldCell = null;
              if (idx < results.size()
                  && CellUtil.matchingQualifier(results.get(idx), cell)) {
                // Existing value: concatenate old value + appended bytes.
                oldCell = results.get(idx);
                long ts = Math.max(now, oldCell.getTimestamp());

                // Carry forward tags from the old cell, the append cell,
                // and any TTL on the Append itself.
                List<Tag> tags = Tag.carryForwardTags(null, oldCell);
                tags = Tag.carryForwardTags(tags, cell);
                tags = carryForwardTTLTag(tags, append);

                byte[] tagBytes = Tag.fromList(tags);

                // Allocate an empty KeyValue sized for the combined value,
                // then fill each component in by arraycopy.
                newCell = new KeyValue(row.length, cell.getFamilyLength(),
                    cell.getQualifierLength(), ts, KeyValue.Type.Put,
                    oldCell.getValueLength() + cell.getValueLength(),
                    tagBytes == null? 0: tagBytes.length);
                // copy in row, family, and qualifier
                System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
                    newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
                System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
                    newCell.getFamilyArray(), newCell.getFamilyOffset(),
                    cell.getFamilyLength());
                System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
                    newCell.getQualifierArray(), newCell.getQualifierOffset(),
                    cell.getQualifierLength());
                // copy in the value: old bytes first, appended bytes after
                System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
                    newCell.getValueArray(), newCell.getValueOffset(),
                    oldCell.getValueLength());
                System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
                    newCell.getValueArray(),
                    newCell.getValueOffset() + oldCell.getValueLength(),
                    cell.getValueLength());
                // copy in the merged tags, if any
                if (tagBytes != null) {
                  System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
                      tagBytes.length);
                }
                idx++;
              } else {
                // No existing value: the appended cell becomes the value.
                CellUtil.updateLatestStamp(cell, now);

                // A TTL on the Append forces a rebuild to attach the TTL tag;
                // otherwise the incoming cell is used as-is.
                if (append.getTTL() != Long.MAX_VALUE) {
                  // Add the new TTL tag
                  newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
                    cell.getRowLength(),
                    cell.getFamilyArray(), cell.getFamilyOffset(),
                    cell.getFamilyLength(),
                    cell.getQualifierArray(), cell.getQualifierOffset(),
                    cell.getQualifierLength(),
                    cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
                    cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                    carryForwardTTLTag(append));
                } else {
                  newCell = cell;
                }
              }

              CellUtil.setSequenceId(newCell, mvccNum);
              // Give coprocessors a chance to rewrite the cell pre-WAL.
              if (coprocessorHost != null) {
                newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
                    append, oldCell, newCell);
              }
              kvs.add(newCell);

              // Accumulate the WAL edit as cells are produced.
              if (writeToWAL) {
                if (walEdits == null) {
                  walEdits = new WALEdit();
                }
                walEdits.add(newCell);
              }
            }

            // Stage this family's new cells for the memstore write below.
            tempMemstore.put(store, kvs);
          }

          // Apply staged cells to each store's memstore.
          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
            Store store = entry.getKey();
            if (store.getFamily().getMaxVersions() == 1) {
              // upsert replaces in place when only one version is kept
              size += store.upsert(entry.getValue(), getSmallestReadPoint());
              memstoreCells.addAll(entry.getValue());
            } else {
              // otherwise add new versions; these may need rollback if the
              // WAL sync fails
              for (Cell cell: entry.getValue()) {
                Pair<Long, Cell> ret = store.add(cell);
                size += ret.getFirst();
                memstoreCells.add(ret.getSecond());
                doRollBackMemstore = true;
              }
            }
            allKVs.addAll(entry.getValue());
          }

          // Append to the WAL (sync happens after locks are released).
          if (writeToWAL) {
            // Using default cluster id, as this can only happen in the
            // originating cluster. A slave cluster receives the final value
            // (not the delta) as a Put.
            walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
            txid = this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits,
              this.sequenceId, true, memstoreCells);
          } else {
            recordMutationWithoutWal(append.getFamilyCellMap());
          }
          if (walKey == null) {
            // Append an empty edit so MVCC completion has a sequence id.
            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
          }
          size = this.addAndGetGlobalMemstoreSize(size);
          flush = isFlushSize(size);
        } finally {
          this.updatesLock.readLock().unlock();
        }
      } finally {
        // Release the row lock before the WAL sync; null it so the outer
        // finally does not release it twice.
        rowLock.release();
        rowLock = null;
      }
      // Sync outside of all locks.
      if(txid != 0){
        syncOrDefer(txid, durability);
      }
      doRollBackMemstore = false;
    } finally {
      if (rowLock != null) {
        rowLock.release();
      }
      // If the sync failed, remove the cells we added and cancel the MVCC
      // write entry; otherwise publish it.
      if (doRollBackMemstore) {
        rollbackMemstore(memstoreCells);
        if (writeEntry != null) mvcc.cancelMemstoreInsert(writeEntry);
      } else if (writeEntry != null) {
        mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
      }

      closeRegionOperation(Operation.APPEND);
    }

    if (this.metricsRegion != null) {
      this.metricsRegion.updateAppend();
    }

    if (flush) {
      // Request a cache flush. Do it outside update lock.
      requestFlush();
    }

    return append.isReturnResults() ? Result.create(allKVs) : null;
  }
7468
7469 public Result increment(Increment increment) throws IOException {
7470 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
7471 }
7472
7473
7474
7475
7476
7477 @Override
7478 public Result increment(Increment increment, long nonceGroup, long nonce)
7479 throws IOException {
7480 checkReadOnly();
7481 checkResources();
7482 checkRow(increment.getRow(), "increment");
7483 checkFamilies(increment.getFamilyCellMap().keySet());
7484 startRegionOperation(Operation.INCREMENT);
7485 this.writeRequestsCount.increment();
7486 try {
7487 return doIncrement(increment, nonceGroup, nonce);
7488 } finally {
7489 if (this.metricsRegion != null) this.metricsRegion.updateIncrement();
7490 closeRegionOperation(Operation.INCREMENT);
7491 }
7492 }
7493
  /**
   * Core of {@link #increment(Increment, long, long)}: under the row lock and
   * updates lock, reads current column values, adds the increment amounts,
   * writes the resulting cells to memstore and WAL, and publishes via MVCC.
   */
  private Result doIncrement(Increment increment, long nonceGroup, long nonce) throws IOException {
    RowLock rowLock = null;
    WriteEntry writeEntry = null;
    WALKey walKey = null;
    boolean doRollBackMemstore = false;
    long accumulatedResultSize = 0;
    List<Cell> allKVs = new ArrayList<Cell>(increment.size());
    List<Cell> memstoreCells = new ArrayList<Cell>();
    Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
    try {
      rowLock = getRowLock(increment.getRow());
      long txid = 0;
      try {
        lock(this.updatesLock.readLock());
        try {
          // Wait for earlier transactions while holding the row lock so the
          // read below sees the latest committed values.
          this.mvcc.waitForPreviousTransactionsComplete();
          if (this.coprocessorHost != null) {
            Result r = this.coprocessorHost.preIncrementAfterRowLock(increment);
            if (r != null) return r;
          }

          long mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
          writeEntry = this.mvcc.beginMemstoreInsertWithSeqNum(mvccNum);

          // Process increments one column family at a time.
          long now = EnvironmentEdgeManager.currentTime();
          final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
          WALEdit walEdits = null;
          for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
            byte [] columnFamilyName = entry.getKey();
            List<Cell> increments = entry.getValue();
            Store store = this.stores.get(columnFamilyName);

            // results holds only cells whose values actually changed
            // (zero-amount increments against existing values are skipped).
            List<Cell> results = applyIncrementsToColumnFamily(increment, columnFamilyName,
                sort(increments, store.getComparator()), now, mvccNum, allKVs, null);
            if (!results.isEmpty()) {
              // Stage changed cells into the WAL edit.
              if (writeToWAL) {
                // Index-based loop avoids an Iterator allocation here.
                int resultsSize = results.size();
                for (int i = 0; i < resultsSize; i++) {
                  if (walEdits == null) walEdits = new WALEdit();
                  walEdits.add(results.get(i));
                }
              }
              // Write to the memstore: upsert for single-version families,
              // plain add (rollback-able) otherwise.
              if (store.getFamily().getMaxVersions() == 1) {
                accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
                memstoreCells.addAll(results);
                // TODO: St.Ack 20151222 Why no rollback in this case?
              } else {
                for (Cell cell: results) {
                  Pair<Long, Cell> ret = store.add(cell);
                  accumulatedResultSize += ret.getFirst();
                  memstoreCells.add(ret.getSecond());
                  doRollBackMemstore = true;
                }
              }
            }
          }

          // Append to the WAL; sync happens later, outside the locks.
          if (walEdits != null && !walEdits.isEmpty()) {
            if (writeToWAL) {
              walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
                this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, nonceGroup, nonce);
              txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
                walKey, walEdits, getSequenceId(), true, memstoreCells);
            } else {
              // NOTE(review): walEdits is only populated when writeToWAL is
              // true (see above), so this branch appears unreachable and
              // SKIP_WAL increments seemingly never get counted by
              // recordMutationWithoutWal -- TODO confirm; compare with
              // append(), which records it in a reachable else branch.
              recordMutationWithoutWal(increment.getFamilyCellMap());
            }
          }
          if (walKey == null) {
            // Append an empty edit so MVCC completion has a sequence id.
            walKey = this.appendEmptyEdit(this.wal, memstoreCells);
          }
        } finally {
          this.updatesLock.readLock().unlock();
        }
      } finally {
        // Release the row lock before syncing; null it so the outer finally
        // does not release it a second time.
        rowLock.release();
        rowLock = null;
      }
      // Sync the WAL outside of all locks.
      if (txid != 0) syncOrDefer(txid, effectiveDurability);
      doRollBackMemstore = false;
    } finally {
      if (rowLock != null) rowLock.release();
      // On sync failure, remove the cells we added above.
      if (doRollBackMemstore) rollbackMemstore(memstoreCells);
      if (writeEntry != null) mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
    }

    if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
    return increment.isReturnResults() ? Result.create(allKVs) : null;
  }
7599
7600
7601
7602
7603 private static List<Cell> sort(List<Cell> cells, final Comparator<Cell> comparator) {
7604 Collections.sort(cells, comparator);
7605 return cells;
7606 }
7607
7608
7609
7610
7611
7612
7613
7614
7615
7616
7617
7618
  /**
   * Applies a sorted list of increments for one column family against the
   * columns' current values and builds the resulting cells.
   * <p>
   * Every resulting cell is added to {@code allKVs}; only cells whose value
   * actually changed (non-zero increment, per {@code writeBack}) are returned
   * for memstore/WAL application.
   *
   * @param increment        the originating Increment (row, TTL, time range)
   * @param columnFamilyName family the increments belong to
   * @param sortedIncrements increments sorted by the store comparator, so
   *                         they can be matched positionally against the
   *                         sorted current values
   * @param now              server time used as the minimum result timestamp
   * @param mvccNum          sequence id stamped on new cells (skipped when
   *                         NO_WRITE_NUMBER)
   * @param allKVs           out-param accumulating all result cells
   * @param isolation        isolation level for the read, or null for default
   * @return the changed cells only
   */
  private List<Cell> applyIncrementsToColumnFamily(Increment increment, byte[] columnFamilyName,
      List<Cell> sortedIncrements, long now, long mvccNum, List<Cell> allKVs,
      final IsolationLevel isolation)
  throws IOException {
    List<Cell> results = new ArrayList<Cell>(sortedIncrements.size());
    byte [] row = increment.getRow();
    // Fetch the current values of exactly the incremented columns.
    List<Cell> currentValues =
        getIncrementCurrentValue(increment, columnFamilyName, sortedIncrements, isolation);
    // Walk the sorted increments and sorted current values in lockstep;
    // idx points at the next unconsumed current value.
    int idx = 0;
    for (int i = 0; i < sortedIncrements.size(); i++) {
      Cell inc = sortedIncrements.get(i);
      long incrementAmount = getLongValue(inc);
      // A zero increment against an existing value changes nothing, so it
      // need not be written back.
      boolean writeBack = (incrementAmount != 0);
      // Carry tags forward from the increment cell.
      List<Tag> tags = Tag.carryForwardTags(inc);

      Cell currentValue = null;
      long ts = now;
      if (idx < currentValues.size() && CellUtil.matchingQualifier(currentValues.get(idx), inc)) {
        currentValue = currentValues.get(idx);
        ts = Math.max(now, currentValue.getTimestamp());
        incrementAmount += getLongValue(currentValue);
        // Carry tags forward from the existing cell as well.
        tags = Tag.carryForwardTags(tags, currentValue);
        // Only consume the current value when the NEXT increment targets a
        // different qualifier -- duplicate increments of the same column in
        // one request must all read the same base value.
        if (i < (sortedIncrements.size() - 1) &&
            !CellUtil.matchingQualifier(inc, sortedIncrements.get(i + 1))) idx++;
      }

      // Build the result cell with the summed value and merged tags.
      byte [] qualifier = CellUtil.cloneQualifier(inc);
      byte [] incrementAmountInBytes = Bytes.toBytes(incrementAmount);
      tags = carryForwardTTLTag(tags, increment);

      Cell newValue = new KeyValue(row, 0, row.length,
        columnFamilyName, 0, columnFamilyName.length,
        qualifier, 0, qualifier.length,
        ts, KeyValue.Type.Put,
        incrementAmountInBytes, 0, incrementAmountInBytes.length,
        tags);

      // Stamp the MVCC write number unless the caller runs without one.
      if (mvccNum != MultiVersionConsistencyControl.NO_WRITE_NUMBER) {
        CellUtil.setSequenceId(newValue, mvccNum);
      }

      // Coprocessors may replace the cell before it hits the WAL.
      if (coprocessorHost != null) {
        newValue = coprocessorHost.postMutationBeforeWAL(
            RegionObserver.MutationType.INCREMENT, increment, currentValue, newValue);
      }
      allKVs.add(newValue);
      if (writeBack) {
        results.add(newValue);
      }
    }
    return results;
  }
7681
7682
7683
7684
7685
7686 private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
7687 int len = cell.getValueLength();
7688 if (len != Bytes.SIZEOF_LONG) {
7689
7690 throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
7691 }
7692 return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);
7693 }
7694
7695
7696
7697
7698
7699
7700
7701
7702
7703
7704 private List<Cell> getIncrementCurrentValue(final Increment increment, byte [] columnFamily,
7705 final List<Cell> increments, final IsolationLevel isolation)
7706 throws IOException {
7707 Get get = new Get(increment.getRow());
7708 if (isolation != null) get.setIsolationLevel(isolation);
7709 for (Cell cell: increments) {
7710 get.addColumn(columnFamily, CellUtil.cloneQualifier(cell));
7711 }
7712 TimeRange tr = increment.getTimeRange();
7713 get.setTimeRange(tr.getMin(), tr.getMax());
7714 return get(get, false);
7715 }
7716
7717 private static List<Tag> carryForwardTTLTag(final Mutation mutation) {
7718 return carryForwardTTLTag(null, mutation);
7719 }
7720
7721
7722
7723
7724 private static List<Tag> carryForwardTTLTag(final List<Tag> tagsOrNull,
7725 final Mutation mutation) {
7726 long ttl = mutation.getTTL();
7727 if (ttl == Long.MAX_VALUE) return tagsOrNull;
7728 List<Tag> tags = tagsOrNull;
7729
7730
7731
7732 if (tags == null) tags = new ArrayList<Tag>(1);
7733 tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
7734 return tags;
7735 }
7736
7737
7738
7739
7740
7741 private void checkFamily(final byte [] family)
7742 throws NoSuchColumnFamilyException {
7743 if (!this.htableDescriptor.hasFamily(family)) {
7744 throw new NoSuchColumnFamilyException("Column family " +
7745 Bytes.toString(family) + " does not exist in region " + this
7746 + " in table " + this.htableDescriptor);
7747 }
7748 }
7749
  // Shallow heap overhead of one HRegion instance: object header, one array
  // ref, and the counts of reference/int/long/boolean fields declared on the
  // class. NOTE(review): these counts must be kept in sync by hand whenever
  // fields are added to or removed from HRegion -- verify against the current
  // field list when editing.
  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT +
      ClassSize.ARRAY +
      45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
      (14 * Bytes.SIZEOF_LONG) +
      5 * Bytes.SIZEOF_BOOLEAN);
7756
7757
7758
7759
7760
7761
7762
7763
7764
7765
7766
  // Deep heap overhead: FIXED_OVERHEAD plus the objects HRegion always
  // allocates (atomics, concurrent maps, write state, MVCC, locks, treemap).
  // NOTE(review): like FIXED_OVERHEAD, these terms are maintained by hand and
  // must track the fields actually allocated by HRegion.
  public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
      ClassSize.OBJECT + // closeLock
      (2 * ClassSize.ATOMIC_BOOLEAN) +
      (3 * ClassSize.ATOMIC_LONG) +
      (2 * ClassSize.CONCURRENT_HASHMAP) +
      WriteState.HEAP_SIZE +
      ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
      (2 * ClassSize.REENTRANT_LOCK) +
      MultiVersionConsistencyControl.FIXED_SIZE
      + ClassSize.TREEMAP
      + 2 * ClassSize.ATOMIC_INTEGER
      ;
7779
7780 @Override
7781 public long heapSize() {
7782 long heapSize = DEEP_OVERHEAD;
7783 for (Store store : this.stores.values()) {
7784 heapSize += store.heapSize();
7785 }
7786
7787 return heapSize;
7788 }
7789
7790
7791
7792
7793
  /**
   * Prints an optional error message plus command-line usage for running
   * HRegion as a standalone tool, then exits the JVM with status 1.
   * Used by the main() entry point on bad arguments.
   */
  private static void printUsageAndExit(final String message) {
    if (message != null && message.length() > 0) System.out.println(message);
    System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
    System.out.println("Options:");
    System.out.println(" major_compact Pass this option to major compact " +
      "passed region.");
    System.out.println("Default outputs scan of passed region.");
    // Exit code 1: invoked only on usage errors.
    System.exit(1);
  }
7803
7804 @Override
7805 public boolean registerService(Service instance) {
7806
7807
7808
7809 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
7810 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
7811 if (coprocessorServiceHandlers.containsKey(serviceName)) {
7812 LOG.error("Coprocessor service " + serviceName +
7813 " already registered, rejecting request from " + instance
7814 );
7815 return false;
7816 }
7817
7818 coprocessorServiceHandlers.put(serviceName, instance);
7819 if (LOG.isDebugEnabled()) {
7820 LOG.debug("Registered coprocessor service: region=" +
7821 Bytes.toStringBinary(getRegionInfo().getRegionName()) +
7822 " service=" + serviceName);
7823 }
7824 return true;
7825 }
7826
  /**
   * Dispatches a coprocessor endpoint RPC to the service registered under the
   * call's service name: resolves the method, decodes the request, invokes
   * the service with pre/post coprocessor hooks, and returns the response.
   *
   * @throws UnknownProtocolException if no service or no such method is
   *     registered for this region
   */
  @Override
  public Message execService(RpcController controller, CoprocessorServiceCall call)
      throws IOException {
    String serviceName = call.getServiceName();
    String methodName = call.getMethodName();
    if (!coprocessorServiceHandlers.containsKey(serviceName)) {
      throw new UnknownProtocolException(null,
          "No registered coprocessor service found for name "+serviceName+
          " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
    }

    // Resolve the protobuf method descriptor on the registered service.
    Service service = coprocessorServiceHandlers.get(serviceName);
    Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
    Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
    if (methodDesc == null) {
      throw new UnknownProtocolException(service.getClass(),
          "Unknown method "+methodName+" called on service "+serviceName+
          " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
    }

    // Decode the request payload against the method's request prototype.
    Message.Builder builder = service.getRequestPrototype(methodDesc).newBuilderForType();
    ProtobufUtil.mergeFrom(builder, call.getRequest());
    Message request = builder.build();

    // Coprocessors may rewrite the request before invocation.
    if (coprocessorHost != null) {
      request = coprocessorHost.preEndpointInvocation(service, methodName, request);
    }

    // callMethod is synchronous here; the callback captures the response.
    final Message.Builder responseBuilder =
        service.getResponsePrototype(methodDesc).newBuilderForType();
    service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
      @Override
      public void run(Message message) {
        if (message != null) {
          responseBuilder.mergeFrom(message);
        }
      }
    });

    // Coprocessors may adjust the response before it is returned.
    if (coprocessorHost != null) {
      coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
    }

    return responseBuilder.build();
  }
7872
7873
7874
7875
7876
7877
7878 private static void processTable(final FileSystem fs, final Path p,
7879 final WALFactory walFactory, final Configuration c,
7880 final boolean majorCompact)
7881 throws IOException {
7882 HRegion region;
7883 FSTableDescriptors fst = new FSTableDescriptors(c);
7884
7885 if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
7886 final WAL wal = walFactory.getMetaWAL(
7887 HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
7888 region = HRegion.newHRegion(p, wal, fs, c,
7889 HRegionInfo.FIRST_META_REGIONINFO, fst.get(TableName.META_TABLE_NAME), null);
7890 } else {
7891 throw new IOException("Not a known catalog table: " + p.toString());
7892 }
7893 try {
7894 region.initialize(null);
7895 if (majorCompact) {
7896 region.compact(true);
7897 } else {
7898
7899 Scan scan = new Scan();
7900
7901 RegionScanner scanner = region.getScanner(scan);
7902 try {
7903 List<Cell> kvs = new ArrayList<Cell>();
7904 boolean done;
7905 do {
7906 kvs.clear();
7907 done = scanner.next(kvs);
7908 if (kvs.size() > 0) LOG.info(kvs);
7909 } while (done);
7910 } finally {
7911 scanner.close();
7912 }
7913 }
7914 } finally {
7915 region.close();
7916 }
7917 }
7918
7919 boolean shouldForceSplit() {
7920 return this.splitRequest;
7921 }
7922
7923 byte[] getExplicitSplitPoint() {
7924 return this.explicitSplitPoint;
7925 }
7926
7927 void forceSplit(byte[] sp) {
7928
7929
7930 this.splitRequest = true;
7931 if (sp != null) {
7932 this.explicitSplitPoint = sp;
7933 }
7934 }
7935
7936 void clearSplit() {
7937 this.splitRequest = false;
7938 this.explicitSplitPoint = null;
7939 }
7940
7941
7942
7943
  /**
   * Hook invoked before this region is split. The base implementation does
   * nothing; subclasses may override to do pre-split work.
   */
  protected void prepareToSplit() {
    // no-op by default
  }
7947
7948
7949
7950
7951
7952
7953
7954 public byte[] checkSplit() {
7955
7956 if (this.getRegionInfo().isMetaTable() ||
7957 TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
7958 if (shouldForceSplit()) {
7959 LOG.warn("Cannot split meta region in HBase 0.20 and above");
7960 }
7961 return null;
7962 }
7963
7964
7965 if (this.isRecovering()) {
7966 LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
7967 return null;
7968 }
7969
7970 if (!splitPolicy.shouldSplit()) {
7971 return null;
7972 }
7973
7974 byte[] ret = splitPolicy.getSplitPoint();
7975
7976 if (ret != null) {
7977 try {
7978 checkRow(ret, "calculated split");
7979 } catch (IOException e) {
7980 LOG.error("Ignoring invalid split", e);
7981 return null;
7982 }
7983 }
7984 return ret;
7985 }
7986
7987
7988
7989
7990 public int getCompactPriority() {
7991 int count = Integer.MAX_VALUE;
7992 for (Store store : stores.values()) {
7993 count = Math.min(count, store.getCompactPriority());
7994 }
7995 return count;
7996 }
7997
7998
7999
8000 @Override
8001 public RegionCoprocessorHost getCoprocessorHost() {
8002 return coprocessorHost;
8003 }
8004
8005
8006 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
8007 this.coprocessorHost = coprocessorHost;
8008 }
8009
8010 @Override
8011 public void startRegionOperation() throws IOException {
8012 startRegionOperation(Operation.ANY);
8013 }
8014
  /**
   * Acquire the region's read lock (region close uses the write lock) before
   * performing the given operation, after checking that the region is in a
   * state that allows it. Callers must pair this with closeRegionOperation().
   *
   * @param op the operation about to be performed
   * @throws RegionInRecoveryException if the region is recovering and the
   *           operation is not permitted during recovery
   * @throws NotServingRegionException if the region is closing or closed
   * @throws IOException if a coprocessor postStartRegionOperation hook fails
   */
  @Override
  public void startRegionOperation(Operation op) throws IOException {
    switch (op) {
    case GET:
    case SCAN:
      checkReadsEnabled();
      // NOTE: intentional fall-through — reads must also pass the
      // recovery check shared with the mutation/admin operations below.
    case INCREMENT:
    case APPEND:
    case SPLIT_REGION:
    case MERGE_REGION:
    case PUT:
    case DELETE:
    case BATCH_MUTATE:
    case COMPACT_REGION:
      // While recovering, reject everything except PUT/DELETE/BATCH_MUTATE,
      // and reject even those when writes-in-recovery are disallowed.
      if (isRecovering() && (this.disallowWritesInRecovering ||
          (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
        throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
          " is recovering; cannot take reads");
      }
      break;
    default:
      break;
    }
    if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
        || op == Operation.COMPACT_REGION) {
      // Split/merge/compaction manage the region lifecycle themselves, so
      // they skip the close checks and do not take the region lock here.
      return;
    }
    if (this.closing.get()) {
      throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
    }
    lock(lock.readLock());
    // Re-check closed state after acquiring the lock: close() may have won the race.
    if (this.closed.get()) {
      lock.readLock().unlock();
      throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
    }
    try {
      if (coprocessorHost != null) {
        coprocessorHost.postStartRegionOperation(op);
      }
    } catch (Exception e) {
      // Release the lock on coprocessor failure so the region is not wedged.
      lock.readLock().unlock();
      throw new IOException(e);
    }
  }
8062
8063 @Override
8064 public void closeRegionOperation() throws IOException {
8065 closeRegionOperation(Operation.ANY);
8066 }
8067
8068
8069
8070
8071
8072
8073 public void closeRegionOperation(Operation operation) throws IOException {
8074 lock.readLock().unlock();
8075 if (coprocessorHost != null) {
8076 coprocessorHost.postCloseRegionOperation(operation);
8077 }
8078 }
8079
8080
8081
8082
8083
8084
8085
8086
8087
8088
8089 private void startBulkRegionOperation(boolean writeLockNeeded)
8090 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
8091 if (this.closing.get()) {
8092 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
8093 }
8094 if (writeLockNeeded) lock(lock.writeLock());
8095 else lock(lock.readLock());
8096 if (this.closed.get()) {
8097 if (writeLockNeeded) lock.writeLock().unlock();
8098 else lock.readLock().unlock();
8099 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
8100 }
8101 }
8102
8103
8104
8105
8106
8107 private void closeBulkRegionOperation(){
8108 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
8109 else lock.readLock().unlock();
8110 }
8111
8112
8113
8114
8115
8116 private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
8117 numMutationsWithoutWAL.increment();
8118 if (numMutationsWithoutWAL.get() <= 1) {
8119 LOG.info("writing data to region " + this +
8120 " with WAL disabled. Data may be lost in the event of a crash.");
8121 }
8122
8123 long mutationSize = 0;
8124 for (List<Cell> cells: familyMap.values()) {
8125 assert cells instanceof RandomAccess;
8126 int listSize = cells.size();
8127 for (int i=0; i < listSize; i++) {
8128 Cell cell = cells.get(i);
8129
8130 mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
8131 }
8132 }
8133
8134 dataInMemoryWithoutWAL.add(mutationSize);
8135 }
8136
8137 private void lock(final Lock lock)
8138 throws RegionTooBusyException, InterruptedIOException {
8139 lock(lock, 1);
8140 }
8141
8142
8143
8144
8145
8146
8147 private void lock(final Lock lock, final int multiplier)
8148 throws RegionTooBusyException, InterruptedIOException {
8149 try {
8150 final long waitTime = Math.min(maxBusyWaitDuration,
8151 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
8152 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
8153 throw new RegionTooBusyException(
8154 "failed to get a lock in " + waitTime + " ms. " +
8155 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
8156 this.getRegionInfo().getRegionNameAsString()) +
8157 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
8158 this.getRegionServerServices().getServerName()));
8159 }
8160 } catch (InterruptedException ie) {
8161 LOG.info("Interrupted while waiting for a lock");
8162 InterruptedIOException iie = new InterruptedIOException();
8163 iie.initCause(ie);
8164 throw iie;
8165 }
8166 }
8167
8168
8169
8170
8171
8172
8173
  /**
   * Sync the WAL up to the given transaction id, or defer/skip the sync,
   * depending on the requested durability. Meta-region edits are always
   * synced regardless of durability.
   *
   * @param txid the WAL transaction id to sync up to
   * @param durability the durability requested for this mutation
   * @throws IOException if the WAL sync fails
   */
  private void syncOrDefer(long txid, Durability durability) throws IOException {
    if (this.getRegionInfo().isMetaRegion()) {
      // Meta edits are always synced immediately.
      this.wal.sync(txid);
    } else {
      switch(durability) {
      case USE_DEFAULT:
        // Fall back to the region/table default durability.
        if (shouldSyncWAL()) {
          this.wal.sync(txid);
        }
        break;
      case SKIP_WAL:
        // Nothing to sync — the edit was never written to the WAL.
        break;
      case ASYNC_WAL:
        // Deliberately no sync here; the WAL is expected to be synced
        // asynchronously (presumably by a background flusher — see WAL impl).
        break;
      case SYNC_WAL:
      case FSYNC_WAL:
        // Sync now. FSYNC is handled the same as SYNC by this call.
        this.wal.sync(txid);
        break;
      }
    }
  }
8199
8200
8201
8202
8203 private boolean shouldSyncWAL() {
8204 return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
8205 }
8206
8207
8208
8209
  // A write-only "sink" list: add/addAll silently discard their arguments,
  // size() is always 0, and reads throw. Used where an API requires a
  // List<Cell> to append to but the cells themselves are not needed.
  private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {

    @Override
    public void add(int index, Cell element) {
      // intentionally discard the element
    }

    @Override
    public boolean addAll(int index, Collection<? extends Cell> c) {
      // intentionally discard the collection; report "unchanged"
      return false;
    }

    @Override
    public KeyValue get(int index) {
      // this list never holds elements, so reads are illegal
      throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
      return 0;
    }
  };
8232
8233
8234
8235
8236
8237
8238
8239
8240
8241
8242 public static void main(String[] args) throws IOException {
8243 if (args.length < 1) {
8244 printUsageAndExit(null);
8245 }
8246 boolean majorCompact = false;
8247 if (args.length > 1) {
8248 if (!args[1].toLowerCase().startsWith("major")) {
8249 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
8250 }
8251 majorCompact = true;
8252 }
8253 final Path tableDir = new Path(args[0]);
8254 final Configuration c = HBaseConfiguration.create();
8255 final FileSystem fs = FileSystem.get(c);
8256 final Path logdir = new Path(c.get("hbase.tmp.dir"));
8257 final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
8258
8259 final Configuration walConf = new Configuration(c);
8260 FSUtils.setRootDir(walConf, logdir);
8261 final WALFactory wals = new WALFactory(walConf, null, logname);
8262 try {
8263 processTable(fs, tableDir, wals, c, majorCompact);
8264 } finally {
8265 wals.close();
8266
8267 BlockCache bc = new CacheConfig(c).getBlockCache();
8268 if (bc != null) bc.shutdown();
8269 }
8270 }
8271
8272 @Override
8273 public long getOpenSeqNum() {
8274 return this.openSeqNum;
8275 }
8276
8277 @Override
8278 public Map<byte[], Long> getMaxStoreSeqId() {
8279 return this.maxSeqIdInStores;
8280 }
8281
8282 @Override
8283 public long getOldestSeqIdOfStore(byte[] familyName) {
8284 return wal.getEarliestMemstoreSeqNum(getRegionInfo()
8285 .getEncodedNameAsBytes(), familyName);
8286 }
8287
8288 @Override
8289 public CompactionState getCompactionState() {
8290 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
8291 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
8292 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
8293 }
8294
8295 public void reportCompactionRequestStart(boolean isMajor){
8296 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
8297 }
8298
8299 public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
8300 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
8301
8302
8303 compactionsFinished.incrementAndGet();
8304 compactionNumFilesCompacted.addAndGet(numFiles);
8305 compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
8306
8307 assert newValue >= 0;
8308 }
8309
8310
8311
8312
8313
8314 @VisibleForTesting
8315 public AtomicLong getSequenceId() {
8316 return this.sequenceId;
8317 }
8318
8319
8320
8321
8322
8323 private void setSequenceId(long value) {
8324 this.sequenceId.set(value);
8325 }
8326
8327 public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
8328 return lockedRows;
8329 }
8330
  /**
   * Per-row lock bookkeeping: tracks the owning thread, a reentrancy count,
   * and a latch other threads wait on until the row is fully released.
   * An instance lives in {@code lockedRows} while the row is held.
   */
  @VisibleForTesting class RowLockContext {
    private final HashedBytes row;
    // Counted down once when lockCount reaches zero, releasing waiters.
    private final CountDownLatch latch = new CountDownLatch(1);
    // The thread that created (and therefore owns) this context.
    private final Thread thread;
    // Reentrancy count: number of outstanding RowLocks handed out.
    private int lockCount = 0;
    private String threadName;

    RowLockContext(HashedBytes row) {
      this.row = row;
      this.thread = Thread.currentThread();
    }

    boolean ownedByCurrentThread() {
      return thread == Thread.currentThread();
    }

    // Hand out one more (reentrant) lock on this row to the owning thread.
    RowLock newLock() {
      lockCount++;
      RowLockImpl rl = new RowLockImpl();
      rl.setContext(this);
      return rl;
    }

    void releaseLock() {
      // Only the thread that acquired the row lock may release it.
      if (!ownedByCurrentThread()) {
        throw new IllegalArgumentException("Lock held by thread: " + thread
          + " cannot be released by different thread: " + Thread.currentThread());
      }
      lockCount--;
      if (lockCount == 0) {
        // Last release: remove from the locked-rows map and wake any waiters.
        RowLockContext existingContext = lockedRows.remove(row);
        if (existingContext != this) {
          throw new RuntimeException(
              "Internal row lock state inconsistent, should not happen, row: " + row);
        }
        latch.countDown();
      }
    }

    public void setThreadName(String threadName) {
      this.threadName = threadName;
    }


    @Override
    public String toString() {
      return "RowLockContext{" +
          "row=" + row +
          ", count=" + lockCount +
          ", threadName=" + threadName +
          '}';
    }
  }
8385
8386 public static class RowLockImpl implements RowLock {
8387 private RowLockContext context;
8388 private boolean released = false;
8389
8390 @VisibleForTesting
8391 public RowLockContext getContext() {
8392 return context;
8393 }
8394
8395 @VisibleForTesting
8396 public void setContext(RowLockContext context) {
8397 this.context = context;
8398 }
8399
8400 @Override
8401 public void release() {
8402 if (!released) {
8403 context.releaseLock();
8404 }
8405 released = true;
8406 }
8407 }
8408
8409
8410
8411
8412
8413
8414
8415
8416
8417
  /**
   * Append an empty edit to the WAL for this region, used to obtain a WALKey
   * (and thus a sequence number) without writing any real cells.
   *
   * @param wal the WAL to append to
   * @param cells cell list passed through to the WAL append call
   * @return the WALKey created for the empty edit
   * @throws IOException if the WAL append fails
   */
  private WALKey appendEmptyEdit(final WAL wal, List<Cell> cells) throws IOException {
    // HLogKey is deprecated but still the concrete key type used here.
    @SuppressWarnings("deprecation")
    WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
      WALKey.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);

    // Append with an empty WALEdit; inMemstore=false since nothing is added
    // to the memstore for this edit.
    wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, getSequenceId(), false,
      cells);
    return key;
  }
8429
8430
8431
8432
  // Configuration-change callback; intentionally a no-op — this region does
  // not currently react to dynamic configuration updates itself (its stores
  // are registered as observers separately via registerChildren).
  @Override
  public void onConfigurationChange(Configuration conf) {

  }
8437
8438
8439
8440
8441 @Override
8442 public void registerChildren(ConfigurationManager manager) {
8443 configurationManager = Optional.of(manager);
8444 for (Store s : this.stores.values()) {
8445 configurationManager.get().registerObserver(s);
8446 }
8447 }
8448
8449
8450
8451
8452 @Override
8453 public void deregisterChildren(ConfigurationManager manager) {
8454 for (Store s : this.stores.values()) {
8455 configurationManager.get().deregisterObserver(s);
8456 }
8457 }
8458
8459
8460
8461
8462 public RegionSplitPolicy getSplitPolicy() {
8463 return this.splitPolicy;
8464 }
8465
8466 public long getMemstoreFlushSize() {
8467 return this.memstoreFlushSize;
8468 }
8469 }