/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Jdk14Logger;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
import org.apache.hadoop.hbase.tool.Canary;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Facility for testing HBase. Replacement for
 * the old HBaseTestCase and HBaseClusterTestCase functionality.
 * Create an instance and keep it around while testing HBase.  This class is
 * meant to be your one-stop shop for anything you might need while testing.  Manages
 * one cluster at a time only. The managed cluster can be an in-process
 * {@link MiniHBaseCluster}, or a deployed cluster of type {@link DistributedHBaseCluster}.
 * Not all methods work with the real cluster.
 * Depends on log4j being on the classpath and
 * hbase-site.xml for logging and test-run configuration.  It does not set
 * logging levels nor make changes to configuration parameters.
 * <p>To preserve test data directories, set the system property
 * "hbase.testing.preserve.testdir" to true.
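 *
 * <p>A minimal usage sketch (the table and family names below are
 * illustrative, not part of this class):
 * <pre>
 *   HBaseTestingUtility util = new HBaseTestingUtility();
 *   util.startMiniCluster();
 *   try {
 *     Table t = util.createTable(TableName.valueOf("example"), "f");
 *     // ... exercise the cluster ...
 *   } finally {
 *     util.shutdownMiniCluster();
 *   }
 * </pre>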
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("deprecation")
public class HBaseTestingUtility extends HBaseCommonTestingUtility {
  private MiniZooKeeperCluster zkCluster = null;

  public static final String REGIONS_PER_SERVER_KEY = "hbase.test.regions-per-server";
  /**
   * The default number of regions per regionserver when creating a pre-split
   * table.
   */
  public static final int DEFAULT_REGIONS_PER_SERVER = 5;

  /**
   * Set if we were passed a zkCluster.  If so, we won't shut down zk as
   * part of general shutdown.
   */
  private boolean passedZkCluster = false;
  private MiniDFSCluster dfsCluster = null;

  private volatile HBaseCluster hbaseCluster = null;
  private MiniMRCluster mrCluster = null;

  /** Whether there is a mini cluster running for this testing utility instance. */
  private volatile boolean miniClusterRunning;

  private String hadoopLogDir;

  /** Directory (a subdirectory of dataTestDir) used by the dfs cluster, if any */
  private File clusterTestDir = null;

  /** Directory on the test filesystem where we put the data for this instance of
   * HBaseTestingUtility */
  private Path dataTestDirOnTestFS = null;

  /**
   * Shared cluster connection.
   */
  private volatile Connection connection;

  /**
   * System property key to get the test directory value.
   * The name is as it is because mini dfs has hard-codings to put test data here.
   * It should NOT be used directly in HBase, as it's a property used by
   * mini dfs.
   * @deprecated can be used only with mini dfs
   */
  @Deprecated
  private static final String TEST_DIRECTORY_KEY = "test.build.data";

  /** Filesystem URI used for map-reduce mini-cluster setup */
  private static String FS_URI;

  /** A set of ports that have been claimed using {@link #randomFreePort()}. */
  private static final Set<Integer> takenRandomPorts = new HashSet<Integer>();

  /** Compression algorithms to use in parameterized JUnit 4 tests */
  public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED =
    Arrays.asList(new Object[][] {
      { Compression.Algorithm.NONE },
      { Compression.Algorithm.GZ }
    });
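
  // A hedged usage sketch for COMPRESSION_ALGORITHMS_PARAMETERIZED above
  // (assumes JUnit 4's Parameterized runner; the test class name is illustrative):
  //
  //   @RunWith(Parameterized.class)
  //   public class ExampleCompressionTest {
  //     @Parameterized.Parameters
  //     public static List<Object[]> parameters() {
  //       return HBaseTestingUtility.COMPRESSION_ALGORITHMS_PARAMETERIZED;
  //     }
  //     private final Compression.Algorithm algo;
  //     public ExampleCompressionTest(Compression.Algorithm algo) {
  //       this.algo = algo;
  //     }
  //   }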

  /** This is for unit tests parameterized with a single boolean. */
  public static final List<Object[]> BOOLEAN_PARAMETERIZED =
      Arrays.asList(new Object[][] {
          { Boolean.FALSE },
          { Boolean.TRUE }
      });

  /** This is for unit tests parameterized with two booleans (memstoreTS and tags). */
  public static final List<Object[]> MEMSTORETS_TAGS_PARAMETRIZED = memStoreTSAndTagsCombination();
  /** Compression algorithms to use in testing */
  public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = {
      Compression.Algorithm.NONE, Compression.Algorithm.GZ
    };

  /**
   * Checks to see if a specific port is available.
   *
   * @param port the port number to check for availability
   * @return <tt>true</tt> if the port is available, or <tt>false</tt> if not
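   *
   * <p>A hedged usage sketch (the starting port is illustrative):
   * <pre>
   *   int port = 50000;
   *   while (!HBaseTestingUtility.available(port)) {
   *     port++;
   *   }
   * </pre>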
   */
  public static boolean available(int port) {
    ServerSocket ss = null;
    DatagramSocket ds = null;
    try {
      ss = new ServerSocket(port);
      ss.setReuseAddress(true);
      ds = new DatagramSocket(port);
      ds.setReuseAddress(true);
      return true;
    } catch (IOException e) {
      // Do nothing
    } finally {
      if (ds != null) {
        ds.close();
      }

      if (ss != null) {
        try {
          ss.close();
        } catch (IOException e) {
          /* should not be thrown */
        }
      }
    }

    return false;
  }

  /**
   * Create all combinations of Bloom filters and compression algorithms for
   * testing.
   */
  private static List<Object[]> bloomAndCompressionCombinations() {
    List<Object[]> configurations = new ArrayList<Object[]>();
    for (Compression.Algorithm comprAlgo :
         HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
      for (BloomType bloomType : BloomType.values()) {
        configurations.add(new Object[] { comprAlgo, bloomType });
      }
    }
    return Collections.unmodifiableList(configurations);
  }

  /**
   * Create all combinations of memstoreTS and tags.
   */
  private static List<Object[]> memStoreTSAndTagsCombination() {
    List<Object[]> configurations = new ArrayList<Object[]>();
    configurations.add(new Object[] { false, false });
    configurations.add(new Object[] { false, true });
    configurations.add(new Object[] { true, false });
    configurations.add(new Object[] { true, true });
    return Collections.unmodifiableList(configurations);
  }

  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
      bloomAndCompressionCombinations();

  public HBaseTestingUtility() {
    this(HBaseConfiguration.create());
  }

  public HBaseTestingUtility(Configuration conf) {
    super(conf);

    // an hbase checksum verification failure will cause unit tests to fail
    ChecksumUtil.generateExceptionForChecksumFailureForTest(true);
  }

  /**
   * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
   * It is needed to properly base FSUtils.getRootDirs so that they drop temp files in the proper
   * test dir.  Use this when you aren't using a mini HDFS cluster.
   * @return HBaseTestingUtility that uses local fs for temp files.
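   *
   * <p>A minimal sketch of intended use:
   * <pre>
   *   HBaseTestingUtility htu = HBaseTestingUtility.createLocalHTU();
   *   Path testDir = htu.getDataTestDir();  // all temp files land under here
   * </pre>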
   */
  public static HBaseTestingUtility createLocalHTU() {
    Configuration c = HBaseConfiguration.create();
    return createLocalHTU(c);
  }

  /**
   * Create an HBaseTestingUtility where all tmp files are written to the local test data dir.
   * It is needed to properly base FSUtils.getRootDirs so that they drop temp files in the proper
   * test dir.  Use this when you aren't using a mini HDFS cluster.
   * @param c Configuration (will be modified)
   * @return HBaseTestingUtility that uses local fs for temp files.
   */
  public static HBaseTestingUtility createLocalHTU(Configuration c) {
    HBaseTestingUtility htu = new HBaseTestingUtility(c);
    String dataTestDir = htu.getDataTestDir().toString();
    htu.getConfiguration().set(HConstants.HBASE_DIR, dataTestDir);
    LOG.debug("Setting " + HConstants.HBASE_DIR + " to " + dataTestDir);
    return htu;
  }

  /**
   * Close the Region {@code r}. For use in tests.
   */
  public static void closeRegion(final Region r) throws IOException {
    if (r != null) {
      ((HRegion)r).close();
    }
  }

  /**
   * Returns this class's instance of {@link Configuration}.  Be careful how
   * you use the returned Configuration, since {@link HConnection} instances
   * can be shared.  The Map of HConnections is keyed by the Configuration.  If
   * say, a Connection was being used against a cluster that had been shutdown,
   * see {@link #shutdownMiniCluster()}, then the Connection will no longer
   * be wholesome.  Rather than using the return value directly, it's usually best to
   * make a copy and use that.  Do
   * <code>Configuration c = new Configuration(INSTANCE.getConfiguration());</code>
   * @return Instance of Configuration.
   */
  @Override
  public Configuration getConfiguration() {
    return super.getConfiguration();
  }

  public void setHBaseCluster(HBaseCluster hbaseCluster) {
    this.hbaseCluster = hbaseCluster;
  }

  /**
   * Home our data in a dir under {@link #DEFAULT_BASE_TEST_DIRECTORY}.
   * Give it a random name so we can have many concurrent tests running if
   * we need to.  It needs to amend the {@link #TEST_DIRECTORY_KEY}
   * System property, as it's what minidfscluster bases
   * its data dir on.  Modifying a System property is not the way to do concurrent
   * instances -- another instance could grab the temporary
   * value unintentionally -- but there is nothing we can do about it at the moment;
   * single instance only is how the minidfscluster works.
   *
   * We also create the underlying directory for
   * hadoop.log.dir, mapreduce.cluster.local.dir and hadoop.tmp.dir, and set the values
   * in the conf, and as a system property for hadoop.tmp.dir.
   *
   * @return The calculated data test build directory, if newly-created.
   */
  @Override
  protected Path setupDataTestDir() {
    Path testPath = super.setupDataTestDir();
    if (null == testPath) {
      return null;
    }

    createSubDirAndSystemProperty(
      "hadoop.log.dir",
      testPath, "hadoop-log-dir");

    // This is defaulted in core-default.xml to /tmp/hadoop-${user.name}, but
    // we want our own value to ensure uniqueness on the same machine
    createSubDirAndSystemProperty(
      "hadoop.tmp.dir",
      testPath, "hadoop-tmp-dir");

    // Read and modified in org.apache.hadoop.mapred.MiniMRCluster
    createSubDir(
      "mapreduce.cluster.local.dir",
      testPath, "mapred-local-dir");

    return testPath;
  }

  private void createSubDirAndSystemProperty(
    String propertyName, Path parent, String subDirName) {

    String sysValue = System.getProperty(propertyName);

    if (sysValue != null) {
      // There is already a value set. So we do nothing but hope
      // that there will be no conflicts
      LOG.info("System.getProperty(\"" + propertyName + "\") already set to: " +
        sysValue + " so I do NOT create it in " + parent);
      String confValue = conf.get(propertyName);
      if (confValue != null && !confValue.endsWith(sysValue)) {
        LOG.warn(
          propertyName + " property value differs in configuration and system: " +
          "Configuration=" + confValue + " while System=" + sysValue +
          " Overriding the configuration value with the system value."
        );
      }
      conf.set(propertyName, sysValue);
    } else {
      // Ok, it's not set, so we create it as a subdirectory
      createSubDir(propertyName, parent, subDirName);
      System.setProperty(propertyName, conf.get(propertyName));
    }
  }

  /**
   * @return Where to write test data on the test filesystem; returns the working directory
   * for the test filesystem by default
   * @see #setupDataTestDirOnTestFS()
   * @see #getTestFileSystem()
   */
  private Path getBaseTestDirOnTestFS() throws IOException {
    FileSystem fs = getTestFileSystem();
    return new Path(fs.getWorkingDirectory(), "test-data");
  }

  /**
   * @return META table descriptor
   */
  public HTableDescriptor getMetaTableDescriptor() {
    try {
      return new FSTableDescriptors(conf).get(TableName.META_TABLE_NAME);
    } catch (IOException e) {
      throw new RuntimeException("Unable to create META table descriptor", e);
    }
  }

  /**
   * @return Where the DFS cluster will write data on the local subsystem.
   * Creates it if it does not exist already.  A subdir of {@link #getBaseTestDir()}
   * @see #getTestFileSystem()
   */
  Path getClusterTestDir() {
    if (clusterTestDir == null) {
      setupClusterTestDir();
    }
    return new Path(clusterTestDir.getAbsolutePath());
  }

  /**
   * Creates a directory for the DFS cluster, under the test data directory.
   */
  private void setupClusterTestDir() {
    if (clusterTestDir != null) {
      return;
    }

    // Using randomUUID ensures that multiple clusters can be launched by
    // the same test, if it stops and starts them
    Path testDir = getDataTestDir("dfscluster_" + UUID.randomUUID().toString());
    clusterTestDir = new File(testDir.toString()).getAbsoluteFile();
    // Have it cleaned up on exit
    boolean b = deleteOnExit();
    if (b) clusterTestDir.deleteOnExit();
    conf.set(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    LOG.info("Created new mini-cluster data directory: " + clusterTestDir + ", deleteOnExit=" + b);
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()},
   * to write temporary test data. Call this method after setting up the mini dfs cluster
   * if the test relies on it.
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS == null) {
      setupDataTestDirOnTestFS();
    }

    return dataTestDirOnTestFS;
  }

  /**
   * Returns a Path in the test filesystem, obtained from {@link #getTestFileSystem()},
   * to write temporary test data. Call this method after setting up the mini dfs cluster
   * if the test relies on it.
   * @param subdirName name of the subdir to create under the base test dir
   * @return a unique path in the test filesystem
   */
  public Path getDataTestDirOnTestFS(final String subdirName) throws IOException {
    return new Path(getDataTestDirOnTestFS(), subdirName);
  }

  /**
   * Sets up a path in the test filesystem to be used by tests.
   * Creates a new directory if one is not already set up.
   */
  private void setupDataTestDirOnTestFS() throws IOException {
    if (dataTestDirOnTestFS != null) {
      LOG.warn("Data test dir on test fs already set up at "
          + dataTestDirOnTestFS.toString());
      return;
    }
    dataTestDirOnTestFS = getNewDataTestDirOnTestFS();
  }

  /**
   * Sets up a new path in the test filesystem to be used by tests.
   */
  private Path getNewDataTestDirOnTestFS() throws IOException {
    // The file system can be either local, mini dfs, or if the configuration
    // is supplied externally, it can be an external cluster FS. If it is a local
    // file system, the tests should use getBaseTestDir, otherwise, we can use
    // the working directory, and create a unique sub dir there
    FileSystem fs = getTestFileSystem();
    Path newDataTestDir = null;
    if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
      File dataTestDir = new File(getDataTestDir().toString());
      if (deleteOnExit()) dataTestDir.deleteOnExit();
      newDataTestDir = new Path(dataTestDir.getAbsolutePath());
    } else {
      Path base = getBaseTestDirOnTestFS();
      String randomStr = UUID.randomUUID().toString();
      newDataTestDir = new Path(base, randomStr);
      if (deleteOnExit()) fs.deleteOnExit(newDataTestDir);
    }
    return newDataTestDir;
  }

  /**
   * Cleans the test data directory on the test filesystem.
   * @return True if we removed the test dirs
   * @throws IOException
   */
  public boolean cleanupDataTestDirOnTestFS() throws IOException {
    // Guard against a call before the test dir was ever set up.
    if (dataTestDirOnTestFS == null) {
      return false;
    }
    boolean ret = getTestFileSystem().delete(dataTestDirOnTestFS, true);
    if (ret) {
      dataTestDirOnTestFS = null;
    }
    return ret;
  }

  /**
   * Cleans a subdirectory under the test data directory on the test filesystem.
   * @return True if we removed the child
   * @throws IOException
   */
  public boolean cleanupDataTestDirOnTestFS(String subdirName) throws IOException {
    Path cpath = getDataTestDirOnTestFS(subdirName);
    return getTestFileSystem().delete(cpath, true);
  }

  /**
   * Start a minidfscluster.
   * @param servers How many DNs to start.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
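   *
   * <p>A hedged sketch of typical use in a test (assuming <code>util</code> is
   * this HBaseTestingUtility):
   * <pre>
   *   MiniDFSCluster dfs = util.startMiniDFSCluster(3);  // three datanodes
   *   FileSystem fs = dfs.getFileSystem();
   *   // ... run the test against fs ...
   *   util.shutdownMiniDFSCluster();
   * </pre>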
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }

  /**
   * Start a minidfscluster.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names, all instances of the
   * datanodes will have the same host name.
   * @param hosts hostnames of the DNs to run on.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(final String[] hosts)
  throws Exception {
    if (hosts != null && hosts.length != 0) {
      return startMiniDFSCluster(hosts.length, hosts);
    } else {
      return startMiniDFSCluster(1, null);
    }
  }

  /**
   * Start a minidfscluster.
   * Can only create one.
   * @param servers How many DNs to start.
   * @param hosts hostnames of the DNs to run on.
   * @throws Exception
   * @see #shutdownMiniDFSCluster()
   * @return The mini dfs cluster created.
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] hosts)
  throws Exception {
    createDirsAndSetProperties();
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);

    // Error level to skip some warnings specific to the minicluster. See HBASE-4709
    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.util.MBeans.class).
        setLevel(org.apache.log4j.Level.ERROR);
    org.apache.log4j.Logger.getLogger(org.apache.hadoop.metrics2.impl.MetricsSystemImpl.class).
        setLevel(org.apache.log4j.Level.ERROR);

    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
      true, null, null, hosts, null);

    // Set this just-started cluster as our filesystem.
    setFs();

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // Reset the test directory for the test file system
    dataTestDirOnTestFS = null;

    return this.dfsCluster;
  }

  private void setFs() throws IOException {
    if (this.dfsCluster == null) {
      LOG.info("Skipping setting fs because dfsCluster is null");
      return;
    }
    FileSystem fs = this.dfsCluster.getFileSystem();
    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String[] racks, String[] hosts)
      throws Exception {
    createDirsAndSetProperties();
    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
        true, null, racks, hosts, null);

    // Set this just-started cluster as our filesystem.
    FileSystem fs = this.dfsCluster.getFileSystem();
    FSUtils.setFsDefault(this.conf, new Path(fs.getUri()));

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    // Reset the test directory for the test file system
    dataTestDirOnTestFS = null;

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestWAL(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
        null, null, null);
    return dfsCluster;
  }

  /** This is used before starting HDFS and map-reduce mini-clusters */
  private void createDirsAndSetProperties() throws IOException {
    setupClusterTestDir();
    System.setProperty(TEST_DIRECTORY_KEY, clusterTestDir.getPath());
    createDirAndSetProperty("cache_data", "test.cache.data");
    createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir");
    hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir");
    createDirAndSetProperty("mapred_local", "mapreduce.cluster.local.dir");
    createDirAndSetProperty("mapred_temp", "mapreduce.cluster.temp.dir");
    enableShortCircuit();

    Path root = getDataTestDirOnTestFS("hadoop");
    conf.set(MapreduceTestingShim.getMROutputDirProp(),
      new Path(root, "mapred-output-dir").toString());
    conf.set("mapreduce.jobtracker.system.dir", new Path(root, "mapred-system-dir").toString());
    conf.set("mapreduce.jobtracker.staging.root.dir",
      new Path(root, "mapreduce-jobtracker-staging-root-dir").toString());
    conf.set("mapreduce.job.working.dir", new Path(root, "mapred-working-dir").toString());
  }

  /**
   *  Get the HBase setting for dfs.client.read.shortcircuit from the conf or a system property.
   *  This allows specifying this parameter on the command line.
   *  If not set, the default is false.
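   *
   *  <p>For example, short-circuit reads can be turned on for a test run via
   *  a system property (a hedged sketch; exact property forwarding depends on
   *  the build setup):
   *  <pre>
   *    mvn test -Dhbase.tests.use.shortcircuit.reads=true
   *  </pre>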
   */
  public boolean isReadShortCircuitOn() {
    final String propName = "hbase.tests.use.shortcircuit.reads";
    String readOnProp = System.getProperty(propName);
    if (readOnProp != null) {
      return Boolean.parseBoolean(readOnProp);
    } else {
      return conf.getBoolean(propName, false);
    }
  }

  /** Enable short-circuit reads, unless configured differently.
   * Sets both HBase and HDFS settings, including skipping the hdfs checksum checks.
   */
  private void enableShortCircuit() {
    if (isReadShortCircuitOn()) {
      String curUser = System.getProperty("user.name");
      LOG.info("read short circuit is ON for user " + curUser);
      // read short circuit, for hdfs
      conf.set("dfs.block.local-path-access.user", curUser);
      // read short circuit, for hbase
      conf.setBoolean("dfs.client.read.shortcircuit", true);
      // Skip checking checksum, for the hdfs client and the datanode
      conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", true);
    } else {
      LOG.info("read short circuit is OFF");
    }
  }

  private String createDirAndSetProperty(final String relPath, String property) {
    String path = getDataTestDir(relPath).toString();
    System.setProperty(property, path);
    conf.set(property, path);
    new File(path).mkdirs();
    LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf");
    return path;
  }

  /**
   * Shuts down the instance created by a call to {@link #startMiniDFSCluster(int)}
   * or does nothing.
   * @throws IOException
   */
  public void shutdownMiniDFSCluster() throws IOException {
    if (this.dfsCluster != null) {
      // The below throws an exception per dn, AsynchronousCloseException.
      this.dfsCluster.shutdown();
      dfsCluster = null;
      dataTestDirOnTestFS = null;
      FSUtils.setFsDefault(this.conf, new Path("file:///"));
    }
  }

  /**
   * Call this if you only want a zk cluster.
   * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
   * @throws Exception
   * @see #shutdownMiniZKCluster()
   * @return zk cluster started.
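   *
   * <p>A hedged sketch of standalone use (assuming <code>util</code> is this
   * HBaseTestingUtility):
   * <pre>
   *   MiniZooKeeperCluster zk = util.startMiniZKCluster();
   *   // ... exercise ZooKeeper-only code paths ...
   *   util.shutdownMiniZKCluster();
   * </pre>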
   */
  public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
    return startMiniZKCluster(1);
  }

  /**
   * Call this if you only want a zk cluster.
   * @param zooKeeperServerNum number of ZK servers to start.
   * @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
   * @throws Exception
   * @see #shutdownMiniZKCluster()
   * @return zk cluster started.
   */
  public MiniZooKeeperCluster startMiniZKCluster(
      final int zooKeeperServerNum,
      final int ... clientPortList)
      throws Exception {
    setupClusterTestDir();
    return startMiniZKCluster(clusterTestDir, zooKeeperServerNum, clientPortList);
  }

  private MiniZooKeeperCluster startMiniZKCluster(final File dir)
    throws Exception {
    return startMiniZKCluster(dir, 1, null);
  }

  /**
   * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set,
   * the port mentioned is used as the default port for ZooKeeper.
   */
  private MiniZooKeeperCluster startMiniZKCluster(final File dir,
      final int zooKeeperServerNum,
      final int[] clientPortList)
  throws Exception {
    if (this.zkCluster != null) {
      throw new IOException("Cluster already running at " + dir);
    }
    this.passedZkCluster = false;
    this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration());
    final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
    if (defPort > 0) {
      // If there is a port in the config file, we use it.
      this.zkCluster.setDefaultClientPort(defPort);
    }

    if (clientPortList != null) {
      // Ignore extra client ports
      int clientPortListSize = (clientPortList.length <= zooKeeperServerNum) ?
          clientPortList.length : zooKeeperServerNum;
      for (int i = 0; i < clientPortListSize; i++) {
        this.zkCluster.addClientPort(clientPortList[i]);
      }
    }
    int clientPort = this.zkCluster.startup(dir, zooKeeperServerNum);
    this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,
      Integer.toString(clientPort));
    return this.zkCluster;
  }

  /**
   * Shuts down the zk cluster created by a call to {@link #startMiniZKCluster(File)}
   * or does nothing.
   * @throws IOException
   * @see #startMiniZKCluster()
   */
  public void shutdownMiniZKCluster() throws IOException {
    if (this.zkCluster != null) {
      this.zkCluster.shutdown();
      this.zkCluster = null;
    }
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper.
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(1, 1);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper where the WAL's walDir is created
   * separately.
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster(boolean withWALDir) throws Exception {
    return startMiniCluster(1, 1, 1, null, null, null, false, withWALDir);
  }

  /**
   * Start up a minicluster of hbase, dfs, and zookeeper.
   * Set the <code>create</code> flag to create the root or data directory path
   * (will overwrite if the dir already exists).
   * @throws Exception
   * @return Mini hbase cluster instance created.
   * @see #shutdownMiniCluster()
   */
  public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create)
  throws Exception {
    return startMiniCluster(1, numSlaves, create);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under the System property test.build.data.
   * The directory is cleaned up on exit.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), otherwise
   * you will get bind errors.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
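   *
   * <p>For example (a hedged sketch; the regionserver info port is disabled to
   * avoid UI port clashes when numSlaves is > 1):
   * <pre>
   *   util.getConfiguration().setInt("hbase.regionserver.info.port", -1);
   *   MiniHBaseCluster cluster = util.startMiniCluster(3);
   * </pre>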
   */
  public MiniHBaseCluster startMiniCluster(final int numSlaves)
  throws Exception {
    return startMiniCluster(1, numSlaves, false);
  }

  public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create, boolean withWALDir)
          throws Exception {
    return startMiniCluster(1, numSlaves, numSlaves, null, null, null, create, withWALDir);
  }

  /**
   * Start minicluster. Whether to create a new root or data dir path, even if such a path
   * has been created earlier, is decided based on the flag <code>create</code>.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, boolean create)
    throws Exception {
      return startMiniCluster(numMasters, numSlaves, null, create);
  }

  /**
   * Start minicluster.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves)
  throws Exception {
    return startMiniCluster(numMasters, numSlaves, null, false);
  }

  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts, boolean create)
      throws Exception {
    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
        null, null, create, false);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under the System property test.build.data.
   * The directory is cleaned up on exit.
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * regionservers. If dataNodeHosts == null, this also indicates the number of
   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
   * based on dataNodeHosts.length.
   * If numSlaves is > 1, make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), otherwise
   * you will get bind errors.
   * @param dataNodeHosts hostnames of the DNs to run on.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts) throws Exception {
    return startMiniCluster(numMasters, numSlaves, numSlaves, dataNodeHosts,
        null, null);
  }

  /**
   * Same as {@link #startMiniCluster(int, int)}, but with a custom number of datanodes.
   * @param numDataNodes Number of data nodes.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final int numDataNodes) throws Exception {
    return startMiniCluster(numMasters, numSlaves, numDataNodes, null, null, null);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under the System property test.build.data.
   * The directory is cleaned up on exit.
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * regionservers. If dataNodeHosts == null, this also indicates the number of
   * datanodes to start. If dataNodeHosts != null, the number of datanodes is
   * based on dataNodeHosts.length.
   * If numSlaves is > 1, make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), otherwise
   * you will get bind errors.
   * @param dataNodeHosts hostnames of the DNs to run on.
   * This is useful if you want to run datanodes on distinct hosts for things
   * like HDFS block location verification.
   * If you start MiniDFSCluster without host names,
   * all instances of the datanodes will have the same host name.
   * @param masterClass The class to use as HMaster, or null for default
   * @param regionserverClass The class to use as HRegionServer, or null for
   * default
   * @throws Exception
   * @see #shutdownMiniCluster()
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, final String[] dataNodeHosts, Class<? extends HMaster> masterClass,
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
          throws Exception {
    return startMiniCluster(
        numMasters, numSlaves, numSlaves, dataNodeHosts, masterClass, regionserverClass);
  }

  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
      Class<? extends HMaster> masterClass,
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
    throws Exception {
    return startMiniCluster(numMasters, numSlaves, numDataNodes, dataNodeHosts,
        masterClass, regionserverClass, false, false);
  }

  /**
   * Same as {@link #startMiniCluster(int, int, String[], Class, Class)}, but with a custom
   * number of datanodes.
   * @param numDataNodes Number of data nodes.
   * @param create Set this flag to create a new
   * root or data directory path (will overwrite if it exists already).
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
    Class<? extends HMaster> masterClass,
    Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
    boolean create, boolean withWALDir)
  throws Exception {
    if (dataNodeHosts != null && dataNodeHosts.length != 0) {
      numDataNodes = dataNodeHosts.length;
    }

    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
        numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");

    // If we already put up a cluster, fail.
    if (miniClusterRunning) {
      throw new IllegalStateException("A mini-cluster is already running");
    }
    miniClusterRunning = true;

    setupClusterTestDir();
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());

    // Bring up the mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    if (this.dfsCluster == null) {
      dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts);
    }

    // Start up a zk cluster.
    if (this.zkCluster == null) {
      startMiniZKCluster(clusterTestDir);
    }

    // Start the MiniHBaseCluster
    return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
      regionserverClass, create, withWALDir);
  }

  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
      throws IOException, InterruptedException {
    return startMiniHBaseCluster(numMasters, numSlaves, null, null, false, false);
  }

  /**
   * Starts up the mini hbase cluster.  Usually used after a call to
   * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
   * Usually you won't want this.  You'll usually want {@link #startMiniCluster()}.
   * @param numMasters number of masters to start.
   * @param numSlaves number of regionservers to start.
   * @param create Whether to create a
   * root or data directory path; will overwrite if it exists already.
   * @return Reference to the hbase mini hbase cluster.
   * @throws IOException
   * @throws InterruptedException
   * @see #startMiniCluster()
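   *
   * <p>A hedged sketch of stepped startup (dfs and zk first, then hbase;
   * <code>util</code> is this HBaseTestingUtility):
   * <pre>
   *   util.startMiniDFSCluster(1);
   *   util.startMiniZKCluster();
   *   util.startMiniHBaseCluster(1, 1);
   * </pre>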
   */
  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
        final int numSlaves, Class<? extends HMaster> masterClass,
        Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
        boolean create, boolean withWALDir)
  throws IOException, InterruptedException {
    // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
    createRootDir(create);

    if (withWALDir) {
      createWALRootDir();
    }

    // These settings will make the master wait until this exact number of
    // region servers is connected.
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, numSlaves);
    }
    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, numSlaves);
    }

    Configuration c = new Configuration(this.conf);
    this.hbaseCluster =
        new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
    // Don't leave here till we've done a successful scan of the hbase:meta
    Table t = new HTable(c, TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      continue;
    }
    s.close();
    t.close();

    getHBaseAdmin(); // create the hbaseAdmin immediately
    LOG.info("Minicluster is up");

    // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is
    // for tests that do not read hbase-defaults.xml
    setHBaseFsTmpDir();

    return (MiniHBaseCluster)this.hbaseCluster;
  }

  /**
   * Starts the hbase cluster up again after shutting it down previously in a
   * test.  Use this if you want to keep dfs/zk up and just stop/start hbase.
   * @param servers number of region servers
   * @throws IOException
   */
  public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
    // Don't leave here till we've done a successful scan of the hbase:meta
    Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
    while (s.next() != null) {
      // do nothing
    }
    LOG.info("HBase has been restarted");
    s.close();
    t.close();
  }

  /**
   * @return Current mini hbase cluster. Only has something in it after a call
   * to {@link #startMiniCluster()}.
   * @see #startMiniCluster()
   */
  public MiniHBaseCluster getMiniHBaseCluster() {
    if (this.hbaseCluster == null || this.hbaseCluster instanceof MiniHBaseCluster) {
      return (MiniHBaseCluster)this.hbaseCluster;
    }
    throw new RuntimeException(hbaseCluster + " not an instance of " +
                               MiniHBaseCluster.class.getName());
  }

  /**
   * Stops the mini hbase, zk, and hdfs clusters.
   * @throws IOException
   * @see #startMiniCluster(int)
   */
  public void shutdownMiniCluster() throws Exception {
    LOG.info("Shutting down minicluster");
    if (this.connection != null && !this.connection.isClosed()) {
      this.connection.close();
      this.connection = null;
    }
    shutdownMiniHBaseCluster();
    if (!this.passedZkCluster) {
      shutdownMiniZKCluster();
    }
    shutdownMiniDFSCluster();

    cleanupTestDir();
    miniClusterRunning = false;
    LOG.info("Minicluster is down");
  }

  /**
   * @return True if we removed the test dirs
   * @throws IOException
   */
  @Override
  public boolean cleanupTestDir() throws IOException {
    boolean ret = super.cleanupTestDir();
    if (deleteDir(this.clusterTestDir)) {
      this.clusterTestDir = null;
      return ret;
    }
    return false;
  }

  /**
   * Shuts down the HBase mini cluster.  Does not shut down zk or dfs if running.
   * @throws IOException
   */
  public void shutdownMiniHBaseCluster() throws IOException {
    if (hbaseAdmin != null) {
      hbaseAdmin.close0();
      hbaseAdmin = null;
    }

    // unset the configuration for MIN and MAX RS to start
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
    if (this.hbaseCluster != null) {
      this.hbaseCluster.shutdown();
      // Wait till hbase is down before going on to shutdown zk.
      this.hbaseCluster.waitUntilShutDown();
      this.hbaseCluster = null;
    }

    if (zooKeeperWatcher != null) {
      zooKeeperWatcher.close();
      zooKeeperWatcher = null;
    }
  }

  /**
   * Returns the path to the default root dir the minicluster uses. If <code>create</code>
   * is true, a new root directory path is fetched irrespective of whether it has been fetched
   * before or not. If false, the previous path is used.
   * Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   * @throws IOException
   */
  public Path getDefaultRootDirPath(boolean create) throws IOException {
    if (!create) {
      return getDataTestDirOnTestFS();
    } else {
      return getNewDataTestDirOnTestFS();
    }
  }

  /**
   * Same as {@link HBaseTestingUtility#getDefaultRootDirPath(boolean create)}
   * except that the <code>create</code> flag is false.
   * Note: this does not cause the root dir to be created.
   * @return Fully qualified path for the default hbase root dir
   * @throws IOException
   */
  public Path getDefaultRootDirPath() throws IOException {
    return getDefaultRootDirPath(false);
  }

  /**
   * Creates an hbase rootdir in the user's home directory.  Also creates the hbase
   * version file.  Normally you won't make use of this method.  The root hbasedir
   * is created for you as part of mini cluster startup.  You'd only use this
   * method if you were doing manual operations.
   * @param create This flag decides whether to get a new
   * root or data directory path or not, if it has been fetched already.
   * Note: the directory will be made irrespective of whether the path has been fetched or not.
   * If the directory already exists, it will be overwritten.
   * @return Fully qualified path to the hbase root dir
   * @throws IOException
   */
  public Path createRootDir(boolean create) throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = getDefaultRootDirPath(create);
    FSUtils.setRootDir(this.conf, hbaseRootdir);
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }

  /**
   * Same as {@link HBaseTestingUtility#createRootDir(boolean create)}
   * except that the <code>create</code> flag is false.
   * @return Fully qualified path to the hbase root dir
   * @throws IOException
   */
  public Path createRootDir() throws IOException {
    return createRootDir(false);
  }

  /**
   * Creates an hbase walDir in the user's home directory.
   * Normally you won't make use of this method. The root hbaseWALDir
   * is created for you as part of mini cluster startup. You'd only use this
   * method if you were doing manual operations.
   *
   * @return Fully qualified path to the hbase WAL root dir
   * @throws IOException
   */
  public Path createWALRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path walDir = getNewDataTestDirOnTestFS();
    FSUtils.setWALRootDir(this.conf, walDir);
    fs.mkdirs(walDir);
    return walDir;
  }

  private void setHBaseFsTmpDir() throws IOException {
    String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
    if (hbaseFsTmpDirInString == null) {
      this.conf.set("hbase.fs.tmp.dir", getDataTestDirOnTestFS("hbase-staging").toString());
      LOG.info("Setting hbase.fs.tmp.dir to " + this.conf.get("hbase.fs.tmp.dir"));
    } else {
      LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString);
    }
  }

  /**
   * Flushes all caches in the mini hbase cluster
   * @throws IOException
   */
  public void flush() throws IOException {
    getMiniHBaseCluster().flushcache();
  }

  /**
   * Flushes all caches of the given table in the mini hbase cluster
   * @throws IOException
   */
  public void flush(TableName tableName) throws IOException {
    getMiniHBaseCluster().flushcache(tableName);
  }

  /**
   * Compact all regions in the mini hbase cluster
   * @throws IOException
   */
  public void compact(boolean major) throws IOException {
    getMiniHBaseCluster().compact(major);
  }

  /**
   * Compact all of a table's regions in the mini hbase cluster
   * @throws IOException
   */
  public void compact(TableName tableName, boolean major) throws IOException {
    getMiniHBaseCluster().compact(tableName, major);
  }

  /**
   * Create a table.
   * @param tableName name of the table to create.
   * @param family name of the single column family.
   * @return A Table instance for the created table.
   * @throws IOException
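   *
   * <p>A hedged sketch of typical use (row, family, and qualifier names are
   * illustrative; <code>util</code> is this HBaseTestingUtility):
   * <pre>
   *   Table t = util.createTable(TableName.valueOf("example"), "f");
   *   Put p = new Put(Bytes.toBytes("row1"));
   *   p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
   *   t.put(p);
   * </pre>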
   */
  public Table createTable(TableName tableName, String family)
  throws IOException {
    return createTable(tableName, new String[]{family});
  }

  /**
   * Create a table.
   * @param tableName name of the table to create.
   * @param family name of the single column family.
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(byte[] tableName, byte[] family)
  throws IOException {
    return createTable(TableName.valueOf(tableName), new byte[][]{family});
  }

  /**
   * Create a table.
   * @param tableName name of the table to create.
   * @param families names of the column families.
   * @return A Table instance for the created table.
   * @throws IOException
   */
  public Table createTable(TableName tableName, String[] families)
  throws IOException {
    List<byte[]> fams = new ArrayList<byte[]>(families.length);
    for (String family : families) {
      fams.add(Bytes.toBytes(family));
    }
    return createTable(tableName, fams.toArray(new byte[0][]));
  }

  /**
   * Create a table.
   * @param tableName name of the table to create.
   * @param family name of the single column family.
   * @return An HTable instance for the created table.
   * @throws IOException
   */
  public HTable createTable(TableName tableName, byte[] family)
  throws IOException {
    return createTable(tableName, new byte[][]{family});
  }

1361   /**
1362    * Create a table with multiple regions.
1363    * @param tableName
1364    * @param family
1365    * @param numRegions
1366    * @return An HTable instance for the created table.
1367    * @throws IOException
1368    */
1369   public HTable createMultiRegionTable(TableName tableName, byte[] family, int numRegions)
1370       throws IOException {
1371     if (numRegions < 3) throw new IOException("Must create at least 3 regions");
1372     byte[] startKey = Bytes.toBytes("aaaaa");
1373     byte[] endKey = Bytes.toBytes("zzzzz");
1374     byte[][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
1375 
1376     return createTable(tableName, new byte[][] { family }, splitKeys);
1377   }
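  /*
   * Illustrative sketch (not part of the original source): how the split-key
   * arithmetic above works out. Bytes.split(start, end, n) returns n + 2 keys
   * including both endpoints, so numRegions - 3 interior splits plus "aaaaa"
   * and "zzzzz" yield numRegions - 1 split keys, which createTable turns into
   * numRegions regions (one below "aaaaa", one from "zzzzz" upward). Assumes a
   * running mini cluster held by a hypothetical `util` instance.
   *
   *   HTable t = util.createMultiRegionTable(
   *       TableName.valueOf("testMulti"), Bytes.toBytes("cf"), 5);
   *   // t now spans 5 regions; close it when the test is done
   *   t.close();
   */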
1378 
1379 
1380   /**
1381    * Create a table.
1382    * @param tableName
1383    * @param families
1384    * @return An HTable instance for the created table.
1385    * @throws IOException
1386    */
1387   public HTable createTable(byte[] tableName, byte[][] families)
1388   throws IOException {
1389     return createTable(tableName, families,
1390         new Configuration(getConfiguration()));
1391   }
1392 
1393   /**
1394    * Create a table.
1395    * @param tableName
1396    * @param families
1397    * @return An HTable instance for the created table.
1398    * @throws IOException
1399    */
1400   public HTable createTable(TableName tableName, byte[][] families)
1401   throws IOException {
1402     return createTable(tableName, families, (byte[][]) null);
1403   }
1404 
1405   /**
1406    * Create a table with multiple regions.
1407    * @param tableName
1408    * @param families
1409    * @return An HTable instance for the created table.
1410    * @throws IOException
1411    */
1412   public HTable createMultiRegionTable(TableName tableName, byte[][] families) throws IOException {
1413     return createTable(tableName, families, KEYS_FOR_HBA_CREATE_TABLE);
1414   }
1415 
1416   /**
1417    * Create a table.
1418    * @param tableName
1419    * @param families
1420    * @param splitKeys
1421    * @return An HTable instance for the created table.
1422    * @throws IOException
1423    */
1424   public HTable createTable(TableName tableName, byte[][] families, byte[][] splitKeys)
1425       throws IOException {
1426     return createTable(tableName, families, splitKeys, new Configuration(getConfiguration()));
1427   }
1428 
1429   public HTable createTable(byte[] tableName, byte[][] families,
1430       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1431     return createTable(TableName.valueOf(tableName), families, numVersions,
1432         startKey, endKey, numRegions);
1433   }
1434 
1435   public HTable createTable(String tableName, byte[][] families,
1436       int numVersions, byte[] startKey, byte[] endKey, int numRegions) throws IOException {
1437     return createTable(TableName.valueOf(tableName), families, numVersions,
1438         startKey, endKey, numRegions);
1439   }
1440 
1441   public HTable createTable(TableName tableName, byte[][] families,
1442       int numVersions, byte[] startKey, byte[] endKey, int numRegions)
1443   throws IOException{
1444     HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
1445 
1446     getHBaseAdmin().createTable(desc, startKey, endKey, numRegions);
1447     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1448     waitUntilAllRegionsAssigned(tableName);
1449     return new HTable(getConfiguration(), tableName);
1450   }
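  /*
   * Illustrative sketch (not part of the original source): pre-splitting a
   * table into a fixed number of regions over a key range, assuming a running
   * mini cluster held by a hypothetical `util` instance.
   *
   *   HTable t = util.createTable(TableName.valueOf("testSplit"),
   *       new byte[][] { Bytes.toBytes("cf") }, 3,
   *       Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 10);
   *   // the admin splits ["aaa", "zzz") into 10 regions; each family keeps
   *   // up to 3 versions per cell
   *   t.close();
   */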
1451 
1452   /**
1453    * Create a table.
1454    * @param htd
1455    * @param families
1456    * @param c Configuration to use
1457    * @return An HTable instance for the created table.
1458    * @throws IOException
1459    */
1460   public HTable createTable(HTableDescriptor htd, byte[][] families, Configuration c)
1461   throws IOException {
1462     return createTable(htd, families, (byte[][]) null, c);
1463   }
1464 
1465   /**
1466    * Create a table.
1467    * @param htd
1468    * @param families
1469    * @param splitKeys
1470    * @param c Configuration to use
1471    * @return An HTable instance for the created table.
1472    * @throws IOException
1473    */
1474   public HTable createTable(HTableDescriptor htd, byte[][] families, byte[][] splitKeys,
1475       Configuration c) throws IOException {
1476     for (byte[] family : families) {
1477       HColumnDescriptor hcd = new HColumnDescriptor(family);
1478       // Disable blooms (they are on by default as of 0.95); tests have hard-coded
1479       // counts of what to expect in the block cache, etc., and blooms being on
1480       // interferes with those counts.
1481       hcd.setBloomFilterType(BloomType.NONE);
1482       htd.addFamily(hcd);
1483     }
1484     getHBaseAdmin().createTable(htd, splitKeys);
1485     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until
1486     // they are assigned
1487     waitUntilAllRegionsAssigned(htd.getTableName());
1488     return (HTable) getConnection().getTable(htd.getTableName());
1489   }
1490 
1491   /**
1492    * Create a table.
1493    * @param htd
1494    * @param splitRows
1495    * @return An HTable instance for the created table.
1496    * @throws IOException
1497    */
1498   public HTable createTable(HTableDescriptor htd, byte[][] splitRows)
1499       throws IOException {
1500     getHBaseAdmin().createTable(htd, splitRows);
1501     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1502     waitUntilAllRegionsAssigned(htd.getTableName());
1503     return new HTable(getConfiguration(), htd.getTableName());
1504   }
1505 
1506   /**
1507    * Create a table.
1508    * @param tableName
1509    * @param families
1510    * @param c Configuration to use
1511    * @return An HTable instance for the created table.
1512    * @throws IOException
1513    */
1514   public HTable createTable(TableName tableName, byte[][] families,
1515       final Configuration c)
1516   throws IOException {
1517     return createTable(tableName, families, (byte[][]) null, c);
1518   }
1519 
1520   /**
1521    * Create a table.
1522    * @param tableName
1523    * @param families
1524    * @param splitKeys
1525    * @param c Configuration to use
1526    * @return An HTable instance for the created table.
1527    * @throws IOException
1528    */
1529   public HTable createTable(TableName tableName, byte[][] families, byte[][] splitKeys,
1530       final Configuration c) throws IOException {
1531     return createTable(new HTableDescriptor(tableName), families, splitKeys, c);
1532   }
1533 
1534   /**
1535    * Create a table.
1536    * @param tableName
1537    * @param families
1538    * @param c Configuration to use
1539    * @return An HTable instance for the created table.
1540    * @throws IOException
1541    */
1542   public HTable createTable(byte[] tableName, byte[][] families,
1543       final Configuration c)
1544   throws IOException {
1545     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1546     for(byte[] family : families) {
1547       HColumnDescriptor hcd = new HColumnDescriptor(family);
1548       // Disable blooms (they are on by default as of 0.95); tests have hard-coded
1549       // counts of what to expect in the block cache, etc., and blooms being on
1550       // interferes with those counts.
1551       hcd.setBloomFilterType(BloomType.NONE);
1552       desc.addFamily(hcd);
1553     }
1554     getHBaseAdmin().createTable(desc);
1555     return new HTable(c, desc.getTableName());
1556   }
1557 
1558   /**
1559    * Create a table.
1560    * @param tableName
1561    * @param families
1562    * @param c Configuration to use
1563    * @param numVersions
1564    * @return An HTable instance for the created table.
1565    * @throws IOException
1566    */
1567   public HTable createTable(TableName tableName, byte[][] families,
1568       final Configuration c, int numVersions)
1569   throws IOException {
1570     HTableDescriptor desc = new HTableDescriptor(tableName);
1571     for(byte[] family : families) {
1572       HColumnDescriptor hcd = new HColumnDescriptor(family)
1573           .setMaxVersions(numVersions);
1574       desc.addFamily(hcd);
1575     }
1576     getHBaseAdmin().createTable(desc);
1577     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1578     waitUntilAllRegionsAssigned(tableName);
1579     return new HTable(c, tableName);
1580   }
1581 
1582   /**
1583    * Create a table.
1584    * @param tableName
1585    * @param families
1586    * @param c Configuration to use
1587    * @param numVersions
1588    * @return An HTable instance for the created table.
1589    * @throws IOException
1590    */
1591   public HTable createTable(byte[] tableName, byte[][] families,
1592       final Configuration c, int numVersions)
1593   throws IOException {
1594     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1595     for(byte[] family : families) {
1596       HColumnDescriptor hcd = new HColumnDescriptor(family)
1597           .setMaxVersions(numVersions);
1598       desc.addFamily(hcd);
1599     }
1600     getHBaseAdmin().createTable(desc);
1601     return new HTable(c, desc.getTableName());
1602   }
1603 
1604   /**
1605    * Create a table.
1606    * @param tableName
1607    * @param family
1608    * @param numVersions
1609    * @return An HTable instance for the created table.
1610    * @throws IOException
1611    */
1612   public HTable createTable(byte[] tableName, byte[] family, int numVersions)
1613   throws IOException {
1614     return createTable(tableName, new byte[][]{family}, numVersions);
1615   }
1616 
1617   /**
1618    * Create a table.
1619    * @param tableName
1620    * @param family
1621    * @param numVersions
1622    * @return An HTable instance for the created table.
1623    * @throws IOException
1624    */
1625   public HTable createTable(TableName tableName, byte[] family, int numVersions)
1626   throws IOException {
1627     return createTable(tableName, new byte[][]{family}, numVersions);
1628   }
1629 
1630   /**
1631    * Create a table.
1632    * @param tableName
1633    * @param families
1634    * @param numVersions
1635    * @return An HTable instance for the created table.
1636    * @throws IOException
1637    */
1638   public HTable createTable(byte[] tableName, byte[][] families,
1639       int numVersions)
1640   throws IOException {
1641     return createTable(TableName.valueOf(tableName), families, numVersions);
1642   }
1643 
1644   /**
1645    * Create a table.
1646    * @param tableName
1647    * @param families
1648    * @param numVersions
1649    * @return An HTable instance for the created table.
1650    * @throws IOException
1651    */
1652   public HTable createTable(TableName tableName, byte[][] families,
1653       int numVersions)
1654   throws IOException {
1655     return createTable(tableName, families, numVersions, (byte[][]) null);
1656   }
1657 
1658   /**
1659    * Create a table.
1660    * @param tableName
1661    * @param families
1662    * @param numVersions
1663    * @param splitKeys
1664    * @return An HTable instance for the created table.
1665    * @throws IOException
1666    */
1667   public HTable createTable(TableName tableName, byte[][] families, int numVersions,
1668       byte[][] splitKeys) throws IOException {
1669     HTableDescriptor desc = new HTableDescriptor(tableName);
1670     for (byte[] family : families) {
1671       HColumnDescriptor hcd = new HColumnDescriptor(family).setMaxVersions(numVersions);
1672       desc.addFamily(hcd);
1673     }
1674     getHBaseAdmin().createTable(desc, splitKeys);
1675     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1676     waitUntilAllRegionsAssigned(tableName);
1677     return new HTable(new Configuration(getConfiguration()), tableName);
1678   }
1679 
1680   /**
1681    * Create a table with multiple regions.
1682    * @param tableName
1683    * @param families
1684    * @param numVersions
1685    * @return An HTable instance for the created table.
1686    * @throws IOException
1687    */
1688   public HTable createMultiRegionTable(TableName tableName, byte[][] families, int numVersions)
1689       throws IOException {
1690     return createTable(tableName, families, numVersions, KEYS_FOR_HBA_CREATE_TABLE);
1691   }
1692 
1693   /**
1694    * Create a table.
1695    * @param tableName
1696    * @param families
1697    * @param numVersions
1698    * @param blockSize
1699    * @return An HTable instance for the created table.
1700    * @throws IOException
1701    */
1702   public HTable createTable(byte[] tableName, byte[][] families,
1703     int numVersions, int blockSize) throws IOException {
1704     return createTable(TableName.valueOf(tableName),
1705         families, numVersions, blockSize);
1706   }
1707 
1708   /**
1709    * Create a table.
1710    * @param tableName
1711    * @param families
1712    * @param numVersions
1713    * @param blockSize
1714    * @return An HTable instance for the created table.
1715    * @throws IOException
1716    */
1717   public HTable createTable(TableName tableName, byte[][] families,
1718     int numVersions, int blockSize) throws IOException {
1719     HTableDescriptor desc = new HTableDescriptor(tableName);
1720     for (byte[] family : families) {
1721       HColumnDescriptor hcd = new HColumnDescriptor(family)
1722           .setMaxVersions(numVersions)
1723           .setBlocksize(blockSize);
1724       desc.addFamily(hcd);
1725     }
1726     getHBaseAdmin().createTable(desc);
1727     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1728     waitUntilAllRegionsAssigned(tableName);
1729     return new HTable(new Configuration(getConfiguration()), tableName);
1730   }
1731 
1732   /**
1733    * Create a table.
1734    * @param tableName
1735    * @param families
1736    * @param numVersions
1737    * @return An HTable instance for the created table.
1738    * @throws IOException
1739    */
1740   public HTable createTable(byte[] tableName, byte[][] families,
1741       int[] numVersions)
1742   throws IOException {
1743     return createTable(TableName.valueOf(tableName), families, numVersions);
1744   }
1745 
1746   /**
1747    * Create a table.
1748    * @param tableName
1749    * @param families
1750    * @param numVersions
1751    * @return An HTable instance for the created table.
1752    * @throws IOException
1753    */
1754   public HTable createTable(TableName tableName, byte[][] families,
1755       int[] numVersions)
1756   throws IOException {
1757     HTableDescriptor desc = new HTableDescriptor(tableName);
1758     int i = 0;
1759     for (byte[] family : families) {
1760       HColumnDescriptor hcd = new HColumnDescriptor(family)
1761           .setMaxVersions(numVersions[i]);
1762       desc.addFamily(hcd);
1763       i++;
1764     }
1765     getHBaseAdmin().createTable(desc);
1766     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1767     waitUntilAllRegionsAssigned(tableName);
1768     return new HTable(new Configuration(getConfiguration()), tableName);
1769   }
1770 
1771   /**
1772    * Create a table.
1773    * @param tableName
1774    * @param family
1775    * @param splitRows
1776    * @return An HTable instance for the created table.
1777    * @throws IOException
1778    */
1779   public HTable createTable(byte[] tableName, byte[] family, byte[][] splitRows)
1780     throws IOException{
1781     return createTable(TableName.valueOf(tableName), family, splitRows);
1782   }
1783 
1784   /**
1785    * Create a table.
1786    * @param tableName
1787    * @param family
1788    * @param splitRows
1789    * @return An HTable instance for the created table.
1790    * @throws IOException
1791    */
1792   public HTable createTable(TableName tableName, byte[] family, byte[][] splitRows)
1793       throws IOException {
1794     HTableDescriptor desc = new HTableDescriptor(tableName);
1795     HColumnDescriptor hcd = new HColumnDescriptor(family);
1796     desc.addFamily(hcd);
1797     getHBaseAdmin().createTable(desc, splitRows);
1798     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1799     waitUntilAllRegionsAssigned(tableName);
1800     return new HTable(getConfiguration(), tableName);
1801   }
1802 
1803   /**
1804    * Create a table with multiple regions.
1805    * @param tableName
1806    * @param family
1807    * @return An HTable instance for the created table.
1808    * @throws IOException
1809    */
1810   public HTable createMultiRegionTable(TableName tableName, byte[] family) throws IOException {
1811     return createTable(tableName, family, KEYS_FOR_HBA_CREATE_TABLE);
1812   }
1813 
1814   /**
1815    * Create a table.
1816    * @param tableName
1817    * @param families
1818    * @param splitRows
1819    * @return An HTable instance for the created table.
1820    * @throws IOException
1821    */
1822   public HTable createTable(byte[] tableName, byte[][] families, byte[][] splitRows)
1823       throws IOException {
1824     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
1825     for(byte[] family:families) {
1826       HColumnDescriptor hcd = new HColumnDescriptor(family);
1827       desc.addFamily(hcd);
1828     }
1829     getHBaseAdmin().createTable(desc, splitRows);
1830     // HBaseAdmin only waits for regions to appear in hbase:meta; we should wait until they are assigned
1831     waitUntilAllRegionsAssigned(desc.getTableName());
1832     return new HTable(getConfiguration(), desc.getTableName());
1833   }
1834 
1835   /**
1836    * Create an unmanaged WAL. Be sure to close it when you're through.
1837    */
1838   public static WAL createWal(final Configuration conf, final Path rootDir, final Path walRootDir, final HRegionInfo hri)
1839       throws IOException {
1840     // The WAL subsystem will use the default rootDir rather than the passed-in rootDir
1841     // unless we pass it along via the conf.
1842     Configuration confForWAL = new Configuration(conf);
1843     confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
1844     FSUtils.setWALRootDir(confForWAL, walRootDir);
1845     return (new WALFactory(confForWAL,
1846         Collections.<WALActionsListener>singletonList(new MetricsWAL()),
1847         "hregion-" + RandomStringUtils.randomNumeric(8))).
1848         getWAL(hri.getEncodedNameAsBytes());
1849   }
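  /*
   * Illustrative sketch (not part of the original source): pairing createWal
   * with an explicit close, assuming `conf`, `rootDir`, `walRootDir` and `hri`
   * come from the surrounding test.
   *
   *   WAL wal = HBaseTestingUtility.createWal(conf, rootDir, walRootDir, hri);
   *   try {
   *     // append edits, roll the writer, etc.
   *   } finally {
   *     wal.close(); // unmanaged WALs are the caller's responsibility
   *   }
   */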
1850 
1851   /**
1852    * Create a region with its own WAL. Be sure to call
1853    * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
1854    */
1855   public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
1856       final Path walRootDir, final Configuration conf, final HTableDescriptor htd) throws IOException {
1857     return createRegionAndWAL(info, rootDir, walRootDir, conf, htd, true);
1858   }
1859 
1860   /**
1861    * Create a region with its own WAL. Be sure to call
1862    * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
1863    */
1864   public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
1865       final Path walRootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize)
1866       throws IOException {
1867     WAL wal = createWal(conf, rootDir, walRootDir, info);
1868     return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
1869   }
1870 
1871   /**
1872    * Close both the region {@code r} and its underlying WAL. For use in tests.
1873    */
1874   public static void closeRegionAndWAL(final Region r) throws IOException {
1875     closeRegionAndWAL((HRegion)r);
1876   }
1877 
1878   /**
1879    * Close both the HRegion {@code r} and its underlying WAL. For use in tests.
1880    */
1881   public static void closeRegionAndWAL(final HRegion r) throws IOException {
1882     if (r == null) return;
1883     r.close();
1884     if (r.getWAL() == null) return;
1885     r.getWAL().close();
1886   }
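  /*
   * Illustrative sketch (not part of the original source): the intended
   * create/close pairing, assuming `info`, `rootDir`, `walRootDir`, `conf`
   * and `htd` come from the surrounding test.
   *
   *   HRegion region = HBaseTestingUtility.createRegionAndWAL(
   *       info, rootDir, walRootDir, conf, htd);
   *   try {
   *     // exercise the region
   *   } finally {
   *     HBaseTestingUtility.closeRegionAndWAL(region); // closes region, then its WAL
   *   }
   */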
1887 
1888   /**
1889    * Modify a table, synchronously. Waiting logic similar to that of {@code admin.rb#alter_status}.
1890    */
1891   @SuppressWarnings("serial")
1892   public static void modifyTableSync(Admin admin, HTableDescriptor desc)
1893       throws IOException, InterruptedException {
1894     admin.modifyTable(desc.getTableName(), desc);
1895     Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
1896       setFirst(0);
1897       setSecond(0);
1898     }};
1899     int i = 0;
1900     do {
1901       status = admin.getAlterStatus(desc.getTableName());
1902       if (status.getSecond() != 0) {
1903         LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond()
1904           + " regions updated.");
1905         Thread.sleep(1000L);
1906       } else {
1907         LOG.debug("All regions updated.");
1908         break;
1909       }
1910     } while (status.getFirst() != 0 && i++ < 500);
1911     if (status.getFirst() != 0) {
1912       throw new IOException("Failed to update all regions even after 500 seconds.");
1913     }
1914   }
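  /*
   * Illustrative sketch (not part of the original source): altering a table
   * and blocking until every region has picked up the new schema, assuming a
   * hypothetical `util` instance and an existing table name `tn`.
   *
   *   HTableDescriptor desc = util.getHBaseAdmin().getTableDescriptor(tn);
   *   desc.setMaxFileSize(1024 * 1024 * 1024L);
   *   HBaseTestingUtility.modifyTableSync(util.getHBaseAdmin(), desc);
   *   // returns only once getAlterStatus reports no pending regions
   */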
1915 
1916   /**
1917    * Set the number of Region replicas.
1918    */
1919   public static void setReplicas(Admin admin, TableName table, int replicaCount)
1920       throws IOException, InterruptedException {
1921     admin.disableTable(table);
1922     HTableDescriptor desc = admin.getTableDescriptor(table);
1923     desc.setRegionReplication(replicaCount);
1924     admin.modifyTable(desc.getTableName(), desc);
1925     admin.enableTable(table);
1926   }
1927 
1928   /**
1929    * Drop an existing table
1930    * @param tableName existing table
1931    */
1932   public void deleteTable(String tableName) throws IOException {
1933     deleteTable(TableName.valueOf(tableName));
1934   }
1935 
1936   /**
1937    * Drop an existing table
1938    * @param tableName existing table
1939    */
1940   public void deleteTable(byte[] tableName) throws IOException {
1941     deleteTable(TableName.valueOf(tableName));
1942   }
1943 
1944   /**
1945    * Drop an existing table
1946    * @param tableName existing table
1947    */
1948   public void deleteTable(TableName tableName) throws IOException {
1949     try {
1950       getHBaseAdmin().disableTable(tableName);
1951     } catch (TableNotEnabledException e) {
1952       LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
1953     }
1954     getHBaseAdmin().deleteTable(tableName);
1955   }
1956 
1957   /**
1958    * Drop an existing table
1959    * @param tableName existing table
1960    */
1961   public void deleteTableIfAny(TableName tableName) throws IOException {
1962     try {
1963       deleteTable(tableName);
1964     } catch (TableNotFoundException e) {
1965       // ignore
1966     }
1967   }
1968 
1969   // ==========================================================================
1970   // Canned table and table descriptor creation
1971   // TODO replace HBaseTestCase
1972 
1973   public final static byte [] fam1 = Bytes.toBytes("colfamily11");
1974   public final static byte [] fam2 = Bytes.toBytes("colfamily21");
1975   public final static byte [] fam3 = Bytes.toBytes("colfamily31");
1976   public static final byte[][] COLUMNS = {fam1, fam2, fam3};
1977   private static final int MAXVERSIONS = 3;
1978 
1979   public static final char FIRST_CHAR = 'a';
1980   public static final char LAST_CHAR = 'z';
1981   public static final byte [] START_KEY_BYTES = {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
1982   public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);
1983 
1984   /**
1985    * Create a descriptor for a table named <code>name</code> with {@link #COLUMNS}
1986    * for families, applying the given min/max versions, TTL and
1987    * keep-deleted-cells settings to every family.
1988    * @param name Name to give table.
1989    * @param versions How many versions to allow per column.
1990    * @return Table descriptor.
1990    */
1991   public HTableDescriptor createTableDescriptor(final String name,
1992       final int minVersions, final int versions, final int ttl, KeepDeletedCells keepDeleted) {
1993     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name));
1994     for (byte[] cfName : new byte[][]{ fam1, fam2, fam3 }) {
1995       htd.addFamily(new HColumnDescriptor(cfName)
1996           .setMinVersions(minVersions)
1997           .setMaxVersions(versions)
1998           .setKeepDeletedCells(keepDeleted)
1999           .setBlockCacheEnabled(false)
2000           .setTimeToLive(ttl)
2001       );
2002     }
2003     return htd;
2004   }
2005 
2006   /**
2007    * Create a descriptor for a table named <code>name</code> with {@link #COLUMNS}
2008    * for families.
2009    * @param name Name to give table.
2010    * @return Table descriptor.
2011    */
2012   public HTableDescriptor createTableDescriptor(final String name) {
2013     return createTableDescriptor(name,  HColumnDescriptor.DEFAULT_MIN_VERSIONS,
2014         MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
2015   }
2016 
2017   /**
2018    * Create an HRegion. Be sure to call {@link HBaseTestingUtility#closeRegion(Region)}
2019    * when you're finished with it.
2020    */
2021   public HRegion createHRegion(
2022       final HRegionInfo info,
2023       final Path rootDir,
2024       final Configuration conf,
2025       final HTableDescriptor htd) throws IOException {
2026     return HRegion.createHRegion(info, rootDir, conf, htd);
2027   }
2028 
2029   public HTableDescriptor createTableDescriptor(final TableName tableName,
2030       byte[] family) {
2031     return createTableDescriptor(tableName, new byte[][] {family}, 1);
2032   }
2033 
2034   public HTableDescriptor createTableDescriptor(final TableName tableName,
2035       byte[][] families, int maxVersions) {
2036     HTableDescriptor desc = new HTableDescriptor(tableName);
2037     for (byte[] family : families) {
2038       HColumnDescriptor hcd = new HColumnDescriptor(family)
2039           .setMaxVersions(maxVersions);
2040       desc.addFamily(hcd);
2041     }
2042     return desc;
2043   }
2044 
2045   /**
2046    * Create an HRegion that writes to the local tmp dirs
2047    * @param desc
2048    * @param startKey
2049    * @param endKey
2050    * @return The created HRegion.
2051    * @throws IOException
2052    */
2053   public HRegion createLocalHRegion(HTableDescriptor desc, byte [] startKey,
2054       byte [] endKey)
2055   throws IOException {
2056     HRegionInfo hri = new HRegionInfo(desc.getTableName(), startKey, endKey);
2057     return createLocalHRegion(hri, desc);
2058   }
2059 
2060   /**
2061    * Create an HRegion that writes to the local tmp dirs
2062    * @param info
2063    * @param desc
2064    * @return The created HRegion.
2065    * @throws IOException
2066    */
2067   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc) throws IOException {
2068     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc);
2069   }
2070 
2071   /**
2072    * Create an HRegion that writes to the local tmp dirs with specified wal
2073    * @param info regioninfo
2074    * @param desc table descriptor
2075    * @param wal wal for this region.
2076    * @return created hregion
2077    * @throws IOException
2078    */
2079   public HRegion createLocalHRegion(HRegionInfo info, HTableDescriptor desc, WAL wal)
2080       throws IOException {
2081     return HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), desc, wal);
2082   }
2083 
2084   /**
2085    * @param tableName
2086    * @param startKey
2087    * @param stopKey
2088    * @param callingMethod
2089    * @param conf
2090    * @param isReadOnly
2091    * @param families
2092    * @throws IOException
2093    * @return A region on which you must call
2094    *         {@link HRegion#closeHRegion(HRegion)} when done.
2095    */
2096   public HRegion createLocalHRegion(byte[] tableName, byte[] startKey, byte[] stopKey,
2097       String callingMethod, Configuration conf, boolean isReadOnly, Durability durability,
2098       WAL wal, byte[]... families) throws IOException {
2099     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
2100     htd.setReadOnly(isReadOnly);
2101     for (byte[] family : families) {
2102       HColumnDescriptor hcd = new HColumnDescriptor(family);
2103       // Keep all versions so tests can inspect every write.
2104       hcd.setMaxVersions(Integer.MAX_VALUE);
2105       htd.addFamily(hcd);
2106     }
2107     htd.setDurability(durability);
2108     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false);
2109     return createLocalHRegion(info, htd, wal);
2110   }
2111   //
2112   // ==========================================================================
2113 
2114   /**
2115    * Provide an existing table name to truncate.
2116    * Scans the table and issues a delete for each row read.
2117    * @param tableName existing table
2118    * @return HTable for the emptied table
2119    * @throws IOException
2120    */
2121   public HTable deleteTableData(byte[] tableName) throws IOException {
2122     return deleteTableData(TableName.valueOf(tableName));
2123   }
2124 
2125   /**
2126    * Provide an existing table name to truncate.
2127    * Scans the table and issues a delete for each row read.
2128    * @param tableName existing table
2129    * @return HTable for the emptied table
2130    * @throws IOException
2131    */
2132   public HTable deleteTableData(TableName tableName) throws IOException {
2133     HTable table = new HTable(getConfiguration(), tableName);
2134     Scan scan = new Scan();
2135     ResultScanner resScan = table.getScanner(scan);
2136     for(Result res : resScan) {
2137       Delete del = new Delete(res.getRow());
2138       table.delete(del);
2139     }
2140     // close the scanner used for the deletes rather than opening a second one
2141     resScan.close();
2142     return table;
2143   }
2144 
2145   /**
2146    * Truncate a table using the admin command.
2147    * Effectively disables, deletes, and recreates the table.
2148    * @param tableName table which must exist.
2149    * @param preserveRegions keep the existing split points
2150    * @return HTable for the new table
2151    */
2152   public HTable truncateTable(final TableName tableName, final boolean preserveRegions)
2153       throws IOException {
2154     Admin admin = getHBaseAdmin();
2155     admin.truncateTable(tableName, preserveRegions);
2156     return new HTable(getConfiguration(), tableName);
2157   }
2158 
2159   /**
2160    * Truncate a table using the admin command.
2161    * Effectively disables, deletes, and recreates the table.
2162    * For previous behavior of issuing row deletes, see
2163    * deleteTableData.
2164    * Expressly does not preserve regions of existing table.
2165    * @param tableName table which must exist.
2166    * @return HTable for the new table
2167    */
2168   public HTable truncateTable(final TableName tableName) throws IOException {
2169     return truncateTable(tableName, false);
2170   }
2171 
2172   /**
2173    * Truncate a table using the admin command.
2174    * Effectively disables, deletes, and recreates the table.
2175    * @param tableName table which must exist.
2176    * @param preserveRegions keep the existing split points
2177    * @return HTable for the new table
2178    */
2179   public HTable truncateTable(final byte[] tableName, final boolean preserveRegions)
2180       throws IOException {
2181     return truncateTable(TableName.valueOf(tableName), preserveRegions);
2182   }
2183 
2184   /**
2185    * Truncate a table using the admin command.
2186    * Effectively disables, deletes, and recreates the table.
2187    * For previous behavior of issuing row deletes, see
2188    * deleteTableData.
2189    * Expressly does not preserve regions of existing table.
2190    * @param tableName table which must exist.
2191    * @return HTable for the new table
2192    */
2193   public HTable truncateTable(final byte[] tableName) throws IOException {
2194     return truncateTable(tableName, false);
2195   }
2196 
2197   /**
2198    * Load table with rows from 'aaa' to 'zzz'.
2199    * @param t Table
2200    * @param f Family
2201    * @return Count of rows loaded.
2202    * @throws IOException
2203    */
2204   public int loadTable(final Table t, final byte[] f) throws IOException {
2205     return loadTable(t, new byte[][] {f});
2206   }
2207 
2208   /**
2209    * Load table with rows from 'aaa' to 'zzz'.
2210    * @param t Table
2211    * @param f Family
2212    * @return Count of rows loaded.
2213    * @throws IOException
2214    */
2215   public int loadTable(final Table t, final byte[] f, boolean writeToWAL) throws IOException {
2216     return loadTable(t, new byte[][] {f}, null, writeToWAL);
2217   }
2218 
2219   /**
2220    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2221    * @param t Table
2222    * @param f Array of Families to load
2223    * @return Count of rows loaded.
2224    * @throws IOException
2225    */
2226   public int loadTable(final Table t, final byte[][] f) throws IOException {
2227     return loadTable(t, f, null);
2228   }
2229 
2230   /**
2231    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2232    * @param t Table
2233    * @param f Array of Families to load
2234    * @param value the values of the cells. If null is passed, the row key is used as value
2235    * @return Count of rows loaded.
2236    * @throws IOException
2237    */
2238   public int loadTable(final Table t, final byte[][] f, byte[] value) throws IOException {
2239     return loadTable(t, f, value, true);
2240   }
2241 
2242   /**
2243    * Load table of multiple column families with rows from 'aaa' to 'zzz'.
2244    * @param t Table
2245    * @param f Array of Families to load
2246    * @param value the values of the cells. If null is passed, the row key is used as value
2247    * @return Count of rows loaded.
2248    * @throws IOException
2249    */
2250   public int loadTable(final Table t, final byte[][] f, byte[] value, boolean writeToWAL) throws IOException {
2251     List<Put> puts = new ArrayList<>();
2252     for (byte[] row : HBaseTestingUtility.ROWS) {
2253       Put put = new Put(row);
2254       put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
2255       for (int i = 0; i < f.length; i++) {
2256         put.add(f[i], f[i], value != null ? value : row);
2257       }
2258       puts.add(put);
2259     }
2260     t.put(puts);
2261     return puts.size();
2262   }
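  /*
   * Illustrative sketch (not part of the original source): loading the canned
   * 'aaa'..'zzz' rows and verifying the count, assuming a hypothetical `util`
   * instance.
   *
   *   Table t = util.createTable(TableName.valueOf("testLoad"), "cf");
   *   int loaded = util.loadTable(t, Bytes.toBytes("cf"));
   *   assertEquals(26 * 26 * 26, loaded);      // one row per 3-letter key
   *   assertEquals(loaded, util.countRows(t));
   *   t.close();
   */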
2263 
2264   /** Tracks and validates table rows
2265    * generated with {@link HBaseTestingUtility#loadTable(Table, byte[])}
2266    */
2267   public static class SeenRowTracker {
2268     int dim = 'z' - 'a' + 1;
2269     int[][][] seenRows = new int[dim][dim][dim]; //count of how many times the row is seen
2270     byte[] startRow;
2271     byte[] stopRow;
2272 
2273     public SeenRowTracker(byte[] startRow, byte[] stopRow) {
2274       this.startRow = startRow;
2275       this.stopRow = stopRow;
2276     }
2277 
2278     void reset() {
2279       for (byte[] row : ROWS) {
2280         seenRows[i(row[0])][i(row[1])][i(row[2])] = 0;
2281       }
2282     }
2283 
2284     int i(byte b) {
2285       return b - 'a';
2286     }
2287 
2288     public void addRow(byte[] row) {
2289       seenRows[i(row[0])][i(row[1])][i(row[2])]++;
2290     }
2291 
2292     /** Validate that all the rows between startRow and stopRow are seen exactly once, and
2293      * that all other rows are not seen at all
2294      */
2295     public void validate() {
2296       for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2297         for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2298           for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2299             int count = seenRows[i(b1)][i(b2)][i(b3)];
2300             int expectedCount = 0;
2301             if (Bytes.compareTo(new byte[] {b1,b2,b3}, startRow) >= 0
2302                 && Bytes.compareTo(new byte[] {b1,b2,b3}, stopRow) < 0) {
2303               expectedCount = 1;
2304             }
2305             if (count != expectedCount) {
2306               String row = new String(new byte[] {b1,b2,b3});
2307               throw new RuntimeException("Row:" + row + " has a seen count of " + count + " instead of " + expectedCount);
2308             }
2309           }
2310         }
2311       }
2312     }
2313   }
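  /*
   * Illustrative sketch (not part of the original source): using the tracker
   * to verify a scan over a sub-range of the canned rows, assuming `table`
   * was populated with loadTable.
   *
   *   SeenRowTracker tracker = new SeenRowTracker(
   *       Bytes.toBytes("bbb"), Bytes.toBytes("ddd"));
   *   ResultScanner scanner = table.getScanner(
   *       new Scan(Bytes.toBytes("bbb"), Bytes.toBytes("ddd")));
   *   for (Result r : scanner) {
   *     tracker.addRow(r.getRow());
   *   }
   *   scanner.close();
   *   tracker.validate(); // throws if any row count is off
   */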
2314 
2315   public int loadRegion(final HRegion r, final byte[] f) throws IOException {
2316     return loadRegion(r, f, false);
2317   }
2318 
2319   public int loadRegion(final Region r, final byte[] f) throws IOException {
2320     return loadRegion((HRegion)r, f);
2321   }
2322 
2323   /**
2324    * Load region with rows from 'aaa' to 'zzz'.
2325    * @param r Region
2326    * @param f Family
2327    * @param flush flush the cache if true
2328    * @return Count of rows loaded.
2329    * @throws IOException
2330    */
2331   public int loadRegion(final HRegion r, final byte[] f, final boolean flush)
2332   throws IOException {
2333     byte[] k = new byte[3];
2334     int rowCount = 0;
2335     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2336       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2337         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2338           k[0] = b1;
2339           k[1] = b2;
2340           k[2] = b3;
2341           Put put = new Put(k);
2342           // Skip the WAL for this bulk test load; durability is not needed here.
2343           put.setDurability(Durability.SKIP_WAL);
2344           put.add(f, null, k);
2347           int preRowCount = rowCount;
2348           int pause = 10;
2349           int maxPause = 1000;
2350           while (rowCount == preRowCount) {
2351             try {
2352               r.put(put);
2353               rowCount++;
2354             } catch (RegionTooBusyException e) {
2355               pause = (pause * 2 >= maxPause) ? maxPause : pause * 2;
2356               Threads.sleep(pause);
2357             }
2358           }
2359         }
2360       }
2361       if (flush) {
2362         r.flush(true);
2363       }
2364     }
2365     return rowCount;
2366   }
2367 
2368   public void loadNumericRows(final Table t, final byte[] f, int startRow, int endRow)
2369       throws IOException {
2370     for (int i = startRow; i < endRow; i++) {
2371       byte[] data = Bytes.toBytes(String.valueOf(i));
2372       Put put = new Put(data);
2373       put.add(f, null, data);
2374       t.put(put);
2375     }
2376   }
2377 
2378   public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
2379       int replicaId)
2380       throws IOException {
2381     for (int i = startRow; i < endRow; i++) {
2382       String failMsg = "Failed verification of row: " + i;
2383       byte[] data = Bytes.toBytes(String.valueOf(i));
2384       Get get = new Get(data);
2385       get.setReplicaId(replicaId);
2386       get.setConsistency(Consistency.TIMELINE);
2387       Result result = table.get(get);
2388       assertTrue(failMsg, result.containsColumn(f, null));
2389       assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2390       Cell cell = result.getColumnLatestCell(f, null);
2391       assertTrue(failMsg,
2392         Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2393           cell.getValueLength()));
2394     }
2395   }
2396 
2397   public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow)
2398       throws IOException {
2399     verifyNumericRows((HRegion)region, f, startRow, endRow);
2400   }
2401 
2402   public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow)
2403       throws IOException {
2404     verifyNumericRows(region, f, startRow, endRow, true);
2405   }
2406 
2407   public void verifyNumericRows(Region region, final byte[] f, int startRow, int endRow,
2408       final boolean present) throws IOException {
2409     verifyNumericRows((HRegion)region, f, startRow, endRow, present);
2410   }
2411 
2412   public void verifyNumericRows(HRegion region, final byte[] f, int startRow, int endRow,
2413       final boolean present) throws IOException {
2414     for (int i = startRow; i < endRow; i++) {
2415       String failMsg = "Failed verification of row: " + i;
2416       byte[] data = Bytes.toBytes(String.valueOf(i));
2417       Result result = region.get(new Get(data));
2418 
2419       boolean hasResult = result != null && !result.isEmpty();
2420       assertEquals(failMsg + result, present, hasResult);
2421       if (!present) continue;
2422 
2423       assertTrue(failMsg, result.containsColumn(f, null));
2424       assertEquals(failMsg, 1, result.getColumnCells(f, null).size());
2425       Cell cell = result.getColumnLatestCell(f, null);
2426       assertTrue(failMsg,
2427         Bytes.equals(data, 0, data.length, cell.getValueArray(), cell.getValueOffset(),
2428           cell.getValueLength()));
2429     }
2430   }
2431 
2432   public void deleteNumericRows(final HTable t, final byte[] f, int startRow, int endRow)
2433       throws IOException {
2434     for (int i = startRow; i < endRow; i++) {
2435       byte[] data = Bytes.toBytes(String.valueOf(i));
2436       Delete delete = new Delete(data);
2437       delete.deleteFamily(f);
2438       t.delete(delete);
2439     }
2440   }
2441 
2442   /**
2443    * Return the number of rows in the given table.
2444    */
2445   public int countRows(final Table table) throws IOException {
2446     // Delegate to the Scan-taking overload with an unrestricted scan.
2447     return countRows(table, new Scan());
2448   }
2455 
2456   public int countRows(final Table table, final Scan scan) throws IOException {
2457     ResultScanner results = table.getScanner(scan);
2458     int count = 0;
2459     for (@SuppressWarnings("unused") Result res : results) {
2460       count++;
2461     }
2462     results.close();
2463     return count;
2464   }
2465 
2466   public int countRows(final Table table, final byte[]... families) throws IOException {
2467     Scan scan = new Scan();
2468     for (byte[] family: families) {
2469       scan.addFamily(family);
2470     }
2471     return countRows(table, scan);
2472   }
2479 
2480   /**
2481    * Return the number of rows in the given table.
2482    */
2483   public int countRows(final TableName tableName) throws IOException {
2484     Table table = getConnection().getTable(tableName);
2485     try {
2486       return countRows(table);
2487     } finally {
2488       table.close();
2489     }
2490   }
2491 
2492   /**
2493    * Return an md5 digest of the entire contents of a table.
2494    */
2495   public String checksumRows(final Table table) throws Exception {
2496     Scan scan = new Scan();
2497     ResultScanner results = table.getScanner(scan);
2498     MessageDigest digest = MessageDigest.getInstance("MD5");
2499     for (Result res : results) {
2500       digest.update(res.getRow());
2501     }
2502     results.close();
2503     // MessageDigest#toString() does not render the digest bytes; encode them instead.
2504     return Bytes.toStringBinary(digest.digest());
2504   }
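  /*
   * Illustrative sketch (not part of the original source): checksums let a
   * test assert that two tables hold the same row set without comparing
   * row-by-row, assuming a hypothetical `util` instance and tables `source`
   * and `target`.
   *
   *   String before = util.checksumRows(source);
   *   // ... copy/restore/replay into `target` ...
   *   String after = util.checksumRows(target);
   *   assertEquals(before, after);
   */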
2505 
2506   /** All the row values for the data loaded by {@link #loadTable(HTable, byte[])} */
2507   public static final byte[][] ROWS = new byte[(int) Math.pow('z' - 'a' + 1, 3)][3]; // ~52KB
2508   static {
2509     int i = 0;
2510     for (byte b1 = 'a'; b1 <= 'z'; b1++) {
2511       for (byte b2 = 'a'; b2 <= 'z'; b2++) {
2512         for (byte b3 = 'a'; b3 <= 'z'; b3++) {
2513           ROWS[i][0] = b1;
2514           ROWS[i][1] = b2;
2515           ROWS[i][2] = b3;
2516           i++;
2517         }
2518       }
2519     }
2520   }
2521 
2522   public static final byte[][] KEYS = {
2523     HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
2524     Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2525     Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2526     Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2527     Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2528     Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2529     Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2530     Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2531     Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
2532   };
2533 
2534   public static final byte[][] KEYS_FOR_HBA_CREATE_TABLE = {
2535       Bytes.toBytes("bbb"),
2536       Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
2537       Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
2538       Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
2539       Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
2540       Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
2541       Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
2542       Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
2543       Bytes.toBytes("xxx"), Bytes.toBytes("yyy"), Bytes.toBytes("zzz")
2544   };
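  /*
   * Note (added for clarity): KEYS includes the empty start key and stops at
   * "yyy", while KEYS_FOR_HBA_CREATE_TABLE drops the empty key (the admin
   * rejects an empty split key) and runs "bbb".."zzz"; its 25 split keys
   * yield 26 regions when passed to createTable.
   */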
2545 
2546   /**
2547    * Create rows in hbase:meta for regions of the specified table with the specified
2548    * start keys.  The first startKey should be a 0 length byte array if you
2549    * want to form a proper range of regions.
2550    * @param conf
2551    * @param htd
2552    * @param startKeys
2553    * @return list of region info for regions added to meta
2554    * @throws IOException
2555    */
2556   public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
2557       final HTableDescriptor htd, byte [][] startKeys)
2558   throws IOException {
2559     Table meta = new HTable(conf, TableName.META_TABLE_NAME);
2560     Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
2561     List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
2562     // add custom ones
2563     for (int i = 0; i < startKeys.length; i++) {
2564       int j = (i + 1) % startKeys.length;
2565       HRegionInfo hri = new HRegionInfo(htd.getTableName(), startKeys[i],
2566           startKeys[j]);
2567       MetaTableAccessor.addRegionToMeta(meta, hri);
2568       newRegions.add(hri);
2569     }
2570 
2571     meta.close();
2572     return newRegions;
2573   }
2574 
2575   /**
2576    * Returns all rows from the hbase:meta table.
2577    *
2578    * @throws IOException When reading the rows fails.
2579    */
2580   public List<byte[]> getMetaTableRows() throws IOException {
2581     // TODO: Redo using MetaTableAccessor class
2582     Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2583     List<byte[]> rows = new ArrayList<byte[]>();
2584     ResultScanner s = t.getScanner(new Scan());
2585     for (Result result : s) {
2586       LOG.info("getMetaTableRows: row -> " +
2587         Bytes.toStringBinary(result.getRow()));
2588       rows.add(result.getRow());
2589     }
2590     s.close();
2591     t.close();
2592     return rows;
2593   }
2594 
2595   /**
2596    * Returns all rows from the hbase:meta table for a given user table
2597    *
2598    * @throws IOException When reading the rows fails.
2599    */
2600   public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
2601     // TODO: Redo using MetaTableAccessor.
2602     Table t = new HTable(new Configuration(this.conf), TableName.META_TABLE_NAME);
2603     List<byte[]> rows = new ArrayList<byte[]>();
2604     ResultScanner s = t.getScanner(new Scan());
2605     for (Result result : s) {
2606       HRegionInfo info = HRegionInfo.getHRegionInfo(result);
2607       if (info == null) {
2608         LOG.error("No region info for row " + Bytes.toString(result.getRow()));
2609         // TODO figure out what to do for this new hosed case.
2610         continue;
2611       }
2612 
2613       if (info.getTable().equals(tableName)) {
2614         LOG.info("getMetaTableRows: row -> " +
2615             Bytes.toStringBinary(result.getRow()) + info);
2616         rows.add(result.getRow());
2617       }
2618     }
2619     s.close();
2620     t.close();
2621     return rows;
2622   }
2623 
2624   /**
2625    * Tool to get the reference to the region server object that holds the
2626    * region of the specified user table.
2627    * It first searches for the meta rows that contain the region of the
2628    * specified table, then gets the index of that RS, and finally retrieves
2629    * the RS's reference.
2630    * @param tableName user table to lookup in hbase:meta
2631    * @return region server that holds it, null if the row doesn't exist
2632    * @throws IOException
2633    * @throws InterruptedException
2634    */
2635   public HRegionServer getRSForFirstRegionInTable(TableName tableName)
2636       throws IOException, InterruptedException {
2637     List<byte[]> metaRows = getMetaTableRows(tableName);
2638     if (metaRows == null || metaRows.isEmpty()) {
2639       return null;
2640     }
2641     LOG.debug("Found " + metaRows.size() + " rows for table " +
2642       tableName);
2643     byte [] firstrow = metaRows.get(0);
2644     LOG.debug("FirstRow=" + Bytes.toString(firstrow));
2645     long pause = getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE,
2646       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
2647     int numRetries = getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
2648       HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
2649     RetryCounter retrier = new RetryCounter(numRetries+1, (int)pause, TimeUnit.MILLISECONDS);
2650     while(retrier.shouldRetry()) {
2651       int index = getMiniHBaseCluster().getServerWith(firstrow);
2652       if (index != -1) {
2653         return getMiniHBaseCluster().getRegionServerThreads().get(index).getRegionServer();
2654       }
2655       // Came back -1.  Region may not be online yet.  Sleep a while.
2656       retrier.sleepUntilNextRetry();
2657     }
2658     return null;
2659   }
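  /*
   * Illustrative sketch (not part of the original source): locating the region
   * server that hosts a test table's first region so a test can probe or kill
   * it, assuming a hypothetical `util` instance and table name `tn`.
   *
   *   HRegionServer rs = util.getRSForFirstRegionInTable(tn);
   *   if (rs != null) {
   *     rs.abort("test: simulated crash of the hosting region server");
   *   }
   */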
2660 
2661   /**
2662    * Starts a <code>MiniMRCluster</code> with a default number of
2663    * <code>TaskTracker</code>s.
2664    *
2665    * @throws IOException When starting the cluster fails.
2666    */
2667   public MiniMRCluster startMiniMapReduceCluster() throws IOException {
2668     startMiniMapReduceCluster(2);
2669     return mrCluster;
2670   }
2671 
2672   /**
2673    * TaskTracker has a bug where changing the hadoop.log.dir system property
2674    * does not change its internal static LOG_DIR variable.
2675    */
2676   private void forceChangeTaskLogDir() {
2677     Field logDirField;
2678     try {
2679       logDirField = TaskLog.class.getDeclaredField("LOG_DIR");
2680       logDirField.setAccessible(true);
2681 
2682       Field modifiersField = Field.class.getDeclaredField("modifiers");
2683       modifiersField.setAccessible(true);
2684       modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL);
2685 
2686       logDirField.set(null, new File(hadoopLogDir, "userlogs"));
2687     } catch (SecurityException | NoSuchFieldException | IllegalArgumentException
2688         | IllegalAccessException e) {
2689       throw new RuntimeException(e);
2690     }
2697   }
2698 
2699   /**
2700    * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different
2701    * filesystem.
2702    * @param servers  The number of <code>TaskTracker</code>s to start.
2703    * @throws IOException When starting the cluster fails.
2704    */
2705   private void startMiniMapReduceCluster(final int servers) throws IOException {
2706     if (mrCluster != null) {
2707       throw new IllegalStateException("MiniMRCluster is already running");
2708     }
2709     LOG.info("Starting mini mapreduce cluster...");
2710     setupClusterTestDir();
2711     createDirsAndSetProperties();
2712 
2713     forceChangeTaskLogDir();
2714 
2715     //// hadoop2 specific settings
2716     // Tests were failing because this process used 6GB of virtual memory and was getting killed.
2717     // We raise the allowed virtual-memory ratio so that processes don't get killed.
2718     conf.setFloat("yarn.nodemanager.vmem-pmem-ratio", 8.0f);
2719 
2720     // Tests were failing due to MAPREDUCE-4880 / MAPREDUCE-4607 against hadoop 2.0.2-alpha and
2721     // this avoids the problem by disabling speculative task execution in tests.
2722     conf.setBoolean("mapreduce.map.speculative", false);
2723     conf.setBoolean("mapreduce.reduce.speculative", false);
2724     ////
2725 
2726     conf.setInt("mapreduce.map.memory.mb", 2048);
2727     conf.setInt("mapreduce.reduce.memory.mb", 2048);
2728 
2729     // Allow the user to override FS URI for this map-reduce cluster to use.
2730     mrCluster = new MiniMRCluster(servers,
2731       FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1,
2732       null, null, new JobConf(this.conf));
2733     JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
2734     if (jobConf == null) {
2735       jobConf = mrCluster.createJobConf();
2736     }
2737 
2738     jobConf.set("mapreduce.cluster.local.dir",
2739       conf.get("mapreduce.cluster.local.dir")); //Hadoop MiniMR overwrites this while it should not
2740     LOG.info("Mini mapreduce cluster started");
2741 
2742     // In hadoop2, YARN/MR2 starts a mini cluster with its own conf instance and updates settings.
2743     // Our HBase MR jobs need several of these settings in order to properly run.  So we copy the
2744     // necessary config properties here.  YARN-129 required adding a few properties.
2745     conf.set("mapreduce.jobtracker.address", jobConf.get("mapreduce.jobtracker.address"));
2746     // this is for mrv2 support; mr1 ignores this
2747     conf.set("mapreduce.framework.name", "yarn");
2748     conf.setBoolean("yarn.is.minicluster", true);
2749     String rmAddress = jobConf.get("yarn.resourcemanager.address");
2750     if (rmAddress != null) {
2751       conf.set("yarn.resourcemanager.address", rmAddress);
2752     }
2753     String historyAddress = jobConf.get("mapreduce.jobhistory.address");
2754     if (historyAddress != null) {
2755       conf.set("mapreduce.jobhistory.address", historyAddress);
2756     }
2757     String schedulerAddress =
2758       jobConf.get("yarn.resourcemanager.scheduler.address");
2759     if (schedulerAddress != null) {
2760       conf.set("yarn.resourcemanager.scheduler.address", schedulerAddress);
2761     }
2762   }
2763 
2764   /**
2765    * Stops the previously started <code>MiniMRCluster</code>.
2766    */
2767   public void shutdownMiniMapReduceCluster() {
2768     if (mrCluster != null) {
2769       LOG.info("Stopping mini mapreduce cluster...");
2770       mrCluster.shutdown();
2771       mrCluster = null;
2772       LOG.info("Mini mapreduce cluster stopped");
2773     }
2774     // Restore configuration to point to local jobtracker
2775     conf.set("mapreduce.jobtracker.address", "local");
2776   }
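  /*
   * Illustrative sketch (not part of the original source): the expected
   * start/stop pairing for the mini MR cluster in a test, assuming a
   * hypothetical `util` instance.
   *
   *   util.startMiniMapReduceCluster();
   *   try {
   *     // run an HBase MR job against util.getConfiguration()
   *   } finally {
   *     util.shutdownMiniMapReduceCluster();
   *   }
   */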
2777 
2778   /**
2779    * Create a stubbed-out RegionServerServices, mainly for getting FS.
2780    */
2781   public RegionServerServices createMockRegionServerService() throws IOException {
2782     return createMockRegionServerService((ServerName)null);
2783   }
2784 
2785   /**
2786    * Create a stubbed-out RegionServerServices, mainly for getting FS.
2787    * This version is used by TestTokenAuthentication
2788    */
2789   public RegionServerServices createMockRegionServerService(RpcServerInterface rpc) throws IOException {
2790     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher());
2791     rss.setFileSystem(getTestFileSystem());
2792     rss.setRpcServer(rpc);
2793     return rss;
2794   }
2795 
2796   /**
2797    * Create a stubbed-out RegionServerServices, mainly for getting FS.
2798    * This version is used by TestOpenRegionHandler
2799    */
2800   public RegionServerServices createMockRegionServerService(ServerName name) throws IOException {
2801     final MockRegionServerServices rss = new MockRegionServerServices(getZooKeeperWatcher(), name);
2802     rss.setFileSystem(getTestFileSystem());
2803     return rss;
2804   }
2805 
2806   /**
2807    * Switches the logger for the given class to DEBUG level.
2808    *
2809    * @param clazz  The class for which to switch to debug logging.
2810    */
2811   public void enableDebug(Class<?> clazz) {
2812     Log l = LogFactory.getLog(clazz);
2813     if (l instanceof Log4JLogger) {
2814       ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
2815     } else if (l instanceof Jdk14Logger) {
2816       ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
2817     }
2818   }
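
  // A minimal usage sketch (illustrative, not part of this utility): bump a
  // class under test to DEBUG logging before exercising it.
  //
  //   HBaseTestingUtility util = new HBaseTestingUtility();
  //   util.enableDebug(org.apache.hadoop.hbase.master.HMaster.class);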
2819 
2820   /**
2821    * Expire the Master's session
2822    * @throws Exception
2823    */
2824   public void expireMasterSession() throws Exception {
2825     HMaster master = getMiniHBaseCluster().getMaster();
2826     expireSession(master.getZooKeeper(), false);
2827   }
2828 
2829   /**
2830    * Expire a region server's session
2831    * @param index which RS
2832    * @throws Exception
2833    */
2834   public void expireRegionServerSession(int index) throws Exception {
2835     HRegionServer rs = getMiniHBaseCluster().getRegionServer(index);
2836     expireSession(rs.getZooKeeper(), false);
2837     decrementMinRegionServerCount();
2838   }
2839 
2840   private void decrementMinRegionServerCount() {
2841     // decrement the count for this.conf, for a newly spawned master
2842     // this.hbaseCluster shares this configuration too
2843     decrementMinRegionServerCount(getConfiguration());
2844 
2845     // each master thread keeps a copy of configuration
2846     for (MasterThread master : getHBaseCluster().getMasterThreads()) {
2847       decrementMinRegionServerCount(master.getMaster().getConfiguration());
2848     }
2849   }
2850 
2851   private void decrementMinRegionServerCount(Configuration conf) {
2852     int currentCount = conf.getInt(
2853         ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
2854     if (currentCount != -1) {
2855       conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
2856           Math.max(currentCount - 1, 1));
2857     }
2858   }
2859 
2860   public void expireSession(ZooKeeperWatcher nodeZK) throws Exception {
2861     expireSession(nodeZK, false);
2862   }
2863 
2864   @Deprecated
2865   public void expireSession(ZooKeeperWatcher nodeZK, Server server)
2866     throws Exception {
2867     expireSession(nodeZK, false);
2868   }
2869 
2870   /**
2871    * Expire a ZooKeeper session as recommended in ZooKeeper documentation
2872    * http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
2873    * There are issues when doing this:
2874    * [1] http://www.mail-archive.com/dev@zookeeper.apache.org/msg01942.html
2875    * [2] https://issues.apache.org/jira/browse/ZOOKEEPER-1105
2876    *
2877    * @param nodeZK - the ZK watcher to expire
2878    * @param checkStatus - true to check if we can create an HTable with the
2879    *                    current configuration.
2880    */
2881   public void expireSession(ZooKeeperWatcher nodeZK, boolean checkStatus)
2882     throws Exception {
2883     Configuration c = new Configuration(this.conf);
2884     String quorumServers = ZKConfig.getZKQuorumServersString(c);
2885     ZooKeeper zk = nodeZK.getRecoverableZooKeeper().getZooKeeper();
2886     byte[] password = zk.getSessionPasswd();
2887     long sessionID = zk.getSessionId();
2888 
2889     // Expiry seems to be asynchronous (see comment from P. Hunt in [1]),
2890     //  so we create a first watcher to be sure that the
2891     //  event was sent. We expect that if our watcher receives the event,
2892     //  other watchers on the same machine will get it as well.
2893     // When we ask to close the connection, ZK does not close it before
2894     //  we receive all the events, so we don't have to capture the event;
2895     //  just closing the connection should be enough.
2896     ZooKeeper monitor = new ZooKeeper(quorumServers,
2897       1000, new org.apache.zookeeper.Watcher(){
2898       @Override
2899       public void process(WatchedEvent watchedEvent) {
2900         LOG.info("Monitor ZKW received event="+watchedEvent);
2901       }
2902     } , sessionID, password);
2903 
2904     // Making it expire
2905     ZooKeeper newZK = new ZooKeeper(quorumServers,
2906         1000, EmptyWatcher.instance, sessionID, password);
2907 
2908     // Ensure that we have a connection to the server before closing down; otherwise
2909     // the close-session event may be eaten before we leave the CONNECTING state
2910     long start = System.currentTimeMillis();
2911     while (newZK.getState() != States.CONNECTED
2912          && System.currentTimeMillis() - start < 1000) {
2913        Thread.sleep(1);
2914     }
2915     newZK.close();
2916     LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID));
2917 
2918     // Now closing & waiting to be sure that the clients get it.
2919     monitor.close();
2920 
2921     if (checkStatus) {
2922       new HTable(new Configuration(conf), TableName.META_TABLE_NAME).close();
2923     }
2924   }
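
  // Usage sketch (assumes a running mini cluster; util is this testing
  // utility): expire a region server's ZK session and verify recovery.
  //
  //   util.expireRegionServerSession(0);                    // expire RS #0's session
  //   util.waitTableAvailable(TableName.META_TABLE_NAME);   // wait for recovery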
2925 
2926   /**
2927    * Get the Mini HBase cluster.
2928    *
2929    * @return hbase cluster
2930    * @see #getHBaseClusterInterface()
2931    */
2932   public MiniHBaseCluster getHBaseCluster() {
2933     return getMiniHBaseCluster();
2934   }
2935 
2936   /**
2937    * Returns the HBaseCluster instance.
2938    * <p>The returned object can be any of the subclasses of HBaseCluster, and
2939    * tests referring to it should not assume that the cluster is a mini cluster or a
2940    * distributed one. If the test only works on a mini cluster, the specific
2941    * method {@link #getMiniHBaseCluster()} can be used instead without the
2942    * need to type-cast.
2943    */
2944   public HBaseCluster getHBaseClusterInterface() {
2945     //implementation note: we should rename this method as #getHBaseCluster(),
2946     //but this would require refactoring 90+ calls.
2947     return hbaseCluster;
2948   }
2949 
2950   /**
2951    * Get a Connection to the cluster.
2952    * Not thread-safe (This class needs a lot of work to make it thread-safe).
2953    * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
2954    * @throws IOException
2955    */
2956   public Connection getConnection() throws IOException {
2957     if (this.connection == null) {
2958       this.connection = ConnectionFactory.createConnection(this.conf);
2959     }
2960     return this.connection;
2961   }
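
  // Usage sketch: the Connection above is shared, so take short-lived Table
  // handles from it and close only the Table, never the Connection itself.
  // The table name "t1" is an arbitrary example.
  //
  //   Table t = util.getConnection().getTable(TableName.valueOf("t1"));
  //   try {
  //     // ... puts/gets against t ...
  //   } finally {
  //     t.close();   // closes the Table only; the shared Connection stays open
  //   }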
2962 
2963   /**
2964    * Returns an Admin instance.
2965    * This instance is shared between HBaseTestingUtility instance users.
2966    * Closing it has no effect; it will be closed automatically when the
2967    * cluster shuts down.
2968    *
2969    * @return An Admin instance.
2970    * @throws IOException
2971    */
2972   public synchronized HBaseAdmin getHBaseAdmin()
2973   throws IOException {
2974     if (hbaseAdmin == null){
2975       this.hbaseAdmin = new HBaseAdminForTests(getConnection());
2976     }
2977     return hbaseAdmin;
2978   }
2979 
2980   private HBaseAdminForTests hbaseAdmin = null;
2981   private static class HBaseAdminForTests extends HBaseAdmin {
2982     public HBaseAdminForTests(Connection connection) throws MasterNotRunningException,
2983         ZooKeeperConnectionException, IOException {
2984       super(connection);
2985     }
2986 
2987     @Override
2988     public synchronized void close() throws IOException {
2989       LOG.warn("close() called on HBaseAdmin instance returned from " +
2990         "HBaseTestingUtility.getHBaseAdmin()");
2991     }
2992 
2993     private synchronized void close0() throws IOException {
2994       super.close();
2995     }
2996   }
2997 
2998   /**
2999    * Returns a ZooKeeperWatcher instance.
3000    * This instance is shared between HBaseTestingUtility instance users.
3001    * Don't close it; it will be closed automatically when the
3002    * cluster shuts down.
3003    *
3004    * @return The ZooKeeperWatcher instance.
3005    * @throws IOException
3006    */
3007   public synchronized ZooKeeperWatcher getZooKeeperWatcher()
3008     throws IOException {
3009     if (zooKeeperWatcher == null) {
3010       zooKeeperWatcher = new ZooKeeperWatcher(conf, "testing utility",
3011         new Abortable() {
3012         @Override public void abort(String why, Throwable e) {
3013           throw new RuntimeException("Unexpected abort in HBaseTestingUtility:"+why, e);
3014         }
3015         @Override public boolean isAborted() {return false;}
3016       });
3017     }
3018     return zooKeeperWatcher;
3019   }
3020   private ZooKeeperWatcher zooKeeperWatcher;
3021 
3022 
3023 
3024   /**
3025    * Closes the named region.
3026    *
3027    * @param regionName  The region to close.
3028    * @throws IOException
3029    */
3030   public void closeRegion(String regionName) throws IOException {
3031     closeRegion(Bytes.toBytes(regionName));
3032   }
3033 
3034   /**
3035    * Closes the named region.
3036    *
3037    * @param regionName  The region to close.
3038    * @throws IOException
3039    */
3040   public void closeRegion(byte[] regionName) throws IOException {
3041     getHBaseAdmin().closeRegion(regionName, null);
3042   }
3043 
3044   /**
3045    * Closes the region containing the given row.
3046    *
3047    * @param row  The row to find the containing region.
3048    * @param table  The table to find the region.
3049    * @throws IOException
3050    */
3051   public void closeRegionByRow(String row, RegionLocator table) throws IOException {
3052     closeRegionByRow(Bytes.toBytes(row), table);
3053   }
3054 
3055   /**
3056    * Closes the region containing the given row.
3057    *
3058    * @param row  The row to find the containing region.
3059    * @param table  The table to find the region.
3060    * @throws IOException
3061    */
3062   public void closeRegionByRow(byte[] row, RegionLocator table) throws IOException {
3063     HRegionLocation hrl = table.getRegionLocation(row);
3064     closeRegion(hrl.getRegionInfo().getRegionName());
3065   }
3066 
3067   /**
3068    * Retrieves a splittable region randomly from tableName
3069    *
3070    * @param tableName name of table
3071    * @param maxAttempts maximum number of attempts, unlimited for value of -1
3072    * @return the HRegion chosen, null if none was found within limit of maxAttempts
3073    */
3074   public HRegion getSplittableRegion(TableName tableName, int maxAttempts) {
3075     List<HRegion> regions = getHBaseCluster().getRegions(tableName);
3076     int regCount = regions.size();
3077     Set<Integer> attempted = new HashSet<Integer>();
3078     int idx;
3079     int attempts = 0;
3080     do {
3081       regions = getHBaseCluster().getRegions(tableName);
3082       if (regCount != regions.size()) {
3083         // if there was region movement, clear attempted Set
3084         attempted.clear();
3085       }
3086       regCount = regions.size();
3087       // There is a chance that, before we get the regions for the table from an RS, a region
3088       // may be going through a CLOSE; this can happen when online schema change is enabled.
3089       if (regCount > 0) {
3090         idx = random.nextInt(regCount);
3091         // if we have just tried this region, count the attempt and move on;
3092         // counting skips here keeps a finite maxAttempts from looping forever
3093         if (attempted.contains(idx)) { attempts++; continue; }
3094         try {
3095           regions.get(idx).checkSplit();
3096           return regions.get(idx);
3097         } catch (Exception ex) {
3098           LOG.warn("Caught exception", ex);
3099           attempted.add(idx);
3100         }
3101       }
3102       attempts++;
3103     } while (maxAttempts == -1 || attempts < maxAttempts);
3104     return null;
3105   }
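
  // Usage sketch: pick a splittable region and split it through the admin API;
  // 10 is an arbitrary example attempt limit.
  //
  //   HRegion r = util.getSplittableRegion(tableName, 10);
  //   if (r != null) {
  //     util.getHBaseAdmin().splitRegion(r.getRegionInfo().getRegionName());
  //   }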
3106 
3107   public MiniZooKeeperCluster getZkCluster() {
3108     return zkCluster;
3109   }
3110 
3111   public void setZkCluster(MiniZooKeeperCluster zkCluster) {
3112     this.passedZkCluster = true;
3113     this.zkCluster = zkCluster;
3114     conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getClientPort());
3115   }
3116 
3117   public MiniDFSCluster getDFSCluster() {
3118     return dfsCluster;
3119   }
3120 
3121   public void setDFSCluster(MiniDFSCluster cluster) throws IllegalStateException, IOException {
3122     setDFSCluster(cluster, true);
3123   }
3124 
3125   /**
3126    * Set the MiniDFSCluster
3127    * @param cluster cluster to use
3128    * @param requireDown require that the cluster not be "up" (MiniDFSCluster#isClusterUp) before
3129    * it is set.
3130    * @throws IllegalStateException if the passed cluster is up when it is required to be down
3131    * @throws IOException if the FileSystem could not be set from the passed dfs cluster
3132    */
3133   public void setDFSCluster(MiniDFSCluster cluster, boolean requireDown)
3134       throws IllegalStateException, IOException {
3135     if (dfsCluster != null && requireDown && dfsCluster.isClusterUp()) {
3136       throw new IllegalStateException("DFSCluster is already running! Shut it down first.");
3137     }
3138     this.dfsCluster = cluster;
3139     this.setFs();
3140   }
3141 
3142   public FileSystem getTestFileSystem() throws IOException {
3143     return HFileSystem.get(conf);
3144   }
3145 
3146   /**
3147    * Wait until all regions in a table have been assigned.  Waits the default
3148    * timeout (30 seconds) before giving up.
3149    * @param table Table to wait on.
3150    * @throws InterruptedException
3151    * @throws IOException
3152    */
3153   public void waitTableAvailable(TableName table)
3154       throws InterruptedException, IOException {
3155     waitTableAvailable(table.getName(), 30000);
3156   }
3157 
3158   public void waitTableAvailable(TableName table, long timeoutMillis)
3159       throws InterruptedException, IOException {
3160     waitFor(timeoutMillis, predicateTableAvailable(table));
3161   }
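
  // Usage sketch: after creating a table, block until all of its regions are
  // online before issuing reads; 60000 ms is an arbitrary example timeout.
  //
  //   util.waitTableAvailable(TableName.valueOf("demo"), 60000);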
3162 
3163   public String explainTableAvailability(TableName tableName) throws IOException {
3164     String msg = explainTableState(tableName) + ",";
3165     if (getHBaseCluster().getMaster().isAlive()) {
3166       Map<HRegionInfo, ServerName> assignments =
3167           getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
3168               .getRegionAssignments();
3169       final List<Pair<HRegionInfo, ServerName>> metaLocations =
3170           MetaTableAccessor
3171               .getTableRegionsAndLocations(getZooKeeperWatcher(), connection, tableName);
3172       for (Pair<HRegionInfo, ServerName> metaLocation : metaLocations) {
3173         HRegionInfo hri = metaLocation.getFirst();
3174         ServerName sn = metaLocation.getSecond();
3175         if (!assignments.containsKey(hri)) {
3176           msg += ", region " + hri
3177               + " not assigned, but found in meta; it is expected to be on " + sn;
3178 
3179         } else if (sn == null) {
3180           msg += ", region " + hri
3181               + " assigned, but has no server in meta";
3182         } else if (!sn.equals(assignments.get(hri))) {
3183           msg += ", region " + hri
3184               + " assigned, but has different servers in meta and AM (" +
3185               sn + " <> " + assignments.get(hri) + ")";
3186         }
3187       }
3188     }
3189     return msg;
3190   }
3191 
3192   public String explainTableState(TableName tableName) throws IOException {
3193     try {
3194       if (getHBaseAdmin().isTableEnabled(tableName))
3195         return "table enabled in zk";
3196       else if (getHBaseAdmin().isTableDisabled(tableName))
3197         return "table disabled in zk";
3198       else
3199         return "table in uknown state";
3200     } catch (TableNotFoundException e) {
3201       return "table not exists";
3202     }
3203   }
3204 
3205   /**
3206    * Wait until all regions in a table have been assigned
3207    * @param table Table to wait on.
3208    * @param timeoutMillis Timeout.
3209    * @throws InterruptedException
3210    * @throws IOException
3211    */
3212   public void waitTableAvailable(byte[] table, long timeoutMillis)
3213   throws InterruptedException, IOException {
3214     waitTableAvailable(getHBaseAdmin(), table, timeoutMillis);
3215   }
3216 
3217   public void waitTableAvailable(Admin admin, byte[] table, long timeoutMillis)
3218   throws InterruptedException, IOException {
3219     long startWait = System.currentTimeMillis();
3220     while (!admin.isTableAvailable(TableName.valueOf(table))) {
3221       assertTrue("Timed out waiting for table to become available " +
3222         Bytes.toStringBinary(table),
3223         System.currentTimeMillis() - startWait < timeoutMillis);
3224       Thread.sleep(200);
3225     }
3226   }
3227 
3228   /**
3229    * Waits for a table to be 'enabled'.  Enabled means that the table is marked as
3230    * 'enabled' and all of its regions have been assigned.  Will time out after the
3231    * default period (30 seconds).
3232    * @see #waitTableAvailable(byte[])
3233    * @param table Table to wait on.
3234    * @throws InterruptedException
3235    * @throws IOException
3236    */
3237   public void waitTableEnabled(TableName table)
3238       throws InterruptedException, IOException {
3239     waitTableEnabled(table, 30000);
3240   }
3241 
3242   /**
3243    * Waits for a table to be 'enabled'.  Enabled means that the table is marked as
3244    * 'enabled' and all of its regions have been assigned.
3245    * @see #waitTableAvailable(byte[])
3246    * @param table Table to wait on.
3247    * @param timeoutMillis Time to wait on it being marked enabled.
3248    * @throws InterruptedException
3249    * @throws IOException
3250    */
3251   public void waitTableEnabled(byte[] table, long timeoutMillis)
3252   throws InterruptedException, IOException {
3253     waitTableEnabled(TableName.valueOf(table), timeoutMillis);
3254   }
3255 
3256   public void waitTableEnabled(TableName table, long timeoutMillis)
3257       throws IOException {
3258     waitFor(timeoutMillis, predicateTableEnabled(table));
3259   }
3260 
3261   /**
3262    * Waits for a table to be 'disabled'.  Disabled means that the table is marked as 'disabled'.
3263    * Will time out after the default period (30 seconds).
3264    * @param table Table to wait on.
3265    * @throws InterruptedException
3266    * @throws IOException
3267    */
3268   public void waitTableDisabled(byte[] table)
3269       throws InterruptedException, IOException {
3270     waitTableDisabled(getHBaseAdmin(), table, 30000);
3271   }
3272 
3273   public void waitTableDisabled(Admin admin, byte[] table)
3274       throws InterruptedException, IOException {
3275     waitTableDisabled(admin, table, 30000);
3276   }
3277 
3278   /**
3279    * Waits for a table to be 'disabled'.  Disabled means that the table is marked as 'disabled'.
3280    * @param table Table to wait on.
3281    * @param timeoutMillis Time to wait on it being marked disabled.
3282    * @throws InterruptedException
3283    * @throws IOException
3284    */
3285   public void waitTableDisabled(byte[] table, long timeoutMillis)
3286       throws InterruptedException, IOException {
3287     waitTableDisabled(getHBaseAdmin(), table, timeoutMillis);
3288   }
3289 
3290   public void waitTableDisabled(Admin admin, byte[] table, long timeoutMillis)
3291       throws InterruptedException, IOException {
3292     TableName tableName = TableName.valueOf(table);
3293     long startWait = System.currentTimeMillis();
3294     while (!admin.isTableDisabled(tableName)) {
3295       assertTrue("Timed out waiting for table to become disabled " +
3296               Bytes.toStringBinary(table),
3297           System.currentTimeMillis() - startWait < timeoutMillis);
3298       Thread.sleep(200);
3299     }
3300   }
3301 
3302   /**
3303    * Make sure that at least the specified number of region servers
3304    * are running
3305    * @param num minimum number of region servers that should be running
3306    * @return true if we started some servers
3307    * @throws IOException
3308    */
3309   public boolean ensureSomeRegionServersAvailable(final int num)
3310       throws IOException {
3311     boolean startedServer = false;
3312     MiniHBaseCluster hbaseCluster = getMiniHBaseCluster();
3313     for (int i=hbaseCluster.getLiveRegionServerThreads().size(); i<num; ++i) {
3314       LOG.info("Started new server=" + hbaseCluster.startRegionServer());
3315       startedServer = true;
3316     }
3317 
3318     return startedServer;
3319   }
3320 
3321 
3322   /**
3323    * Make sure that at least the specified number of region servers
3324    * are running. We don't count the ones that are currently stopping or are
3325    * stopped.
3326    * @param num minimum number of region servers that should be running
3327    * @return true if we started some servers
3328    * @throws IOException
3329    */
3330   public boolean ensureSomeNonStoppedRegionServersAvailable(final int num)
3331     throws IOException {
3332     boolean startedServer = ensureSomeRegionServersAvailable(num);
3333 
3334     int nonStoppedServers = 0;
3335     for (JVMClusterUtil.RegionServerThread rst :
3336       getMiniHBaseCluster().getRegionServerThreads()) {
3337 
3338       HRegionServer hrs = rst.getRegionServer();
3339       if (hrs.isStopping() || hrs.isStopped()) {
3340         LOG.info("A region server is stopped or stopping:"+hrs);
3341       } else {
3342         nonStoppedServers++;
3343       }
3344     }
3345     for (int i=nonStoppedServers; i<num; ++i) {
3346       LOG.info("Started new server=" + getMiniHBaseCluster().startRegionServer());
3347       startedServer = true;
3348     }
3349     return startedServer;
3350   }
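
  // Usage sketch: a balancer-style test might insist on three live, non-stopped
  // region servers before it starts moving regions around.
  //
  //   util.ensureSomeNonStoppedRegionServersAvailable(3);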
3351 
3352 
3353   /**
3354    * This method clones the passed <code>c</code> configuration, setting a new
3355    * user into the clone.  Use it when getting new instances of FileSystem.  Only
3356    * works for DistributedFileSystem.
3357    * @param c Initial configuration
3358    * @param differentiatingSuffix Suffix to differentiate this user from others.
3359    * @return A new configuration instance with a different user set into it.
3360    * @throws IOException
3361    */
3362   public static User getDifferentUser(final Configuration c,
3363     final String differentiatingSuffix)
3364   throws IOException {
3365     FileSystem currentfs = FileSystem.get(c);
3366     if (!(currentfs instanceof DistributedFileSystem)) {
3367       return User.getCurrent();
3368     }
3369     // Else distributed filesystem.  Make a new instance per daemon.  Below
3370     // code is taken from the AppendTestUtil over in hdfs.
3371     String username = User.getCurrent().getName() +
3372       differentiatingSuffix;
3373     User user = User.createUserForTesting(c, username,
3374         new String[]{"supergroup"});
3375     return user;
3376   }
3377 
3378   public static NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster)
3379       throws IOException {
3380     NavigableSet<String> online = new TreeSet<String>();
3381     for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
3382       try {
3383         for (HRegionInfo region :
3384             ProtobufUtil.getOnlineRegions(rst.getRegionServer().getRSRpcServices())) {
3385           online.add(region.getRegionNameAsString());
3386         }
3387       } catch (RegionServerStoppedException e) {
3388         // That's fine.
3389       }
3390     }
3391     for (MasterThread mt : cluster.getLiveMasterThreads()) {
3392       try {
3393         for (HRegionInfo region :
3394             ProtobufUtil.getOnlineRegions(mt.getMaster().getRSRpcServices())) {
3395           online.add(region.getRegionNameAsString());
3396         }
3397       } catch (RegionServerStoppedException e) {
3398         // That's fine.
3399       } catch (ServerNotRunningYetException e) {
3400         // That's fine.
3401       }
3402     }
3403     return online;
3404   }
3405 
3406   /**
3407    * Set maxRecoveryErrorCount in DFSClient.  In 0.20 pre-append it's hard-coded to 5 and
3408    * makes tests linger.  Here is the exception you'll see:
3409    * <pre>
3410    * 2010-06-15 11:52:28,511 WARN  [DataStreamer for file /hbase/.logs/wal.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed  because recovery from primary datanode 127.0.0.1:53683 failed 4 times.  Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry...
3411    * </pre>
3412    * @param stream A DFSClient.DFSOutputStream.
3413    * @param max the new maximum recovery error count
3414    * @throws NoSuchFieldException
3415    * @throws SecurityException
3416    * @throws IllegalAccessException
3417    * @throws IllegalArgumentException
3418    */
3419   public static void setMaxRecoveryErrorCount(final OutputStream stream,
3420       final int max) {
3421     try {
3422       Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
3423       for (Class<?> clazz: clazzes) {
3424         String className = clazz.getSimpleName();
3425         if (className.equals("DFSOutputStream")) {
3426           if (clazz.isInstance(stream)) {
3427             Field maxRecoveryErrorCountField =
3428               stream.getClass().getDeclaredField("maxRecoveryErrorCount");
3429             maxRecoveryErrorCountField.setAccessible(true);
3430             maxRecoveryErrorCountField.setInt(stream, max);
3431             break;
3432           }
3433         }
3434       }
3435     } catch (Exception e) {
3436       LOG.info("Could not set max recovery field", e);
3437     }
3438   }
3439 
3440   /**
3441    * Move a region to the destination server and wait until the region is completely moved and online
3442    *
3443    * @param destRegion region to move
3444    * @param destServer destination server of the region
3445    * @throws InterruptedException
3446    * @throws IOException
3447    */
3448   public void moveRegionAndWait(HRegionInfo destRegion, ServerName destServer)
3449       throws InterruptedException, IOException {
3450     HMaster master = getMiniHBaseCluster().getMaster();
3451     getHBaseAdmin().move(destRegion.getEncodedNameAsBytes(),
3452         Bytes.toBytes(destServer.getServerName()));
3453     while (true) {
3454       ServerName serverName = master.getAssignmentManager().getRegionStates()
3455           .getRegionServerOfRegion(destRegion);
3456       if (serverName != null && serverName.equals(destServer)) {
3457         assertRegionOnServer(destRegion, serverName, 200);
3458         break;
3459       }
3460       Thread.sleep(10);
3461     }
3462   }
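
  // Usage sketch: relocate the first region of a table onto another server and
  // block until the move is visible; RS index 1 is an arbitrary example.
  //
  //   HRegionInfo hri = util.getHBaseAdmin().getTableRegions(tableName).get(0);
  //   ServerName dest = util.getMiniHBaseCluster().getRegionServer(1).getServerName();
  //   util.moveRegionAndWait(hri, dest);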
3463 
3464   /**
3465    * Wait until all regions for a table in hbase:meta have a non-empty
3466    * info:server, up to a configurable timeout value (default is 60 seconds).
3467    * This means all regions have been deployed, and the master has been
3468    * informed and has updated hbase:meta with each region's deployed
3469    * server.
3470    * @param tableName the table name
3471    * @throws IOException
3472    */
3473   public void waitUntilAllRegionsAssigned(final TableName tableName) throws IOException {
3474     waitUntilAllRegionsAssigned(
3475       tableName,
3476       this.conf.getLong("hbase.client.sync.wait.timeout.msec", 60000));
3477   }
3478 
3479   /**
3480    * Wait until all system tables' regions are assigned.
3481    * @throws IOException
3482    */
3483   public void waitUntilAllSystemRegionsAssigned() throws IOException {
3484     waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
3485     waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME);
3486     if (BackupManager.isBackupEnabled(conf)) {
3487       waitUntilAllRegionsAssigned(TableName.BACKUP_TABLE_NAME);
3488     }
3489   }
3490   
3491   /**
3492    * Wait until all regions for a table in hbase:meta have a non-empty
3493    * info:server, or until timeout.  This means all regions have been deployed, and
3494    * the master has been informed and has updated hbase:meta with each region's
3495    * deployed server.
3496    * @param tableName the table name
3497    * @param timeout timeout, in milliseconds
3498    * @throws IOException
3499    */
3500   public void waitUntilAllRegionsAssigned(final TableName tableName, final long timeout)
3501       throws IOException {
3502     final Table meta = new HTable(getConfiguration(), TableName.META_TABLE_NAME);
3503     try {
3504       waitFor(timeout, 200, true, new Predicate<IOException>() {
3505         @Override
3506         public boolean evaluate() throws IOException {
3507           boolean allRegionsAssigned = true;
3508           Scan scan = new Scan();
3509           scan.addFamily(HConstants.CATALOG_FAMILY);
3510           ResultScanner s = meta.getScanner(scan);
3511           try {
3512             Result r;
3513             while ((r = s.next()) != null) {
3514               byte[] b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
3515               HRegionInfo info = HRegionInfo.parseFromOrNull(b);
3516               if (info != null && info.getTable().equals(tableName)) {
3517                 b = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
3518                 allRegionsAssigned &= (b != null);
3519               }
3520             }
3521           } finally {
3522             s.close();
3523           }
3524           return allRegionsAssigned;
3525         }
3526       });
3527     } finally {
3528       meta.close();
3529     }
3530 
3531     // check from the master state if we are using a mini cluster
3532     if (!getHBaseClusterInterface().isDistributedCluster()) {
3533       // So, all regions are in the meta table but make sure master knows of the assignments before
3534       // returning -- sometimes this can lag.
3535       HMaster master = getHBaseCluster().getMaster();
3536       final RegionStates states = master.getAssignmentManager().getRegionStates();
3537       waitFor(timeout, 200, new ExplainingPredicate<IOException>() {
3538         @Override
3539         public String explainFailure() throws IOException {
3540           return explainTableAvailability(tableName);
3541         }
3542 
3543         @Override
3544         public boolean evaluate() throws IOException {
3545           List<HRegionInfo> hris = states.getRegionsOfTable(tableName);
3546           return hris != null && !hris.isEmpty();
3547         }
3548       });
3549     }
3550   }
3551 
3552   /**
3553    * Do a small get/scan against one store. This is required because the store
3554    * has no actual methods for querying itself, and relies on StoreScanner.
3555    */
3556   public static List<Cell> getFromStoreFile(HStore store,
3557                                                 Get get) throws IOException {
3558     Scan scan = new Scan(get);
3559     InternalScanner scanner = (InternalScanner) store.getScanner(scan,
3560         scan.getFamilyMap().get(store.getFamily().getName()),
3561         // originally MultiVersionConsistencyControl.resetThreadReadPoint() was called to set
3562         // readpoint 0.
3563         0);
3564 
3565     List<Cell> result = new ArrayList<Cell>();
3566     scanner.next(result);
3567     if (!result.isEmpty()) {
3568       // verify that we are on the row we want:
3569       Cell kv = result.get(0);
3570       if (!CellUtil.matchingRow(kv, get.getRow())) {
3571         result.clear();
3572       }
3573     }
3574     scanner.close();
3575     return result;
3576   }
3577 
3578   /**
3579    * Create region split keys between startKey and endKey
3580    *
3581    * @param startKey first key of the key range
3582    * @param endKey last key of the key range
3583    * @param numRegions the number of regions to be created. It has to be greater than 3.
3584    * @return the region split start keys; the first entry is the empty byte array
3585    */
3586   public byte[][] getRegionSplitStartKeys(byte[] startKey, byte[] endKey, int numRegions){
3587     assertTrue(numRegions>3);
3588     byte [][] tmpSplitKeys = Bytes.split(startKey, endKey, numRegions - 3);
3589     byte [][] result = new byte[tmpSplitKeys.length+1][];
3590     System.arraycopy(tmpSplitKeys, 0, result, 1, tmpSplitKeys.length);
3591     result[0] = HConstants.EMPTY_BYTE_ARRAY;
3592     return result;
3593   }
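
  // Worked example (values illustrative): for numRegions = 4 the call
  // Bytes.split(startKey, endKey, 1) yields {startKey, mid, endKey}; prepending
  // HConstants.EMPTY_BYTE_ARRAY then gives the 4 region start keys
  // {EMPTY, startKey, mid, endKey}.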
3594 
3595   /**
3596    * Do a small get/scan against one store. This is required because the store
3597    * has no actual methods for querying itself, and relies on StoreScanner.
3598    */
3599   public static List<Cell> getFromStoreFile(HStore store,
3600                                                 byte [] row,
3601                                                 NavigableSet<byte[]> columns
3602                                                 ) throws IOException {
3603     Get get = new Get(row);
3604     Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
3605     s.put(store.getFamily().getName(), columns);
3606 
3607     return getFromStoreFile(store,get);
3608   }
3609 
3610   /**
3611    * Gets a ZooKeeperWatcher.
3612    * @param TEST_UTIL the testing utility whose configuration the watcher uses
3613    */
3614   public static ZooKeeperWatcher getZooKeeperWatcher(
3615       HBaseTestingUtility TEST_UTIL) throws ZooKeeperConnectionException,
3616       IOException {
3617     ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
3618         "unittest", new Abortable() {
3619           boolean aborted = false;
3620 
3621           @Override
3622           public void abort(String why, Throwable e) {
3623             aborted = true;
3624             throw new RuntimeException("Fatal ZK error, why=" + why, e);
3625           }
3626 
3627           @Override
3628           public boolean isAborted() {
3629             return aborted;
3630           }
3631         });
3632     return zkw;
3633   }
3634 
3635   /**
3636    * Creates a znode with OPENED state.
3637    * @param TEST_UTIL the testing utility to obtain a ZooKeeperWatcher from
3638    * @param region the region whose assignment znode is created
3639    * @param serverName the server the znode is transitioned for
3640    * @return the ZooKeeperWatcher that was used
3641    * @throws IOException
3642    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
3643    * @throws KeeperException
3644    * @throws NodeExistsException
3645    */
3646   public static ZooKeeperWatcher createAndForceNodeToOpenedState(
3647       HBaseTestingUtility TEST_UTIL, Region region,
3648       ServerName serverName) throws ZooKeeperConnectionException,
3649       IOException, KeeperException, NodeExistsException {
3650     return createAndForceNodeToOpenedState(TEST_UTIL, (HRegion)region, serverName);
3651   }
3652 
3653   /**
3654    * Creates a znode with OPENED state.
3655    * @param TEST_UTIL the testing utility to obtain a ZooKeeperWatcher from
3656    * @param region the region whose assignment znode is created
3657    * @param serverName the server the znode is transitioned for
3658    * @return the ZooKeeperWatcher that was used
3659    * @throws IOException
3660    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
3661    * @throws KeeperException
3662    * @throws NodeExistsException
3663    */
3664   public static ZooKeeperWatcher createAndForceNodeToOpenedState(
3665       HBaseTestingUtility TEST_UTIL, HRegion region,
3666       ServerName serverName) throws ZooKeeperConnectionException,
3667       IOException, KeeperException, NodeExistsException {
3668     ZooKeeperWatcher zkw = getZooKeeperWatcher(TEST_UTIL);
3669     ZKAssign.createNodeOffline(zkw, region.getRegionInfo(), serverName);
3670     int version = ZKAssign.transitionNodeOpening(zkw, region
3671         .getRegionInfo(), serverName);
3672     ZKAssign.transitionNodeOpened(zkw, region.getRegionInfo(), serverName,
3673         version);
3674     return zkw;
3675   }
3676 
3677   public static void assertKVListsEqual(String additionalMsg,
3678       final List<? extends Cell> expected,
3679       final List<? extends Cell> actual) {
3680     final int eLen = expected.size();
3681     final int aLen = actual.size();
3682     final int minLen = Math.min(eLen, aLen);
3683 
3684     int i;
3685     for (i = 0; i < minLen
3686         && KeyValue.COMPARATOR.compare(expected.get(i), actual.get(i)) == 0;
3687         ++i) {}
3688 
3689     if (additionalMsg == null) {
3690       additionalMsg = "";
3691     }
3692     if (!additionalMsg.isEmpty()) {
3693       additionalMsg = ". " + additionalMsg;
3694     }
3695 
3696     if (eLen != aLen || i != minLen) {
3697       throw new AssertionError(
3698           "Expected and actual KV arrays differ at position " + i + ": " +
3699           safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
3700           safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
3701     }
3702   }
3703 
3704   public static <T> String safeGetAsStr(List<T> lst, int i) {
3705     if (0 <= i && i < lst.size()) {
3706       return lst.get(i).toString();
3707     } else {
3708       return "<out_of_range>";
3709     }
3710   }
3711 
3712   public String getClusterKey() {
3713     return conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
3714         + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":"
3715         + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
3716             HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
3717   }
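
  // Example (assuming default settings): with quorum "localhost", client port
  // 2181 and the default znode parent, the key is "localhost:2181:/hbase".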
3718 
3719   /** Creates a random table with the given parameters */
3720   public HTable createRandomTable(String tableName,
3721       final Collection<String> families,
3722       final int maxVersions,
3723       final int numColsPerRow,
3724       final int numFlushes,
3725       final int numRegions,
3726       final int numRowsPerFlush)
3727       throws IOException, InterruptedException {
3728 
3729     LOG.info("\n\nCreating random table " + tableName + " with " + numRegions +
3730         " regions, " + numFlushes + " storefiles per region, " +
3731         numRowsPerFlush + " rows per flush, maxVersions=" +  maxVersions +
3732         "\n");
3733 
3734     final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L);
3735     final int numCF = families.size();
3736     final byte[][] cfBytes = new byte[numCF][];
3737     {
3738       int cfIndex = 0;
3739       for (String cf : families) {
3740         cfBytes[cfIndex++] = Bytes.toBytes(cf);
3741       }
3742     }
3743 
3744     final int actualStartKey = 0;
3745     final int actualEndKey = Integer.MAX_VALUE;
3746     final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions;
3747     final int splitStartKey = actualStartKey + keysPerRegion;
3748     final int splitEndKey = actualEndKey - keysPerRegion;
3749     final String keyFormat = "%08x";
3750     final HTable table = createTable(tableName, cfBytes,
3751         maxVersions,
3752         Bytes.toBytes(String.format(keyFormat, splitStartKey)),
3753         Bytes.toBytes(String.format(keyFormat, splitEndKey)),
3754         numRegions);
3755 
3756     if (hbaseCluster != null) {
3757       getMiniHBaseCluster().flushcache(TableName.META_TABLE_NAME);
3758     }
3759 
3760     for (int iFlush = 0; iFlush < numFlushes; ++iFlush) {
3761       for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) {
3762         final byte[] row = Bytes.toBytes(String.format(keyFormat,
3763             actualStartKey + rand.nextInt(actualEndKey - actualStartKey)));
3764 
3765         Put put = new Put(row);
3766         Delete del = new Delete(row);
3767         for (int iCol = 0; iCol < numColsPerRow; ++iCol) {
3768           final byte[] cf = cfBytes[rand.nextInt(numCF)];
3769           final long ts = rand.nextInt();
3770           final byte[] qual = Bytes.toBytes("col" + iCol);
3771           if (rand.nextBoolean()) {
3772             final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
3773                 "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
3774                 ts + "_random_" + rand.nextLong());
3775             put.add(cf, qual, ts, value);
3776           } else if (rand.nextDouble() < 0.8) {
3777             del.deleteColumn(cf, qual, ts);
3778           } else {
3779             del.deleteColumns(cf, qual, ts);
3780           }
3781         }
3782 
3783         if (!put.isEmpty()) {
3784           table.put(put);
3785         }
3786 
3787         if (!del.isEmpty()) {
3788           table.delete(del);
3789         }
3790       }
3791       LOG.info("Initiating flush #" + iFlush + " for table " + tableName);
3792       table.flushCommits();
3793       if (hbaseCluster != null) {
3794         getMiniHBaseCluster().flushcache(table.getName());
3795       }
3796     }
3797 
3798     return table;
3799   }
3800 
3801   private static final int MIN_RANDOM_PORT = 0xc000;
3802   private static final int MAX_RANDOM_PORT = 0xfffe;
3803   private static Random random = new Random();
3804 
3805   /**
3806    * Returns a random port. These ports cannot be registered with IANA and are
3807    * intended for dynamic allocation (see http://bit.ly/dynports).
3808    */
3809   public static int randomPort() {
3810     return MIN_RANDOM_PORT
3811         + random.nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
3812   }
3813 
3814   /**
3815    * Returns a random free port and marks that port as taken. Not thread-safe. Expected to be
3816    * called from single-threaded test setup code.
3817    */
3818   public static int randomFreePort() {
3819     int port = 0;
3820     do {
3821       port = randomPort();
3822       if (takenRandomPorts.contains(port)) {
3823         continue;
3824       }
3825       takenRandomPorts.add(port);
3826 
3827       try {
3828         ServerSocket sock = new ServerSocket(port);
3829         sock.close();
3830       } catch (IOException ex) {
3831         port = 0;
3832       }
3833     } while (port == 0);
3834     return port;
3835   }
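
  // Usage sketch: wire random free ports into a test configuration so that
  // several mini clusters can coexist on one host; the keys are standard HBase
  // configuration properties.
  //
  //   conf.setInt(HConstants.MASTER_INFO_PORT, HBaseTestingUtility.randomFreePort());
  //   conf.setInt(HConstants.REGIONSERVER_PORT, HBaseTestingUtility.randomFreePort());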
3836 
3837 
3838   public static String randomMultiCastAddress() {
3839     return "226.1.1." + random.nextInt(254);
3840   }
3841 
3842 
3843 
3844   public static void waitForHostPort(String host, int port)
3845       throws IOException {
3846     final int maxTimeMs = 10000;
3847     final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS;
3848     IOException savedException = null;
3849     LOG.info("Waiting for server at " + host + ":" + port);
3850     for (int attempt = 0; attempt < maxNumAttempts; ++attempt) {
3851       try {
3852         Socket sock = new Socket(InetAddress.getByName(host), port);
3853         sock.close();
3854         savedException = null;
3855         LOG.info("Server at " + host + ":" + port + " is available");
3856         break;
3857       } catch (UnknownHostException e) {
3858         throw new IOException("Failed to look up " + host, e);
3859       } catch (IOException e) {
3860         savedException = e;
3861       }
3862       Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS);
3863     }
3864 
3865     if (savedException != null) {
3866       throw savedException;
3867     }
3868   }
3869 
3870   /**
3871    * Creates a pre-split table for load testing. If the table already exists,
3872    * logs a warning and continues.
3873    * @return the number of regions the table was split into
3874    */
3875   public static int createPreSplitLoadTestTable(Configuration conf,
3876       TableName tableName, byte[] columnFamily, Algorithm compression,
3877       DataBlockEncoding dataBlockEncoding) throws IOException {
3878     return createPreSplitLoadTestTable(conf, tableName,
3879       columnFamily, compression, dataBlockEncoding, DEFAULT_REGIONS_PER_SERVER, 1,
3880       Durability.USE_DEFAULT);
3881   }
3882   /**
3883    * Creates a pre-split table for load testing. If the table already exists,
3884    * logs a warning and continues.
3885    * @return the number of regions the table was split into
3886    */
3887   public static int createPreSplitLoadTestTable(Configuration conf,
3888       TableName tableName, byte[] columnFamily, Algorithm compression,
3889       DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3890       Durability durability)
3891           throws IOException {
3892     HTableDescriptor desc = new HTableDescriptor(tableName);
3893     desc.setDurability(durability);
3894     desc.setRegionReplication(regionReplication);
3895     HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
3896     hcd.setDataBlockEncoding(dataBlockEncoding);
3897     hcd.setCompressionType(compression);
3898     return createPreSplitLoadTestTable(conf, desc, hcd, numRegionsPerServer);
3899   }
3900 
3901   /**
3902    * Creates a pre-split table for load testing. If the table already exists,
3903    * logs a warning and continues.
3904    * @return the number of regions the table was split into
3905    */
3906   public static int createPreSplitLoadTestTable(Configuration conf,
3907       TableName tableName, byte[][] columnFamilies, Algorithm compression,
3908       DataBlockEncoding dataBlockEncoding, int numRegionsPerServer, int regionReplication,
3909       Durability durability)
3910           throws IOException {
3911     HTableDescriptor desc = new HTableDescriptor(tableName);
3912     desc.setDurability(durability);
3913     desc.setRegionReplication(regionReplication);
3914     HColumnDescriptor[] hcds = new HColumnDescriptor[columnFamilies.length];
3915     for (int i = 0; i < columnFamilies.length; i++) {
3916       HColumnDescriptor hcd = new HColumnDescriptor(columnFamilies[i]);
3917       hcd.setDataBlockEncoding(dataBlockEncoding);
3918       hcd.setCompressionType(compression);
3919       hcds[i] = hcd;
3920     }
3921     return createPreSplitLoadTestTable(conf, desc, hcds, numRegionsPerServer);
3922   }
3923 
3924   /**
3925    * Creates a pre-split table for load testing. If the table already exists,
3926    * logs a warning and continues.
3927    * @return the number of regions the table was split into
3928    */
3929   public static int createPreSplitLoadTestTable(Configuration conf,
3930       HTableDescriptor desc, HColumnDescriptor hcd) throws IOException {
3931     return createPreSplitLoadTestTable(conf, desc, hcd, DEFAULT_REGIONS_PER_SERVER);
3932   }
3933 
3934   /**
3935    * Creates a pre-split table for load testing. If the table already exists,
3936    * logs a warning and continues.
3937    * @return the number of regions the table was split into
3938    */
3939   public static int createPreSplitLoadTestTable(Configuration conf,
3940       HTableDescriptor desc, HColumnDescriptor hcd, int numRegionsPerServer) throws IOException {
3941     return createPreSplitLoadTestTable(conf, desc, new HColumnDescriptor[] {hcd},
3942         numRegionsPerServer);
3943   }
3944 
3945   /**
3946    * Creates a pre-split table for load testing. If the table already exists,
3947    * logs a warning and continues.
3948    * @return the number of regions the table was split into
3949    */
3950   public static int createPreSplitLoadTestTable(Configuration conf,
3951       HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
3952     for (HColumnDescriptor hcd : hcds) {
3953       if (!desc.hasFamily(hcd.getName())) {
3954         desc.addFamily(hcd);
3955       }
3956     }
3957 
3958     int totalNumberOfRegions = 0;
3959     Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
3960     Admin admin = unmanagedConnection.getAdmin();
3961 
3962     try {
3963       // Create a table with pre-split regions.
3964       // The number of splits is set as:
3965       //    (region servers * regions per region server).
3966       int numberOfServers = admin.getClusterStatus().getServers().size();
3967       if (numberOfServers == 0) {
3968         throw new IllegalStateException("No live regionservers");
3969       }
3970 
3971       totalNumberOfRegions = numberOfServers * numRegionsPerServer;
3972       LOG.info("Number of live regionservers: " + numberOfServers + ", " +
3973           "pre-splitting table into " + totalNumberOfRegions + " regions " +
3974           "(regions per server: " + numRegionsPerServer + ")");
3975 
3976       byte[][] splits = new RegionSplitter.HexStringSplit().split(
3977           totalNumberOfRegions);
3978 
3979       admin.createTable(desc, splits);
3980     } catch (MasterNotRunningException e) {
3981       LOG.error("Master not running", e);
3982       throw new IOException(e);
3983     } catch (TableExistsException e) {
3984       LOG.warn("Table " + desc.getTableName() +
3985           " already exists, continuing");
3986     } finally {
3987       admin.close();
3988       unmanagedConnection.close();
3989     }
3990     return totalNumberOfRegions;
3991   }
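
  // Usage sketch (assumes live region servers): pre-split a load-test table
  // across the cluster; GZ compression and NONE encoding are arbitrary examples.
  //
  //   int regions = HBaseTestingUtility.createPreSplitLoadTestTable(conf,
  //       TableName.valueOf("loadtest"), Bytes.toBytes("cf"),
  //       Compression.Algorithm.GZ, DataBlockEncoding.NONE);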
3992 
3993   public static int getMetaRSPort(Configuration conf) throws IOException {
3994     try (Connection c = ConnectionFactory.createConnection();
3995         RegionLocator locator = c.getRegionLocator(TableName.META_TABLE_NAME)) {
3996       return locator.getRegionLocation(Bytes.toBytes("")).getPort();
3997     }
3998   }
3999 
4000   /**
4001    *  Due to an async racing issue, a region may not yet be in
4002    *  a region server's online region list, even after
4003    *  the assignment znode is deleted and the new assignment
4004    *  is recorded in the master.
4005    */
4006   public void assertRegionOnServer(
4007       final HRegionInfo hri, final ServerName server,
4008       final long timeout) throws IOException, InterruptedException {
4009     long timeoutTime = System.currentTimeMillis() + timeout;
4010     while (true) {
4011       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
4012       if (regions.contains(hri)) return;
4013       long now = System.currentTimeMillis();
4014       if (now > timeoutTime) break;
4015       Thread.sleep(10);
4016     }
4017     fail("Could not find region " + hri.getRegionNameAsString()
4018       + " on server " + server);
4019   }
4020 
4021   /**
4022    * Check to make sure the region is open on the specified
4023    * region server, but not on any other one.
4024    */
4025   public void assertRegionOnlyOnServer(
4026       final HRegionInfo hri, final ServerName server,
4027       final long timeout) throws IOException, InterruptedException {
4028     long timeoutTime = System.currentTimeMillis() + timeout;
4029     while (true) {
4030       List<HRegionInfo> regions = getHBaseAdmin().getOnlineRegions(server);
4031       if (regions.contains(hri)) {
4032         List<JVMClusterUtil.RegionServerThread> rsThreads =
4033           getHBaseCluster().getLiveRegionServerThreads();
4034         for (JVMClusterUtil.RegionServerThread rsThread: rsThreads) {
4035           HRegionServer rs = rsThread.getRegionServer();
4036           if (server.equals(rs.getServerName())) {
4037             continue;
4038           }
4039           Collection<Region> hrs = rs.getOnlineRegionsLocalContext();
4040           for (Region r: hrs) {
4041             assertTrue("Region should not be double assigned",
4042               r.getRegionInfo().getRegionId() != hri.getRegionId());
4043           }
4044         }
4045         return; // good, we are happy
4046       }
4047       long now = System.currentTimeMillis();
4048       if (now > timeoutTime) break;
4049       Thread.sleep(10);
4050     }
4051     fail("Could not find region " + hri.getRegionNameAsString()
4052       + " on server " + server);
4053   }
4054 
4055   public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
4056       throws IOException {
4057     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
4058     htd.addFamily(hcd);
4059     HRegionInfo info =
4060         new HRegionInfo(TableName.valueOf(tableName), null, null, false);
4061     HRegion region =
4062         HRegion.createHRegion(info, getDataTestDir(), getConfiguration(), htd);
4063     return region;
4064   }
4065 
4066   public void setFileSystemURI(String fsURI) {
4067     FS_URI = fsURI;
4068   }
4069 
4070   /**
4071    * Wrapper method for {@link Waiter#waitFor(Configuration, long, Predicate)}.
4072    */
4073   public <E extends Exception> long waitFor(long timeout, Predicate<E> predicate)
4074       throws E {
4075     return Waiter.waitFor(this.conf, timeout, predicate);
4076   }
4077 
4078   /**
4079    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, Predicate)}.
4080    */
4081   public <E extends Exception> long waitFor(long timeout, long interval, Predicate<E> predicate)
4082       throws E {
4083     return Waiter.waitFor(this.conf, timeout, interval, predicate);
4084   }
4085 
4086   /**
4087    * Wrapper method for {@link Waiter#waitFor(Configuration, long, long, boolean, Predicate)}.
4088    */
4089   public <E extends Exception> long waitFor(long timeout, long interval,
4090       boolean failIfTimeout, Predicate<E> predicate) throws E {
4091     return Waiter.waitFor(this.conf, timeout, interval, failIfTimeout, predicate);
4092   }
4093 
4094   /**
4095    * Returns a {@link Predicate} for checking that there are no regions in transition in master
4096    */
4097   public ExplainingPredicate<IOException> predicateNoRegionsInTransition() {
4098     return new ExplainingPredicate<IOException>() {
4099       @Override
4100       public String explainFailure() throws IOException {
4101         final RegionStates regionStates = getMiniHBaseCluster().getMaster()
4102             .getAssignmentManager().getRegionStates();
4103         return "found in transition: " + regionStates.getRegionsInTransition().toString();
4104       }
4105 
4106       @Override
4107       public boolean evaluate() throws IOException {
4108         final RegionStates regionStates = getMiniHBaseCluster().getMaster()
4109             .getAssignmentManager().getRegionStates();
4110         return !regionStates.isRegionsInTransition();
4111       }
4112     };
4113   }
4114 
4115   /**
4116    * Returns a {@link Predicate} for checking that table is enabled
4117    */
4118   public Waiter.Predicate<IOException> predicateTableEnabled(final TableName tableName) {
4119     return new ExplainingPredicate<IOException>() {
4120       @Override
4121       public String explainFailure() throws IOException {
4122         return explainTableState(tableName);
4123       }
4124 
4125       @Override
4126       public boolean evaluate() throws IOException {
4127         return getHBaseAdmin().tableExists(tableName) && getHBaseAdmin().isTableEnabled(tableName);
4128       }
4129     };
4130   }
4131 
4132   /**
4133    * Returns a {@link Predicate} for checking that table is disabled
4134    */
4135   public Waiter.Predicate<IOException> predicateTableDisabled(final TableName tableName) {
4136     return new ExplainingPredicate<IOException>() {
4137       @Override
4138       public String explainFailure() throws IOException {
4139         return explainTableState(tableName);
4140       }
4141 
4142       @Override
4143       public boolean evaluate() throws IOException {
4144         return getHBaseAdmin().isTableDisabled(tableName);
4145       }
4146     };
4147   }
4148 
4149   /**
4150    * Returns a {@link Predicate} for checking that table is available
4151    */
4152   public Waiter.Predicate<IOException> predicateTableAvailable(final TableName tableName) {
4153     return new ExplainingPredicate<IOException>() {
4154       @Override
4155       public String explainFailure() throws IOException {
4156         return explainTableAvailability(tableName);
4157       }
4158 
4159       @Override
4160       public boolean evaluate() throws IOException {
4161         boolean tableAvailable = getHBaseAdmin().isTableAvailable(tableName);
4162         if (tableAvailable) {
4163           try {
4164             Canary.sniff(getHBaseAdmin(), tableName);
4165           } catch (Exception e) {
4166             throw new IOException("Canary sniff failed for table " + tableName, e);
4167           }
4168         }
4169         return tableAvailable;
4170       }
4171     };
4172   }
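
  // Usage sketch: the predicates compose with the waitFor wrappers above, e.g.
  // to block for up to 30 seconds until a table is fully available:
  //
  //   util.waitFor(30000, util.predicateTableAvailable(tableName));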
4173 
4174   /**
4175    * Wait until no regions in transition.
4176    * @param timeout How long to wait.
4177    * @throws Exception
4178    */
4179   public void waitUntilNoRegionsInTransition(
4180       final long timeout) throws Exception {
4181     waitFor(timeout, predicateNoRegionsInTransition());
4182   }
4183 
4184   /**
4185    * Wait until the given labels are ready in VisibilityLabelsCache.
4186    * @param timeoutMillis how long to wait, in milliseconds
4187    * @param labels the visibility labels to wait for
4188    */
4189   public void waitLabelAvailable(long timeoutMillis, final String... labels) {
4190     final VisibilityLabelsCache labelsCache = VisibilityLabelsCache.get();
4191     waitFor(timeoutMillis, new Waiter.ExplainingPredicate<RuntimeException>() {
4192 
4193       @Override
4194       public boolean evaluate() {
4195         for (String label : labels) {
4196           if (labelsCache.getLabelOrdinal(label) == 0) {
4197             return false;
4198           }
4199         }
4200         return true;
4201       }
4202 
4203       @Override
4204       public String explainFailure() {
4205         for (String label : labels) {
4206           if (labelsCache.getLabelOrdinal(label) == 0) {
4207             return label + " is not available yet";
4208           }
4209         }
4210         return "";
4211       }
4212     });
4213   }
4214 
4215   /**
4216    * Create a set of column descriptors with the combination of compression,
4217    * encoding, bloom codecs available.
4218    * @return the list of column descriptors
4219    */
4220   public static List<HColumnDescriptor> generateColumnDescriptors() {
4221     return generateColumnDescriptors("");
4222   }
4223 
4224   /**
4225    * Create a set of column descriptors with the combination of compression,
4226    * encoding, bloom codecs available.
4227    * @param prefix family names prefix
4228    * @return the list of column descriptors
4229    */
4230   public static List<HColumnDescriptor> generateColumnDescriptors(final String prefix) {
4231     List<HColumnDescriptor> htds = new ArrayList<HColumnDescriptor>();
4232     long familyId = 0;
4233     for (Compression.Algorithm compressionType: getSupportedCompressionAlgorithms()) {
4234       for (DataBlockEncoding encodingType: DataBlockEncoding.values()) {
4235         for (BloomType bloomType: BloomType.values()) {
4236           String name = String.format("%s-cf-!@#&-%d!@#", prefix, familyId);
4237           HColumnDescriptor htd = new HColumnDescriptor(name);
4238           htd.setCompressionType(compressionType);
4239           htd.setDataBlockEncoding(encodingType);
4240           htd.setBloomFilterType(bloomType);
4241           htds.add(htd);
4242           familyId++;
4243         }
4244       }
4245     }
4246     return htds;
4247   }
4248 
4249   /**
4250    * Get supported compression algorithms.
4251    * @return supported compression algorithms.
4252    */
4253   public static Compression.Algorithm[] getSupportedCompressionAlgorithms() {
4254     String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
4255     List<Compression.Algorithm> supportedAlgos = new ArrayList<Compression.Algorithm>();
4256     for (String algoName : allAlgos) {
4257       try {
4258         Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
4259         algo.getCompressor();
4260         supportedAlgos.add(algo);
4261       } catch (Throwable t) {
4262         // this algo is not available
4263       }
4264     }
4265     return supportedAlgos.toArray(new Algorithm[supportedAlgos.size()]);
4266   }
4267 
4268   /**
4269    * Sets up {@link MiniKdc} for testing security.
4270    * Uses {@link HBaseKerberosUtils} to set the given keytab file as
4271    * {@link HBaseKerberosUtils#KRB_KEYTAB_FILE}.
4272    */
4273   public MiniKdc setupMiniKdc(File keytabFile) throws Exception {
4274     Properties conf = MiniKdc.createConf();
4275     conf.put(MiniKdc.DEBUG, true);
4276     MiniKdc kdc = null;
4277     File dir = null;
4278     // There is a time lag between selecting a port and trying to bind to it. It's possible that
4279     // another service captures the port in between, which will result in a BindException.
4280     boolean bindException;
4281     int numTries = 0;
4282     do {
4283       try {
4284         bindException = false;
4285         dir = new File(getDataTestDir("kdc").toUri().getPath());
4286         kdc = new MiniKdc(conf, dir);
4287         kdc.start();
4288       } catch (BindException e) {
4289         FileUtils.deleteDirectory(dir);  // clean directory
4290         numTries++;
4291         if (numTries == 3) {
4292           LOG.error("Failed setting up MiniKDC. Tried " + numTries + " times.");
4293           throw e;
4294         }
4295         LOG.error("BindException encountered when setting up MiniKdc. Trying again.");
4296         bindException = true;
4297       }
4298     } while (bindException);
4299     HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
4300     return kdc;
4301   }
4302   }
4303 }
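
// Usage sketch (illustrative; util is an HBaseTestingUtility): stand up a
// MiniKdc for a security test, add a test principal to the keytab, and stop
// the KDC when done.
//
//   File keytab = new File(util.getDataTestDir("keytab").toUri().getPath());
//   MiniKdc kdc = util.setupMiniKdc(keytab);
//   kdc.createPrincipal(keytab, "hbase/localhost");
//   try { /* run the secured test */ } finally { kdc.stop(); }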