多平台统一管理软件接口,如何实现多平台统一管理软件接口
450
2023-01-28
Java FTPClient连接池的实现
最近在写一个FTP上传工具,用到了Apache的FTPClient,为了提高上传效率,我采用了多线程的方式,但是每个线程频繁的创建和销毁FTPClient对象势必会造成不必要的开销,因此,此处最好使用一个FTPClient连接池。仔细翻了一下Apache的api,发现它并没有一个FTPClientPool的实现,所以,不得不自己写一个FTPClientPool。下面就大体介绍一下开发连接池的整个过程,供大家参考。
关于对象池
有些对象的创建开销是比较大的,比如数据库连接等。为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。
如果我们要自己实现一个对象池,一般需要完成如下功能:
1. 如果池中有可用的对象,对象池应当能返回给客户端
2. 客户端把对象放回池里后,可以对这些对象进行重用
3. 对象池能够创建新的对象来满足客户端不断增长的需求
4. 需要有一个正确关闭池的机制来结束对象的生命周期
Apache的对象池工具包
为了方便我们开发自己的对象池,Apache 提供的common-pool工具包,里面包含了开发通用对象池的一些接口和实现类。其中最基本的两个接口是ObjectPool 和PoolableObjectFactory。
ObjectPool接口中有几个最基本的方法:
1. addObject() : 添加对象到池
2. borrowObject():客户端从池中借出一个对象
3. returnObject():客户端归还一个对象到池中
4. close():关闭对象池,清理内存释放资源等
5. setFactory(ObjectFactory factory):需要一个工厂来制造池中的对象
PoolableObjectFactory接口中几个最基本的方法:
1. makeObject():制造一个对象
2. destoryObject():销毁一个对象
3. validateObject():验证一个对象是否还可用
通过以上两个接口我们就可以自己实现一个对象池了。
实例:开发一个FTPClient对象池
最近在开发一个项目,需要把hdfs中的文件上传到一组ftp服务器,为了提高上传效率,自然考虑到使用多线程的方式进行上传。我上传ftp用的工具是Apache common-net包中的FTPClient,但Apache并没有提供FTPClientPool,于是为了减少FTPClient的创建销毁次数,我们就自己开发一个FTPClientPool来复用FTPClient连接。
通过上面的介绍,我们可以利用Apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool 包中的ObjectPool和PoolableObjectFactory两个接口即可。下面就看一下我写的实现:
写一个ObjectPool接口的实现FTPClientPool
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
/**
* 实现了一个FTPClient连接池
* @author heaven
*/
public class FTPClientPool implements ObjectPool
private static final int DEFAULT_POOL_SIZE = 10;
private final BlockingQueue
prenomPWxuuivate final FtpClientFactory factory;
/**
* 初始化连接池,需要注入一个工厂来提供FTPClient实例
* @param factory
* @throws Exception
*/
public FTPClientPool(FtpClientFactory factory) throws Exception{
this(DEFAULT_POOL_SIZE, factory);
}
/**
*
* @param maxPoolSize
* @param factory
* @throws Exception
*/
public FTPClientPool(int poolSize, FtpClientFactory factory) throenomPWxuuws Exception {
this.factory = factory;
pool = new ArrayBlockingQueue
initPool(poolSize);
}
/**
* 初始化连接池,需要注入一个工厂来提供FTPClient实例
* @param maxPoolSize
* @throws Exception
*/
private void initPool(int maxPoolSize) throws Exception {
for(int i=0;i //往池中添加对象 addObject(); } } /* (non-Javadoc) * @see org.apache.commons.pool.ObjectPool#borrowObject() */ public FTPClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException { FTPClient client = pool.take(); if (client == null) { client = factory.makeObject(); addObject(); }else if(!factory.validateObject(client)){//验证不通过 //使对象在池中失效 invalidateObject(client); //制造并添加新对象到池中 client = factory.makeObject(); addObject(); } return client; } /* (non-Javadoc) * @see org.apache.commons.pool.ObjectPool#returnObject(java.lang.Object) */ public void returnObject(FTPClient client) throws Exception { if ((client != null) && !pool.offer(client,3,TimeUnit.SECONDS)) { try { factory.destroyObject(client); } catch (IOException e) { e.printStackTrace(); } } } public void invalidateObject(FTPClient client) throws Exception { //移除无效的客户端 pool.remove(client); } /* (non-Javadoc) * @see org.apache.commons.pool.ObjectPhttp://ool#addObject() */ public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException { //插入对象到队列 pool.offer(factory.makeObject(),3,TimeUnit.SECONDS); } public int getNumIdle() throws UnsupportedOperationException { return 0; } public int getNumActive() throws UnsupportedOperationException { return 0; } public void clear() throws Exception, UnsupportedOperationException { } /* (non-Javadoc) * @see org.apache.commons.pool.ObjectPool#close() */ public void close() throws Exception { while(pool.iterator().hasNext()){ FTPClient client = pool.take(); factory.destroyObject(client); } } public void setFactory(PoolableObjectFactory } } 再写一个PoolableObjectFactory接口的实现FTPClientFactory import java.io.IOException; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.pool.PoolableObjectFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.hdfstoftp.util.FTPClientException; /** * FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁 * @author heaven */ public class FtpClientFactory implements PoolableObjectFactory private static Logger logger = LoggerFactory.getLogger("file"); private FTPClientConfigure config; //给工厂传入一个参数对象,方便配置FTPClient的相关参数 public FtpClientFactory(FTPClientConfigure config){ this.config=config; } /* (non-Javadoc) * @see org.apache.commons.pool.PoolableObjectFactory#makeObject() */ public FTPClient makeObject() throws Exception { FTPClient ftpClient = new FTPClient(); ftpClient.setConnectTimeout(config.getClientTimeout()); try { ftpClient.connect(config.getHost(), config.getPort()); int reply = ftpClient.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftpClient.disconnect(); logger.warn("FTPServer refused connection"); return null; } boolean result = ftpClient.login(config.getUsername(), config.getPassword()); if (!result) { throw new FTPClientException("ftpClient登陆失败! userName:" + config.getUsername() + " ; password:" + config.getPassword()); } ftpClient.setFileType(config.getTransferFileType()); ftpClient.setBufferSize(1024); ftpClient.setControlEncoding(config.getEncoding()); if (config.getPassiveMode().equals("true")) { ftpClient.enterLocalPassiveMode(); } } catch (IOException e) { e.printStackTrace(); } catch (FTPClientException e) { e.printStackTrace(); } return ftpClient; } /* (non-Javadoc) * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) */ public void destroyObject(FTPClient ftpClient) throws Exception { try { if (ftpClient != null && ftpClient.isConnected()) { ftpClient.logout(); } } catch (IOException io) { io.printStackTrace(); } finally { // 注意,一定要在finally代码中断开连接,否则会导致占用ftp连接情况 try { ftpClient.disconnect(); } catch (IOException io) { io.printStackTrace(); } } } /* (non-Javadoc) * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object) */ public boolean validateObject(FTPClient ftpClient) { try { return ftpClient.sendNoOp(); } catch (IOException e) { throw new RuntimeException("Failed to validate client: " + e, e); } } public void activateObject(FTPClient ftpClient) throws Exception { } public void passivateObject(FTPClient ftpClient) throws Exception { } } 最后,我们最好给工厂传递一个参数对象,方便我们设置FTPClient的一些参数 package org.apache.commons.pool.impl.contrib; /** * FTPClient配置类,封装了FTPClient的相关配置 * * @author heaven */ public class FTPClientConfigure { private String host; private int port; private String username; private String password; private String passiveMode; private String encoding; private int clientTimeout; private int threadNum; private int transferFileType; private boolean renameUploaded; private int retryTimes; public String getHost() { return host; } public void setHost(String host) { this. host = host; } public int getPort() { return port; } public void setPort(int port) { this. port = port; } public String getUsername() { return username; } public void setUsername(String username) { this. username = username; } public String getPassword() { return password; } public void setPassword(String password) { this. password = password; } public String getPassiveMode() { return passiveMode; } public void setPassiveMode(String passiveMode) { this. passiveMode = passiveMode; } public String getEncoding() { return encoding; } public void setEncoding(String encoding) { this. encoding = encoding; } public int getClientTimeout() { return clientTimeout; } public void setClientTimeout( int clientTimeout) { this. clientTimeout = clientTimeout; } public int getThreadNum() { return threadNum; } public void setThreadNum( int threadNum) { this. threadNum = threadNum; } public int getTransferFileType() { return transferFileType; } public void setTransferFileType( int transferFileType) { this. transferFileType = transferFileType; } public boolean isRenameUploaded() { return renameUploaded; } public void setRenameUploaded( boolean renameUploaded) { this. renameUploaded = renameUploaded; } public int getRetryTimes() { return retryTimes; } public void setRetryTimes( int retryTimes) { this. retryTimes = retryTimes; } @Override public String toString() { return "FTPClientConfig [host=" + host + "\n port=" + port + "\n username=" + username + "\n password=" + password + "\n passiveMode=" + passiveMode + "\n encoding=" + encoding + "\n clientTimeout=" + clientTimeout + "\n threadNum=" + threadNum + "\n transferFileType=" + transferFileType + "\n renameUploaded=" + renameUploaded + "\n retryTimes=" + retryTimes + "]" ; } } FTPClientPool连接池类管理FTPClient对象的生命周期,负责对象的借出、规划、池的销毁等;FTPClientPool类依赖于FtpClientFactory类,由这个工程类来制造和销毁对象;FtpClientFactory又依赖FTPClientConfigure类,FTPClientConfigure负责封装FTPClient的配置参数。至此,我们的FTPClient连接池就开发完成了。 需要注意的是,FTPClientPool中用到了一个阻塞队列ArrayBlockingQueue来管理存放FTPClient对象,关于阻塞队列,请参考我的这篇文章: 【Java并发之】BlockingQueue
//往池中添加对象
addObject();
}
}
/* (non-Javadoc)
* @see org.apache.commons.pool.ObjectPool#borrowObject()
*/
public FTPClient borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
FTPClient client = pool.take();
if (client == null) {
client = factory.makeObject();
addObject();
}else if(!factory.validateObject(client)){//验证不通过
//使对象在池中失效
invalidateObject(client);
//制造并添加新对象到池中
client = factory.makeObject();
addObject();
}
return client;
}
/* (non-Javadoc)
* @see org.apache.commons.pool.ObjectPool#returnObject(java.lang.Object)
*/
public void returnObject(FTPClient client) throws Exception {
if ((client != null) && !pool.offer(client,3,TimeUnit.SECONDS)) {
try {
factory.destroyObject(client);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void invalidateObject(FTPClient client) throws Exception {
//移除无效的客户端
pool.remove(client);
}
/* (non-Javadoc)
* @see org.apache.commons.pool.ObjectPhttp://ool#addObject()
*/
public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
//插入对象到队列
pool.offer(factory.makeObject(),3,TimeUnit.SECONDS);
}
public int getNumIdle() throws UnsupportedOperationException {
return 0;
}
public int getNumActive() throws UnsupportedOperationException {
return 0;
}
public void clear() throws Exception, UnsupportedOperationException {
}
/* (non-Javadoc)
* @see org.apache.commons.pool.ObjectPool#close()
*/
public void close() throws Exception {
while(pool.iterator().hasNext()){
FTPClient client = pool.take();
factory.destroyObject(client);
}
}
public void setFactory(PoolableObjectFactory
}
}
再写一个PoolableObjectFactory接口的实现FTPClientFactory
import java.io.IOException;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool.PoolableObjectFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hdfstoftp.util.FTPClientException;
/**
* FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁
* @author heaven
*/
public class FtpClientFactory implements PoolableObjectFactory
private static Logger logger = LoggerFactory.getLogger("file");
private FTPClientConfigure config;
//给工厂传入一个参数对象,方便配置FTPClient的相关参数
public FtpClientFactory(FTPClientConfigure config){
this.config=config;
}
/* (non-Javadoc)
* @see org.apache.commons.pool.PoolableObjectFactory#makeObject()
*/
public FTPClient makeObject() throws Exception {
FTPClient ftpClient = new FTPClient();
ftpClient.setConnectTimeout(config.getClientTimeout());
try {
ftpClient.connect(config.getHost(), config.getPort());
int reply = ftpClient.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftpClient.disconnect();
logger.warn("FTPServer refused connection");
return null;
}
boolean result = ftpClient.login(config.getUsername(), config.getPassword());
if (!result) {
throw new FTPClientException("ftpClient登陆失败! userName:" + config.getUsername() + " ; password:" + config.getPassword());
}
ftpClient.setFileType(config.getTransferFileType());
ftpClient.setBufferSize(1024);
ftpClient.setControlEncoding(config.getEncoding());
if (config.getPassiveMode().equals("true")) {
ftpClient.enterLocalPassiveMode();
}
} catch (IOException e) {
e.printStackTrace();
} catch (FTPClientException e) {
e.printStackTrace();
}
return ftpClient;
}
/* (non-Javadoc)
* @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
*/
public void destroyObject(FTPClient ftpClient) throws Exception {
try {
if (ftpClient != null && ftpClient.isConnected()) {
ftpClient.logout();
}
} catch (IOException io) {
io.printStackTrace();
} finally {
// 注意,一定要在finally代码中断开连接,否则会导致占用ftp连接情况
try {
ftpClient.disconnect();
} catch (IOException io) {
io.printStackTrace();
}
}
}
/* (non-Javadoc)
* @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)
*/
public boolean validateObject(FTPClient ftpClient) {
try {
return ftpClient.sendNoOp();
} catch (IOException e) {
throw new RuntimeException("Failed to validate client: " + e, e);
}
}
public void activateObject(FTPClient ftpClient) throws Exception {
}
public void passivateObject(FTPClient ftpClient) throws Exception {
}
}
最后,我们最好给工厂传递一个参数对象,方便我们设置FTPClient的一些参数
package org.apache.commons.pool.impl.contrib;
/**
* FTPClient配置类,封装了FTPClient的相关配置
*
* @author heaven
*/
public class FTPClientConfigure {
private String host;
private int port;
private String username;
private String password;
private String passiveMode;
private String encoding;
private int clientTimeout;
private int threadNum;
private int transferFileType;
private boolean renameUploaded;
private int retryTimes;
public String getHost() {
return host;
}
public void setHost(String host) {
this. host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this. port = port;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this. username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this. password = password;
}
public String getPassiveMode() {
return passiveMode;
}
public void setPassiveMode(String passiveMode) {
this. passiveMode = passiveMode;
}
public String getEncoding() {
return encoding;
}
public void setEncoding(String encoding) {
this. encoding = encoding;
}
public int getClientTimeout() {
return clientTimeout;
}
public void setClientTimeout( int clientTimeout) {
this. clientTimeout = clientTimeout;
}
public int getThreadNum() {
return threadNum;
}
public void setThreadNum( int threadNum) {
this. threadNum = threadNum;
}
public int getTransferFileType() {
return transferFileType;
}
public void setTransferFileType( int transferFileType) {
this. transferFileType = transferFileType;
}
public boolean isRenameUploaded() {
return renameUploaded;
}
public void setRenameUploaded( boolean renameUploaded) {
this. renameUploaded = renameUploaded;
}
public int getRetryTimes() {
return retryTimes;
}
public void setRetryTimes( int retryTimes) {
this. retryTimes = retryTimes;
}
@Override
public String toString() {
return "FTPClientConfig [host=" + host + "\n port=" + port + "\n username=" + username + "\n password=" + password + "\n passiveMode=" + passiveMode
+ "\n encoding=" + encoding + "\n clientTimeout=" + clientTimeout + "\n threadNum=" + threadNum + "\n transferFileType="
+ transferFileType + "\n renameUploaded=" + renameUploaded + "\n retryTimes=" + retryTimes + "]" ;
}
}
FTPClientPool连接池类管理FTPClient对象的生命周期,负责对象的借出、规划、池的销毁等;FTPClientPool类依赖于FtpClientFactory类,由这个工程类来制造和销毁对象;FtpClientFactory又依赖FTPClientConfigure类,FTPClientConfigure负责封装FTPClient的配置参数。至此,我们的FTPClient连接池就开发完成了。
需要注意的是,FTPClientPool中用到了一个阻塞队列ArrayBlockingQueue来管理存放FTPClient对象,关于阻塞队列,请参考我的这篇文章: 【Java并发之】BlockingQueue
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~