View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import static org.junit.Assert.assertEquals;
22  
23  import java.io.IOException;
24  import java.util.List;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.HBaseTestingUtility;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.testclassification.MediumTests;
32  import org.apache.hadoop.hbase.MiniHBaseCluster;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.ServerName;
35  import org.apache.hadoop.hbase.client.Admin;
36  import org.apache.hadoop.hbase.client.HTable;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Table;
39  import org.apache.hadoop.hbase.regionserver.HRegion;
40  import org.apache.hadoop.hbase.regionserver.Region;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.util.JVMClusterUtil;
43  import org.junit.AfterClass;
44  import org.junit.BeforeClass;
45  import org.junit.Ignore;
46  import org.junit.Test;
47  import org.junit.experimental.categories.Category;
48  
49  @Category(MediumTests.class)
50  public class TestAssignmentListener {
51    private static final Log LOG = LogFactory.getLog(TestAssignmentListener.class);
52  
53    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
54  
55    static class DummyListener {
56      protected AtomicInteger modified = new AtomicInteger(0);
57  
58      public void awaitModifications(int count) throws InterruptedException {
59        while (!modified.compareAndSet(count, 0)) {
60          Thread.sleep(100);
61        }
62      }
63    }
64  
65    static class DummyAssignmentListener extends DummyListener implements AssignmentListener {
66      private AtomicInteger closeCount = new AtomicInteger(0);
67      private AtomicInteger openCount = new AtomicInteger(0);
68  
69      public DummyAssignmentListener() {
70      }
71  
72      @Override
73      public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
74        LOG.info("Assignment open region=" + regionInfo + " server=" + serverName);
75        openCount.incrementAndGet();
76        modified.incrementAndGet();
77      }
78  
79      @Override
80      public void regionClosed(final HRegionInfo regionInfo) {
81        LOG.info("Assignment close region=" + regionInfo);
82        closeCount.incrementAndGet();
83        modified.incrementAndGet();
84      }
85  
86      public void reset() {
87        openCount.set(0);
88        closeCount.set(0);
89      }
90  
91      public int getLoadCount() {
92        return openCount.get();
93      }
94  
95      public int getCloseCount() {
96        return closeCount.get();
97      }
98    }
99  
100   static class DummyServerListener extends DummyListener implements ServerListener {
101     private AtomicInteger removedCount = new AtomicInteger(0);
102     private AtomicInteger addedCount = new AtomicInteger(0);
103 
104     public DummyServerListener() {
105     }
106 
107     @Override
108     public void serverAdded(final ServerName serverName) {
109       LOG.info("Server added " + serverName);
110       addedCount.incrementAndGet();
111       modified.incrementAndGet();
112     }
113 
114     @Override
115     public void serverRemoved(final ServerName serverName) {
116       LOG.info("Server removed " + serverName);
117       removedCount.incrementAndGet();
118       modified.incrementAndGet();
119     }
120 
121     public void reset() {
122       addedCount.set(0);
123       removedCount.set(0);
124     }
125 
126     public int getAddedCount() {
127       return addedCount.get();
128     }
129 
130     public int getRemovedCount() {
131       return removedCount.get();
132     }
133   }
134 
135   @BeforeClass
136   public static void beforeAllTests() throws Exception {
137     TEST_UTIL.startMiniCluster(2);
138   }
139 
140   @AfterClass
141   public static void afterAllTests() throws Exception {
142     TEST_UTIL.shutdownMiniCluster();
143   }
144 
145   @Test(timeout=60000)
146   public void testServerListener() throws IOException, InterruptedException {
147     ServerManager serverManager = TEST_UTIL.getHBaseCluster().getMaster().getServerManager();
148 
149     DummyServerListener listener = new DummyServerListener();
150     serverManager.registerListener(listener);
151     try {
152       MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
153 
154       // Start a new Region Server
155       miniCluster.startRegionServer();
156       listener.awaitModifications(1);
157       assertEquals(1, listener.getAddedCount());
158       assertEquals(0, listener.getRemovedCount());
159 
160       // Start another Region Server
161       listener.reset();
162       miniCluster.startRegionServer();
163       listener.awaitModifications(1);
164       assertEquals(1, listener.getAddedCount());
165       assertEquals(0, listener.getRemovedCount());
166 
167       int nrs = miniCluster.getRegionServerThreads().size();
168 
169       // Stop a Region Server
170       listener.reset();
171       miniCluster.stopRegionServer(nrs - 1);
172       listener.awaitModifications(1);
173       assertEquals(0, listener.getAddedCount());
174       assertEquals(1, listener.getRemovedCount());
175 
176       // Stop another Region Server
177       listener.reset();
178       miniCluster.stopRegionServer(nrs - 2);
179       listener.awaitModifications(1);
180       assertEquals(0, listener.getAddedCount());
181       assertEquals(1, listener.getRemovedCount());
182     } finally {
183       serverManager.unregisterListener(listener);
184     }
185   }
186 
187   @Ignore @Test(timeout=60000)
188   public void testAssignmentListener() throws IOException, InterruptedException {
189     AssignmentManager am = TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager();
190     Admin admin = TEST_UTIL.getHBaseAdmin();
191 
192     DummyAssignmentListener listener = new DummyAssignmentListener();
193     am.registerListener(listener);
194     try {
195       final String TABLE_NAME_STR = "testtb";
196       final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
197       final byte[] FAMILY = Bytes.toBytes("cf");
198 
199       // Create a new table, with a single region
200       LOG.info("Create Table");
201       TEST_UTIL.createTable(TABLE_NAME, FAMILY);
202       listener.awaitModifications(1);
203       assertEquals(1, listener.getLoadCount());
204       assertEquals(0, listener.getCloseCount());
205 
206       // Add some data
207       Table table = new HTable(TEST_UTIL.getConfiguration(), TABLE_NAME);
208       try {
209         for (int i = 0; i < 10; ++i) {
210           byte[] key = Bytes.toBytes("row-" + i);
211           Put put = new Put(key);
212           put.add(FAMILY, null, key);
213           table.put(put);
214         }
215       } finally {
216         table.close();
217       }
218 
219       // Split the table in two
220       LOG.info("Split Table");
221       listener.reset();
222       admin.split(TABLE_NAME, Bytes.toBytes("row-3"));
223       listener.awaitModifications(3);
224       assertEquals(2, listener.getLoadCount());     // daughters added
225       assertEquals(1, listener.getCloseCount());    // parent removed
226 
227       // Wait for the Regions to be mergeable
228       MiniHBaseCluster miniCluster = TEST_UTIL.getMiniHBaseCluster();
229       int mergeable = 0;
230       while (mergeable < 2) {
231         Thread.sleep(100);
232         admin.majorCompact(TABLE_NAME);
233         mergeable = 0;
234         for (JVMClusterUtil.RegionServerThread regionThread: miniCluster.getRegionServerThreads()) {
235           for (Region region: regionThread.getRegionServer().getOnlineRegions(TABLE_NAME)) {
236             mergeable += ((HRegion)region).isMergeable() ? 1 : 0;
237           }
238         }
239       }
240 
241       // Merge the two regions
242       LOG.info("Merge Regions");
243       listener.reset();
244       List<HRegionInfo> regions = admin.getTableRegions(TABLE_NAME);
245       assertEquals(2, regions.size());
246       admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
247         regions.get(1).getEncodedNameAsBytes(), true);
248       listener.awaitModifications(3);
249       assertEquals(1, admin.getTableRegions(TABLE_NAME).size());
250       assertEquals(1, listener.getLoadCount());     // new merged region added
251       assertEquals(2, listener.getCloseCount());    // daughters removed
252 
253       // Delete the table
254       LOG.info("Drop Table");
255       listener.reset();
256       TEST_UTIL.deleteTable(TABLE_NAME);
257       listener.awaitModifications(1);
258       assertEquals(0, listener.getLoadCount());
259       assertEquals(1, listener.getCloseCount());
260     } finally {
261       am.unregisterListener(listener);
262     }
263   }
264 }