View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.util;
20  
21  import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
22  import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
23  
24  import java.io.IOException;
25  import java.io.PrintWriter;
26  import java.io.StringWriter;
27  import java.util.Arrays;
28  import java.util.HashSet;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.TableName;
36  import org.apache.hadoop.hbase.client.HTableInterface;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
39  import org.apache.hadoop.hbase.client.Table;
40  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
41  import org.apache.hadoop.util.StringUtils;
42  
43  /** Creates multiple threads that write key/values into the */
44  public class MultiThreadedWriter extends MultiThreadedWriterBase {
45    private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class);
46  
47    protected Set<HBaseWriterThread> writers = new HashSet<HBaseWriterThread>();
48  
49    protected boolean isMultiPut = false;
50  
51    public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
52        TableName tableName) throws IOException {
53      super(dataGen, conf, tableName, "W");
54    }
55  
56    /** Use multi-puts vs. separate puts for every column in a row */
57    public void setMultiPut(boolean isMultiPut) {
58      this.isMultiPut = isMultiPut;
59    }
60  
61    @Override
62    public void start(long startKey, long endKey, int numThreads) throws IOException {
63      super.start(startKey, endKey, numThreads);
64  
65      if (verbose) {
66        LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
67      }
68  
69      createWriterThreads(numThreads);
70  
71      startThreads(writers);
72    }
73  
74    protected void createWriterThreads(int numThreads) throws IOException {
75      for (int i = 0; i < numThreads; ++i) {
76        HBaseWriterThread writer = new HBaseWriterThread(i);
77        Threads.setLoggingUncaughtExceptionHandler(writer);
78        writers.add(writer);
79      }
80    }
81  
82    public class HBaseWriterThread extends Thread {
83      private final Table table;
84  
85      public HBaseWriterThread(int writerId) throws IOException {
86        setName(getClass().getSimpleName() + "_" + writerId);
87        table = createTable();
88      }
89  
90      protected HTableInterface createTable() throws IOException {
91        return connection.getTable(tableName);
92      }
93  
94      @Override
95      public void run() {
96        try {
97          long rowKeyBase;
98          byte[][] columnFamilies = dataGenerator.getColumnFamilies();
99          while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
100           byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
101           Put put = new Put(rowKey);
102           numKeys.addAndGet(1);
103           int columnCount = 0;
104           for (byte[] cf : columnFamilies) {
105             byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
106             for (byte[] column : columns) {
107               byte[] value = dataGenerator.generateValue(rowKey, cf, column);
108               put.add(cf, column, value);
109               ++columnCount;
110               if (!isMultiPut) {
111                 insert(table, put, rowKeyBase);
112                 numCols.addAndGet(1);
113                 put = new Put(rowKey);
114               }
115             }
116             long rowKeyHash = Arrays.hashCode(rowKey);
117             put.add(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
118             put.add(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
119             if (!isMultiPut) {
120               insert(table, put, rowKeyBase);
121               numCols.addAndGet(1);
122               put = new Put(rowKey);
123             }
124           }
125           if (isMultiPut) {
126             if (verbose) {
127               LOG.debug("Preparing put for key = [" + rowKey + "], " + columnCount + " columns");
128             }
129             insert(table, put, rowKeyBase);
130             numCols.addAndGet(columnCount);
131           }
132           if (trackWroteKeys) {
133             wroteKeys.add(rowKeyBase);
134           }
135         }
136       } finally {
137         closeHTable();
138         numThreadsWorking.decrementAndGet();
139       }
140     }
141 
142     public void insert(Table table, Put put, long keyBase) {
143       long start = System.currentTimeMillis();
144       try {
145         put = (Put) dataGenerator.beforeMutate(keyBase, put);
146         table.put(put);
147         totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
148       } catch (IOException e) {
149         failedKeySet.add(keyBase);
150         String exceptionInfo;
151         if (e instanceof RetriesExhaustedWithDetailsException) {
152           RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
153           exceptionInfo = aggEx.getExhaustiveDescription();
154         } else {
155           StringWriter stackWriter = new StringWriter();
156           PrintWriter pw = new PrintWriter(stackWriter);
157           e.printStackTrace(pw);
158           pw.flush();
159           exceptionInfo = StringUtils.stringifyException(e);
160         }
161         LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
162             + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow())
163             + "; errors: " + exceptionInfo);
164       }
165     }
166     protected void closeHTable() {
167       try {
168         if (table != null) {
169           table.close();
170         }
171       } catch (IOException e) {
172         LOG.error("Error closing table", e);
173       }
174     }
175   }
176 
177   @Override
178   public void waitForFinish() {
179     super.waitForFinish();
180     System.out.println("Failed to write keys: " + failedKeySet.size());
181     for (Long key : failedKeySet) {
182        System.out.println("Failed to write key: " + key);
183     }
184   }
185 }