目录

Spark中的RPC

概述

本文是转载的: https://zhuanlan.zhihu.com/p/28893155

Spark 是一个快速的、通用的分布式计算系统,而分布式的特性就意味着,必然存在节点间的通信,本文主要介绍不同的 Spark 组件之间是如何通过 RPC(Remote Procedure Call) 进行点对点通信的。

分为3个章节:

  1. Spark RPC 的简单示例和实际应用
  2. Spark RPC 模块的设计原理
  3. Spark RPC 核心技术总结

Spark RPC的简单示例和实际应用

Spark 的 RPC 主要在两个模块中:

  1. 在 Spark-core 中,主要承载了更好的封装 server 和 client 的作用,以及和 scala 语言的融合,它依赖于模块 org.apache.spark.spark-network-common
  2. org.apache.spark.spark-network-common 中,该模块是 java 语言编写的,最新版本是基于 netty4 开发的,提供全双工、多路复用 I/O 模型的 Socket I/O 能力,Spark 的传输协议结构(wire protocol)也是自定义的。

为了更好的了解 Spark RPC 的内部实现细节,我基于 Spark 2.1 版本抽离了 RPC 通信的部分,单独启了一个项目 https://github.com/neoremind/kraps-rpc,放到了 github 以及发布到 Maven 中央仓库做学习使用,提供了比较好的上手文档、参数设置和性能评估。下面就通过这个模块对Spark RPC先做一个感性的认识。

以下的代码均可以在 spark-rpc 找到。

简单示例

假设我们要开发一个 Hello 服务,客户端可以传输 string,服务端响应 hi 或者 bye,并 echo 回去输入的 string。

第一步,定义一个 HelloEndpoint 继承自 RpcEndpoint 表明可以并发的调用该服务,如果继承自 ThreadSafeRpcEndpoint 则表明该 Endpoint 不允许并发,即只有一个线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    println("start hello endpoint")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(msg) => {
      println(s"receive $msg")
      context.reply(s"hi, $msg")
    }
    case SayBye(msg) => {
      println(s"receive $msg")
      context.reply(s"bye, $msg")
    }
  }

  override def onStop(): Unit = {
    println("stop hello endpoint")
  }
}

case class SayHi(msg: String)
case class SayBye(msg: String)

和 Java 传统的 RPC 解决方案对比,可以看出这里不用定义接口或者方法标示(比如通常的 id 或者 name),使用 scala 的模式匹配进行方法的路由。虽然点对点通信的契约交换受制于语言,这里就是 SayHi 和 SayBye 两个 case class,但是 Spark RPC 定位于内部组件通信,所以无伤大雅。

第二步,把刚刚开发好的 Endpoint 交给 Spark RPC 管理其生命周期,用于响应外部请求。RpcEnvServerConfig 可以定义一些参数、server 名称(仅仅是一个标识)、bind 地址和端口。通过 NettyRpcEnvFactory 这个工厂方法,生成 RpcEnv(大概就是 Akka 中 ActorSystem 的意思),RpcEnv 是整个 Spark RPC 的核心所在,后文会详细展开,通过 setupEndpoint 将 “hello-service” 这个名字和第一步定义的 Endpoint 绑定,后续 client 调用路由到这个 Endpoint 就需要 “hello-service” 这个名字。调用 awaitTermination 来阻塞服务端监听请求并且处理。

1
2
3
4
5
6
// Server 的代码
val config = RpcEnvServerConfig(new RpcConf(), "hello-server", "localhost", 52345)
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
rpcEnv.awaitTermination()

第三步,开发一个 client 客户端调用刚刚启动的 server,首先 RpcEnvClientConfig 和 RpcEnv 都是必须的,然后通过刚刚提到的意思 “hello-service” 名字新建一个远程 Endpoint 的引用(Ref),可以看做是 stub,用于调用,这里首先展示通过异步的方式来做请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// Client 连接 Server 的代码
val rpcConf = new RpcConf()
val config = RpcEnvClientConfig(rpcConf, "hello-client")
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hell-service")
val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
future.onComplete {
    case scala.util.Success(value) => println(s"Got the result = $value")
    case scala.util.Failure(e) => println(s"Got error: $e")
}

Await.result(future, Duration.apply("30s"))

也可以通过同步的方式,在最新的 Spark 中 askWithRetry 实际已更名为 askSync。

1
val result = endPointRef.askWithRetry[String](SayBye("neo"))

这就是 Spark RPC 的通信过程,使用起来易用性可想而知,非常简单,RPC 框架屏蔽了 Socket I/O 模型、线程模型、序列化/反序列化过程、使用 netty 做了包识别,长连接,网络重连重试等机制

实际应用

在 Spark 内部,很多的 Endpoint 以及 EndpointRef 与之通信都是通过这种形式的,举例来说比如 driver 和 executor 之间的交互用到了心跳机制,使用 HeartbeatReceiver 来实现,这也是一个 Endpoint,它的注册在 SparkContext 初始化的时候做的,代码如下:

1
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

而它的调用在 Executor 内的方式如下:

1
2
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) 

Spark RPC模块的设计原理

首先说明下,自 Spark 2.0 后已经把 Akka 这个 RPC 框架剥离出去了(详细见 SPARK-5293),原因很简单,因为很多用户会使用 Akka 做消息传递,那么就会和 Spark 内嵌的版本产生冲突,而 Spark 也仅仅用了 Akka 做 RPC,所以2.0之后,基于底层的 org.apache.spark.spark-network-common 模块实现了一个类似 Akka Actor 消息传递模式的 scala 模块,封装在了 core 里面,kraps-rpc 也就是把这个部分从 core 里面剥离出来独立了一个项目。

虽然剥离了 Akka,但是还是沿袭了 Actor 模式中的一些概念,在现在的 Spark RPC 中有如下映射关系。

1
2
3
RpcEndpoint => Actor
RpcEndpointRef => ActorRef
RpcEnv => ActorSystem

底层通信全部使用 netty 进行了替换,使用的是 org.apache.spark.spark-network-common 这个内部 lib。

类图分析

这里先上一个 UML 图展示了 Spark RPC 模块内的类关系,白色的是 Spark-core 中的 scala 类,黄色的是 org.apache.spark.spark-network-common 中的 java 类。

/spark%E4%B8%AD%E7%9A%84rpc/image_1cq34bi8hml9ci61lo41jvp1t4ip.png

不要被这张图所吓倒,经过下面的解释分析,相信读者可以领会其内涵,不用细究其设计的合理度,Spark 是一个发展很快、不断演进的项目,代码不是一成不变的,持续变化是一定的。

RpcEndpoint 和 RpcCallContext 先看最左侧的 RpcEndpoint,RpcEndpoint 是一个可以响应请求的服务,和 Akka 中的 Actor 类似,从它的提供的方法签名(如下)可以看出,receive方法是单向方式的,可以比作 UDP,而 receiveAndReply 是应答方式的,可以比作 TCP。它的子类实现可以选择性的覆盖这两个函数,我们第一章实现的 HelloEndpoint 以及 Spark 中的 HeartbeatReceiver 都是它的子类。

1
2
3
4
5
6
7
def receive: PartialFunction[Any, Unit] = {
    case _ => throw new RpcException(self + " does not implement 'receive'")
}

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new RpcException(self + " won't reply anything"))
}

其中 RpcCallContext 是用于分离核心业务逻辑和底层传输的桥接方法,这也可以看出 Spark RPC 多用组合,聚合以及回调 callback 的设计模式来做 OO 抽象,这样可以剥离业务逻辑 -> RPC 封装(Spark-core 模块内) -> 底层通信(spark-network-common)三者。RpcCallContext 可以用于回复正常的响应以及错误异常,例如:

1
2
reply(response: Any) // 回复一个message,可以是一个case class。
sendFailure(e: Throwable) // 回复一个异常,可以是Exception的子类,由于Spark RPC默认采用Java序列化方式,所以异常可以完整的在客户端还原并且作为cause re-throw出去。

RpcCallContext 也分为了两个子类,分别是 LocalNettyRpcCallContext 和 RemoteNettyRpcCallContext,这个主要是框架内部使用,如果是本地就走 LocalNettyRpcCallContext 直接调用 Endpoint 即可,否则就走 RemoteNettyRpcCallContext 需要通过 RPC 和远程交互,这点也体现了 RPC 的核心概念,就是如何执行另外一个地址空间上的函数、方法,就仿佛在本地调用一样。

另外,RpcEndpoint 还提供了一系列回调函数覆盖。

1
2
3
4
5
6
7
- onError
- onConnected
- onDisconnected
- onNetworkError
- onStart
- onStop
- stop

另外需要注意下,它的一个子类是 ThreadSafeRpcEndpoint,很多 Spark 中的 Endpoint 继承了这个类,Spark RPC 框架对这种 Endpoint 不做并发处理,也就是同一时间只允许一个线程在做调用

还有一个默认的 RpcEndpoint 叫做 RpcEndpointVerifier,每一个 RpcEnv 初始化的时候都会注册上这个 Endpoint,因为客户端的调用每次都需要先询问服务端是否存在某一个 Endpoint。

RpcEndpointRef RpcEndpointRef 类似于 Akka 中 ActorRef,顾名思义,它是 RpcEndpoint 的引用,提供的方法 send 等同于!,ask 方法等同于?,send 用于单向发送请求(RpcEndpoint中 的 receive 响应它),提供 fire-and-forget 语义,而 ask 提供请求响应的语义(RpcEndpoint 中的 receiveAndReply 响应它),默认是需要返回 response 的,带有超时机制,可以同步阻塞等待,也可以返回一个 Future 句柄,不阻塞发起请求的工作线程(阻塞不阻塞都可以)。

RpcEndpointRef 是客户端发起请求的入口,它可以从 RpcEnv 中获取,并且聪明的做本地调用或者 RPC。

RpcEnv 和 NettyRpcEnv 类库中最核心的就是 RpcEnv,刚刚提到了这就是 ActorSystem,服务端和客户端都可以使用它来做通信

  1. 对于 server side 来说,RpcEnv 是 RpcEndpoint 的运行环境,负责 RpcEndpoint 的整个生命周期管理,它可以注册或者销毁 Endpoint,解析 TCP 层的数据包并反序列化,封装成 RpcMessage,并且路由请求到指定的 Endpoint,调用业务逻辑代码,如果 Endpoint 需要响应,把返回的对象序列化后通过 TCP 层再传输到远程对端,如果 Endpoint 发生异常,那么调用 RpcCallContext.sendFailure 来把异常发送回去。
  2. 对 client side 来说,通过 RpcEnv 可以获取 RpcEndpoint 引用,也就是 RpcEndpointRef 的(请求入口)。

RpcEnv 是和具体的底层通信模块交互的负责人,它的伴生对象包含创建 RpcEnv 的方法,签名如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      numUsableCores: Int,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      numUsableCores, clientMode)
    new NettyRpcEnvFactory().create(config)
  }

RpcEnv 的创建由 RpcEnvFactory 负责,RpcEnvFactory 目前只有一个子类是 NettyRpcEnvFactory,原来还有 AkkaRpcEnvFactory。NettyRpcEnvFactory.create 方法一旦调用就会立即在 bind 的 address 和 port 上启动 server。

它依赖的 RpcEnvConfig 就是一个包含了 SparkConf 以及一些参数(kraps-rpc中更名为 RpcConf)。RpcEnv 的参数都需要从 RpcEnvConfig 中拿,最基本的 hostname 和 port,还有高级些的连接超时、重试次数、Reactor 线程池大小等等。

下面看看 RpcEnv 最常用的两个方法:

1
2
3
4
5
// 注册 endpoint,必须指定名称,客户端路由就靠这个名称来找endpoint
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef 

// 拿到一个endpoint的引用
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef

NettyRpcEnv 由 NettyRpcEnvFactory.create 创建,这是整个 Spark core 和 org.apache.spark.spark-network-common 的桥梁,内部 leverage 底层提供的通信能力,同时包装了一个类 Actor 的语义。上面两个核心的方法,setupEndpoint 会在 Dispatcher 中注册 Endpoint,setupEndpointRef 会先去调用 RpcEndpointVerifier 尝试验证本地或者远程是否存在某个 endpoint,然后再创建 RpcEndpointRef。更多关于服务端、客户端调用的细节将在时序图中阐述,这里不再展开。

Dispatcher和Inbox NettyRpcEnv 中包含 Dispatcher,主要针对服务端,帮助路由到正确的 RpcEndpoint,并且调用其业务逻辑。

这里需要先阐述下 Reactor 模型,Spark RPC 的 Socket I/O 一个典型的 Reactor 模型的,但是结合了 Actor pattern 中的 mailbox,可谓是一种混合的实现方式。

使用 Reactor 模型,由底层 netty 创建的 EventLoop 做 I/O 多路复用,这里使用 Multiple Reactors 这种形式,如下图所示,从 netty 的角度而言,Main Reactor 和 Sub Reactor 对应 BossGroup 和 WorkerGroup 的概念,前者(Main Reactor)负责监听 TCP 连接、建立和断开,后者(Sub Reactor)负责真正的 I/O 读写,而图中的 ThreadPool 就是的 Dispatcher 中的线程池,它来解耦开来耗时的业务逻辑和 I/O 操作,这样就可以更 scalabe,只需要少数的线程就可以处理成千上万的连接,这种思想是标准的分治策略,offload 非 I/O 操作到另外的线程池。

真正处理 RpcEndpoint 的业务逻辑在 ThreadPool 里面,中间靠 Reactor 线程中的 handler 处理 decode 成 RpcMessage,然后投递到 Inbox 中,所以 compute 的过程在另外的下面介绍的 Dispatcher 线程池里面做。

/spark%E4%B8%AD%E7%9A%84rpc/image_1cq36oejmarr10ktnup1rsqs7n16.png

刚刚还提到了 Actor pattern 中 mailbox 模式,Spark RPC 最早起源于 Akka,所以进化到现在,仍然了使用了这个模式。这里就介绍 Inbox,每个 Endpoint 都有一个 Inbox,Inbox 里面有一个 InboxMessage 的链表,InboxMessage 有很多子类,可以是远程调用过来的 RpcMessage,可以是远程调用过来的 fire-and-forget 的单向消息 OneWayMessage,还可以是各种服务启动,链路建立断开等 Message,这些 Message 都会在 Inbox 内部的方法内做模式匹配,调用相应的 RpcEndpoint 的函数(都是一一对应的)。

Dispatcher 中包含一个 MessageLoop,它读取 LinkedBlockingQueue 中的投递 RpcMessage,根据客户端指定的 Endpoint 标识(每个 EndPoint 都有一个 Inbox),找到 Endpoint 的 Inbox,然后投递进去,由于是阻塞队列,当没有消息的时候自然阻塞,一旦有消息,就开始工作。Dispatcher 的 ThreadPool 负责消费这些 Message。

Dispatcher 的 ThreadPool 它使用参数 spark.rpc.netty.dispatcher.numThreads 来控制数量,如果 kill -3 <PID> 每个 Spark driver 或者 executor 进程,都会看到N个 dispatcher 线程:

1
"dispatcher-event-loop-0" #26 daemon prio=5 os_prio=31 tid=0x00007f8877153800 nid=0x7103 waiting on condition [0x000000011f78b000]

那么另外的问题是谁会调用 Dispatcher 分发 Message 的方法呢?答案是 RpcHandler 的子类 NettyRpcHandler,这就是 Reactor 中的线程做的事情。RpcHandler 是底层 ore.apache.spark.spark-network-common 提供的 handler,当远程的数据包解析成功后,会调用这个 handler 做处理。

这样就完成了一个完全异步的流程,Network IO 通信由底层负责,然后由 Dispatcher 分发,只要 Dispatcher 中的 InboxMessage 的链表足够大,那么就可以让 Dispatcher 中的 ThreadPool 慢慢消化消息,和底层的 IO 解耦开来,完全在独立的线程中完成,一旦完成Endpoint 内部业务逻辑,利用 RpcCallContext 回调来做消息的返回。

Outbox NettyRpcEnv 中包含一个 ConcurrentHashMap[RpcAddress, Outbox],每个远程 Endpoint 都对应一个 Outbox,这和上面 Inbox 遥相呼应,是一个 mailbox 似的实现方式。

和 Inbox 类似,Outbox 内部包含一个 OutboxMessage 的链表,OutboxMessage 有两个子类,OneWayOutboxMessage和RpcOutboxMessage,分别对应调用 RpcEndpoint 的 receive 和 receiveAndReply 方法。

NettyRpcEnv 中的 send 和 ask 方法会调用指定地址 Outbox 中的 send 方法,当远程连接未建立时,会先建立连接,然后去消化 OutboxMessage。

同样,一个问题是 Outbox 中的 send 方法如何将消息通过 Network IO 发送出去,如果是 ask 方法又是如何读取远程响应的呢?答案是 send 方法通过 org.apache.spark.spark-network-common 创建的 TransportClient 发送出去消息,由 Reactor 线程负责序列化并且发送出去,每个 Message 都会返回一个 UUID,由底层来维护一个发送出去消息与其 Callback 的 HashMap,当 Netty 收到完整的远程 RpcResponse 时候,回调响应的 Callback,做反序列化,进而回调 Spark core 中的业务逻辑,做 Promise/Future 的 done,上层退出阻塞。

这也是一个异步的过程,发送消息到 Outbox 后,直接返回,Network IO 通信由底层负责,一旦 RPC 调用成功或者失败,都会回调上层的函数,做相应的处理。

spark-network-common中的类 这里暂不做过多的展开,都是基于 Netty 的封装,有兴趣的读者可以自行阅读源码,当然还可以参考我之前开源的 Navi-pbrpc 框架的代码,其原理是基本相同的。

时序图分析

服务启动,话不多述,直接上图。

/spark%E4%B8%AD%E7%9A%84rpc/v2-c333b37c160958d35569bbb2cd653fac_r.jpg

服务端响应

/spark%E4%B8%AD%E7%9A%84rpc/v2-9693b3cf2d3df7e0996b55844ae56ad4_r.jpg

第一阶段,IO 接收。TransportRequestHandler 是 netty 的回调 handler,它会根据 wire format(下文会介绍)解析好一个完整的数据包,交给 NettyRpcEnv 做反序列化,如果是 RPC 调用会构造 RpcMessage,然后回调 RpcHandler 的方法处理 RpcMessage,内部会调用 Dispatcher 做 RpcMessage 的投递,放到 Inbox 中,到此结束。

第二阶段,IO 响应。MessageLoop 获取带处理的 RpcMessage,交给 Dispatcher 中的 ThreadPool 做处理,实际就是调用 RpcEndpoint 的业务逻辑,通过 RpcCallContext 将消息序列化,通过回调函数,告诉 TransportRequestHandler 这有一个消息处理完毕,响应回去。

这里请重点体会异步处理带来的便利,使用 Reactor 和 Actor mailbox 的结合的模式,解耦了消息的获取以及处理逻辑。

客户端请求

/spark%E4%B8%AD%E7%9A%84rpc/v2-79d7059727a80068a7db7b9681ed8509_r.jpg

客户端一般需要先建立 RpcEnv,然后获取 RpcEndpointRef。

第一阶段,IO 发送。利用 RpcEndpointRef 做 send 或者 ask 动作,这里以 send 为例,send 会先进行消息的序列化,然后投递到指定地址的 Outbox 中,Outbox 如果发现连接未建立则先尝试建立连接,然后调用底层的 TransportClient 发送数据,直接通过该 netty 的 API 完成,完成后即可返回,这里返回了 UUID 作为消息的标识,用于下一个阶段的回调,使用的角度来说可以返回一个 Future,客户端可以阻塞或者继续做其他操作。

第二,IO 接收。TransportResponseHandler 接收到远程的响应后,会先做反序列号,然后回调第一阶段的 Future,完成调用,这个过程全部在 Reactor 线程中完成的,通过 Future 做线程间的通知。

Spark RPC核心技术总结

Spark RPC 作为 RPC 传输层选择 TCP 协议,做可靠的、全双工的 binary stream 通道。

做一个高性能 scalable 的 RPC,需要能够满足第一,服务端尽可能多的处理并发请求,第二,同时尽可能短的处理完毕。CPU 和 I/O 之前天然存在着差异,网络传输的延时不可控,CPU 资源宝贵,系统进程/线程资源宝贵,为了尽可能避免 Socket I/O 阻塞服务端和客户端调用,有一些模式(pattern)是可以应用的。Spark RPC 的 I/O Model 由于采用了 Netty,因此使用的底层的 I/O 多路复用(I/O Multiplexing)机制,这里可以通过 spark.rpc.io.mode 参数设置,不同的平台使用的技术不同,例如 linux 使用 epoll。

线程模型采用 Multi-Reactors + mailbox 的异步方式来处理,在上文中已经介绍过。

Schema Declaration 和序列化方面,Spark RPC 默认采用 Java native serialization 方案,主要从兼容性和 JVM 平台内部组件通信,以及 scala 语言的融合考虑,所以不具备跨语言通信的能力,性能上也不是追求极致,目前还没有使用 Kyro 等更好序列化性能和数据大小的方案。

协议结构,Spark RPC 采用私有的 wire format 如下,采用 headr+payload 的组织方式,header 中包括整个 frame 的长度,message 的类型,请求 UUID。为解决 TCP 粘包和半包问题,以及组织成完整的 Message 的逻辑都在 org.apache.spark.network.protocol.MessageEncoder 中。

/spark%E4%B8%AD%E7%9A%84rpc/image_1cq3a1re1s04jiuvpkf2kjc43h.png

使用 wireshake 具体分析一下。

首先看一个 RPC 请求,就是调用第一章说的 HelloEndpoint,客户端调用分两个 TCP Segment 传输,这是因为 Spark 使用 netty 的时候 header 和 body 分别 writeAndFlush 出去。

下图是第一个 TCP segment:

/spark%E4%B8%AD%E7%9A%84rpc/image_1cq3a2fbbo4g1tbe1qi11tv0i8v3u.png

例子中蓝色的部分是 header,头中的字节解析如下:

1
00 00 00 00 00 00 05 d2 // 十进制1490,是整个frame的长度

03一个字节表示的是RpcRequest,枚举定义如下,

1
2
3
4
5
6
7
8
RpcRequest(3)
RpcResponse(4)
RpcFailure(5)
StreamRequest(6)
StreamResponse(7)
StreamFailure(8),
OneWayMessage(9)
User(-1)

每个字节的意义如下:

1
2
4b ac a6 9f 83 5d 17 a9  // 8个字节是UUID
05 bd // 十进制1469payload长度

具体的 Payload 就长下面这个样子,可以看出使用 Java native serialization,一个简单的 Echo 请求就有1469个字节,还是很大的,序列化的效率不高。但是 Spark RPC 定位内部通信,不是一个通用的 RPC 框架,并且使用的量非常小,所以这点消耗也就可以忽略了,还有 Spark Structured Streaming 使用该序列化方式,其性能还是可以满足要求的。

/spark%E4%B8%AD%E7%9A%84rpc/image_1cq3a30cghvk1nkb1uvi1sd21aqq4b.png

总结

作者从好奇的角度来深度挖掘了下 Spark RPC 的内幕,并且从2.1版本的 Spark core 中独立出了一个专门的项目 Kraps-rpc,放到了 github 以及发布到 Maven 中央仓库做学习使用,提供了比较好的上手文档、参数设置和性能评估,在整合 kraps-rpc 还发现了一个小的改进点,给 Spark 提了一个 PR——[SPARK-21701],已经被 merge 到了主干,算是 contribute 社区了(10086个开心)。

接着深入剖析了 Spark RPC 模块内的类组织关系,使用 UML 类图和时序图帮助读者更好的理解一些核心的概念,包括 RpcEnv,RpcEndpoint,RpcEndpointRef 等,以及 I/O 的设计模式,包括 I/O 多路复用,Reactor 和 Actor mailbox 等,这里还是重点提下 Spark RPC 的设计哲学,利用 netty 强大的 Socket I/O 能力,构建一个异步的通信框架。最后,从 TCP 层的 segment 二进制角度分析了 wire protocol。

欢迎访问作者的博客neoremind.com,欢迎技术交流。

警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。