博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark RPC使用
阅读量:2426 次
发布时间:2019-05-10

本文共 1913 字,大约阅读时间需要 6 分钟。

HelloworldServer.scala

object HelloworldServer {
def main(args: Array[String]): Unit = {
val conf = new SparkConf() val securityManager = new SecurityManager(conf) val rpcEnv: RpcEnv = RpcEnv.create("hello-service", "localhost", 52345, conf, securityManager) val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv) rpcEnv.setupEndpoint("hello-service", helloEndpoint) rpcEnv.awaitTermination() }}class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
override def onStart(): Unit = {
println("start hello endpoint") } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case SayHi(msg) => {
//println(s"receive $msg") context.reply(s"Hi, $msg") } case SayBye(msg) => {
//println(s"receive $msg") context.reply(s"Bye, $msg") } } override def onStop(): Unit = {
println("stop hello endpoint") }}

HelloworldClient.scala

object HelloworldClient {
def main(args: Array[String]): Unit = {
syncCall() } def syncCall() = {
// 初始化RpcEnv环境 val conf = new SparkConf // 这里的rpc环境主机需要指定本机,端口号可以任意指定 val rpcEnv = RpcEnv.create("hello-client", "localhost", 52346, conf, new SecurityManager(conf)) // 根据Server端IP + Port获取后端服务的引用,得到的是RpcEndpointRef类型对象 val endpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service") // 1、客户端异步请求 // 客户端通过RpcEndpointRef#ask方法异步访问服务端,服务端通过RpcEndpoint#receiveAndReply方法获取到该请求后处理 val future = endpointRef.ask[String](SayBye("I am zhangsan")) // 客户端请求成功/失败时的处理方法 future.onComplete {
case scala.util.Success(value) => println(s"Got the result = $value") case scala.util.Failure(e) => println(s"Got error: $e") } // 客户端等待超时时间 Await.result(future, Duration("5s")) // 2、客户端同步请求 val resp = endpointRef.askSync[String](SayHi("hehe")) print(resp) }}

转载地址:http://lecmb.baihongyu.com/

你可能感兴趣的文章
单链表翻转
查看>>
检查表达式中的括号是否匹配
查看>>
一道关于 goroutine 的面试题
查看>>
信号量的使用方法
查看>>
Redis 缓存穿透、击穿、雪崩
查看>>
RabbitMQ(1): docker-compose安装rabbitmq及简单使用Hello World
查看>>
利用序列化实现对象的拷贝
查看>>
is-a,has-a,like-a是什么
查看>>
简单工厂、工厂、抽象工厂的对比
查看>>
J2EE的体系架构——J2EE
查看>>
对于关系型数据库中的索引的基本理解
查看>>
索引,主键,唯一索引,联合索引的区别
查看>>
剪桌腿的最小代价
查看>>
Zookeeper原理架构
查看>>
利用ZooKeeper简单实现分布式锁
查看>>
Lock、ReentrantLock、synchronized
查看>>
Java过滤器与SpringMVC拦截器之间的关系与区别
查看>>
Java中的String为什么是不可变的?
查看>>
剑指offer二叉搜索树与双向链表
查看>>
LeetCode 81. 搜索旋转排序数组 II(头条)
查看>>