1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import static org.junit.Assert.*;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.hadoop.hbase.HBaseTestingUtility;
27 import org.apache.hadoop.hbase.HColumnDescriptor;
28 import org.apache.hadoop.hbase.HRegionInfo;
29 import org.apache.hadoop.hbase.HTableDescriptor;
30 import org.apache.hadoop.hbase.MiniHBaseCluster;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.client.Admin;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.client.Table;
39 import org.apache.hadoop.hbase.master.HMaster;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.junit.Test;
43 import org.junit.experimental.categories.Category;
44 import org.mortbay.log.Log;
45
46
47
48
49
50
51 @Category(MediumTests.class)
52 public class TestHRegionOnCluster {
53 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
54
55 @Test (timeout=300000)
56 public void testDataCorrectnessReplayingRecoveredEdits() throws Exception {
57 final int NUM_MASTERS = 1;
58 final int NUM_RS = 3;
59 Admin hbaseAdmin = null;
60 TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
61
62 try {
63 final TableName TABLENAME = TableName.valueOf("testDataCorrectnessReplayingRecoveredEdits");
64 final byte[] FAMILY = Bytes.toBytes("family");
65 MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
66 HMaster master = cluster.getMaster();
67
68
69 HTableDescriptor desc = new HTableDescriptor(TABLENAME);
70 desc.addFamily(new HColumnDescriptor(FAMILY));
71 hbaseAdmin = master.getConnection().getAdmin();
72 hbaseAdmin.createTable(desc);
73
74 assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
75
76
77 Log.info("Loading r1 to v1 into " + TABLENAME);
78 HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
79 putDataAndVerify(table, "r1", FAMILY, "v1", 1);
80
81 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
82
83 HRegionInfo regionInfo = table.getRegionLocation("r1").getRegionInfo();
84 int originServerNum = cluster.getServerWith(regionInfo.getRegionName());
85 HRegionServer originServer = cluster.getRegionServer(originServerNum);
86 int targetServerNum = (originServerNum + 1) % NUM_RS;
87 HRegionServer targetServer = cluster.getRegionServer(targetServerNum);
88 assertFalse(originServer.equals(targetServer));
89
90 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
91 Log.info("Moving " + regionInfo.getEncodedName() + " to " + targetServer.getServerName());
92 hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
93 Bytes.toBytes(targetServer.getServerName().getServerName()));
94 do {
95 Thread.sleep(1);
96 } while (cluster.getServerWith(regionInfo.getRegionName()) == originServerNum);
97
98
99 Log.info("Loading r2 to v2 into " + TABLENAME);
100 putDataAndVerify(table, "r2", FAMILY, "v2", 2);
101
102 TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
103
104 Log.info("Moving " + regionInfo.getEncodedName() + " to " + originServer.getServerName());
105 hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
106 Bytes.toBytes(originServer.getServerName().getServerName()));
107 do {
108 Thread.sleep(1);
109 } while (cluster.getServerWith(regionInfo.getRegionName()) == targetServerNum);
110
111
112 Log.info("Loading r3 to v3 into " + TABLENAME);
113 putDataAndVerify(table, "r3", FAMILY, "v3", 3);
114
115
116 Log.info("Killing target server " + targetServer.getServerName());
117 targetServer.kill();
118 cluster.getRegionServerThreads().get(targetServerNum).join();
119
120 while (master.getServerManager().areDeadServersInProgress()) {
121 Thread.sleep(5);
122 }
123
124 Log.info("Killing origin server " + targetServer.getServerName());
125 originServer.kill();
126 cluster.getRegionServerThreads().get(originServerNum).join();
127
128
129 Log.info("Loading r4 to v4 into " + TABLENAME);
130 putDataAndVerify(table, "r4", FAMILY, "v4", 4);
131
132 } finally {
133 if (hbaseAdmin != null) hbaseAdmin.close();
134 TEST_UTIL.shutdownMiniCluster();
135 }
136 }
137
138 private void putDataAndVerify(Table table, String row, byte[] family,
139 String value, int verifyNum) throws IOException {
140 System.out.println("=========Putting data :" + row);
141 Put put = new Put(Bytes.toBytes(row));
142 put.add(family, Bytes.toBytes("q1"), Bytes.toBytes(value));
143 table.put(put);
144 ResultScanner resultScanner = table.getScanner(new Scan());
145 List<Result> results = new ArrayList<Result>();
146 while (true) {
147 Result r = resultScanner.next();
148 if (r == null)
149 break;
150 results.add(r);
151 }
152 resultScanner.close();
153 if (results.size() != verifyNum) {
154 System.out.println(results);
155 }
156 assertEquals(verifyNum, results.size());
157 }
158
159 }