
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

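/**
 * Tests distributed WAL splitting and distributed log replay: the tests drive a mini
 * cluster through region server and master aborts and verify that edits written before
 * a failure are split into recovered.edits files or replayed into the regions.
 */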
@Category(LargeTests.class)
@SuppressWarnings("deprecation")
public class TestDistributedLogSplitting {
  private static final Log LOG = LogFactory.getLog(TestDistributedLogSplitting.class);
  static {
    // Uncomment the following line if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);

    // The test testThreeRSAbort fails under hadoop2 (2.0.2-alpha) if shortcircuit-read
    // (scr) is on. This turns it off for this test. TODO: Figure out why scr breaks recovery.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");

  }

  // Start a cluster with 2 masters and 6 regionservers
  static final int NUM_MASTERS = 2;
  static final int NUM_RS = 6;

  MiniHBaseCluster cluster;
  HMaster master;
  Configuration conf;
  static Configuration originalConf;
  static HBaseTestingUtility TEST_UTIL;
  static MiniDFSCluster dfsCluster;
  static MiniZooKeeperCluster zkCluster;

  @BeforeClass
  public static void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility(HBaseConfiguration.create());
    dfsCluster = TEST_UTIL.startMiniDFSCluster(1);
    zkCluster = TEST_UTIL.startMiniZKCluster();
    originalConf = TEST_UTIL.getConfiguration();
  }

  @AfterClass
  public static void tearDown() throws IOException {
    TEST_UTIL.shutdownMiniZKCluster();
    TEST_UTIL.shutdownMiniDFSCluster();
    TEST_UTIL.shutdownMiniHBaseCluster();
  }

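  /**
   * Restarts a mini HBase cluster with the given number of region servers on top of the
   * shared mini DFS and ZooKeeper clusters. Retry and load-balancing settings are tuned
   * down so that the failure scenarios below complete quickly.
   */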
  private void startCluster(int num_rs) throws Exception {
    SplitLogCounters.resetCounters();
    LOG.info("Starting cluster");
    conf.setLong("hbase.splitlog.max.resubmit", 0); // do not resubmit failed split tasks
    // Make the failure test faster
    conf.setInt("zookeeper.recovery.retry", 0);
    conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
    conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
    conf.setInt("hbase.regionserver.wal.max.splitters", 3);
    conf.setInt("hbase.master.maximum.ping.server.attempts", 3);
    conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 40);
    TEST_UTIL.shutdownMiniHBaseCluster();
    TEST_UTIL = new HBaseTestingUtility(conf);
    TEST_UTIL.setDFSCluster(dfsCluster);
    TEST_UTIL.setZkCluster(zkCluster);
    TEST_UTIL.startMiniHBaseCluster(NUM_MASTERS, num_rs);
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
    while (cluster.getLiveRegionServerThreads().size() < num_rs) {
      Threads.sleep(1);
    }
  }

  @Before
  public void before() throws Exception {
    // refresh configuration
    conf = HBaseConfiguration.create(originalConf);
  }

  @After
  public void after() throws Exception {
    try {
      if (TEST_UTIL.getHBaseCluster() != null) {
        for (MasterThread mt : TEST_UTIL.getHBaseCluster().getLiveMasterThreads()) {
          mt.getMaster().abort("closing...", null);
        }
      }
      TEST_UTIL.shutdownMiniHBaseCluster();
    } finally {
      TEST_UTIL.getTestFileSystem().delete(FSUtils.getRootDir(TEST_UTIL.getConfiguration()), true);
      ZKUtil.deleteNodeRecursively(TEST_UTIL.getZooKeeperWatcher(), "/hbase");
    }
  }

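  /**
   * Writes edits for many regions through a single region server's WAL, runs distributed
   * log splitting on that server's log directory, and verifies that all edits land in
   * per-region recovered.edits files and that the log directory is removed.
   */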
  @Test (timeout=300000)
  public void testRecoveredEdits() throws Exception {
    LOG.info("testRecoveredEdits");
    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
    startCluster(NUM_RS);

    final int NUM_LOG_LINES = 1000;
    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);
    FileSystem fs = master.getMasterFileSystem().getFileSystem();

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();

    Path rootdir = FSUtils.getRootDir(conf);

    installTable(new ZooKeeperWatcher(conf, "table-creation", null),
        "table", "family", 40);
    TableName table = TableName.valueOf("table");
    List<HRegionInfo> regions = null;
    HRegionServer hrs = null;
    for (int i = 0; i < NUM_RS; i++) {
      boolean foundRs = false;
      hrs = rsts.get(i).getRegionServer();
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (HRegionInfo region : regions) {
        if (region.getTable().getNameAsString().equalsIgnoreCase("table")) {
          foundRs = true;
          break;
        }
      }
      if (foundRs) break;
    }
    final Path logDir = new Path(rootdir, DefaultWALProvider.getWALDirectoryName(hrs
        .getServerName().toString()));

    LOG.info("#regions = " + regions.size());
    Iterator<HRegionInfo> it = regions.iterator();
    while (it.hasNext()) {
      HRegionInfo region = it.next();
      if (region.getTable().getNamespaceAsString()
          .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
        it.remove();
      }
    }

    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);

    slm.splitLogDistributed(logDir);

    int count = 0;
    for (HRegionInfo hri : regions) {

      Path tdir = FSUtils.getTableDir(rootdir, table);
      Path editsdir =
        WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
      LOG.debug("checking edits dir " + editsdir);
      FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
        @Override
        public boolean accept(Path p) {
          if (WALSplitter.isSequenceIdFile(p)) {
            return false;
          }
          return true;
        }
      });
      assertTrue("edits dir should have more than a single file in it. instead has " + files.length,
          files.length > 1);
      for (int i = 0; i < files.length; i++) {
        int c = countWAL(files[i].getPath(), fs, conf);
        count += c;
      }
      LOG.info(count + " edits in " + files.length + " recovered edits files.");
    }

    // check that the log file is moved
    assertFalse(fs.exists(logDir));

    assertEquals(NUM_LOG_LINES, count);
  }

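  /**
   * With distributed log replay enabled, aborts a region server that does not carry
   * hbase:meta and verifies that all edits written through its WAL are replayed.
   */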
  @Test(timeout = 300000)
  public void testLogReplayWithNonMetaRSDown() throws Exception {
    LOG.info("testLogReplayWithNonMetaRSDown");
    conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_LOG_LINES = 1000;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);

    HRegionServer hrs = findRSToKill(false, "table");
    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);

    // wait for the abort to complete and verify recovery
    this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
    ht.close();
    zkw.close();
  }

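  /**
   * A nonce generator that remembers the nonces it hands out; after startDups() is
   * called it replays them in order, so retried operations reuse old nonces.
   */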
  private static class NonceGeneratorWithDups extends PerClientRandomNonceGenerator {
    private boolean isDups = false;
    private LinkedList<Long> nonces = new LinkedList<Long>();

    public void startDups() {
      isDups = true;
    }

    @Override
    public long newNonce() {
      long nonce = isDups ? nonces.removeFirst() : super.newNonce();
      if (!isDups) {
        nonces.add(nonce);
      }
      return nonce;
    }
  }

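  /**
   * Increments every region of the table, kills a region server, then re-sends the same
   * increments with duplicated nonces; each retry must fail with an
   * OperationConflictException, showing that nonces survive distributed log replay.
   */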
  @Test(timeout = 300000)
  public void testNonceRecovery() throws Exception {
    LOG.info("testNonceRecovery");
    final String TABLE_NAME = "table";
    final String FAMILY_NAME = "family";
    final int NUM_REGIONS_TO_CREATE = 40;

    conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    master.balanceSwitch(false);

    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    HTable ht = installTable(zkw, TABLE_NAME, FAMILY_NAME, NUM_REGIONS_TO_CREATE);
    NonceGeneratorWithDups ng = new NonceGeneratorWithDups();
    NonceGenerator oldNg =
        ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection)ht.getConnection(), ng);

    try {
      List<Increment> reqs = new ArrayList<Increment>();
      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
        HRegionServer hrs = rst.getRegionServer();
        List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
        for (HRegionInfo hri : hris) {
          if (TABLE_NAME.equalsIgnoreCase(hri.getTable().getNameAsString())) {
            byte[] key = hri.getStartKey();
            if (key == null || key.length == 0) {
              key = Bytes.copy(hri.getEndKey());
              --(key[key.length - 1]);
            }
            Increment incr = new Increment(key);
            incr.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("q"), 1);
            ht.increment(incr);
            reqs.add(incr);
          }
        }
      }

      HRegionServer hrs = findRSToKill(false, "table");
      abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
      ng.startDups();
      for (Increment incr : reqs) {
        try {
          ht.increment(incr);
          fail("should have thrown");
        } catch (OperationConflictException ope) {
          LOG.debug("Caught as expected: " + ope.getMessage());
        }
      }
    } finally {
      ConnectionUtils.injectNonceGeneratorForTesting((ClusterConnection) ht.getConnection(), oldNg);
      ht.close();
      zkw.close();
    }
  }

  @Test(timeout = 300000)
  public void testLogReplayWithMetaRSDown() throws Exception {
    LOG.info("testLogReplayWithMetaRSDown");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_LOG_LINES = 1000;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);

    HRegionServer hrs = findRSToKill(true, "table");
    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);

    this.abortRSAndVerifyRecovery(hrs, ht, zkw, NUM_REGIONS_TO_CREATE, NUM_LOG_LINES);
    ht.close();
    zkw.close();
  }

  private void abortRSAndVerifyRecovery(HRegionServer hrs, Table ht, final ZooKeeperWatcher zkw,
      final int numRegions, final int numofLines) throws Exception {

    abortRSAndWaitForRecovery(hrs, zkw, numRegions);
    assertEquals(numofLines, TEST_UTIL.countRows(ht));
  }

  private void abortRSAndWaitForRecovery(HRegionServer hrs, final ZooKeeperWatcher zkw,
      final int numRegions) throws Exception {
    final MiniHBaseCluster tmpCluster = this.cluster;

    // abort RS
    LOG.info("Aborting region server: " + hrs.getServerName());
    hrs.abort("testing");

    // wait for the abort to complete
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (tmpCluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
      }
    });

    // wait for regions to come online
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (HBaseTestingUtility.getAllOnlineRegions(tmpCluster).size()
            >= (numRegions + 1));
      }
    });

    // wait until all regions are fully recovered
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
          zkw.recoveringRegionsZNode, false);
        return (recoveringRegions != null && recoveringRegions.size() == 0);
      }
    });
  }

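  /**
   * Aborts the active master and then a region server holding unflushed edits, and
   * verifies that the pending log splitting work is completed once a master is up
   * again, so that all edits become readable.
   */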
  @Test(timeout = 300000)
  public void testMasterStartsUpWithLogSplittingWork() throws Exception {
    LOG.info("testMasterStartsUpWithLogSplittingWork");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
    startCluster(NUM_RS);

    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_LOG_LINES = 1000;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);

    HRegionServer hrs = findRSToKill(false, "table");
    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);

    // abort master
    abortMaster(cluster);

    // abort RS
    LOG.info("Aborting region server: " + hrs.getServerName());
    hrs.abort("testing");

    // wait for the abort to complete
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
      }
    });

    Thread.sleep(2000);
    LOG.info("Current Open Regions:"
        + HBaseTestingUtility.getAllOnlineRegions(cluster).size());

    // wait for regions to come back online
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
          >= (NUM_REGIONS_TO_CREATE + 1));
      }
    });

    LOG.info("Current Open Regions After Master Node Starts Up:"
        + HBaseTestingUtility.getAllOnlineRegions(cluster).size());

    assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));

    ht.close();
    zkw.close();
  }

  @Test(timeout = 300000)
  public void testMasterStartsUpWithLogReplayWork() throws Exception {
    LOG.info("testMasterStartsUpWithLogReplayWork");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
    startCluster(NUM_RS);

    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_LOG_LINES = 1000;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);

    HRegionServer hrs = findRSToKill(false, "table");
    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);

    // abort master
    abortMaster(cluster);

    // abort RS
    LOG.info("Aborting region server: " + hrs.getServerName());
    hrs.abort("testing");

    // wait for the RS to die
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
      }
    });

    Thread.sleep(2000);
    LOG.info("Current Open Regions:" + HBaseTestingUtility.getAllOnlineRegions(cluster).size());

    // wait until all regions are fully recovered
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
          zkw.recoveringRegionsZNode, false);
        boolean done = recoveringRegions != null && recoveringRegions.size() == 0;
        if (!done) {
          LOG.info("Recovering regions: " + recoveringRegions);
        }
        return done;
      }
    });

    LOG.info("Current Open Regions After Master Node Starts Up:"
        + HBaseTestingUtility.getAllOnlineRegions(cluster).size());

    assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));

    ht.close();
    zkw.close();
  }


  @Test(timeout = 300000)
  public void testLogReplayTwoSequentialRSDown() throws Exception {
    LOG.info("testLogReplayTwoSequentialRSDown");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_LOG_LINES = 1000;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);

    List<HRegionInfo> regions = null;
    HRegionServer hrs1 = findRSToKill(false, "table");
    regions = ProtobufUtil.getOnlineRegions(hrs1.getRSRpcServices());

    makeWAL(hrs1, regions, "table", "family", NUM_LOG_LINES, 100);

    // abort RS1
    LOG.info("Aborting region server: " + hrs1.getServerName());
    hrs1.abort("testing");

    // wait for the abort to complete
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
      }
    });

    // wait for regions to come online
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
            >= (NUM_REGIONS_TO_CREATE + 1));
      }
    });

    // sleep a little bit in order to interrupt recovering in the middle
    Thread.sleep(300);
    // abort second region server
    rsts = cluster.getLiveRegionServerThreads();
    HRegionServer hrs2 = rsts.get(0).getRegionServer();
    LOG.info("Aborting one more region server: " + hrs2.getServerName());
    hrs2.abort("testing");

    // wait for the abort to complete
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 2));
      }
    });

    // wait for regions to come online
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
            >= (NUM_REGIONS_TO_CREATE + 1));
      }
    });

    // wait until all regions are fully recovered
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
          zkw.recoveringRegionsZNode, false);
        return (recoveringRegions != null && recoveringRegions.size() == 0);
      }
    });

    assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));
    ht.close();
    zkw.close();
  }

  @Test(timeout = 300000)
  public void testMarkRegionsRecoveringInZK() throws Exception {
    LOG.info("testMarkRegionsRecoveringInZK");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    master.balanceSwitch(false);
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    final ZooKeeperWatcher zkw = master.getZooKeeper();
    Table ht = installTable(zkw, "table", "family", 40);
    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;

    Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
    HRegionInfo region = null;
    HRegionServer hrs = null;
    ServerName firstFailedServer = null;
    ServerName secondFailedServer = null;
    for (int i = 0; i < NUM_RS; i++) {
      hrs = rsts.get(i).getRegionServer();
      List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      if (regions.isEmpty()) continue;
      region = regions.get(0);
      regionSet.add(region);
      firstFailedServer = hrs.getServerName();
      secondFailedServer = rsts.get((i + 1) % NUM_RS).getRegionServer().getServerName();
      break;
    }

    slm.markRegionsRecovering(firstFailedServer, regionSet);
    slm.markRegionsRecovering(secondFailedServer, regionSet);

    List<String> recoveringRegions = ZKUtil.listChildrenNoWatch(zkw,
      ZKUtil.joinZNode(zkw.recoveringRegionsZNode, region.getEncodedName()));

    assertEquals(2, recoveringRegions.size());

    // wait for the splitLogWorker to mark them up, because there are no WAL files
    // recorded in ZK
    final HRegionServer tmphrs = hrs;
    TEST_UTIL.waitFor(60000, 1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (tmphrs.getRecoveringRegions().size() == 0);
      }
    });
    ht.close();
  }

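  /**
   * Loads a table via prepareData, aborts the region server hosting its regions, and
   * verifies that the table checksum is unchanged after the regions are recovered
   * through log replay.
   */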
  @Test(timeout = 300000)
  public void testReplayCmd() throws Exception {
    LOG.info("testReplayCmd");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    final int NUM_REGIONS_TO_CREATE = 40;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);

    List<HRegionInfo> regions = null;
    HRegionServer hrs = null;
    for (int i = 0; i < NUM_RS; i++) {
      boolean isCarryingMeta = false;
      hrs = rsts.get(i).getRegionServer();
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (HRegionInfo region : regions) {
        if (region.isMetaRegion()) {
          isCarryingMeta = true;
          break;
        }
      }
      if (isCarryingMeta) {
        continue;
      }
      if (regions.size() > 0) break;
    }

    this.prepareData(ht, Bytes.toBytes("family"), Bytes.toBytes("c1"));
    String originalCheckSum = TEST_UTIL.checksumRows(ht);

    // abort the RS and trigger replay
    abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);

    assertEquals("Data should remain after reopening of regions", originalCheckSum,
      TEST_UTIL.checksumRows(ht));

    ht.close();
    zkw.close();
  }

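  /**
   * Writes WAL edits for both a live table and a table that is then disabled, aborts
   * the region server hosting regions of both, and verifies that the disabled table's
   * edits are left in recovered.edits while the live table's edits are replayed.
   */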
  @Test(timeout = 300000)
  public void testLogReplayForDisablingTable() throws Exception {
    LOG.info("testLogReplayForDisablingTable");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_LOG_LINES = 1000;

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table disablingHT = installTable(zkw, "disableTable", "family", NUM_REGIONS_TO_CREATE);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE, NUM_REGIONS_TO_CREATE);

    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    List<HRegionInfo> regions = null;
    HRegionServer hrs = null;
    boolean hasRegionsForBothTables = false;
    String tableName = null;
    for (int i = 0; i < NUM_RS; i++) {
      tableName = null;
      hasRegionsForBothTables = false;
      boolean isCarryingSystem = false;
      hrs = rsts.get(i).getRegionServer();
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (HRegionInfo region : regions) {
        if (region.getTable().isSystemTable()) {
          isCarryingSystem = true;
          break;
        }
        if (tableName != null &&
            !tableName.equalsIgnoreCase(region.getTable().getNameAsString())) {
          // make sure that we find a RS that has online regions for both "table"
          // and "disableTable"
          hasRegionsForBothTables = true;
          break;
        } else if (tableName == null) {
          tableName = region.getTable().getNameAsString();
        }
      }
      if (isCarryingSystem) {
        continue;
      }
      if (hasRegionsForBothTables) {
        break;
      }
    }

    // make sure we found a good RS
    Assert.assertTrue(hasRegionsForBothTables);

    LOG.info("#regions = " + regions.size());
    Iterator<HRegionInfo> it = regions.iterator();
    while (it.hasNext()) {
      HRegionInfo region = it.next();
      if (region.isMetaTable()) {
        it.remove();
      }
    }
    makeWAL(hrs, regions, "disableTable", "family", NUM_LOG_LINES, 100, false);
    makeWAL(hrs, regions, "table", "family", NUM_LOG_LINES, 100);

    LOG.info("Disabling table\n");
    TEST_UTIL.getHBaseAdmin().disableTable(TableName.valueOf("disableTable"));

    // abort RS
    LOG.info("Aborting region server: " + hrs.getServerName());
    hrs.abort("testing");

    // wait for the abort to complete
    TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (cluster.getLiveRegionServerThreads().size() <= (NUM_RS - 1));
      }
    });

    // wait for regions to come online
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
            >= (NUM_REGIONS_TO_CREATE + 1));
      }
    });

    // wait until all regions are fully recovered
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
          zkw.recoveringRegionsZNode, false);
        ServerManager serverManager = master.getServerManager();
        return (!serverManager.areDeadServersInProgress() &&
            recoveringRegions != null && recoveringRegions.size() == 0);
      }
    });

    int count = 0;
    FileSystem fs = master.getMasterFileSystem().getFileSystem();
    Path rootdir = FSUtils.getRootDir(conf);
    Path tdir = FSUtils.getTableDir(rootdir, TableName.valueOf("disableTable"));
    for (HRegionInfo hri : regions) {
      Path editsdir =
        WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
      LOG.debug("checking edits dir " + editsdir);
      if (!fs.exists(editsdir)) continue;
      FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
        @Override
        public boolean accept(Path p) {
          if (WALSplitter.isSequenceIdFile(p)) {
            return false;
          }
          return true;
        }
      });
      if (files != null) {
        for (FileStatus file : files) {
          int c = countWAL(file.getPath(), fs, conf);
          count += c;
          LOG.info(c + " edits in " + file.getPath());
        }
      }
    }

    LOG.info("Verify edits in recovered.edits files");
    assertEquals(NUM_LOG_LINES, count);
    LOG.info("Verify replayed edits");
    assertEquals(NUM_LOG_LINES, TEST_UTIL.countRows(ht));

    // clean up
    for (HRegionInfo hri : regions) {
      Path editsdir =
        WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName()));
      fs.delete(editsdir, true);
    }
    disablingHT.close();
    ht.close();
    zkw.close();
  }

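  /**
   * With writes disallowed during recovery, moves a region that has been marked as
   * recovering and verifies that a put to it fails with a RegionInRecoveryException.
   */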
  @Ignore ("We don't support DLR anymore") @Test(timeout = 300000)
  public void testDisallowWritesInRecovering() throws Exception {
    LOG.info("testDisallowWritesInRecovering");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
    conf.setBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, true);
    startCluster(NUM_RS);
    final int NUM_REGIONS_TO_CREATE = 40;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;

    Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
    HRegionInfo region = null;
    HRegionServer hrs = null;
    HRegionServer dstRS = null;
    for (int i = 0; i < NUM_RS; i++) {
      hrs = rsts.get(i).getRegionServer();
      List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      if (regions.isEmpty()) continue;
      region = regions.get(0);
      if (region.isMetaRegion()) continue;
      regionSet.add(region);
      dstRS = rsts.get((i+1) % NUM_RS).getRegionServer();
      break;
    }

    slm.markRegionsRecovering(hrs.getServerName(), regionSet);
    // move the region so that it is opened in recovering state
    final HRegionInfo hri = region;
    final HRegionServer tmpRS = dstRS;
    TEST_UTIL.getHBaseAdmin().move(region.getEncodedNameAsBytes(),
      Bytes.toBytes(dstRS.getServerName().getServerName()));
    // wait for the region move to complete
    final RegionStates regionStates =
        TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
    TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        ServerName sn = regionStates.getRegionServerOfRegion(hri);
        return (sn != null && sn.equals(tmpRS.getServerName()));
      }
    });

    try {
      byte[] key = region.getStartKey();
      if (key == null || key.length == 0) {
        key = new byte[] { 0, 0, 0, 0, 1 };
      }
      Put put = new Put(key);
      put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'});
      ht.put(put);
    } catch (IOException ioe) {
      Assert.assertTrue(ioe instanceof RetriesExhaustedWithDetailsException);
      RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException) ioe;
      boolean foundRegionInRecoveryException = false;
      for (Throwable t : re.getCauses()) {
        if (t instanceof RegionInRecoveryException) {
          foundRegionInRecoveryException = true;
          break;
        }
      }
      Assert.assertTrue(
        "No RegionInRecoveryException. Following exceptions returned=" + re.getCauses(),
        foundRegionInRecoveryException);
    }

    ht.close();
    zkw.close();
  }

  /**
   * The original intention of this test was to force an abort of a region
   * server and to make sure that the failure path in the region servers is
   * properly evaluated. But it is difficult to ensure that the region server
   * doesn't finish the log splitting before it aborts. Also, there is now a
   * code path where the master will preempt the region server's task when it
   * detects that the region server has aborted.
   * @throws Exception
   */
  @Test (timeout=300000)
  public void testWorkerAbort() throws Exception {
    LOG.info("testWorkerAbort");
    startCluster(3);
    final int NUM_LOG_LINES = 10000;
    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
    FileSystem fs = master.getMasterFileSystem().getFileSystem();

    final List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    HRegionServer hrs = findRSToKill(false, "table");
    Path rootdir = FSUtils.getRootDir(conf);
    final Path logDir = new Path(rootdir,
        DefaultWALProvider.getWALDirectoryName(hrs.getServerName().toString()));

    installTable(new ZooKeeperWatcher(conf, "table-creation", null),
        "table", "family", 40);

    makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()),
      "table", "family", NUM_LOG_LINES, 100);

    new Thread() {
      @Override
      public void run() {
        waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
        for (RegionServerThread rst : rsts) {
          rst.getRegionServer().abort("testing");
          break;
        }
      }
    }.start();
    // slm.splitLogDistributed(logDir);
    FileStatus[] logfiles = fs.listStatus(logDir);
    TaskBatch batch = new TaskBatch();
    slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
    // like waitForCounter, but wait for any one of the worker counters to move
    long curt = System.currentTimeMillis();
    long waitTime = 80000;
    long endt = curt + waitTime;
    while (curt < endt) {
      if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
          tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
          tot_wkr_preempt_task.get()) == 0) {
        Thread.yield();
        curt = System.currentTimeMillis();
      } else {
        assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() +
            tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() +
            tot_wkr_preempt_task.get()));
        return;
      }
    }
    fail("none of the following counters went up in " + waitTime +
        " milliseconds - " +
        "tot_wkr_task_resigned, tot_wkr_task_err, " +
        "tot_wkr_final_transition_failed, tot_wkr_task_done, " +
        "tot_wkr_preempt_task");
  }

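  /**
   * Aborts three of the six region servers at once and verifies that every region comes
   * back online and that no rows are lost after recovery.
   */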
  @Test (timeout=300000)
  public void testThreeRSAbort() throws Exception {
    LOG.info("testThreeRSAbort");
    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_ROWS_PER_REGION = 100;

    startCluster(NUM_RS); // NUM_RS=6.

    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf,
        "distributed log splitting test", null);

    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
    populateDataInTable(NUM_ROWS_PER_REGION, "family");


    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    assertEquals(NUM_RS, rsts.size());
    rsts.get(0).getRegionServer().abort("testing");
    rsts.get(1).getRegionServer().abort("testing");
    rsts.get(2).getRegionServer().abort("testing");

    long start = EnvironmentEdgeManager.currentTime();
    while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) {
      if (EnvironmentEdgeManager.currentTime() - start > 60000) {
        fail("Timed out waiting for the aborted region servers to go down");
      }
      Thread.sleep(200);
    }

    start = EnvironmentEdgeManager.currentTime();
    while (HBaseTestingUtility.getAllOnlineRegions(cluster).size()
        < (NUM_REGIONS_TO_CREATE + 1)) {
      if (EnvironmentEdgeManager.currentTime() - start > 60000) {
        fail("Timed out waiting for regions to come back online");
      }
      Thread.sleep(200);
    }

    // wait until all regions are fully recovered
    TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        List<String> recoveringRegions = zkw.getRecoverableZooKeeper().getChildren(
          zkw.recoveringRegionsZNode, false);
        return (recoveringRegions != null && recoveringRegions.size() == 0);
      }
    });

    assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION,
        TEST_UTIL.countRows(ht));
    ht.close();
    zkw.close();
  }


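  /**
   * Feeds the split log manager a corrupted log so the split task fails, with ZK task
   * deletion suppressed; a second split attempt must block until interrupted, showing
   * that failed tasks are not deleted prematurely.
   */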
  @Test(timeout=30000)
  public void testDelayedDeleteOnFailure() throws Exception {
    LOG.info("testDelayedDeleteOnFailure");
    startCluster(1);
    final SplitLogManager slm = master.getMasterFileSystem().splitLogManager;
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path logDir = new Path(FSUtils.getRootDir(conf), "x");
    fs.mkdirs(logDir);
    ExecutorService executor = null;
    try {
      final Path corruptedLogFile = new Path(logDir, "x");
      FSDataOutputStream out;
      out = fs.create(corruptedLogFile);
      out.write(0);
      out.write(Bytes.toBytes("corrupted bytes"));
      out.close();
      ZKSplitLogManagerCoordination coordination =
          (ZKSplitLogManagerCoordination) ((BaseCoordinatedStateManager) master
              .getCoordinatedStateManager()).getSplitLogManagerCoordination();
      coordination.setIgnoreDeleteForTesting(true);
      executor = Executors.newSingleThreadExecutor();
      Runnable runnable = new Runnable() {
        @Override
        public void run() {
          try {
            // since the logDir contains a fake, corrupted log file, the split
            // log worker will finish the task quickly with an error, and this
            // call will fail by throwing an IOException.
            slm.splitLogDistributed(logDir);
          } catch (IOException ioe) {
            try {
              assertTrue(fs.exists(corruptedLogFile));
              // this call will block waiting for the task to be removed from the
              // tasks map which is not going to happen since ignoreZKDeleteForTesting
              // is set to true, until it is interrupted.
              slm.splitLogDistributed(logDir);
            } catch (IOException e) {
              assertTrue(Thread.currentThread().isInterrupted());
              return;
            }
            fail("did not get the expected IOException from the 2nd call");
          }
          fail("did not get the expected IOException from the 1st call");
        }
      };
      Future<?> result = executor.submit(runnable);
      try {
        result.get(2000, TimeUnit.MILLISECONDS);
      } catch (TimeoutException te) {
        // it is ok, expected.
      }
      waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
      executor.shutdownNow();
      executor = null;

      // make sure the runnable finished with no exception thrown.
      result.get();
    } finally {
      if (executor != null) {
        // interrupt the thread in case the test fails in the middle.
        // it has no effect if the thread is already terminated.
        executor.shutdownNow();
      }
      fs.delete(logDir, true);
    }
  }

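  /**
   * Marks the meta region and a set of user regions as recovering in ZK, splits the
   * meta WAL, and verifies that the meta region's recovering znode is removed.
   */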
  @Test(timeout = 300000)
  public void testMetaRecoveryInZK() throws Exception {
    LOG.info("testMetaRecoveryInZK");
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);

    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);

    // only testing meta recovery in ZK operation
    HRegionServer hrs = findRSToKill(true, null);
    List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());

    LOG.info("#regions = " + regions.size());
    Set<HRegionInfo> tmpRegions = new HashSet<HRegionInfo>();
    tmpRegions.add(HRegionInfo.FIRST_META_REGIONINFO);
    master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), tmpRegions);
    Set<HRegionInfo> userRegionSet = new HashSet<HRegionInfo>();
    userRegionSet.addAll(regions);
    master.getMasterFileSystem().prepareLogReplay(hrs.getServerName(), userRegionSet);
    boolean isMetaRegionInRecovery = false;
    List<String> recoveringRegions =
        zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
    for (String curEncodedRegionName : recoveringRegions) {
      if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
        isMetaRegionInRecovery = true;
        break;
      }
    }
    assertTrue(isMetaRegionInRecovery);

    master.getMasterFileSystem().splitMetaLog(hrs.getServerName());

    isMetaRegionInRecovery = false;
    recoveringRegions =
        zkw.getRecoverableZooKeeper().getChildren(zkw.recoveringRegionsZNode, false);
    for (String curEncodedRegionName : recoveringRegions) {
      if (curEncodedRegionName.equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
        isMetaRegionInRecovery = true;
        break;
      }
    }
    // the meta region should be recovered
    assertFalse(isMetaRegionInRecovery);
    zkw.close();
  }

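  /**
   * Appends many updates for the same cell with the same timestamp directly to the WAL,
   * aborts the hosting server, and verifies that the recovered value (before and after
   * a flush) is the last one written.
   */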
  @Test(timeout = 300000)
  public void testSameVersionUpdatesRecovery() throws Exception {
    LOG.info("testSameVersionUpdatesRecovery");
    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    startCluster(NUM_RS);
    final AtomicLong sequenceId = new AtomicLong(100);
    final int NUM_REGIONS_TO_CREATE = 40;
    final int NUM_LOG_LINES = 1000;
    // turn off load balancing to prevent regions from moving around; otherwise
    // they will consume the recovered.edits
    master.balanceSwitch(false);

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);

    List<HRegionInfo> regions = null;
    HRegionServer hrs = null;
    for (int i = 0; i < NUM_RS; i++) {
      boolean isCarryingMeta = false;
      hrs = rsts.get(i).getRegionServer();
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (HRegionInfo region : regions) {
        if (region.isMetaRegion()) {
          isCarryingMeta = true;
          break;
        }
      }
      if (isCarryingMeta) {
        continue;
      }
      break;
    }

    LOG.info("#regions = " + regions.size());
    Iterator<HRegionInfo> it = regions.iterator();
    while (it.hasNext()) {
      HRegionInfo region = it.next();
      if (region.isMetaTable()
          || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
        it.remove();
      }
    }
    if (regions.size() == 0) return;
    HRegionInfo curRegionInfo = regions.get(0);
    byte[] startRow = curRegionInfo.getStartKey();
    if (startRow == null || startRow.length == 0) {
      startRow = new byte[] { 0, 0, 0, 0, 1 };
    }
    byte[] row = Bytes.incrementBytes(startRow, 1);
    // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
    row = Arrays.copyOfRange(row, 3, 8);
    long value = 0;
    TableName tableName = TableName.valueOf("table");
    byte[] family = Bytes.toBytes("family");
    byte[] qualifier = Bytes.toBytes("c1");
    long timeStamp = System.currentTimeMillis();
    HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor(family));
    final WAL wal = hrs.getWAL(curRegionInfo);
    for (int i = 0; i < NUM_LOG_LINES; i += 1) {
      WALEdit e = new WALEdit();
      value++;
      e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
      wal.append(htd, curRegionInfo,
          new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
          e, sequenceId, true, null);
    }
    wal.sync();
    wal.shutdown();

    // wait for the abort to complete
    this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);

    // verify we got the last value
    LOG.info("Verification Starts...");
    Get g = new Get(row);
    Result r = ht.get(g);
    long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
    assertEquals(value, theStoredVal);

    // after flush
    LOG.info("Verification after flush...");
    TEST_UTIL.getHBaseAdmin().flush(tableName);
    r = ht.get(g);
    theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
    assertEquals(value, theStoredVal);
    ht.close();
  }

1297   @Test(timeout = 300000)
1298   public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
1299     LOG.info("testSameVersionUpdatesRecoveryWithWrites");
1300     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
1301     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
1302     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
1303     conf.setInt("hbase.hstore.compactionThreshold", 3);
1304     startCluster(NUM_RS);
1305     final AtomicLong sequenceId = new AtomicLong(100);
1306     final int NUM_REGIONS_TO_CREATE = 40;
1307     final int NUM_LOG_LINES = 2000;
1308     // turn off load balancing to prevent regions from moving around otherwise
1309     // they will consume recovered.edits
1310     master.balanceSwitch(false);
1311 
1312     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
1313     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
1314     Table ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
1315 
1316     List<HRegionInfo> regions = null;
1317     HRegionServer hrs = null;
1318     for (int i = 0; i < NUM_RS; i++) {
1319       boolean isCarryingMeta = false;
1320       hrs = rsts.get(i).getRegionServer();
1321       regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
1322       for (HRegionInfo region : regions) {
1323         if (region.isMetaRegion()) {
1324           isCarryingMeta = true;
1325           break;
1326         }
1327       }
1328       if (isCarryingMeta) {
1329         continue;
1330       }
1331       break;
1332     }
1333 
1334     LOG.info("#regions = " + regions.size());
1335     Iterator<HRegionInfo> it = regions.iterator();
1336     while (it.hasNext()) {
1337       HRegionInfo region = it.next();
1338       if (region.isMetaTable()
1339           || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
1340         it.remove();
1341       }
1342     }
1343     if (regions.size() == 0) return;
1344     HRegionInfo curRegionInfo = regions.get(0);
1345     byte[] startRow = curRegionInfo.getStartKey();
1346     if (startRow == null || startRow.length == 0) {
1347       startRow = new byte[] { 0, 0, 0, 0, 1 };
1348     }
1349     byte[] row = Bytes.incrementBytes(startRow, 1);
1350     // use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
1351     row = Arrays.copyOfRange(row, 3, 8);
1352     long value = 0;
1353     final TableName tableName = TableName.valueOf("table");
1354     byte[] family = Bytes.toBytes("family");
1355     byte[] qualifier = Bytes.toBytes("c1");
1356     long timeStamp = System.currentTimeMillis();
1357     HTableDescriptor htd = new HTableDescriptor(tableName);
1358     htd.addFamily(new HColumnDescriptor(family));
1359     final WAL wal = hrs.getWAL(curRegionInfo);
1360     for (int i = 0; i < NUM_LOG_LINES; i += 1) {
1361       WALEdit e = new WALEdit();
1362       value++;
1363       e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
1364       wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
1365           tableName, System.currentTimeMillis()), e, sequenceId, true, null);
1366     }
1367     wal.sync();
1368     wal.shutdown();
1369 
1370     // wait for abort completes
1371     this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
1372 
1373     // verify we got the last value
1374     LOG.info("Verification Starts...");
1375     Get g = new Get(row);
1376     Result r = ht.get(g);
1377     long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1378     assertEquals(value, theStoredVal);
1379 
1380     // after flush & compaction
1381     LOG.info("Verification after flush...");
1382     TEST_UTIL.getHBaseAdmin().flush(tableName);
1383     TEST_UTIL.getHBaseAdmin().compact(tableName);
1384 
1385     // wait for compaction completes
1386     TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
1387       @Override
1388       public boolean evaluate() throws Exception {
1389         return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
1390       }
1391     });
1392 
1393     r = ht.get(g);
1394     theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
1395     assertEquals(value, theStoredVal);
1396     ht.close();
1397   }
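
  // NOTE: illustrative sketch, not part of the original test. The
  // flush-then-compact verification above can be factored into a helper such
  // as this (the name waitForCompaction is hypothetical): poll the admin's
  // compaction state until it reports NONE or the timeout elapses.
  private void waitForCompaction(final TableName tableName, long timeoutMs) throws Exception {
    TEST_UTIL.waitFor(timeoutMs, 200, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE;
      }
    });
  }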

  @Test(timeout = 300000)
  public void testReadWriteSeqIdFiles() throws Exception {
    LOG.info("testReadWriteSeqIdFiles");
    startCluster(2);
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
    HTable ht = installTable(zkw, "table", "family", 10);
    FileSystem fs = master.getMasterFileSystem().getFileSystem();
    Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table"));
    List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
    long newSeqId = WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
    // each repeated write bumps the stored id by the safety bumper (1000),
    // so the third call comes back exactly 2000 above the first
    WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L);
    assertEquals(newSeqId + 2000,
      WALSplitter.writeRegionSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L));

    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0));
    FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return WALSplitter.isSequenceIdFile(p);
      }
    });
    // only one seqid file should exist
    assertEquals(1, files.length);

    // verify that seqid files are not treated as recovered.edits files
    NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(fs, regionDirs.get(0));
    assertEquals(0, recoveredEdits.size());

    ht.close();
  }
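
  // NOTE: illustrative sketch, not part of the original test. The anonymous
  // PathFilter above could be shared as a constant (the name SEQ_ID_FILTER is
  // hypothetical); it delegates to the same WALSplitter.isSequenceIdFile call
  // the test uses.
  private static final PathFilter SEQ_ID_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      return WALSplitter.isSequenceIdFile(p);
    }
  };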

  HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
    return installTable(zkw, tname, fname, nrs, 0);
  }

  HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs,
      int existingRegions) throws Exception {
    // Create a table with regions
    TableName table = TableName.valueOf(tname);
    byte[] family = Bytes.toBytes(fname);
    LOG.info("Creating table with " + nrs + " regions");
    HTable ht = TEST_UTIL.createMultiRegionTable(table, family, nrs);
    int numRegions = -1;
    try (RegionLocator r = ht.getRegionLocator()) {
      numRegions = r.getStartKeys().length;
    }
    assertEquals(nrs, numRegions);
    LOG.info("Waiting for no more RIT\n");
    blockUntilNoRIT(zkw, master);
    // disable-enable cycle to get rid of the table's dead regions left behind
    // by createMultiRegions
    LOG.debug("Disabling table\n");
    TEST_UTIL.getHBaseAdmin().disableTable(table);
    LOG.debug("Waiting for no more RIT\n");
    blockUntilNoRIT(zkw, master);
    NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    LOG.debug("Verifying only catalog and namespace regions are assigned\n");
    if (regions.size() != 2) {
      for (String oregion : regions) {
        LOG.debug("Region still online: " + oregion);
      }
    }
    assertTrue(2 + existingRegions <= regions.size());
    LOG.debug("Enabling table\n");
    TEST_UTIL.getHBaseAdmin().enableTable(table);
    LOG.debug("Waiting for no more RIT\n");
    blockUntilNoRIT(zkw, master);
    LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
    regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
    assertTrue(numRegions + 2 + existingRegions <= regions.size());
    return ht;
  }
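
  // NOTE: illustrative sketch, not part of the original test. The debug loop
  // in installTable that lists still-online regions could be factored out
  // (the name dumpOnlineRegions is hypothetical):
  private void dumpOnlineRegions(NavigableSet<String> regions) {
    for (String oregion : regions) {
      LOG.debug("Region still online: " + oregion);
    }
  }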

  void populateDataInTable(int nrows, String fname) throws Exception {
    byte[] family = Bytes.toBytes(fname);

    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    assertEquals(NUM_RS, rsts.size());

    for (RegionServerThread rst : rsts) {
      HRegionServer hrs = rst.getRegionServer();
      List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (HRegionInfo hri : hris) {
        if (hri.getTable().isSystemTable()) {
          continue;
        }
        LOG.debug("adding data to rs = " + rst.getName() +
            " region = " + hri.getRegionNameAsString());
        Region region = hrs.getOnlineRegion(hri.getRegionName());
        assertTrue(region != null);
        putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family);
      }
    }
  }

  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
      int num_edits, int edit_size) throws IOException {
    makeWAL(hrs, regions, tname, fname, num_edits, edit_size, true);
  }

  public void makeWAL(HRegionServer hrs, List<HRegionInfo> regions, String tname, String fname,
      int num_edits, int edit_size, boolean cleanShutdown) throws IOException {
    TableName fullTName = TableName.valueOf(tname);
    // remove the meta region and any other system regions
    regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
    // using one sequenceId for edits across all regions is ok.
    final AtomicLong sequenceId = new AtomicLong(10);

    for (Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
      HRegionInfo regionInfo = iter.next();
      if (regionInfo.getTable().isSystemTable()) {
        iter.remove();
      }
    }
    HTableDescriptor htd = new HTableDescriptor(fullTName);
    byte[] family = Bytes.toBytes(fname);
    htd.addFamily(new HColumnDescriptor(family));
    byte[] value = new byte[edit_size];

    List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
    for (HRegionInfo region : regions) {
      if (!region.getTable().getNameAsString().equalsIgnoreCase(tname)) {
        continue;
      }
      hris.add(region);
    }
    LOG.info("Creating wal edits across " + hris.size() + " regions.");
    for (int i = 0; i < edit_size; i++) {
      value[i] = (byte) ('a' + (i % 26));
    }
    int n = hris.size();
    int[] counts = new int[n];
    // sync every ~30k to line up with the desired wal rolls
    final int syncEvery = 30 * 1024 / edit_size;
    if (n > 0) {
      for (int i = 0; i < num_edits; i += 1) {
        WALEdit e = new WALEdit();
        HRegionInfo curRegionInfo = hris.get(i % n);
        final WAL log = hrs.getWAL(curRegionInfo);
        byte[] startRow = curRegionInfo.getStartKey();
        if (startRow == null || startRow.length == 0) {
          startRow = new byte[] { 0, 0, 0, 0, 1 };
        }
        byte[] row = Bytes.incrementBytes(startRow, counts[i % n]);
        // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
        row = Arrays.copyOfRange(row, 3, 8);
        byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
        e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
        log.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
            System.currentTimeMillis()), e, sequenceId, true, null);
        if (0 == i % syncEvery) {
          log.sync();
        }
        counts[i % n] += 1;
      }
    }
    // done as two passes because the regions might share logs. shutdown is idempotent, but sync
    // will cause errors if done after.
    for (HRegionInfo info : hris) {
      final WAL log = hrs.getWAL(info);
      log.sync();
    }
    if (cleanShutdown) {
      for (HRegionInfo info : hris) {
        final WAL log = hrs.getWAL(info);
        log.shutdown();
      }
    }
    for (int i = 0; i < n; i++) {
      LOG.info("region " + hris.get(i).getRegionNameAsString() + " has " + counts[i] + " edits");
    }
  }
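
  // NOTE: illustrative sketch, not part of the original test. A typical
  // caller hands makeWAL the victim server's online regions and, with
  // cleanShutdown == false, leaves the logs looking like a crash so recovery
  // has work to do (the helper name and the 1000 x 1KB sizing are hypothetical):
  private void makeDirtyWAL(HRegionServer hrs) throws IOException {
    makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()),
        "table", "family", 1000, 1024, false);
  }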

  private int countWAL(Path log, FileSystem fs, Configuration conf)
      throws IOException {
    int count = 0;
    WAL.Reader in = WALFactory.createReader(fs, log, conf);
    try {
      WAL.Entry e;
      while ((e = in.next()) != null) {
        if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
          count++;
        }
      }
    } finally {
      try {
        in.close();
      } catch (IOException exception) {
        LOG.warn("Problem closing wal: " + exception.getMessage());
        LOG.debug("exception details.", exception);
      }
    }
    return count;
  }
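
  // NOTE: illustrative sketch, not part of the original test. countWAL above
  // counts the non-meta edits in a single log file; summing over every file
  // in a directory (the helper name countWALs is hypothetical) follows
  // directly. FSUtils.listStatus returns null rather than an empty array when
  // there is nothing to list, hence the null check.
  private int countWALs(Path dir, FileSystem fs, Configuration conf) throws IOException {
    int total = 0;
    FileStatus[] logs = FSUtils.listStatus(fs, dir, null);
    if (logs != null) {
      for (FileStatus log : logs) {
        total += countWAL(log.getPath(), fs, conf);
      }
    }
    return total;
  }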

  private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
      throws KeeperException, InterruptedException {
    ZKAssign.blockUntilNoRIT(zkw);
    master.assignmentManager.waitUntilNoRegionsInTransition(60000);
  }

  private void putData(Region region, byte[] startRow, int numRows, byte[] qf,
      byte[]... families) throws IOException {
    for (int i = 0; i < numRows; i++) {
      Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i)));
      for (byte[] family : families) {
        put.add(family, qf, null);
      }
      region.put(put);
    }
  }

  /**
   * Load the table with puts and deletes of known values so that we can verify them later.
   */
  private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException {
    byte[] k = new byte[3];

    // add puts
    List<Put> puts = new ArrayList<>();
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          k[0] = b1;
          k[1] = b2;
          k[2] = b3;
          Put put = new Put(k);
          put.add(f, column, k);
          puts.add(put);
        }
      }
    }
    t.put(puts);
    // add deletes for all rows with prefix "aa"
    for (byte b3 = 'a'; b3 <= 'z'; b3++) {
      k[0] = 'a';
      k[1] = 'a';
      k[2] = b3;
      Delete del = new Delete(k);
      t.delete(del);
    }
  }
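
  // NOTE: illustrative sketch, not part of the original test. prepareData
  // leaves 26^3 - 26 = 17550 rows (every "aa*" row is deleted), so a
  // verification pass can simply count what a full scan returns. The helper
  // name countRows is hypothetical, and the scanner types are fully
  // qualified to avoid assuming extra imports.
  private int countRows(Table t) throws IOException {
    int rows = 0;
    org.apache.hadoop.hbase.client.ResultScanner scanner =
        t.getScanner(new org.apache.hadoop.hbase.client.Scan());
    try {
      while (scanner.next() != null) {
        rows++;
      }
    } finally {
      scanner.close();
    }
    return rows;
  }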

  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
      long timems) {
    long curt = System.currentTimeMillis();
    long endt = curt + timems;
    while (curt < endt) {
      if (ctr.get() == oldval) {
        Thread.yield();
        curt = System.currentTimeMillis();
      } else {
        assertEquals(newval, ctr.get());
        return;
      }
    }
    fail("Timed out waiting for counter to change from " + oldval + " to " + newval);
  }
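
  // NOTE: illustrative sketch, not part of the original test. The busy-wait
  // above could also be phrased with the Waiter utility already used
  // elsewhere in this class (the helper name waitForCounterValue is
  // hypothetical):
  private void waitForCounterValue(final AtomicLong ctr, final long expected, long timeoutMs)
      throws Exception {
    TEST_UTIL.waitFor(timeoutMs, 10, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ctr.get() == expected;
      }
    });
  }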

  private void abortMaster(MiniHBaseCluster cluster) throws InterruptedException {
    for (MasterThread mt : cluster.getLiveMasterThreads()) {
      if (mt.getMaster().isActiveMaster()) {
        mt.getMaster().abort("Aborting for tests", new Exception("Trace info"));
        mt.join();
        break;
      }
    }
    LOG.debug("Master is aborted");
  }

  /**
   * Find a RS that has regions of a table.
   * @param hasMetaRegion when true, the returned RS also carries the hbase:meta region
   * @param tableName the name of the table; when null, any table's region qualifies
   * @return a region server hosting at least one region of the table
   * @throws Exception if moving a region to the chosen RS fails or times out
   */
  private HRegionServer findRSToKill(boolean hasMetaRegion, String tableName) throws Exception {
    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
    int numOfRSs = rsts.size();
    List<HRegionInfo> regions = null;
    HRegionServer hrs = null;

    for (int i = 0; i < numOfRSs; i++) {
      boolean isCarryingMeta = false;
      boolean foundTableRegion = false;
      hrs = rsts.get(i).getRegionServer();
      regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
      for (HRegionInfo region : regions) {
        if (region.isMetaRegion()) {
          isCarryingMeta = true;
        }
        if (tableName == null || region.getTable().getNameAsString().equals(tableName)) {
          foundTableRegion = true;
        }
        if (foundTableRegion && (isCarryingMeta || !hasMetaRegion)) {
          break;
        }
      }
      if (isCarryingMeta && hasMetaRegion) {
        // the caller asked for a RS with hbase:meta
        if (!foundTableRegion) {
          final HRegionServer destRS = hrs;
          // the RS doesn't have regions of the specified table, so we need to move one to this RS
          List<HRegionInfo> tableRegions =
              TEST_UTIL.getHBaseAdmin().getTableRegions(TableName.valueOf(tableName));
          final HRegionInfo hri = tableRegions.get(0);
          TEST_UTIL.getHBaseAdmin().move(hri.getEncodedNameAsBytes(),
            Bytes.toBytes(destRS.getServerName().getServerName()));
          // wait for the region move to complete
          final RegionStates regionStates =
              TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
          TEST_UTIL.waitFor(45000, 200, new Waiter.Predicate<Exception>() {
            @Override
            public boolean evaluate() throws Exception {
              ServerName sn = regionStates.getRegionServerOfRegion(hri);
              return (sn != null && sn.equals(destRS.getServerName()));
            }
          });
        }
        return hrs;
      } else if (hasMetaRegion || isCarryingMeta) {
        continue;
      }
      if (foundTableRegion) break;
    }

    return hrs;
  }
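
  // NOTE: illustrative sketch, not part of the original test. A typical
  // consumer of findRSToKill pairs it with the recovery helper the tests
  // above already use (the wrapper name killTableServerAndRecover is
  // hypothetical):
  private void killTableServerAndRecover(String tableName, int numRegions) throws Exception {
    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "find-rs-to-kill", null);
    HRegionServer victim = findRSToKill(false, tableName);
    this.abortRSAndWaitForRecovery(victim, zkw, numRegions);
  }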

}