Springboot应用中线程池配置详细教程(最新2021版)

网友投稿 182 2023-01-24

Springboot应用中线程池配置详细教程(最新2021版)

前言:日常开发中我们常用ThreadPoolExecutor提供的线程池服务帮我们管理线程,在Springboot中更是提供了@Async注解来简化业务逻辑提交到线程池中执行的过程。由于Springboot中默认设置的corePoolSize=1和queyeCapacity=Integer.MAX_VALUE,相当于采用单线程处理所有任务,这就与多线程的目的背道而驰,所以这就要求我们在使用@Async注解时要配置线程池。本文就讲述下Springboot应用下的线程池配置。

背景知识:Springboot中通过使用ThreadPoolTaskExecutor这个javaBean对象的corePoolSize(核心线程数)、maxPoolSize(最大线程数)、keepAliveSeconds(线程空闲时长)和queueCapacity(任务队列容量)属性来配置ThreadPoolExecutor,以上四个属性的作用大致如下:

新提交一个任务时的处理流程很明显:

如果当前线程池的线程数还没有达到基本大小(poolSize < corePoolSize),无论是否有空闲的线程新增一个线程处理新提交的任务;

如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize) 且任务队列未满时,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);

如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize) 且任务队列满时;

当前poolSize

当前poolSize=maximumPoolSize,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。

好了,回到正文。目前配置Springboot线程池主要有两种方式:配置默认线程池和提供自定义线程池;毫无疑问,两种配置方式并无优劣。从使用角度来讲,由于自定义线程池是自定义即没有被Springboot默认使用的线程池,那么就需要通过@Async("自定义线程池bean对象名")的方式去使用,其它地方同默认线程池使用方式一致;下面通过一个简单的Springboot应用结合实际来展示:

1、新建一个Springboot项目,项目结构和pom.xml内容如下:

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.1.17.RELEASE

com.hugesoft

springboot-async

0.0.1

springboot-async

Demo project for Spring Boot

1.8

org.springframework.boot

spring-boot-starter-web

org.projectlombok

lombok

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

二、application.yml中,自定义了线程池需要配置的四个属性,内容如下:

task:

pool:

corePoolSize: 10

maxPoolSize: 20

keepAliveSeconds: 300

queueCapacity: 50

三、在com.hugesoft.config包中有三个类:TaskThreadPoolConfig类用来简化封装application.yml配置的属性,OverrideDefaultThreadPoolConfig类提供了配置默认线程池的方式,CustomizeThreadPoolConfig类则实现了自定义线程池,具体实现如下:

package com.hugesoft.config.dto;

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.stereotype.Component;

/**

* 线程池配置属性类

* @author YuXD

*/

@Data

@Component

@ConfigurationProperties(prefix = "task.pool")

public class TaskThreadPoolConfig {

/**

* 核心线程数

*/

private int corePoolSize;

/**

* 最大线程数

*/

private int maxPoolSize;

/**

* 线程空闲时间

*/

private int keepAliveSeconds;

/**

* 任务队列容量(阻塞队列)

*/

private int queueCapacity;

}

package com.hugesoft.config;

import com.hugesoft.config.dto.TaskThreadPoolConfig;

import lombok.extern.slf4j.Slf4j;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.AsyncConfigurer;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

import java.util.concurrent.ThreadPoolExecutor;

/**

* 重写默认线程池配置

* @author YuXD

*/

@Slf4j

@Configuration

@EnableAsync

public class OverrideDefaultThreadPoolConfig implements AsyncConfigurer {

@Autowired

private TaskThreadPoolConfig config;

@Override

public Executor getAsyncExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//核心线程池大小

executor.setCorePoolSize(config.getCorePoolSize());

//最大线程数

executor.setMaxPoolSize(config.getMaxPoolSize());

//队列容量

executor.setQueueCapacity(config.getQueueCapacity());

//活跃时间

executor.setKeepAliveSeconds(config.getKeepAliveSeconds());

//线程名字前缀

executor.setThreadNamePrefix("default-thread-");

/*

当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)

CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行

*/

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.initialize();

return executor;

}

/**

* 异步任务中异常处理

*

* @return

*/

@Override

public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {

return (ex, method, params) -> {

log.error("==========================" + ex.getMessage() + "=======================", ex);

log.error("exception method:" + method.getName());

};

}

}

package com.hugesoft.config;

import com.hugesoft.config.dto.TaskThreadPoolConfig;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

import java.util.concurrent.ThreadPoolExecutor;

/**

*

* 自定义下城

* @author : YuXD

*/

@Configuration

@EnableAsync

public class CustomizeThreadPoolConfig {

@Autowired

private TaskThreadPoolConfig config;

@Bean("customizeThreadPool")

public Executor doConfigCustomizeThreadPool() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//核心线程池大小

executor.setCorePoolSize(config.getCorePoolSize());

//最大线程数

executor.setMaxPoolSize(config.getMaxPoolSize());

//队列容量

executor.setQueueCapacity(config.getQueueCapacity());

//活跃时间

executor.setKeepAliveSeconds(config.getKeepAliveSeconds());

//线程名字前缀

executor.setThreadNamePrefix("customize-thread-");

/*

当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)

CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行

*/

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.initialize();

return executor;

}

}

四、com.hugesoft.service包下的内容是从真实项目环境中提取出来的,IStatusAnalyseService定义了设备状态分析基础Service,JJDeviceDataAnalyseManager,ZHUDeviceDataAnalyseManager,ZZDeviceDataAnalyseManager三个子类分别提供了默认实现,AbstractDeviceDataAnalyseManager提取了三个子类用到的公共方法,代码没难度,理解即可;需要尤其注意AbstractDeviceDataAnalyseManager的两个重载方法,分别采用默认线程池和自定义线程池的方式,注意使用的异同点,这点也就是默认线程池和自定义线程池适用上的唯一不同点。具体试下如下:

package com.hugesoft.service;

/**

* 参数分析基础Service,所有需要进行参数分析的都需要实现该接口

*

* @author YuXD

*/

public interface IStatusAnalyseService {

/**

* 设备状态解析处理

*

* @param start 开始时间

* @param end 截止时间

*/

void doStatusAnalyseHandle(String start, String end);

/**

* 设备状态解析处理

*

* @param end 截止时间

*/

void doStatusAnalyseHandle(String end);

/**

* 获取数据类别

*

* @return

*/

String getDataType();

}

package com.hugesoft.service.impl;

import com.hugesoft.service.IStatusAnalyseService;

import org.springframework.scheduling.annotation.Async;

import java.util.Random;

/**

* 抽象的设备数据分析Manager

*

* @author YuXD

* @since 2020-06-18 22:47

*/

public abstract class AbstractDeviceDataAnalyseManager implements IStatusAnalyseService {

@Async("customizeThreadPool")

@Override

public void doStatusAnalyseHandle(String start, String end) {

int sleepSeconds = new Random().nextInt(3) + 1;

try {

Thread.sleep(sleepSeconds * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(getDataType() + "在自定义线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");

}

@Async

@Override

public void doStatusAnalyseHandle(String end) {

int sleepSeconds = new Random().nextInt(3) + 1;

try {

Thread.sleep(sleepSeconds * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(getDataType() + "在默认线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");

}

}

package com.hugesoft.service.impl;

import org.springframework.stereotype.Service;

/**

* @description: 机加设备数据分析Service实现类

* @author: YuXD

* @create: 2021-03-15 20:17

**/

@Service("JJ")

public class JJDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

@Override

public String getDataType() {

return "机加";

}

}

package com.hugesoft.service.impl;

import org.springframework.stereotype.Service;

/**

* @description: 铸造设备数据分析Service实现类

* @author: YuXD

* @create: 2020-06-18 22:56

**/

@Service("ZHU")

public class ZHUDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

@Override

public String getDataType() {

return "铸造";

}

}

package com.hugesoft.service.impl;

import org.springframework.stereotype.Service;

/**

* @description: 总装设备数据分析Service实现类

* @author: YuXD

* @create: 2020-06-18 22:56

**/

@Service("ZZ")

public class ZZDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

@Override

public String getDataType() {

return "总装";

}

}

五、最后看一下Springboot启动类实现;该类既是启动类也是Controller类,没什么特别要说明的。

package com.hugesoft;

import com.hugesoft.service.IStatusAnalyseService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController

@EnableAsync

@SpringBootApplication

public class SpringbootAsyncApplication {

@Autowired

private List statusAnalyseServiceList;

public static void main(String[] args) {

SpringApplication.run(SpringbootAsyncApplication.class, args);

}

@GetMapping("/sayHelloAsync")

public String sayHelloAsync() {

for (IStatusAnalyseService statusAnalyseService : statusAnalyseServiceList) {

// 采用自定义线程池

statusAnalyseService.doStatusAnalyseHandle(null, null);

// 采用默认线程池

statusAnalyseService.doStatusAnalyseHandle(null);

}

return "Hello, Async!";

}

}

六、最后启动main方法,通过浏览器地址栏访问 http://localhost:8080/sayHelloAsync,发现秒出现如下页面,且控制台会出现如下内容,说明我们配置的默认线程池和自定义线程池都起作用了,到此,配置成功

当前poolSize=maximumPoolSize,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。

好了,回到正文。目前配置Springboot线程池主要有两种方式:配置默认线程池和提供自定义线程池;毫无疑问,两种配置方式并无优劣。从使用角度来讲,由于自定义线程池是自定义即没有被Springboot默认使用的线程池,那么就需要通过@Async("自定义线程池bean对象名")的方式去使用,其它地方同默认线程池使用方式一致;下面通过一个简单的Springboot应用结合实际来展示:

1、新建一个Springboot项目,项目结构和pom.xml内容如下:

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.1.17.RELEASE

com.hugesoft

springboot-async

0.0.1

springboot-async

Demo project for Spring Boot

1.8

org.springframework.boot

spring-boot-starter-web

org.projectlombok

lombok

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.springframework.boot

spring-boot-starter-parent

2.1.17.RELEASE

com.hugesoft

springboot-async

0.0.1

springboot-async

Demo project for Spring Boot

1.8

org.springframework.boot

spring-boot-starter-web

org.projectlombok

lombok

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

二、application.yml中,自定义了线程池需要配置的四个属性,内容如下:

task:

pool:

corePoolSize: 10

maxPoolSize: 20

keepAliveSeconds: 300

queueCapacity: 50

三、在com.hugesoft.config包中有三个类:TaskThreadPoolConfig类用来简化封装application.yml配置的属性,OverrideDefaultThreadPoolConfig类提供了配置默认线程池的方式,CustomizeThreadPoolConfig类则实现了自定义线程池,具体实现如下:

package com.hugesoft.config.dto;

import lombok.Data;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.stereotype.Component;

/**

* 线程池配置属性类

* @author YuXD

*/

@Data

@Component

@ConfigurationProperties(prefix = "task.pool")

public class TaskThreadPoolConfig {

/**

* 核心线程数

*/

private int corePoolSize;

/**

* 最大线程数

*/

private int maxPoolSize;

/**

* 线程空闲时间

*/

private int keepAliveSeconds;

/**

* 任务队列容量(阻塞队列)

*/

private int queueCapacity;

}

package com.hugesoft.config;

import com.hugesoft.config.dto.TaskThreadPoolConfig;

import lombok.extern.slf4j.Slf4j;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.AsyncConfigurer;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

import java.util.concurrent.ThreadPoolExecutor;

/**

* 重写默认线程池配置

* @author YuXD

*/

@Slf4j

@Configuration

@EnableAsync

public class OverrideDefaultThreadPoolConfig implements AsyncConfigurer {

@Autowired

private TaskThreadPoolConfig config;

@Override

public Executor getAsyncExecutor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//核心线程池大小

executor.setCorePoolSize(config.getCorePoolSize());

//最大线程数

executor.setMaxPoolSize(config.getMaxPoolSize());

//队列容量

executor.setQueueCapacity(config.getQueueCapacity());

//活跃时间

executor.setKeepAliveSeconds(config.getKeepAliveSeconds());

//线程名字前缀

executor.setThreadNamePrefix("default-thread-");

/*

当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)

CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行

*/

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.initialize();

return executor;

}

/**

* 异步任务中异常处理

*

* @return

*/

@Override

public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {

return (ex, method, params) -> {

log.error("==========================" + ex.getMessage() + "=======================", ex);

log.error("exception method:" + method.getName());

};

}

}

package com.hugesoft.config;

import com.hugesoft.config.dto.TaskThreadPoolConfig;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

import java.util.concurrent.ThreadPoolExecutor;

/**

*

* 自定义下城

* @author : YuXD

*/

@Configuration

@EnableAsync

public class CustomizeThreadPoolConfig {

@Autowired

private TaskThreadPoolConfig config;

@Bean("customizeThreadPool")

public Executor doConfigCustomizeThreadPool() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//核心线程池大小

executor.setCorePoolSize(config.getCorePoolSize());

//最大线程数

executor.setMaxPoolSize(config.getMaxPoolSize());

//队列容量

executor.setQueueCapacity(config.getQueueCapacity());

//活跃时间

executor.setKeepAliveSeconds(config.getKeepAliveSeconds());

//线程名字前缀

executor.setThreadNamePrefix("customize-thread-");

/*

当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)

CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行

*/

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

executor.initialize();

return executor;

}

}

四、com.hugesoft.service包下的内容是从真实项目环境中提取出来的,IStatusAnalyseService定义了设备状态分析基础Service,JJDeviceDataAnalyseManager,ZHUDeviceDataAnalyseManager,ZZDeviceDataAnalyseManager三个子类分别提供了默认实现,AbstractDeviceDataAnalyseManager提取了三个子类用到的公共方法,代码没难度,理解即可;需要尤其注意AbstractDeviceDataAnalyseManager的两个重载方法,分别采用默认线程池和自定义线程池的方式,注意使用的异同点,这点也就是默认线程池和自定义线程池适用上的唯一不同点。具体试下如下:

package com.hugesoft.service;

/**

* 参数分析基础Service,所有需要进行参数分析的都需要实现该接口

*

* @author YuXD

*/

public interface IStatusAnalyseService {

/**

* 设备状态解析处理

*

* @param start 开始时间

* @param end 截止时间

*/

void doStatusAnalyseHandle(String start, String end);

/**

* 设备状态解析处理

*

* @param end 截止时间

*/

void doStatusAnalyseHandle(String end);

/**

* 获取数据类别

*

* @return

*/

String getDataType();

}

package com.hugesoft.service.impl;

import com.hugesoft.service.IStatusAnalyseService;

import org.springframework.scheduling.annotation.Async;

import java.util.Random;

/**

* 抽象的设备数据分析Manager

*

* @author YuXD

* @since 2020-06-18 22:47

*/

public abstract class AbstractDeviceDataAnalyseManager implements IStatusAnalyseService {

@Async("customizeThreadPool")

@Override

public void doStatusAnalyseHandle(String start, String end) {

int sleepSeconds = new Random().nextInt(3) + 1;

try {

Thread.sleep(sleepSeconds * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(getDataType() + "在自定义线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");

}

@Async

@Override

public void doStatusAnalyseHandle(String end) {

int sleepSeconds = new Random().nextInt(3) + 1;

try {

Thread.sleep(sleepSeconds * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(getDataType() + "在默认线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");

}

}

package com.hugesoft.service.impl;

import org.springframework.stereotype.Service;

/**

* @description: 机加设备数据分析Service实现类

* @author: YuXD

* @create: 2021-03-15 20:17

**/

@Service("JJ")

public class JJDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

@Override

public String getDataType() {

return "机加";

}

}

package com.hugesoft.service.impl;

import org.springframework.stereotype.Service;

/**

* @description: 铸造设备数据分析Service实现类

* @author: YuXD

* @create: 2020-06-18 22:56

**/

@Service("ZHU")

public class ZHUDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

@Override

public String getDataType() {

return "铸造";

}

}

package com.hugesoft.service.impl;

import org.springframework.stereotype.Service;

/**

* @description: 总装设备数据分析Service实现类

* @author: YuXD

* @create: 2020-06-18 22:56

**/

@Service("ZZ")

public class ZZDeviceDataAnalyseManager extends AbstractDeviceDataAnalyseManager {

@Override

public String getDataType() {

return "总装";

}

}

五、最后看一下Springboot启动类实现;该类既是启动类也是Controller类,没什么特别要说明的。

package com.hugesoft;

import com.hugesoft.service.IStatusAnalyseService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController

@EnableAsync

@SpringBootApplication

public class SpringbootAsyncApplication {

@Autowired

private List statusAnalyseServiceList;

public static void main(String[] args) {

SpringApplication.run(SpringbootAsyncApplication.class, args);

}

@GetMapping("/sayHelloAsync")

public String sayHelloAsync() {

for (IStatusAnalyseService statusAnalyseService : statusAnalyseServiceList) {

// 采用自定义线程池

statusAnalyseService.doStatusAnalyseHandle(null, null);

// 采用默认线程池

statusAnalyseService.doStatusAnalyseHandle(null);

}

return "Hello, Async!";

}

}

六、最后启动main方法,通过浏览器地址栏访问 http://localhost:8080/sayHelloAsync,发现秒出现如下页面,且控制台会出现如下内容,说明我们配置的默认线程池和自定义线程池都起作用了,到此,配置成功

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:调用开放api接口教程(api接口使用教程)
下一篇:新浪股票接口api 交易(新浪股票接口api 交易平台)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~