View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import junit.framework.TestCase;
21  import org.apache.hadoop.hbase.testclassification.SmallTests;
22  import org.junit.experimental.categories.Category;
23  
24  import java.util.Random;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  /**
29   * This is a hammer test that verifies MultiVersionConsistencyControl in a
30   * multiple writer single reader scenario.
31   */
32  @Category(SmallTests.class)
33  public class TestMultiVersionConsistencyControl extends TestCase {
34    static class Writer implements Runnable {
35      final AtomicBoolean finished;
36      final MultiVersionConsistencyControl mvcc;
37      final AtomicBoolean status;
38  
39      Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
40        this.finished = finished;
41        this.mvcc = mvcc;
42        this.status = status;
43      }
44  
45      private Random rnd = new Random();
46      public boolean failed = false;
47  
48      public void run() {
49        AtomicLong startPoint = new AtomicLong();
50        while (!finished.get()) {
51          MultiVersionConsistencyControl.WriteEntry e = 
52              mvcc.beginMemstoreInsertWithSeqNum(startPoint.incrementAndGet());
53          // System.out.println("Begin write: " + e.getWriteNumber());
54          // 10 usec - 500usec (including 0)
55          int sleepTime = rnd.nextInt(500);
56          // 500 * 1000 = 500,000ns = 500 usec
57          // 1 * 100 = 100ns = 1usec
58          try {
59            if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
60          } catch (InterruptedException e1) {
61          }
62          try {
63            mvcc.completeMemstoreInsert(e);
64          } catch (RuntimeException ex) {
65            // got failure
66            System.out.println(ex.toString());
67            ex.printStackTrace();
68            status.set(false);
69            return;
70            // Report failure if possible.
71          }
72        }
73      }
74    }
75  
76    public void testParallelism() throws Exception {
77      final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
78  
79      final AtomicBoolean finished = new AtomicBoolean(false);
80  
81      // fail flag for the reader thread
82      final AtomicBoolean readerFailed = new AtomicBoolean(false);
83      final AtomicLong failedAt = new AtomicLong();
84      Runnable reader = new Runnable() {
85        public void run() {
86          long prev = mvcc.memstoreReadPoint();
87          while (!finished.get()) {
88            long newPrev = mvcc.memstoreReadPoint();
89            if (newPrev < prev) {
90              // serious problem.
91              System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
92              readerFailed.set(true);
93              // might as well give up
94              failedAt.set(newPrev);
95              return;
96            }
97          }
98        }
99      };
100 
101     // writer thread parallelism.
102     int n = 20;
103     Thread[] writers = new Thread[n];
104     AtomicBoolean[] statuses = new AtomicBoolean[n];
105     Thread readThread = new Thread(reader);
106 
107     for (int i = 0; i < n; ++i) {
108       statuses[i] = new AtomicBoolean(true);
109       writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
110       writers[i].start();
111     }
112     readThread.start();
113 
114     try {
115       Thread.sleep(10 * 1000);
116     } catch (InterruptedException ex) {
117     }
118 
119     finished.set(true);
120 
121     readThread.join();
122     for (int i = 0; i < n; ++i) {
123       writers[i].join();
124     }
125 
126     // check failure.
127     assertFalse(readerFailed.get());
128     for (int i = 0; i < n; ++i) {
129       assertTrue(statuses[i].get());
130     }
131   }
132 }