Big Data Study Notes (5)
Chapter 5: Spark Scheduling and Advanced Programming
5.1 A Spark Application Example
Spark applications are deployed on a cluster with the spark-submit shell command. It uses a single, uniform interface for all of the supported cluster managers, so you do not have to configure the application separately for each one.
As an example, we reuse the word-count example from the previous chapter, this time packaged and run as a standalone Spark application.
The input file in.txt contains the following text:
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care, as they share.
The Spark application, written in Scala, is SparkWordCount.scala:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
    /* local = master URL; Word Count = application name;             */
    /* /usr/local/spark = Spark home; Nil = jars; Map() = environment */
    /* Map() = variables passed to worker nodes                       */

    /* Create an input RDD that reads the text file (in.txt) through the Spark context */
    val input = sc.textFile("in.txt")

    /* Transform the input RDD into a count RDD */
    val count = input.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)

    /* saveAsTextFile is an action that materializes the RDD */
    count.saveAsTextFile("outfile")
    System.out.println("OK")
  }
}
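For reference, the same SparkContext can also be constructed from a SparkConf object; this is only a minimal sketch of that alternative style, in which the Spark home and extra jars are supplied through spark-submit rather than through the constructor:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("Word Count").setMaster("local")
val sc = new SparkContext(conf)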
Use the following steps to submit the application. Run each command from a terminal in the spark-application directory.
Step 1: Download the Spark jar
The Spark core jar is required for compilation; download spark-core_2.10-1.3.0.jar.
Step 2: Compile the program
Compile the program with the following command. Here /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is the Hadoop support jar taken from the Spark library directory.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
Step 3: Create the jar
Run the following command to create a wordcount.jar file:
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Step 4: Submit the Spark application
Submit the application with the following command:
spark-submit --class SparkWordCount --master local wordcount.jar
If execution succeeds, you will see output similar to the following:
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3-b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Step 5: Check the output file
After successful execution, an outfile directory appears in the spark-application directory. Run the following commands to list the files in it:
$ cd outfile
$ ls
part-00000  part-00001  _SUCCESS
Use the cat command to display the contents of the part-00000 and part-00001 files.
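Each part file holds the (word, count) pairs written by one output partition, for example:

$ cat part-00000
$ cat part-00001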
5.2 spark-submit Syntax
spark-submit [options] <app jar | python file> [app arguments]
The options are described in the following table:
Option | Description |
--master | spark://host:port, mesos://host:port, yarn, or local. |
--deploy-mode | Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). |
--class | Your application's main class (for Java / Scala apps). |
--name | A name of your application. |
--jars | Comma-separated list of local jars to include on the driver and executor classpaths. |
--packages | Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. |
--repositories | Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. |
--py-files | Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. |
--files | Comma-separated list of files to be placed in the working directory of each executor. |
--conf (prop=val) | Arbitrary Spark configuration property. |
--properties-file | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. |
--driver-memory | Memory for driver (e.g. 1000M, 2G) (Default: 512M). |
--driver-java-options | Extra Java options to pass to the driver. |
--driver-library-path | Extra library path entries to pass to the driver. |
--driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
--executor-memory | Memory per executor (e.g. 1000M, 2G) (Default: 1G). |
--proxy-user | User to impersonate when submitting the application. |
--help, -h | Show this help message and exit. |
--verbose, -v | Print additional debug output. |
--version | Print the version of current Spark. |
--driver-cores NUM | Cores for driver (Default: 1). |
--supervise | If given, restarts the driver on failure. |
--kill | If given, kills the driver specified. |
--status | If given, requests the status of the driver specified. |
--total-executor-cores | Total cores for all executors. |
--executor-cores | Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
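For instance, here is a hedged sketch of a fuller submit command that combines several of these options; the master URL, memory size, and core count are placeholders, not values from the example above:

spark-submit \
  --class SparkWordCount \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --executor-memory 2G \
  --total-executor-cores 4 \
  wordcount.jar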
5.3 Spark Variables
Spark provides two kinds of shared variables: broadcast variables and accumulators.
1. Broadcast variables: used to efficiently distribute large values
Broadcast variables allow the programmer to keep a read-only variable cached on each machine, rather than shipping a copy of it with every task. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also distributes broadcast variables using efficient broadcast algorithms to reduce communication cost. Spark actions are executed through a series of stages, separated by distributed shuffle operations, and Spark automatically broadcasts the common data needed by the tasks within each stage. Data broadcast this way is cached in serialized form and deserialized before each task runs. Explicitly creating broadcast variables is therefore only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form matters.
A broadcast variable is created by calling SparkContext.broadcast(v). It wraps the value v, which can be accessed by calling the value method. For example:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
Output:
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
After the broadcast variable is created, it should be used instead of the original value v in any functions run on the cluster, so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it has been broadcast, in order to ensure that all nodes see the same value of the broadcast variable.
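For instance, a minimal sketch of using the broadcast variable inside a cluster-side function (the parallelized numbers below are made up for illustration):

scala> val data = sc.parallelize(Array(10, 20, 30))
scala> data.map(x => x * broadcastVar.value.sum).collect()
// broadcastVar.value is Array(1, 2, 3), whose sum is 6,
// so the result is Array(60, 120, 180)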
2. Accumulators: used to aggregate information
Accumulators are variables that are only "added to" through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If an accumulator is created with a name, it is displayed in the Spark UI, which is useful for understanding the progress of running stages (note: this is not yet supported in Python).
An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Tasks running on the cluster can then add to it using the add method or the += operator (in Scala or Python), but they cannot read its value. Only the driver program can read the accumulator's value, using its value method.
The following example uses an accumulator to sum the elements of an array:
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
To check the accumulator's value, run:
scala> accum.value
Output:
res2: Int = 10
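As a further illustration, here is a hedged sketch of a named accumulator, using the same Spark 1.x SparkContext.accumulator API as above; in.txt is the input file from section 5.1, while the output path and variable names are chosen purely for illustration:

val blankLines = sc.accumulator(0, "Blank Lines")  // named accumulators show up in the Spark UI
val lines = sc.textFile("in.txt")
val words = lines.flatMap { line =>
  if (line.isEmpty) blankLines += 1                // updated by tasks on the executors
  line.split(" ")
}
words.saveAsTextFile("words-out")                  // an action forces the job to run
println("Blank lines: " + blankLines.value)        // only the driver reads the value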
5.4 Numeric RDD Operations
Spark provides a set of statistical operations on RDDs of numeric data. These operations are implemented as streaming algorithms that process one element at a time, so they are computed in a single pass; calling the stats() method returns a StatsCounter object. The numeric methods of a StatsCounter are listed in the following table:
Method | Meaning |
count() | Number of elements in the RDD. |
mean() | Average of the elements in the RDD. |
sum() | Total value of the elements in the RDD. |
max() | Maximum value among all elements in the RDD. |
min() | Minimum value among all elements in the RDD. |
variance() | Variance of the elements. |
stdev() | Standard deviation. |
If you only need one of these statistics, you can call the corresponding method directly on the RDD.
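A minimal sketch in the spark-shell (the sample numbers are made up; the commented results follow from them):

scala> val nums = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0))
scala> val stats = nums.stats()  // one pass over the data, returns a StatsCounter
scala> stats.mean                // 2.5
scala> stats.stdev               // population standard deviation, about 1.118
scala> nums.sum()                // 10.0, called directly on the RDD
scala> nums.max()                // 4.0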