View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import static org.junit.Assert.assertEquals;
23  import static org.junit.Assert.assertNotNull;
24  import static org.junit.Assert.assertTrue;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.HBaseTestingUtility;
29  import org.apache.hadoop.hbase.HRegionLocation;
30  import org.apache.hadoop.hbase.testclassification.LargeTests;
31  import org.apache.hadoop.hbase.MiniHBaseCluster;
32  import org.apache.hadoop.hbase.ServerName;
33  import org.apache.hadoop.hbase.TableName;
34  import org.apache.hadoop.hbase.regionserver.HRegionServer;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.Pair;
37  import org.junit.AfterClass;
38  import org.junit.BeforeClass;
39  import org.junit.Test;
40  import org.junit.experimental.categories.Category;
41  
42  @Category(LargeTests.class)
43  public class TestHTableMultiplexerFlushCache {
44    final Log LOG = LogFactory.getLog(getClass());
45    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
46    private static byte[] FAMILY = Bytes.toBytes("testFamily");
47    private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1");
48    private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2");
49    private static byte[] VALUE1 = Bytes.toBytes("testValue1");
50    private static byte[] VALUE2 = Bytes.toBytes("testValue2");
51    private static int SLAVES = 3;
52    private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
53  
54    /**
55     * @throws java.lang.Exception
56     */
57    @BeforeClass
58    public static void setUpBeforeClass() throws Exception {
59      TEST_UTIL.startMiniCluster(SLAVES);
60    }
61  
62    /**
63     * @throws java.lang.Exception
64     */
65    @AfterClass
66    public static void tearDownAfterClass() throws Exception {
67      TEST_UTIL.shutdownMiniCluster();
68    }
69  
70    private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality,
71        byte[] value) throws Exception {
72      // verify that the Get returns the correct result
73      Result r;
74      Get get = new Get(row);
75      get.addColumn(family, quality);
76      int nbTry = 0;
77      do {
78        assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
79        nbTry++;
80        Thread.sleep(100);
81        r = htable.get(get);
82      } while (r == null || r.getValue(family, quality) == null);
83      assertEquals("value", Bytes.toStringBinary(value),
84        Bytes.toStringBinary(r.getValue(family, quality)));
85    }
86  
87    @Test
88    public void testOnRegionChange() throws Exception {
89      TableName TABLE = TableName.valueOf("testOnRegionChange");
90      final int NUM_REGIONS = 10;
91      HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
92        Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
93  
94      HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(), 
95        PER_REGIONSERVER_QUEUE_SIZE);
96      
97      byte[][] startRows = htable.getStartKeys();
98      byte[] row = startRows[1];
99      assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
100 
101     Put put = new Put(row).add(FAMILY, QUALIFIER1, VALUE1);
102     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
103     
104     checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
105 
106     // Now let's shutdown the regionserver and let regions moved to other servers.
107     HRegionLocation loc = htable.getRegionLocation(row);
108     MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster(); 
109     hbaseCluster.stopRegionServer(loc.getServerName());
110     TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
111 
112     // put with multiplexer.
113     put = new Put(row).add(FAMILY, QUALIFIER2, VALUE2);
114     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
115 
116     checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
117   }
118 
119   @Test
120   public void testOnRegionMove() throws Exception {
121     // This test is doing near exactly the same thing that testOnRegionChange but avoiding the
122     // potential to get a ConnectionClosingException. By moving the region, we can be certain that
123     // the connection is still valid and that the implementation is correctly handling an invalid
124     // Region cache (and not just tearing down the entire connection).
125     TableName TABLE = TableName.valueOf("testOnRegionMove");
126     final int NUM_REGIONS = 10;
127     HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
128       Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
129 
130     HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
131       PER_REGIONSERVER_QUEUE_SIZE);
132 
133     final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
134     Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys();
135     byte[] row = startEndRows.getFirst()[1];
136     assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
137 
138     Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1);
139     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
140 
141     checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
142 
143     final HRegionLocation loc = regionLocator.getRegionLocation(row);
144     final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
145     // The current server for the region we're writing to
146     final ServerName originalServer = loc.getServerName();
147     ServerName newServer = null;
148     // Find a new server to move that region to
149     for (int i = 0; i < SLAVES; i++) {
150       HRegionServer rs = hbaseCluster.getRegionServer(0);
151       if (!rs.getServerName().equals(originalServer.getServerName())) {
152         newServer = rs.getServerName();
153         break;
154       }
155     }
156     assertNotNull("Did not find a new RegionServer to use", newServer);
157 
158     // Move the region
159     LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer
160         +  " to " + newServer);
161     TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
162         Bytes.toBytes(newServer.getServerName()));
163 
164     TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
165 
166     // Send a new Put
167     put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2);
168     assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
169 
170     // We should see the update make it to the new server eventually
171     checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
172   }
173 }