1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.HRegionLocation;
30 import org.apache.hadoop.hbase.testclassification.LargeTests;
31 import org.apache.hadoop.hbase.MiniHBaseCluster;
32 import org.apache.hadoop.hbase.ServerName;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.regionserver.HRegionServer;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.util.Pair;
37 import org.junit.AfterClass;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40 import org.junit.experimental.categories.Category;
41
42 @Category(LargeTests.class)
43 public class TestHTableMultiplexerFlushCache {
44 final Log LOG = LogFactory.getLog(getClass());
45 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
46 private static byte[] FAMILY = Bytes.toBytes("testFamily");
47 private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier_1");
48 private static byte[] QUALIFIER2 = Bytes.toBytes("testQualifier_2");
49 private static byte[] VALUE1 = Bytes.toBytes("testValue1");
50 private static byte[] VALUE2 = Bytes.toBytes("testValue2");
51 private static int SLAVES = 3;
52 private static int PER_REGIONSERVER_QUEUE_SIZE = 100000;
53
54
55
56
57 @BeforeClass
58 public static void setUpBeforeClass() throws Exception {
59 TEST_UTIL.startMiniCluster(SLAVES);
60 }
61
62
63
64
65 @AfterClass
66 public static void tearDownAfterClass() throws Exception {
67 TEST_UTIL.shutdownMiniCluster();
68 }
69
70 private static void checkExistence(HTable htable, byte[] row, byte[] family, byte[] quality,
71 byte[] value) throws Exception {
72
73 Result r;
74 Get get = new Get(row);
75 get.addColumn(family, quality);
76 int nbTry = 0;
77 do {
78 assertTrue("Fail to get from " + htable.getName() + " after " + nbTry + " tries", nbTry < 50);
79 nbTry++;
80 Thread.sleep(100);
81 r = htable.get(get);
82 } while (r == null || r.getValue(family, quality) == null);
83 assertEquals("value", Bytes.toStringBinary(value),
84 Bytes.toStringBinary(r.getValue(family, quality)));
85 }
86
87 @Test
88 public void testOnRegionChange() throws Exception {
89 TableName TABLE = TableName.valueOf("testOnRegionChange");
90 final int NUM_REGIONS = 10;
91 HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
92 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
93
94 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
95 PER_REGIONSERVER_QUEUE_SIZE);
96
97 byte[][] startRows = htable.getStartKeys();
98 byte[] row = startRows[1];
99 assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
100
101 Put put = new Put(row).add(FAMILY, QUALIFIER1, VALUE1);
102 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
103
104 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
105
106
107 HRegionLocation loc = htable.getRegionLocation(row);
108 MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
109 hbaseCluster.stopRegionServer(loc.getServerName());
110 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
111
112
113 put = new Put(row).add(FAMILY, QUALIFIER2, VALUE2);
114 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
115
116 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
117 }
118
119 @Test
120 public void testOnRegionMove() throws Exception {
121
122
123
124
125 TableName TABLE = TableName.valueOf("testOnRegionMove");
126 final int NUM_REGIONS = 10;
127 HTable htable = TEST_UTIL.createTable(TABLE, new byte[][] { FAMILY }, 3,
128 Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
129
130 HTableMultiplexer multiplexer = new HTableMultiplexer(TEST_UTIL.getConfiguration(),
131 PER_REGIONSERVER_QUEUE_SIZE);
132
133 final RegionLocator regionLocator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
134 Pair<byte[][],byte[][]> startEndRows = regionLocator.getStartEndKeys();
135 byte[] row = startEndRows.getFirst()[1];
136 assertTrue("2nd region should not start with empty row", row != null && row.length > 0);
137
138 Put put = new Put(row).addColumn(FAMILY, QUALIFIER1, VALUE1);
139 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
140
141 checkExistence(htable, row, FAMILY, QUALIFIER1, VALUE1);
142
143 final HRegionLocation loc = regionLocator.getRegionLocation(row);
144 final MiniHBaseCluster hbaseCluster = TEST_UTIL.getHBaseCluster();
145
146 final ServerName originalServer = loc.getServerName();
147 ServerName newServer = null;
148
149 for (int i = 0; i < SLAVES; i++) {
150 HRegionServer rs = hbaseCluster.getRegionServer(0);
151 if (!rs.getServerName().equals(originalServer.getServerName())) {
152 newServer = rs.getServerName();
153 break;
154 }
155 }
156 assertNotNull("Did not find a new RegionServer to use", newServer);
157
158
159 LOG.info("Moving " + loc.getRegionInfo().getEncodedName() + " from " + originalServer
160 + " to " + newServer);
161 TEST_UTIL.getHBaseAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(),
162 Bytes.toBytes(newServer.getServerName()));
163
164 TEST_UTIL.waitUntilAllRegionsAssigned(TABLE);
165
166
167 put = new Put(row).addColumn(FAMILY, QUALIFIER2, VALUE2);
168 assertTrue("multiplexer.put returns", multiplexer.put(TABLE, put));
169
170
171 checkExistence(htable, row, FAMILY, QUALIFIER2, VALUE2);
172 }
173 }