Talking about storm nimbus's LeaderElector
Published: 2019-06-19



This article takes a look at storm nimbus's LeaderElector.

Nimbus

org/apache/storm/daemon/nimbus/Nimbus.java

public static void main(String[] args) throws Exception {
    Utils.setupDefaultUncaughtExceptionHandler();
    launch(new StandaloneINimbus());
}

public static Nimbus launch(INimbus inimbus) throws Exception {
    Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
                                           ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
    boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
    boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
    if (checkAcl) {
        AclEnforcement.verifyAcls(conf, fixupAcl);
    }
    return launchServer(conf, inimbus);
}

private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
    StormCommon.validateDistributedMode(conf);
    validatePortAvailable(conf);
    StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
    final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
    nimbus.launchServer();
    final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
    metricsRegistry.startMetricsReporters(conf);
    Utils.addShutdownHookWithDelayedForceKill(() -> {
        metricsRegistry.stopMetricsReporters();
        nimbus.shutdown();
        server.stop();
    }, 10);
    if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
        nimbus.initWorkerTokenManager();
    }
    LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
    server.serve();
    return nimbus;
}

public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
              BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector,
              IGroupMappingServiceProvider groupMapper, StormMetricsRegistry metricsRegistry) throws Exception {
    //......
    if (blobStore == null) {
        blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
    }
    this.blobStore = blobStore;
    if (topoCache == null) {
        topoCache = new TopoCache(blobStore, conf);
    }
    if (leaderElector == null) {
        leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState,
                                                  getNimbusAcls(conf), metricsRegistry);
    }
    this.leaderElector = leaderElector;
    this.blobStore.setLeaderElector(this.leaderElector);
    //......
}

public void launchServer() throws Exception {
    try {
        BlobStore store = blobStore;
        IStormClusterState state = stormClusterState;
        NimbusInfo hpi = nimbusHostPortInfo;
        LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
        validator.prepare(conf);

        //add to nimbuses
        state.addNimbusHost(hpi.getHost(),
                new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
        leaderElector.addToLeaderLockQueue();
        this.blobStore.startSyncBlobs();

        for (ClusterMetricsConsumerExecutor exec : clusterConsumerExceutors) {
            exec.prepare();
        }

        if (isLeader()) {
            for (String topoId : state.activeStorms()) {
                transition(topoId, TopologyActions.STARTUP, null);
            }
            clusterMetricSet.setActive(true);
        }
        //......
    } catch (Exception e) {
        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
            throw e;
        }
        if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
            throw e;
        }
        LOG.error("Error on initialization of nimbus", e);
        Utils.exitProcess(13, "Error on initialization of nimbus");
    }
}
  • The Nimbus constructor calls Zookeeper.zkLeaderElector to create the leaderElector
  • The launchServer method calls leaderElector.addToLeaderLockQueue() to join the leader election; a minimal sketch of the queue mechanics follows this list
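To see what joining the leader-lock queue means at the ZooKeeper level, here is a minimal, self-contained sketch (not storm code) that runs two LeaderLatch participants on one lock path. It assumes a ZooKeeper server at localhost:2181, the curator-recipes dependency on the classpath, and hypothetical nimbus1/nimbus2 ids:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLockQueueDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // two "nimbus" participants queue up on the same lock path; ids are host:port strings
        LeaderLatch nimbus1 = new LeaderLatch(zk, "/storm/leader-lock", "nimbus1:6627");
        LeaderLatch nimbus2 = new LeaderLatch(zk, "/storm/leader-lock", "nimbus2:6627");
        nimbus1.start();   // roughly what addToLeaderLockQueue() does for a LATENT latch
        nimbus2.start();

        nimbus1.await();   // blocks until nimbus1 is elected (it queued first)
        System.out.println("nimbus1 leader? " + nimbus1.hasLeadership());   // true
        System.out.println("nimbus2 leader? " + nimbus2.hasLeadership());   // false, waiting in queue

        // getAllNimbuses() is essentially getParticipants() mapped to NimbusInfo
        for (Participant p : nimbus1.getParticipants()) {
            System.out.println(p.getId() + " leader=" + p.isLeader());
        }

        nimbus1.close();   // give up leadership; nimbus2 takes over
        nimbus2.await();
        System.out.println("nimbus2 leader? " + nimbus2.hasLeadership());   // true

        nimbus2.close();
        zk.close();
    }
}

Closing a latch deletes that participant's ephemeral sequential node under /leader-lock, so leadership passes to the next participant in line; this is the mechanism everything below builds on.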

Zookeeper.zkLeaderElector

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
    return _instance.zkLeaderElectorImpl(conf, blobStore);
}

protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
    List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
    Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
    CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
    String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
    String id = NimbusInfo.fromConf(conf).toHostPortString();
    AtomicReference<LeaderLatch> leaderLatchAtomicReference =
            new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
    AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
            new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
    return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
                                leaderLatchListenerAtomicReference, blobStore);
}
  • A LeaderLatch is created on the /leader-lock path, and a LeaderLatchListener is created via leaderLatchListenerImpl
  • Finally, an ILeaderElector is created via LeaderElectorImp; see the wiring sketch after this list
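Stripped of the AtomicReference plumbing, the wiring done by zkLeaderElectorImpl amounts to a few lines of plain Curator code. A minimal sketch, assuming a pre-built CuratorFramework client and a hypothetical host:port id (storm derives the real one from NimbusInfo.fromConf(conf).toHostPortString()):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

public class LatchWiringSketch {
    // mirrors zkLeaderElectorImpl: latch on STORM_ZOOKEEPER_ROOT + "/leader-lock",
    // id is this nimbus's host:port, listener reacts to gaining/losing the lock
    static LeaderLatch wire(CuratorFramework zk) throws Exception {
        final String id = "nimbus1:6627"; // hypothetical id for illustration
        LeaderLatch latch = new LeaderLatch(zk, "/storm/leader-lock", id);
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println(id + " gained leadership");
            }

            @Override
            public void notLeader() {
                System.out.println(id + " lost leadership");
            }
        });
        latch.start(); // what addToLeaderLockQueue() eventually does
        return latch;
    }
}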

leaderLatchListenerImpl

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

// Leader latch listener that will be invoked when we either gain or lose leadership
public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk,
                                                          final BlobStore blobStore, final LeaderLatch leaderLatch)
        throws UnknownHostException {
    final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
    return new LeaderLatchListener() {
        final String STORM_JAR_SUFFIX = "-stormjar.jar";
        final String STORM_CODE_SUFFIX = "-stormcode.ser";
        final String STORM_CONF_SUFFIX = "-stormconf.ser";

        @Override
        public void isLeader() {
            Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk,
                    conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));

            Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
            Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
            Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
            Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);

            // this finds all active topologies blob keys from all local topology blob keys
            Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
            LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
                    generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                    generateJoinedString(diffTopology));

            if (diffTopology.isEmpty()) {
                Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);

                // this finds all dependency blob keys from active topologies from all local blob keys
                Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
                LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
                        generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
                        generateJoinedString(diffDependencies));

                if (diffDependencies.isEmpty()) {
                    LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                } else {
                    LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                    closeLatch();
                }
            } else {
                LOG.info("code for all active topologies not available locally, giving up leadership.");
                closeLatch();
            }
        }

        @Override
        public void notLeader() {
            LOG.info("{} lost leadership.", hostName);
        }

        //......

        private void closeLatch() {
            try {
                leaderLatch.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    };
}
  • leaderLatchListenerImpl returns an implementation of the LeaderLatchListener interface
  • The isLeader callback performs validation: when zookeeper elects this node leader, if the code for all active topologies or all of their dependencies is not available locally, it calls leaderLatch.close() to give up leadership (the pattern is sketched after this list)
  • The notLeader callback merely logs the event
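The isLeader() callback boils down to a "verify, otherwise abdicate" pattern. Below is a minimal sketch of just that pattern; hasAllBlobsLocally() is a hypothetical stand-in for the diffTopology/diffDependencies checks shown above:

import java.io.IOException;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

public class VerifyingListener implements LeaderLatchListener {
    private final LeaderLatch leaderLatch;

    public VerifyingListener(LeaderLatch leaderLatch) {
        this.leaderLatch = leaderLatch;
    }

    @Override
    public void isLeader() {
        if (!hasAllBlobsLocally()) {
            // same idea as closeLatch(): closing the latch deletes our ephemeral
            // node under /leader-lock, so the next participant in line takes over
            try {
                leaderLatch.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void notLeader() {
        // nothing to clean up in this sketch; storm just logs here
    }

    private boolean hasAllBlobsLocally() {
        return true; // placeholder: storm diffs active topology/dependency blob keys here
    }
}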

LeaderElectorImp

org/apache/storm/zookeeper/LeaderElectorImp.java

public class LeaderElectorImp implements ILeaderElector {
    private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
    private final Map<String, Object> conf;
    private final List<String> servers;
    private final CuratorFramework zk;
    private final String leaderlockPath;
    private final String id;
    private final AtomicReference<LeaderLatch> leaderLatch;
    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
    private final BlobStore blobStore;
    private final TopoCache tc;
    private final IStormClusterState clusterState;
    private final List<ACL> acls;
    private final StormMetricsRegistry metricsRegistry;

    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk,
                            String leaderlockPath, String id, AtomicReference<LeaderLatch> leaderLatch,
                            AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore,
                            final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                            StormMetricsRegistry metricsRegistry) {
        this.conf = conf;
        this.servers = servers;
        this.zk = zk;
        this.leaderlockPath = leaderlockPath;
        this.id = id;
        this.leaderLatch = leaderLatch;
        this.leaderLatchListener = leaderLatchListener;
        this.blobStore = blobStore;
        this.tc = tc;
        this.clusterState = clusterState;
        this.acls = acls;
        this.metricsRegistry = metricsRegistry;
    }

    @Override
    public void prepare(Map<String, Object> conf) {
        // no-op for zookeeper implementation
    }

    @Override
    public void addToLeaderLockQueue() throws Exception {
        // if this latch is already closed, we need to create new instance.
        if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
            leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc,
                                                                         clusterState, acls, metricsRegistry);
            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
            LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
        }
        // Only if the latch is not already started we invoke start
        if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
            leaderLatch.get().addListener(leaderLatchListener.get());
            leaderLatch.get().start();
            LOG.info("Queued up for leader lock.");
        } else {
            LOG.info("Node already in queue for leader lock.");
        }
    }

    @Override
    // Only started latches can be closed.
    public void removeFromLeaderLockQueue() throws Exception {
        if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
            leaderLatch.get().close();
            LOG.info("Removed from leader lock queue.");
        } else {
            LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
        }
    }

    @Override
    public boolean isLeader() throws Exception {
        return leaderLatch.get().hasLeadership();
    }

    @Override
    public NimbusInfo getLeader() {
        try {
            return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    @Override
    public List<NimbusInfo> getAllNimbuses() throws Exception {
        List<NimbusInfo> nimbusInfos = new ArrayList<>();
        Collection<Participant> participants = leaderLatch.get().getParticipants();
        for (Participant participant : participants) {
            nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
        }
        return nimbusInfos;
    }

    @Override
    public void close() {
        //Do nothing now.
    }
}
  • LeaderElectorImp implements the ILeaderElector interface
  • addToLeaderLockQueue first checks whether the latch has already been closed and creates a new one if so; it then checks the latch state and, if the latch has not been started yet, calls start() to join the election
  • A closed latch is replaced with a new one for two reasons: calling methods on an already-closed latch throws an exception, and a node that zk elects leader may fail storm's leadership checks and give up (close) the leadership; a state walkthrough follows this list
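The reasoning about CLOSED latches can be made concrete by walking the LeaderLatch state machine directly. A small sketch, assuming a pre-built CuratorFramework client as in the earlier examples:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;

public class LatchStateSketch {
    // LATENT -> STARTED -> CLOSED; CLOSED is terminal
    static void walkThroughStates(CuratorFramework zk) throws Exception {
        LeaderLatch latch = new LeaderLatch(zk, "/storm/leader-lock", "nimbus1:6627");
        System.out.println(latch.getState()); // LATENT: only now is start() legal
        latch.start();
        System.out.println(latch.getState()); // STARTED: only started latches can be closed
        latch.close();
        System.out.println(latch.getState()); // CLOSED
        // Curator guards these transitions, so latch.start() here would throw
        // IllegalStateException; that is why addToLeaderLockQueue() swaps in a
        // brand-new LeaderLatch when it finds the old one in CLOSED state
    }
}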

Summary

  • storm nimbus's LeaderElector is implemented on top of the LeaderLatch recipe from Apache Curator (the zookeeper recipes library)
  • storm nimbus plugs in a custom LeaderLatchListener that validates a nimbus once it becomes leader: it must have the code for all active topologies and all of their dependencies locally, otherwise it gives up leadership


Reposted from: https://my.oschina.net/go4it/blog/2239477
