Spark master使用zookeeper进行HA的解释

2020-09-15 10:11发布

1条回答
敦敦宁
2楼 · 2020-09-15 10:35
参考资料 《Spark内核设计的技术-架构设计与实现》 《从Paxos到ZooKeeper》

摘抄一段 ZooKeeper 官网的一句话。大意就是 ZooKeeper 为分布式应用提供了高效可靠的分布式协调服务,提供了统一命名服务、配置管理和分布式锁等分布式的基础服务。

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

那么 Spark 在哪里有用到 ZooKeeper,并且用在什么地方呢?不要脸的在 IDE 上全局搜一下(其实是笔者对 ZooKeeper 相对熟一点,而且自己公司的项目也用 ZooKeeper 做些小功能,所以想看看 Spark 里是怎么用的。

Spark 对 ZooKeeper 的使用主要在以下几个类中。

org.apache.spark.deploy.master.ZooKeeperPersistenceEngine
org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent

2 基于 ZooKeeper 的持久化引擎

ZooKeeperPersistenceEngine 这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。了解 Spark 高可用的实现的同学都知道,Spark 中有个参数 spark.deploy.recoveryMode,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。

仔细看看代码,其实 Spark 里用到 ZooKeeper 的地方不多,而且用的相对都不复杂,所以跟着源码走是很好理清楚逻辑的。

// 篇幅有限,节选了部分代码,但是应该不影响文意private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine
  with Logging {

  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // 重载的 persist 方法,调用的是 serializeIntoFile  // 明明是写到 ZooKeeper,为什么还叫 IntoFile?其实无伤大雅  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }

  // 写入 ZooKeeper 实际调用的是这个方法  // 可以看到,创建的是一个持久性的节点  // 序列化完 value 就可以写入指定的路径  private def serializeIntoFile(path: String, value: AnyRef) {
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }}

顺藤摸瓜,我还想看看 Master 和 Worker 等这些信息是如何写入的,那再往深看看。

// 发现基于 ZooKeeper 实现的恢复模式的工厂类private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  // 从这里看看是哪里调用了这个方法创建了 ZooKeeperPersistenceEngine  def createPersistenceEngine(): PersistenceEngine = {
    new ZooKeeperPersistenceEngine(conf, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new ZooKeeperLeaderElectionAgent(master, conf)
  }}

然后在 org.apache.spark.deploy.master.Master 中发现了持久化模式的选择。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // 如果选择 ZooKeeper 的恢复模式,那么就用上述的工厂方法创建 ZooKeeper 的持久化方法  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

根据变量 persistenceEngine 可以找出以下几处调用,依照命名来看,应该就是处理持久化的组件信息的方法了,其中挑 addWorker 看看是如何记录 Worker 信息。

final def addWorker(worker: WorkerInfo): Unit = {
  persist("worker_" + worker.id, worker)}

看到这个方法,激发了我的好奇心,WorkInfo 究竟藏着什么信息呢。其实就是 Worker 的 id, host, cores 之类的信息,因为进入恢复模式的时候,是需要让新的 Master 恢复得到这些 Worker 的信息才能实现恢复和接管的。

private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)

3 基于 ZooKeeper 的领导选举代理

领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是激活的,其他都会处于 standby 状态。

这里简单提一下 ZooKeeper 是如何选主的。ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存货,一旦他挂了,那么就会重新启动选主过程。Spark 在这里利用了 ZooKeeper 的这一特性,个人猜测也是因为这个原因,类名叫做 xxxAgent

private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {

  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  start()

  // 获得 ZooKeeper 的客户端,创建用于选主的 LeaderLatch 实例  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    // 增加一个监听器,监控主节点的存活    leaderLatch.addListener(this)
    leaderLatch.start()
  }

  override def stop() {
    leaderLatch.close()
    zk.close()
  }

  // override 了 ZooKeeper 的 LeaderLatchListener 接口的方法  // 用于判断当前节点是否为 Leader 节点  override def isLeader() {
    synchronized {
      // could have lost leadership by now.      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  override def notLeader() {
    synchronized {
      // could have gained leadership by now.      if (leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

  private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
    }
  }

  private object LeadershipStatus extends Enumeration {
    type LeadershipStatus = Value
    val LEADER, NOT_LEADER = Value
  }}

介绍完 ZooKeeperLeaderElectionAgent 的实现,可以看看 Master 是怎么使用的,下面的代码在上面也出现过。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
      // 如果 RECOVERY_MODE 是 ZOOKEEPER 的话,就会创建基于 ZooKeeper 的选主代理    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

4 Summary

总结下,ZooKeeper 作为一个分布式应用中优秀的组件,在 Spark 中的应用并不复杂(也是源于 Spark 的优秀吧),而包括持久化和选主的实现,我们完全可以参考 Spark 的实现方式,应用在自己的项目中,

参考资料 《Spark内核设计的技术-架构设计与实现》 《从Paxos到ZooKeeper》

摘抄一段 ZooKeeper 官网的一句话。大意就是 ZooKeeper 为分布式应用提供了高效可靠的分布式协调服务,提供了统一命名服务、配置管理和分布式锁等分布式的基础服务。

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

那么 Spark 在哪里有用到 ZooKeeper,并且用在什么地方呢?不要脸的在 IDE 上全局搜一下(其实是笔者对 ZooKeeper 相对熟一点,而且自己公司的项目也用 ZooKeeper 做些小功能,所以想看看 Spark 里是怎么用的。

Spark 对 ZooKeeper 的使用主要在以下几个类中。

org.apache.spark.deploy.master.ZooKeeperPersistenceEngine
org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent

2 基于 ZooKeeper 的持久化引擎

ZooKeeperPersistenceEngine 这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。了解 Spark 高可用的实现的同学都知道,Spark 中有个参数 spark.deploy.recoveryMode,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。

仔细看看代码,其实 Spark 里用到 ZooKeeper 的地方不多,而且用的相对都不复杂,所以跟着源码走是很好理清楚逻辑的。

// 篇幅有限,节选了部分代码,但是应该不影响文意private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine
  with Logging {

  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // 重载的 persist 方法,调用的是 serializeIntoFile  // 明明是写到 ZooKeeper,为什么还叫 IntoFile?其实无伤大雅  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }

  // 写入 ZooKeeper 实际调用的是这个方法  // 可以看到,创建的是一个持久性的节点  // 序列化完 value 就可以写入指定的路径  private def serializeIntoFile(path: String, value: AnyRef) {
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }}

顺藤摸瓜,我还想看看 Master 和 Worker 等这些信息是如何写入的,那再往深看看。

// 发现基于 ZooKeeper 实现的恢复模式的工厂类private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  // 从这里看看是哪里调用了这个方法创建了 ZooKeeperPersistenceEngine  def createPersistenceEngine(): PersistenceEngine = {
    new ZooKeeperPersistenceEngine(conf, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new ZooKeeperLeaderElectionAgent(master, conf)
  }}

然后在 org.apache.spark.deploy.master.Master 中发现了持久化模式的选择。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // 如果选择 ZooKeeper 的恢复模式,那么就用上述的工厂方法创建 ZooKeeper 的持久化方法  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

根据变量 persistenceEngine 可以找出以下几处调用,依照命名来看,应该就是处理持久化的组件信息的方法了,其中挑 addWorker 看看是如何记录 Worker 信息。

final def addWorker(worker: WorkerInfo): Unit = {
  persist("worker_" + worker.id, worker)}

看到这个方法,激发了我的好奇心,WorkInfo 究竟藏着什么信息呢。其实就是 Worker 的 id, host, cores 之类的信息,因为进入恢复模式的时候,是需要让新的 Master 恢复得到这些 Worker 的信息才能实现恢复和接管的。

private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)

3 基于 ZooKeeper 的领导选举代理

领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是激活的,其他都会处于 standby 状态。

这里简单提一下 ZooKeeper 是如何选主的。ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存货,一旦他挂了,那么就会重新启动选主过程。Spark 在这里利用了 ZooKeeper 的这一特性,个人猜测也是因为这个原因,类名叫做 xxxAgent

private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {

  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  start()

  // 获得 ZooKeeper 的客户端,创建用于选主的 LeaderLatch 实例  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    // 增加一个监听器,监控主节点的存活    leaderLatch.addListener(this)
    leaderLatch.start()
  }

  override def stop() {
    leaderLatch.close()
    zk.close()
  }

  // override 了 ZooKeeper 的 LeaderLatchListener 接口的方法  // 用于判断当前节点是否为 Leader 节点  override def isLeader() {
    synchronized {
      // could have lost leadership by now.      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  override def notLeader() {
    synchronized {
      // could have gained leadership by now.      if (leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

  private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
    }
  }

  private object LeadershipStatus extends Enumeration {
    type LeadershipStatus = Value
    val LEADER, NOT_LEADER = Value
  }}

介绍完 ZooKeeperLeaderElectionAgent 的实现,可以看看 Master 是怎么使用的,下面的代码在上面也出现过。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
      // 如果 RECOVERY_MODE 是 ZOOKEEPER 的话,就会创建基于 ZooKeeper 的选主代理    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

4 Summary

总结下,ZooKeeper 作为一个分布式应用中优秀的组件,在 Spark 中的应用并不复杂(也是源于 Spark 的优秀吧),而包括持久化和选主的实现,我们完全可以参考 Spark 的实现方式,应用在自己的项目中,

参考资料 《Spark内核设计的技术-架构设计与实现》 《从Paxos到ZooKeeper》

摘抄一段 ZooKeeper 官网的一句话。大意就是 ZooKeeper 为分布式应用提供了高效可靠的分布式协调服务,提供了统一命名服务、配置管理和分布式锁等分布式的基础服务。

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

那么 Spark 在哪里有用到 ZooKeeper,并且用在什么地方呢?不要脸的在 IDE 上全局搜一下(其实是笔者对 ZooKeeper 相对熟一点,而且自己公司的项目也用 ZooKeeper 做些小功能,所以想看看 Spark 里是怎么用的。

Spark 对 ZooKeeper 的使用主要在以下几个类中。

org.apache.spark.deploy.master.ZooKeeperPersistenceEngine
org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent

2 基于 ZooKeeper 的持久化引擎

ZooKeeperPersistenceEngine 这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。了解 Spark 高可用的实现的同学都知道,Spark 中有个参数 spark.deploy.recoveryMode,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。

仔细看看代码,其实 Spark 里用到 ZooKeeper 的地方不多,而且用的相对都不复杂,所以跟着源码走是很好理清楚逻辑的。

// 篇幅有限,节选了部分代码,但是应该不影响文意private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine
  with Logging {

  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // 重载的 persist 方法,调用的是 serializeIntoFile  // 明明是写到 ZooKeeper,为什么还叫 IntoFile?其实无伤大雅  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }

  // 写入 ZooKeeper 实际调用的是这个方法  // 可以看到,创建的是一个持久性的节点  // 序列化完 value 就可以写入指定的路径  private def serializeIntoFile(path: String, value: AnyRef) {
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }}

顺藤摸瓜,我还想看看 Master 和 Worker 等这些信息是如何写入的,那再往深看看。

// 发现基于 ZooKeeper 实现的恢复模式的工厂类private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  // 从这里看看是哪里调用了这个方法创建了 ZooKeeperPersistenceEngine  def createPersistenceEngine(): PersistenceEngine = {
    new ZooKeeperPersistenceEngine(conf, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new ZooKeeperLeaderElectionAgent(master, conf)
  }}

然后在 org.apache.spark.deploy.master.Master 中发现了持久化模式的选择。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // 如果选择 ZooKeeper 的恢复模式,那么就用上述的工厂方法创建 ZooKeeper 的持久化方法  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

根据变量 persistenceEngine 可以找出以下几处调用,依照命名来看,应该就是处理持久化的组件信息的方法了,其中挑 addWorker 看看是如何记录 Worker 信息。

final def addWorker(worker: WorkerInfo): Unit = {
  persist("worker_" + worker.id, worker)}

看到这个方法,激发了我的好奇心,WorkInfo 究竟藏着什么信息呢。其实就是 Worker 的 id, host, cores 之类的信息,因为进入恢复模式的时候,是需要让新的 Master 恢复得到这些 Worker 的信息才能实现恢复和接管的。

private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)

3 基于 ZooKeeper 的领导选举代理

领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是激活的,其他都会处于 standby 状态。

这里简单提一下 ZooKeeper 是如何选主的。ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存货,一旦他挂了,那么就会重新启动选主过程。Spark 在这里利用了 ZooKeeper 的这一特性,个人猜测也是因为这个原因,类名叫做 xxxAgent

private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {

  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  start()

  // 获得 ZooKeeper 的客户端,创建用于选主的 LeaderLatch 实例  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    // 增加一个监听器,监控主节点的存活    leaderLatch.addListener(this)
    leaderLatch.start()
  }

  override def stop() {
    leaderLatch.close()
    zk.close()
  }

  // override 了 ZooKeeper 的 LeaderLatchListener 接口的方法  // 用于判断当前节点是否为 Leader 节点  override def isLeader() {
    synchronized {
      // could have lost leadership by now.      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  override def notLeader() {
    synchronized {
      // could have gained leadership by now.      if (leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

  private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
    }
  }

  private object LeadershipStatus extends Enumeration {
    type LeadershipStatus = Value
    val LEADER, NOT_LEADER = Value
  }}

介绍完 ZooKeeperLeaderElectionAgent 的实现,可以看看 Master 是怎么使用的,下面的代码在上面也出现过。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
      // 如果 RECOVERY_MODE 是 ZOOKEEPER 的话,就会创建基于 ZooKeeper 的选主代理    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

4 Summary

总结下,ZooKeeper 作为一个分布式应用中优秀的组件,在 Spark 中的应用并不复杂(也是源于 Spark 的优秀吧),而包括持久化和选主的实现,我们完全可以参考 Spark 的实现方式,应用在自己的项目中,

参考资料 《Spark内核设计的技术-架构设计与实现》 《从Paxos到ZooKeeper》

摘抄一段 ZooKeeper 官网的一句话。大意就是 ZooKeeper 为分布式应用提供了高效可靠的分布式协调服务,提供了统一命名服务、配置管理和分布式锁等分布式的基础服务。

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

那么 Spark 在哪里有用到 ZooKeeper,并且用在什么地方呢?不要脸的在 IDE 上全局搜一下(其实是笔者对 ZooKeeper 相对熟一点,而且自己公司的项目也用 ZooKeeper 做些小功能,所以想看看 Spark 里是怎么用的。

Spark 对 ZooKeeper 的使用主要在以下几个类中。

org.apache.spark.deploy.master.ZooKeeperPersistenceEngine
org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent

2 基于 ZooKeeper 的持久化引擎

ZooKeeperPersistenceEngine 这个类主要是用于 Spark Master 高可用的持久化工作。其实很容易理解,作为一个分布式内存计算框架,每个环节都有「崩溃」的可能性,那么能不能减少诸如宕机、网络问题带来的影响呢?那么就是将运行中的各个环节,或者说有助于恢复这个集群状态的信息持久化下来。了解 Spark 高可用的实现的同学都知道,Spark 中有个参数 spark.deploy.recoveryMode,是当 Master 有问题的时候,重新启动恢复的时候用到的。有几个恢复的方式,从本地文件中恢复,又或者是本文介绍的,利用 ZooKeeper 存储状态,并从中恢复。

仔细看看代码,其实 Spark 里用到 ZooKeeper 的地方不多,而且用的相对都不复杂,所以跟着源码走是很好理清楚逻辑的。

// 篇幅有限,节选了部分代码,但是应该不影响文意private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer: Serializer)
  extends PersistenceEngine
  with Logging {

  private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // 重载的 persist 方法,调用的是 serializeIntoFile  // 明明是写到 ZooKeeper,为什么还叫 IntoFile?其实无伤大雅  override def persist(name: String, obj: Object): Unit = {
    serializeIntoFile(WORKING_DIR + "/" + name, obj)
  }

  // 写入 ZooKeeper 实际调用的是这个方法  // 可以看到,创建的是一个持久性的节点  // 序列化完 value 就可以写入指定的路径  private def serializeIntoFile(path: String, value: AnyRef) {
    val serialized = serializer.newInstance().serialize(value)
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
  }}

顺藤摸瓜,我还想看看 Master 和 Worker 等这些信息是如何写入的,那再往深看看。

// 发现基于 ZooKeeper 实现的恢复模式的工厂类private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, serializer: Serializer)
  extends StandaloneRecoveryModeFactory(conf, serializer) {

  // 从这里看看是哪里调用了这个方法创建了 ZooKeeperPersistenceEngine  def createPersistenceEngine(): PersistenceEngine = {
    new ZooKeeperPersistenceEngine(conf, serializer)
  }

  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
    new ZooKeeperLeaderElectionAgent(master, conf)
  }}

然后在 org.apache.spark.deploy.master.Master 中发现了持久化模式的选择。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  // 如果选择 ZooKeeper 的恢复模式,那么就用上述的工厂方法创建 ZooKeeper 的持久化方法  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

根据变量 persistenceEngine 可以找出以下几处调用,依照命名来看,应该就是处理持久化的组件信息的方法了,其中挑 addWorker 看看是如何记录 Worker 信息。

final def addWorker(worker: WorkerInfo): Unit = {
  persist("worker_" + worker.id, worker)}

看到这个方法,激发了我的好奇心,WorkInfo 究竟藏着什么信息呢。其实就是 Worker 的 id, host, cores 之类的信息,因为进入恢复模式的时候,是需要让新的 Master 恢复得到这些 Worker 的信息才能实现恢复和接管的。

private[spark] class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,
    val memory: Int,
    val endpoint: RpcEndpointRef,
    val webUiAddress: String)

3 基于 ZooKeeper 的领导选举代理

领导选举机制可以保证集群虽然存在多个 Master,但是只有一个 Master 是激活的,其他都会处于 standby 状态。

这里简单提一下 ZooKeeper 是如何选主的。ZooKeeper 创建节点是强一致性的,可以保证在分布式高并发的情况下创建的节点一定是全局唯一的,因此只要 Spark 的一个 Master 被选为 Leader,那么在相应目录下,就一定只有这个 Master 是创建成功的,而其他 Master 会创建一个子节点的 Watcher,用于监控当前 Master 是否还存货,一旦他挂了,那么就会重新启动选主过程。Spark 在这里利用了 ZooKeeper 的这一特性,个人猜测也是因为这个原因,类名叫做 xxxAgent

private[master] class ZooKeeperLeaderElectionAgent(val masterInstance: LeaderElectable,
    conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging  {

  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  private var zk: CuratorFramework = _
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  start()

  // 获得 ZooKeeper 的客户端,创建用于选主的 LeaderLatch 实例  private def start() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    // 增加一个监听器,监控主节点的存活    leaderLatch.addListener(this)
    leaderLatch.start()
  }

  override def stop() {
    leaderLatch.close()
    zk.close()
  }

  // override 了 ZooKeeper 的 LeaderLatchListener 接口的方法  // 用于判断当前节点是否为 Leader 节点  override def isLeader() {
    synchronized {
      // could have lost leadership by now.      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  override def notLeader() {
    synchronized {
      // could have gained leadership by now.      if (leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

  private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
    }
  }

  private object LeadershipStatus extends Enumeration {
    type LeadershipStatus = Value
    val LEADER, NOT_LEADER = Value
  }}

介绍完 ZooKeeperLeaderElectionAgent 的实现,可以看看 Master 是怎么使用的,下面的代码在上面也出现过。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
      // 如果 RECOVERY_MODE 是 ZOOKEEPER 的话,就会创建基于 ZooKeeper 的选主代理    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))}

4 Summary

总结下,ZooKeeper 作为一个分布式应用中优秀的组件,在 Spark 中的应用并不复杂(也是源于 Spark 的优秀吧),而包括持久化和选主的实现,我们完全可以参考 Spark 的实现方式,应用在自己的项目中。


相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问