1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.client;
20
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.hbase.Cell;
26 import org.apache.hadoop.hbase.HBaseConfiguration;
27 import org.apache.hadoop.hbase.HBaseTestingUtility;
28 import org.apache.hadoop.hbase.testclassification.MediumTests;
29 import org.apache.hadoop.hbase.TableName;
30 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
31 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
32 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
33 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.hbase.util.Threads;
36 import org.junit.AfterClass;
37 import org.junit.Assert;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.junit.experimental.categories.Category;
41
42 import java.io.IOException;
43 import java.io.InterruptedIOException;
44 import java.net.SocketTimeoutException;
45 import java.util.ArrayList;
46 import java.util.List;
47 import java.util.concurrent.atomic.AtomicInteger;
48
49 @Category(MediumTests.class)
50 public class TestClientOperationInterrupt {
51 private static final Log LOG = LogFactory.getLog(TestClientOperationInterrupt.class);
52
53 private static HBaseTestingUtility util;
54 private static final TableName tableName = TableName.valueOf("test");
55 private static final byte[] dummy = Bytes.toBytes("dummy");
56 private static final byte[] row1 = Bytes.toBytes("r1");
57 private static final byte[] test = Bytes.toBytes("test");
58 private static Configuration conf;
59
60 public static class TestCoprocessor extends BaseRegionObserver {
61 @Override
62 public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
63 final Get get, final List<Cell> results) throws IOException {
64 Threads.sleep(2500);
65 }
66 }
67
68
69 @BeforeClass
70 public static void setUpBeforeClass() throws Exception {
71 conf = HBaseConfiguration.create();
72 conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
73 TestCoprocessor.class.getName());
74 util = new HBaseTestingUtility(conf);
75 util.startMiniCluster();
76
77 Admin admin = util.getHBaseAdmin();
78 if (admin.tableExists(tableName)) {
79 if (admin.isTableEnabled(tableName)) {
80 admin.disableTable(tableName);
81 }
82 admin.deleteTable(tableName);
83 }
84 util.createTable(tableName, new byte[][]{dummy, test});
85
86 Table ht = new HTable(conf, tableName);
87 Put p = new Put(row1);
88 p.add(dummy, dummy, dummy);
89 ht.put(p);
90 }
91
92
93 @Test
94 public void testInterrupt50Percent() throws IOException, InterruptedException {
95 final AtomicInteger noEx = new AtomicInteger(0);
96 final AtomicInteger badEx = new AtomicInteger(0);
97 final AtomicInteger noInt = new AtomicInteger(0);
98 final AtomicInteger done = new AtomicInteger(0);
99 List<Thread> threads = new ArrayList<Thread>();
100
101 final int nbThread = 100;
102
103 for (int i = 0; i < nbThread; i++) {
104 Thread t = new Thread() {
105 @Override
106 public void run() {
107 try {
108 Table ht = new HTable(conf, tableName);
109 Result r = ht.get(new Get(row1));
110 noEx.incrementAndGet();
111 } catch (IOException e) {
112 LOG.info("exception", e);
113 if (!(e instanceof InterruptedIOException) || (e instanceof SocketTimeoutException)) {
114 badEx.incrementAndGet();
115 } else {
116 if (Thread.currentThread().isInterrupted()) {
117 noInt.incrementAndGet();
118 LOG.info("The thread should NOT be with the 'interrupt' status.");
119 }
120 }
121 } finally {
122 done.incrementAndGet();
123 }
124 }
125 };
126 t.setName("TestClientOperationInterrupt #" + i);
127 threads.add(t);
128 t.start();
129 }
130
131 for (int i = 0; i < nbThread / 2; i++) {
132 threads.get(i).interrupt();
133 }
134
135
136 boolean stillAlive = true;
137 while (stillAlive) {
138 stillAlive = false;
139 for (Thread t : threads) {
140 if (t.isAlive()) {
141 stillAlive = true;
142 }
143 }
144 Threads.sleep(10);
145 }
146
147 Assert.assertFalse(Thread.currentThread().isInterrupted());
148
149 Assert.assertTrue(" noEx: " + noEx.get() + ", badEx=" + badEx.get() + ", noInt=" + noInt.get(),
150 noEx.get() == nbThread / 2 && badEx.get() == 0);
151
152
153 while (done.get() != nbThread){
154 Thread.sleep(1);
155 }
156
157 Table ht = new HTable(conf, tableName);
158 Result r = ht.get(new Get(row1));
159 Assert.assertFalse(r.isEmpty());
160 }
161
162 @AfterClass
163 public static void tearDownAfterClass() throws Exception {
164 util.shutdownMiniCluster();
165 }
166 }