1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.FileSystem;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.hbase.CellScanner;
27 import org.apache.hadoop.hbase.HBaseTestingUtility;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.HRegionInfo;
30 import org.apache.hadoop.hbase.HTableDescriptor;
31 import org.apache.hadoop.hbase.MiniHBaseCluster;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.Admin;
34 import org.apache.hadoop.hbase.client.Durability;
35 import org.apache.hadoop.hbase.client.Mutation;
36 import org.apache.hadoop.hbase.client.Put;
37 import org.apache.hadoop.hbase.client.Table;
38 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
39 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
40 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
41 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
42 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
43 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
44 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
45 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
46 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
47 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
48 import org.apache.hadoop.hbase.testclassification.MediumTests;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.JVMClusterUtil;
51 import org.apache.hadoop.hbase.wal.WAL;
52 import org.apache.hadoop.hdfs.DFSConfigKeys;
53 import org.apache.hadoop.hdfs.MiniDFSCluster;
54 import org.junit.After;
55 import org.junit.Before;
56 import org.junit.Test;
57 import org.junit.experimental.categories.Category;
58
59 import java.io.IOException;
60 import java.util.Collection;
61 import java.util.List;
62
63 import static org.junit.Assert.assertFalse;
64 import static org.junit.Assert.assertNotNull;
65 import static org.junit.Assert.assertTrue;
66
67
68
69
70 @Category({MediumTests.class})
71 public class TestRegionServerAbort {
72 private static final byte[] FAMILY_BYTES = Bytes.toBytes("f");
73
74 private static final Log LOG = LogFactory.getLog(TestRegionServerAbort.class);
75
76 private HBaseTestingUtility testUtil;
77 private Configuration conf;
78 private MiniDFSCluster dfsCluster;
79 private MiniHBaseCluster cluster;
80
81 @Before
82 public void setup() throws Exception {
83 testUtil = new HBaseTestingUtility();
84 conf = testUtil.getConfiguration();
85 conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
86 StopBlockingRegionObserver.class.getName());
87 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
88 StopBlockingRegionObserver.class.getName());
89
90 conf.set("dfs.blocksize", Long.toString(100 * 1024));
91
92 conf.set(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, Long.toString(100 * 1024));
93
94 testUtil.startMiniZKCluster();
95 dfsCluster = testUtil.startMiniDFSCluster(2);
96 cluster = testUtil.startMiniHBaseCluster(1, 2);
97 }
98
99 @After
100 public void tearDown() throws Exception {
101 for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
102 HRegionServer rs = t.getRegionServer();
103 RegionServerCoprocessorHost cpHost = rs.getRegionServerCoprocessorHost();
104 StopBlockingRegionObserver cp = (StopBlockingRegionObserver)
105 cpHost.findCoprocessor(StopBlockingRegionObserver.class.getName());
106 cp.setStopAllowed(true);
107 }
108 testUtil.shutdownMiniCluster();
109 }
110
111
112
113
114
115 @Test
116 public void testAbortFromRPC() throws Exception {
117 TableName tableName = TableName.valueOf("testAbortFromRPC");
118
119 Table table = testUtil.createTable(tableName, FAMILY_BYTES);
120
121
122 testUtil.loadTable(table, FAMILY_BYTES);
123 LOG.info("Wrote data");
124
125 cluster.flushcache(tableName);
126 LOG.info("Flushed table");
127
128
129 Put put = new Put(new byte[]{0, 0, 0, 0});
130 put.addColumn(FAMILY_BYTES, Bytes.toBytes("c"), new byte[]{});
131 put.setAttribute(StopBlockingRegionObserver.DO_ABORT, new byte[]{1});
132
133 table.put(put);
134
135
136
137 HRegion firstRegion = cluster.findRegionsForTable(tableName).get(0);
138 assertNotNull(firstRegion);
139 assertNotNull(firstRegion.getRegionServerServices());
140 LOG.info("isAborted = " + firstRegion.getRegionServerServices().isAborted());
141 assertTrue(firstRegion.getRegionServerServices().isAborted());
142 LOG.info("isStopped = " + firstRegion.getRegionServerServices().isStopped());
143 assertTrue(firstRegion.getRegionServerServices().isStopped());
144 }
145
146
147
148
149 @Test
150 public void testStopOverrideFromCoprocessor() throws Exception {
151 Admin admin = testUtil.getHBaseAdmin();
152 HRegionServer regionserver = cluster.getRegionServer(0);
153 admin.stopRegionServer(regionserver.getServerName().getHostAndPort());
154
155
156 assertFalse(cluster.getRegionServer(0).isAborted());
157 assertFalse(cluster.getRegionServer(0).isStopped());
158 }
159
160 public static class StopBlockingRegionObserver extends BaseRegionObserver
161 implements RegionServerObserver {
162 public static final String DO_ABORT = "DO_ABORT";
163 private boolean stopAllowed;
164
165 @Override
166 public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
167 Durability durability) throws IOException {
168 if (put.getAttribute(DO_ABORT) != null) {
169 HRegionServer rs = (HRegionServer) c.getEnvironment().getRegionServerServices();
170 LOG.info("Triggering abort for regionserver " + rs.getServerName());
171 rs.abort("Aborting for test");
172 }
173 }
174
175 @Override
176 public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
177 throws IOException {
178 if (!stopAllowed) {
179 throw new IOException("Stop not allowed");
180 }
181 }
182
183 @Override
184 public void preMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
185 Region regionA, Region regionB) throws IOException {
186
187 }
188
189 @Override
190 public void postMerge(ObserverContext<RegionServerCoprocessorEnvironment> c,
191 Region regionA, Region regionB, Region mergedRegion) throws IOException {
192
193 }
194
195 @Override
196 public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
197 Region regionA, Region regionB, List<Mutation> metaEntries)
198 throws IOException {
199
200 }
201
202 @Override
203 public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
204 Region regionA, Region regionB, Region mergedRegion)
205 throws IOException {
206
207 }
208
209 @Override
210 public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
211 Region regionA, Region regionB) throws IOException {
212
213 }
214
215 @Override
216 public void postRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
217 Region regionA, Region regionB) throws IOException {
218
219 }
220
221 @Override
222 public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
223 throws IOException {
224
225 }
226
227 @Override
228 public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
229 throws IOException {
230
231 }
232
233 @Override
234 public ReplicationEndpoint postCreateReplicationEndPoint(
235 ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
236 return null;
237 }
238
239 @Override
240 public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
241 List<AdminProtos.WALEntry> entries, CellScanner cells)
242 throws IOException {
243
244 }
245
246 @Override
247 public void postReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
248 List<AdminProtos.WALEntry> entries, CellScanner cells)
249 throws IOException {
250
251 }
252
253 public void setStopAllowed(boolean allowed) {
254 this.stopAllowed = allowed;
255 }
256
257 public boolean isStopAllowed() {
258 return stopAllowed;
259 }
260 }
261 }