linux怎么查看本机内存大小
415
2022-09-07
记一次JAVA使用ProcessBuilder执行Shell任务卡死问题分析
背景
最近由于某些原因需要把一些原本 location 在 oss (阿里云云对象存储)上的 hive 数据迁移到cosn(腾讯云对象存储)。目前一直在增量进行同步,在迁移之前需要进行数据的对比。至于对比的方法计划有两种,一种是对比 oss 和 cosn 对应文件下的文件所占磁盘空间大小,即使用 hadoop fs -du -s -h 路径 命令,然后对比相应表 location 的数据大小是否一直即可;另外一种是直接对相应的 hive 表进行 count 操作,表的 location 地址可以通过 hive server2 或者 spark thrift server 获取相应的元数据信息。
生成对比脚本
根据 hive server2 或者 spark thrift server 获取的分区、location、库名、表名等信息生成了数据量和所占磁盘空间的 shell 语句。这里的数据量语句最终生成到一个文件中:count.sh
count=`spark-sql --master yarn --executor-memory 8G --executor-cores 4 --num-executors 2 --driver-memory 4G -e "select count(1) from bi_ods.table1 "`echo bi_ods.table1:$count >>/Users/scx/work/git/company/utils/count.logcount=`spark-sql --master yarn --executor-memory 8G --executor-cores 4 --num-executors 2 --driver-memory 4G -e "select count(1) from bi_ods.table2 "`echo bi_ods.table2:$count >>/Users/scx/work/git/company/utils/count.logcount=`spark-sql --master yarn --executor-memory 8G --executor-cores 4 --num-executors 2 --driver-memory 4G -e "select count(1) from bi_ods.table3 where dt >= 20191020 and dt <= 20191027"`echo bi_ods.table3:$count >>/Users/scx/work/git/company/utils/count.log...
通过语句可以看出,对于没有分区的表直接进行 count 计算,对于分区的表,只会对比最近的数据(日期自定义) 我们可以通过执行这个 shell 脚本,可以把最终的结果重定向到 count.log中,以 : 隔开,前半本部分是表名,后半部分是数据量,再写个程序解析表和数据量,最后与腾讯云执行的 count 结果进行对比即可。
执行优化
上面生成了 count.sh 后执行呢?直接 bash count.sh吗?是一种方法,除非你的表个数很少,很快就能执行完。要知道,这种方式是串行执行,只有等一个表的 count 语句执行完成之后才能执行下一个语句。我这边共有近2000 张表,如果这样执行的话需要近 30 个小时,太浪费时间了。
所以有没有什么方法优化呢?当然有,记得 Java 里面有调用 shell 程序的类,比如ProcessBuilder 、Runtime。所以我们可以解析 count.sh 脚本,获取所有的表的count 命令和对应的重定向命令,然后使用线程池多线程执行每一个 spark-sql 的 count 语句并且把结果重定向到 count.log
废话不多说,上代码
public class BashExecute { static Random random = new Random(); /** * 随机种子 */ static int sed = Integer.MAX_VALUE; /** * 已经完成的任务数量 */ static int finshTask = 0; /** * 环境变量 */ static Map
代码很简单,main方法接受两个参数,第一个参数为 count.sh 的绝对路径,第二个参数为并发的线程数n
解析count.sh 的脚本内容,每两行shell 代码为一对(第一行为count 的语句,第二行为结果重定向),返回一个List 集合cmdList新建一个fixed 的线程池,其中核心线程数和最大线程数都为n设置CountDownLatch 为任务集合cmdList 的大小,遍历cmdList 集合,向线程池提交执行的shell 代码,然后输出当前的执行进度(为什么同步呢?因为有一个finshTask++; 的操作),并释放CountDownLatch锁CountDownLatch await等待所有任务执行完毕后输出完成的代码,并关闭线程池
其中在执行脚本的 execute 代码中为脚本建了一个临时文件,然后使用 ProcessBuilder 执行该文件,等待脚本结束即可。
卡死问题
代码写好了 就好快快乐乐的分别把 jar 上传到阿里云和腾讯云集群执行了
nohup java -classpath utils-1.0-SNAPSHOT-jar-with-dependencies.jar com.sucx.app.BashExecute /home/hadoop/sucx/count.sh 70 > exe.log &
然后tail -f exe.log 查看实时日志,看着腾讯云的代码欢快的执行,心里有点小兴奋
但是到了阿里云这里好像有点异常啊,等了好久都不动了
最后 yarn 上显示任务失败了
查看 yarn日志发现报错都是一样的,并且单独拉出来在命令行执行都是 ok的
19/11/05 10:32:13 WARN executor.Executor: Issue communicating with driver in heartbeaterorg.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785) at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814) at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814) at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) ... 14
但是 java 的进程好像还是卡着不动
"process reaper" #22 daemon prio=10 os_prio=0 tid=0x00007fcefc001680 nid=0x540a runnable [0x00007fcf4d38a000] java.lang.Thread.State: RUNNABLE at java.lang.UNIXProcess.waitForProcessExit(Native Method) at java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) at java.lang.UNIXProcess$$Lambda$10/889583863.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)"process reaper" #21 daemon prio=10 os_prio=0 tid=0x00007fcf08007ab0 nid=0x5407 runnable [0x00007fcf4d3c3000] java.lang.Thread.State: RUNNABLE at java.lang.UNIXProcess.waitForProcessExit(Native Method) at java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) at java.lang.UNIXProcess$$Lambda$10/889583863.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)"pool-1-thread-10" #19 prio=5 os_prio=0 tid=0x00007fcfa418bb20 nid=0x53f8 in Object.wait() [0x00007fcf4d4c4000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) - waiting on <0x00000006750be3a8> (a java.lang.UNIXProcess) at java.lang.Object.wait(Object.java:502) at java.lang.UNIXProcess.waitFor(UNIXProcess.java:395) - locked <0x00000006750be3a8> (a java.lang.UNIXProcess) at com.sucx.app.BashExecute.execute(BashExecute.java:75) at com.sucx.app.BashExecute.lambda$main$0(BashExecute.java:136) at com.sucx.app.BashExecute$$Lambda$1/1406718218.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
spark-sql 任务明明失败了,但是我们在执行的进程还没退出?怎么回事,好费解
那么就选择一个线程分析,通过上面堆栈信息选了 process repaer 线程的堆栈信息
"process reaper" #21 daemon prio=10 os_prio=0 tid=0x00007fcf08007ab0 nid=0x5407 runnable [0x00007fcf4d3c3000] java.lang.Thread.State: RUNNABLE at java.lang.UNIXProcess.waitForProcessExit(Native Method) at java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) at java.lang.UNIXProcess$$Lambda$10/889583863.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
查看 java 源码 java.lang.UNIXProcess.waitForProcessExit是个 native 方法,跳过
private native int waitForProcessExit(int pid);
继续向上分析
void initStreams(int[] fds) throws IOException { switch (platform) { case LINUX: case BSD: stdin = (fds[0] == -1) ? ProcessBuilder.NullOutputStream.INSTANCE : new ProcessPipeOutputStream(fds[0]); stdout = (fds[1] == -1) ? ProcessBuilder.NullInputStream.INSTANCE : new ProcessPipeInputStream(fds[1]); stderr = (fds[2] == -1) ? ProcessBuilder.NullInputStream.INSTANCE : new ProcessPipeInputStream(fds[2]); processReaperExecutor.execute(() -> { int exitcode = waitForProcessExit(pid); synchronized (this) { this.exitcode = exitcode; this.hasExited = true; this.notifyAll(); } if (stdout instanceof ProcessPipeInputStream) ((ProcessPipeInputStream) stdout).processExited(); if (stderr instanceof ProcessPipeInputStream) ((ProcessPipeInputStream) stderr).processExited(); if (stdin instanceof ProcessPipeOutputStream) ((ProcessPipeOutputStream) stdin).processExited(); }); break; /** 省略下面代码 **/
查看 initStreams 后我们可以发现该方法是为了异步监控应用的退出,然后关闭输入流和输出流。好像没发现什么问题,继续向上分析
UNIXProcess(final byte[] prog, final byte[] argBlock, final int argc, final byte[] envBlock, final int envc, final byte[] dir, final int[] fds, final boolean redirectErrorStream) throws IOException { pid = forkAndExec(launchMechanism.ordinal() + 1, helperpath, prog, argBlock, argc, envBlock, envc, dir, fds, redirectErrorStream); try { doPrivileged((PrivilegedExceptionAction
这里好像看到我们的程序就是在forkAndExec方法中执行然后返回一个程序的进程号(pid),进去看看
/** * Creates a process. Depending on the {@code mode} flag, this is done by * one of the following mechanisms: *
* 1 - fork(2) and exec(2) * 2 - posix_spawn(3P) * 3 - vfork(2) and exec(2) * * (4 - clone(2) and exec(2) - obsolete and currently disabled in native code) ** @param fds an array of three file descriptors. * Indexes 0, 1, and 2 correspond to standard input, * standard output and standard error, respectively. On * input, a value of -1 means to create a pipe to connect * child and parent processes. On output, a value which * is not -1 is the parent pipe fd corresponding to the * pipe which has been created. An element of this array * is -1 on input if and only if it is not -1 on * output. * @return the pid of the subprocess */ private native int forkAndExec(int mode, byte[] helperpath, byte[] prog, byte[] argBlock, int argc, byte[] envBlock, int envc, byte[] dir, int[] fds, boolean redirectErrorStream) throws IOException;
也是一个 native 方法,但是上面的注释不能忽略了,尤其是关于 fds 参数的。大致意思是:
fds是三个文件描述符的数组。索引0、1和2分别对应于标准输入, 标准输出和标准误差。对于输入流,-1表示创建一个管道来连接 子进程和父进程,对于输出流,非-1的值是与已创建的管道相对应的父管道。 当且仅当输出不是-1时,输入才为-1。
看到这里突然想到,我使用 ProcessBuilder 执行的 spark-sql 的日志都打到哪里去了?好像也并没有重定向日志信息。 继续往上看
static Process start(String[] cmdarray, java.util.Map
发现如果我们没有重定型输入流输出流的话,全部设置为默认值 -1,而 -1 表示创建一个管道连接子进程与父进程。 记得以前使用 ulimit -a 命令时看到参数里有 pipe size 的限制,即管道缓冲区的大小,会不会是因为缓冲区不够用导致任务卡死呢? 在阿里云机器执行一下发现大小为 8*512byte=4k
[hadoop@emr-header-1 ~]$ ulimit -acore file size (blocks, -c) 0data seg size (kbytes, -d) unlimitedscheduling priority (-e) 0file size (blocks, -f) unlimitedpending signals (-i) 63471max locked memory (kbytes, -l) 64max memory size (kbytes, -m) unlimitedopen files (-n) 65535pipe size (512 bytes, -p) 8POSIX message queues (bytes, -q) 819200real-time priority (-r) 0stack size (kbytes, -s) 8192cpu time (seconds, -t) unlimitedmax user processes (-u) 32767virtual memory (kbytes, -v) unlimitedfile locks (-x)
腾讯云机器执行一下
[hadoop@172 ~]$ ulimit -acore file size (blocks, -c) 0data seg size (kbytes, -d) unlimitedscheduling priority (-e) 0file size (blocks, -f) unlimitedpending signals (-i) 256991max locked memory (kbytes, -l) 64max memory size (kbytes, -m) unlimitedopen files (-n) 100001pipe size (512 bytes, -p) 8POSIX message queues (bytes, -q) 819200real-time priority (-r) 0stack size (kbytes, -s) 8192cpu time (seconds, -t) unlimitedmax user processes (-u) 256991virtual memory (kbytes, -v) unlimitedfile locks (-x)
发现结果一样,都是 4K 的大小
于是网上搜索下发现
2.6 标准版本的 linux 内核,pipe 缓冲区是 64 KB,尽管命令 ulimit -a 看到管道大小 8 块,缓冲区的大小不是 4 k,因为内核动态分配最大16“缓冲条目”,相乘为 64 k。这些限制是硬编码的
查看腾讯云的缓冲条目个数为16,刚好 64 K
[hadoop@172 kernels]$ cat /usr/src/kernels/3.10.0-514.21.1.el7.x86_64/include/linux/pipe_fs_i.h | grep PIPE_DEF_BUFFERS#define PIPE_DEF_BUFFERS 16
阿里云的缓冲条目找不到
[hadoop@emr-header-1 kernels]$ ls /usr/src/kernels/[hadoop@emr-header-1 kernels]$
那么就默认的 4K。
看到这里想必你也知道了导致我们任务卡死是由于并没有任务进程从管道缓冲区读数据导致了缓冲区满了无法再进行写入。解决办法就是在ProcessBuilder 启动程序的时候重定向输出流到文件或者使用异步线程实时读取输出流。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~