View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
33  
34  /**
35   * Base class for cell sink that separates the provided cells into multiple files for stripe
36   * compaction.
37   */
38  @InterfaceAudience.Private
39  public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
40  
41    private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
42  
43    protected final KVComparator comparator;
44    protected List<StoreFile.Writer> existingWriters;
45    protected List<byte[]> boundaries;
46  
47    /** Whether to write stripe metadata */
48    private boolean doWriteStripeMetadata = true;
49  
50    public StripeMultiFileWriter(KVComparator comparator) {
51      this.comparator = comparator;
52    }
53  
54    public void setNoStripeMetadata() {
55      this.doWriteStripeMetadata = false;
56    }
57  
58    @Override
59    protected Collection<Writer> writers() {
60      return existingWriters;
61    }
62  
63    protected abstract void preCommitWritersInternal() throws IOException;
64  
65    @Override
66    protected final void preCommitWriters() throws IOException {
67      // do some sanity check here.
68      assert this.existingWriters != null;
69      preCommitWritersInternal();
70      assert this.boundaries.size() == (this.existingWriters.size() + 1);
71    }
72  
73    @Override
74    protected void preCloseWriter(Writer writer) throws IOException {
75      if (doWriteStripeMetadata) {
76        if (LOG.isDebugEnabled()) {
77          LOG.debug("Write stripe metadata for " + writer.getPath().toString());
78        }
79        int index = existingWriters.indexOf(writer);
80        writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index));
81        writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1));
82      } else {
83        if (LOG.isDebugEnabled()) {
84          LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString());
85        }
86      }
87    }
88  
89    /**
90     * Subclasses can call this method to make sure the first KV is within multi-writer range.
91     * @param left The left boundary of the writer.
92     * @param row The row to check.
93     * @param rowOffset Offset for row.
94     * @param rowLength Length for row.
95     */
96    protected void sanityCheckLeft(byte[] left, byte[] row, int rowOffset, int rowLength)
97        throws IOException {
98      if (StripeStoreFileManager.OPEN_KEY != left
99          && comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
100       String error =
101           "The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: ["
102               + Bytes.toString(row, rowOffset, rowLength) + "]";
103       LOG.error(error);
104       throw new IOException(error);
105     }
106   }
107 
108   /**
109    * Subclasses can call this method to make sure the last KV is within multi-writer range.
110    * @param right The right boundary of the writer.
111    * @param row The row to check.
112    * @param rowOffset Offset for row.
113    * @param rowLength Length for row.
114    */
115   protected void sanityCheckRight(byte[] right, byte[] row, int rowOffset, int rowLength)
116       throws IOException {
117     if (StripeStoreFileManager.OPEN_KEY != right
118         && comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
119       String error =
120           "The last row is higher or equal than the right boundary of [" + Bytes.toString(right)
121               + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
122       LOG.error(error);
123       throw new IOException(error);
124     }
125   }
126 
127   /**
128    * MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between
129    * each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and
130    * separate from all other such pairs.
131    */
132   public static class BoundaryMultiWriter extends StripeMultiFileWriter {
133     private StoreFile.Writer currentWriter;
134     private byte[] currentWriterEndKey;
135 
136     private Cell lastCell;
137     private long cellsInCurrentWriter = 0;
138     private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
139     private boolean hasAnyWriter = false;
140 
141     /**
142      * @param targetBoundaries The boundaries on which writers/files are separated.
143      * @param majorRangeFrom Major range is the range for which at least one file should be written
144      *          (because all files are included in compaction). majorRangeFrom is the left boundary.
145      * @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
146      */
147     public BoundaryMultiWriter(KVComparator comparator, List<byte[]> targetBoundaries,
148         byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
149       super(comparator);
150       this.boundaries = targetBoundaries;
151       this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
152       // "major" range (range for which all files are included) boundaries, if any,
153       // must match some target boundaries, let's find them.
154       assert (majorRangeFrom == null) == (majorRangeTo == null);
155       if (majorRangeFrom != null) {
156         majorRangeFromIndex =
157             (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0 : Collections.binarySearch(
158               this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
159         majorRangeToIndex =
160             (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size() : Collections
161                 .binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
162         if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
163           throw new IOException("Major range does not match writer boundaries: ["
164               + Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
165               + majorRangeFromIndex + " to " + majorRangeToIndex);
166         }
167       }
168     }
169 
170     @Override
171     public void append(Cell cell) throws IOException {
172       if (currentWriter == null && existingWriters.isEmpty()) {
173         // First append ever, do a sanity check.
174         sanityCheckLeft(this.boundaries.get(0), cell.getRowArray(), cell.getRowOffset(),
175           cell.getRowLength());
176       }
177       prepareWriterFor(cell);
178       currentWriter.append(cell);
179       lastCell = cell; // for the sanity check
180       ++cellsInCurrentWriter;
181     }
182 
183     private boolean isCellAfterCurrentWriter(Cell cell) {
184       return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) && (comparator.compareRows(
185         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), currentWriterEndKey, 0,
186         currentWriterEndKey.length) >= 0));
187     }
188 
189     @Override
190     protected void preCommitWritersInternal() throws IOException {
191       stopUsingCurrentWriter();
192       while (existingWriters.size() < boundaries.size() - 1) {
193         createEmptyWriter();
194       }
195       if (lastCell != null) {
196         sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell.getRowArray(),
197           lastCell.getRowOffset(), lastCell.getRowLength());
198       }
199     }
200 
201     private void prepareWriterFor(Cell cell) throws IOException {
202       if (currentWriter != null && !isCellAfterCurrentWriter(cell)) return; // Use same writer.
203 
204       stopUsingCurrentWriter();
205       // See if KV will be past the writer we are about to create; need to add another one.
206       while (isCellAfterCurrentWriter(cell)) {
207         checkCanCreateWriter();
208         createEmptyWriter();
209       }
210       checkCanCreateWriter();
211       hasAnyWriter = true;
212       currentWriter = writerFactory.createWriter();
213       existingWriters.add(currentWriter);
214     }
215 
216     /**
217      * Called if there are no cells for some stripe. We need to have something in the writer list
218      * for this stripe, so that writer-boundary list indices correspond to each other. We can insert
219      * null in the writer list for that purpose, except in the following cases where we actually
220      * need a file: 1) If we are in range for which we are compacting all the files, we need to
221      * create an empty file to preserve stripe metadata. 2) If we have not produced any file at all
222      * for this compactions, and this is the last chance (the last stripe), we need to preserve last
223      * seqNum (see also HBASE-6059).
224      */
225     private void createEmptyWriter() throws IOException {
226       int index = existingWriters.size();
227       boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
228       // Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2)
229       boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
230       boolean needEmptyFile = isInMajorRange || isLastWriter;
231       existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
232       hasAnyWriter |= needEmptyFile;
233       currentWriterEndKey =
234           (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
235               .size() + 1);
236     }
237 
238     private void checkCanCreateWriter() throws IOException {
239       int maxWriterCount = boundaries.size() - 1;
240       assert existingWriters.size() <= maxWriterCount;
241       if (existingWriters.size() >= maxWriterCount) {
242         throw new IOException("Cannot create any more writers (created " + existingWriters.size()
243             + " out of " + maxWriterCount + " - row might be out of range of all valid writers");
244       }
245     }
246 
247     private void stopUsingCurrentWriter() {
248       if (currentWriter != null) {
249         if (LOG.isDebugEnabled()) {
250           LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
251               + "] row; wrote out " + cellsInCurrentWriter + " kvs");
252         }
253         cellsInCurrentWriter = 0;
254       }
255       currentWriter = null;
256       currentWriterEndKey =
257           (existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
258               .size() + 1);
259     }
260   }
261 
262   /**
263    * MultiWriter that separates the cells based on target cell number per file and file count. New
264    * file is started every time the target number of KVs is reached, unless the fixed count of
265    * writers has already been created (in that case all the remaining KVs go into the last writer).
266    */
267   public static class SizeMultiWriter extends StripeMultiFileWriter {
268     private int targetCount;
269     private long targetCells;
270     private byte[] left;
271     private byte[] right;
272 
273     private Cell lastCell;
274     private StoreFile.Writer currentWriter;
275     protected byte[] lastRowInCurrentWriter = null;
276     private long cellsInCurrentWriter = 0;
277     private long cellsSeen = 0;
278     private long cellsSeenInPrevious = 0;
279 
280     /**
281      * @param targetCount The maximum count of writers that can be created.
282      * @param targetKvs The number of KVs to read from source before starting each new writer.
283      * @param left The left boundary of the first writer.
284      * @param right The right boundary of the last writer.
285      */
286     public SizeMultiWriter(KVComparator comparator, int targetCount, long targetKvs, byte[] left,
287         byte[] right) {
288       super(comparator);
289       this.targetCount = targetCount;
290       this.targetCells = targetKvs;
291       this.left = left;
292       this.right = right;
293       int preallocate = Math.min(this.targetCount, 64);
294       this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
295       this.boundaries = new ArrayList<byte[]>(preallocate + 1);
296     }
297 
298     @Override
299     public void append(Cell cell) throws IOException {
300       // If we are waiting for opportunity to close and we started writing different row,
301       // discard the writer and stop waiting.
302       boolean doCreateWriter = false;
303       if (currentWriter == null) {
304         // First append ever, do a sanity check.
305         sanityCheckLeft(left, cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
306         doCreateWriter = true;
307       } else if (lastRowInCurrentWriter != null
308           && !comparator.matchingRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
309             lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
310         if (LOG.isDebugEnabled()) {
311           LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
312               + "] row; wrote out " + cellsInCurrentWriter + " kvs");
313         }
314         lastRowInCurrentWriter = null;
315         cellsInCurrentWriter = 0;
316         cellsSeenInPrevious += cellsSeen;
317         doCreateWriter = true;
318       }
319       if (doCreateWriter) {
320         byte[] boundary = existingWriters.isEmpty() ? left : cell.getRow(); // make a copy
321         if (LOG.isDebugEnabled()) {
322           LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
323         }
324         currentWriter = writerFactory.createWriter();
325         boundaries.add(boundary);
326         existingWriters.add(currentWriter);
327       }
328 
329       currentWriter.append(cell);
330       lastCell = cell; // for the sanity check
331       ++cellsInCurrentWriter;
332       cellsSeen = cellsInCurrentWriter;
333       if (this.sourceScanner != null) {
334         cellsSeen =
335             Math.max(cellsSeen, this.sourceScanner.getEstimatedNumberOfKvsScanned()
336                 - cellsSeenInPrevious);
337       }
338 
339       // If we are not already waiting for opportunity to close, start waiting if we can
340       // create any more writers and if the current one is too big.
341       if (lastRowInCurrentWriter == null && existingWriters.size() < targetCount
342           && cellsSeen >= targetCells) {
343         lastRowInCurrentWriter = cell.getRow(); // make a copy
344         if (LOG.isDebugEnabled()) {
345           LOG.debug("Preparing to start a new writer after ["
346               + Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen
347               + " kvs and wrote out " + cellsInCurrentWriter + " kvs");
348         }
349       }
350     }
351 
352     @Override
353     protected void preCommitWritersInternal() throws IOException {
354       if (LOG.isDebugEnabled()) {
355         LOG.debug("Stopping with "
356             + cellsInCurrentWriter
357             + " kvs in last writer"
358             + ((this.sourceScanner == null) ? "" : ("; observed estimated "
359                 + this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
360       }
361       if (lastCell != null) {
362         sanityCheckRight(right, lastCell.getRowArray(), lastCell.getRowOffset(),
363           lastCell.getRowLength());
364       }
365 
366       // When expired stripes were going to be merged into one, and if no writer was created during
367       // the compaction, we need to create an empty file to preserve metadata.
368       if (existingWriters.isEmpty() && 1 == targetCount) {
369         if (LOG.isDebugEnabled()) {
370           LOG.debug("Merge expired stripes into one, create an empty file to preserve metadata.");
371         }
372         boundaries.add(left);
373         existingWriters.add(writerFactory.createWriter());
374       }
375 
376       this.boundaries.add(right);
377     }
378   }
379 }