Java 如何快速实现一个连接池

网友投稿 229 2023-01-12

Java 如何快速实现一个连接池

什么是 ACP?

ACP 库提供了一整套用于实现对象池化的 API,以及若干种各具特色的对象池实现。目前最常用的版本是 2.0 版本,相对于 1.x 版本而言,并不是简单升级。2.0 版本是对象池实现的完全重写,显著的提升了性能和可伸缩性,并且包含可靠的实例跟踪和池监控。

Apache Commons Pool 的官网地址为:Pool – Overview,想翻找相关文档资料,到这里去是最权威、最全面的。

如何使用 ACP?

要使用 ACP 实现一个线程池,首先需要先引入 ACP 的依赖包,这里以 Maven 为例。

org.apache.commons

commons-pool2

2.0

要使用 ACP 实现一个对象池,大致可以分为三个步骤:

创建对象工厂:告诉 ACP 如何创建你要的对象。

创建对象池:告诉 ACP 你想创建一个怎样的对象池。

使用对象池:ACP 告诉你如何使用你的对象。

创建对象工厂

对象工厂告诉 ACP,它应该如何去创建、激活、钝化、销毁你的对象。创建对象工厂非常简单,只需要实现 ACP 的 PooledObjectFactory 接口即可。PooledObjectFactory 接口的定义如下:

public interface PooledObjectFactory {

PooledObject makeObject() throws Exception;

void destroyObject(PooledObject p) throws Exception;

boolean validateObject(PooledObject p);

void activateObject(PooledObject p) throws Exception;

void passivateObject(PooledObject p) throws Exception;

}

但更多情况下,我们会继承 BasePooledObjectFactory 类来实现对象工厂。因为 BasePooledObjectFactory 类是 PooledObjectFactory 的基础实现类,使用它可以帮我们省了很多麻烦。通过继承这个抽象类,我们只需要实现两个方法:create() 和 wrap() 方法。

// 告诉 ACP 如何创建对象

public abstract T create() throws Exception;

// 定义你要返回的对象

public abstract PooledObject wrap(T obj);

create() 方法定义你的对象初始化过程,最后将初始化完成的对象返回。例如你想定义一个 SFTP 的连接,那么你首先需要定义一个 jsch 对象,之后设置账号密码,之后连接服务器,最后返回一个 ChannelSftp 对象。

public ChannelSftp create() {

// SFTP 连接的创建过程

}

wrap() 方法定义你要返回的对象,对于一个 SFTP 的连接池来说,其实就是一个 ChannelSftp 对象。一般情况下可以使用类 DefaultPooledObject 替代,参考实现如下:

@Override

public PooledObject wrap(Foo foo) {

return new DefaultPooledObject(foo);

}

创建对象池

创建好对象工厂之后,ACP 已经知道你需要的对象如何创建了。那么接下来,你需要根据你的实际需要,去创建一个对象池。在 ACP 中,我们通过 GenericObjectPool 以及 GenericObjectPoolConfig 来创建一个对象池。

// 声明一个对象池

private GenericObjectPool sftpConnectPool;

// 设置连接池配置

GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

poolConfig.setEvictionPolicyClassName("tech.shuyi.javacodechip.acp.SftpEvictionPolicy");

poolConfig.setBlockWhenExhausted(true);

poolConfig.setJmxEnabled(false);

poolConfig.setMaxWaitMillis(1000 * 10);

poolConfig.setTimeBetweenEvictionRunsMillis(60 * 1000);

poolConfig.setMinEvictableIdleTimeMillis(20 * 1000);

poolConfig.setTestWhileIdle(true);

poolConfig.setTestOnReturn(true);

poolConfig.setTestOnBorrow(true);

poolConfig.setMaxTotal(3);

// 设置抛弃策略

AbandonedConfig abandonedConfig = new AbandonedConfig();

abandonedConfig.setRemoveAbandonedOnMaintenance(true);

abandonedConfig.setRemoveAbandonedOnBorrow(true);

this.sftpConnectPool = new GenericObjectPool<>(sftpConnectFactory, poolConfig, abandonedConfig);

在上面创建 SFTP 连接池的代码中,我们配置了一些线程池的参数以及设置了抛弃策略。抛弃策略是非常重要的,如果没有设置抛弃策略,那么会拿到失效的连接从而导致获取文件失败。抛弃策略是通过 poolConfig.setEvictionPolicyClassName 来设置的,我们这里设置的是 SftpEvictionPolicy 类,其代码内容如下:

@Slf4j

@Component

public class SftpEvictionPolicy implements EvictionPolicy {

@Override

public boolean evict(EvictionConfig config, PooledObject underTest, int idleCount) {

try {

// 连接失效时进行驱逐

if (!underTest.getObject().isConnected()) {

log.warn("connect time out, evict the connection. time={}",System.currentTimeMillis() - underTest.getLastReturnTime());

return true;

}

}catch (Exception e){

return true;

}

return false;

}

}

看到这里,创建线程池的代码就结束了,SftpConnectPool 文件的全部内容如下:

@Slf4j

public class SftpConnectPool {

private GenericObjectPool sftpConnectPool;

public SftpConnectPool(SftpConnectFactory sftpConnectFactory) {

// 设置连接池配置

GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();

poolConfig.setEvictionPolicyClassName("tech.shuyi.javacodechip.acp.SftpEvictionPolicy");

poolConfig.setBlockWhenExhausted(true);

poolConfig.setJmxEnabled(false);

poolConfig.setMaxWaitMillis(1000 * 10);

poolConfig.setTimeBetweenEvictionRunsMillis(60 * 1000);

poolConfig.setMinEvictableIdleTimeMillis(20 * 1000);

poolConfig.setTestWhileIdle(true);

poolConfig.setTestOnReturn(true);

poolConfig.setTestOnBorrow(true);

poolConfig.setMaxTotal(3);

// 设置抛弃策略

AbandonedConfig abandonedConfig = new AbandonedConfig();

abandonedConfig.setRemoveAbandonedOnMaintenance(true);

abandonedConfig.setRemoveAbandonedOnBorrow(true);

this.sftpConnectPool = new GenericObjectPool<>(sftpConnectFactory, poolConfig, abandonedConfig);

}

public ChannelSftp borrowObject() {

try {

return sftpConnectPool.borrowObject();

} catch (Exception e) {

log.error("borrowObject error", e);

return null;

}

}

public void returnObject(ChannelSftp channelSftp) {

if (channelSftp!=null) {

sftpConnectPool.returnObject(channelSftp);

}

}

}

为了方便使用,我还增加了 borrowObject 和 returnObject 方法,但这两个并不是必须的。在这两个方法中,我们分别调用了 GenericObjectPool 类的 borrowObject 方法和 returnObject 方法。这正是 ACP 提供的、使用线程池对象的方法,先借一个对象,之后归还对象。

注:其实在这一步,已经包含了对象池的使用了。但实际使用的时候,我们经常是将对象池的声明与使用放在同一个类中,因此为了讲解方便,这里没有分开。因此下文的使用对象池,本质上是对对象池做进一步封装。

使用对象池

到这里我们的 SFTP 对象池就已经创建完毕了,是不是非常简单呢!但在实际的工作中,我们通常会在这基础上,做一些封装。对于我们这次的 SFTP 连接池来说,我们会对外直接提供下载文件的服务,将 SFTP 对象池进一步封装起来,不需要关心怎么获取文件。

public class SftpFileHelper {

@Autowired

private SftpConnectPool sftpConnectPool;

public void download(String dir, String file, String saveUrl)throws IOException {

ChannsUXKhplXelSftp sftp = sftpConnectPool.borrowObject();

log.info("begin to download file, dir={}, file={}, saveUrl={}", dir, file, saveUrl);

try {

if (!StringUtils.isEmpty(dir)) {

sftp.cd(dir);

}

File downloadFile = new File(saveUrl);

sftp.get(file, new FileOutputStream(downloadFile));

}catch (Exception e){

log.warn("下载文件失败", e);

}finally {

sftpConnectPool.returnObject(sftp);

}

log.info("file:{} is download successful", file);

}

}

最后我们写一个测试用例来试一试,是否能正常下载文件。

@RunWith(SpringRunner.class)

@SpringBootTest

@Slf4j

public class SftpFileHelperTest {

@Autowired

private SftpFileHelper sftpFileHelper;

@Test

public void testDownloadFtpFile() throws Exception {

sftpFileHelper.download("dir", "fileName", "fileName");

}

}

总结

本文针对 Apache Commons Pool 库最常用的对象池功能做了演示。看完这篇文章,我们知道创建一个线程池需要三个步骤,分别是:

创建对象工厂:告诉 ACP 如何创建你要的对象。

创建对象池:告诉 ACP 你想创建一个怎样的对象池、设置驱逐策略。

使用对象池:ACP 告诉你如何使用你的对象。

本文相关代码存放在博主 github 项目:java-code-chip 中,可以点击地址获取:java-code-chip/src/main/java/tech/shuyi/javacodechip/acp at master chenyurong/java-code-chip

ACP 库能够让读者朋友们快速地创建一个对象池,更加专注于业务内容。但事实上,ACP 提供的内容远不止如此,它还有更多更高级的功能。

例如当我们连接的 SFTP 服务器有多个时,我们需要通过不同地址来获得不同的连接对象。此时最笨的办法是每个不同的地址,都复制多一份代码,然后通过不同类的不同方法来实现。但这样的情况工作量相当可观,并且也会有很多重复代码。这种时候就可以使用BaseKeyedPooledObjectFactory 来替代 BasePooledObjectFactory,从而实现通过 key 来实现不同地址的连接对象管理。

更多关于 ACP 的内容,感兴趣的同学可以自行探索,这里就不深入讲解了。

另一种实现方式:

泛型接口ConnectionPool.java

public interface ConnectionPool<T> {

/**

* 初始化池资源

* @param maxActive 池中最大活动连接数

* @param maxWait 最大等待时间

*/

void init(Integer maxActive, Long maxWait);

/**

* 从池中获取资源

* @return 连接资源

*/

T getResource() throws Exception;

/**

* 释放连接

* @param connection 正在使用的连接

*/

void release(T connection) throws Exception;

/**

* 释放连接池资源

*/

void close();

}

以zookeeper为例,实现zookeeper连接池,ZookeeperConnectionPool.java

public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> {

//最大活动连接数

private Integer maxActive;

//最大等待时间

private Long maxWait;

//空闲队列

private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>();

http:// //繁忙队列

private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>();

//连接池活动连接数

private AtomicInteger activeSize = new AtomicInteger(0);

//连接池关闭标记

private AtomicBoolean isClosed = new AtomicBoolean(false);

//总共获取的连接记数

private AtomicInteger createCount = new AtomicInteger(0);

//等待zookeeper客户端创建完成的计数器

private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1));

public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {

this.init(maxActive, maxWait);

}

@Override

public void init(Integer maxActive, Long maxWait) {

this.maxActive = maxActive;

this.maxWait = maxWait;

}

@Override

public ZooKeeper getResource() throws Exception {

ZooKeeper zooKeeper;

Long nowTime = System.currentTimeMillis();

final CountDownLatch countDownLatch = latchThreadLocal.get();

//空闲队列idle是否有连接

if ((zooKeeper = idle.poll()) == null) {

//判断池中连接数是否小于maxActive

if (activeSize.get() < maxActive) {

//先增加池中连接数后判断是否小于等于maxActive

if (activeSize.incrementAndGet() <= maxActive) {

//创建zookeeper连接

zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {

if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {

countDownLatch.countDown();

}

});

countDownLatch.await();

System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");

busy.offer(zooKeeper);

return zooKeeper;

} else {

//如增加后发现大于maxActive则减去增加的

activeSize.decrementAndGet();

}

}

//若活动线程已满则等待busy队列释放连接

try {

System.out.println("Thread:" + Thread.currentThread().getId() + "等待获取空闲资源");

Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);

zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);

} catch (InterruptedException e) {

throw new Exception("等待异常");

}

//判断是否超时

if (zooKeeper != null) {

System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接:" + createCount.incrementAndGet() + "条");

busy.offer(zooKeeper);

return zooKeeper;

} else {

System.out.println("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");

throw new Exception("Thread:" + Thread.currentThread().getId() + "获取连接超时,请重试!");

}

}

//空闲队列有连接,直接返回

busy.offer(zooKeeper);

return zooKeeper;

}

@Override

public void release(ZooKeeper connection) throws Exception {

if (connection == null) {

System.out.println("connection 为空");

return;

}

if (busy.remove(connection)){

idle.offer(connection);

} else {

activeSize.decrementAndGet();

throw new Exception("释放失败");

}

}

@Override

public void close() {

if (isClosed.compareAndSet(false, true)) {

idle.forEach((zooKeeper) -> {

try {

zooKeeper.close();

} catch (InterruptedException e) {

e.printStackTrace();

}

});

busy.forEach((zooKeeper) -> {

try {

zooKeeper.close();

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

}

测试用例

这里创建20个线程并发测试连接池,Test.java

public class Test {

public static void main(String[] args) throws Exception {

int threadCount = 20;

Integer maxActive = 10;

Long maxWait = 10000L;

ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);

CountDownLatch countDownLatch = new CountDownLatch(20);

for (int i = 0; i < threadCount; i++) {

new Thread(() -> {

countDownLatch.countDown();

try {

countDownLatch.await();

ZooKeeper zooKeeper = pool.getResource();

Thread.sleep(2000);

pool.release(zooKeeper);

} catch (InterruptedException e) {

e.printStackTrace();

} catch (Exception e) {

e.printStackTrace();

}

}).start();

}

while (true){

}

}

}

以上就是Java 如何快速实现一个连接池的详细内容,更多关于Java 连接池的资料请关注我们其它相关文章!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:丰网快递物流查询单号查询(丰通快运单号查询快递查询)
下一篇:详解Java动态字节码技术
相关文章

 发表评论

暂时没有评论,来抢沙发吧~