本文共 1913 字,大约阅读时间需要 6 分钟。
HelloworldServer.scala
object HelloworldServer { def main(args: Array[String]): Unit = { val conf = new SparkConf() val securityManager = new SecurityManager(conf) val rpcEnv: RpcEnv = RpcEnv.create("hello-service", "localhost", 52345, conf, securityManager) val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv) rpcEnv.setupEndpoint("hello-service", helloEndpoint) rpcEnv.awaitTermination() }}class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint { override def onStart(): Unit = { println("start hello endpoint") } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case SayHi(msg) => { //println(s"receive $msg") context.reply(s"Hi, $msg") } case SayBye(msg) => { //println(s"receive $msg") context.reply(s"Bye, $msg") } } override def onStop(): Unit = { println("stop hello endpoint") }}
HelloworldClient.scala
object HelloworldClient { def main(args: Array[String]): Unit = { syncCall() } def syncCall() = { // 初始化RpcEnv环境 val conf = new SparkConf // 这里的rpc环境主机需要指定本机,端口号可以任意指定 val rpcEnv = RpcEnv.create("hello-client", "localhost", 52346, conf, new SecurityManager(conf)) // 根据Server端IP + Port获取后端服务的引用,得到的是RpcEndpointRef类型对象 val endpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service") // 1、客户端异步请求 // 客户端通过RpcEndpointRef#ask方法异步访问服务端,服务端通过RpcEndpoint#receiveAndReply方法获取到该请求后处理 val future = endpointRef.ask[String](SayBye("I am zhangsan")) // 客户端请求成功/失败时的处理方法 future.onComplete { case scala.util.Success(value) => println(s"Got the result = $value") case scala.util.Failure(e) => println(s"Got error: $e") } // 客户端等待超时时间 Await.result(future, Duration("5s")) // 2、客户端同步请求 val resp = endpointRef.askSync[String](SayHi("hehe")) print(resp) }}
转载地址:http://lecmb.baihongyu.com/