View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util;
19  
20  import java.io.IOException;
21  import java.io.PrintWriter;
22  import java.io.StringWriter;
23  import java.security.PrivilegedExceptionAction;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.TableName;
29  import org.apache.hadoop.hbase.client.HTable;
30  import org.apache.hadoop.hbase.client.HTableInterface;
31  import org.apache.hadoop.hbase.client.Put;
32  import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
33  import org.apache.hadoop.hbase.client.Table;
34  import org.apache.hadoop.hbase.security.User;
35  import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
36  import org.apache.hadoop.util.StringUtils;
37  
38  /**
39   * MultiThreadedWriter that helps in testing ACL
40   */
41  public class MultiThreadedWriterWithACL extends MultiThreadedWriter {
42  
43    private static final Log LOG = LogFactory.getLog(MultiThreadedWriterWithACL.class);
44    private User userOwner;
45  
46    public MultiThreadedWriterWithACL(LoadTestDataGenerator dataGen, Configuration conf,
47        TableName tableName, User userOwner) throws IOException {
48      super(dataGen, conf, tableName);
49      this.userOwner = userOwner;
50    }
51  
52    @Override
53    public void start(long startKey, long endKey, int numThreads) throws IOException {
54      super.start(startKey, endKey, numThreads);
55    }
56  
57    @Override
58    protected void createWriterThreads(int numThreads) throws IOException {
59      for (int i = 0; i < numThreads; ++i) {
60        HBaseWriterThread writer = new HBaseWriterThreadWithACL(i);
61        writers.add(writer);
62      }
63    }
64  
65    public class HBaseWriterThreadWithACL extends HBaseWriterThread {
66  
67      private Table table;
68      private WriteAccessAction writerAction = new WriteAccessAction();
69  
70      public HBaseWriterThreadWithACL(int writerId) throws IOException {
71        super(writerId);
72      }
73  
74      @Override
75      protected HTableInterface createTable() throws IOException {
76        return null;
77      }
78  
79      @Override
80      protected void closeHTable() {
81        if (table != null) {
82          try {
83            table.close();
84          } catch (Exception e) {
85            LOG.error("Error in closing the table "+table.getName(), e);
86          }
87        }
88      }
89  
90      @Override
91      public void insert(final Table table, Put put, final long keyBase) {
92        final long start = System.currentTimeMillis();
93        try {
94          put = (Put) dataGenerator.beforeMutate(keyBase, put);
95          writerAction.setPut(put);
96          writerAction.setKeyBase(keyBase);
97          writerAction.setStartTime(start);
98          userOwner.runAs(writerAction);
99        } catch (IOException e) {
100         recordFailure(table, put, keyBase, start, e);
101       } catch (InterruptedException e) {
102         failedKeySet.add(keyBase);
103       }
104     }
105 
106     class WriteAccessAction implements PrivilegedExceptionAction<Object> {
107       private Put put;
108       private long keyBase;
109       private long start;
110 
111       public WriteAccessAction() {
112       }
113 
114       public void setPut(final Put put) {
115         this.put = put;
116       }
117 
118       public void setKeyBase(final long keyBase) {
119         this.keyBase = keyBase;
120       }
121 
122       public void setStartTime(final long start) {
123         this.start = start;
124       }
125 
126       @Override
127       public Object run() throws Exception {
128         try {
129           if (table == null) {
130             table = new HTable(conf, tableName);
131           }
132           table.put(put);
133         } catch (IOException e) {
134           recordFailure(table, put, keyBase, start, e);
135         }
136         return null;
137       }
138     }
139   }
140 
141   private void recordFailure(final Table table, final Put put, final long keyBase,
142       final long start, IOException e) {
143     failedKeySet.add(keyBase);
144     String exceptionInfo;
145     if (e instanceof RetriesExhaustedWithDetailsException) {
146       RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
147       exceptionInfo = aggEx.getExhaustiveDescription();
148     } else {
149       StringWriter stackWriter = new StringWriter();
150       PrintWriter pw = new PrintWriter(stackWriter);
151       e.printStackTrace(pw);
152       pw.flush();
153       exceptionInfo = StringUtils.stringifyException(e);
154     }
155     LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
156         + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow()) + "; errors: "
157         + exceptionInfo);
158   }
159 }