1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.Comparator;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.TreeMap;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.KeyValue;
39 import org.apache.hadoop.hbase.KeyValue.KVComparator;
40 import org.apache.hadoop.hbase.classification.InterfaceAudience;
41 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
42 import org.apache.hadoop.hbase.util.Bytes;
43 import org.apache.hadoop.hbase.util.ConcatenatedLists;
44 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
45
46 import com.google.common.collect.ImmutableCollection;
47 import com.google.common.collect.ImmutableList;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @InterfaceAudience.Private
66 public class StripeStoreFileManager
67 implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
68 static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
69
70
71
72
73 public static final byte[] STRIPE_START_KEY = Bytes.toBytes("STRIPE_START_KEY");
74 public static final byte[] STRIPE_END_KEY = Bytes.toBytes("STRIPE_END_KEY");
75
76 private final static Bytes.RowEndKeyComparator MAP_COMPARATOR = new Bytes.RowEndKeyComparator();
77
78
79
80
81 public final static byte[] OPEN_KEY = HConstants.EMPTY_BYTE_ARRAY;
82 final static byte[] INVALID_KEY = null;
83
84
85
86
87
88 private static class State {
89
90
91
92
93
94 public byte[][] stripeEndRows = new byte[0][];
95
96
97
98
99
100
101 public ArrayList<ImmutableList<StoreFile>> stripeFiles
102 = new ArrayList<ImmutableList<StoreFile>>();
103
104 public ImmutableList<StoreFile> level0Files = ImmutableList.<StoreFile>of();
105
106
107 public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
108 }
109 private State state = null;
110
111
112 private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
113 private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
114
115
116
117 private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
118
119 private final KVComparator kvComparator;
120 private StripeStoreConfig config;
121
122 private final int blockingFileCount;
123
/**
 * Creates a manager without any files; call {@code loadFiles} before using it.
 *
 * @param kvComparator comparator used for row comparisons and split point lookup
 * @param conf configuration from which the blocking store file count is read
 * @param config stripe-specific settings
 */
public StripeStoreFileManager(
    KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
  int maxStoreFiles = conf.getInt(
      HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
  this.kvComparator = kvComparator;
  this.config = config;
  this.blockingFileCount = maxStoreFiles;
}
131
132 @Override
133 public void loadFiles(List<StoreFile> storeFiles) {
134 loadUnclassifiedStoreFiles(storeFiles);
135 }
136
137 @Override
138 public Collection<StoreFile> getStorefiles() {
139 return state.allFilesCached;
140 }
141
142 @Override
143 public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
144 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
145 cmc.mergeResults(null, sfs);
146 debugDumpState("Added new files");
147 }
148
149 @Override
150 public ImmutableCollection<StoreFile> clearFiles() {
151 ImmutableCollection<StoreFile> result = state.allFilesCached;
152 this.state = new State();
153 this.fileStarts.clear();
154 this.fileEnds.clear();
155 return result;
156 }
157
158 @Override
159 public int getStorefileCount() {
160 return state.allFilesCached.size();
161 }
162
163
164
165 @Override
166 public Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) {
167 KeyBeforeConcatenatedLists result = new KeyBeforeConcatenatedLists();
168
169 result.addSublist(state.level0Files);
170 if (!state.stripeFiles.isEmpty()) {
171 int lastStripeIndex = findStripeForRow(targetKey.getRow(), false);
172 for (int stripeIndex = lastStripeIndex; stripeIndex >= 0; --stripeIndex) {
173 result.addSublist(state.stripeFiles.get(stripeIndex));
174 }
175 }
176 return result.iterator();
177 }
178
179
180
181
182 @Override
183 public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore(
184 Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) {
185 KeyBeforeConcatenatedLists.Iterator original =
186 (KeyBeforeConcatenatedLists.Iterator)candidateFiles;
187 assert original != null;
188 ArrayList<List<StoreFile>> components = original.getComponents();
189 for (int firstIrrelevant = 0; firstIrrelevant < components.size(); ++firstIrrelevant) {
190 StoreFile sf = components.get(firstIrrelevant).get(0);
191 byte[] endKey = endOf(sf);
192
193
194
195 if (!isInvalid(endKey) && !isOpen(endKey)
196 && (nonOpenRowCompare(endKey, targetKey.getRow()) <= 0)) {
197 original.removeComponents(firstIrrelevant);
198 break;
199 }
200 }
201 return original;
202 }
203
204 @Override
205
206
207
208
209
210
211
212 public byte[] getSplitPoint() throws IOException {
213 if (this.getStorefileCount() == 0) return null;
214 if (state.stripeFiles.size() <= 1) {
215 return getSplitPointFromAllFiles();
216 }
217 int leftIndex = -1, rightIndex = state.stripeFiles.size();
218 long leftSize = 0, rightSize = 0;
219 long lastLeftSize = 0, lastRightSize = 0;
220 while (rightIndex - 1 != leftIndex) {
221 if (leftSize >= rightSize) {
222 --rightIndex;
223 lastRightSize = getStripeFilesSize(rightIndex);
224 rightSize += lastRightSize;
225 } else {
226 ++leftIndex;
227 lastLeftSize = getStripeFilesSize(leftIndex);
228 leftSize += lastLeftSize;
229 }
230 }
231 if (leftSize == 0 || rightSize == 0) {
232 String errMsg = String.format("Cannot split on a boundary - left index %d size %d, "
233 + "right index %d size %d", leftIndex, leftSize, rightIndex, rightSize);
234 debugDumpState(errMsg);
235 LOG.warn(errMsg);
236 return getSplitPointFromAllFiles();
237 }
238 double ratio = (double)rightSize / leftSize;
239 if (ratio < 1) {
240 ratio = 1 / ratio;
241 }
242 if (config.getMaxSplitImbalance() > ratio) return state.stripeEndRows[leftIndex];
243
244
245
246
247
248 boolean isRightLarger = rightSize >= leftSize;
249 double newRatio = isRightLarger
250 ? getMidStripeSplitRatio(leftSize, rightSize, lastRightSize)
251 : getMidStripeSplitRatio(rightSize, leftSize, lastLeftSize);
252 if (newRatio < 1) {
253 newRatio = 1 / newRatio;
254 }
255 if (newRatio >= ratio) return state.stripeEndRows[leftIndex];
256 LOG.debug("Splitting the stripe - ratio w/o split " + ratio + ", ratio with split "
257 + newRatio + " configured ratio " + config.getMaxSplitImbalance());
258
259 return StoreUtils.getLargestFile(state.stripeFiles.get(
260 isRightLarger ? rightIndex : leftIndex)).getFileSplitPoint(this.kvComparator);
261 }
262
263 private byte[] getSplitPointFromAllFiles() throws IOException {
264 ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
265 sfs.addSublist(state.level0Files);
266 sfs.addAllSublists(state.stripeFiles);
267 if (sfs.isEmpty()) return null;
268 return StoreUtils.getLargestFile(sfs).getFileSplitPoint(this.kvComparator);
269 }
270
271 private double getMidStripeSplitRatio(long smallerSize, long largerSize, long lastLargerSize) {
272 return (double)(largerSize - lastLargerSize / 2f) / (smallerSize + lastLargerSize / 2f);
273 }
274
275 @Override
276 public Collection<StoreFile> getFilesForScanOrGet(
277 boolean isGet, byte[] startRow, byte[] stopRow) {
278 if (state.stripeFiles.isEmpty()) {
279 return state.level0Files;
280 }
281
282 int firstStripe = findStripeForRow(startRow, true);
283 int lastStripe = findStripeForRow(stopRow, false);
284 assert firstStripe <= lastStripe;
285 if (firstStripe == lastStripe && state.level0Files.isEmpty()) {
286 return state.stripeFiles.get(firstStripe);
287 }
288 if (firstStripe == 0 && lastStripe == (state.stripeFiles.size() - 1)) {
289 return state.allFilesCached;
290 }
291
292 ConcatenatedLists<StoreFile> result = new ConcatenatedLists<StoreFile>();
293 result.addAllSublists(state.stripeFiles.subList(firstStripe, lastStripe + 1));
294 result.addSublist(state.level0Files);
295 return result;
296 }
297
298 @Override
299 public void addCompactionResults(
300 Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
301
302 LOG.debug("Attempting to merge compaction results: " + compactedFiles.size()
303 + " files replaced by " + results.size());
304
305
306 CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
307 cmc.mergeResults(compactedFiles, results);
308 debugDumpState("Merged compaction results");
309 }
310
311 @Override
312 public int getStoreCompactionPriority() {
313
314
315
316 int fc = getStorefileCount();
317 if (state.stripeFiles.isEmpty() || (this.blockingFileCount <= fc)) {
318 return this.blockingFileCount - fc;
319 }
320
321
322
323 int l0 = state.level0Files.size(), sc = state.stripeFiles.size();
324 int priority = (int)Math.ceil(((double)(this.blockingFileCount - fc + l0) / sc) - l0);
325 return (priority <= HStore.PRIORITY_USER) ? (HStore.PRIORITY_USER + 1) : priority;
326 }
327
328
329
330
331
332
333 private long getStripeFilesSize(int stripeIndex) {
334 long result = 0;
335 for (StoreFile sf : state.stripeFiles.get(stripeIndex)) {
336 result += sf.getReader().length();
337 }
338 return result;
339 }
340
341
342
343
344
345
346
347
348 private void loadUnclassifiedStoreFiles(List<StoreFile> storeFiles) {
349 LOG.debug("Attempting to load " + storeFiles.size() + " store files.");
350 TreeMap<byte[], ArrayList<StoreFile>> candidateStripes =
351 new TreeMap<byte[], ArrayList<StoreFile>>(MAP_COMPARATOR);
352 ArrayList<StoreFile> level0Files = new ArrayList<StoreFile>();
353
354
355 for (StoreFile sf : storeFiles) {
356 byte[] startRow = startOf(sf), endRow = endOf(sf);
357
358 if (isInvalid(startRow) || isInvalid(endRow)) {
359 insertFileIntoStripe(level0Files, sf);
360 ensureLevel0Metadata(sf);
361 } else if (!isOpen(startRow) && !isOpen(endRow) &&
362 nonOpenRowCompare(startRow, endRow) >= 0) {
363 LOG.error("Unexpected metadata - start row [" + Bytes.toString(startRow) + "], end row ["
364 + Bytes.toString(endRow) + "] in file [" + sf.getPath() + "], pushing to L0");
365 insertFileIntoStripe(level0Files, sf);
366 ensureLevel0Metadata(sf);
367 } else {
368 ArrayList<StoreFile> stripe = candidateStripes.get(endRow);
369 if (stripe == null) {
370 stripe = new ArrayList<StoreFile>();
371 candidateStripes.put(endRow, stripe);
372 }
373 insertFileIntoStripe(stripe, sf);
374 }
375 }
376
377
378
379 boolean hasOverlaps = false;
380 byte[] expectedStartRow = null;
381 Iterator<Map.Entry<byte[], ArrayList<StoreFile>>> entryIter =
382 candidateStripes.entrySet().iterator();
383 while (entryIter.hasNext()) {
384 Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
385 ArrayList<StoreFile> files = entry.getValue();
386
387 for (int i = 0; i < files.size(); ++i) {
388 StoreFile sf = files.get(i);
389 byte[] startRow = startOf(sf);
390 if (expectedStartRow == null) {
391 expectedStartRow = startRow;
392 } else if (!rowEquals(expectedStartRow, startRow)) {
393 hasOverlaps = true;
394 LOG.warn("Store file doesn't fit into the tentative stripes - expected to start at ["
395 + Bytes.toString(expectedStartRow) + "], but starts at [" + Bytes.toString(startRow)
396 + "], to L0 it goes");
397 StoreFile badSf = files.remove(i);
398 insertFileIntoStripe(level0Files, badSf);
399 ensureLevel0Metadata(badSf);
400 --i;
401 }
402 }
403
404 byte[] endRow = entry.getKey();
405 if (!files.isEmpty()) {
406 expectedStartRow = endRow;
407 } else {
408 entryIter.remove();
409 }
410 }
411
412
413
414
415
416 if (!candidateStripes.isEmpty()) {
417 StoreFile firstFile = candidateStripes.firstEntry().getValue().get(0);
418 boolean isOpen = isOpen(startOf(firstFile)) && isOpen(candidateStripes.lastKey());
419 if (!isOpen) {
420 LOG.warn("The range of the loaded files does not cover full key space: from ["
421 + Bytes.toString(startOf(firstFile)) + "], to ["
422 + Bytes.toString(candidateStripes.lastKey()) + "]");
423 if (!hasOverlaps) {
424 ensureEdgeStripeMetadata(candidateStripes.firstEntry().getValue(), true);
425 ensureEdgeStripeMetadata(candidateStripes.lastEntry().getValue(), false);
426 } else {
427 LOG.warn("Inconsistent files, everything goes to L0.");
428 for (ArrayList<StoreFile> files : candidateStripes.values()) {
429 for (StoreFile sf : files) {
430 insertFileIntoStripe(level0Files, sf);
431 ensureLevel0Metadata(sf);
432 }
433 }
434 candidateStripes.clear();
435 }
436 }
437 }
438
439
440 State state = new State();
441 state.level0Files = ImmutableList.copyOf(level0Files);
442 state.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(candidateStripes.size());
443 state.stripeEndRows = new byte[Math.max(0, candidateStripes.size() - 1)][];
444 ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(level0Files);
445 int i = candidateStripes.size() - 1;
446 for (Map.Entry<byte[], ArrayList<StoreFile>> entry : candidateStripes.entrySet()) {
447 state.stripeFiles.add(ImmutableList.copyOf(entry.getValue()));
448 newAllFiles.addAll(entry.getValue());
449 if (i > 0) {
450 state.stripeEndRows[state.stripeFiles.size() - 1] = entry.getKey();
451 }
452 --i;
453 }
454 state.allFilesCached = ImmutableList.copyOf(newAllFiles);
455 this.state = state;
456 debugDumpState("Files loaded");
457 }
458
459 private void ensureEdgeStripeMetadata(ArrayList<StoreFile> stripe, boolean isFirst) {
460 HashMap<StoreFile, byte[]> targetMap = isFirst ? fileStarts : fileEnds;
461 for (StoreFile sf : stripe) {
462 targetMap.put(sf, OPEN_KEY);
463 }
464 }
465
466 private void ensureLevel0Metadata(StoreFile sf) {
467 if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
468 if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
469 }
470
471 private void debugDumpState(String string) {
472 if (!LOG.isDebugEnabled()) return;
473 StringBuilder sb = new StringBuilder();
474 sb.append("\n" + string + "; current stripe state is as such:");
475 sb.append("\n level 0 with ")
476 .append(state.level0Files.size())
477 .append(
478 " files: "
479 + TraditionalBinaryPrefix.long2String(
480 StripeCompactionPolicy.getTotalFileSize(state.level0Files), "", 1) + ";");
481 for (int i = 0; i < state.stripeFiles.size(); ++i) {
482 String endRow = (i == state.stripeEndRows.length)
483 ? "(end)" : "[" + Bytes.toString(state.stripeEndRows[i]) + "]";
484 sb.append("\n stripe ending in ")
485 .append(endRow)
486 .append(" with ")
487 .append(state.stripeFiles.get(i).size())
488 .append(
489 " files: "
490 + TraditionalBinaryPrefix.long2String(
491 StripeCompactionPolicy.getTotalFileSize(state.stripeFiles.get(i)), "", 1) + ";");
492 }
493 sb.append("\n").append(state.stripeFiles.size()).append(" stripes total.");
494 sb.append("\n").append(getStorefileCount()).append(" files total.");
495 LOG.debug(sb.toString());
496 }
497
498
499
500
501 private static final boolean isOpen(byte[] key) {
502 return key != null && key.length == 0;
503 }
504
505
506
507
508 private static final boolean isInvalid(byte[] key) {
509 return key == INVALID_KEY;
510 }
511
512
513
514
515 private final boolean rowEquals(byte[] k1, byte[] k2) {
516 return kvComparator.matchingRows(k1, 0, k1.length, k2, 0, k2.length);
517 }
518
519
520
521
522 private final int nonOpenRowCompare(byte[] k1, byte[] k2) {
523 assert !isOpen(k1) && !isOpen(k2);
524 return kvComparator.compareRows(k1, 0, k1.length, k2, 0, k2.length);
525 }
526
527
528
529
530 private final int findStripeIndexByEndRow(byte[] endRow) {
531 assert !isInvalid(endRow);
532 if (isOpen(endRow)) return state.stripeEndRows.length;
533 return Arrays.binarySearch(state.stripeEndRows, endRow, Bytes.BYTES_COMPARATOR);
534 }
535
536
537
538
539 private final int findStripeForRow(byte[] row, boolean isStart) {
540 if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
541 if (!isStart && row == HConstants.EMPTY_END_ROW) return state.stripeFiles.size() - 1;
542
543
544
545
546
547 return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
548 }
549
550 @Override
551 public final byte[] getStartRow(int stripeIndex) {
552 return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
553 }
554
555 @Override
556 public final byte[] getEndRow(int stripeIndex) {
557 return (stripeIndex == state.stripeEndRows.length
558 ? OPEN_KEY : state.stripeEndRows[stripeIndex]);
559 }
560
561
562 private byte[] startOf(StoreFile sf) {
563 byte[] result = this.fileStarts.get(sf);
564 return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
565 : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
566 }
567
568 private byte[] endOf(StoreFile sf) {
569 byte[] result = this.fileEnds.get(sf);
570 return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
571 : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
572 }
573
574
575
576
577
578
579 private static void insertFileIntoStripe(ArrayList<StoreFile> stripe, StoreFile sf) {
580
581
582 for (int insertBefore = 0; ; ++insertBefore) {
583 if (insertBefore == stripe.size()
584 || (StoreFile.Comparators.SEQ_ID.compare(sf, stripe.get(insertBefore)) >= 0)) {
585 stripe.add(insertBefore, sf);
586 break;
587 }
588 }
589 }
590
591
592
593
594
595
596
597
598
599
600 private static class KeyBeforeConcatenatedLists extends ConcatenatedLists<StoreFile> {
601 @Override
602 public java.util.Iterator<StoreFile> iterator() {
603 return new Iterator();
604 }
605
606 public class Iterator extends ConcatenatedLists<StoreFile>.Iterator {
607 public ArrayList<List<StoreFile>> getComponents() {
608 return components;
609 }
610
611 public void removeComponents(int startIndex) {
612 List<List<StoreFile>> subList = components.subList(startIndex, components.size());
613 for (List<StoreFile> entry : subList) {
614 size -= entry.size();
615 }
616 assert size >= 0;
617 subList.clear();
618 }
619
620 @Override
621 public void remove() {
622 if (!this.nextWasCalled) {
623 throw new IllegalStateException("No element to remove");
624 }
625 this.nextWasCalled = false;
626 List<StoreFile> src = components.get(currentComponent);
627 if (src instanceof ImmutableList<?>) {
628 src = new ArrayList<StoreFile>(src);
629 components.set(currentComponent, src);
630 }
631 src.remove(indexWithinComponent);
632 --size;
633 --indexWithinComponent;
634 if (src.isEmpty()) {
635 components.remove(currentComponent);
636 }
637 }
638 }
639 }
640
641
642
643
644
645
646 private class CompactionOrFlushMergeCopy {
647 private ArrayList<List<StoreFile>> stripeFiles = null;
648 private ArrayList<StoreFile> level0Files = null;
649 private ArrayList<byte[]> stripeEndRows = null;
650
651 private Collection<StoreFile> compactedFiles = null;
652 private Collection<StoreFile> results = null;
653
654 private List<StoreFile> l0Results = new ArrayList<StoreFile>();
655 private final boolean isFlush;
656
657 public CompactionOrFlushMergeCopy(boolean isFlush) {
658
659 this.stripeFiles = new ArrayList<List<StoreFile>>(
660 StripeStoreFileManager.this.state.stripeFiles);
661 this.isFlush = isFlush;
662 }
663
664 public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
665 throws IOException {
666 assert this.compactedFiles == null && this.results == null;
667 this.compactedFiles = compactedFiles;
668 this.results = results;
669
670 if (!isFlush) removeCompactedFiles();
671 TreeMap<byte[], StoreFile> newStripes = processResults();
672 if (newStripes != null) {
673 processNewCandidateStripes(newStripes);
674 }
675
676 State state = createNewState();
677 StripeStoreFileManager.this.state = state;
678 updateMetadataMaps();
679 }
680
681 private State createNewState() {
682 State oldState = StripeStoreFileManager.this.state;
683
684 assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
685 State newState = new State();
686 newState.level0Files = (this.level0Files == null) ? oldState.level0Files
687 : ImmutableList.copyOf(this.level0Files);
688 newState.stripeEndRows = (this.stripeEndRows == null) ? oldState.stripeEndRows
689 : this.stripeEndRows.toArray(new byte[this.stripeEndRows.size()][]);
690 newState.stripeFiles = new ArrayList<ImmutableList<StoreFile>>(this.stripeFiles.size());
691 for (List<StoreFile> newStripe : this.stripeFiles) {
692 newState.stripeFiles.add(newStripe instanceof ImmutableList<?>
693 ? (ImmutableList<StoreFile>)newStripe : ImmutableList.copyOf(newStripe));
694 }
695
696 List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
697 if (!isFlush) newAllFiles.removeAll(compactedFiles);
698 newAllFiles.addAll(results);
699 newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
700 return newState;
701 }
702
703 private void updateMetadataMaps() {
704 StripeStoreFileManager parent = StripeStoreFileManager.this;
705 if (!isFlush) {
706 for (StoreFile sf : this.compactedFiles) {
707 parent.fileStarts.remove(sf);
708 parent.fileEnds.remove(sf);
709 }
710 }
711 if (this.l0Results != null) {
712 for (StoreFile sf : this.l0Results) {
713 parent.ensureLevel0Metadata(sf);
714 }
715 }
716 }
717
718
719
720
721
722 private final ArrayList<StoreFile> getStripeCopy(int index) {
723 List<StoreFile> stripeCopy = this.stripeFiles.get(index);
724 ArrayList<StoreFile> result = null;
725 if (stripeCopy instanceof ImmutableList<?>) {
726 result = new ArrayList<StoreFile>(stripeCopy);
727 this.stripeFiles.set(index, result);
728 } else {
729 result = (ArrayList<StoreFile>)stripeCopy;
730 }
731 return result;
732 }
733
734
735
736
737 private final ArrayList<StoreFile> getLevel0Copy() {
738 if (this.level0Files == null) {
739 this.level0Files = new ArrayList<StoreFile>(StripeStoreFileManager.this.state.level0Files);
740 }
741 return this.level0Files;
742 }
743
744
745
746
747
748
749 private TreeMap<byte[], StoreFile> processResults() throws IOException {
750 TreeMap<byte[], StoreFile> newStripes = null;
751 for (StoreFile sf : this.results) {
752 byte[] startRow = startOf(sf), endRow = endOf(sf);
753 if (isInvalid(endRow) || isInvalid(startRow)) {
754 if (!isFlush) {
755 LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
756 }
757 insertFileIntoStripe(getLevel0Copy(), sf);
758 this.l0Results.add(sf);
759 continue;
760 }
761 if (!this.stripeFiles.isEmpty()) {
762 int stripeIndex = findStripeIndexByEndRow(endRow);
763 if ((stripeIndex >= 0) && rowEquals(getStartRow(stripeIndex), startRow)) {
764
765 insertFileIntoStripe(getStripeCopy(stripeIndex), sf);
766 continue;
767 }
768 }
769
770
771 if (newStripes == null) {
772 newStripes = new TreeMap<byte[], StoreFile>(MAP_COMPARATOR);
773 }
774 StoreFile oldSf = newStripes.put(endRow, sf);
775 if (oldSf != null) {
776 throw new IOException("Compactor has produced multiple files for the stripe ending in ["
777 + Bytes.toString(endRow) + "], found " + sf.getPath() + " and " + oldSf.getPath());
778 }
779 }
780 return newStripes;
781 }
782
783
784
785
786
787 private void removeCompactedFiles() throws IOException {
788 for (StoreFile oldFile : this.compactedFiles) {
789 byte[] oldEndRow = endOf(oldFile);
790 List<StoreFile> source = null;
791 if (isInvalid(oldEndRow)) {
792 source = getLevel0Copy();
793 } else {
794 int stripeIndex = findStripeIndexByEndRow(oldEndRow);
795 if (stripeIndex < 0) {
796 throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
797 + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
798 }
799 source = getStripeCopy(stripeIndex);
800 }
801 if (!source.remove(oldFile)) {
802 throw new IOException("An allegedly compacted file [" + oldFile + "] was not found");
803 }
804 }
805 }
806
807
808
809
810
811
812 private void processNewCandidateStripes(
813 TreeMap<byte[], StoreFile> newStripes) throws IOException {
814
815 boolean hasStripes = !this.stripeFiles.isEmpty();
816 this.stripeEndRows = new ArrayList<byte[]>(
817 Arrays.asList(StripeStoreFileManager.this.state.stripeEndRows));
818 int removeFrom = 0;
819 byte[] firstStartRow = startOf(newStripes.firstEntry().getValue());
820 byte[] lastEndRow = newStripes.lastKey();
821 if (!hasStripes && (!isOpen(firstStartRow) || !isOpen(lastEndRow))) {
822 throw new IOException("Newly created stripes do not cover the entire key space.");
823 }
824
825 boolean canAddNewStripes = true;
826 Collection<StoreFile> filesForL0 = null;
827 if (hasStripes) {
828
829
830 if (isOpen(firstStartRow)) {
831 removeFrom = 0;
832 } else {
833 removeFrom = findStripeIndexByEndRow(firstStartRow);
834 if (removeFrom < 0) throw new IOException("Compaction is trying to add a bad range.");
835 ++removeFrom;
836 }
837 int removeTo = findStripeIndexByEndRow(lastEndRow);
838 if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
839
840 ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
841 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
842 conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
843 }
844 if (!conflictingFiles.isEmpty()) {
845
846
847
848 if (isFlush) {
849 long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
850 LOG.warn("Stripes were created by a flush, but results of size " + newSize
851 + " cannot be added because the stripes have changed");
852 canAddNewStripes = false;
853 filesForL0 = newStripes.values();
854 } else {
855 long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
856 LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
857 + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
858 filesForL0 = conflictingFiles;
859 }
860 if (filesForL0 != null) {
861 for (StoreFile sf : filesForL0) {
862 insertFileIntoStripe(getLevel0Copy(), sf);
863 }
864 l0Results.addAll(filesForL0);
865 }
866 }
867
868 if (canAddNewStripes) {
869
870 int originalCount = this.stripeFiles.size();
871 for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
872 if (removeIndex != originalCount - 1) {
873 this.stripeEndRows.remove(removeIndex);
874 }
875 this.stripeFiles.remove(removeIndex);
876 }
877 }
878 }
879
880 if (!canAddNewStripes) return;
881
882
883 byte[] previousEndRow = null;
884 int insertAt = removeFrom;
885 for (Map.Entry<byte[], StoreFile> newStripe : newStripes.entrySet()) {
886 if (previousEndRow != null) {
887
888 assert !isOpen(previousEndRow);
889 byte[] startRow = startOf(newStripe.getValue());
890 if (!rowEquals(previousEndRow, startRow)) {
891 throw new IOException("The new stripes produced by "
892 + (isFlush ? "flush" : "compaction") + " are not contiguous");
893 }
894 }
895
896 ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
897 tmp.add(newStripe.getValue());
898 stripeFiles.add(insertAt, tmp);
899 previousEndRow = newStripe.getKey();
900 if (!isOpen(previousEndRow)) {
901 stripeEndRows.add(insertAt, previousEndRow);
902 }
903 ++insertAt;
904 }
905 }
906 }
907
908 @Override
909 public List<StoreFile> getLevel0Files() {
910 return this.state.level0Files;
911 }
912
913 @Override
914 public List<byte[]> getStripeBoundaries() {
915 if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
916 ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
917 result.add(OPEN_KEY);
918 Collections.addAll(result, this.state.stripeEndRows);
919 result.add(OPEN_KEY);
920 return result;
921 }
922
923 @Override
924 public ArrayList<ImmutableList<StoreFile>> getStripes() {
925 return this.state.stripeFiles;
926 }
927
928 @Override
929 public int getStripeCount() {
930 return this.state.stripeFiles.size();
931 }
932
933 @Override
934 public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) {
935
936
937 State state = this.state;
938 Collection<StoreFile> expiredStoreFiles = null;
939 for (ImmutableList<StoreFile> stripe : state.stripeFiles) {
940 expiredStoreFiles = findExpiredFiles(stripe, maxTs, filesCompacting, expiredStoreFiles);
941 }
942 return findExpiredFiles(state.level0Files, maxTs, filesCompacting, expiredStoreFiles);
943 }
944
945 private Collection<StoreFile> findExpiredFiles(ImmutableList<StoreFile> stripe, long maxTs,
946 List<StoreFile> filesCompacting, Collection<StoreFile> expiredStoreFiles) {
947
948 for (int i = 1; i < stripe.size(); ++i) {
949 StoreFile sf = stripe.get(i);
950 long fileTs = sf.getReader().getMaxTimestamp();
951 if (fileTs < maxTs && !filesCompacting.contains(sf)) {
952 LOG.info("Found an expired store file: " + sf.getPath()
953 + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
954 if (expiredStoreFiles == null) {
955 expiredStoreFiles = new ArrayList<StoreFile>();
956 }
957 expiredStoreFiles.add(sf);
958 }
959 }
960 return expiredStoreFiles;
961 }
962
963 @Override
964 public double getCompactionPressure() {
965 State stateLocal = this.state;
966 if (stateLocal.allFilesCached.size() > blockingFileCount) {
967
968 return 2.0;
969 }
970 if (stateLocal.stripeFiles.isEmpty()) {
971 return 0.0;
972 }
973 int blockingFilePerStripe = blockingFileCount / stateLocal.stripeFiles.size();
974
975
976 int delta = stateLocal.level0Files.isEmpty() ? 0 : 1;
977 double max = 0.0;
978 for (ImmutableList<StoreFile> stripeFile : stateLocal.stripeFiles) {
979 int stripeFileCount = stripeFile.size();
980 double normCount =
981 (double) (stripeFileCount + delta - config.getStripeCompactMinFiles())
982 / (blockingFilePerStripe - config.getStripeCompactMinFiles());
983 if (normCount >= 1.0) {
984
985
986 return 1.0;
987 }
988 if (normCount > max) {
989 max = normCount;
990 }
991 }
992 return max;
993 }
994
995 @Override
996 public Comparator<StoreFile> getStoreFileComparator() {
997 return StoreFile.Comparators.SEQ_ID;
998 }
999 }