当前位置:首页 > 站长资讯 > 正文内容

Hadoop 的 Server 及其线程模型分析

a8116255311年前 (2015-07-26)站长资讯56

一、Listener

Listener线程,当Server处于运行状态时,其负责监听来自客户端的连接,并使用Select模式处理Accept事件。

同时,它开启了一个空闲连接(Idle Connection)处理例程,假如有过期的空闲连接,就关闭。这个例程通过一个计时器来实现。

当select操作调用时,它可能会阻塞,这给了其它线程执行的机会。当有accept事件发生,它就会被唤醒以处理全部的事件,处理事件是进行一个doAccept的调用。

doAccept:

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {

        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        channel.socket().setKeepAlive(true);

        Reader reader = getReader();
        Connection c = connectionManager.register(channel);
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    }

由于多个连接可能同时发起申请,所以这里采用了while循环处理。

这里最关键的是设置了新建立的socket为非阻塞,这一点是基于性能的考虑,非阻塞的方式尽可能的读取socket接收缓冲区中的数据,这一点保证了将来会调用这个socket进行接收的Reader和进行发送的Responder线程不会因为发送和接收而阻塞,假如整个通讯过程都比较繁忙,那么Reader和Responder线程的就可以尽量不阻塞在I/O上,这样可以显著减少线程上下文切换的次数,提高cpu的利用率。

最后,获取了一个Reader,将此连接加入Reader的缓冲队列,同时让连接管理器监视并管理这个连接的生存期。

获取Reader的方式如下:

//最简单的负载均衡
    Reader getReader() {
      currentReader = (currentReader + 1) % readers.length;
      return readers[currentReader];
    }

二、Reader

当一个新建立的连接被加入Reader的缓冲队列pendingConnections之后,Reader也被唤醒,以处理此连接上的数据接收。

public void addConnection(Connection conn) throws InterruptedException {
        pendingConnections.put(conn);
        readSelector.wakeup();
      }

Server中配置了多个Reader线程,显然是为了提高并发服务连接的能力。

下面是Reader的主要逻辑:

while(true) {
		...
	   //取出一个连接,可能阻塞
       Connection conn = pendingConnections.take();
       //向select注册一个读事件
       conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
       ...
       //进行select,可能阻塞
       readSelector.select();
       ...
       //依次读取数据
       for(keys){
			doRead(key);
	   }
	   ...
}

当Server还在运行时,Reader线程尽可能多地处理缓冲队列中的连接,注册每一个连接的READ事件,采用select模式来获取连接上有数据接收的通知。当有数据需要接收时,它尽更大可能读取select返回的连接上的数据,以防止Listener线程因为没有运行时间而发生饥饿(starving)。

假如Listener线程饥饿,造成的结果是并发能力急剧下降,来自客户端的新连接请求超时或无法建立。

注意在从缓冲队列中获取连接时,Reader可能会发生阻塞,因为它采用了LinkedBlockingQueue类中的take *** ,这个 *** 在队列为空时会阻塞,这样Reader线程得以阻塞,以给其它线程执行的时间。

Reader线程的唤醒时机有两个:

Listener建立了新连接,并将此连接加入1个Reader的缓冲队列; select调用返回。

在Reader的doRead调用中,其主要调用了readAndProcess *** ,此 *** 循环处理数据,接收数据包的头部、上下文头部和真正的数据。
这个过程中值得一提的是下面的这个channelRead *** :

private int channelRead(ReadableByteChannel channel, 
                          ByteBuffer buffer) throws IOException {

    int count = (buffer.remAIning() <= NIO_BUFFER_LIMIT)  
                channel.read(buffer) : channelIO(channel, null, buffer);
    if (count > 0) {
      rpcMetrics.incrReceivedBytes(count);
    }
    return count;
  }

channelRead会判断数据接收数组buffer中的剩余未读数据,假如大于一个临界值NIO_BUFFER_LIMIT,就采取分片的技巧来多次地读,以防止jdk对large buffer采取变为direct buffer的优化。

这一点,也许是考虑到direct buffer在建立时会有一些开销,同时在jdk1.6之前direct buffer不会被GC回收,因为它们分配在JVM的堆外的内存空间中。

到底这样优化的效果如何,没有测试,也就略过。也许是为了减少GC的负担。

在Reader读取到一个完整的RpcRequest包之后,会调用processOneRpc *** ,此调用将进入业务逻辑环节。这个 *** ,会从接受到的数据包中,反序列化出RpcRequest的头部和数据,依此构造一个RpcRequest对象,设置客户端需要的跟踪信息(trace info),然后构造一个Call对象,如下图所示:

此后,在Handler处理时,就以Call为单位,这是一个包含了与连接相关信息的封装对象。

有了Call对象后,将其加入Server的callQueue队列,以供Handler处理。因为采用了put *** ,所以当callQueue满时(Handler忙),Reader会发生阻塞,如下所示:

callQueue.put(call);              // queue the call; maybe blocked here

三、Handler

Handler就是根据rpc请求中的 *** (Call)及参数,来调用相应的业务逻辑接口来处理请求。

一个Server中有多个Handler,对应多个业务接口,该文不讨论具体业务逻辑。

handler的逻辑基本如下(略去异常和其它次要信息):

public void run() {
      SERVER.set(Server.this);
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
        try {
          final Call call = callQueue.take(); // pop the queue; maybe blocked here
          CurCall.set(call);
          try {
            if (call.connection.user == null) {
              value = call(call.rpcKind, call.connection.protocolName, call.rpcRequest, 
                           call.timestamp);
            } else {
              value = 
                call.connection.user.doAs(...);
            }
          } catch (Throwable e) {
            //略 ... 
          }
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            responder.doRespond(call);
          }
  }

可见,Handler从callQueue中取出一个Call,然后调用这个Server.call *** ,最后调用Responder的doResponde *** 将结果发送给客户端。

Server.call *** :

public Writable call(RPC.RpcKind rpcKind, String protocol,
        Writable rpcRequest, long receiveTime) throws Exception {
      return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
          receiveTime);
    }

四、Responder

一个Server只有1个Responder线程。

此线程不断进行如下几个重要调用以和Handler协调并发送数据:

//这个wait是同步作用,具体见下面分析
waitPending();     
...
//开始select,或许会阻塞
writeSelector.select(PURGE_INTERVAL);
...
//假如selectKeys有数据,就依次异步发送数据
for(selectorKeys){
	doAsyncWrite(key);
}
...
//当到达丢弃时间,会从selectedKeys构造calls,并依次丢弃
for(Call call : calls) {
  doPurge(call, now);
}

当Handler调用doRespond *** 后,handler处理的结果被加入responseQueue的队尾,而不是立即发送回客户端:

void doRespond(Call call) throws IOException {
      synchronized (call.connection.responseQueue) {
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1) {
          //注意这里isHandler = true,表示可能会向select注册写事件以在Responder主循环中通过select处理数据发送
          processResponse(call.connection.responseQueue, true);
        }
      }
    }

上面的synchronized 可以看出,responseQueue是争用资源,相应的:

Handler是生产者,将结果加入队列;
Responder是消费者,从队列中取出结果并发送。

processResponse将启动Responder进行发送,首先从responseQueue中以非阻塞方式取出一个call,然后以非阻塞方式尽力发送call.rpcResponse,假如发送完毕,则返回。

当还有剩余数据未发送,将call插入队列的之一个位置,由于isHandler参数,在来自Handler的调用中传入为true,所以会唤醒writeSelector,并注册一个写事件,其中incPending() *** ,是为了在向selector注册写事件时,阻塞Responder线程,后面有分析。

call.connection.responseQueue.addFirst(call);

            if (inHandler) {
              // set the serve time when the response has to be sent later
              call.timestamp = Time.now();

              incPending();
              try {
                // Wakeup the thread blocked on select, only then can the call 
                // to channel.register() complete.
                writeSelector.wakeup();
                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
              } catch (ClosedChannelException e) {
                //Its ok. channel might be closed else where.
                done = true;
              } finally {
                decPending();
              }
            }

再回到Responder的主循环,看看假如向select注册了写事件会发生什么:

//执行这句时,假如Handler调用的responder.doResonde()正在向select注册写事件,这里就会阻塞
          //目的很显然,是为了下句的select能获取数据并立即返回,这就减少了阻塞发生的次数
          waitPending();     // If a channel is being registered, wait.

          //这里用超时阻塞来select,是为了能够在没有数据发送时,定期唤醒,以处理长期未得到处理的Call
          writeSelector.select(PURGE_INTERVAL);
          Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
          while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            try {
              if (key.isValid() && key.isWritable()) {
                  //异步发送
                  doAsyncWrite(key);
              }
            } catch (IOException e) {
              LOG.info(Thread.currentThread().getName() + ": doAsyncWrite threw exception " + e);
            }
          }

重点内容都做了注释,不再赘述。可以看出,既考虑同步,又考虑性能,这是值得学习的地方。

五、总结

该文着重分析了hadoop的rpc调用中server部分,可以看出,这是一个精良的设计,考虑的很细。

关于同步:
Listener生产,Reader消费;Reader生产,Handler消费,Handler生产,Responder消费。
所以它们之间必须同步.在具体的hadoop实现中,既有利用BlockingQueue的put&take操作实现阻塞,以达到同步目的,也对争用资源使用synchronized来实现同步。 关于缓冲:
其中几个缓冲队列也值得关注.Server的并发请求会特别多,而Handler在执行call进行业务逻辑时,肯定会慢下来,所以必须建立请求和处理之间的缓冲。
另外,处理和发送之间也同样会出现速率不匹配的现象,同样需要缓冲。 关于线程模型:
Listener单线程,Reader多线程,Handler多线程,Responder单线程,为什么会这样设计?Listener采用select模式处理accept事件,一个客户端在一段时间内一般只建立有限次的连接,而且连接的建立是比较快的,所以单线程足够应付,建立后直接丢给Reader,从而自己很从容地应付新连接。Handler多线程,业务逻辑是大头,又很大可能会牵涉I/O密集(HDFS),假如线程少,耗时过长的业务逻辑可能就会让大部分的Handler线程处于阻塞,这样轻快的业务逻辑也必须排队,可能会发生饥饿。假如Reader收集的请求队列长时间处于满的状态,整个通讯必然恶化,所以这是典型的需要降低响应时间、提升吞吐量的高并发时刻,这个时刻的上下文切换是必须的,不纠结,并发为重。Responder是单线程,显然,Responder会比较轻松,因为虽然请求很多,但经过Reader->Handler的缓冲和Handler的处理,上一批能发送完的结果已经发送了。Responder更多的是搜集并处理那些长结果,并通过高效select模式来获取结果并发送。

这里,Handler在业务逻辑调用完毕直接调用了responder.doRespond发送,是因为这是个立即返回的调用,这个调用的耗时是很少的,所以不必让Handler因为发送而阻塞,进一步充分发挥了Handler多线程的能力,减少了线程切换的机会,强调了其多线程并发的优势,同时又为responder减负,以增强Responder单线程作战的信心。 关于锁
对Hadoop来讲,因为同步需求,所以加锁是必不可少的。性能是需要考虑,但是从工程的角度上来看,通讯层的稳定性、代码可维护性、保持代码结构的相对简单性(其代码因历史原因已非常复杂),大部分采用了synchronized这种悲观得、重型的加锁方式,这样,可以显著减少对象之间同步的复杂性,减少错误的发生。

六、(补充)RpcServer 线程模型

NameNode启动过程:

线程模型

Listener 1个:

监听并接受来自客户端的连接.将新建连接放入pendingConnections. 清理空闲连接. 唤醒Reader.

Reader N个 : 从pendingConnections中获取连接,读取数据,从RpcRequest构造Call,并放入callQueue.

Handler N 个:

从callQueue获取客户端调用call,并执行. 调用Responder,将结果加入responseQueue的尾部.这里会调用一次发送.假如数据未发送完,注册 WRITE事件到 selector.并唤醒Responder.

Responder 1个:

从responseQueue中按照FIFO顺序发送数据. 处理 selector select出的数据. 扫描callQueue,并丢弃过期的Call.

终.

扫描二维码推送至手机访问。

版权声明:本文由2345好导航站长资讯发布,如需转载请注明出处。

本文链接:http://www.2345hao.cn/blog/index.php/post/51.html

分享给朋友:

“Hadoop 的 Server 及其线程模型分析” 的相关文章

19%市占率换不来业绩体量的和林微纳 新业务0.24%市占率又该如何期待?

19%市占率换不来业绩体量的和林微纳 新业务0.24%市占率又该如何期待?

  以19%市占率位居精微屏蔽罩市场头部玩家的和林微纳,即将亮相科创板。   2021年3月9日,主要产品为微机电(MEMS)精微电子零部件的和林微纳,开启了科创板招股。公司与楼氏电子、瑞声科技、裕元电子和银河机械,一同成为精微屏蔽罩市场的主要玩家,2019年五家企业合计占到全球市场总份额的80%...

新风光:核心材料严重依赖进口 毛利率下滑市场“风光”不再 |

新风光:核心材料严重依赖进口 毛利率下滑市场“风光”不再 |

  作为“光伏、风电”等大热门行业上游关键零部件供应商的新风光,即将登陆科创资本市场。   2021年3月24日,以大功率电力电子节能控制技术为核心技术平台,构筑电气控制装备产品体系的新风光,在科创板开启招股环节。   招股资料显示,新风光本次共计将募资5.9亿元,其中1.5亿元用于变频器和SV...

上声电子:四年净利仅剩30% 研发低产能又“空置” 三角股权难解丨

上声电子:四年净利仅剩30% 研发低产能又“空置” 三角股权难解丨

  历时四年发展,归母净利润反缩水近7成的上声电子,即将登陆科创板,寻求资本助力。   作为国内前装市场汽车声学产品方案供应商的上声电子,于2021年3月29日在科创板启动招股环节。公司计划募集4.47亿元资金,其中2.47亿元用于扩产扬声器项目,1.49亿元用于扩产汽车电子项目,剩余5000万元...

瑞华泰:主业停滞、产能重研发轻、债务高筑、实控人空悬… 压力重重 何去何从?丨

瑞华泰:主业停滞、产能重研发轻、债务高筑、实控人空悬… 压力重重 何去何从?丨

  瑞华泰,一家打破“卡脖子”材料高端PI薄膜的企业,日前正在做科创资本市场的最后冲刺。   2021年4月14日,专注于高性能PI薄膜领域技术自主研发的制造商瑞华泰,已经开启路演及询价环节,距离正式科创板资本市场仅剩最后一步之遥。目前,公司已建立了完整的PI薄膜研发和产业化的核心技术体系,成功进...

四个交易日大跌24%、16.6亿资金疯狂逃离 真爱美家股价过山车的背后丨

四个交易日大跌24%、16.6亿资金疯狂逃离 真爱美家股价过山车的背后丨

  四个交易日高位逆势大跌24%,共计16.6亿资金逃离,上市仅10多天的真爱美家(003041.SZ)正经历大资金高位套现离场。   资料显示,真爱美家是一家以毛毯出口为主的家用纺织企业,公司于2021年4月6日正式登陆资本市场。4月15日,公司涨停封板态势突然崩溃,股价掉头向下,当天最大跌幅触...

从占比90%跌至30% 传统主业断崖式下跌背后 力源科技自己都有些迷茫丨

从占比90%跌至30% 传统主业断崖式下跌背后 力源科技自己都有些迷茫丨

  号称火电、核电行业凝结水精处理系统设备供应商最具竞争力之一的力源科技,即将登陆科创资本市场。         图/Wind   力源科技是一家主要为核电、火电、冶金、化工、石化等行业公司提供各类环保水处理系统和智能电站设备服务的企业。2021年4月21日,公司在科创板启动了招股环节,拟...