百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

大数据架构师,带你深入理解HadoopYARN架构设计要点,不来别后悔

ccwgpt 2024-10-23 09:22 20 浏览 0 评论

前言

YARN是开源项目Hadoop的一个资源管理系统,最初设计是为了解决Hadoop中MapReduce计算框架中的资源管理问题,但是现在它已经是一个更加通用的资源管理系统,可以把MapReduce计算框架作为一个应用程序运行在YARN系统之上,通过YARN来管理资源。如果你的应用程序也需要借助YARN的资源管理功能,你也可以实现YARN提供的编程API,将你的应用程序运行于YARN之上,将资源的分配与回收统一交给YARN去管理,可以大大简化资源管理功能的开发。当前,也有很多应用程序已经可以构建于YARN之上,如Storm、Spark等计算框架。

YARN整体架构

YARN是基于Master/Slave模式的分布式架构,我们先看一下,YARN的架构设计,如图所示(来自官网文档):

上图,从逻辑上定义了YARN系统的核心组件和主要交互流程,各个组件说明如下:

  • YARN Client

YARN Client提交Application到RM,它会首先创建一个Application上下文件对象,并设置AM必需的资源请求信息,然后提交到RM。YARN Client也可以与RM通信,获取到一个已经提交并运行的Application的状态信息等,具体详见后面ApplicationClientProtocol协议的分析说明。

  • ResourceManager(RM)

RM是YARN集群的Master,负责管理整个集群的资源和资源分配。RM作为集群资源的管理和调度的角色,如果存在单点故障,则整个集群的资源都无法使用。在2.4.0版本才新增了RM HA的特性,这样就增加了RM的可用性。

  • NodeManager(NM)

NM是YARN集群的Slave,是集群中实际拥有实际资源的工作节点。我们提交Job以后,会将组成Job的多个Task调度到对应的NM上进行执行。Hadoop集群中,为了获得分布式计算中的Locality特性,会将DN和NM在同一个节点上运行,这样对应的HDFS上的Block可能就在本地,而无需在网络间进行数据的传输。

  • Container

Container是YARN集群中资源的抽象,将NM上的资源进行量化,根据需要组装成一个个Container,然后服务于已授权资源的计算任务。计算任务在完成计算后,系统会回收资源,以供后续计算任务申请使用。Container包含两种资源:内存和CPU,后续Hadoop版本可能会增加硬盘、网络等资源。

  • ApplicationMaster(AM)

AM主要管理和监控部署在YARN集群上的Application,以MapReduce为例,MapReduce Application是一个用来处理MapReduce计算的服务框架程序,为用户编写的MapReduce程序提供运行时支持。通常我们在编写的一个MapReduce程序可能包含多个Map Task或Reduce Task,而各个Task的运行管理与监控都是由这个MapReduce Application来负责,比如运行Task的资源申请,由AM向RM申请;启动/停止NM上某Task的对应的Container,由AM向NM请求来完成。

下面,我们基于Hadoop 2.6.0的YARN源码,来探讨YARN内部实现原理。

YARN协议

YARN是一个分布式资源管理系统,它包含了分布的多个组件,我们可以通过这些组件之间设计的交互协议来说明,如图所示:

下面我们来详细看看各个协议实现的功能:

  • ApplicationClientProtocol(Client -> RM)

ResourceTracker(NM -> RM)

  • ApplicationMasterProtocol(AM -> RM)
  • ContainerManagementProtocol(AM -> NM)
  • ResourceManagerAdministrationProtocol(RM Admin -> RM)
  • HAServiceProtocol(Active RM HA Framework Standby RM)
  • YARN RPC实现

    1.X版本的Hadoop使用默认实现的Writable协议作为RPC协议,而在2.X版本,重写了RPC框架,改成默认使用Protobuf协议作为Hadoop的默认RPC通信协议。 YARN RPC的实现,如下面类图所示:


    通过上图可以看出,RpcEngine有两个实现:WritableRpcEngine和ProtobufRpcEngine,默认使用ProtobufRpcEngine,我们可以选择使用1.X默认的RPC通信协议,甚至可以自定义实现。

    ResourceManager内部原理

    RM是YARN分布式系统的主节点,ResourceManager服务进程内部有很多组件提供其他服务,包括对外RPC服务,已经维护内部一些对象状态的服务等,RM的内部结构如图所示:

    上图中RM内部各个组件(Dispatcher/EventHandler/Service)的功能,可以查看源码。这里,说一下ResourceScheduler组件,它是RM内部最重要的一个组件,用它来实现资源的分配与回收,它提供了一定算法,在运行时可以根据算法提供的策略来对资源进行调度。YARN内部有3种资源调度策略的实现:FifoScheduler、FairScheduler、CapacityScheduler,其中默认实现为CapacityScheduler。CapacityScheduler实现了资源更加细粒度的分配,可以设置多级队列,每个队列都有一定的容量,即对队列设置资源上限和下限,然后对每一级别队列分别再采用合适的调度策略(如FIFO)进行调度。如果我们想实现自己的资源调度策略,可以直接实现YARN的资源调度接口ResourceScheduler,然后修改yarn-site.xml中的配置项yarn.resourcemanager.scheduler.class即可。

    NodeManager内部原理

    NM是YARN系统中实际持有资源的从节点,也是实际用户程序运行的宿主节点,内部结构如图所示:

    上图中NM内部各个组件(Dispatcher/EventHandler/Service)的功能,可以查看源码,不再累述。

    事件处理机制

    事件处理可以分成2大类,一类是同步处理事件,事件处理过程会阻塞调用进程,通常这样的事件处理逻辑非常简单,不会长时间阻塞;另一类就是异步处理处理事件,通常在接收到事件以后,会有一个用来派发事件的Dispatcher,将事件发到对应的事件队列中,这采用生产者-消费者模式,消费者这会监视着队列,并从取出事件进行异步处理。YARN中到处可以见到事件处理,其中比较特殊一点的就是将状态机(StateMachine)作为一个事件处理器,从而通过事件来触发特定对象状态的变迁,通过这种方式来管理对象状态。我们先看一下YARN中事件处理的机制,以ResourceManager端为例,如下图所示:

    产生的事件通过Dispatcher进行派发并进行处理,如果EventHandler处理逻辑比较简单,直接同步处理,否则可能会采用异步处理的方式。在EventHandler处理的过程中,还可能产生新的事件Event,然后再次通过RM的Dispatcher进行派发,而后处理。

    状态机

    我们以RM端管理的RMAppImpl对象为例,它表示一个Application运行过程中,在RM端的所维护的Application的状态,该对象对应的所有状态及其状态转移路径,如下图所示:

    在上图中如果加上触发状态转移的事件及其类型,可能整个图会显得很乱,所以这里,我详细画了一个分图,用来说明,每一个状态的变化都是有哪种类型的事件触发的,根据这个图,可以方便地阅读源码,如下图所示:


    NMLivelinessMonitor源码分析实例

    YARN主要采用了Dispatcher+EventHandler+Service这样的抽象,将所有的内部/外部组件采用这种机制来实现,由于存在很多的Service和EventHandler,而且有的组件可能既是一个Service,同时还是一个EventHandler,所以在阅读代码的时候可能会感觉迷茫,这里我给出了一个阅读NMLivelinessMonitor服务的实例,仅供想研究源码的人参考。NMLivelinessMonitor是ResourceManager端的一个监控服务实现,它主要是用来监控注册的节点的Liveliness状态,这里是监控NodeManager的状态。该服务会周期性地检查NodeManager的心跳信息来确保注册到ResourceManager的NodeManager当前处于活跃状态,可以执行资源分配以及处理计算任务,在NMLivelinessMonitor类继承的抽象泛型类AbstractLivelinessMonitor中有一个Map,如下所示:

    private Map<O, Long> running = new HashMap<O, Long>();

    这里面O被替换成了NodeId,而值类型Long表示时间戳,也就是表达了一个NodeManager向ResourceManager最后发送心跳信息时间戳,通过检测running中的时间戳;来判断NodeManager是否可以正常使用。在ResourceManager中可以看到,NMLivelinessMonitor的实例是其一个成员:

    protected NMLivelinessMonitor nmLivelinessMonitor;

    看一下NMLivelinessMonitor类的实现,它继承自抽象泛型类AbstractLivelinessMonitor,看NMLivelinessMonitor类的声明:

    public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId>

    在类实现中,有一个重写(@Override)的protected的方法expire,如下所示:

    @Override
    protected void expire(NodeId id) {
      dispatcher.handle(
          new RMNodeEvent(id, RMNodeEventType.EXPIRE));
    }

    我们可以通过该类NMLivelinessMonitor抽象基类中看到调用expire方法的逻辑,是在一个内部线程类PingChecker中,代码如下所示:

    private class PingChecker implements Runnable {
     
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          synchronized (AbstractLivelinessMonitor.this) {
            Iterator<Map.Entry<O, Long>> iterator =
              running.entrySet().iterator();
     
            //avoid calculating current time everytime in loop
            long currentTime = clock.getTime();
     
            while (iterator.hasNext()) {
              Map.Entry<O, Long> entry = iterator.next();
              if (currentTime > entry.getValue() + expireInterval) {
                iterator.remove();
                expire(entry.getKey()); // 调用抽象方法expire,会在子类中实现
                LOG.info("Expired:" + entry.getKey().toString() +
                        " Timed out after " + expireInterval/1000 + " secs");
              }
            }
          }
          try {
            Thread.sleep(monitorInterval);
          } catch (InterruptedException e) {
            LOG.info(getName() + " thread interrupted");
            break;
          }
        }
      }
    }

    这里面的泛型O在NMLivelinessMonitor类中就是NodeId,所以最关心的逻辑就是前面提到的NMLivelinessMonitor中的expire方法的实现。在expire方法中,调用了dispatcher的handle方法来处理,所以dispatcher应该是一个EventHandler对象,后面我们会看到,它其实是通过ResourceManager中的dispatcher成员,也就是AsyncDispatcher来获取到的(AsyncDispatcher内部有一个组合而成的EventHandler)。下面,我们接着看NMLivelinessMonitor是如何创建的,在ResourceManager.RMActiveServices类的serviceInit()方法中,代码如下所示:

    nmLivelinessMonitor = createNMLivelinessMonitor();
    addService(nmLivelinessMonitor);

    跟踪代码继续看createNMLivelinessMonitor方法,如下所示:

    private NMLivelinessMonitor createNMLivelinessMonitor() {
      return new NMLivelinessMonitor(this.rmContext
          .getDispatcher());
    }

    上面通过rmContext的getDispatcher获取到一个Dispatcher对象,来作为NMLivelinessMonitor构造方法的参数,我们需要看一下这个Dispatcher是如何创建的,查看ResourceManager.serviceInit方法,代码如下所示:

    rmDispatcher = setupDispatcher();
    addIfService(rmDispatcher);
    rmContext.setDispatcher(rmDispatcher);

    继续跟踪代码,setupDispatcher()方法实现如下所示:

    private Dispatcher setupDispatcher() {
      Dispatcher dispatcher = createDispatcher();
      dispatcher.register(RMFatalEventType.class,
          new ResourceManager.RMFatalEventDispatcher());
      return dispatcher;
    }

    继续看createDispatcher()方法代码实现:

    protected Dispatcher createDispatcher() {
      return new AsyncDispatcher();
    }

    可以看到,在这里创建了一个AsyncDispatcher对象在创建的NMLivelinessMonitor实例中包含一个AsyncDispatcher实例。回到前面,我们需要知道这个AsyncDispatcher调用getEventHandler()返回的EventHandler的处理逻辑是如何的,NMLivelinessMonitor的代码实现如下所示:

    public class NMLivelinessMonitor extends AbstractLivelinessMonitor<NodeId> {
     
      private EventHandler dispatcher;
       
      public NMLivelinessMonitor(Dispatcher d) {
        super("NMLivelinessMonitor", new SystemClock());
        this.dispatcher = d.getEventHandler(); // 调用AsyncDispatcher的getEventHandler()方法获取EventHandler
      }
     
      public void serviceInit(Configuration conf) throws Exception {
        int expireIntvl = conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
                YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
        setExpireInterval(expireIntvl);
        setMonitorInterval(expireIntvl/3);
        super.serviceInit(conf);
      }
     
      @Override
      protected void expire(NodeId id) {
        dispatcher.handle(
            new RMNodeEvent(id, RMNodeEventType.EXPIRE));
      }
    }

    查看AsyncDispatcher类的getEventHandler()方法,代码如下所示:

    @Override
    public EventHandler getEventHandler() {
      if (handlerInstance == null) {
        handlerInstance = new GenericEventHandler();
      }
      return handlerInstance;
    }

    可见,这里面无论是第一次调用还是其他对象已经调用过该方法,这里面最终只有一个GenericEventHandler实例作为这个dispatcher的内部EventHandler实例,所以继续跟踪代码,看GenericEventHandler实现,如下所示:

    class GenericEventHandler implements EventHandler<Event> {
      public void handle(Event event) {
        if (blockNewEvents) {
          return;
        }
        drained = false;
     
        /* all this method does is enqueue all the events onto the queue */
        int qSize = eventQueue.size();
        if (qSize !=0 && qSize %1000 == 0) {
          LOG.info("Size of event-queue is " + qSize);
        }
        int remCapacity = eventQueue.remainingCapacity();
        if (remCapacity < 1000) {
          LOG.warn("Very low remaining capacity in the event-queue: "
              + remCapacity);
        }
        try {
          eventQueue.put(event); // 将Event放入到队列eventQueue中
        } catch (InterruptedException e) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", e);
          }
          throw new YarnRuntimeException(e);
        }
      };
    }

    将传入handle方法的Event丢进了eventQueue队列,也就是说GenericEventHandler是基于eventQueue的一个生产者,那么消费者是AsyncDispatcher内部的另一个线程,如下所示:

    @Override
    protected void serviceStart() throws Exception {
      //start all the components
      super.serviceStart();
      eventHandlingThread = new Thread(createThread()); // 调用创建消费eventQueue队列中事件的线程
      eventHandlingThread.setName("AsyncDispatcher event handler");
      eventHandlingThread.start();
    }

    查看createThread()方法,如下所示:

    Runnable createThread() {
      return new Runnable() {
        @Override
        public void run() {
          while (!stopped && !Thread.currentThread().isInterrupted()) {
            drained = eventQueue.isEmpty();
            // blockNewEvents is only set when dispatcher is draining to stop,
            // adding this check is to avoid the overhead of acquiring the lock
            // and calling notify every time in the normal run of the loop.
            if (blockNewEvents) {
              synchronized (waitForDrained) {
                if (drained) {
                  waitForDrained.notify();
                }
              }
            }
            Event event;
            try {
              event = eventQueue.take(); // 从队列取出事件Event
            } catch(InterruptedException ie) {
              if (!stopped) {
                LOG.warn("AsyncDispatcher thread interrupted", ie);
              }
              return;
            }
            if (event != null) {
              dispatch(event); // 分发处理该有效事件Event
            }
          }
        }
      };
    }

    可以看到,从eventQueue队列中取出Event,然后调用dispatch(event);来处理事件,看dispatch(event)方法,如下所示:

    @SuppressWarnings("unchecked")
    protected void dispatch(Event event) {
      //all events go thru this loop
      if (LOG.isDebugEnabled()) {
        LOG.debug("Dispatching the event " + event.getClass().getName() + "."
            + event.toString());
      }
     
      Class<? extends Enum> type = event.getType().getDeclaringClass();
     
      try{
        EventHandler handler = eventDispatchers.get(type); // 通过event获取到事件类型,再根据事件类型获取到已经注册的EventHandler
        if(handler != null) {
          handler.handle(event); // 使用对应的EventHandler处理事件event
        } else {
          throw new Exception("No handler for registered for " + type);
        }
      } catch (Throwable t) {
        //TODO Maybe log the state of the queue
        LOG.fatal("Error in dispatcher thread", t);
        // If serviceStop is called, we should exit this thread gracefully.
        if (exitOnDispatchException
            && (ShutdownHookManager.get().isShutdownInProgress()) == false
            && stopped == false) {
          LOG.info("Exiting, bbye..");
          System.exit(-1);
        }
      }
    }

    可以看到,根据已经注册的Map<Class, EventHandler> eventDispatchers表,选择对应的EventHandler来执行实际的事件处理逻辑。这里,再看看这个EventHandler是在哪里住的。前面已经看到,NMLivelinessMonitor类的expire方法中,传入的是new RMNodeEvent(id, RMNodeEventType.EXPIRE),我们再查看ResourceManager.RMActiveServices.serviceInit()方法:

    // Register event handler for RmNodes
    rmDispatcher.register(
        RMNodeEventType.class, new NodeEventDispatcher(rmContext)); // 注册:事件类型RMNodeEventType,EventHandler实现类NodeEventDispatcher

    可见RMNodeEventType类型的事件是使用ResourceManager.NodeEventDispatcher这个EventHandler来处理的,同时它也是一个Dispatcher,现在再看NodeEventDispatcher的实现:

    @Private
    public static final class NodeEventDispatcher implements
        EventHandler<RMNodeEvent> {
     
      private final RMContext rmContext;
     
      public NodeEventDispatcher(RMContext rmContext) {
        this.rmContext = rmContext;
      }
     
      @Override
      public void handle(RMNodeEvent event) {
        NodeId nodeId = event.getNodeId();
        RMNode node = this.rmContext.getRMNodes().get(nodeId); // 调用getRMNodes()获取到一个ConcurrentMap<NodeId, RMNode>,它维护每个NodeId的状态(RMNode是一个状态机对象)
        if (node != null) {
          try {
            ((EventHandler<RMNodeEvent>) node).handle(event); // RMNode的实现为RMNodeImpl,它也是一个EventHandler
          } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " for node " + nodeId, t);
          }
        }
      }
    }

    这个里面还没有真正地去处理,而是基于RMNode状态机对象来进行转移处理,所以我们继续看RMNode的实现RMNodeImpl,因为前面事件类型RMNodeEventType.EXPIRE,我们看状态机创建时对该事件类型的转移动作是如何注册的:

      private static final StateMachineFactory<RMNodeImpl,
                                               NodeState,
                                               RMNodeEventType,
                                               RMNodeEvent> stateMachineFactory
                     = new StateMachineFactory<RMNodeImpl,
                                               NodeState,
                                               RMNodeEventType,
                                               RMNodeEvent>(NodeState.NEW)
    ...
         .addTransition(NodeState.RUNNING, NodeState.LOST,
             RMNodeEventType.EXPIRE,
             new DeactivateNodeTransition(NodeState.LOST))
    ...
         .addTransition(NodeState.UNHEALTHY, NodeState.LOST,
             RMNodeEventType.EXPIRE,
             new DeactivateNodeTransition(NodeState.LOST))

    在ResourceManager端维护的NodeManager的信息使用RMNodeImpl来表示(在内存中保存ConcurrentMap),所以当前如果expire方法被调用,RMNodeImpl会根据状态机对象中已经注册的前置转移状态(pre-transition state)、后置转移状态(post-transition state)、事件类型(event type)、转移Hook程序,来对事件进行处理,并使当前RMNodeImpl的状态由前置转移状态更新为后置转移状态。对于上面代码,如果当前RMNodeImpl状态是NodeState.RUNNING,事件为RMNodeEventType.EXPIRE类型,则会调用Hook程序实现DeactivateNodeTransition,状态更新为NodeState.LOST;如果当前RMNodeImpl状态是NodeState.UNHEALTHY,事件为RMNodeEventType.EXPIRE类型,则会调用Hook程序实现DeactivateNodeTransition,状态更新为NodeState.LOST。具体地,每个Transition的处理逻辑如何,可以查看对应的Transition实现代码。

    觉得文章不错的话,可以转发此文关注小编,之后给大家持续更新干货文章~~

    相关推荐

    5 分钟搭建 Node.js 微服务原型(node 微服务架构)

    微服务已成为在Node.js中构建可扩展且强大的云应用的主流方法。同时也存在一些门槛,其中一些难点需要你在以下方面做出决策:组织项目结构。将自定义服务连接到第三方服务(数据库,消息代理等)处理微服...

    当前的前端,真的不配叫程序员吗?

    今天看到一个比较令人震惊的帖子,说前端不配叫程序员,令我很吃鲸,是谁我就不说了,帖子出处是一个大龄程序员组里面的,想想也不觉得奇怪了,毕竟对于年龄比较大的程序员来说,前端起步比较晚,最开始就是一个切图...

    聊聊asp.net中Web Api的使用(asp.net core web api教程)

    扯淡随着app应用的崛起,后端服务开发的也越来越多,除了很多优秀的nodejs框架之外,微软当然也会在这个方面提供更便捷的开发方式。这是微软一贯的作风,如果从开发的便捷性来说的话微软是当之无愧的老大哥...

    NodeJS中,listen Access:permission denied解决办法

    错误描述:Win10系统,NodeJS程序。使用express框架开发的http服务器,启动时出现错误提示“listenAccess:permissiondenied"。错误原因:这是由于...

    Hono — 下一代高性能web框架(天融信下一代vnp)

    最近公司可能要有变革,要统计我们的技能。真的是很无语,但是有没有办法。哎,问豆包吧提起Hono大家可能很陌生,这是什么?但是我提到Expressjs、nodejs想必前端小伙伴很熟悉啊。那么Hon...

    生活例子说明线程,简单明了(列举一个日常生活中的例子以程序的形式表示)

    1.程序设计的目标在我看来单从程序的角度来看,一个好的程序的目标应该是性能与用户体验的平衡。当然一个程序是否能够满足用户的需求暂且不谈,这是业务层面的问题,我们仅仅讨论程序本身。围绕两点来展开,性能...

    Node实战006:自定义模块的创建和使用详解

    Node的应用是由模块组成的,每个文件的定义都是一个模块(module变量代表当前模块)并有自己的作用域。Node遵循commonjs的模块规范,用来隔离每个模块的作用域,使每一个模块在自身的命名空间...

    Node.js基本内容和知识点(node.js的概念)

    简单的说Node.js就是运行在服务端的JavaScript,起初段定位是后端开发语言,由于技术的不够成熟,一般小型项目会完全使用node.js作为后台支撑,大项目中,运行不够稳定,不会轻易使用...

    干货 | 如何利用Node.js 构建分布式集群

    引言在软件定义的世界里,企业通过Web应用和移动应用程序来提供大部分的服务,Node.js迅速成为时下最为流行的一个平台之一,就和它可以搭建响应速度快、易于扩展的web应用和移动应用有很大关系,并凭...

    nodejs mongodb 实现简易留言板(node.js留言板)

    一个朋友问了一下mongodb的一些操作问题我就做了下面这个简单的留言板给他做一个实例希望能帮助到他express的框架就不说了express的问题请移步nodejs之expressht...

    nodejs mqtt 智能售货机系统物联网控制系统源码分享

    智能售货机系统(Moleintelligentvendingmachinesystem)是一套物联网控制系统性的解决方案。主要涉及到的语言和库有c,c++,js,nodejs,vue.js,...

    为什么 Node.js 这么火,而同样异步模式 Python 框架 Twisted 却十几年一直不温不火?

    说nodejs只是靠营销的是否太天真了些?当初nodejs出来的时候各种BUG,我简单的测试其大文件传输都会出现各种问题。而同時期的其他阵营早就甩其几条街了。但是为什么却能一直不断发展壮大?...

    2020年14个最有用的NodeJS库(node用什么数据库)

    Express快速,简单,极简的节点Web框架对…有好处·易于处理多种类型的请求,例如GET,PUT,POST和DELETE请求·快速构建单页,多页和混合Web应用程序每周下载1100万Lice...

    连载:2016年最好的JS框架和库(下)

    继续上一期的介绍:Agility.jsAgility.js是专为JS服务的MVC库,你可以免费编写可再用和可维护的浏览器代码,Agility支持Js,样式(CSS)、内容(HTML)和行为(JS)。C...

    awesome-nodejs 终极资源库:60K+星标的开发者宝藏

    Node.js终极资源库:60K+星标的开发者宝藏引言在GitHub上,有一个备受瞩目的Node.js资源仓库,以其惊人的60.6k星标量和6kfork量,成为了Node.js开发者的必备参考。这个...

    取消回复欢迎 发表评论: