Java FTPClient连接池的实现

网友投稿 450 2023-01-28


Java FTPClient连接池的实现

最近在写一个FTP上传工具,用到了Apache的FTPClient,为了提高上传效率,我采用了多线程的方式,但是每个线程频繁的创建和销毁FTPClient对象势必会造成不必要的开销,因此,此处最好使用一个FTPClient连接池。仔细翻了一下Apache的api,发现它并没有一个FTPClientPool的实现,所以,不得不自己写一个FTPClientPool。下面就大体介绍一下开发连接池的整个过程,供大家参考。

关于对象池

有些对象的创建开销是比较大的,比如数据库连接等。为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。

如果我们要自己实现一个对象池,一般需要完成如下功能:

1. 如果池中有可用的对象,对象池应当能返回给客户端

2. 客户端把对象放回池里后,可以对这些对象进行重用

3. 对象池能够创建新的对象来满足客户端不断增长的需求

4. 需要有一个正确关闭池的机制来结束对象的生命周期

Apache的对象池工具包

为了方便我们开发自己的对象池,Apache 提供的common-pool工具包,里面包含了开发通用对象池的一些接口和实现类。其中最基本的两个接口是ObjectPool 和PoolableObjectFactory。

ObjectPool接口中有几个最基本的方法:

1. addObject() : 添加对象到池

2. borrowObject():客户端从池中借出一个对象

3. returnObject():客户端归还一个对象到池中

4. close():关闭对象池,清理内存释放资源等

5. setFactory(ObjectFactory factory):需要一个工厂来制造池中的对象

PoolableObjectFactory接口中几个最基本的方法:

1. makeObject():制造一个对象

2. destoryObject():销毁一个对象

3. validateObject():验证一个对象是否还可用

通过以上两个接口我们就可以自己实现一个对象池了。

实例:开发一个FTPClient对象池

最近在开发一个项目,需要把hdfs中的文件上传到一组ftp服务器,为了提高上传效率,自然考虑到使用多线程的方式进行上传。我上传ftp用的工具是Apache common-net包中的FTPClient,但Apache并没有提供FTPClientPool,于是为了减少FTPClient的创建销毁次数,我们就自己开发一个FTPClientPool来复用FTPClient连接。

通过上面的介绍,我们可以利用Apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool 包中的ObjectPool和PoolableObjectFactory两个接口即可。下面就看一下我写的实现:

写一个ObjectPool接口的实现FTPClientPool

import java.io.IOException;

import java.util.NoSuchElementException;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeUnit;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.pool.ObjectPool;

import org.apache.commons.pool.PoolableObjectFactory;

/**

* 实现了一个FTPClient连接池

* @author heaven

*/

public class FTPClientPool implements ObjectPool{

private static final int DEFAULT_POOL_SIZE = 10;

private final BlockingQueue pool;

prenomPWxuuivate final FtpClientFactory factory;

/**

* 初始化连接池,需要注入一个工厂来提供FTPClient实例

* @param factory

* @throws Exception

*/

public FTPClientPool(FtpClientFactory factory) throws Exception{

this(DEFAULT_POOL_SIZE, factory);

}

/**

*

* @param maxPoolSize

* @param factory

* @throws Exception

*/

public FTPClientPool(int poolSize, FtpClientFactory factory) throenomPWxuuws Exception {

this.factory = factory;

pool = new ArrayBlockingQueue(poolSize*2);

initPool(poolSize);

}

/**

* 初始化连接池,需要注入一个工厂来提供FTPClient实例

* @param maxPoolSize

* @throws Exception

*/

private void initPool(int maxPoolSize) throws Exception {

for(int i=0;i

//往池中添加对象

addObject();

}

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPool#borrowObject()

*/

public FTPClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {

FTPClient client = pool.take();

if (client == null) {

client = factory.makeObject();

addObject();

}else if(!factory.validateObject(client)){//验证不通过

//使对象在池中失效

invalidateObject(client);

//制造并添加新对象到池中

client = factory.makeObject();

addObject();

}

return client;

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPool#returnObject(java.lang.Object)

*/

public void returnObject(FTPClient client) throws Exception {

if ((client != null) && !pool.offer(client,3,TimeUnit.SECONDS)) {

try {

factory.destroyObject(client);

} catch (IOException e) {

e.printStackTrace();

}

}

}

public void invalidateObject(FTPClient client) throws Exception {

//移除无效的客户端

pool.remove(client);

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPhttp://ool#addObject()

*/

public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {

//插入对象到队列

pool.offer(factory.makeObject(),3,TimeUnit.SECONDS);

}

public int getNumIdle() throws UnsupportedOperationException {

return 0;

}

public int getNumActive() throws UnsupportedOperationException {

return 0;

}

public void clear() throws Exception, UnsupportedOperationException {

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPool#close()

*/

public void close() throws Exception {

while(pool.iterator().hasNext()){

FTPClient client = pool.take();

factory.destroyObject(client);

}

}

public void setFactory(PoolableObjectFactory factory) throws IllegalStateException, UnsupportedOperationException {

}

}

再写一个PoolableObjectFactory接口的实现FTPClientFactory

import java.io.IOException;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.net.ftp.FTPReply;

import org.apache.commons.pool.PoolableObjectFactory;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.hdfstoftp.util.FTPClientException;

/**

* FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁

* @author heaven

*/

public class FtpClientFactory implements PoolableObjectFactory {

private static Logger logger = LoggerFactory.getLogger("file");

private FTPClientConfigure config;

//给工厂传入一个参数对象,方便配置FTPClient的相关参数

public FtpClientFactory(FTPClientConfigure config){

this.config=config;

}

/* (non-Javadoc)

* @see org.apache.commons.pool.PoolableObjectFactory#makeObject()

*/

public FTPClient makeObject() throws Exception {

FTPClient ftpClient = new FTPClient();

ftpClient.setConnectTimeout(config.getClientTimeout());

try {

ftpClient.connect(config.getHost(), config.getPort());

int reply = ftpClient.getReplyCode();

if (!FTPReply.isPositiveCompletion(reply)) {

ftpClient.disconnect();

logger.warn("FTPServer refused connection");

return null;

}

boolean result = ftpClient.login(config.getUsername(), config.getPassword());

if (!result) {

throw new FTPClientException("ftpClient登陆失败! userName:" + config.getUsername() + " ; password:" + config.getPassword());

}

ftpClient.setFileType(config.getTransferFileType());

ftpClient.setBufferSize(1024);

ftpClient.setControlEncoding(config.getEncoding());

if (config.getPassiveMode().equals("true")) {

ftpClient.enterLocalPassiveMode();

}

} catch (IOException e) {

e.printStackTrace();

} catch (FTPClientException e) {

e.printStackTrace();

}

return ftpClient;

}

/* (non-Javadoc)

* @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)

*/

public void destroyObject(FTPClient ftpClient) throws Exception {

try {

if (ftpClient != null && ftpClient.isConnected()) {

ftpClient.logout();

}

} catch (IOException io) {

io.printStackTrace();

} finally {

// 注意,一定要在finally代码中断开连接,否则会导致占用ftp连接情况

try {

ftpClient.disconnect();

} catch (IOException io) {

io.printStackTrace();

}

}

}

/* (non-Javadoc)

* @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)

*/

public boolean validateObject(FTPClient ftpClient) {

try {

return ftpClient.sendNoOp();

} catch (IOException e) {

throw new RuntimeException("Failed to validate client: " + e, e);

}

}

public void activateObject(FTPClient ftpClient) throws Exception {

}

public void passivateObject(FTPClient ftpClient) throws Exception {

}

}

最后,我们最好给工厂传递一个参数对象,方便我们设置FTPClient的一些参数

package org.apache.commons.pool.impl.contrib;

/**

* FTPClient配置类,封装了FTPClient的相关配置

*

* @author heaven

*/

public class FTPClientConfigure {

private String host;

private int port;

private String username;

private String password;

private String passiveMode;

private String encoding;

private int clientTimeout;

private int threadNum;

private int transferFileType;

private boolean renameUploaded;

private int retryTimes;

public String getHost() {

return host;

}

public void setHost(String host) {

this. host = host;

}

public int getPort() {

return port;

}

public void setPort(int port) {

this. port = port;

}

public String getUsername() {

return username;

}

public void setUsername(String username) {

this. username = username;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this. password = password;

}

public String getPassiveMode() {

return passiveMode;

}

public void setPassiveMode(String passiveMode) {

this. passiveMode = passiveMode;

}

public String getEncoding() {

return encoding;

}

public void setEncoding(String encoding) {

this. encoding = encoding;

}

public int getClientTimeout() {

return clientTimeout;

}

public void setClientTimeout( int clientTimeout) {

this. clientTimeout = clientTimeout;

}

public int getThreadNum() {

return threadNum;

}

public void setThreadNum( int threadNum) {

this. threadNum = threadNum;

}

public int getTransferFileType() {

return transferFileType;

}

public void setTransferFileType( int transferFileType) {

this. transferFileType = transferFileType;

}

public boolean isRenameUploaded() {

return renameUploaded;

}

public void setRenameUploaded( boolean renameUploaded) {

this. renameUploaded = renameUploaded;

}

public int getRetryTimes() {

return retryTimes;

}

public void setRetryTimes( int retryTimes) {

this. retryTimes = retryTimes;

}

@Override

public String toString() {

return "FTPClientConfig [host=" + host + "\n port=" + port + "\n username=" + username + "\n password=" + password + "\n passiveMode=" + passiveMode

+ "\n encoding=" + encoding + "\n clientTimeout=" + clientTimeout + "\n threadNum=" + threadNum + "\n transferFileType="

+ transferFileType + "\n renameUploaded=" + renameUploaded + "\n retryTimes=" + retryTimes + "]" ;

}

}

FTPClientPool连接池类管理FTPClient对象的生命周期,负责对象的借出、规划、池的销毁等;FTPClientPool类依赖于FtpClientFactory类,由这个工程类来制造和销毁对象;FtpClientFactory又依赖FTPClientConfigure类,FTPClientConfigure负责封装FTPClient的配置参数。至此,我们的FTPClient连接池就开发完成了。

需要注意的是,FTPClientPool中用到了一个阻塞队列ArrayBlockingQueue来管理存放FTPClient对象,关于阻塞队列,请参考我的这篇文章: 【Java并发之】BlockingQueue

//往池中添加对象

addObject();

}

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPool#borrowObject()

*/

public FTPClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {

FTPClient client = pool.take();

if (client == null) {

client = factory.makeObject();

addObject();

}else if(!factory.validateObject(client)){//验证不通过

//使对象在池中失效

invalidateObject(client);

//制造并添加新对象到池中

client = factory.makeObject();

addObject();

}

return client;

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPool#returnObject(java.lang.Object)

*/

public void returnObject(FTPClient client) throws Exception {

if ((client != null) && !pool.offer(client,3,TimeUnit.SECONDS)) {

try {

factory.destroyObject(client);

} catch (IOException e) {

e.printStackTrace();

}

}

}

public void invalidateObject(FTPClient client) throws Exception {

//移除无效的客户端

pool.remove(client);

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPhttp://ool#addObject()

*/

public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {

//插入对象到队列

pool.offer(factory.makeObject(),3,TimeUnit.SECONDS);

}

public int getNumIdle() throws UnsupportedOperationException {

return 0;

}

public int getNumActive() throws UnsupportedOperationException {

return 0;

}

public void clear() throws Exception, UnsupportedOperationException {

}

/* (non-Javadoc)

* @see org.apache.commons.pool.ObjectPool#close()

*/

public void close() throws Exception {

while(pool.iterator().hasNext()){

FTPClient client = pool.take();

factory.destroyObject(client);

}

}

public void setFactory(PoolableObjectFactory factory) throws IllegalStateException, UnsupportedOperationException {

}

}

再写一个PoolableObjectFactory接口的实现FTPClientFactory

import java.io.IOException;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.net.ftp.FTPReply;

import org.apache.commons.pool.PoolableObjectFactory;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.hdfstoftp.util.FTPClientException;

/**

* FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁

* @author heaven

*/

public class FtpClientFactory implements PoolableObjectFactory {

private static Logger logger = LoggerFactory.getLogger("file");

private FTPClientConfigure config;

//给工厂传入一个参数对象,方便配置FTPClient的相关参数

public FtpClientFactory(FTPClientConfigure config){

this.config=config;

}

/* (non-Javadoc)

* @see org.apache.commons.pool.PoolableObjectFactory#makeObject()

*/

public FTPClient makeObject() throws Exception {

FTPClient ftpClient = new FTPClient();

ftpClient.setConnectTimeout(config.getClientTimeout());

try {

ftpClient.connect(config.getHost(), config.getPort());

int reply = ftpClient.getReplyCode();

if (!FTPReply.isPositiveCompletion(reply)) {

ftpClient.disconnect();

logger.warn("FTPServer refused connection");

return null;

}

boolean result = ftpClient.login(config.getUsername(), config.getPassword());

if (!result) {

throw new FTPClientException("ftpClient登陆失败! userName:" + config.getUsername() + " ; password:" + config.getPassword());

}

ftpClient.setFileType(config.getTransferFileType());

ftpClient.setBufferSize(1024);

ftpClient.setControlEncoding(config.getEncoding());

if (config.getPassiveMode().equals("true")) {

ftpClient.enterLocalPassiveMode();

}

} catch (IOException e) {

e.printStackTrace();

} catch (FTPClientException e) {

e.printStackTrace();

}

return ftpClient;

}

/* (non-Javadoc)

* @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)

*/

public void destroyObject(FTPClient ftpClient) throws Exception {

try {

if (ftpClient != null && ftpClient.isConnected()) {

ftpClient.logout();

}

} catch (IOException io) {

io.printStackTrace();

} finally {

// 注意,一定要在finally代码中断开连接,否则会导致占用ftp连接情况

try {

ftpClient.disconnect();

} catch (IOException io) {

io.printStackTrace();

}

}

}

/* (non-Javadoc)

* @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)

*/

public boolean validateObject(FTPClient ftpClient) {

try {

return ftpClient.sendNoOp();

} catch (IOException e) {

throw new RuntimeException("Failed to validate client: " + e, e);

}

}

public void activateObject(FTPClient ftpClient) throws Exception {

}

public void passivateObject(FTPClient ftpClient) throws Exception {

}

}

最后,我们最好给工厂传递一个参数对象,方便我们设置FTPClient的一些参数

package org.apache.commons.pool.impl.contrib;

/**

* FTPClient配置类,封装了FTPClient的相关配置

*

* @author heaven

*/

public class FTPClientConfigure {

private String host;

private int port;

private String username;

private String password;

private String passiveMode;

private String encoding;

private int clientTimeout;

private int threadNum;

private int transferFileType;

private boolean renameUploaded;

private int retryTimes;

public String getHost() {

return host;

}

public void setHost(String host) {

this. host = host;

}

public int getPort() {

return port;

}

public void setPort(int port) {

this. port = port;

}

public String getUsername() {

return username;

}

public void setUsername(String username) {

this. username = username;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this. password = password;

}

public String getPassiveMode() {

return passiveMode;

}

public void setPassiveMode(String passiveMode) {

this. passiveMode = passiveMode;

}

public String getEncoding() {

return encoding;

}

public void setEncoding(String encoding) {

this. encoding = encoding;

}

public int getClientTimeout() {

return clientTimeout;

}

public void setClientTimeout( int clientTimeout) {

this. clientTimeout = clientTimeout;

}

public int getThreadNum() {

return threadNum;

}

public void setThreadNum( int threadNum) {

this. threadNum = threadNum;

}

public int getTransferFileType() {

return transferFileType;

}

public void setTransferFileType( int transferFileType) {

this. transferFileType = transferFileType;

}

public boolean isRenameUploaded() {

return renameUploaded;

}

public void setRenameUploaded( boolean renameUploaded) {

this. renameUploaded = renameUploaded;

}

public int getRetryTimes() {

return retryTimes;

}

public void setRetryTimes( int retryTimes) {

this. retryTimes = retryTimes;

}

@Override

public String toString() {

return "FTPClientConfig [host=" + host + "\n port=" + port + "\n username=" + username + "\n password=" + password + "\n passiveMode=" + passiveMode

+ "\n encoding=" + encoding + "\n clientTimeout=" + clientTimeout + "\n threadNum=" + threadNum + "\n transferFileType="

+ transferFileType + "\n renameUploaded=" + renameUploaded + "\n retryTimes=" + retryTimes + "]" ;

}

}

FTPClientPool连接池类管理FTPClient对象的生命周期,负责对象的借出、规划、池的销毁等;FTPClientPool类依赖于FtpClientFactory类,由这个工程类来制造和销毁对象;FtpClientFactory又依赖FTPClientConfigure类,FTPClientConfigure负责封装FTPClient的配置参数。至此,我们的FTPClient连接池就开发完成了。

需要注意的是,FTPClientPool中用到了一个阻塞队列ArrayBlockingQueue来管理存放FTPClient对象,关于阻塞队列,请参考我的这篇文章: 【Java并发之】BlockingQueue


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:vue中element
下一篇:共享文件系统有哪些软件(共享文件系统有哪些软件)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~