View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.fs.FileSystem;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.ProcedureInfo;
31  import org.apache.hadoop.hbase.HConstants;
32  import org.apache.hadoop.hbase.util.Threads;
33  import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
34  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
35  import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
36  import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
37  import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
38  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
39  import org.apache.hadoop.hbase.util.NonceKey;
40  import static org.junit.Assert.assertEquals;
41  import static org.junit.Assert.assertFalse;
42  import static org.junit.Assert.assertTrue;
43  
44  public class ProcedureTestingUtility {
45    private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
46  
47    private ProcedureTestingUtility() {
48    }
49  
50    public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
51        final Path baseDir) throws IOException {
52      return createWalStore(conf, fs, baseDir);
53    }
54  
55    public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
56        final Path walDir) throws IOException {
57      return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
58        @Override
59        public void recoverFileLease(FileSystem fs, Path path) throws IOException {
60          // no-op
61        }
62      });
63    }
64  
65    public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
66        throws Exception {
67      restart(procExecutor, null);
68    }
69  
70    public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
71        Runnable beforeStartAction) throws Exception {
72      ProcedureStore procStore = procExecutor.getStore();
73      int storeThreads = procExecutor.getNumThreads();
74      int execThreads = procExecutor.getNumThreads();
75      // stop
76      procExecutor.stop();
77      procExecutor.join();
78      procStore.stop(false);
79      // nothing running...
80      if (beforeStartAction != null) {
81        beforeStartAction.run();
82      }
83      // re-start
84      procStore.start(storeThreads);
85      procExecutor.start(execThreads);
86    }
87  
88    public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
89        boolean value) {
90      if (procExecutor.testing == null) {
91        procExecutor.testing = new ProcedureExecutor.Testing();
92      }
93      procExecutor.testing.killBeforeStoreUpdate = value;
94      LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
95    }
96  
97    public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
98        boolean value) {
99      if (procExecutor.testing == null) {
100       procExecutor.testing = new ProcedureExecutor.Testing();
101     }
102     procExecutor.testing.toggleKillBeforeStoreUpdate = value;
103   }
104 
105   public static <TEnv> void toggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor) {
106     if (procExecutor.testing == null) {
107       procExecutor.testing = new ProcedureExecutor.Testing();
108     }
109     procExecutor.testing.killBeforeStoreUpdate = !procExecutor.testing.killBeforeStoreUpdate;
110     LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
111   }
112 
113   public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
114       boolean value) {
115     ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
116     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
117   }
118 
119   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
120     return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE);
121   }
122 
123   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
124       final long nonceGroup, final long nonce) {
125     long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
126     waitProcedure(procExecutor, procId);
127     return procId;
128   }
129 
130   public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
131       final long nonceGroup, final long nonce) {
132     final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
133     long procId = procExecutor.registerNonce(nonceKey);
134     assertFalse(procId >= 0);
135     return procExecutor.submitProcedure(proc, nonceKey);
136   }
137 
138   public static <TEnv> void waitProcedure(ProcedureExecutor<TEnv> procExecutor, long procId) {
139     while (!procExecutor.isFinished(procId) && procExecutor.isRunning()) {
140       Threads.sleepWithoutInterrupt(250);
141     }
142   }
143 
144   public static <TEnv> void waitNoProcedureRunning(ProcedureExecutor<TEnv> procExecutor) {
145     int stableRuns = 0;
146     while (stableRuns < 10) {
147       if (procExecutor.getActiveExecutorCount() > 0 || procExecutor.getRunnableSet().size() > 0) {
148         stableRuns = 0;
149         Threads.sleepWithoutInterrupt(100);
150       } else {
151         stableRuns++;
152         Threads.sleepWithoutInterrupt(25);
153       }
154     }
155   }
156 
157   public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
158       long procId) {
159     assertFalse("expected a running proc", procExecutor.isFinished(procId));
160     assertEquals(null, procExecutor.getResult(procId));
161   }
162 
163   public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
164       long procId) {
165     ProcedureInfo result = procExecutor.getResult(procId);
166     assertTrue("expected procedure result", result != null);
167     assertProcNotFailed(result);
168   }
169 
170   public static void assertProcNotFailed(final ProcedureInfo result) {
171     ForeignExceptionMessage exception = result.getForeignExceptionMessage();
172     String msg = exception != null ? result.getExceptionFullMessage() : "no exception found";
173     assertFalse(msg, result.isFailed());
174   }
175 
176   public static Throwable assertProcFailed(final ProcedureInfo result) {
177     assertEquals(true, result.isFailed());
178     LOG.info("procId=" + result.getProcId() + " exception: " + result.getException().getMessage());
179     return getExceptionCause(result);
180   }
181 
182   public static void assertIsAbortException(final ProcedureInfo result) {
183     assertEquals(true, result.isFailed());
184     LOG.info(result.getExceptionFullMessage());
185     Throwable cause = getExceptionCause(result);
186     assertTrue("expected abort exception, got " + cause,
187       cause instanceof ProcedureAbortedException);
188   }
189 
190   public static void assertIsTimeoutException(final ProcedureInfo result) {
191     assertEquals(true, result.isFailed());
192     LOG.info(result.getExceptionFullMessage());
193     Throwable cause = getExceptionCause(result);
194     assertTrue("expected TimeoutIOException, got " + cause, cause instanceof TimeoutIOException);
195   }
196 
197   public static void assertIsIllegalArgumentException(final ProcedureInfo result) {
198     assertEquals(true, result.isFailed());
199     LOG.info(result.getExceptionFullMessage());
200     Throwable cause = ProcedureTestingUtility.getExceptionCause(result);
201     assertTrue("expected IllegalArgumentIOException, got " + cause,
202       cause instanceof IllegalArgumentIOException);
203   }
204 
205   public static Throwable getExceptionCause(final ProcedureInfo procInfo) {
206     assert procInfo.getForeignExceptionMessage() != null;
207     return RemoteProcedureException.fromProto(procInfo.getForeignExceptionMessage()).getCause();
208   }
209 
210   public static class TestProcedure extends Procedure<Void> {
211     public TestProcedure() {}
212 
213     public TestProcedure(long procId) {
214       this(procId, 0);
215     }
216 
217     public TestProcedure(long procId, long parentId) {
218       setProcId(procId);
219       if (parentId > 0) {
220         setParentProcId(parentId);
221       }
222     }
223 
224     public void addStackId(final int index) {
225       addStackIndex(index);
226     }
227 
228     @Override
229     protected Procedure[] execute(Void env) { return null; }
230 
231     @Override
232     protected void rollback(Void env) { }
233 
234     @Override
235     protected boolean abort(Void env) { return false; }
236 
237     @Override
238     protected void serializeStateData(final OutputStream stream) throws IOException { }
239 
240     @Override
241     protected void deserializeStateData(final InputStream stream) throws IOException { }
242   }
243 }