c语言sscanf函数的用法是什么
251
2022-11-29
【回顾】Spark 分布式计算模拟
文章目录
前言
一、模拟客户端、服务端数据通信二、模拟客户端向服务端发送计算任务三、客户端、服务端分布式集群计算模拟
前言
上一篇我们提到过,对于 Spark 框架有两个核心组件:Driver、Excutor。1、DriverSpark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责: ➢ 将用户程序转化为作业(job) ➢ 在 Executor 之间调度任务 (task) ➢ 跟踪 Executor 的执行情况 ➢ 通过 UI 展示查询运行情况实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为Driver 类。2、ExecutorSpark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。Executor 有两个核心功能: ➢ 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程 ➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
一、模拟客户端、服务端数据通信
通过以上的概念描述,我们可以将Driver、Excutor看成是客户端、服务端的数据通信模式。首先我们先来模拟回顾一下客户端、服务端数据通信的流程。
我们需要创建两个类 Driver、Executor 来分别充当客户端、服务端。
客户端 Driver :需要请求连接到服务端;连接成功后向其发送数据;发送数据后结束。服务端 Executor:需要启动服务,等待客户端的请求;客户端请求后,接收客户端的连接请求;接收客户端发送的数据后,结束。
其实,还有一步应该是服务端将结果返回给客户端,客户端同时接收服务端的数据,这里就不再深入。
具体代码实现如下:
package test02import java.io.{ObjectOutputStream, OutputStream}import java.net.Socketobject Driver { def main(args: Array[String]): Unit = { // 建立客户端,准备发送数据 val client: Socket = new Socket("localhost",9999) println("连接服务端成功!") // 客户端发送数据 val out: OutputStream = client.getOutputStream out.write(2) out.flush() println("数据传输完毕!") // 关闭资源 out.close() client.close() }}
package test02import java.io.InputStreamimport java.net.{ServerSocket, Socket}object Executor { def main(args: Array[String]): Unit = { // 启动服务器,准备接收数据 val server: ServerSocket = new ServerSocket(9999) println("正在等待客户端的连接中......") // 等待客户端的连接 val client: Socket = server.accept() println("客户端的连接成功!") // 接收信息 val in: InputStream = client.getInputStream val data: Int = in.read() println("服务端接收的数据是:",data) // 关闭资源 in.close() client.close() server.close() }}
客户端、服务端数据通信演示:
首先运行Executor启动服务端,等待客户端的请求连接。然后运行Driver请求连接服务器,服务器端接手请求后,客户端开始发送数据2,服务端获取到传送的数据后进行输出。
返回顶部
二、模拟客户端向服务端发送计算任务
Driver 在 Spark 作业执行时主要作用就是: ➢ 将用户程序转化为作业(job) ➢ 在 Executor 之间调度任务 (task) ➢ 跟踪 Executor 的执行情况
Executor主要任务就是运行Spark应用。
这里我们将进一步模拟任务的调度,新建一个Task类含有数据、逻辑(处理方式)、计算功能。整体思路就是,在通信的时候,将数据转为计算任务(将List中的每个数据乘2)。以类的形式模拟任务task调度到Executor中,并在Executor中调用类的计算功能实现应用的运行。
具体代码实现如下:
package test02class Task extends Serializable{ // 数据 val data: Seq[Int] = List(1,2,3) // 逻辑 --- 匿名函数 val logic: Int =>Int = _*2 // 计算 def compute(): Seq[Int] = { data.map(item => logic(item)) }}
package test02import java.io.{ObjectOutputStream, OutputStream}import java.net.Socketobject Driver { def main(args: Array[String]): Unit = { // 建立客户端,准备发送数据 val client: Socket = new Socket("localhost",9999) println("连接服务端成功!") // 客户端发送数据 /*val out: OutputStream = client.getOutputStream out.write(2) out.flush() println("数据传输完毕!")*/ // 发送task任务 val out = client.getOutputStream val outputObject = new ObjectOutputStream(out) val task = new Task() outputObject.writeObject(task) outputObject.flush() println("任务发送完成!") // 关闭资源 out.close() outputObject.close() client.close() }}
package test02import java.io.{InputStream, ObjectInputStream}import java.net.{ServerSocket, Socket}object Executor { def main(args: Array[String]): Unit = { // 启动服务器,准备接收数据 val server: ServerSocket = new ServerSocket(9999) println("正在等待客户端的连接中......") // 等待客户端的连接 val client: Socket = server.accept() println("客户端的连接成功!") // 接收信息 /*val in: InputStream = client.getInputStream val data: Int = in.read() println("服务端接收的数据是:",data)*/ val in = client.getInputStream val inputObject = new ObjectInputStream(in) val task = inputObject.readObject().asInstanceOf[Task] val result: Seq[Int] = task.compute() println("计算的结果为:",result) // 关闭资源 in.close() inputObject.close() client.close() server.close() }}
区别于之前的简单数据通信,这里在进行网络传输的时候是一个Task类对象(需要实现序列化,继承 Serializable),在流中也是以对象的流进行传输。特别提醒,在服务端接收到序列化对象的时候需要进行类型转换,scala中使用asInstanceOf算子实现对象类型的转换。
模拟客户端向服务端发送计算任务演示:
返回顶部
三、客户端、服务端分布式集群计算模拟
二中的单节点之间的处理并不能真正体现出分布式的效果,这里我们在添加一个Excutor(JVM进程),模拟集群中工作节点(Worker)下不止有一个Executor运行的情况。
集群:多个人在一起作同样的事 。分布式 :多个人在一起作不同的事 。
当多个人在处理同一件事情的时候,会将总工作量拆分为一小份一小份,分发给不同的人去做,最终合并起来完成同一个任务。这里我们将Task类修改为数据、逻辑主体类,看作是整个大任务,新建SubTask类就是拆分的小任务,每个小任务中都有计算功能,其工作量具体由整个集群来进行分配。
package test02// Task 提供数据class Task extends Serializable{ // 数据 val data: List[Int] = List(1,2,3,4) // 逻辑 --- 匿名函数 val logic: Int =>Int = _*2}
package test02// SubTask 对拆分的数据进行计算class SubTask extends Serializable { var data: List[Int] = _ var logic: Int => Int = _ // 计算 def compute(): Seq[Int] = { data.map(item => logic(item)) }}
在进行通信的时候,我们创建两个 client 来模拟两份 SubTask 子任务的分发,每个任务的逻辑也就是对数据的处理逻辑是与总任务保持一致的,数据量这里我们就进行均分:take()算子返回RDD中的前n个元素、takeRight()算子返回从右边开始起的n个算子。
package test02import java.io.ObjectOutputStreamimport java.net.Socketobject Driver { def main(args: Array[String]): Unit = { // 建立客户端,准备发送数据 val client1 = new Socket("localhost",9999) val client2 = new Socket("localhost",8888) println("连接服务端成功!") // 1、客户端发送数据 /*val out: OutputStream = client.getOutputStream out.write(2) out.flush() println("数据传输完毕!")*/ // 2、发送task任务 /*val out = client.getOutputStream val outputObject = new ObjectOutputStream(out) val task = new Task() outputObject.writeObject(task) outputObject.flush() println("任务发送完成!")*/ // 3、拆分数据发送 task // 第一份 val out1=client1.getOutputStream val objectOutput1 = new ObjectOutputStream(out1) val task = new Task() val subTask1 = new SubTask() subTask1.logic = task.logic subTask1.data = task.data.take(2) objectOutput1.writeObject(subTask1) objectOutput1.flush() // 第二份 val out2=client2.getOutputStream val objectOutput2 = new ObjectOutputStream(out2) val subTask2 = new SubTask() subTask2.logic = task.logic subTask2.data = task.data.takeRight(2) objectOutput2.writeObject(subTask2) objectOutput2.flush() println("客户端数据发送完毕!") // 关闭资源 out1.close() objectOutput1.close() client1.close() }}
接着使用两个Executor类接收来自client1、client2的同一个任务的两个不同子任务,完成分布式集群计算。
package test02import java.io.ObjectInputStreamimport java.net.{ServerSocket, Socket}object Executor1 { def main(args: Array[String]): Unit = { // 启动服务器,准备接收数据 val server: ServerSocket = new ServerSocket(9999) println("正在等待客户端的连接中......") // 等待客户端的连接 val client: Socket = server.accept() println("客户端的连接成功!") // 接收信息 /*val in: InputStream = client.getInputStream val data: Int = in.read() println("服务端接收的数据是:",data)*/ val in = client.getInputStream val inputObject = new ObjectInputStream(in) val task = inputObject.readObject().asInstanceOf[SubTask] val result: Seq[Int] = task.compute() println("计算节点[9999]的结果为:",result) // 关闭资源 in.close() inputObject.close() client.close() server.close() }}
package test02import java.io.ObjectInputStreamimport java.net.{ServerSocket, Socket}object Executor2 { def main(args: Array[String]): Unit = { // 启动服务器,准备接收数据 val server: ServerSocket = new ServerSocket(8888) println("正在等待客户端的连接中......") // 等待客户端的连接 val client: Socket = server.accept() println("客户端的连接成功!") // 接收信息 /*val in: InputStream = client.getInputStream val data: Int = in.read() println("服务端接收的数据是:",data)*/ // 接收数据计算 val in = client.getInputStream val inputObject = new ObjectInputStream(in) val task = inputObject.readObject().asInstanceOf[SubTask] val result: Seq[Int] = task.compute() println("计算节点[8888]的结果为:",result) // 关闭资源 in.close() inputObject.close() client.close() server.close() }}
客户端、服务端分布式集群计算模拟演示:
在上述模拟过程基础上,Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是: ➢ RDD : 弹性分布式数据集 ➢ 累加器:分布式共享只写变量 ➢ 广播变量:分布式共享只读变量
返回顶部
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~