View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob.mapreduce;
20  
21  import java.util.List;
22  import java.util.SortedSet;
23  import java.util.TreeSet;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.master.TableLockManager;
27  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName;
28  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock;
29  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
30  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
31  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
32  import org.apache.zookeeper.KeeperException;
33  
34  /**
35   * Tracker on the sweep tool node in zookeeper.
36   * The sweep tool node is an ephemeral one, when the process dies this node is deleted,
37   * at that time MR might be still running, and if another sweep job is started, two MR
38   * for the same column family will run at the same time.
39   * This tracker watches this ephemeral node, if it's gone or it's not created by the
40   * sweep job that owns the current MR, the current process will be aborted.
41   */
42  @InterfaceAudience.Private
43  public class SweepJobNodeTracker extends ZooKeeperListener {
44  
45    private String parentNode;
46    private String lockNodePrefix;
47    private String owner;
48    private String lockNode;
49  
50    public SweepJobNodeTracker(ZooKeeperWatcher watcher, String parentNode, String owner) {
51      super(watcher);
52      this.parentNode = parentNode;
53      this.owner = owner;
54      this.lockNodePrefix = ZKUtil.joinZNode(parentNode, "write-");
55    }
56  
57    /**
58     * Registers the watcher on the sweep job node.
59     * If there's no such a sweep job node, or it's not created by the sweep job that
60     * owns the current MR, the current process will be aborted.
61     * This assumes the table lock uses the Zookeeper. It's a workaround and only used
62     * in the sweep tool, and the sweep tool will be removed after the mob file compaction
63     * is finished.
64     */
65    public void start() throws KeeperException {
66      watcher.registerListener(this);
67      List<String> children = ZKUtil.listChildrenNoWatch(watcher, parentNode);
68      if (children != null && !children.isEmpty()) {
69        // there are locks
70        TreeSet<String> sortedChildren = new TreeSet<String>();
71        sortedChildren.addAll(children);
72        // find all the write locks
73        SortedSet<String> tails = sortedChildren.tailSet(lockNodePrefix);
74        if (!tails.isEmpty()) {
75          for (String tail : tails) {
76            String path = ZKUtil.joinZNode(parentNode, tail);
77            byte[] data = ZKUtil.getDataAndWatch(watcher, path);
78            TableLock lock = TableLockManager.fromBytes(data);
79            ServerName serverName = lock.getLockOwner();
80            org.apache.hadoop.hbase.ServerName sn = org.apache.hadoop.hbase.ServerName.valueOf(
81                serverName.getHostName(), serverName.getPort(), serverName.getStartCode());
82            // compare the server names (host, port and start code), make sure the lock is created
83            if (owner.equals(sn.toString())) {
84              lockNode = path;
85              return;
86            }
87          }
88        }
89      }
90      System.exit(1);
91    }
92  
93    @Override
94    public void nodeDeleted(String path) {
95      // If the lock node is deleted, abort the current process.
96      if (path.equals(lockNode)) {
97        System.exit(1);
98      }
99    }
100 }