Troubleshooting a Hang When Running Shell Tasks from Java with ProcessBuilder



Background

Recently, for various reasons, I needed to migrate some Hive data whose location was on OSS (Alibaba Cloud Object Storage) over to COSN (Tencent Cloud Object Storage). The data is still being synchronized incrementally, and before cutting over the two sides have to be compared. I planned two comparison methods. The first is to compare the disk space used under the corresponding paths on OSS and COSN, i.e. run hadoop fs -du -s -h <path> and check that the sizes under each table's location are consistent. The second is to run a count directly against each Hive table; a table's location and other metadata can be obtained through hive server2 or spark thrift server.

Generating the comparison script

From the partition, location, database name and table name information obtained via hive server2 or spark thrift server, I generated the shell statements that compute row counts and disk usage. The row-count statements all end up in a single file, count.sh:

count=`spark-sql --master yarn --executor-memory 8G --executor-cores 4 --num-executors 2 --driver-memory 4G -e "select count(1) from bi_ods.table1 "`
echo bi_ods.table1:$count >>/Users/scx/work/git/company/utils/count.log
count=`spark-sql --master yarn --executor-memory 8G --executor-cores 4 --num-executors 2 --driver-memory 4G -e "select count(1) from bi_ods.table2 "`
echo bi_ods.table2:$count >>/Users/scx/work/git/company/utils/count.log
count=`spark-sql --master yarn --executor-memory 8G --executor-cores 4 --num-executors 2 --driver-memory 4G -e "select count(1) from bi_ods.table3 where dt >= 20191020 and dt <= 20191027"`
echo bi_ods.table3:$count >>/Users/scx/work/git/company/utils/count.log
...

As these statements show, tables without partitions are counted in full, while for partitioned tables only recent data is compared (the date range is configurable). Running this shell script redirects the final results into count.log, one line per table, separated by ":" — the part before the colon is the table name and the part after it is the row count. A small program can then parse the table names and counts and compare them against the count results produced on the Tencent Cloud side, roughly as sketched below.
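The comparison program itself can be tiny. Here is a minimal sketch, not the original tooling: the class name CountLogDiff and the two file arguments are made up for illustration. It parses two count.log files of the form table:count and prints the tables whose counts differ.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class CountLogDiff {

    // Parse lines of the form "bi_ods.table1:12345" into a table -> count map.
    // Assumes the part after ':' is a plain integer; anything else is skipped.
    static Map<String, Long> parse(String path) throws IOException {
        Map<String, Long> counts = new HashMap<>();
        for (String line : Files.readAllLines(Paths.get(path))) {
            int idx = line.indexOf(':');
            if (idx < 0) {
                continue; // skip malformed lines
            }
            String table = line.substring(0, idx).trim();
            String value = line.substring(idx + 1).trim();
            if (value.matches("\\d+")) {
                counts.put(table, Long.parseLong(value));
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> oss = parse(args[0]);   // count.log from the Alibaba Cloud cluster
        Map<String, Long> cosn = parse(args[1]);  // count.log from the Tencent Cloud cluster
        oss.forEach((table, count) -> {
            Long other = cosn.get(table);
            if (other == null || !other.equals(count)) {
                System.out.println("MISMATCH " + table + ": oss=" + count + ", cosn=" + other);
            }
        });
    }
}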

Speeding up execution

Now that count.sh has been generated, how should it be run? Just bash count.sh? That works, but only if you have very few tables, because it runs everything serially: the next statement cannot start until the previous table's count has finished. I had close to 2000 tables, and running them this way would have taken nearly 30 hours. Far too slow.

So is there a way to speed this up? Of course: Java can launch shell programs through classes such as ProcessBuilder and Runtime. We can parse count.sh, extract each table's count command together with its redirect command, and use a thread pool to run the spark-sql count statements concurrently, each one appending its result to count.log.

Enough talk — here is the code.

package com.sucx.app;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BashExecute {

    static Random random = new Random();

    /**
     * Random seed (upper bound for temp-file name suffixes)
     */
    static int sed = Integer.MAX_VALUE;

    /**
     * Number of tasks finished so far
     */
    static int finshTask = 0;

    /**
     * Environment variables
     */
    static Map<String, String> customPro = new HashMap<>();

    static {
        Properties properties = System.getProperties();
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            customPro.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
        }
        customPro.putAll(System.getenv());
    }

    public static void execute(String cmds, long num) throws IOException {
        File tmpFile = createTmpFile(cmds);
        /*
        // file that receives the script's stdout
        File logFile = new File(HiveApp.workDir + "/success/" + num + ".log");
        if (!logFile.exists()) {
            logFile.createNewFile();
        }
        // file that receives the script's stderr
        File errFile = new File(HiveApp.workDir + "/error/" + num + "_err.log");
        if (!errFile.exists()) {
            errFile.createNewFile();
        }
        */
        ProcessBuilder builder = new ProcessBuilder("bash", tmpFile.getAbsolutePath())
                .directory(new File(HiveApp.workDir));
        builder.environment().putAll(customPro);
        Process process = null;
        try {
            process = builder.start();
            /*
            // one thread reads the normal log output
            new StreamThread(logFile, process.getInputStream()).start();
            // one thread reads the error log output
            new StreamThread(errFile, process.getErrorStream()).start();
            */
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                System.out.println("Task failed: " + cmds);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (process != null) {
                process.destroy();
                process = null;
            }
            tmpFile.delete();
        }
    }

    /**
     * Create a temporary script file holding the commands to run.
     *
     * @param cmds shell script content
     * @return the created file
     */
    private static File createTmpFile(String cmds) throws IOException {
        File file = new File("/tmp/" + System.currentTimeMillis() + random.nextInt(sed) + ".sh");
        if (!file.exists()) {
            if (!file.createNewFile()) {
                throw new RuntimeException("Failed to create temp file " + file.getAbsolutePath());
            }
        }
        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
        writer.write(cmds);
        writer.flush();
        writer.close();
        return file;
    }

    /**
     * nohup java -classpath utils-1.0-SNAPSHOT-jar-with-dependencies.jar com.sucx.app.BashExecute /home/hadoop/sucx/count.sh 70 > exe.log &
     *
     * @param args args[0] path of count.sh, args[1] number of concurrent threads
     * @throws IOException
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        List<String> cmdList;
        // default parallelism
        int poolSize = 15;
        if (args.length == 0) {
            cmdList = getCmd(HiveApp.countCmd);
        } else {
            // path of count.sh
            cmdList = getCmd(args[0]);
            if (args.length > 1) {
                poolSize = Integer.parseInt(args[1]);
            }
        }
        int allTask = cmdList.size();
        System.out.println("Total tasks: " + allTask);
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        CountDownLatch latch = new CountDownLatch(allTask);
        for (String cmd : cmdList) {
            service.execute(() -> {
                try {
                    execute(cmd, allTask - latch.getCount());
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    synchronized (BashExecute.class) {
                        finshTask++;
                        // print current progress
                        System.out.println(LocalDateTime.now() + " finished: " + finshTask + "/" + allTask
                                + ", progress: " + (finshTask * 100 / allTask) + "%");
                    }
                    latch.countDown();
                }
            });
        }
        latch.await();
        System.out.println("All tasks finished");
        service.shutdown();
    }

    /**
     * Parse count.sh and collect each task's count statement together with its redirect statement.
     *
     * @param path path of count.sh
     * @return list of per-table command pairs
     */
    private static List<String> getCmd(String path) throws IOException {
        File file = new File(path);
        if (!file.exists() || !file.isFile()) {
            throw new RuntimeException("File does not exist");
        }
        BufferedReader reader = new BufferedReader(new FileReader(file));
        String line;
        List<String> cmds = new ArrayList<>();
        while ((line = reader.readLine()) != null) {
            cmds.add("#!/bin/bash" + "\n" + line + "\n" + reader.readLine());
        }
        reader.close();
        return cmds;
    }

    /**
     * Thread that copies a process output stream into a log file.
     */
    /*
    static class StreamThread extends Thread {
        private BufferedWriter writer;
        private InputStream inputStream;

        public StreamThread(File file, InputStream inputStream) throws IOException {
            this.writer = new BufferedWriter(new FileWriter(file));
            this.inputStream = inputStream;
        }

        @Override
        public void run() {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"))) {
                String line;
                int lineNum = 0;
                while ((line = reader.readLine()) != null) {
                    writer.write(line);
                    writer.newLine();
                    if (lineNum++ == 10) {
                        writer.flush();
                        lineNum = 0;
                    }
                }
            } catch (Exception ignored) {
            }
        }
    }
    */
}

The code is straightforward. main accepts two arguments: the first is the absolute path of count.sh, the second is the number of concurrent threads n.

1. Parse the contents of count.sh, treating every two lines of shell as a pair (the first line is the count statement, the second line redirects its result), and return them as a List called cmdList.
2. Create a fixed thread pool whose core and maximum sizes are both n.
3. Set a CountDownLatch to the size of cmdList, then iterate over cmdList and submit each shell command to the pool; each task prints the current progress when it finishes (the block is synchronized because of the finshTask++ update) and counts down the latch.
4. await() on the CountDownLatch; once every task has finished, print a completion message and shut down the pool.

Inside the execute method, the script content is written to a temporary file, which is then executed with ProcessBuilder; we simply wait for the script to finish.

The hang

With the code written, I happily uploaded the jar to both the Alibaba Cloud and Tencent Cloud clusters and kicked it off:

nohup java -classpath utils-1.0-SNAPSHOT-jar-with-dependencies.jar com.sucx.app.BashExecute /home/hadoop/sucx/count.sh 70 > exe.log &

Then I watched the live log with tail -f exe.log. On Tencent Cloud the jobs hummed along nicely, which was quite satisfying.

But on Alibaba Cloud something seemed wrong: after waiting a long time, nothing was moving.

Eventually YARN showed the jobs as failed.

The YARN logs all showed the same error, and each statement ran fine when pulled out and executed on the command line by itself:

19/11/05 10:32:13 WARN executor.Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14

Yet the Java process still seemed to be stuck:

"process reaper" #22 daemon prio=10 os_prio=0 tid=0x00007fcefc001680 nid=0x540a runnable [0x00007fcf4d38a000]
   java.lang.Thread.State: RUNNABLE
    at java.lang.UNIXProcess.waitForProcessExit(Native Method)
    at java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289)
    at java.lang.UNIXProcess$$Lambda$10/889583863.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

"process reaper" #21 daemon prio=10 os_prio=0 tid=0x00007fcf08007ab0 nid=0x5407 runnable [0x00007fcf4d3c3000]
   java.lang.Thread.State: RUNNABLE
    at java.lang.UNIXProcess.waitForProcessExit(Native Method)
    at java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289)
    at java.lang.UNIXProcess$$Lambda$10/889583863.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

"pool-1-thread-10" #19 prio=5 os_prio=0 tid=0x00007fcfa418bb20 nid=0x53f8 in Object.wait() [0x00007fcf4d4c4000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x00000006750be3a8> (a java.lang.UNIXProcess)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.UNIXProcess.waitFor(UNIXProcess.java:395)
    - locked <0x00000006750be3a8> (a java.lang.UNIXProcess)
    at com.sucx.app.BashExecute.execute(BashExecute.java:75)
    at com.sucx.app.BashExecute.lambda$main$0(BashExecute.java:136)
    at com.sucx.app.BashExecute$$Lambda$1/1406718218.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The spark-sql job had clearly failed, yet the process we launched still had not exited? Very puzzling.

So let's pick one thread to analyze. From the dump above I chose the stack of a process reaper thread:

"process reaper" #21 daemon prio=10 os_prio=0 tid=0x00007fcf08007ab0 nid=0x5407 runnable [0x00007fcf4d3c3000]
   java.lang.Thread.State: RUNNABLE
    at java.lang.UNIXProcess.waitForProcessExit(Native Method)
    at java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289)
    at java.lang.UNIXProcess$$Lambda$10/889583863.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Looking at the JDK source, java.lang.UNIXProcess.waitForProcessExit is a native method, so we skip it:

private native int waitForProcessExit(int pid);

Let's keep going up the call chain.

void initStreams(int[] fds) throws IOException {
    switch (platform) {
        case LINUX:
        case BSD:
            stdin = (fds[0] == -1) ?
                    ProcessBuilder.NullOutputStream.INSTANCE :
                    new ProcessPipeOutputStream(fds[0]);

            stdout = (fds[1] == -1) ?
                    ProcessBuilder.NullInputStream.INSTANCE :
                    new ProcessPipeInputStream(fds[1]);

            stderr = (fds[2] == -1) ?
                    ProcessBuilder.NullInputStream.INSTANCE :
                    new ProcessPipeInputStream(fds[2]);

            processReaperExecutor.execute(() -> {
                int exitcode = waitForProcessExit(pid);

                synchronized (this) {
                    this.exitcode = exitcode;
                    this.hasExited = true;
                    this.notifyAll();
                }

                if (stdout instanceof ProcessPipeInputStream)
                    ((ProcessPipeInputStream) stdout).processExited();

                if (stderr instanceof ProcessPipeInputStream)
                    ((ProcessPipeInputStream) stderr).processExited();

                if (stdin instanceof ProcessPipeOutputStream)
                    ((ProcessPipeOutputStream) stdin).processExited();
            });
            break;

    /** remaining code omitted **/

From initStreams we can see that its purpose is to asynchronously watch for the child process to exit and then close its input and output streams. Nothing looks wrong here, so let's continue upward.

UNIXProcess(final byte[] prog,
            final byte[] argBlock, final int argc,
            final byte[] envBlock, final int envc,
            final byte[] dir,
            final int[] fds,
            final boolean redirectErrorStream)
        throws IOException {

    pid = forkAndExec(launchMechanism.ordinal() + 1,
                      helperpath,
                      prog,
                      argBlock, argc,
                      envBlock, envc,
                      dir,
                      fds,
                      redirectErrorStream);

    try {
        doPrivileged((PrivilegedExceptionAction<Void>) () -> {
            initStreams(fds);
            return null;
        });
    } catch (PrivilegedActionException ex) {
        throw (IOException) ex.getException();
    }
}

Here we can see that our program is actually launched inside forkAndExec, which returns the child's process id (pid). Let's take a look inside:

/**
 * Creates a process. Depending on the {@code mode} flag, this is done by
 * one of the following mechanisms:
 * <pre>
 *   1 - fork(2) and exec(2)
 *   2 - posix_spawn(3P)
 *   3 - vfork(2) and exec(2)
 *
 *  (4 - clone(2) and exec(2) - obsolete and currently disabled in native code)
 * </pre>
 * @param fds an array of three file descriptors.
 *        Indexes 0, 1, and 2 correspond to standard input,
 *        standard output and standard error, respectively.  On
 *        input, a value of -1 means to create a pipe to connect
 *        child and parent processes.  On output, a value which
 *        is not -1 is the parent pipe fd corresponding to the
 *        pipe which has been created.  An element of this array
 *        is -1 on input if and only if it is not -1 on output.
 * @return the pid of the subprocess
 */
private native int forkAndExec(int mode, byte[] helperpath,
                               byte[] prog,
                               byte[] argBlock, int argc,
                               byte[] envBlock, int envc,
                               byte[] dir,
                               int[] fds,
                               boolean redirectErrorStream)
    throws IOException;

This is also a native method, but the comment above it should not be ignored, especially the part about the fds parameter. Roughly:

fds is an array of three file descriptors. Indexes 0, 1, and 2 correspond to standard input, standard output, and standard error respectively. On input, a value of -1 means a pipe will be created to connect the child and parent processes; on output, a value that is not -1 is the parent's fd for the pipe that was created. An element of this array is -1 on input if and only if it is not -1 on output.
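In ProcessBuilder terms, this fds contract corresponds to the three Redirect modes. The following tiny illustration is mine (not from the original article), and the log path in the last comment is a placeholder:

import java.io.IOException;

public class RedirectModes {
    public static void main(String[] args) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder("bash", "-c", "echo hello");

        // Default: Redirect.PIPE -> the corresponding fds entry is -1, so the JDK creates a
        // pipe back to this JVM and someone must read process.getInputStream(), otherwise the
        // child can block once the pipe buffer is full.
        pb.redirectOutput(ProcessBuilder.Redirect.PIPE);

        // Redirect.INHERIT -> the fds entry becomes the parent's own descriptor (1 for stdout),
        // so the child writes directly to the JVM's stdout and no pipe is created.
        // pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);

        // Redirect.to(file) -> the fds entry is the descriptor of an opened file,
        // so the child's output goes straight to disk; no pipe either.
        // pb.redirectOutput(ProcessBuilder.Redirect.to(new java.io.File("/tmp/child-stdout.log")));

        Process p = pb.start();
        System.out.println("exit code: " + p.waitFor());   // tiny output, so safe even with PIPE
    }
}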

Reading this, it suddenly occurred to me: where did the logs of the spark-sql jobs I ran through ProcessBuilder actually go? I never redirected them anywhere. Let's look one level further up.

static Process start(String[] cmdarray,
                     java.util.Map<String,String> environment,
                     String dir,
                     ProcessBuilder.Redirect[] redirects,
                     boolean redirectErrorStream)
        throws IOException {
    /* some code omitted */
    try {
        if (redirects == null) {
            // If the streams are not redirected, they all default to pipes
            // connected back to the current Java process.
            std_fds = new int[] { -1, -1, -1 };
        } else {
            std_fds = new int[3];

            if (redirects[0] == Redirect.PIPE)
                std_fds[0] = -1;
            else if (redirects[0] == Redirect.INHERIT)
                std_fds[0] = 0;
            else {
                f0 = new FileInputStream(redirects[0].file());
                std_fds[0] = fdAccess.get(f0.getFD());
            }

            if (redirects[1] == Redirect.PIPE)
                std_fds[1] = -1;
            else if (redirects[1] == Redirect.INHERIT)
                std_fds[1] = 1;
            else {
                f1 = new FileOutputStream(redirects[1].file(),
                                          redirects[1].append());
                std_fds[1] = fdAccess.get(f1.getFD());
            }

            if (redirects[2] == Redirect.PIPE)
                std_fds[2] = -1;
            else if (redirects[2] == Redirect.INHERIT)
                std_fds[2] = 2;
            else {
                f2 = new FileOutputStream(redirects[2].file(),
                                          redirects[2].append());
                std_fds[2] = fdAccess.get(f2.getFD());
            }
        }

        return new UNIXProcess
                (toCString(cmdarray[0]),
                 argBlock, args.length,
                 envBlock, envc[0],
                 toCString(dir),
                 std_fds,
                 redirectErrorStream);
    } finally {
        // In theory, close() can throw IOException
        // (although it is rather unlikely to happen here)
        try { if (f0 != null) f0.close(); }
        finally {
            try { if (f1 != null) f1.close(); }
            finally { if (f2 != null) f2.close(); }
        }
    }
}

So if we do not redirect the streams, they all get the default value -1, and -1 means a pipe is created connecting the child process to the parent. I remembered that ulimit -a shows a pipe size limit, i.e. the size of the pipe buffer. Could the tasks be hanging because that buffer fills up? Running it on the Alibaba Cloud machine shows 8 * 512 bytes = 4 KB:

[hadoop@emr-header-1 ~]$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 63471
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 65535
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 32767
virtual memory          (kbytes, -v) unlimited
file locks                      (-x)

And on the Tencent Cloud machine:

[hadoop@172 ~]$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 256991
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 100001
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 256991
virtual memory          (kbytes, -v) unlimited
file locks                      (-x)

The result is the same: 4 KB on both sides.

A quick web search turned up the following:

On a standard 2.6 Linux kernel the pipe buffer is 64 KB. Even though ulimit -a reports a pipe size of 8 blocks, the buffer's capacity is not 4 KB: the kernel dynamically allocates up to 16 "buffer entries", which multiplied together gives 64 KB. These limits are hard-coded.

On the Tencent Cloud machine the number of buffer entries is indeed 16, which works out to exactly 64 KB:

[hadoop@172 kernels]$ cat /usr/src/kernels/3.10.0-514.21.1.el7.x86_64/include/linux/pipe_fs_i.h | grep PIPE_DEF_BUFFERS
#define PIPE_DEF_BUFFERS 16

On the Alibaba Cloud machine the buffer-entry count could not be found (no kernel sources are installed):

[hadoop@emr-header-1 kernels]$ ls /usr/src/kernels/
[hadoop@emr-header-1 kernels]$

So there we fall back to the assumed default of 4 KB.
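To convince yourself that a full pipe really does block the child, here is a minimal, self-contained repro sketch (my own illustration, not part of the original troubleshooting): the child writes a few megabytes to stdout, the Java side never reads the pipe, and waitFor therefore never observes the child exiting.

import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class PipeHangDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        // The child tries to print ~6 MB to stdout; nothing on the Java side ever reads it.
        ProcessBuilder builder = new ProcessBuilder("bash", "-c", "yes | head -c 6000000");
        // Defaults are Redirect.PIPE for stdin/stdout/stderr, i.e. fds = {-1, -1, -1}.
        Process process = builder.start();

        // With a pipe buffer of roughly 64 KB the child blocks in write() long before it is done,
        // so waitFor() never returns unless we drain process.getInputStream().
        boolean finished = process.waitFor(10, TimeUnit.SECONDS);
        System.out.println("finished within 10s? " + finished); // expected: false

        process.destroy();
    }
}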

By this point you have probably figured it out too: the tasks hung because no process was reading from the pipe buffer, so once the buffer filled up the child could not write any more and blocked. The fix is either to have ProcessBuilder redirect the child's output to a file when starting the process, or to read the output streams continuously from separate threads.
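For completeness, here is a minimal sketch of both fixes (the log file names under workDir are placeholders, and this is written against the plain java.lang.Process API rather than the original project code): option 1 lets the OS redirect the child's stdout/stderr straight to files so no pipe is created at all; option 2 keeps the default pipes but drains them from a dedicated thread before waitFor is called.

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;

public class FixedExecute {

    // Option 1: write the child's output directly to files -> no pipe, no hang.
    static int runWithFileRedirect(File script, File workDir) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder("bash", script.getAbsolutePath())
                .directory(workDir)
                .redirectOutput(new File(workDir, "stdout.log"))   // placeholder path
                .redirectError(new File(workDir, "stderr.log"));   // placeholder path
        return builder.start().waitFor();
    }

    // Option 2: keep the default pipes but drain them concurrently so they can never fill up.
    static int runWithDrainThread(File script, File workDir) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder("bash", script.getAbsolutePath())
                .directory(workDir)
                .redirectErrorStream(true);            // merge stderr into stdout: one stream to drain
        Process process = builder.start();
        Thread drainer = new Thread(() -> {
            try (InputStream in = process.getInputStream();
                 OutputStream out = Files.newOutputStream(new File(workDir, "combined.log").toPath())) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) != -1) {     // keep emptying the pipe
                    out.write(buf, 0, n);
                }
            } catch (IOException ignored) {
            }
        });
        drainer.start();
        int exitCode = process.waitFor();              // safe now: the pipe is always being drained
        drainer.join();
        return exitCode;
    }
}

Applied to the BashExecute class above, this corresponds to either adding redirectOutput/redirectError calls on the ProcessBuilder or un-commenting the StreamThread readers before process.waitFor().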
