1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.management.ManagementFactory;
25 import java.lang.management.MemoryUsage;
26 import java.lang.reflect.Constructor;
27 import java.net.BindException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.security.PrivilegedExceptionAction;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.Comparator;
35 import java.util.HashMap;
36 import java.util.HashSet;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Objects;
41 import java.util.Map.Entry;
42 import java.util.Set;
43 import java.util.SortedMap;
44 import java.util.TreeMap;
45 import java.util.TreeSet;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.ConcurrentMap;
48 import java.util.concurrent.ConcurrentSkipListMap;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.atomic.AtomicBoolean;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.concurrent.locks.ReentrantReadWriteLock;
54
55 import javax.management.MalformedObjectNameException;
56 import javax.management.ObjectName;
57 import javax.servlet.http.HttpServlet;
58
59 import org.apache.commons.lang.math.RandomUtils;
60 import org.apache.commons.logging.Log;
61 import org.apache.commons.logging.LogFactory;
62 import org.apache.hadoop.conf.Configuration;
63 import org.apache.hadoop.fs.FileSystem;
64 import org.apache.hadoop.fs.Path;
65 import org.apache.hadoop.hbase.ChoreService;
66 import org.apache.hadoop.hbase.ClockOutOfSyncException;
67 import org.apache.hadoop.hbase.CoordinatedStateManager;
68 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
69 import org.apache.hadoop.hbase.DoNotRetryIOException;
70 import org.apache.hadoop.hbase.HBaseConfiguration;
71 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
72 import org.apache.hadoop.hbase.HConstants;
73 import org.apache.hadoop.hbase.HRegionInfo;
74 import org.apache.hadoop.hbase.HealthCheckChore;
75 import org.apache.hadoop.hbase.MetaTableAccessor;
76 import org.apache.hadoop.hbase.NotServingRegionException;
77 import org.apache.hadoop.hbase.PleaseHoldException;
78 import org.apache.hadoop.hbase.RemoteExceptionHandler;
79 import org.apache.hadoop.hbase.ScheduledChore;
80 import org.apache.hadoop.hbase.ServerName;
81 import org.apache.hadoop.hbase.Stoppable;
82 import org.apache.hadoop.hbase.TableDescriptors;
83 import org.apache.hadoop.hbase.TableName;
84 import org.apache.hadoop.hbase.YouAreDeadException;
85 import org.apache.hadoop.hbase.ZNodeClearer;
86 import org.apache.hadoop.hbase.backup.impl.BackupManager;
87 import org.apache.hadoop.hbase.classification.InterfaceAudience;
88 import org.apache.hadoop.hbase.client.ClusterConnection;
89 import org.apache.hadoop.hbase.client.ConnectionFactory;
90 import org.apache.hadoop.hbase.client.ConnectionUtils;
91 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
92 import org.apache.hadoop.hbase.conf.ConfigurationManager;
93 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
94 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
95 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
96 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
97 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
98 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
99 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
100 import org.apache.hadoop.hbase.executor.ExecutorService;
101 import org.apache.hadoop.hbase.executor.ExecutorType;
102 import org.apache.hadoop.hbase.fs.HFileSystem;
103 import org.apache.hadoop.hbase.http.HttpServer;
104 import org.apache.hadoop.hbase.http.InfoServer;
105 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
106 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
107 import org.apache.hadoop.hbase.ipc.RpcClient;
108 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
109 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
110 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
111 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
112 import org.apache.hadoop.hbase.ipc.ServerRpcController;
113 import org.apache.hadoop.hbase.master.HMaster;
114 import org.apache.hadoop.hbase.master.RegionState.State;
115 import org.apache.hadoop.hbase.master.TableLockManager;
116 import org.apache.hadoop.hbase.mob.MobCacheConfig;
117 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
118 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
119 import org.apache.hadoop.hbase.protobuf.RequestConverter;
120 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
121 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
122 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
123 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
124 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
125 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
126 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
127 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
128 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor.Builder;
129 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
130 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
131 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
132 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
133 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
137 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
138 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
139 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse;
140 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest;
141 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
142 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
143 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
145 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
146 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
147 import org.apache.hadoop.hbase.quotas.QuotaUtil;
148 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
149 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
150 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
151 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
152 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
153 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
154 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
155 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
156 import org.apache.hadoop.hbase.replication.regionserver.Replication;
157 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
158 import org.apache.hadoop.hbase.security.Superusers;
159 import org.apache.hadoop.hbase.security.User;
160 import org.apache.hadoop.hbase.security.UserProvider;
161 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
162 import org.apache.hadoop.hbase.util.Addressing;
163 import org.apache.hadoop.hbase.util.ByteStringer;
164 import org.apache.hadoop.hbase.util.Bytes;
165 import org.apache.hadoop.hbase.util.CompressionTest;
166 import org.apache.hadoop.hbase.util.ConfigUtil;
167 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
168 import org.apache.hadoop.hbase.util.FSTableDescriptors;
169 import org.apache.hadoop.hbase.util.FSUtils;
170 import org.apache.hadoop.hbase.util.HasThread;
171 import org.apache.hadoop.hbase.util.JSONBean;
172 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
173 import org.apache.hadoop.hbase.util.MBeanUtil;
174 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
175 import org.apache.hadoop.hbase.util.Sleeper;
176 import org.apache.hadoop.hbase.util.Threads;
177 import org.apache.hadoop.hbase.util.VersionInfo;
178 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
179 import org.apache.hadoop.hbase.wal.WAL;
180 import org.apache.hadoop.hbase.wal.WALFactory;
181 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
182 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
183 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
184 import org.apache.hadoop.hbase.zookeeper.RecoveringRegionWatcher;
185 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
186 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
187 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
188 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
189 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
190 import org.apache.hadoop.ipc.RemoteException;
191 import org.apache.hadoop.util.ReflectionUtils;
192 import org.apache.hadoop.util.StringUtils;
193 import org.apache.zookeeper.KeeperException;
194 import org.apache.zookeeper.KeeperException.NoNodeException;
195 import org.apache.zookeeper.data.Stat;
196
197 import com.google.common.annotations.VisibleForTesting;
198 import com.google.common.base.Preconditions;
199 import com.google.common.collect.Maps;
200 import com.google.protobuf.BlockingRpcChannel;
201 import com.google.protobuf.Descriptors;
202 import com.google.protobuf.Message;
203 import com.google.protobuf.RpcCallback;
204 import com.google.protobuf.RpcController;
205 import com.google.protobuf.Service;
206 import com.google.protobuf.ServiceException;
207
208
209
210
211
212 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
213 @SuppressWarnings("deprecation")
214 public class HRegionServer extends HasThread implements
215 RegionServerServices, LastSequenceId {
216
/** Logger shared by this class. */
public static final Log LOG = LogFactory.getLog(HRegionServer.class);

/**
 * Operation labels "OPEN" and "CLOSE".
 * NOTE(review): their users are not visible in this chunk; presumably used when reporting
 * regions already in transition — confirm.
 */
protected static final String OPEN = "OPEN";
protected static final String CLOSE = "CLOSE";

/**
 * Regions currently in transition on this server, keyed by region name bytes. A
 * byte-array comparator is required because byte[] uses identity equality.
 * NOTE(review): the Boolean presumably distinguishes open (true) from close (false) — confirm.
 */
protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);

/** Memstore flusher; created in initializeThreads(), interrupted on shutdown in run(). */
protected MemStoreFlusher cacheFlusher;

/** Heap memory manager; stopped in run() when non-null (creation not visible here). */
protected HeapMemoryManager hMemManager;
/** Optional latch awaited (up to 20 seconds) by initializeZooKeeper(); see setInitLatch(). */
protected CountDownLatch initLatch = null;

/** Cluster connection, created lazily by setupClusterConnection() and closed in run(). */
protected ClusterConnection clusterConnection;

/** Meta table locator, created together with clusterConnection and stopped in run(). */
protected MetaTableLocator metaTableLocator;

/** Assigned in initializeZooKeeper() but never read in this class chunk. */
@SuppressWarnings("unused")
private RecoveringRegionWatcher recoveringRegionWatcher;

/** Table descriptors backed by the data filesystem; built in initializeFileSystem(). */
protected TableDescriptors tableDescriptors;

/** Replication source and sink services (initialization not visible in this chunk). */
protected ReplicationSourceService replicationSourceHandler;
protected ReplicationSinkService replicationSinkHandler;

/** Compaction/split worker; created in initializeThreads(), joined during shutdown. */
public CompactSplitThread compactSplitThread;

/**
 * Regions currently online on this server.
 * NOTE(review): key is presumably the encoded region name — confirm.
 */
protected final Map<String, Region> onlineRegions = new ConcurrentHashMap<String, Region>();

/**
 * Favored datanode addresses per region.
 * NOTE(review): key is presumably the encoded region name — confirm.
 */
protected final Map<String, InetSocketAddress[]> regionFavoredNodesMap =
new ConcurrentHashMap<String, InetSocketAddress[]>();

/** Regions in recovery; synchronized wrapper around a plain HashMap. */
protected final Map<String, Region> recoveringRegions = Collections
.synchronizedMap(new HashMap<String, Region>());

/** Lease manager; created in initializeThreads() and closed in run(). */
protected Leases leases;

/** Executor service named after this server's short ServerName string. */
protected ExecutorService service;

/** Filesystem health flag; true from construction, gates region closing and WAL shutdown. */
protected volatile boolean fsOk;
/** Data (HBase root dir) filesystem. */
protected HFileSystem fs;
/** WAL root dir filesystem. */
protected HFileSystem walFs;

/** Set once the server is stopped; polled by blockAndCheckIfStopped(). */
private volatile boolean stopped = false;

/** True once an abort has been requested; changes shutdown behavior in run(). */
private volatile boolean abortRequested;

/** NOTE(review): not referenced in this chunk — confirm whether still used. */
ConcurrentMap<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();

/** Set by run() when the cluster is shutting down and user regions are being closed. */
private boolean stopping = false;

/** When true, run() skips closing regions on shutdown. */
private volatile boolean killed = false;

/** Server configuration (mutated by the constructor). */
protected final Configuration conf;

/** HBase root dir and WAL root dir, recorded by initializeFileSystem(). */
private Path rootDir;
private Path walRootDir;

/** Read/write lock (users not visible in this chunk). */
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

/** Client retry count (HConstants.HBASE_CLIENT_RETRIES_NUMBER). */
final int numRetries;
/** Wake frequency for chores/leases; defaults to 10 * 1000. */
protected final int threadWakeFrequency;
/** "hbase.regionserver.msginterval" (default 3 * 1000): main-loop sleep and report period. */
protected final int msgInterval;

/** "hbase.regionserver.numregionstoreport", default 10. */
protected final int numRegionsToReport;

/** Stub to the master's RegionServerStatusService; nulled on shutdown. */
private volatile RegionServerStatusService.BlockingInterface rssStub;

/** Outbound RPC client; created in initializeThreads() and closed in run(). */
RpcClient rpcClient;

private RpcRetryingCallerFactory rpcRetryingCallerFactory;
private RpcControllerFactory rpcControllerFactory;

/** Handler that aborts the server on any uncaught exception in a service thread. */
private UncaughtExceptionHandler uncaughtExceptionHandler;

/** Embedded HTTP info server; stopped in run(). */
protected InfoServer infoServer;
private JvmPauseMonitor pauseMonitor;

/** Process name of a region server; also the info-server attribute key for this instance. */
public static final String REGIONSERVER = "regionserver";

MetricsRegionServer metricsRegionServer;
MetricsTable metricsTable;
private SpanReceiverHost spanReceiverHost;

/** Chore service named after this server's ServerName; created at the end of the constructor. */
private final ChoreService choreService;

/** Compaction checker chore; created in initializeThreads(), cancelled in run(). */
ScheduledChore compactionChecker;

/** Periodic memstore flusher chore; created in initializeThreads(). */
ScheduledChore periodicFlusher;

protected volatile WALFactory walFactory;

/** Roller for this server's WALs; created in the constructor. */
final LogRoller walRoller;

/** Roller for meta WALs, held in an AtomicReference (set outside this chunk). */
final AtomicReference<LogRoller> metawalRoller = new AtomicReference<LogRoller>();

/** Online flag; starts false. */
final AtomicBoolean online = new AtomicBoolean(false);

/** ZooKeeper watcher; null when "hbase.testing.nocluster" is true. */
protected ZooKeeperWatcher zooKeeper;

/** Tracks the master address znode; null in nocluster mode. */
private MasterAddressTracker masterAddressTracker;

/** Tracks the cluster status znode; null in nocluster mode (then isClusterUp() is false). */
protected ClusterStatusTracker clusterStatusTracker;

/** Split-log worker; stopped in run() when non-null. */
private SplitLogWorker splitLogWorker;

/** Sleeper with a period of msgInterval. */
protected final Sleeper sleeper;

/** Client operation timeout and short RPC operation timeout read from conf. */
private final int operationTimeout;
private final int shortOperationTimeout;

private final RegionServerAccounting regionServerAccounting;

/** Block cache configuration; the block cache is shut down in run() if enabled. */
protected CacheConfig cacheConfig;

/** MOB cache configuration; its file cache is shut down in run(). */
final MobCacheConfig mobCacheConfig;

/** Health-check chore, created only when a health checker is configured. */
private HealthCheckChore healthCheckChore;

/** Nonce cleanup chore, created only when nonceManager is non-null. */
private ScheduledChore nonceManagerChore;

/**
 * Coprocessor endpoint services by service name.
 * NOTE(review): plain HashMap; registerService() does check-then-put without locking.
 */
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

/** This server's name: resolved hostname, RPC port and startcode. */
protected ServerName serverName;

/** Explicit hostname to advertise, if configured; see shouldUseThisHostnameInstead(). */
private String useThisHostnameInstead;

/** Config key for a region server's advertised hostname. */
final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname";

/** Config key for a master's advertised hostname. */
final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname";

/**
 * Config key that, when true on a region server, makes it advertise its RPC bind hostname.
 * Must not be combined with RS_HOSTNAME_KEY (the constructor throws).
 */
final static String RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY =
"hbase.regionserver.hostname.disable.master.reversedns";

/** System.currentTimeMillis() at construction; part of serverName. */
protected final long startcode;

/** Cluster id read from ZooKeeper in initializeZooKeeper(). */
private String clusterId;

/** Registered MBean, unregistered in run(). */
private ObjectName mxBean = null;

/** Cleaner chore for moved regions; created in initializeThreads(). */
private MovedRegionsCleaner movedRegionsCleaner;

/** Storefile refresher chore; only created when a positive refresh period is configured. */
private StorefileRefresherChore storefileRefresher;

/** Region server coprocessor host; created in run() before reporting for duty. */
private RegionServerCoprocessorHost rsHost;

/** Procedure manager host; created in initializeZooKeeper(). */
private RegionServerProcedureManagerHost rspmHost;

/** RPC and space quota managers; created in initializeThreads(). */
private RegionServerRpcQuotaManager rsQuotaManager;
private RegionServerSpaceQuotaManager rsSpaceQuotaManager;

/** Table lock manager; created in the constructor unless in nocluster mode. */
protected TableLockManager tableLockManager;

/** Nonce manager; null when HConstants.HBASE_RS_NONCES_ENABLED is false. */
final ServerNonceManager nonceManager;

private UserProvider userProvider;

/** RPC services created via the overridable createRpcServices(). */
protected final RSRpcServices rpcServices;

/** Coordinated state manager; null in nocluster mode. */
protected BaseCoordinatedStateManager csm;

private final boolean useZKForAssignment;

/** Configuration manager with which configuration observers are registered. */
protected final ConfigurationManager configurationManager;

/** Filesystem utilization chore; only created when quotas are enabled. */
protected FileSystemUtilizationChore fsUtilizationChore;
509
510
511
512
513
514
515
/**
 * Creates a region server, obtaining its coordinated state manager from
 * {@link CoordinatedStateManagerFactory} for the given configuration.
 *
 * @param conf server configuration
 * @throws IOException propagated from the two-argument constructor
 * @throws InterruptedException propagated from the two-argument constructor
 */
public HRegionServer(Configuration conf)
    throws IOException, InterruptedException {
  this(conf,
      CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}
519
520
521
522
523
524
525
526
/**
 * Constructs the server. In order, it:
 * <ul>
 *   <li>validates configured codecs;</li>
 *   <li>reads retry, timing and timeout settings from {@code conf};</li>
 *   <li>resolves the advertised hostname and {@link ServerName};</li>
 *   <li>logs in to ZooKeeper and as the server principal;</li>
 *   <li>sets up the filesystems;</li>
 *   <li>unless {@code hbase.testing.nocluster} is true, sets up the ZooKeeper watcher,
 *       coordinated state manager, table lock manager and trackers;</li>
 *   <li>starts the RPC services and the web UI.</li>
 * </ul>
 *
 * <p>{@code conf} is modified: {@link HConstants#USE_META_REPLICAS} is forced to false. It is
 * also passed to several helpers that may decorate it (short-circuit read, replication,
 * backup, and {@code FSUtils.setFsDefault} via initializeFileSystem()).
 *
 * @param conf server configuration; modified by this constructor
 * @param csm coordinated state manager; cast to {@link BaseCoordinatedStateManager} unless in
 *     nocluster mode, where it is ignored
 * @throws IOException if a configured codec is unsupported, if
 *     RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY and RS_HOSTNAME_KEY are both set, or if any
 *     setup step fails with an I/O error
 * @throws InterruptedException declared for setup steps that may be interrupted
 */
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
    throws IOException, InterruptedException {
  this.fsOk = true;
  this.conf = conf;
  // Fail fast if any codec listed under hbase.regionserver.codecs is unusable.
  checkCodecs(this.conf);
  this.userProvider = UserProvider.instantiate(conf);
  FSUtils.setupShortCircuitRead(this.conf);

  Replication.decorateRegionServerConfiguration(this.conf);

  // Force meta replica usage off in this server's own configuration.
  // NOTE(review): the rationale is not visible here — confirm before changing.
  this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
  BackupManager.decorateRSConfiguration(conf);

  this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
  this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  // Period of the main-loop sleeper; run() also uses it to pace tryRegionServerReport().
  this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

  this.sleeper = new Sleeper(this.msgInterval, this);

  boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
  // nonceManager is null when nonces are disabled; users must null-check it.
  this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

  this.numRegionsToReport = conf.getInt(
      "hbase.regionserver.numregionstoreport", 10);

  this.operationTimeout = conf.getInt(
      HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
      HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

  this.shortOperationTimeout = conf.getInt(
      HConstants.HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY,
      HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT);

  this.abortRequested = false;
  this.stopped = false;

  // Overridable factory: subclasses may supply their own RPC services.
  rpcServices = createRpcServices();
  this.startcode = System.currentTimeMillis();
  // Resolve the hostname to advertise:
  //  - a master honours MASTER_HOSTNAME_KEY;
  //  - a region server honours RS_HOSTNAME_KEY, or, when
  //    RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY is true, uses its RPC bind address's
  //    hostname. The two region server settings are mutually exclusive.
  if (this instanceof HMaster) {
    useThisHostnameInstead = conf.get(MASTER_HOSTNAME_KEY);
  } else {
    useThisHostnameInstead = conf.get(RS_HOSTNAME_KEY);
    if (conf.getBoolean(RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY, false)) {
      if (shouldUseThisHostnameInstead()) {
        String msg = RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY + " and " + RS_HOSTNAME_KEY +
          " are mutually exclusive. Do not set " + RS_HOSTNAME_DISABLE_MASTER_REVERSEDNS_KEY +
          " to true while " + RS_HOSTNAME_KEY + " is used";
        throw new IOException(msg);
      } else {
        useThisHostnameInstead = rpcServices.isa.getHostName();
      }
    }
  }
  String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead :
    rpcServices.isa.getHostName();
  // The startcode (construction time) is the third component of the ServerName.
  // NOTE(review): presumably it distinguishes restarts on the same host:port — confirm.
  serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode);

  rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
  rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

  // Log in as the ZooKeeper client principal, substituting the resolved hostName.
  ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
    "hbase.zookeeper.client.kerberos.principal", hostName);
  // Overridable; the default uses the region server keytab/principal keys.
  login(userProvider, hostName);

  Superusers.initialize(conf);

  regionServerAccounting = new RegionServerAccounting();
  cacheConfig = new CacheConfig(conf);
  mobCacheConfig = new MobCacheConfig(conf);
  // Any service thread using this handler aborts the whole server on an uncaught exception.
  uncaughtExceptionHandler = new UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      abort("Uncaught exception in service thread " + t.getName(), e);
    }
  };

  useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

  initializeFileSystem();

  service = new ExecutorService(getServerName().toShortString());
  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

  // In "nocluster" test mode all ZooKeeper-backed components are skipped. zooKeeper, csm,
  // tableLockManager, masterAddressTracker and clusterStatusTracker then stay null.
  if (!conf.getBoolean("hbase.testing.nocluster", false)) {
    // Watcher named "<processName>:<rpcPort>"; canCreateBaseZNode() is false for an RS.
    zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
      rpcServices.isa.getPort(), this, canCreateBaseZNode());

    // ClassCastException here if csm is not a BaseCoordinatedStateManager.
    this.csm = (BaseCoordinatedStateManager) csm;
    this.csm.initialize(this);
    this.csm.start();

    tableLockManager = TableLockManager.createTableLockManager(
      conf, zooKeeper, serverName);

    masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
    masterAddressTracker.start();

    clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
    clusterStatusTracker.start();
  }
  this.configurationManager = new ConfigurationManager();

  rpcServices.start();
  putUpWebUI();
  this.walRoller = new LogRoller(this, this);
  this.choreService = new ChoreService(getServerName().toString());
}
641
642 protected void setInitLatch(CountDownLatch latch) {
643 this.initLatch = latch;
644 }
645
/**
 * Creates the WAL and data {@link HFileSystem}s and records their root directories. It then
 * builds {@link FSTableDescriptors} on the data filesystem. The fourth constructor argument
 * is {@code !canUpdateTableDescriptor()}; NOTE(review): this is presumably a read-only
 * flag — confirm against FSTableDescriptors.
 *
 * <p>Statement order matters. {@code FSUtils.setFsDefault} is applied with the WAL root
 * before walFs is created, and with the HBase root dir before fs is created. {@code conf}
 * is therefore left with the root-dir setting applied last.
 *
 * @throws IOException if either filesystem or the table descriptors cannot be created
 */
private void initializeFileSystem() throws IOException {
  // HBase-level checksum verification (default on) is passed to both HFileSystems.
  // NOTE(review): presumably HFileSystem then skips HDFS-level checksums — confirm.
  boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
  // WAL filesystem: set the default FS to the WAL root, then create walFs from conf.
  FSUtils.setFsDefault(this.conf, FSUtils.getWALRootDir(this.conf));
  this.walFs = new HFileSystem(this.conf, useHBaseChecksum);
  // getWALRootDir is re-evaluated after setFsDefault; keep this ordering.
  this.walRootDir = FSUtils.getWALRootDir(this.conf);

  // Data filesystem: set the default FS to the HBase root dir, then create fs from conf.
  FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
  this.fs = new HFileSystem(this.conf, useHBaseChecksum);
  this.rootDir = FSUtils.getRootDir(this.conf);
  this.tableDescriptors = new FSTableDescriptors(this.conf, this.fs, this.rootDir,
    !canUpdateTableDescriptor(), false);
}
662
663
664
665
666 protected boolean shouldUseThisHostnameInstead() {
667 return useThisHostnameInstead != null && !useThisHostnameInstead.isEmpty();
668 }
669
670 protected void login(UserProvider user, String host) throws IOException {
671 user.login("hbase.regionserver.keytab.file",
672 "hbase.regionserver.kerberos.principal", host);
673 }
674
675 protected void waitForMasterActive(){
676 }
677
678 protected String getProcessName() {
679 return REGIONSERVER;
680 }
681
682 protected boolean canCreateBaseZNode() {
683 return false;
684 }
685
686 protected boolean canUpdateTableDescriptor() {
687 return false;
688 }
689
690 protected RSRpcServices createRpcServices() throws IOException {
691 return new RSRpcServices(this);
692 }
693
694 protected void configureInfoServer() {
695 infoServer.addServlet("rs-status", "/rs-status", RSStatusServlet.class);
696 infoServer.setAttribute(REGIONSERVER, this);
697 }
698
699 protected Class<? extends HttpServlet> getDumpServlet() {
700 return RSDumpServlet.class;
701 }
702
703 @Override
704 public boolean registerService(Service instance) {
705
706
707
708 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
709 String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
710 if (coprocessorServiceHandlers.containsKey(serviceName)) {
711 LOG.error("Coprocessor service " + serviceName
712 + " already registered, rejecting request from " + instance);
713 return false;
714 }
715
716 coprocessorServiceHandlers.put(serviceName, instance);
717 if (LOG.isDebugEnabled()) {
718 LOG.debug("Registered regionserver coprocessor service: service=" + serviceName);
719 }
720 return true;
721 }
722
723
724
725
726
727
728
729 @VisibleForTesting
730 protected ClusterConnection createClusterConnection() throws IOException {
731
732
733
734 return ConnectionUtils.createShortCircuitHConnection(
735 ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices);
736 }
737
738
739
740
741
742
743 private static void checkCodecs(final Configuration c) throws IOException {
744
745 String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
746 if (codecs == null) return;
747 for (String codec : codecs) {
748 if (!CompressionTest.testCompression(codec)) {
749 throw new IOException("Compression codec " + codec +
750 " not supported, aborting RS construction");
751 }
752 }
753 }
754
755 public String getClusterId() {
756 return this.clusterId;
757 }
758
759
760
761
762
763 protected synchronized void setupClusterConnection() throws IOException {
764 if (clusterConnection == null) {
765 clusterConnection = createClusterConnection();
766 metaTableLocator = new MetaTableLocator();
767 }
768 }
769
770
771
772
773
774
775
776 private void preRegistrationInitialization(){
777 try {
778 setupClusterConnection();
779
780
781 if (isHealthCheckerConfigured()) {
782 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
783 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
784 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
785 }
786
787 initializeZooKeeper();
788 if (!isStopped() && !isAborted()) {
789 initializeThreads();
790 }
791 } catch (Throwable t) {
792
793
794 this.rpcServices.stop();
795 abort("Initialization of RS failed. Hence aborting RS.", t);
796 }
797 }
798
799
800
801
802
803
804
805
806
/**
 * Waits for the ZooKeeper state this server depends on. It then reads the cluster id and
 * creates the procedure manager host and the recovering-region watcher.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Blocks until the master address tracker and the cluster status tracker both have
 *       data (see blockAndCheckIfStopped).</li>
 *   <li>Optionally waits up to 20 seconds on initLatch.</li>
 *   <li>Aborts if the cluster id is missing or cannot be read.</li>
 *   <li>Returns early, without creating the procedure manager host, if the server is
 *       stopped or aborted by then.</li>
 * </ol>
 *
 * @throws IOException if the server is stopped while waiting on a tracker
 * @throws InterruptedException if interrupted while waiting
 */
private void initializeZooKeeper() throws IOException, InterruptedException {
  // Wait until the master address tracker has data.
  blockAndCheckIfStopped(this.masterAddressTracker);

  // Wait until the cluster status tracker has data.
  blockAndCheckIfStopped(this.clusterStatusTracker);

  // Optional latch (see setInitLatch). await()'s boolean result is ignored, so
  // initialization proceeds after 20 seconds even if the latch was never counted down.
  if (this.initLatch != null) {
    this.initLatch.await(20, TimeUnit.SECONDS);
  }

  // Read the cluster id. A missing id or a ZooKeeper error aborts the server, but execution
  // still continues to the stopped/aborted check below.
  try {
    clusterId = ZKClusterId.readClusterIdZNode(this.zooKeeper);
    if (clusterId == null) {
      this.abort("Cluster ID has not been set");
    }
    LOG.info("ClusterId : "+clusterId);
  } catch (KeeperException e) {
    this.abort("Failed to retrieve Cluster ID",e);
  }

  // Subclass hook; a no-op in this class.
  waitForMasterActive();
  if (isStopped() || isAborted()) {
    return;
  }

  // Create, load (from conf) and initialize the region server procedure managers.
  try {
    rspmHost = new RegionServerProcedureManagerHost();
    rspmHost.loadProcedures(conf);
    rspmHost.initialize(this);
  } catch (IOException e) {
    this.abort("Failed to reach coordination cluster when creating procedure handler.", e);
  }

  this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);
}
853
854
855
856
857
858
859
860
861 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
862 throws IOException, InterruptedException {
863 while (tracker.blockUntilAvailable(this.msgInterval, false) == null) {
864 if (this.stopped) {
865 throw new IOException("Received the shutdown message while waiting.");
866 }
867 }
868 }
869
870
871
872
873 private boolean isClusterUp() {
874 return clusterStatusTracker != null && clusterStatusTracker.isClusterUp();
875 }
876
877 private void initializeThreads() throws IOException {
878
879 this.cacheFlusher = new MemStoreFlusher(conf, this);
880
881
882 this.compactSplitThread = new CompactSplitThread(this);
883
884
885
886 this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
887 this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
888 this.leases = new Leases(this.threadWakeFrequency);
889
890
891 movedRegionsCleaner = MovedRegionsCleaner.create(this);
892
893 if (this.nonceManager != null) {
894
895 nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
896 }
897
898
899 rsQuotaManager = new RegionServerRpcQuotaManager(this);
900 rsSpaceQuotaManager = new RegionServerSpaceQuotaManager(this);
901
902 if (QuotaUtil.isQuotaEnabled(conf)) {
903 this.fsUtilizationChore = new FileSystemUtilizationChore(this);
904 }
905
906
907 rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
908 rpcServices.isa.getAddress(), 0));
909
910 boolean onlyMetaRefresh = false;
911 int storefileRefreshPeriod = conf.getInt(
912 StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
913 , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
914 if (storefileRefreshPeriod == 0) {
915 storefileRefreshPeriod = conf.getInt(
916 StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
917 StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
918 onlyMetaRefresh = true;
919 }
920 if (storefileRefreshPeriod > 0) {
921 this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
922 onlyMetaRefresh, this, this);
923 }
924 registerConfigurationObservers();
925 }
926
927 private void registerConfigurationObservers() {
928
929 configurationManager.registerObserver(this.compactSplitThread);
930 }
931
932
933
934
935 @Override
936 public void run() {
937 try {
938
939 preRegistrationInitialization();
940 } catch (Throwable e) {
941 abort("Fatal exception during initialization", e);
942 }
943
944 try {
945 if (!isStopped() && !isAborted()) {
946 ShutdownHook.install(conf, fs, this, Thread.currentThread());
947
948 createMyEphemeralNode();
949
950
951 this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
952 }
953
954
955
956 while (keepLooping()) {
957 RegionServerStartupResponse w = reportForDuty();
958 if (w == null) {
959 LOG.warn("reportForDuty failed; sleeping and then retrying.");
960 this.sleeper.sleep();
961 } else {
962 handleReportForDutyResponse(w);
963 break;
964 }
965 }
966
967 if (!isStopped() && isHealthy()){
968
969
970 rspmHost.start();
971 }
972
973
974 if (this.rsQuotaManager != null) {
975 rsQuotaManager.start(getRpcServer().getScheduler());
976 rsSpaceQuotaManager.start();
977 }
978
979
980 long lastMsg = System.currentTimeMillis();
981 long oldRequestCount = -1;
982
983 while (!isStopped() && isHealthy()) {
984 if (!isClusterUp()) {
985 if (isOnlineRegionsEmpty()) {
986 stop("Exiting; cluster shutdown set and not carrying any regions");
987 } else if (!this.stopping) {
988 this.stopping = true;
989 LOG.info("Closing user regions");
990 closeUserRegions(this.abortRequested);
991 } else if (this.stopping) {
992 boolean allUserRegionsOffline = areAllUserRegionsOffline();
993 if (allUserRegionsOffline) {
994
995
996
997 if (oldRequestCount == getWriteRequestCount()) {
998 stop("Stopped; only catalog regions remaining online");
999 break;
1000 }
1001 oldRequestCount = getWriteRequestCount();
1002 } else {
1003
1004
1005
1006 closeUserRegions(this.abortRequested);
1007 }
1008 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
1009 }
1010 }
1011 long now = System.currentTimeMillis();
1012 if ((now - lastMsg) >= msgInterval) {
1013 tryRegionServerReport(lastMsg, now);
1014 lastMsg = System.currentTimeMillis();
1015 }
1016 if (!isStopped() && !isAborted()) {
1017 this.sleeper.sleep();
1018 }
1019 }
1020 } catch (Throwable t) {
1021 if (!rpcServices.checkOOME(t)) {
1022 String prefix = t instanceof YouAreDeadException? "": "Unhandled: ";
1023 abort(prefix + t.getMessage(), t);
1024 }
1025 }
1026
1027 if (mxBean != null) {
1028 MBeanUtil.unregisterMBean(mxBean);
1029 mxBean = null;
1030 }
1031 if (this.leases != null) this.leases.closeAfterLeasesExpire();
1032 if (this.splitLogWorker != null) {
1033 splitLogWorker.stop();
1034 }
1035 if (this.infoServer != null) {
1036 LOG.info("Stopping infoServer");
1037 try {
1038 this.infoServer.stop();
1039 } catch (Exception e) {
1040 LOG.error("Failed to stop infoServer", e);
1041 }
1042 }
1043
1044 if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
1045 cacheConfig.getBlockCache().shutdown();
1046 }
1047 mobCacheConfig.getMobFileCache().shutdown();
1048
1049 if (movedRegionsCleaner != null) {
1050 movedRegionsCleaner.stop("Region Server stopping");
1051 }
1052
1053
1054
1055 if (this.hMemManager != null) this.hMemManager.stop();
1056 if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
1057 if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
1058 if (this.compactionChecker != null) this.compactionChecker.cancel(true);
1059 if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
1060 if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
1061 if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
1062 sendShutdownInterrupt();
1063
1064
1065 if (rsQuotaManager != null) {
1066 rsQuotaManager.stop();
1067 }
1068 if (rsSpaceQuotaManager != null) {
1069 rsSpaceQuotaManager.stop();
1070 }
1071
1072
1073 if (rspmHost != null) {
1074 rspmHost.stop(this.abortRequested || this.killed);
1075 }
1076
1077 if (this.killed) {
1078
1079 } else if (abortRequested) {
1080 if (this.fsOk) {
1081 closeUserRegions(abortRequested);
1082 }
1083 LOG.info("aborting server " + this.serverName);
1084 } else {
1085 closeUserRegions(abortRequested);
1086 LOG.info("stopping server " + this.serverName);
1087 }
1088
1089
1090 if (this.metaTableLocator != null) this.metaTableLocator.stop();
1091 if (this.clusterConnection != null && !clusterConnection.isClosed()) {
1092 try {
1093 this.clusterConnection.close();
1094 } catch (IOException e) {
1095
1096
1097 LOG.warn("Attempt to close server's short circuit HConnection failed.", e);
1098 }
1099 }
1100
1101
1102 if (!this.killed && containsMetaTableRegions()) {
1103 if (!abortRequested || this.fsOk) {
1104 if (this.compactSplitThread != null) {
1105 this.compactSplitThread.join();
1106 this.compactSplitThread = null;
1107 }
1108 closeMetaTableRegions(abortRequested);
1109 }
1110 }
1111
1112 if (!this.killed && this.fsOk) {
1113 waitOnAllRegionsToClose(abortRequested);
1114 LOG.info("stopping server " + this.serverName +
1115 "; all regions closed.");
1116 }
1117
1118
1119 if (this.fsOk) {
1120 shutdownWAL(!abortRequested);
1121 }
1122
1123
1124 if (this.rssStub != null) {
1125 this.rssStub = null;
1126 }
1127 if (this.rpcClient != null) {
1128 this.rpcClient.close();
1129 }
1130 if (this.leases != null) {
1131 this.leases.close();
1132 }
1133 if (this.pauseMonitor != null) {
1134 this.pauseMonitor.stop();
1135 }
1136
1137 if (!killed) {
1138 stopServiceThreads();
1139 }
1140
1141 if (this.rpcServices != null) {
1142 this.rpcServices.stop();
1143 }
1144
1145 try {
1146 deleteMyEphemeralNode();
1147 } catch (KeeperException.NoNodeException nn) {
1148 } catch (KeeperException e) {
1149 LOG.warn("Failed deleting my ephemeral node", e);
1150 }
1151
1152
1153 ZNodeClearer.deleteMyEphemeralNodeOnDisk();
1154
1155 if (this.zooKeeper != null) {
1156 this.zooKeeper.close();
1157 }
1158 LOG.info("stopping server " + this.serverName +
1159 "; zookeeper connection closed.");
1160
1161 LOG.info(Thread.currentThread().getName() + " exiting");
1162 }
1163
1164 private boolean containsMetaTableRegions() {
1165 return onlineRegions.containsKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1166 }
1167
1168 private boolean areAllUserRegionsOffline() {
1169 if (getNumberOfOnlineRegions() > 2) return false;
1170 boolean allUserRegionsOffline = true;
1171 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1172 if (!e.getValue().getRegionInfo().isMetaTable()) {
1173 allUserRegionsOffline = false;
1174 break;
1175 }
1176 }
1177 return allUserRegionsOffline;
1178 }
1179
1180
1181
1182
1183 private long getWriteRequestCount() {
1184 long writeCount = 0;
1185 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
1186 writeCount += e.getValue().getWriteRequestsCount();
1187 }
1188 return writeCount;
1189 }
1190
1191 @VisibleForTesting
1192 protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
1193 throws IOException {
1194 RegionServerStatusService.BlockingInterface rss = rssStub;
1195 if (rss == null) {
1196
1197 return;
1198 }
1199 ClusterStatusProtos.ServerLoad sl = buildServerLoad(reportStartTime, reportEndTime);
1200 try {
1201 RegionServerReportRequest.Builder request = RegionServerReportRequest.newBuilder();
1202 ServerName sn = ServerName.parseVersionedServerName(
1203 this.serverName.getVersionedBytes());
1204 request.setServer(ProtobufUtil.toServerName(sn));
1205 request.setLoad(sl);
1206 rss.regionServerReport(null, request.build());
1207 } catch (ServiceException se) {
1208 IOException ioe = ProtobufUtil.getRemoteException(se);
1209 if (ioe instanceof YouAreDeadException) {
1210
1211 throw ioe;
1212 }
1213 if (rssStub == rss) {
1214 rssStub = null;
1215 }
1216
1217
1218 createRegionServerStatusStub();
1219 }
1220 }
1221
1222
1223
1224
1225
1226
1227
1228 public boolean reportRegionSizesForQuotas(final Map<HRegionInfo, Long> onlineRegionSizes) {
1229 RegionServerStatusService.BlockingInterface rss = rssStub;
1230 if (rss == null) {
1231
1232 LOG.trace("Skipping Region size report to HMaster as stub is null");
1233 return true;
1234 }
1235 try {
1236 RegionSpaceUseReportRequest request = buildRegionSpaceUseReportRequest(
1237 Objects.requireNonNull(onlineRegionSizes));
1238 rss.reportRegionSpaceUse(null, request);
1239 } catch (ServiceException se) {
1240 IOException ioe = ProtobufUtil.getRemoteException(se);
1241 if (ioe instanceof PleaseHoldException) {
1242 LOG.trace("Failed to report region sizes to Master because it is initializing."
1243 + " This will be retried.", ioe);
1244
1245 return true;
1246 }
1247 if (rssStub == rss) {
1248 rssStub = null;
1249 }
1250 createRegionServerStatusStub();
1251 if (ioe instanceof DoNotRetryIOException) {
1252 DoNotRetryIOException doNotRetryEx = (DoNotRetryIOException) ioe;
1253 if (doNotRetryEx.getCause() != null) {
1254 Throwable t = doNotRetryEx.getCause();
1255 if (t instanceof UnsupportedOperationException) {
1256 LOG.debug("master doesn't support ReportRegionSpaceUse, pause before retrying");
1257 return false;
1258 }
1259 }
1260 }
1261 LOG.debug("Failed to report region sizes to Master. This will be retried.", ioe);
1262 }
1263 return true;
1264 }
1265
1266
1267
1268
1269
1270
1271
1272 RegionSpaceUseReportRequest buildRegionSpaceUseReportRequest(Map<HRegionInfo,Long> regionSizes) {
1273 RegionSpaceUseReportRequest.Builder request = RegionSpaceUseReportRequest.newBuilder();
1274 for (Entry<HRegionInfo, Long> entry : Objects.requireNonNull(regionSizes).entrySet()) {
1275 request.addSpaceUse(convertRegionSize(entry.getKey(), entry.getValue()));
1276 }
1277 return request.build();
1278 }
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288 RegionSpaceUse convertRegionSize(HRegionInfo regionInfo, Long sizeInBytes) {
1289 return RegionSpaceUse.newBuilder()
1290 .setRegionInfo(HRegionInfo.convert(Objects.requireNonNull(regionInfo)))
1291 .setRegionSize(Objects.requireNonNull(sizeInBytes))
1292 .build();
1293 }
1294
1295 ClusterStatusProtos.ServerLoad buildServerLoad(long reportStartTime, long reportEndTime)
1296 throws IOException {
1297
1298
1299
1300
1301
1302
1303
1304 MetricsRegionServerWrapper regionServerWrapper = metricsRegionServer.getRegionServerWrapper();
1305 Collection<Region> regions = getOnlineRegionsLocalContext();
1306 MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
1307
1308 ClusterStatusProtos.ServerLoad.Builder serverLoad =
1309 ClusterStatusProtos.ServerLoad.newBuilder();
1310 serverLoad.setNumberOfRequests((int) regionServerWrapper.getRequestsPerSecond());
1311 serverLoad.setTotalNumberOfRequests((int) regionServerWrapper.getTotalRequestCount());
1312 serverLoad.setUsedHeapMB((int)(memory.getUsed() / 1024 / 1024));
1313 serverLoad.setMaxHeapMB((int) (memory.getMax() / 1024 / 1024));
1314 Set<String> coprocessors = getWAL(null).getCoprocessorHost().getCoprocessors();
1315 Builder coprocessorBuilder = Coprocessor.newBuilder();
1316 for (String coprocessor : coprocessors) {
1317 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1318 }
1319 RegionLoad.Builder regionLoadBldr = RegionLoad.newBuilder();
1320 RegionSpecifier.Builder regionSpecifier = RegionSpecifier.newBuilder();
1321 for (Region region : regions) {
1322 if (region.getCoprocessorHost() != null) {
1323 Set<String> regionCoprocessors = region.getCoprocessorHost().getCoprocessors();
1324 Iterator<String> iterator = regionCoprocessors.iterator();
1325 while (iterator.hasNext()) {
1326 serverLoad.addCoprocessors(coprocessorBuilder.setName(iterator.next()).build());
1327 }
1328 }
1329 serverLoad.addRegionLoads(createRegionLoad(region, regionLoadBldr, regionSpecifier));
1330 for (String coprocessor : getWAL(region.getRegionInfo()).getCoprocessorHost()
1331 .getCoprocessors()) {
1332 serverLoad.addCoprocessors(coprocessorBuilder.setName(coprocessor).build());
1333 }
1334 }
1335 serverLoad.setReportStartTime(reportStartTime);
1336 serverLoad.setReportEndTime(reportEndTime);
1337 if (this.infoServer != null) {
1338 serverLoad.setInfoServerPort(this.infoServer.getPort());
1339 } else {
1340 serverLoad.setInfoServerPort(-1);
1341 }
1342
1343
1344
1345 ReplicationSourceService rsources = getReplicationSourceService();
1346
1347 if (rsources != null) {
1348
1349 ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
1350 if (rLoad != null) {
1351 serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
1352 for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad.getReplicationLoadSourceList()) {
1353 serverLoad.addReplLoadSource(rLS);
1354 }
1355 }
1356 }
1357
1358 return serverLoad.build();
1359 }
1360
1361 String getOnlineRegionsAsPrintableString() {
1362 StringBuilder sb = new StringBuilder();
1363 for (Region r: this.onlineRegions.values()) {
1364 if (sb.length() > 0) sb.append(", ");
1365 sb.append(r.getRegionInfo().getEncodedName());
1366 }
1367 return sb.toString();
1368 }
1369
1370
1371
1372
/**
 * Waits for the regions on this server to close. It issues a close for every online region
 * that is not already in transition and has not been asked to close before.
 *
 * Returns when no regions are online. It also returns when no regions are in transition even
 * though some are still online, logging that some regions failed closing. When the online
 * count changes, progress is logged at most about once per second. If a sleep between polls
 * is interrupted, polling continues and the interrupt status is restored on exit.
 *
 * @param abort passed through to closeRegionIgnoreErrors for every close issued
 */
private void waitOnAllRegionsToClose(final boolean abort) {
  // Last logged online-region count; -1 guarantees the first iteration logs.
  int lastCount = -1;
  long previousLogTime = 0;
  // Encoded names of regions already asked to close, so each close is issued once.
  Set<String> closedRegions = new HashSet<String>();
  boolean interrupted = false;
  try {
    while (!isOnlineRegionsEmpty()) {
      int count = getNumberOfOnlineRegions();
      // Log only when the count changed, rate-limited to roughly once per second.
      if (count != lastCount) {
        if (System.currentTimeMillis() > (previousLogTime + 1000)) {
          previousLogTime = System.currentTimeMillis();
          lastCount = count;
          LOG.info("Waiting on " + count + " regions to close");
          // Dump the region map only when it is small enough to be readable.
          if (count < 10 && LOG.isDebugEnabled()) {
            LOG.debug(this.onlineRegions);
          }
        }
      }
      // Issue a close for every online region that is neither in transition nor already
      // asked to close.
      for (Map.Entry<String, Region> e : this.onlineRegions.entrySet()) {
        HRegionInfo hri = e.getValue().getRegionInfo();
        if (!this.regionsInTransitionInRS.containsKey(hri.getEncodedNameAsBytes())
            && !closedRegions.contains(hri.getEncodedName())) {
          closedRegions.add(hri.getEncodedName());
          // Close failures are not propagated from here (per the helper's name).
          closeRegionIgnoreErrors(hri, abort);
        }
      }
      // Nothing in transition: either all regions closed or the remaining ones
      // failed to close, so waiting longer would not help.
      if (this.regionsInTransitionInRS.isEmpty()) {
        if (!isOnlineRegionsEmpty()) {
          LOG.info("We were exiting though online regions are not empty," +
              " because some regions failed closing");
        }
        break;
      }
      if (sleep(200)) {
        interrupted = true;
      }
    }
  } finally {
    // sleep() swallows InterruptedException, so restore the interrupt flag here.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
1426
1427 private boolean sleep(long millis) {
1428 boolean interrupted = false;
1429 try {
1430 Thread.sleep(millis);
1431 } catch (InterruptedException e) {
1432 LOG.warn("Interrupted while sleeping");
1433 interrupted = true;
1434 }
1435 return interrupted;
1436 }
1437
1438 private void shutdownWAL(final boolean close) {
1439 if (this.walFactory != null) {
1440 try {
1441 if (close) {
1442 walFactory.close();
1443 } else {
1444 walFactory.shutdown();
1445 }
1446 } catch (Throwable e) {
1447 e = RemoteExceptionHandler.checkThrowable(e);
1448 LOG.error("Shutdown / close of WAL failed: " + e);
1449 LOG.debug("Shutdown / close exception details:", e);
1450 }
1451 }
1452 }
1453
1454
1455
1456
1457
1458
/**
 * Applies the master's response to this server's report-for-duty and completes startup.
 *
 * Each name/value pair from the master is copied into this server's configuration. The one
 * exception is HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER, which instead rebuilds
 * {@code serverName}. If the master's HBASE_DIR differs from the local one, the file system is
 * re-initialized. Then the WAL and replication, metrics, the JVM pause monitor, service
 * threads and the heap memory manager are set up. Finally the server is marked online, which
 * wakes threads blocked in waitForServerOnline().
 *
 * @param c the master's startup response
 * @throws IOException if the master's view of our hostname contradicts a configured
 *     useThisHostnameInstead, or if any initialization step fails (the server is stopped
 *     before the exception is thrown)
 */
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
throws IOException {
  try {
    boolean updateRootDir = false;
    for (NameStringPair e : c.getMapEntriesList()) {
      String key = e.getName();
      // The master reports the hostname it sees us as; adopt it for our server name.
      if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
        String hostnameFromMasterPOV = e.getValue();
        this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
          rpcServices.isa.getPort(), this.startcode);
        // A configured hostname override must match what the master sees: fatal otherwise.
        if (shouldUseThisHostnameInstead() &&
            !hostnameFromMasterPOV.equals(useThisHostnameInstead)) {
          String msg = "Master passed us a different hostname to use; was=" +
            this.useThisHostnameInstead + ", but now=" + hostnameFromMasterPOV;
          LOG.error(msg);
          throw new IOException(msg);
        }
        // Without an override, a mismatch with the RPC bind hostname is only logged.
        if (!shouldUseThisHostnameInstead() &&
            !hostnameFromMasterPOV.equals(rpcServices.isa.getHostName())) {
          String msg = "Master passed us a different hostname to use; was=" +
            rpcServices.isa.getHostName() + ", but now=" + hostnameFromMasterPOV;
          LOG.error(msg);
        }
        continue;
      }

      String value = e.getValue();
      // Remember whether the master's root dir differs from ours (re-init happens below).
      if (key.equals(HConstants.HBASE_DIR)) {
        if (value != null && !value.equals(conf.get(HConstants.HBASE_DIR))) {
          updateRootDir = true;
        }
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Config from master: " + key + "=" + value);
      }
      this.conf.set(key, value);
    }

    if (updateRootDir) {
      // The root dir changed, so re-initialize the file system against the new setting.
      initializeFileSystem();
    }

    // Give this process a default task attempt id derived from the server name when none
    // is configured. NOTE(review): presumably required by HDFS/MapReduce client code;
    // confirm.
    if (this.conf.get("mapreduce.task.attempt.id") == null) {
      this.conf.set("mapreduce.task.attempt.id", "hb_rs_" +
        this.serverName.toString());
    }

    // Persist our ephemeral znode path locally; the shutdown path in run() removes it via
    // ZNodeClearer.deleteMyEphemeralNodeOnDisk().
    ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

    this.cacheConfig = new CacheConfig(conf);
    this.walFactory = setupWALAndReplication();
    // Metrics are created after the WAL factory is in place.
    this.metricsTable = new MetricsTable(new MetricsTableWrapperAggregateImpl(this));
    this.metricsRegionServer = new MetricsRegionServer(getConfiguration(),
        new MetricsRegionServerWrapperImpl(this), metricsTable);
    // Pause monitor reports through the metrics source created just above.
    this.pauseMonitor = new JvmPauseMonitor(conf, getMetrics().getMetricsSource());
    pauseMonitor.start();

    startServiceThreads();
    startHeapMemoryManager();
    LOG.info("Serving as " + this.serverName +
      ", RpcServer on " + rpcServices.isa +
      ", sessionid=0x" +
      Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));

    // Mark the server online and wake threads blocked in waitForServerOnline().
    synchronized (online) {
      online.set(true);
      online.notifyAll();
    }
  } catch (Throwable e) {
    // Stop the server first, then surface the failure as an IOException.
    stop("Failed initialization");
    throw convertThrowableToIOE(cleanup(e, "Failed init"),
        "Region server startup failed");
  } finally {
    // Cut the main loop's current sleep short so it reacts promptly (presumably).
    sleeper.skipSleepCycle();
  }
}
1544
1545 private void startHeapMemoryManager() {
1546 this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this);
1547 if (this.hMemManager != null) {
1548 this.hMemManager.start(getChoreService());
1549 }
1550 }
1551
1552 private void createMyEphemeralNode() throws KeeperException, IOException {
1553 RegionServerInfo.Builder rsInfo = RegionServerInfo.newBuilder();
1554 rsInfo.setInfoPort(infoServer != null ? infoServer.getPort() : -1);
1555 rsInfo.setVersionInfo(ProtobufUtil.getVersionInfo());
1556 byte[] data = ProtobufUtil.prependPBMagic(rsInfo.build().toByteArray());
1557 ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper,
1558 getMyEphemeralNodePath(), data);
1559 }
1560
1561 private void deleteMyEphemeralNode() throws KeeperException {
1562 ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
1563 }
1564
1565 @Override
1566 public RegionServerAccounting getRegionServerAccounting() {
1567 return regionServerAccounting;
1568 }
1569
1570 @Override
1571 public TableLockManager getTableLockManager() {
1572 return tableLockManager;
1573 }
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583 private RegionLoad createRegionLoad(final Region r, RegionLoad.Builder regionLoadBldr,
1584 RegionSpecifier.Builder regionSpecifier) throws IOException {
1585 byte[] name = r.getRegionInfo().getRegionName();
1586 int stores = 0;
1587 int storefiles = 0;
1588 int storeUncompressedSizeMB = 0;
1589 int storefileSizeMB = 0;
1590 int memstoreSizeMB = (int) (r.getMemstoreSize() / 1024 / 1024);
1591 int storefileIndexSizeMB = 0;
1592 int rootIndexSizeKB = 0;
1593 int totalStaticIndexSizeKB = 0;
1594 int totalStaticBloomSizeKB = 0;
1595 long totalCompactingKVs = 0;
1596 long currentCompactedKVs = 0;
1597 List<Store> storeList = r.getStores();
1598 stores += storeList.size();
1599 for (Store store : storeList) {
1600 storefiles += store.getStorefilesCount();
1601 storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024);
1602 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
1603 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
1604 CompactionProgress progress = store.getCompactionProgress();
1605 if (progress != null) {
1606 totalCompactingKVs += progress.totalCompactingKVs;
1607 currentCompactedKVs += progress.currentCompactedKVs;
1608 }
1609 rootIndexSizeKB += (int) (store.getStorefilesIndexSize() / 1024);
1610 totalStaticIndexSizeKB += (int) (store.getTotalStaticIndexSize() / 1024);
1611 totalStaticBloomSizeKB += (int) (store.getTotalStaticBloomSize() / 1024);
1612 }
1613
1614 float dataLocality =
1615 r.getHDFSBlocksDistribution().getBlockLocalityIndex(serverName.getHostname());
1616 if (regionLoadBldr == null) {
1617 regionLoadBldr = RegionLoad.newBuilder();
1618 }
1619 if (regionSpecifier == null) {
1620 regionSpecifier = RegionSpecifier.newBuilder();
1621 }
1622 regionSpecifier.setType(RegionSpecifierType.REGION_NAME);
1623 regionSpecifier.setValue(ByteStringer.wrap(name));
1624 regionLoadBldr.setRegionSpecifier(regionSpecifier.build())
1625 .setStores(stores)
1626 .setStorefiles(storefiles)
1627 .setStoreUncompressedSizeMB(storeUncompressedSizeMB)
1628 .setStorefileSizeMB(storefileSizeMB)
1629 .setMemstoreSizeMB(memstoreSizeMB)
1630 .setStorefileIndexSizeMB(storefileIndexSizeMB)
1631 .setRootIndexSizeKB(rootIndexSizeKB)
1632 .setTotalStaticIndexSizeKB(totalStaticIndexSizeKB)
1633 .setTotalStaticBloomSizeKB(totalStaticBloomSizeKB)
1634 .setReadRequestsCount(r.getReadRequestsCount())
1635 .setWriteRequestsCount(r.getWriteRequestsCount())
1636 .setTotalCompactingKVs(totalCompactingKVs)
1637 .setCurrentCompactedKVs(currentCompactedKVs)
1638 .setDataLocality(dataLocality)
1639 .setLastMajorCompactionTs(r.getOldestHfileTs(true));
1640 ((HRegion)r).setCompleteSequenceId(regionLoadBldr);
1641
1642 return regionLoadBldr.build();
1643 }
1644
1645
1646
1647
1648
1649 public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
1650 Region r = onlineRegions.get(encodedRegionName);
1651 return r != null ? createRegionLoad(r, null, null) : null;
1652 }
1653
1654
1655
1656
1657 private static class CompactionChecker extends ScheduledChore {
1658 private final HRegionServer instance;
1659 private final int majorCompactPriority;
1660 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1661 private long iteration = 0;
1662
1663 CompactionChecker(final HRegionServer h, final int sleepTime,
1664 final Stoppable stopper) {
1665 super("CompactionChecker", stopper, sleepTime);
1666 this.instance = h;
1667 LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));
1668
1669
1670
1671
1672 this.majorCompactPriority = this.instance.conf.
1673 getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
1674 DEFAULT_PRIORITY);
1675 }
1676
1677 @Override
1678 protected void chore() {
1679 for (Region r : this.instance.onlineRegions.values()) {
1680 if (r == null)
1681 continue;
1682 for (Store s : r.getStores()) {
1683 try {
1684 long multiplier = s.getCompactionCheckMultiplier();
1685 assert multiplier > 0;
1686 if (iteration % multiplier != 0) continue;
1687 if (s.needsCompaction()) {
1688
1689 this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
1690 + " requests compaction");
1691 } else if (s.isMajorCompaction()) {
1692 if (majorCompactPriority == DEFAULT_PRIORITY
1693 || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
1694 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1695 + " requests major compaction; use default priority", null);
1696 } else {
1697 this.instance.compactSplitThread.requestCompaction(r, s, getName()
1698 + " requests major compaction; use configured priority",
1699 this.majorCompactPriority, null, null);
1700 }
1701 }
1702 } catch (IOException e) {
1703 LOG.warn("Failed major compaction check on " + r, e);
1704 }
1705 }
1706 }
1707 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
1708 }
1709 }
1710
1711 static class PeriodicMemstoreFlusher extends ScheduledChore {
1712 final HRegionServer server;
1713 final static int RANGE_OF_DELAY = 20000;
1714 final static int MIN_DELAY_TIME = 3000;
1715 public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
1716 super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
1717 this.server = server;
1718 }
1719
1720 @Override
1721 protected void chore() {
1722 for (Region r : this.server.onlineRegions.values()) {
1723 if (r == null)
1724 continue;
1725 if (((HRegion)r).shouldFlush()) {
1726 FlushRequester requester = server.getFlushRequester();
1727 if (requester != null) {
1728 long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
1729 LOG.info(getName() + " requesting flush for region " +
1730 r.getRegionInfo().getRegionNameAsString() + " after a delay of " + randomDelay);
1731
1732
1733
1734 requester.requestDelayedFlush(r, randomDelay, false);
1735 }
1736 }
1737 }
1738 }
1739 }
1740
1741
1742
1743
1744
1745
1746
1747
1748 public boolean isOnline() {
1749 return online.get();
1750 }
1751
1752
1753
1754
1755
1756
1757
1758 private WALFactory setupWALAndReplication() throws IOException {
1759
1760 final Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1761 final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
1762
1763 Path logDir = new Path(walRootDir, logName);
1764 if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
1765 if (this.walFs.exists(logDir)) {
1766 throw new RegionServerRunningException("Region server has already " +
1767 "created directory at " + this.serverName.toString());
1768 }
1769
1770
1771
1772 createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir);
1773
1774
1775 final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
1776 listeners.add(new MetricsWAL());
1777
1778
1779 if (this.replicationSourceHandler != null &&
1780 this.replicationSourceHandler.getWALActionsListener() != null) {
1781
1782 listeners.add(this.replicationSourceHandler.getWALActionsListener());
1783 }
1784
1785 return new WALFactory(conf, listeners, serverName.toString());
1786 }
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796 protected LogRoller ensureMetaWALRoller() {
1797
1798
1799 LogRoller roller = metawalRoller.get();
1800 if (null == roller) {
1801 LogRoller tmpLogRoller = new LogRoller(this, this);
1802 String n = Thread.currentThread().getName();
1803 Threads.setDaemonThreadRunning(tmpLogRoller.getThread(),
1804 n + "-MetaLogRoller", uncaughtExceptionHandler);
1805 if (metawalRoller.compareAndSet(null, tmpLogRoller)) {
1806 roller = tmpLogRoller;
1807 } else {
1808
1809 Threads.shutdown(tmpLogRoller.getThread());
1810 roller = metawalRoller.get();
1811 }
1812 }
1813 return roller;
1814 }
1815
1816 public MetricsRegionServer getRegionServerMetrics() {
1817 return this.metricsRegionServer;
1818 }
1819
1820
1821
1822
1823 public MasterAddressTracker getMasterAddressTracker() {
1824 return this.masterAddressTracker;
1825 }
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
/**
 * Starts the region server's background machinery in this order:
 * <ul>
 * <li>executor pools for region open/close and related work</li>
 * <li>the WAL roller thread and the memstore flusher</li>
 * <li>every chore that was created</li>
 * <li>the lease checker thread</li>
 * <li>replication services</li>
 * <li>the split-log worker</li>
 * </ul>
 *
 * @throws IOException NOTE(review): presumably from starting replication or the split-log
 *     worker; confirm
 */
private void startServiceThreads() throws IOException {
  // Executor pools; sizes come from configuration with the defaults shown inline.
  this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
    conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_OPEN_META,
    conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
  this.service.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
    conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
    conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
  this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
    conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
  // The parallel-seek pool is only created when parallel store-scanner seeks are enabled.
  if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
    this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
      conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
  }
  // Log-replay pool, sized by the configured maximum number of WAL splitters.
  this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
     "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
  // Replica-flush pool; defaults to the open-region pool size when not set explicitly.
  if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
    this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
      conf.getInt("hbase.regionserver.region.replica.flusher.threads",
        conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
  }

  Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller",
      uncaughtExceptionHandler);
  this.cacheFlusher.start(uncaughtExceptionHandler);

  // Schedule only the chores that were actually created.
  if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
  if (this.periodicFlusher != null) choreService.scheduleChore(periodicFlusher);
  if (this.healthCheckChore != null) choreService.scheduleChore(healthCheckChore);
  if (this.nonceManagerChore != null) choreService.scheduleChore(nonceManagerChore);
  if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
  if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
  if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);

  // Lease checker runs as its own daemon thread.
  Threads.setDaemonThreadRunning(this.leases.getThread(), getName() + ".leaseChecker",
    uncaughtExceptionHandler);

  // If source and sink are the same object, start it only once.
  if (this.replicationSourceHandler == this.replicationSinkHandler &&
      this.replicationSourceHandler != null) {
    this.replicationSourceHandler.startReplicationService();
  } else {
    if (this.replicationSourceHandler != null) {
      this.replicationSourceHandler.startReplicationService();
    }
    if (this.replicationSinkHandler != null) {
      this.replicationSinkHandler.startReplicationService();
    }
  }

  // The split-log worker gets its own copy of the configuration. That copy overrides client
  // retries (default 8), the RPC timeout (default 30000 ms) and the server-side retry
  // multiplier (1), using the log-replay settings.
  Configuration sinkConf = HBaseConfiguration.create(conf);
  sinkConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
    conf.getInt("hbase.log.replay.retries.number", 8));
  sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
    conf.getInt("hbase.log.replay.rpc.timeout", 30000));
  sinkConf.setInt("hbase.client.serverside.retries.multiplier", 1);
  this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
  splitLogWorker.start();
}
1906
1907
1908
1909
1910
1911
1912 private int putUpWebUI() throws IOException {
1913 int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
1914 HConstants.DEFAULT_REGIONSERVER_INFOPORT);
1915 String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
1916
1917 if(this instanceof HMaster) {
1918 port = conf.getInt(HConstants.MASTER_INFO_PORT,
1919 HConstants.DEFAULT_MASTER_INFOPORT);
1920 addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1921 }
1922
1923 if (port < 0) return port;
1924
1925 if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
1926 String msg =
1927 "Failed to start http info server. Address " + addr
1928 + " does not belong to this host. Correct configuration parameter: "
1929 + "hbase.regionserver.info.bindAddress";
1930 LOG.error(msg);
1931 throw new IOException(msg);
1932 }
1933
1934 boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO,
1935 false);
1936 while (true) {
1937 try {
1938 this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf);
1939 infoServer.addServlet("dump", "/dump", getDumpServlet());
1940 configureInfoServer();
1941 this.infoServer.start();
1942 break;
1943 } catch (BindException e) {
1944 if (!auto) {
1945
1946 LOG.error("Failed binding http info server to port: " + port);
1947 throw e;
1948 }
1949
1950 LOG.info("Failed binding http info server to port: " + port);
1951 port++;
1952 }
1953 }
1954 port = this.infoServer.getPort();
1955 conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port);
1956 int masterInfoPort = conf.getInt(HConstants.MASTER_INFO_PORT,
1957 HConstants.DEFAULT_MASTER_INFOPORT);
1958 conf.setInt("hbase.master.info.port.orig", masterInfoPort);
1959 conf.setInt(HConstants.MASTER_INFO_PORT, port);
1960 return port;
1961 }
1962
1963
1964
1965
1966 private boolean isHealthy() {
1967 if (!fsOk) {
1968
1969 return false;
1970 }
1971
1972 if (!(leases.isAlive()
1973 && cacheFlusher.isAlive() && walRoller.isAlive()
1974 && this.compactionChecker.isScheduled()
1975 && this.periodicFlusher.isScheduled())) {
1976 stop("One or more threads are no longer alive -- stop");
1977 return false;
1978 }
1979 final LogRoller metawalRoller = this.metawalRoller.get();
1980 if (metawalRoller != null && !metawalRoller.isAlive()) {
1981 stop("Meta WAL roller thread is no longer alive -- stop");
1982 return false;
1983 }
1984 return true;
1985 }
1986
1987 private static final byte[] UNSPECIFIED_REGION = new byte[]{};
1988
1989 @Override
1990 public WAL getWAL(HRegionInfo regionInfo) throws IOException {
1991 WAL wal;
1992 LogRoller roller = walRoller;
1993
1994 if (regionInfo != null && regionInfo.isMetaTable() &&
1995 regionInfo.getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1996 roller = ensureMetaWALRoller();
1997 wal = walFactory.getMetaWAL(regionInfo.getEncodedNameAsBytes());
1998 } else if (regionInfo == null) {
1999 wal = walFactory.getWAL(UNSPECIFIED_REGION);
2000 } else {
2001 wal = walFactory.getWAL(regionInfo.getEncodedNameAsBytes());
2002 }
2003 roller.addWAL(wal);
2004 return wal;
2005 }
2006
2007 @Override
2008 public ClusterConnection getConnection() {
2009 return this.clusterConnection;
2010 }
2011
2012 @Override
2013 public MetaTableLocator getMetaTableLocator() {
2014 return this.metaTableLocator;
2015 }
2016
2017 @Override
2018 public void stop(final String msg) {
2019 stop(msg, false);
2020 }
2021
2022
2023
2024
2025
2026
2027 public void stop(final String msg, final boolean force) {
2028 if (!this.stopped) {
2029 if (this.rsHost != null) {
2030
2031 try {
2032 this.rsHost.preStop(msg);
2033 } catch (IOException ioe) {
2034 if (!force) {
2035 LOG.warn("The region server did not stop", ioe);
2036 return;
2037 }
2038 LOG.warn("Skipping coprocessor exception on preStop() due to forced shutdown", ioe);
2039 }
2040 }
2041 this.stopped = true;
2042 LOG.info("STOPPED: " + msg);
2043
2044 sleeper.skipSleepCycle();
2045 }
2046 }
2047
2048 public void waitForServerOnline(){
2049 while (!isStopped() && !isOnline()) {
2050 synchronized (online) {
2051 try {
2052 online.wait(msgInterval);
2053 } catch (InterruptedException ie) {
2054 Thread.currentThread().interrupt();
2055 break;
2056 }
2057 }
2058 }
2059 }
2060
2061 @Override
2062 public void postOpenDeployTasks(final Region r) throws KeeperException, IOException {
2063 postOpenDeployTasks(new PostOpenDeployContext(r, -1));
2064 }
2065
2066 @Override
2067 public void postOpenDeployTasks(final PostOpenDeployContext context)
2068 throws KeeperException, IOException {
2069 Region r = context.getRegion();
2070 long masterSystemTime = context.getMasterSystemTime();
2071 Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
2072 rpcServices.checkOpen();
2073 LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
2074
2075 for (Store s : r.getStores()) {
2076 if (s.hasReferences() || s.needsCompaction()) {
2077 this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
2078 }
2079 }
2080 long openSeqNum = r.getOpenSeqNum();
2081 if (openSeqNum == HConstants.NO_SEQNUM) {
2082
2083 LOG.error("No sequence number found when opening " +
2084 r.getRegionInfo().getRegionNameAsString());
2085 openSeqNum = 0;
2086 }
2087
2088
2089 updateRecoveringRegionLastFlushedSequenceId(r);
2090
2091
2092 if (r.getRegionInfo().isMetaRegion()) {
2093 MetaTableLocator.setMetaLocation(getZooKeeper(), serverName, r.getRegionInfo().getReplicaId(),
2094 State.OPEN);
2095 } else if (useZKForAssignment) {
2096 MetaTableAccessor.updateRegionLocation(getConnection(), r.getRegionInfo(),
2097 this.serverName, openSeqNum, masterSystemTime);
2098 }
2099 if (!useZKForAssignment && !reportRegionStateTransition(new RegionStateTransitionContext(
2100 TransitionCode.OPENED, openSeqNum, masterSystemTime, r.getRegionInfo()))) {
2101 throw new IOException("Failed to report opened region to master: "
2102 + r.getRegionInfo().getRegionNameAsString());
2103 }
2104
2105 triggerFlushInPrimaryRegion((HRegion)r);
2106
2107 LOG.debug("Finished post open deploy task for " + r.getRegionInfo().getRegionNameAsString());
2108 }
2109
2110 @Override
2111 public boolean reportRegionStateTransition(TransitionCode code, HRegionInfo... hris) {
2112 return reportRegionStateTransition(code, HConstants.NO_SEQNUM, hris);
2113 }
2114
2115 @Override
2116 public boolean reportRegionStateTransition(
2117 TransitionCode code, long openSeqNum, HRegionInfo... hris) {
2118 return reportRegionStateTransition(
2119 new RegionStateTransitionContext(code, HConstants.NO_SEQNUM, -1, hris));
2120 }
2121
2122 @Override
2123 public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
2124 TransitionCode code = context.getCode();
2125 long openSeqNum = context.getOpenSeqNum();
2126 long masterSystemTime = context.getMasterSystemTime();
2127 HRegionInfo[] hris = context.getHris();
2128
2129 ReportRegionStateTransitionRequest.Builder builder =
2130 ReportRegionStateTransitionRequest.newBuilder();
2131 builder.setServer(ProtobufUtil.toServerName(serverName));
2132 RegionStateTransition.Builder transition = builder.addTransitionBuilder();
2133 transition.setTransitionCode(code);
2134 if (code == TransitionCode.OPENED && openSeqNum >= 0) {
2135 transition.setOpenSeqNum(openSeqNum);
2136 }
2137 for (HRegionInfo hri: hris) {
2138 transition.addRegionInfo(HRegionInfo.convert(hri));
2139 }
2140 ReportRegionStateTransitionRequest request = builder.build();
2141 while (keepLooping()) {
2142 RegionServerStatusService.BlockingInterface rss = rssStub;
2143 try {
2144 if (rss == null) {
2145 createRegionServerStatusStub();
2146 continue;
2147 }
2148 ReportRegionStateTransitionResponse response =
2149 rss.reportRegionStateTransition(null, request);
2150 if (response.hasErrorMessage()) {
2151 LOG.info("Failed to transition " + hris[0]
2152 + " to " + code + ": " + response.getErrorMessage());
2153 return false;
2154 }
2155 return true;
2156 } catch (ServiceException se) {
2157 IOException ioe = ProtobufUtil.getRemoteException(se);
2158 LOG.info("Failed to report region transition, will retry", ioe);
2159 if (rssStub == rss) {
2160 rssStub = null;
2161 }
2162 }
2163 }
2164 return false;
2165 }
2166
2167
2168
2169
2170
2171 void triggerFlushInPrimaryRegion(final HRegion region) {
2172 if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
2173 return;
2174 }
2175 if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
2176 !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
2177 region.conf)) {
2178 region.setReadsEnabled(true);
2179 return;
2180 }
2181
2182 region.setReadsEnabled(false);
2183
2184
2185
2186 this.service.submit(
2187 new RegionReplicaFlushHandler(this, clusterConnection,
2188 rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
2189 }
2190
2191 @Override
2192 public RpcServerInterface getRpcServer() {
2193 return rpcServices.rpcServer;
2194 }
2195
2196 @VisibleForTesting
2197 public RSRpcServices getRSRpcServices() {
2198 return rpcServices;
2199 }
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211 @Override
2212 public void abort(final String reason, Throwable cause) {
2213 String msg = "ABORTING region server " + this + ": " + reason;
2214 if (cause != null) {
2215 LOG.fatal(msg, cause);
2216 } else {
2217 LOG.fatal(msg);
2218 }
2219 this.abortRequested = true;
2220
2221
2222
2223 LOG.fatal("RegionServer abort: loaded coprocessors are: " +
2224 CoprocessorHost.getLoadedCoprocessors());
2225
2226 try {
2227 LOG.info("Dump of metrics as JSON on abort: " + JSONBean.dumpRegionServerMetrics());
2228 } catch (MalformedObjectNameException | IOException e) {
2229 LOG.warn("Failed dumping metrics", e);
2230 }
2231
2232
2233 try {
2234 if (cause != null) {
2235 msg += "\nCause:\n" + StringUtils.stringifyException(cause);
2236 }
2237
2238 if (rssStub != null && this.serverName != null) {
2239 ReportRSFatalErrorRequest.Builder builder =
2240 ReportRSFatalErrorRequest.newBuilder();
2241 ServerName sn =
2242 ServerName.parseVersionedServerName(this.serverName.getVersionedBytes());
2243 builder.setServer(ProtobufUtil.toServerName(sn));
2244 builder.setErrorMessage(msg);
2245 rssStub.reportRSFatalError(null, builder.build());
2246 }
2247 } catch (Throwable t) {
2248 LOG.warn("Unable to report fatal error to master", t);
2249 }
2250
2251 if (User.isHBaseSecurityEnabled(conf)) {
2252 try {
2253 User.runAsLoginUser(new PrivilegedExceptionAction<Object>() {
2254 @Override
2255 public Object run() throws Exception {
2256 stop(reason, true);
2257 return null;
2258 }
2259 });
2260 } catch (IOException neverThrown) {
2261 }
2262 } else {
2263 stop(reason, true);
2264 }
2265 }
2266
2267
2268
2269
2270 public void abort(String reason) {
2271 abort(reason, null);
2272 }
2273
2274 @Override
2275 public boolean isAborted() {
2276 return this.abortRequested;
2277 }
2278
2279
2280
2281
2282
2283
2284 protected void kill() {
2285 this.killed = true;
2286 abort("Simulated kill");
2287 }
2288
2289
2290
2291
2292 protected void sendShutdownInterrupt() {
2293 }
2294
2295
2296
2297
2298
2299 protected void stopServiceThreads() {
2300
2301
2302 if (this.choreService != null) choreService.shutdown();
2303 if (this.nonceManagerChore != null) nonceManagerChore.cancel(true);
2304 if (this.compactionChecker != null) compactionChecker.cancel(true);
2305 if (this.periodicFlusher != null) periodicFlusher.cancel(true);
2306 if (this.healthCheckChore != null) healthCheckChore.cancel(true);
2307 if (this.storefileRefresher != null) storefileRefresher.cancel(true);
2308 if (this.movedRegionsCleaner != null) movedRegionsCleaner.cancel(true);
2309 if (this.fsUtilizationChore != null) fsUtilizationChore.cancel(true);
2310
2311 if (this.cacheFlusher != null) {
2312 this.cacheFlusher.join();
2313 }
2314
2315 if (this.spanReceiverHost != null) {
2316 this.spanReceiverHost.closeReceivers();
2317 }
2318 if (this.walRoller != null) {
2319 Threads.shutdown(this.walRoller.getThread());
2320 }
2321 final LogRoller metawalRoller = this.metawalRoller.get();
2322 if (metawalRoller != null) {
2323 Threads.shutdown(metawalRoller.getThread());
2324 }
2325 if (this.compactSplitThread != null) {
2326 this.compactSplitThread.join();
2327 }
2328 if (this.service != null) this.service.shutdown();
2329 if (this.replicationSourceHandler != null &&
2330 this.replicationSourceHandler == this.replicationSinkHandler) {
2331 this.replicationSourceHandler.stopReplicationService();
2332 } else {
2333 if (this.replicationSourceHandler != null) {
2334 this.replicationSourceHandler.stopReplicationService();
2335 }
2336 if (this.replicationSinkHandler != null) {
2337 this.replicationSinkHandler.stopReplicationService();
2338 }
2339 }
2340 }
2341
2342
2343
2344
2345
2346 public ReplicationSourceService getReplicationSourceService() {
2347 return replicationSourceHandler;
2348 }
2349
2350
2351
2352
2353
2354 ReplicationSinkService getReplicationSinkService() {
2355 return replicationSinkHandler;
2356 }
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366 @VisibleForTesting
2367 protected synchronized ServerName createRegionServerStatusStub() {
2368 if (rssStub != null) {
2369 return masterAddressTracker.getMasterAddress();
2370 }
2371 ServerName sn = null;
2372 long previousLogTime = 0;
2373 boolean refresh = false;
2374 RegionServerStatusService.BlockingInterface intf = null;
2375 boolean interrupted = false;
2376 try {
2377 while (keepLooping()) {
2378 sn = this.masterAddressTracker.getMasterAddress(refresh);
2379 if (sn == null) {
2380 if (!keepLooping()) {
2381
2382 LOG.debug("No master found and cluster is stopped; bailing out");
2383 return null;
2384 }
2385 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2386 LOG.debug("No master found; retry");
2387 previousLogTime = System.currentTimeMillis();
2388 }
2389 refresh = true;
2390 if (sleep(200)) {
2391 interrupted = true;
2392 }
2393 continue;
2394 }
2395
2396
2397 if (this instanceof HMaster && sn.equals(getServerName())) {
2398 intf = ((HMaster)this).getMasterRpcServices();
2399 break;
2400 }
2401 try {
2402 BlockingRpcChannel channel =
2403 this.rpcClient.createBlockingRpcChannel(sn, userProvider.getCurrent(),
2404 shortOperationTimeout);
2405 intf = RegionServerStatusService.newBlockingStub(channel);
2406 break;
2407 } catch (IOException e) {
2408 if (System.currentTimeMillis() > (previousLogTime + 1000)) {
2409 e = e instanceof RemoteException ?
2410 ((RemoteException)e).unwrapRemoteException() : e;
2411 if (e instanceof ServerNotRunningYetException) {
2412 LOG.info("Master isn't available yet, retrying");
2413 } else {
2414 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
2415 }
2416 previousLogTime = System.currentTimeMillis();
2417 }
2418 if (sleep(200)) {
2419 interrupted = true;
2420 }
2421 }
2422 }
2423 } finally {
2424 if (interrupted) {
2425 Thread.currentThread().interrupt();
2426 }
2427 }
2428 rssStub = intf;
2429 return sn;
2430 }
2431
2432
2433
2434
2435
2436 private boolean keepLooping() {
2437 return !this.stopped && isClusterUp();
2438 }
2439
2440
2441
2442
2443
2444
2445
2446
2447 private RegionServerStartupResponse reportForDuty() throws IOException {
2448 ServerName masterServerName = createRegionServerStatusStub();
2449 if (masterServerName == null) return null;
2450 RegionServerStartupResponse result = null;
2451 try {
2452 rpcServices.requestCount.set(0);
2453 rpcServices.rpcGetRequestCount.set(0);
2454 rpcServices.rpcScanRequestCount.set(0);
2455 rpcServices.rpcMultiRequestCount.set(0);
2456 rpcServices.rpcMutateRequestCount.set(0);
2457 LOG.info("reportForDuty to master=" + masterServerName + " with port="
2458 + rpcServices.isa.getPort() + ", startcode=" + this.startcode);
2459 long now = EnvironmentEdgeManager.currentTime();
2460 int port = rpcServices.isa.getPort();
2461 RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder();
2462 if (shouldUseThisHostnameInstead()) {
2463 request.setUseThisHostnameInstead(useThisHostnameInstead);
2464 }
2465 request.setPort(port);
2466 request.setServerStartCode(this.startcode);
2467 request.setServerCurrentTime(now);
2468 result = this.rssStub.regionServerStartup(null, request.build());
2469 } catch (ServiceException se) {
2470 IOException ioe = ProtobufUtil.getRemoteException(se);
2471 if (ioe instanceof ClockOutOfSyncException) {
2472 LOG.fatal("Master rejected startup because clock is out of sync", ioe);
2473
2474 throw ioe;
2475 } else if (ioe instanceof ServerNotRunningYetException) {
2476 LOG.debug("Master is not running yet");
2477 } else {
2478 LOG.warn("error telling master we are up", se);
2479 }
2480 rssStub = null;
2481 }
2482 return result;
2483 }
2484
2485 @Override
2486 public RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) {
2487 try {
2488 GetLastFlushedSequenceIdRequest req =
2489 RequestConverter.buildGetLastFlushedSequenceIdRequest(encodedRegionName);
2490 RegionServerStatusService.BlockingInterface rss = rssStub;
2491 if (rss == null) {
2492 createRegionServerStatusStub();
2493 rss = rssStub;
2494 if (rss == null) {
2495
2496 LOG.warn("Unable to connect to the master to check " + "the last flushed sequence id");
2497 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2498 .build();
2499 }
2500 }
2501 GetLastFlushedSequenceIdResponse resp = rss.getLastFlushedSequenceId(null, req);
2502 return RegionStoreSequenceIds.newBuilder()
2503 .setLastFlushedSequenceId(resp.getLastFlushedSequenceId())
2504 .addAllStoreSequenceId(resp.getStoreLastFlushedSequenceIdList()).build();
2505 } catch (ServiceException e) {
2506 LOG.warn("Unable to connect to the master to check the last flushed sequence id", e);
2507 return RegionStoreSequenceIds.newBuilder().setLastFlushedSequenceId(HConstants.NO_SEQNUM)
2508 .build();
2509 }
2510 }
2511
2512
2513
2514
2515
2516
2517 protected void closeAllRegions(final boolean abort) {
2518 closeUserRegions(abort);
2519 closeMetaTableRegions(abort);
2520 }
2521
2522
2523
2524
2525
2526 void closeMetaTableRegions(final boolean abort) {
2527 Region meta = null;
2528 this.lock.writeLock().lock();
2529 try {
2530 for (Map.Entry<String, Region> e: onlineRegions.entrySet()) {
2531 HRegionInfo hri = e.getValue().getRegionInfo();
2532 if (hri.isMetaRegion()) {
2533 meta = e.getValue();
2534 }
2535 if (meta != null) break;
2536 }
2537 } finally {
2538 this.lock.writeLock().unlock();
2539 }
2540 if (meta != null) closeRegionIgnoreErrors(meta.getRegionInfo(), abort);
2541 }
2542
2543
2544
2545
2546
2547
2548
2549 void closeUserRegions(final boolean abort) {
2550 this.lock.writeLock().lock();
2551 try {
2552 for (Map.Entry<String, Region> e: this.onlineRegions.entrySet()) {
2553 Region r = e.getValue();
2554 if (!r.getRegionInfo().isMetaTable() && r.isAvailable()) {
2555
2556 closeRegionIgnoreErrors(r.getRegionInfo(), abort);
2557 }
2558 }
2559 } finally {
2560 this.lock.writeLock().unlock();
2561 }
2562 }
2563
2564
2565 public InfoServer getInfoServer() {
2566 return infoServer;
2567 }
2568
2569
2570
2571
2572 @Override
2573 public boolean isStopped() {
2574 return this.stopped;
2575 }
2576
2577 @Override
2578 public boolean isStopping() {
2579 return this.stopping;
2580 }
2581
2582 @Override
2583 public Map<String, Region> getRecoveringRegions() {
2584 return this.recoveringRegions;
2585 }
2586
2587
2588
2589
2590
2591 @Override
2592 public Configuration getConfiguration() {
2593 return conf;
2594 }
2595
2596
2597 ReentrantReadWriteLock.WriteLock getWriteLock() {
2598 return lock.writeLock();
2599 }
2600
2601 public int getNumberOfOnlineRegions() {
2602 return this.onlineRegions.size();
2603 }
2604
2605 boolean isOnlineRegionsEmpty() {
2606 return this.onlineRegions.isEmpty();
2607 }
2608
2609
2610
2611
2612
2613
2614 public Collection<Region> getOnlineRegionsLocalContext() {
2615 Collection<Region> regions = this.onlineRegions.values();
2616 return Collections.unmodifiableCollection(regions);
2617 }
2618
2619 @Override
2620 public void addToOnlineRegions(Region region) {
2621 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2622 configurationManager.registerObserver(region);
2623 }
2624
2625
2626
2627
2628
2629
2630 SortedMap<Long, Region> getCopyOfOnlineRegionsSortedBySize() {
2631
2632 SortedMap<Long, Region> sortedRegions = new TreeMap<Long, Region>(
2633 new Comparator<Long>() {
2634 @Override
2635 public int compare(Long a, Long b) {
2636 return -1 * a.compareTo(b);
2637 }
2638 });
2639
2640 for (Region region : this.onlineRegions.values()) {
2641 sortedRegions.put(region.getMemstoreSize(), region);
2642 }
2643 return sortedRegions;
2644 }
2645
2646
2647
2648
2649 public long getStartcode() {
2650 return this.startcode;
2651 }
2652
2653
2654 @Override
2655 public FlushRequester getFlushRequester() {
2656 return this.cacheFlusher;
2657 }
2658
2659
2660
2661
2662
2663
2664
2665 protected HRegionInfo[] getMostLoadedRegions() {
2666 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2667 for (Region r : onlineRegions.values()) {
2668 if (!r.isAvailable()) {
2669 continue;
2670 }
2671 if (regions.size() < numRegionsToReport) {
2672 regions.add(r.getRegionInfo());
2673 } else {
2674 break;
2675 }
2676 }
2677 return regions.toArray(new HRegionInfo[regions.size()]);
2678 }
2679
2680 @Override
2681 public Leases getLeases() {
2682 return leases;
2683 }
2684
2685
2686
2687
2688 protected Path getRootDir() {
2689 return rootDir;
2690 }
2691
2692
2693
2694
2695 @Override
2696 public FileSystem getFileSystem() {
2697 return fs;
2698 }
2699
2700
2701
2702
2703 protected Path getWALRootDir() {
2704 return walRootDir;
2705 }
2706
2707
2708
2709
2710 protected FileSystem getWALFileSystem() {
2711 return walFs;
2712 }
2713
2714 @Override
2715 public String toString() {
2716 return getServerName().toString();
2717 }
2718
2719
2720
2721
2722
2723
2724 public int getThreadWakeFrequency() {
2725 return threadWakeFrequency;
2726 }
2727
2728 @Override
2729 public ZooKeeperWatcher getZooKeeper() {
2730 return zooKeeper;
2731 }
2732
2733 @Override
2734 public BaseCoordinatedStateManager getCoordinatedStateManager() {
2735 return csm;
2736 }
2737
2738 @Override
2739 public ServerName getServerName() {
2740 return serverName;
2741 }
2742
2743 @Override
2744 public CompactionRequestor getCompactionRequester() {
2745 return this.compactSplitThread;
2746 }
2747
2748 public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
2749 return this.rsHost;
2750 }
2751
2752 @Override
2753 public ConcurrentMap<byte[], Boolean> getRegionsInTransitionInRS() {
2754 return this.regionsInTransitionInRS;
2755 }
2756
2757 @Override
2758 public ExecutorService getExecutorService() {
2759 return service;
2760 }
2761
2762 @Override
2763 public ChoreService getChoreService() {
2764 return choreService;
2765 }
2766
2767 @Override
2768 public RegionServerRpcQuotaManager getRegionServerRpcQuotaManager() {
2769 return rsQuotaManager;
2770 }
2771
2772
2773
2774
2775
2776
2777
2778
2779 static private void createNewReplicationInstance(Configuration conf,
2780 HRegionServer server, FileSystem walFs, Path walDir, Path oldWALDir) throws IOException{
2781
2782
2783 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
2784 HConstants.REPLICATION_ENABLE_DEFAULT)) {
2785 return;
2786 }
2787
2788
2789 String sourceClassname = conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
2790 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2791
2792
2793 String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
2794 HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
2795
2796
2797
2798 if (sourceClassname.equals(sinkClassname)) {
2799 server.replicationSourceHandler = (ReplicationSourceService)
2800 newReplicationInstance(sourceClassname,
2801 conf, server, walFs, walDir, oldWALDir);
2802 server.replicationSinkHandler = (ReplicationSinkService)
2803 server.replicationSourceHandler;
2804 } else {
2805 server.replicationSourceHandler = (ReplicationSourceService)
2806 newReplicationInstance(sourceClassname,
2807 conf, server, walFs, walDir, oldWALDir);
2808 server.replicationSinkHandler = (ReplicationSinkService)
2809 newReplicationInstance(sinkClassname,
2810 conf, server, walFs, walDir, oldWALDir);
2811 }
2812 }
2813
2814 static private ReplicationService newReplicationInstance(String classname,
2815 Configuration conf, HRegionServer server, FileSystem walFs, Path walDir,
2816 Path oldLogDir) throws IOException{
2817
2818 Class<?> clazz = null;
2819 try {
2820 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
2821 clazz = Class.forName(classname, true, classLoader);
2822 } catch (java.lang.ClassNotFoundException nfe) {
2823 throw new IOException("Could not find class for " + classname);
2824 }
2825
2826
2827 ReplicationService service = (ReplicationService)
2828 ReflectionUtils.newInstance(clazz, conf);
2829 service.initialize(server, walFs, walDir, oldLogDir);
2830 return service;
2831 }
2832
2833
2834
2835
2836
2837
2838
2839
2840 public static HRegionServer constructRegionServer(
2841 Class<? extends HRegionServer> regionServerClass,
2842 final Configuration conf2, CoordinatedStateManager cp) {
2843 try {
2844 Constructor<? extends HRegionServer> c = regionServerClass
2845 .getConstructor(Configuration.class, CoordinatedStateManager.class);
2846 return c.newInstance(conf2, cp);
2847 } catch (Exception e) {
2848 throw new RuntimeException("Failed construction of " + "Regionserver: "
2849 + regionServerClass.toString(), e);
2850 }
2851 }
2852
2853
2854
2855
2856 public static void main(String[] args) throws Exception {
2857 VersionInfo.logVersion();
2858 Configuration conf = HBaseConfiguration.create();
2859 @SuppressWarnings("unchecked")
2860 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2861 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2862
2863 new HRegionServerCommandLine(regionServerClass).doMain(args);
2864 }
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876 @Override
2877 public List<Region> getOnlineRegions(TableName tableName) {
2878 List<Region> tableRegions = new ArrayList<Region>();
2879 synchronized (this.onlineRegions) {
2880 for (Region region: this.onlineRegions.values()) {
2881 HRegionInfo regionInfo = region.getRegionInfo();
2882 if(regionInfo.getTable().equals(tableName)) {
2883 tableRegions.add(region);
2884 }
2885 }
2886 }
2887 return tableRegions;
2888 }
2889
2890
2891
2892
2893
2894
2895 public List<Region> getOnlineRegions() {
2896 List<Region> allRegions = new ArrayList<Region>();
2897 synchronized (this.onlineRegions) {
2898
2899 allRegions.addAll(onlineRegions.values());
2900 }
2901 return allRegions;
2902 }
2903
2904
2905
2906
2907
2908
2909 @Override
2910 public Set<TableName> getOnlineTables() {
2911 Set<TableName> tables = new HashSet<TableName>();
2912 synchronized (this.onlineRegions) {
2913 for (Region region: this.onlineRegions.values()) {
2914 tables.add(region.getTableDesc().getTableName());
2915 }
2916 }
2917 return tables;
2918 }
2919
2920
2921 public String[] getRegionServerCoprocessors() {
2922 TreeSet<String> coprocessors = new TreeSet<String>();
2923 try {
2924 coprocessors.addAll(getWAL(null).getCoprocessorHost().getCoprocessors());
2925 } catch (IOException exception) {
2926 LOG.warn("Exception attempting to fetch wal coprocessor information for the common wal; " +
2927 "skipping.");
2928 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2929 }
2930 Collection<Region> regions = getOnlineRegionsLocalContext();
2931 for (Region region: regions) {
2932 coprocessors.addAll(region.getCoprocessorHost().getCoprocessors());
2933 try {
2934 coprocessors.addAll(getWAL(region.getRegionInfo()).getCoprocessorHost().getCoprocessors());
2935 } catch (IOException exception) {
2936 LOG.warn("Exception attempting to fetch wal coprocessor information for region " + region +
2937 "; skipping.");
2938 LOG.debug("Exception details for failure to fetch wal coprocessor information.", exception);
2939 }
2940 }
2941 return coprocessors.toArray(new String[coprocessors.size()]);
2942 }
2943
2944
2945
2946
2947
2948 private void closeRegionIgnoreErrors(HRegionInfo region, final boolean abort) {
2949 try {
2950 CloseRegionCoordination.CloseRegionDetails details =
2951 csm.getCloseRegionCoordination().getDetaultDetails();
2952 if (!closeRegion(region.getEncodedName(), abort, details, null)) {
2953 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2954 " - ignoring and continuing");
2955 }
2956 } catch (IOException e) {
2957 LOG.warn("Failed to close " + region.getRegionNameAsString() +
2958 " - ignoring and continuing", e);
2959 }
2960 }
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982 protected boolean closeRegion(String encodedName, final boolean abort,
2983 CloseRegionCoordination.CloseRegionDetails crd, final ServerName sn)
2984 throws NotServingRegionException, RegionAlreadyInTransitionException {
2985
2986 Region actualRegion = this.getFromOnlineRegions(encodedName);
2987 if ((actualRegion != null) && (actualRegion.getCoprocessorHost() != null)) {
2988 try {
2989 actualRegion.getCoprocessorHost().preClose(false);
2990 } catch (IOException exp) {
2991 LOG.warn("Unable to close region: the coprocessor launched an error ", exp);
2992 return false;
2993 }
2994 }
2995
2996 final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
2997 Boolean.FALSE);
2998
2999 if (Boolean.TRUE.equals(previous)) {
3000 LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
3001 "trying to OPEN. Cancelling OPENING.");
3002 if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
3003
3004
3005 LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +
3006 " Doing a standard close now");
3007 return closeRegion(encodedName, abort, crd, sn);
3008 }
3009
3010 actualRegion = this.getFromOnlineRegions(encodedName);
3011 if (actualRegion == null) {
3012 LOG.info("The opening previously in progress has been cancelled by a CLOSE request.");
3013
3014 throw new RegionAlreadyInTransitionException("The region " + encodedName +
3015 " was opening but not yet served. Opening is cancelled.");
3016 }
3017 } else if (Boolean.FALSE.equals(previous)) {
3018 LOG.info("Received CLOSE for the region: " + encodedName +
3019 ", which we are already trying to CLOSE, but not completed yet");
3020
3021
3022
3023
3024
3025
3026 throw new RegionAlreadyInTransitionException("The region " + encodedName +
3027 " was already closing. New CLOSE request is ignored.");
3028 }
3029
3030 if (actualRegion == null) {
3031 LOG.error("Received CLOSE for a region which is not online, and we're not opening.");
3032 this.regionsInTransitionInRS.remove(encodedName.getBytes());
3033
3034 throw new NotServingRegionException("The region " + encodedName +
3035 " is not online, and is not opening.");
3036 }
3037
3038 CloseRegionHandler crh;
3039 final HRegionInfo hri = actualRegion.getRegionInfo();
3040 if (hri.isMetaRegion()) {
3041 crh = new CloseMetaHandler(this, this, hri, abort,
3042 csm.getCloseRegionCoordination(), crd);
3043 } else {
3044 crh = new CloseRegionHandler(this, this, hri, abort,
3045 csm.getCloseRegionCoordination(), crd, sn);
3046 }
3047 this.service.submit(crh);
3048 return true;
3049 }
3050
3051
3052
3053
3054
3055
3056 public Region getOnlineRegion(final byte[] regionName) {
3057 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
3058 return this.onlineRegions.get(encodedRegionName);
3059 }
3060
3061 public InetSocketAddress[] getRegionBlockLocations(final String encodedRegionName) {
3062 return this.regionFavoredNodesMap.get(encodedRegionName);
3063 }
3064
3065 @Override
3066 public Region getFromOnlineRegions(final String encodedRegionName) {
3067 return this.onlineRegions.get(encodedRegionName);
3068 }
3069
3070
3071 @Override
3072 public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
3073 Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
3074
3075 if (destination != null) {
3076 long closeSeqNum = r.getMaxFlushedSeqId();
3077 if (closeSeqNum == HConstants.NO_SEQNUM) {
3078
3079 closeSeqNum = r.getOpenSeqNum();
3080 if (closeSeqNum == HConstants.NO_SEQNUM) {
3081 closeSeqNum = 0;
3082 }
3083 }
3084 addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
3085 }
3086 this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
3087 return toReturn != null;
3088 }
3089
3090
3091
3092
3093
3094
3095
3096
3097
3098 protected Region getRegion(final byte[] regionName)
3099 throws NotServingRegionException {
3100 String encodedRegionName = HRegionInfo.encodeRegionName(regionName);
3101 return getRegionByEncodedName(regionName, encodedRegionName);
3102 }
3103
3104 public Region getRegionByEncodedName(String encodedRegionName)
3105 throws NotServingRegionException {
3106 return getRegionByEncodedName(null, encodedRegionName);
3107 }
3108
3109 protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
3110 throws NotServingRegionException {
3111 Region region = this.onlineRegions.get(encodedRegionName);
3112 if (region == null) {
3113 MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
3114 if (moveInfo != null) {
3115 throw new RegionMovedException(moveInfo.getServerName(), moveInfo.getSeqNum());
3116 }
3117 Boolean isOpening = this.regionsInTransitionInRS.get(Bytes.toBytes(encodedRegionName));
3118 String regionNameStr = regionName == null?
3119 encodedRegionName: Bytes.toStringBinary(regionName);
3120 if (isOpening != null && isOpening.booleanValue()) {
3121 throw new RegionOpeningException("Region " + regionNameStr +
3122 " is opening on " + this.serverName);
3123 }
3124 throw new NotServingRegionException("Region " + regionNameStr +
3125 " is not online on " + this.serverName);
3126 }
3127 return region;
3128 }
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140 private Throwable cleanup(final Throwable t, final String msg) {
3141
3142 if (t instanceof NotServingRegionException) {
3143 LOG.debug("NotServingRegionException; " + t.getMessage());
3144 return t;
3145 }
3146 if (msg == null) {
3147 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
3148 } else {
3149 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
3150 }
3151 if (!rpcServices.checkOOME(t)) {
3152 checkFileSystem();
3153 }
3154 return t;
3155 }
3156
3157
3158
3159
3160
3161
3162
3163
3164 protected IOException convertThrowableToIOE(final Throwable t, final String msg) {
3165 return (t instanceof IOException ? (IOException) t : msg == null
3166 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
3167 }
3168
3169
3170
3171
3172
3173
3174
3175 public boolean checkFileSystem() {
3176 if (this.fsOk && this.fs != null) {
3177 try {
3178 FSUtils.checkFileSystemAvailable(this.fs);
3179 } catch (IOException e) {
3180 abort("File System not available", e);
3181 this.fsOk = false;
3182 }
3183 }
3184 return this.fsOk;
3185 }
3186
3187 @Override
3188 public void updateRegionFavoredNodesMapping(String encodedRegionName,
3189 List<org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName> favoredNodes) {
3190 InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()];
3191
3192
3193 for (int i = 0; i < favoredNodes.size(); i++) {
3194 addr[i] = InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
3195 favoredNodes.get(i).getPort());
3196 }
3197 regionFavoredNodesMap.put(encodedRegionName, addr);
3198 }
3199
3200
3201
3202
3203
3204
3205
3206 @Override
3207 public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) {
3208 return regionFavoredNodesMap.get(encodedRegionName);
3209 }
3210
3211 @Override
3212 public ServerNonceManager getNonceManager() {
3213 return this.nonceManager;
3214 }
3215
3216 private static class MovedRegionInfo {
3217 private final ServerName serverName;
3218 private final long seqNum;
3219 private final long ts;
3220
3221 public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
3222 this.serverName = serverName;
3223 this.seqNum = closeSeqNum;
3224 ts = EnvironmentEdgeManager.currentTime();
3225 }
3226
3227 public ServerName getServerName() {
3228 return serverName;
3229 }
3230
3231 public long getSeqNum() {
3232 return seqNum;
3233 }
3234
3235 public long getMoveTime() {
3236 return ts;
3237 }
3238 }
3239
3240
3241
3242 protected Map<String, MovedRegionInfo> movedRegions =
3243 new ConcurrentHashMap<String, MovedRegionInfo>(3000);
3244
3245
3246
3247 private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
3248
3249 protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
3250 if (ServerName.isSameHostnameAndPort(destination, this.getServerName())) {
3251 LOG.warn("Not adding moved region record: " + encodedName + " to self.");
3252 return;
3253 }
3254 LOG.info("Adding moved region record: "
3255 + encodedName + " to " + destination + " as of " + closeSeqNum);
3256 movedRegions.put(encodedName, new MovedRegionInfo(destination, closeSeqNum));
3257 }
3258
3259 void removeFromMovedRegions(String encodedName) {
3260 movedRegions.remove(encodedName);
3261 }
3262
3263 private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
3264 MovedRegionInfo dest = movedRegions.get(encodedRegionName);
3265
3266 long now = EnvironmentEdgeManager.currentTime();
3267 if (dest != null) {
3268 if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
3269 return dest;
3270 } else {
3271 movedRegions.remove(encodedRegionName);
3272 }
3273 }
3274
3275 return null;
3276 }
3277
3278
3279
3280
3281 protected void cleanMovedRegions() {
3282 final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
3283 Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
3284
3285 while (it.hasNext()){
3286 Map.Entry<String, MovedRegionInfo> e = it.next();
3287 if (e.getValue().getMoveTime() < cutOff) {
3288 it.remove();
3289 }
3290 }
3291 }
3292
3293
3294
3295
3296
3297 protected int movedRegionCleanerPeriod() {
3298 return TIMEOUT_REGION_MOVED;
3299 }
3300
3301
3302
3303
3304
3305 protected final static class MovedRegionsCleaner extends ScheduledChore implements Stoppable {
3306 private HRegionServer regionServer;
3307 Stoppable stoppable;
3308
3309 private MovedRegionsCleaner(
3310 HRegionServer regionServer, Stoppable stoppable){
3311 super("MovedRegionsCleaner for region " + regionServer, stoppable,
3312 regionServer.movedRegionCleanerPeriod());
3313 this.regionServer = regionServer;
3314 this.stoppable = stoppable;
3315 }
3316
3317 static MovedRegionsCleaner create(HRegionServer rs){
3318 Stoppable stoppable = new Stoppable() {
3319 private volatile boolean isStopped = false;
3320 @Override public void stop(String why) { isStopped = true;}
3321 @Override public boolean isStopped() {return isStopped;}
3322 };
3323
3324 return new MovedRegionsCleaner(rs, stoppable);
3325 }
3326
3327 @Override
3328 protected void chore() {
3329 regionServer.cleanMovedRegions();
3330 }
3331
3332 @Override
3333 public void stop(String why) {
3334 stoppable.stop(why);
3335 }
3336
3337 @Override
3338 public boolean isStopped() {
3339 return stoppable.isStopped();
3340 }
3341 }
3342
3343 private String getMyEphemeralNodePath() {
3344 return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
3345 }
3346
3347 private boolean isHealthCheckerConfigured() {
3348 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3349 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3350 }
3351
3352
3353
3354
3355 public CompactSplitThread getCompactSplitThread() {
3356 return this.compactSplitThread;
3357 }
3358
3359
3360
3361
3362
3363
3364
3365
3366 private void updateRecoveringRegionLastFlushedSequenceId(Region r) throws KeeperException,
3367 IOException {
3368 if (!r.isRecovering()) {
3369
3370 return;
3371 }
3372
3373 HRegionInfo regionInfo = r.getRegionInfo();
3374 ZooKeeperWatcher zkw = getZooKeeper();
3375 String previousRSName = this.getLastFailedRSFromZK(regionInfo.getEncodedName());
3376 Map<byte[], Long> maxSeqIdInStores = r.getMaxStoreSeqId();
3377 long minSeqIdForLogReplay = -1;
3378 for (Long storeSeqIdForReplay : maxSeqIdInStores.values()) {
3379 if (minSeqIdForLogReplay == -1 || storeSeqIdForReplay < minSeqIdForLogReplay) {
3380 minSeqIdForLogReplay = storeSeqIdForReplay;
3381 }
3382 }
3383
3384 try {
3385 long lastRecordedFlushedSequenceId = -1;
3386 String nodePath = ZKUtil.joinZNode(this.zooKeeper.recoveringRegionsZNode,
3387 regionInfo.getEncodedName());
3388
3389 byte[] data;
3390 try {
3391 data = ZKUtil.getData(zkw, nodePath);
3392 } catch (InterruptedException e) {
3393 throw new InterruptedIOException();
3394 }
3395 if (data != null) {
3396 lastRecordedFlushedSequenceId = ZKSplitLog.parseLastFlushedSequenceIdFrom(data);
3397 }
3398 if (data == null || lastRecordedFlushedSequenceId < minSeqIdForLogReplay) {
3399 ZKUtil.setData(zkw, nodePath, ZKUtil.positionToByteArray(minSeqIdForLogReplay));
3400 }
3401 if (previousRSName != null) {
3402
3403 nodePath = ZKUtil.joinZNode(nodePath, previousRSName);
3404 ZKUtil.setData(zkw, nodePath,
3405 ZKUtil.regionSequenceIdsToByteArray(minSeqIdForLogReplay, maxSeqIdInStores));
3406 LOG.debug("Update last flushed sequence id of region " + regionInfo.getEncodedName() +
3407 " for " + previousRSName);
3408 } else {
3409 LOG.warn("Can't find failed region server for recovering region " +
3410 regionInfo.getEncodedName());
3411 }
3412 } catch (NoNodeException ignore) {
3413 LOG.debug("Region " + regionInfo.getEncodedName() +
3414 " must have completed recovery because its recovery znode has been removed", ignore);
3415 }
3416 }
3417
3418
3419
3420
3421
3422
3423 private String getLastFailedRSFromZK(String encodedRegionName) throws KeeperException {
3424 String result = null;
3425 long maxZxid = 0;
3426 ZooKeeperWatcher zkw = this.getZooKeeper();
3427 String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
3428 List<String> failedServers = ZKUtil.listChildrenNoWatch(zkw, nodePath);
3429 if (failedServers == null || failedServers.isEmpty()) {
3430 return result;
3431 }
3432 for (String failedServer : failedServers) {
3433 String rsPath = ZKUtil.joinZNode(nodePath, failedServer);
3434 Stat stat = new Stat();
3435 ZKUtil.getDataNoWatch(zkw, rsPath, stat);
3436 if (maxZxid < stat.getCzxid()) {
3437 maxZxid = stat.getCzxid();
3438 result = failedServer;
3439 }
3440 }
3441 return result;
3442 }
3443
3444 public CoprocessorServiceResponse execRegionServerService(final RpcController controller,
3445 final CoprocessorServiceRequest serviceRequest) throws ServiceException {
3446 try {
3447 ServerRpcController execController = new ServerRpcController();
3448 CoprocessorServiceCall call = serviceRequest.getCall();
3449 String serviceName = call.getServiceName();
3450 String methodName = call.getMethodName();
3451 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
3452 throw new UnknownProtocolException(null,
3453 "No registered coprocessor service found for name " + serviceName);
3454 }
3455 Service service = coprocessorServiceHandlers.get(serviceName);
3456 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
3457 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
3458 if (methodDesc == null) {
3459 throw new UnknownProtocolException(service.getClass(), "Unknown method " + methodName
3460 + " called on service " + serviceName);
3461 }
3462 Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
3463 ProtobufUtil.mergeFrom(builderForType, call.getRequest());
3464 Message request = builderForType.build();
3465 final Message.Builder responseBuilder =
3466 service.getResponsePrototype(methodDesc).newBuilderForType();
3467 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
3468 @Override
3469 public void run(Message message) {
3470 if (message != null) {
3471 responseBuilder.mergeFrom(message);
3472 }
3473 }
3474 });
3475 Message execResult = responseBuilder.build();
3476 if (execController.getFailedOn() != null) {
3477 throw execController.getFailedOn();
3478 }
3479 ClientProtos.CoprocessorServiceResponse.Builder builder =
3480 ClientProtos.CoprocessorServiceResponse.newBuilder();
3481 builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
3482 HConstants.EMPTY_BYTE_ARRAY));
3483 builder.setValue(builder.getValueBuilder().setName(execResult.getClass().getName())
3484 .setValue(execResult.toByteString()));
3485 return builder.build();
3486 } catch (IOException ie) {
3487 throw new ServiceException(ie);
3488 }
3489 }
3490
3491
3492
3493
3494 public CacheConfig getCacheConfig() {
3495 return this.cacheConfig;
3496 }
3497
3498
3499
3500
3501 protected ConfigurationManager getConfigurationManager() {
3502 return configurationManager;
3503 }
3504
3505
3506
3507
3508 public TableDescriptors getTableDescriptors() {
3509 return this.tableDescriptors;
3510 }
3511
3512
3513
3514
3515 public void updateConfiguration() {
3516 LOG.info("Reloading the configuration from disk.");
3517
3518 conf.reloadConfiguration();
3519 configurationManager.notifyAllObservers(conf);
3520 }
3521
3522 @Override
3523 public HeapMemoryManager getHeapMemoryManager() {
3524 return hMemManager;
3525 }
3526
3527 @Override
3528 public double getCompactionPressure() {
3529 double max = 0;
3530 for (Region region : onlineRegions.values()) {
3531 for (Store store : region.getStores()) {
3532 double normCount = store.getCompactionPressure();
3533 if (normCount > max) {
3534 max = normCount;
3535 }
3536 }
3537 }
3538 return max;
3539 }
3540
3541 @Override
3542 public MetricsRegionServer getMetrics() {
3543 return metricsRegionServer;
3544 }
3545
3546 @Override
3547 public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
3548 return this.rsSpaceQuotaManager;
3549 }
3550 }