View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.KeyValue.KVComparator;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
34  import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
35  import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
36  import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
37  
38  import com.google.common.annotations.VisibleForTesting;
39  
40  /**
41   * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
42   * into separate striped files, avoiding L0.
43   */
44  @InterfaceAudience.Private
45  public class StripeStoreFlusher extends StoreFlusher {
46    private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
47    private final Object flushLock = new Object();
48    private final StripeCompactionPolicy policy;
49    private final StripeCompactionPolicy.StripeInformationProvider stripes;
50  
51    public StripeStoreFlusher(Configuration conf, Store store,
52        StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
53      super(conf, store);
54      this.policy = policy;
55      this.stripes = stripes;
56    }
57  
58    @Override
59    public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
60        MonitoredTask status) throws IOException {
61      List<Path> result = new ArrayList<Path>();
62      int cellsCount = snapshot.getCellsCount();
63      if (cellsCount == 0) return result; // don't flush if there are no entries
64  
65      long smallestReadPoint = store.getSmallestReadPoint();
66      InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
67      if (scanner == null) {
68        return result; // NULL scanner returned from coprocessor hooks means skip normal processing
69      }
70  
71      // Let policy select flush method.
72      StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
73        cellsCount);
74  
75      boolean success = false;
76      StripeMultiFileWriter mw = null;
77      try {
78        mw = req.createWriter(); // Writer according to the policy.
79        StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
80            snapshot.getTimeRangeTracker(), cellsCount);
81        StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
82        mw.init(storeScanner, factory);
83  
84        synchronized (flushLock) {
85          performFlush(scanner, mw, smallestReadPoint);
86          result = mw.commitWriters(cacheFlushSeqNum, false);
87          success = true;
88        }
89      } finally {
90        if (!success && (mw != null)) {
91          for (Path leftoverFile : mw.abortWriters()) {
92            try {
93              store.getFileSystem().delete(leftoverFile, false);
94            } catch (Exception e) {
95              LOG.error("Failed to delete a file after failed flush: " + e);
96            }
97          }
98        }
99        try {
100         scanner.close();
101       } catch (IOException ex) {
102         LOG.warn("Failed to close flush scanner, ignoring", ex);
103       }
104     }
105     return result;
106   }
107 
108   private StripeMultiFileWriter.WriterFactory createWriterFactory(
109       final TimeRangeTracker tracker, final long kvCount) {
110     return new StripeMultiFileWriter.WriterFactory() {
111       @Override
112       public Writer createWriter() throws IOException {
113         StoreFile.Writer writer = store.createWriterInTmp(
114             kvCount, store.getFamily().getCompression(), false, true, true);
115         writer.setTimeRangeTracker(tracker);
116         return writer;
117       }
118     };
119   }
120 
121   /** Stripe flush request wrapper that writes a non-striped file. */
122   public static class StripeFlushRequest {
123 
124     protected final KVComparator comparator;
125 
126     public StripeFlushRequest(KVComparator comparator) {
127       this.comparator = comparator;
128     }
129 
130     @VisibleForTesting
131     public StripeMultiFileWriter createWriter() throws IOException {
132       StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
133           Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
134       writer.setNoStripeMetadata();
135       return writer;
136     }
137   }
138 
139   /** Stripe flush request wrapper based on boundaries. */
140   public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
141     private final List<byte[]> targetBoundaries;
142 
143     /** @param targetBoundaries New files should be written with these boundaries. */
144     public BoundaryStripeFlushRequest(KVComparator comparator, List<byte[]> targetBoundaries) {
145       super(comparator);
146       this.targetBoundaries = targetBoundaries;
147     }
148 
149     @Override
150     public StripeMultiFileWriter createWriter() throws IOException {
151       return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
152           null);
153     }
154   }
155 
156   /** Stripe flush request wrapper based on size. */
157   public static class SizeStripeFlushRequest extends StripeFlushRequest {
158     private final int targetCount;
159     private final long targetKvs;
160 
161     /**
162      * @param targetCount The maximum number of stripes to flush into.
163      * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
164      *                  total number of kvs, all the overflow data goes into the last stripe.
165      */
166     public SizeStripeFlushRequest(KVComparator comparator, int targetCount, long targetKvs) {
167       super(comparator);
168       this.targetCount = targetCount;
169       this.targetKvs = targetKvs;
170     }
171 
172     @Override
173     public StripeMultiFileWriter createWriter() throws IOException {
174       return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
175           OPEN_KEY, OPEN_KEY);
176     }
177   }
178 }