View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.coprocessor;
20  
21  import static org.junit.Assert.assertEquals;
22  import static org.junit.Assert.assertTrue;
23  
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Collection;
28  import java.util.Collections;
29  import java.util.HashSet;
30  import java.util.List;
31  import java.util.Set;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import org.apache.hadoop.hbase.TableName;
37  import org.apache.hadoop.hbase.client.Table;
38  import org.apache.hadoop.hbase.util.ByteStringer;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HBaseTestingUtility;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.KeyValue;
45  import org.apache.hadoop.hbase.testclassification.MediumTests;
46  import org.apache.hadoop.hbase.client.Delete;
47  import org.apache.hadoop.hbase.client.Get;
48  import org.apache.hadoop.hbase.client.IsolationLevel;
49  import org.apache.hadoop.hbase.client.Mutation;
50  import org.apache.hadoop.hbase.client.Put;
51  import org.apache.hadoop.hbase.client.Scan;
52  import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
53  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
54  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
55  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
56  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
57  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
58  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
59  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
60  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
61  import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
62  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
63  import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
64  import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
65  import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
66  import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
67  import org.apache.hadoop.hbase.regionserver.HRegion;
68  import org.apache.hadoop.hbase.regionserver.InternalScanner;
69  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
70  import org.apache.hadoop.hbase.util.Bytes;
71  import org.junit.AfterClass;
72  import org.junit.BeforeClass;
73  import org.junit.Test;
74  import org.junit.experimental.categories.Category;
75  
76  import com.google.protobuf.Message;
77  import org.apache.commons.logging.Log;
78  import org.apache.commons.logging.LogFactory;
79  
80  /**
81   * Verifies ProcessEndpoint works.
82   * The tested RowProcessor performs two scans and a read-modify-write.
83   */
84  @Category(MediumTests.class)
85  public class TestRowProcessorEndpoint {
86  
87    static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);
88  
89    private static final TableName TABLE = TableName.valueOf("testtable");
90    private final static byte[] ROW = Bytes.toBytes("testrow");
91    private final static byte[] ROW2 = Bytes.toBytes("testrow2");
92    private final static byte[] FAM = Bytes.toBytes("friendlist");
93  
94    // Column names
95    private final static byte[] A = Bytes.toBytes("a");
96    private final static byte[] B = Bytes.toBytes("b");
97    private final static byte[] C = Bytes.toBytes("c");
98    private final static byte[] D = Bytes.toBytes("d");
99    private final static byte[] E = Bytes.toBytes("e");
100   private final static byte[] F = Bytes.toBytes("f");
101   private final static byte[] G = Bytes.toBytes("g");
102   private final static byte[] COUNTER = Bytes.toBytes("counter");
103   private final static AtomicLong myTimer = new AtomicLong(0);
104   private final AtomicInteger failures = new AtomicInteger(0);
105 
106   private static HBaseTestingUtility util = new HBaseTestingUtility();
107   private static volatile int expectedCounter = 0;
108   private static int rowSize, row2Size;
109 
110   private volatile static Table table = null;
111   private volatile static boolean swapped = false;
112   private volatile CountDownLatch startSignal;
113   private volatile CountDownLatch doneSignal;
114 
115   @BeforeClass
116   public static void setupBeforeClass() throws Exception {
117     Configuration conf = util.getConfiguration();
118     conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
119         RowProcessorEndpoint.class.getName());
120     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
121     conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
122     util.startMiniCluster();
123   }
124 
125   @AfterClass
126   public static void tearDownAfterClass() throws Exception {
127     util.shutdownMiniCluster();
128   }
129 
130   public void prepareTestData() throws Exception {
131     try {
132       util.getHBaseAdmin().disableTable(TABLE);
133       util.getHBaseAdmin().deleteTable(TABLE);
134     } catch (Exception e) {
135       // ignore table not found
136     }
137     table = util.createTable(TABLE, FAM);
138     {
139       Put put = new Put(ROW);
140       put.add(FAM, A, Bytes.add(B, C));    // B, C are friends of A
141       put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
142       put.add(FAM, C, G);                  // G is a friend of C
143       table.put(put);
144       rowSize = put.size();
145     }
146     Put put = new Put(ROW2);
147     put.add(FAM, D, E);
148     put.add(FAM, F, G);
149     table.put(put);
150     row2Size = put.size();
151   }
152 
153   @Test
154   public void testDoubleScan() throws Throwable {
155     prepareTestData();
156 
157     CoprocessorRpcChannel channel = table.coprocessorService(ROW);
158     RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
159         new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
160     RowProcessorService.BlockingInterface service =
161         RowProcessorService.newBlockingStub(channel);
162     ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
163     ProcessResponse protoResult = service.process(null, request);
164     FriendsOfFriendsProcessorResponse response =
165         FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
166     Set<String> result = new HashSet<String>();
167     result.addAll(response.getResultList());
168     Set<String> expected =
169       new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
170     Get get = new Get(ROW);
171     LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
172     assertEquals(expected, result);
173   }
174 
175   @Test
176   public void testReadModifyWrite() throws Throwable {
177     prepareTestData();
178     failures.set(0);
179     int numThreads = 100;
180     concurrentExec(new IncrementRunner(), numThreads);
181     Get get = new Get(ROW);
182     LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
183     int finalCounter = incrementCounter(table);
184     assertEquals(numThreads + 1, finalCounter);
185     assertEquals(0, failures.get());
186   }
187 
188   class IncrementRunner implements Runnable {
189     @Override
190     public void run() {
191       try {
192         incrementCounter(table);
193       } catch (Throwable e) {
194         e.printStackTrace();
195       }
196     }
197   }
198 
199   private int incrementCounter(Table table) throws Throwable {
200     CoprocessorRpcChannel channel = table.coprocessorService(ROW);
201     RowProcessorEndpoint.IncrementCounterProcessor processor =
202         new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
203     RowProcessorService.BlockingInterface service =
204         RowProcessorService.newBlockingStub(channel);
205     ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
206     ProcessResponse protoResult = service.process(null, request);
207     IncCounterProcessorResponse response = IncCounterProcessorResponse
208         .parseFrom(protoResult.getRowProcessorResult());
209     Integer result = response.getResponse();
210     return result;
211   }
212 
213   private void concurrentExec(
214       final Runnable task, final int numThreads) throws Throwable {
215     startSignal = new CountDownLatch(numThreads);
216     doneSignal = new CountDownLatch(numThreads);
217     for (int i = 0; i < numThreads; ++i) {
218       new Thread(new Runnable() {
219         @Override
220         public void run() {
221           try {
222             startSignal.countDown();
223             startSignal.await();
224             task.run();
225           } catch (Throwable e) {
226             failures.incrementAndGet();
227             e.printStackTrace();
228           }
229           doneSignal.countDown();
230         }
231       }).start();
232     }
233     doneSignal.await();
234   }
235 
236   @Test
237   public void testMultipleRows() throws Throwable {
238     prepareTestData();
239     failures.set(0);
240     int numThreads = 100;
241     concurrentExec(new SwapRowsRunner(), numThreads);
242     LOG.debug("row keyvalues:" +
243               stringifyKvs(table.get(new Get(ROW)).listCells()));
244     LOG.debug("row2 keyvalues:" +
245               stringifyKvs(table.get(new Get(ROW2)).listCells()));
246     assertEquals(rowSize, table.get(new Get(ROW)).listCells().size());
247     assertEquals(row2Size, table.get(new Get(ROW2)).listCells().size());
248     assertEquals(0, failures.get());
249   }
250 
251   class SwapRowsRunner implements Runnable {
252     @Override
253     public void run() {
254       try {
255         swapRows(table);
256       } catch (Throwable e) {
257         e.printStackTrace();
258       }
259     }
260   }
261 
262   private void swapRows(Table table) throws Throwable {
263     CoprocessorRpcChannel channel = table.coprocessorService(ROW);
264     RowProcessorEndpoint.RowSwapProcessor processor =
265         new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
266     RowProcessorService.BlockingInterface service =
267         RowProcessorService.newBlockingStub(channel);
268     ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
269     service.process(null, request);
270   }
271 
272   @Test
273   public void testTimeout() throws Throwable {
274     prepareTestData();
275     CoprocessorRpcChannel channel = table.coprocessorService(ROW);
276     RowProcessorEndpoint.TimeoutProcessor processor =
277         new RowProcessorEndpoint.TimeoutProcessor(ROW);
278     RowProcessorService.BlockingInterface service =
279         RowProcessorService.newBlockingStub(channel);
280     ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
281     boolean exceptionCaught = false;
282     try {
283       service.process(null, request);
284     } catch (Exception e) {
285       exceptionCaught = true;
286     }
287     assertTrue(exceptionCaught);
288   }
289 
290   /**
291    * This class defines two RowProcessors:
292    * IncrementCounterProcessor and FriendsOfFriendsProcessor.
293    *
294    * We define the RowProcessors as the inner class of the endpoint.
295    * So they can be loaded with the endpoint on the coprocessor.
296    */
297   public static class RowProcessorEndpoint<S extends Message,T extends Message>
298   extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
299     public static class IncrementCounterProcessor extends
300         BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
301         IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
302       int counter = 0;
303       byte[] row = new byte[0];
304 
305       /**
306        * Empty constructor for Writable
307        */
308       IncrementCounterProcessor() {
309       }
310 
311       IncrementCounterProcessor(byte[] row) {
312         this.row = row;
313       }
314 
315       @Override
316       public Collection<byte[]> getRowsToLock() {
317         return Collections.singleton(row);
318       }
319 
320       @Override
321       public IncCounterProcessorResponse getResult() {
322         IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
323         i.setResponse(counter);
324         return i.build();
325       }
326 
327       @Override
328       public boolean readOnly() {
329         return false;
330       }
331 
332       @Override
333       public void process(long now, HRegion region,
334           List<Mutation> mutations, WALEdit walEdit) throws IOException {
335         // Scan current counter
336         List<Cell> kvs = new ArrayList<Cell>();
337         Scan scan = new Scan(row, row);
338         scan.addColumn(FAM, COUNTER);
339         doScan(region, scan, kvs);
340         counter = kvs.size() == 0 ? 0 :
341           Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
342 
343         // Assert counter value
344         assertEquals(expectedCounter, counter);
345 
346         // Increment counter and send it to both memstore and wal edit
347         counter += 1;
348         expectedCounter += 1;
349 
350 
351         Put p = new Put(row);
352         KeyValue kv =
353             new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
354         p.add(kv);
355         mutations.add(p);
356         walEdit.add(kv);
357 
358         // We can also inject some meta data to the walEdit
359         KeyValue metaKv = new KeyValue(
360             row, WALEdit.METAFAMILY,
361             Bytes.toBytes("I just increment counter"),
362             Bytes.toBytes(counter));
363         walEdit.add(metaKv);
364       }
365 
366       @Override
367       public IncCounterProcessorRequest getRequestData() throws IOException {
368         IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
369         builder.setCounter(counter);
370         builder.setRow(ByteStringer.wrap(row));
371         return builder.build();
372       }
373 
374       @Override
375       public void initialize(IncCounterProcessorRequest msg) {
376         this.row = msg.getRow().toByteArray();
377         this.counter = msg.getCounter();
378       }
379     }
380 
381     public static class FriendsOfFriendsProcessor extends
382         BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
383       byte[] row = null;
384       byte[] person = null;
385       final Set<String> result = new HashSet<String>();
386 
387       /**
388        * Empty constructor for Writable
389        */
390       FriendsOfFriendsProcessor() {
391       }
392 
393       FriendsOfFriendsProcessor(byte[] row, byte[] person) {
394         this.row = row;
395         this.person = person;
396       }
397 
398       @Override
399       public Collection<byte[]> getRowsToLock() {
400         return Collections.singleton(row);
401       }
402 
403       @Override
404       public FriendsOfFriendsProcessorResponse getResult() {
405         FriendsOfFriendsProcessorResponse.Builder builder = 
406             FriendsOfFriendsProcessorResponse.newBuilder();
407         builder.addAllResult(result);
408         return builder.build();
409       }
410 
411       @Override
412       public boolean readOnly() {
413         return true;
414       }
415 
416       @Override
417       public void process(long now, HRegion region,
418           List<Mutation> mutations, WALEdit walEdit) throws IOException {
419         List<Cell> kvs = new ArrayList<Cell>();
420         { // First scan to get friends of the person
421           Scan scan = new Scan(row, row);
422           scan.addColumn(FAM, person);
423           doScan(region, scan, kvs);
424         }
425 
426         // Second scan to get friends of friends
427         Scan scan = new Scan(row, row);
428         for (Cell kv : kvs) {
429           byte[] friends = CellUtil.cloneValue(kv);
430           for (byte f : friends) {
431             scan.addColumn(FAM, new byte[]{f});
432           }
433         }
434         doScan(region, scan, kvs);
435 
436         // Collect result
437         result.clear();
438         for (Cell kv : kvs) {
439           for (byte b : CellUtil.cloneValue(kv)) {
440             result.add((char)b + "");
441           }
442         }
443       }
444 
445       @Override
446       public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
447         FriendsOfFriendsProcessorRequest.Builder builder =
448             FriendsOfFriendsProcessorRequest.newBuilder();
449         builder.setPerson(ByteStringer.wrap(person));
450         builder.setRow(ByteStringer.wrap(row));
451         builder.addAllResult(result);
452         FriendsOfFriendsProcessorRequest f = builder.build();
453         return f;
454       }
455 
456       @Override
457       public void initialize(FriendsOfFriendsProcessorRequest request) 
458           throws IOException {
459         this.person = request.getPerson().toByteArray();
460         this.row = request.getRow().toByteArray();
461         result.clear();
462         result.addAll(request.getResultList());
463       }
464     }
465 
466     public static class RowSwapProcessor extends
467         BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
468       byte[] row1 = new byte[0];
469       byte[] row2 = new byte[0];
470 
471       /**
472        * Empty constructor for Writable
473        */
474       RowSwapProcessor() {
475       }
476 
477       RowSwapProcessor(byte[] row1, byte[] row2) {
478         this.row1 = row1;
479         this.row2 = row2;
480       }
481 
482       @Override
483       public Collection<byte[]> getRowsToLock() {
484         List<byte[]> rows = new ArrayList<byte[]>();
485         rows.add(row1);
486         rows.add(row2);
487         return rows;
488       }
489 
490       @Override
491       public boolean readOnly() {
492         return false;
493       }
494 
495       @Override
496       public RowSwapProcessorResponse getResult() {
497         return RowSwapProcessorResponse.getDefaultInstance();
498       }
499 
500       @Override
501       public void process(long now, HRegion region,
502           List<Mutation> mutations, WALEdit walEdit) throws IOException {
503 
504         // Override the time to avoid race-condition in the unit test caused by
505         // inacurate timer on some machines
506         now = myTimer.getAndIncrement();
507 
508         // Scan both rows
509         List<Cell> kvs1 = new ArrayList<Cell>();
510         List<Cell> kvs2 = new ArrayList<Cell>();
511         doScan(region, new Scan(row1, row1), kvs1);
512         doScan(region, new Scan(row2, row2), kvs2);
513 
514         // Assert swapped
515         if (swapped) {
516           assertEquals(rowSize, kvs2.size());
517           assertEquals(row2Size, kvs1.size());
518         } else {
519           assertEquals(rowSize, kvs1.size());
520           assertEquals(row2Size, kvs2.size());
521         }
522         swapped = !swapped;
523 
524         // Add and delete keyvalues
525         List<List<Cell>> kvs = new ArrayList<List<Cell>>();
526         kvs.add(kvs1);
527         kvs.add(kvs2);
528         byte[][] rows = new byte[][]{row1, row2};
529         for (int i = 0; i < kvs.size(); ++i) {
530           for (Cell kv : kvs.get(i)) {
531             // Delete from the current row and add to the other row
532             Delete d = new Delete(rows[i]);
533             KeyValue kvDelete =
534                 new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), 
535                     kv.getTimestamp(), KeyValue.Type.Delete);
536             d.addDeleteMarker(kvDelete);
537             Put p = new Put(rows[1 - i]);
538             KeyValue kvAdd =
539                 new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
540                     now, CellUtil.cloneValue(kv));
541             p.add(kvAdd);
542             mutations.add(d);
543             walEdit.add(kvDelete);
544             mutations.add(p);
545             walEdit.add(kvAdd);
546           }
547         }
548       }
549 
550       @Override
551       public String getName() {
552         return "swap";
553       }
554 
555       @Override
556       public RowSwapProcessorRequest getRequestData() throws IOException {
557         RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
558         builder.setRow1(ByteStringer.wrap(row1));
559         builder.setRow2(ByteStringer.wrap(row2));
560         return builder.build();
561       }
562 
563       @Override
564       public void initialize(RowSwapProcessorRequest msg) {
565         this.row1 = msg.getRow1().toByteArray();
566         this.row2 = msg.getRow2().toByteArray();
567       }
568     }
569 
570     public static class TimeoutProcessor extends
571         BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
572 
573       byte[] row = new byte[0];
574 
575       /**
576        * Empty constructor for Writable
577        */
578       public TimeoutProcessor() {
579       }
580 
581       public TimeoutProcessor(byte[] row) {
582         this.row = row;
583       }
584 
585       public Collection<byte[]> getRowsToLock() {
586         return Collections.singleton(row);
587       }
588 
589       @Override
590       public TimeoutProcessorResponse getResult() {
591         return TimeoutProcessorResponse.getDefaultInstance();
592       }
593 
594       @Override
595       public void process(long now, HRegion region,
596           List<Mutation> mutations, WALEdit walEdit) throws IOException {
597         try {
598           // Sleep for a long time so it timeout
599           Thread.sleep(100 * 1000L);
600         } catch (Exception e) {
601           throw new IOException(e);
602         }
603       }
604 
605       @Override
606       public boolean readOnly() {
607         return true;
608       }
609 
610       @Override
611       public String getName() {
612         return "timeout";
613       }
614 
615       @Override
616       public TimeoutProcessorRequest getRequestData() throws IOException {
617         TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
618         builder.setRow(ByteStringer.wrap(row));
619         return builder.build();
620       }
621 
622       @Override
623       public void initialize(TimeoutProcessorRequest msg) throws IOException {
624         this.row = msg.getRow().toByteArray();
625       }
626     }
627 
628     public static void doScan(
629         HRegion region, Scan scan, List<Cell> result) throws IOException {
630       InternalScanner scanner = null;
631       try {
632         scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
633         scanner = region.getScanner(scan);
634         result.clear();
635         scanner.next(result);
636       } finally {
637         if (scanner != null) scanner.close();
638       }
639     }
640   }
641 
642   static String stringifyKvs(Collection<Cell> kvs) {
643     StringBuilder out = new StringBuilder();
644     out.append("[");
645     if (kvs != null) {
646       for (Cell kv : kvs) {
647         byte[] col = CellUtil.cloneQualifier(kv);
648         byte[] val = CellUtil.cloneValue(kv);
649         if (Bytes.equals(col, COUNTER)) {
650           out.append(Bytes.toStringBinary(col) + ":" +
651                      Bytes.toInt(val) + " ");
652         } else {
653           out.append(Bytes.toStringBinary(col) + ":" +
654                      Bytes.toStringBinary(val) + " ");
655         }
656       }
657     }
658     out.append("]");
659     return out.toString();
660   }
661 
662 }