1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.protobuf;
21
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableMap;
29 import java.util.UUID;
30
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellScanner;
34 import org.apache.hadoop.hbase.CellUtil;
35 import org.apache.hadoop.hbase.HConstants;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37 import org.apache.hadoop.hbase.io.SizedCellScanner;
38 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
39 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
40 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
41 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
42 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
43 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
44 import org.apache.hadoop.hbase.util.ByteStringer;
45 import org.apache.hadoop.hbase.util.Pair;
46 import org.apache.hadoop.hbase.wal.WAL.Entry;
47 import org.apache.hadoop.hbase.wal.WALKey;
48
49 import com.google.protobuf.ServiceException;
50
51 @InterfaceAudience.Private
52 public class ReplicationProtbufUtil {
53
54
55
56
57
58
59
60
61
62
63 public static void replicateWALEntry(final AdminService.BlockingInterface admin,
64 final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
65 Path sourceHFileArchiveDir) throws IOException {
66 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
67 buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
68 sourceHFileArchiveDir);
69 PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
70 try {
71 admin.replicateWALEntry(controller, p.getFirst());
72 } catch (ServiceException se) {
73 throw ProtobufUtil.getRemoteException(se);
74 }
75 }
76
77
78
79
80
81
82
83
84 public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
85 buildReplicateWALEntryRequest(final Entry[] entries) {
86
87 return buildReplicateWALEntryRequest(entries, null, null, null, null);
88 }
89
90
91
92
93
94
95
96
97
98
99
100 public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
101 buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
102 String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
103
104 List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
105 int size = 0;
106 WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
107 AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
108 AdminProtos.ReplicateWALEntryRequest.Builder builder =
109 AdminProtos.ReplicateWALEntryRequest.newBuilder();
110 HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
111 for (Entry entry: entries) {
112 entryBuilder.clear();
113
114 WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
115 WALKey key = entry.getKey();
116 keyBuilder.setEncodedRegionName(
117 ByteStringer.wrap(encodedRegionName == null
118 ? key.getEncodedRegionName()
119 : encodedRegionName));
120 keyBuilder.setTableName(ByteStringer.wrap(key.getTablename().getName()));
121 keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
122 keyBuilder.setWriteTime(key.getWriteTime());
123 if (key.getNonce() != HConstants.NO_NONCE) {
124 keyBuilder.setNonce(key.getNonce());
125 }
126 if (key.getNonceGroup() != HConstants.NO_NONCE) {
127 keyBuilder.setNonceGroup(key.getNonceGroup());
128 }
129 for(UUID clusterId : key.getClusterIds()) {
130 uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
131 uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
132 keyBuilder.addClusterIds(uuidBuilder.build());
133 }
134 if(key.getOrigLogSeqNum() > 0) {
135 keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
136 }
137 WALEdit edit = entry.getEdit();
138 NavigableMap<byte[], Integer> scopes = key.getScopes();
139 if (scopes != null && !scopes.isEmpty()) {
140 for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
141 scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
142 WALProtos.ScopeType scopeType =
143 WALProtos.ScopeType.valueOf(scope.getValue().intValue());
144 scopeBuilder.setScopeType(scopeType);
145 keyBuilder.addScopes(scopeBuilder.build());
146 }
147 }
148 List<Cell> cells = edit.getCells();
149
150 for (Cell cell: cells) {
151 size += CellUtil.estimatedSerializedSizeOf(cell);
152 }
153
154 allCells.add(cells);
155
156 entryBuilder.setAssociatedCellCount(cells.size());
157 builder.addEntry(entryBuilder.build());
158 }
159
160 if (replicationClusterId != null) {
161 builder.setReplicationClusterId(replicationClusterId);
162 }
163 if (sourceBaseNamespaceDir != null) {
164 builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
165 }
166 if (sourceHFileArchiveDir != null) {
167 builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
168 }
169
170 return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
171 getCellScanner(allCells, size));
172 }
173
174
175
176
177
178 static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
179 return new SizedCellScanner() {
180 private final Iterator<List<? extends Cell>> entries = cells.iterator();
181 private Iterator<? extends Cell> currentIterator = null;
182 private Cell currentCell;
183
184 @Override
185 public Cell current() {
186 return this.currentCell;
187 }
188
189 @Override
190 public boolean advance() {
191 if (this.currentIterator == null) {
192 if (!this.entries.hasNext()) return false;
193 this.currentIterator = this.entries.next().iterator();
194 }
195 if (this.currentIterator.hasNext()) {
196 this.currentCell = this.currentIterator.next();
197 return true;
198 }
199 this.currentCell = null;
200 this.currentIterator = null;
201 return advance();
202 }
203
204 @Override
205 public long heapSize() {
206 return size;
207 }
208 };
209 }
210 }