1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.rsgroup;
21
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24 import com.google.common.net.HostAndPort;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.hbase.TableName;
27 import org.apache.hadoop.hbase.client.ConnectionFactory;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.client.Scan;
30 import org.apache.hadoop.hbase.client.Table;
31 import org.apache.hadoop.hbase.exceptions.DeserializationException;
32 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
33 import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
34 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
35 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
36 import org.apache.zookeeper.KeeperException;
37 import org.junit.Assert;
38
39 import java.io.ByteArrayInputStream;
40 import java.io.IOException;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44
45 public class VerifyingRSGroupAdminClient extends RSGroupAdmin {
46 private Table table;
47 private ZooKeeperWatcher zkw;
48 private RSGroupSerDe serDe;
49 private RSGroupAdmin wrapped;
50
51 public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf)
52 throws IOException {
53 wrapped = RSGroupAdmin;
54 table = ConnectionFactory.createConnection(conf).getTable(RSGroupInfoManager.RSGROUP_TABLE_NAME);
55 zkw = new ZooKeeperWatcher(conf, this.getClass().getSimpleName(), null);
56 serDe = new RSGroupSerDe();
57 }
58
59 @Override
60 public void addRSGroup(String groupName) throws IOException {
61 wrapped.addRSGroup(groupName);
62 verify();
63 }
64
65 @Override
66 public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
67 return wrapped.getRSGroupInfo(groupName);
68 }
69
70 @Override
71 public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
72 return wrapped.getRSGroupInfoOfTable(tableName);
73 }
74
75 @Override
76 public void moveServers(Set<HostAndPort> servers, String targetGroup) throws IOException {
77 wrapped.moveServers(servers, targetGroup);
78 verify();
79 }
80
81 @Override
82 public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
83 wrapped.moveTables(tables, targetGroup);
84 verify();
85 }
86
87 @Override
88 public void removeRSGroup(String name) throws IOException {
89 wrapped.removeRSGroup(name);
90 verify();
91 }
92
93 @Override
94 public boolean balanceRSGroup(String name) throws IOException {
95 return wrapped.balanceRSGroup(name);
96 }
97
98 @Override
99 public List<RSGroupInfo> listRSGroups() throws IOException {
100 return wrapped.listRSGroups();
101 }
102
103 @Override
104 public RSGroupInfo getRSGroupOfServer(HostAndPort hostPort) throws IOException {
105 return wrapped.getRSGroupOfServer(hostPort);
106 }
107
108 public void verify() throws IOException {
109 Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
110 Set<RSGroupInfo> zList = Sets.newHashSet();
111
112 for (Result result : table.getScanner(new Scan())) {
113 RSGroupProtos.RSGroupInfo proto =
114 RSGroupProtos.RSGroupInfo.parseFrom(
115 result.getValue(
116 RSGroupInfoManager.META_FAMILY_BYTES,
117 RSGroupInfoManager.META_QUALIFIER_BYTES));
118 groupMap.put(proto.getName(), ProtobufUtil.toGroupInfo(proto));
119 }
120 Assert.assertEquals(Sets.newHashSet(groupMap.values()),
121 Sets.newHashSet(wrapped.listRSGroups()));
122 try {
123 String groupBasePath = ZKUtil.joinZNode(zkw.baseZNode, "rsgroup");
124 for(String znode: ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) {
125 byte[] data = ZKUtil.getData(zkw, ZKUtil.joinZNode(groupBasePath, znode));
126 if(data.length > 0) {
127 ProtobufUtil.expectPBMagicPrefix(data);
128 ByteArrayInputStream bis = new ByteArrayInputStream(
129 data, ProtobufUtil.lengthOfPBMagic(), data.length);
130 zList.add(ProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
131 }
132 }
133 Assert.assertEquals(zList.size(), groupMap.size());
134 for(RSGroupInfo RSGroupInfo : zList) {
135 Assert.assertTrue(groupMap.get(RSGroupInfo.getName()).equals(RSGroupInfo));
136 }
137 } catch (KeeperException e) {
138 throw new IOException("ZK verification failed", e);
139 } catch (DeserializationException e) {
140 throw new IOException("ZK verification failed", e);
141 } catch (InterruptedException e) {
142 throw new IOException("ZK verification failed", e);
143 }
144 }
145
146 @Override
147 public void close() throws IOException {
148 }
149 }