View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
20  import static org.junit.Assert.assertEquals;
21  import static org.junit.Assert.assertNotNull;
22  import static org.junit.Assert.assertNull;
23  import static org.junit.Assert.assertTrue;
24  
25  import java.io.IOException;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.Random;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.conf.Configuration;
33  import org.apache.hadoop.fs.Path;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HBaseTestingUtility;
36  import org.apache.hadoop.hbase.HColumnDescriptor;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HRegionInfo;
39  import org.apache.hadoop.hbase.HTableDescriptor;
40  import org.apache.hadoop.hbase.MiniHBaseCluster;
41  import org.apache.hadoop.hbase.NamespaceDescriptor;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.Waiter;
44  import org.apache.hadoop.hbase.client.Admin;
45  import org.apache.hadoop.hbase.client.Connection;
46  import org.apache.hadoop.hbase.client.ConnectionFactory;
47  import org.apache.hadoop.hbase.client.Get;
48  import org.apache.hadoop.hbase.client.HTable;
49  import org.apache.hadoop.hbase.client.Put;
50  import org.apache.hadoop.hbase.client.Result;
51  import org.apache.hadoop.hbase.client.Table;
52  import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
53  import org.apache.hadoop.hbase.testclassification.LargeTests;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.JVMClusterUtil;
56  import org.apache.hadoop.hbase.util.Pair;
57  import org.apache.hadoop.hbase.wal.WAL;
58  import org.junit.Test;
59  import org.junit.experimental.categories.Category;
60  
61  import com.google.common.hash.Hashing;
62  
63  /**
64   * This test verifies the correctness of the Per Column Family flushing strategy
65   */
66  @Category(LargeTests.class)
67  public class TestPerColumnFamilyFlush {
68    private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class);
69  
70    Region region = null;
71  
72    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
73  
74    private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion");
75  
76    public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
77  
78    public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
79        Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
80  
81    public static final byte[] FAMILY1 = FAMILIES[0];
82  
83    public static final byte[] FAMILY2 = FAMILIES[1];
84  
85    public static final byte[] FAMILY3 = FAMILIES[2];
86  
87    private void initHRegion(String callingMethod, Configuration conf) throws IOException {
88      HTableDescriptor htd = new HTableDescriptor(TABLENAME);
89      for (byte[] family : FAMILIES) {
90        htd.addFamily(new HColumnDescriptor(family));
91      }
92      HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
93      Path path = new Path(DIR, callingMethod);
94      region = HRegion.createHRegion(info, path, conf, htd);
95    }
96  
97    // A helper function to create puts.
98    private Put createPut(int familyNum, int putNum) {
99      byte[] qf = Bytes.toBytes("q" + familyNum);
100     byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
101     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
102     Put p = new Put(row);
103     p.addColumn(FAMILIES[familyNum - 1], qf, val);
104     return p;
105   }
106 
107   // A helper function to create puts.
108   private Get createGet(int familyNum, int putNum) {
109     byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
110     return new Get(row);
111   }
112 
113   // A helper function to verify edits.
114   void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
115     Result r = table.get(createGet(familyNum, putNum));
116     byte[] family = FAMILIES[familyNum - 1];
117     byte[] qf = Bytes.toBytes("q" + familyNum);
118     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
119     assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
120     assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
121       r.getFamilyMap(family).get(qf));
122     assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
123       Arrays.equals(r.getFamilyMap(family).get(qf), val));
124   }
125 
126   @Test(timeout = 180000)
127   public void testSelectiveFlushWhenEnabled() throws IOException {
128     // Set up the configuration
129     Configuration conf = HBaseConfiguration.create();
130     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
131     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
132     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
133       100 * 1024);
134     // Intialize the HRegion
135     initHRegion("testSelectiveFlushWhenEnabled", conf);
136     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
137     for (int i = 1; i <= 1200; i++) {
138       region.put(createPut(1, i));
139 
140       if (i <= 100) {
141         region.put(createPut(2, i));
142         if (i <= 50) {
143           region.put(createPut(3, i));
144         }
145       }
146     }
147 
148     long totalMemstoreSize = region.getMemstoreSize();
149 
150     // Find the smallest LSNs for edits wrt to each CF.
151     long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
152     long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
153     long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
154 
155     // Find the sizes of the memstores of each CF.
156     long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
157     long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
158     long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
159 
160     // Get the overall smallest LSN in the region's memstores.
161     long smallestSeqInRegionCurrentMemstore = getWAL(region)
162         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
163 
164     // The overall smallest LSN in the region's memstores should be the same as
165     // the LSN of the smallest edit in CF1
166     assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
167 
168     // Some other sanity checks.
169     assertTrue(smallestSeqCF1 < smallestSeqCF2);
170     assertTrue(smallestSeqCF2 < smallestSeqCF3);
171     assertTrue(cf1MemstoreSize > 0);
172     assertTrue(cf2MemstoreSize > 0);
173     assertTrue(cf3MemstoreSize > 0);
174 
175     // The total memstore size should be the same as the sum of the sizes of
176     // memstores of CF1, CF2 and CF3.
177     assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
178         + cf2MemstoreSize + cf3MemstoreSize);
179 
180     // Flush!
181     region.flush(false);
182 
183     // Will use these to check if anything changed.
184     long oldCF2MemstoreSize = cf2MemstoreSize;
185     long oldCF3MemstoreSize = cf3MemstoreSize;
186 
187     // Recalculate everything
188     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
189     cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
190     cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
191     totalMemstoreSize = region.getMemstoreSize();
192     smallestSeqInRegionCurrentMemstore = getWAL(region)
193         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
194 
195     // We should have cleared out only CF1, since we chose the flush thresholds
196     // and number of puts accordingly.
197     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
198     // Nothing should have happened to CF2, ...
199     assertEquals(cf2MemstoreSize, oldCF2MemstoreSize);
200     // ... or CF3
201     assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
202     // Now the smallest LSN in the region should be the same as the smallest
203     // LSN in the memstore of CF2.
204     assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2);
205     // Of course, this should hold too.
206     assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
207         + cf3MemstoreSize);
208 
209     // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
210     for (int i = 1200; i < 2400; i++) {
211       region.put(createPut(2, i));
212 
213       // Add only 100 puts for CF3
214       if (i - 1200 < 100) {
215         region.put(createPut(3, i));
216       }
217     }
218 
219     // How much does the CF3 memstore occupy? Will be used later.
220     oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
221 
222     // Flush again
223     region.flush(false);
224 
225     // Recalculate everything
226     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
227     cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
228     cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
229     totalMemstoreSize = region.getMemstoreSize();
230     smallestSeqInRegionCurrentMemstore = getWAL(region)
231         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
232 
233     // CF1 and CF2, both should be absent.
234     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
235     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
236     // CF3 shouldn't have been touched.
237     assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
238     assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
239     assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);
240 
241     // What happens when we hit the memstore limit, but we are not able to find
242     // any Column Family above the threshold?
243     // In that case, we should flush all the CFs.
244 
245     // Clearing the existing memstores.
246     region.flush(true);
247 
248     // The memstore limit is 200*1024 and the column family flush threshold is
249     // around 50*1024. We try to just hit the memstore limit with each CF's
250     // memstore being below the CF flush threshold.
251     for (int i = 1; i <= 300; i++) {
252       region.put(createPut(1, i));
253       region.put(createPut(2, i));
254       region.put(createPut(3, i));
255       region.put(createPut(4, i));
256       region.put(createPut(5, i));
257     }
258 
259     region.flush(false);
260 
261     // Since we won't find any CF above the threshold, and hence no specific
262     // store to flush, we should flush all the memstores.
263     assertEquals(0, region.getMemstoreSize());
264   }
265 
266   @Test(timeout = 180000)
267   public void testSelectiveFlushWhenNotEnabled() throws IOException {
268     // Set up the configuration
269     Configuration conf = HBaseConfiguration.create();
270     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
271     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
272 
273     // Intialize the HRegion
274     initHRegion("testSelectiveFlushWhenNotEnabled", conf);
275     // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
276     for (int i = 1; i <= 1200; i++) {
277       region.put(createPut(1, i));
278       if (i <= 100) {
279         region.put(createPut(2, i));
280         if (i <= 50) {
281           region.put(createPut(3, i));
282         }
283       }
284     }
285 
286     long totalMemstoreSize = region.getMemstoreSize();
287 
288     // Find the sizes of the memstores of each CF.
289     long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
290     long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
291     long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
292 
293     // Some other sanity checks.
294     assertTrue(cf1MemstoreSize > 0);
295     assertTrue(cf2MemstoreSize > 0);
296     assertTrue(cf3MemstoreSize > 0);
297 
298     // The total memstore size should be the same as the sum of the sizes of
299     // memstores of CF1, CF2 and CF3.
300     assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
301         + cf2MemstoreSize + cf3MemstoreSize);
302 
303     // Flush!
304     region.flush(false);
305 
306     cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
307     cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
308     cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
309     totalMemstoreSize = region.getMemstoreSize();
310     long smallestSeqInRegionCurrentMemstore = ((HRegion)region).getWAL()
311         .getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
312 
313     // Everything should have been cleared
314     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
315     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
316     assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
317     assertEquals(0, totalMemstoreSize);
318     assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
319   }
320 
321   // Find the (first) region which has the specified name.
322   private static Pair<Region, HRegionServer> getRegionWithName(TableName tableName) {
323     MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
324     List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
325     for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
326       HRegionServer hrs = rsts.get(i).getRegionServer();
327       for (Region region : hrs.getOnlineRegions(tableName)) {
328         return Pair.newPair(region, hrs);
329       }
330     }
331     return null;
332   }
333 
334   private void doTestLogReplay() throws Exception {
335     Configuration conf = TEST_UTIL.getConfiguration();
336     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
337     // Carefully chosen limits so that the memstore just flushes when we're done
338     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
339     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 10000);
340     final int numRegionServers = 4;
341     try {
342       TEST_UTIL.startMiniCluster(numRegionServers);
343       TEST_UTIL.getHBaseAdmin().createNamespace(
344         NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
345       HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
346       HTableDescriptor htd = table.getTableDescriptor();
347 
348       for (byte[] family : FAMILIES) {
349         if (!htd.hasFamily(family)) {
350           htd.addFamily(new HColumnDescriptor(family));
351         }
352       }
353 
354       // Add 100 edits for CF1, 20 for CF2, 20 for CF3.
355       // These will all be interleaved in the log.
356       for (int i = 1; i <= 80; i++) {
357         table.put(createPut(1, i));
358         if (i <= 10) {
359           table.put(createPut(2, i));
360           table.put(createPut(3, i));
361         }
362       }
363       table.flushCommits();
364       Thread.sleep(1000);
365 
366       Pair<Region, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
367       Region desiredRegion = desiredRegionAndServer.getFirst();
368       assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
369 
370       // Flush the region selectively.
371       desiredRegion.flush(false);
372 
373       long totalMemstoreSize;
374       long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
375       totalMemstoreSize = desiredRegion.getMemstoreSize();
376 
377       // Find the sizes of the memstores of each CF.
378       cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
379       cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
380       cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
381 
382       // CF1 Should have been flushed
383       assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
384       // CF2 and CF3 shouldn't have been flushed.
385       assertTrue(cf2MemstoreSize > 0);
386       assertTrue(cf3MemstoreSize > 0);
387       assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
388           + cf3MemstoreSize);
389 
390       // Wait for the RS report to go across to the master, so that the master
391       // is aware of which sequence ids have been flushed, before we kill the RS.
392       // If in production, the RS dies before the report goes across, we will
393       // safely replay all the edits.
394       Thread.sleep(2000);
395 
396       // Abort the region server where we have the region hosted.
397       HRegionServer rs = desiredRegionAndServer.getSecond();
398       rs.abort("testing");
399 
400       // The aborted region server's regions will be eventually assigned to some
401       // other region server, and the get RPC call (inside verifyEdit()) will
402       // retry for some time till the regions come back up.
403 
404       // Verify that all the edits are safe.
405       for (int i = 1; i <= 80; i++) {
406         verifyEdit(1, i, table);
407         if (i <= 10) {
408           verifyEdit(2, i, table);
409           verifyEdit(3, i, table);
410         }
411       }
412     } finally {
413       TEST_UTIL.shutdownMiniCluster();
414     }
415   }
416 
417   // Test Log Replay with Distributed Replay on.
418   // In distributed log replay, the log splitters ask the master for the
419   // last flushed sequence id for a region. This test would ensure that we
420   // are doing the book-keeping correctly.
421   @Test(timeout = 180000)
422   public void testLogReplayWithDistributedReplay() throws Exception {
423     TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
424     doTestLogReplay();
425   }
426 
427   // Test Log Replay with Distributed log split on.
428   @Test(timeout = 180000)
429   public void testLogReplayWithDistributedLogSplit() throws Exception {
430     TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
431     doTestLogReplay();
432   }
433 
434   private WAL getWAL(Region region) {
435     return ((HRegion)region).getWAL();
436   }
437 
438   private int getNumRolledLogFiles(Region region) {
439     return ((FSHLog)getWAL(region)).getNumRolledLogFiles();
440   }
441 
442   /**
443    * When a log roll is about to happen, we do a flush of the regions who will be affected by the
444    * log roll. These flushes cannot be a selective flushes, otherwise we cannot roll the logs. This
445    * test ensures that we do a full-flush in that scenario.
446    * @throws IOException
447    */
448   @Test(timeout = 180000)
449   public void testFlushingWhenLogRolling() throws Exception {
450     TableName tableName = TableName.valueOf("testFlushingWhenLogRolling");
451     Configuration conf = TEST_UTIL.getConfiguration();
452     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
453     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
454     long cfFlushSizeLowerBound = 2048;
455     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
456       cfFlushSizeLowerBound);
457 
458     // One hour, prevent periodic rolling
459     conf.setLong("hbase.regionserver.logroll.period", 60L * 60 * 1000);
460     // prevent rolling by size
461     conf.setLong("hbase.regionserver.hlog.blocksize", 128L * 1024 * 1024);
462     // Make it 10 as max logs before a flush comes on.
463     final int maxLogs = 10;
464     conf.setInt("hbase.regionserver.maxlogs", maxLogs);
465 
466     final int numRegionServers = 1;
467     TEST_UTIL.startMiniCluster(numRegionServers);
468     try {
469       HTable table = null;
470       table = TEST_UTIL.createTable(tableName, FAMILIES);
471       // Force flush the namespace table so edits to it are not hanging around as oldest
472       // edits. Otherwise, below, when we make maximum number of WAL files, then it will be
473       // the namespace region that is flushed and not the below 'desiredRegion'.
474       try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
475         admin.flush(TableName.NAMESPACE_TABLE_NAME);
476       }
477       Pair<Region, HRegionServer> desiredRegionAndServer = getRegionWithName(tableName);
478       final Region desiredRegion = desiredRegionAndServer.getFirst();
479       assertTrue("Could not find a region which hosts the new region.", desiredRegion != null);
480       LOG.info("Writing to region=" + desiredRegion);
481 
482       // Add one row for both CFs.
483       for (int i = 1; i <= 3; i++) {
484         table.put(createPut(i, 0));
485       }
486       // Now only add row to CF1, make sure when we force a flush, CF1 is larger than the lower
487       // bound and CF2 and CF3 are smaller than the lower bound.
488       for (int i = 0; i < maxLogs; i++) {
489         for (int j = 0; j < 100; j++) {
490           table.put(createPut(1, i * 100 + j));
491         }
492         table.flushCommits();
493         // Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
494         int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
495         assertNull(getWAL(desiredRegion).rollWriter());
496         while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
497           Thread.sleep(100);
498         }
499       }
500       table.close();
501       assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
502       assertTrue(desiredRegion.getStore(FAMILY1).getMemStoreSize() > cfFlushSizeLowerBound);
503       assertTrue(desiredRegion.getStore(FAMILY2).getMemStoreSize() < cfFlushSizeLowerBound);
504       assertTrue(desiredRegion.getStore(FAMILY3).getMemStoreSize() < cfFlushSizeLowerBound);
505       table.put(createPut(1, 12345678));
506       table.flushCommits();
507       // Make numRolledLogFiles greater than maxLogs
508       desiredRegionAndServer.getSecond().walRoller.requestRollAll();
509       // Wait for some time till the flush caused by log rolling happens.
510       TEST_UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
511 
512         @Override
513         public boolean evaluate() throws Exception {
514           return desiredRegion.getMemstoreSize() == 0;
515         }
516 
517         @Override
518         public String explainFailure() throws Exception {
519           long memstoreSize = desiredRegion.getMemstoreSize();
520           if (memstoreSize > 0) {
521             return "Still have unflushed entries in memstore, memstore size is " + memstoreSize;
522           }
523           return "Unknown";
524         }
525       });
526       LOG.info("Finished waiting on flush after too many WALs...");
527       // Individual families should have been flushed.
528       assertEquals(DefaultMemStore.DEEP_OVERHEAD,
529         desiredRegion.getStore(FAMILY1).getMemStoreSize());
530       assertEquals(DefaultMemStore.DEEP_OVERHEAD,
531         desiredRegion.getStore(FAMILY2).getMemStoreSize());
532       assertEquals(DefaultMemStore.DEEP_OVERHEAD,
533         desiredRegion.getStore(FAMILY3).getMemStoreSize());
534       // let WAL cleanOldLogs
535       assertNull(getWAL(desiredRegion).rollWriter(true));
536       assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
537     } finally {
538       TEST_UTIL.shutdownMiniCluster();
539     }
540   }
541 
542   private void doPut(Table table, long memstoreFlushSize) throws IOException, InterruptedException {
543     Region region = getRegionWithName(table.getName()).getFirst();
544     // cf1 4B per row, cf2 40B per row and cf3 400B per row
545     byte[] qf = Bytes.toBytes("qf");
546     Random rand = new Random();
547     byte[] value1 = new byte[100];
548     byte[] value2 = new byte[200];
549     byte[] value3 = new byte[400];
550     for (int i = 0; i < 10000; i++) {
551       Put put = new Put(Bytes.toBytes("row-" + i));
552       rand.setSeed(i);
553       rand.nextBytes(value1);
554       rand.nextBytes(value2);
555       rand.nextBytes(value3);
556       put.addColumn(FAMILY1, qf, value1);
557       put.addColumn(FAMILY2, qf, value2);
558       put.addColumn(FAMILY3, qf, value3);
559       table.put(put);
560       // slow down to let regionserver flush region.
561       while (region.getMemstoreSize() > memstoreFlushSize) {
562         Thread.sleep(100);
563       }
564     }
565   }
566 
567   // Under the same write load, small stores should have less store files when
568   // percolumnfamilyflush enabled.
569   @Test(timeout = 180000)
570   public void testCompareStoreFileCount() throws Exception {
571     long memstoreFlushSize = 1024L * 1024;
572     Configuration conf = TEST_UTIL.getConfiguration();
573     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, memstoreFlushSize);
574     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
575     conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
576     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
577       ConstantSizeRegionSplitPolicy.class.getName());
578 
579     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
580     htd.setCompactionEnabled(false);
581     htd.addFamily(new HColumnDescriptor(FAMILY1));
582     htd.addFamily(new HColumnDescriptor(FAMILY2));
583     htd.addFamily(new HColumnDescriptor(FAMILY3));
584 
585     LOG.info("==============Test with selective flush disabled===============");
586     int cf1StoreFileCount = -1;
587     int cf2StoreFileCount = -1;
588     int cf3StoreFileCount = -1;
589     int cf1StoreFileCount1 = -1;
590     int cf2StoreFileCount1 = -1;
591     int cf3StoreFileCount1 = -1;
592     try {
593       TEST_UTIL.startMiniCluster(1);
594       TEST_UTIL.getHBaseAdmin().createNamespace(
595         NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
596       TEST_UTIL.getHBaseAdmin().createTable(htd);
597       TEST_UTIL.waitTableAvailable(TABLENAME);
598       Connection conn = ConnectionFactory.createConnection(conf);
599       Table table = conn.getTable(TABLENAME);
600       doPut(table, memstoreFlushSize);
601       table.close();
602       conn.close();
603 
604       Region region = getRegionWithName(TABLENAME).getFirst();
605       cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
606       cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
607       cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
608     } finally {
609       TEST_UTIL.shutdownMiniCluster();
610     }
611 
612     LOG.info("==============Test with selective flush enabled===============");
613     conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
614     // default value of per-cf flush lower bound is too big, set to a small enough value
615     conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, 0);
616     try {
617       TEST_UTIL.startMiniCluster(1);
618       TEST_UTIL.getHBaseAdmin().createNamespace(
619         NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
620       TEST_UTIL.getHBaseAdmin().createTable(htd);
621       Connection conn = ConnectionFactory.createConnection(conf);
622       Table table = conn.getTable(TABLENAME);
623       doPut(table, memstoreFlushSize);
624       table.close();
625       conn.close();
626 
627       region = getRegionWithName(TABLENAME).getFirst();
628       cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
629       cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
630       cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
631     } finally {
632       TEST_UTIL.shutdownMiniCluster();
633     }
634 
635     LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
636         + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
637         + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
638     LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
639         + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", "
640         + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1);
641     // small CF will have less store files.
642     assertTrue(cf1StoreFileCount1 < cf1StoreFileCount);
643     assertTrue(cf2StoreFileCount1 < cf2StoreFileCount);
644   }
645 
646   public static void main(String[] args) throws Exception {
647     int numRegions = Integer.parseInt(args[0]);
648     long numRows = Long.parseLong(args[1]);
649 
650     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
651     htd.setMaxFileSize(10L * 1024 * 1024 * 1024);
652     htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
653     htd.addFamily(new HColumnDescriptor(FAMILY1));
654     htd.addFamily(new HColumnDescriptor(FAMILY2));
655     htd.addFamily(new HColumnDescriptor(FAMILY3));
656 
657     Configuration conf = HBaseConfiguration.create();
658     Connection conn = ConnectionFactory.createConnection(conf);
659     Admin admin = conn.getAdmin();
660     if (admin.tableExists(TABLENAME)) {
661       admin.disableTable(TABLENAME);
662       admin.deleteTable(TABLENAME);
663     }
664     if (numRegions >= 3) {
665       byte[] startKey = new byte[16];
666       byte[] endKey = new byte[16];
667       Arrays.fill(endKey, (byte) 0xFF);
668       admin.createTable(htd, startKey, endKey, numRegions);
669     } else {
670       admin.createTable(htd);
671     }
672     admin.close();
673 
674     Table table = conn.getTable(TABLENAME);
675     byte[] qf = Bytes.toBytes("qf");
676     Random rand = new Random();
677     byte[] value1 = new byte[16];
678     byte[] value2 = new byte[256];
679     byte[] value3 = new byte[4096];
680     for (long i = 0; i < numRows; i++) {
681       Put put = new Put(Hashing.md5().hashLong(i).asBytes());
682       rand.setSeed(i);
683       rand.nextBytes(value1);
684       rand.nextBytes(value2);
685       rand.nextBytes(value3);
686       put.addColumn(FAMILY1, qf, value1);
687       put.addColumn(FAMILY2, qf, value2);
688       put.addColumn(FAMILY3, qf, value3);
689       table.put(put);
690       if (i % 10000 == 0) {
691         LOG.info(i + " rows put");
692       }
693     }
694     table.close();
695     conn.close();
696   }
697 }