Installing Apache Flink on macOS and Testing WordCount

User-contributed post · 2022-11-19


1. Install Java 1.8

```
steven@wangyuxiangdeMacBook-Pro ~ java -version
java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
```

2. Install Flink

Install Flink with Homebrew using the following command:

```
brew install apache-flink
```

3. Check that the installation succeeded

```
steven@wangyuxiangdeMacBook-Pro ~ flink -v
Version: 1.13.2, Commit ID: 5f007ff
```

4. Inspect the Flink installation with Homebrew

```
steven@wangyuxiangdeMacBook-Pro ~ brew info apache-flink
apache-flink: stable 1.13.2 (bottled), HEAD
Scalable batch and stream data processing
/usr/local/Cellar/apache-flink/1.13.2 (164 files, 325.3MB) *
  Poured from bottle on 2022-05-13 at 15:52:56
License: Apache-2.0
==> Dependencies
Required: openjdk@11 ✔
==> Options
--HEAD
        Install HEAD version
==> Analytics
install: 449 (30 days), 1,388 (90 days), 6,005 (365 days)
install-on-request: 451 (30 days), 1,392 (90 days), 5,997 (365 days)
build-error: 0 (30 days)
```

5. Go to the Flink installation directory and start the cluster

```
cd /usr/local/Cellar/apache-flink/1.13.2/
./libexec/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host wangyuxiangdeMacBook-Pro.local.
Starting taskexecutor daemon on host wangyuxiangdeMacBook-Pro.local.
```

6. Open the Flink web UI (http://localhost:8081 by default) to confirm the cluster started successfully.

7. To stop the cluster:

```
cd /usr/local/Cellar/apache-flink/1.13.2/
./libexec/bin/stop-cluster.sh
```

8. Write a streaming Flink job in Java. The job reads from a local socket on port 9000, which you can open beforehand with `nc -lk 9000`. The code is as follows:

```java
package com.dangbei.flink_test.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Test_WordCount {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Host and port of the socket to read from
        String host = "localhost";
        int port = 9000;

        // Read the real-time stream from the socket
        DataStream<String> inputLineDataStream = env.socketTextStream(host, port);

        // Split each line on whitespace and emit (word, 1) tuples for counting
        DataStream<Tuple2<String, Integer>> resultStream = inputLineDataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] wordArray = line.split("\\s");
                        for (String word : wordArray) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                // The stream is unbounded (the data is never "complete"), so instead of a
                // batch-style group by we use keyBy on the word in position 0
                .keyBy(0)
                // Sum the counts in position 1
                .sum(1);

        // Print the running (word, count) results
        resultStream.print();

        // Start the streaming job
        env.execute();
    }
}
```
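The tokenize-and-count logic at the heart of the job can be checked without a running cluster. A minimal sketch in plain Java (no Flink dependency; `WordCountCore` and `wordCount` are illustrative names, not part of the job above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountCore {
    // Same logic as the flatMap + sum above: split on whitespace, count per word
    static Map<String, Integer> wordCount(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : line.split("\\s")) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount("to be or not to be")); // {to=2, be=2, or=1, not=1}
    }
}
```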

8.1 The pom.xml configuration is as follows:
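The original pom.xml listing did not survive. A minimal sketch consistent with Flink 1.13.2 and the `jar-with-dependencies` artifact name used in the submit command below (the group/artifact ids and Scala suffix are assumptions):

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.dangbei</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0</version>

    <dependencies>
        <!-- DataStream API for the streaming job -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- DataSet API for the batch WordCount -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Produces the *-jar-with-dependencies.jar used in the submit command -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>single</goal></goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```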

9. The batch version of WordCount (the opening of this listing, including the class declaration and the creation of the `text` DataSet, was truncated; only the transformation survives):

```java
// Split each line into lowercase words and count them with the batch DataSet API
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] value = s.toLowerCase().split(" ");
                for (String word : value) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
        .groupBy(0)  // batch API: group by the word in position 0
        .sum(1);     // sum the counts in position 1

// 4. Print the result
counts.print();
```

9.1 Submit the job with the following command:

```
./bin/flink run -c com.dangbei.flink_test.wordcount.WordCount /Users/steven/flinkjob/flink_test/target/flink_test-1.0-jar-with-dependencies.jar
```

9.2 The Flink web UI shows the completed job.

Because a batch job releases its resources as soon as it completes, it only appears in the web UI with the FINISHED status.

Note:

When submitting the job, always specify the entry class with `-c`; otherwise you will get the following error:

```
steven@wangyuxiangdeMacBook-Pro /usr/local/Cellar/apache-flink/1.13.2 ./bin/flink run /Users/steven/flinkjob/flink_test/target/flink_test-1.0-jar-with-dependencies.jar
------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
        at org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:437)
        at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:158)
        at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
        at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
        at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:271)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
```
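Alternatively, the entry class can be written into the jar's manifest so that `flink run` no longer needs `-c`. A sketch using the maven-assembly-plugin's `archive` section (the class name is taken from the submit command above; whether it matches your actual main class is an assumption):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <!-- Written as Main-Class into MANIFEST.MF, which flink run reads -->
                <mainClass>com.dangbei.flink_test.wordcount.WordCount</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>
```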
