使用Apache commons-pool2实现高效的FTPClient连接池的方法

一. 连接池概述

​ 频繁的建立和关闭连接,会极大的降低系统的性能,而连接池会在初始化的时候会创建一定数量的连接,每次访问只需从连接池里获取连接,使用完毕后再放回连接池,并不是直接关闭连接,这样可以保证程序重复使用同一个连接而不需要每次访问都建立和关闭连接, 从而提高系统性能。有些对象的创建开销是比较大的,比如数据库连接等。为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。

二. commons-pool2介绍

2.1 pool2的引入

<dependency>

<groupId>commons-lang</groupId>

<artifactId>commons-lang</artifactId>

<version>2.5</version>

</dependency>

<dependency>

<groupId>org.apache.commons</groupId>

<artifactId>commons-pool2</artifactId>

<version>2.8.0</version>

</dependency>

<dependency>

<groupId>commons-net</groupId>

<artifactId>commons-net</artifactId>

<version>3.6</version>

</dependency>

<dependency>

<groupId>commons-io</groupId>

<artifactId>commons-io</artifactId>

<version>2.6</version>

</dependency>

2.2 pool2的组成

PooledObject(池化对象) PooledObjectFactory(对象工厂) ObjectPool (对象池)

对应为: FTPClient(池化对象) FTPClientFactory(对象工厂) FTPClientPool(对象池)

关系图:

关系图

三. 实现连接池

3.1 配置FtpClient

我们已经有现成的池化对象(FtpClient)了,只需要添加配置即可,FTPClientConfig【FTP连接配置类】

/**

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*

*/

package com.tompai.ftp.pool;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**

* @desc: demo

* @name: FTPClientConfig.java

* @author: tompai

* @email:liinux@qq.com

* @createTime: 2019年12月31日 下午12:53:48

* @history:

* @version: v1.0

*/

public class FTPClientConfig extends GenericObjectPoolConfig<FTPClient> {

private String host;// 主机名

private int port = 21;// 端口

private String username;// 用户名

private String password;// 密码

private int connectTimeOut = 5000;// ftp 连接超时时间 毫秒

private String controlEncoding = "utf-8";

private int bufferSize = 1024;// 缓冲区大小

private int fileType = 2;// 传输数据格式 2表binary二进制数据

private int dataTimeout = 120 * 1000;

private boolean useEPSVwithIPv4 = false;

private boolean passiveMode = true;// 是否启用被动模式

private int threadNum=1;//开启线程数

private int transferFileType=FTPClient.BINARY_FILE_TYPE;//传输文件类型

private boolean renameUploaded=false;//是否上传文件重命名;

private int retryTimes=3;//重试次数

public String getHost() {

return host;

}

public void setHost(String host) {

this.host = host;

}

public int getPort() {

return port;

}

public void setPort(int port) {

this.port = port;

}

public String getUsername() {

return username;

}

public void setUsername(String username) {

this.username = username;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

public int getConnectTimeOut() {

return connectTimeOut;

}

public void setConnectTimeOut(int connectTimeOut) {

this.connectTimeOut = connectTimeOut;

}

public String getControlEncoding() {

return controlEncoding;

}

public void setControlEncoding(String controlEncoding) {

this.controlEncoding = controlEncoding;

}

public int getBufferSize() {

return bufferSize;

}

public void setBufferSize(int bufferSize) {

this.bufferSize = bufferSize;

}

public int getFileType() {

return fileType;

}

public void setFileType(int fileType) {

this.fileType = fileType;

}

public int getDataTimeout() {

return dataTimeout;

}

public void setDataTimeout(int dataTimeout) {

this.dataTimeout = dataTimeout;

}

public boolean isUseEPSVwithIPv4() {

return useEPSVwithIPv4;

}

public void setUseEPSVwithIPv4(boolean useEPSVwithIPv4) {

this.useEPSVwithIPv4 = useEPSVwithIPv4;

}

public boolean isPassiveMode() {

return passiveMode;

}

public void setPassiveMode(boolean passiveMode) {

this.passiveMode = passiveMode;

}

public int getThreadNum() {

return threadNum;

}

public void setThreadNum(int threadNum) {

this.threadNum = threadNum;

}

public int getTransferFileType() {

return transferFileType;

}

public void setTransferFileType(int transferFileType) {

this.transferFileType = transferFileType;

}

public boolean isRenameUploaded() {

return renameUploaded;

}

public void setRenameUploaded(boolean renameUploaded) {

this.renameUploaded = renameUploaded;

}

public int getRetryTimes() {

return retryTimes;

}

public void setRetryTimes(int retryTimes) {

this.retryTimes = retryTimes;

}

@Override

public String toString() {

return "{\"host\":\"" + host + "\", \"port\":\"" + port + "\", \"username\":\"" + username

+ "\", \"password\":\"" + password + "\", \"connectTimeOut\":\"" + connectTimeOut

+ "\", \"controlEncoding\":\"" + controlEncoding + "\", \"bufferSize\":\"" + bufferSize

+ "\", \"fileType\":\"" + fileType + "\", \"dataTimeout\":\"" + dataTimeout

+ "\", \"useEPSVwithIPv4\":\"" + useEPSVwithIPv4 + "\", \"passiveMode\":\"" + passiveMode

+ "\", \"threadNum\":\"" + threadNum + "\", \"transferFileType\":\"" + transferFileType

+ "\", \"renameUploaded\":\"" + renameUploaded + "\", \"retryTimes\":\"" + retryTimes + "\"}";

}

}

3.2 创建FTPClientFactory

​​ 在commons-pool2中有两种工厂:PooledObjectFactory 和KeyedPooledObjectFactory,在此使用PooledObjectFactory。

/**

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*

*/

package com.tompai.ftp.pool;

import java.io.IOException;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.net.ftp.FTPReply;

import org.apache.commons.pool2.BasePooledObjectFactory;

import org.apache.commons.pool2.PooledObject;

import org.apache.commons.pool2.impl.DefaultPooledObject;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

* @desc: demo

* @name: FTPClientFactory.java

* @author: tompai

* @email:liinux@qq.com

* @createTime: 2019年12月31日 下午12:52:02

* @history:

* @version: v1.0

*/

public class FTPClientFactory extends BasePooledObjectFactory<FTPClient> {

private static Logger logger = LoggerFactory.getLogger(FTPClientFactory.class);

private FTPClientConfig config;

public FTPClientConfig getFtpPoolConfig() {

return config;

}

public void setFtpPoolConfig(FTPClientConfig ftpPoolConfig) {

this.config = ftpPoolConfig;

}

/**

* 新建对象

*/

@Override

public FTPClient create() throws Exception {

FTPClient ftpClient = new FTPClient();

ftpClient.setConnectTimeout(config.getConnectTimeOut());

try {

logger.info("连接ftp服务器:" + config.getHost() + ":" + config.getPort());

ftpClient.connect(config.getHost(), config.getPort());

int reply = ftpClient.getReplyCode();

if (!FTPReply.isPositiveCompletion(reply)) {

ftpClient.disconnect();

logger.error("FTPServer 拒绝连接!");

return null;

}

boolean result = ftpClient.login(config.getUsername(), config.getPassword());

if (!result) {

logger.error("ftpClient登录失败!");

throw new Exception(

"ftpClient登录失败! userName:" + config.getUsername() + ", password:" + config.getPassword());

}

ftpClient.setControlEncoding(config.getControlEncoding());

ftpClient.setBufferSize(config.getBufferSize());

ftpClient.setFileType(config.getFileType());

ftpClient.setDataTimeout(config.getDataTimeout());

ftpClient.setUseEPSVwithIPv4(config.isUseEPSVwithIPv4());

if (config.isPassiveMode()) {

logger.info("进入ftp被动模式");

ftpClient.enterLocalPassiveMode();// 进入被动模式

}

} catch (IOException e) {

logger.error("FTP连接失败:", e);

}

return ftpClient;

}

@Override

public PooledObject<FTPClient> wrap(FTPClient ftpClient) {

return new DefaultPooledObject<FTPClient>(ftpClient);

}

/**

* 销毁对象

*/

@Override

public void destroyObject(PooledObject<FTPClient> p) throws Exception {

FTPClient ftpClient = p.getObject();

if (ftpClient != null && ftpClient.isConnected()) {

ftpClient.logout();

ftpClient.disconnect();

super.destroyObject(p);

}

}

/**

* 验证对象

*/

@Override

public boolean validateObject(PooledObject<FTPClient> p) {

FTPClient ftpClient = p.getObject();

boolean connect = false;

try {

connect = ftpClient.sendNoOp();

} catch (IOException e) {

e.printStackTrace();

}

return connect;

}

/**

* No-op.

*

* @param p ignored

*/

@Override

public void activateObject(PooledObject<FTPClient> p) throws Exception {

// The default implementation is a no-op.

}

/**

* No-op.

*

* @param p ignored

*/

@Override

public void passivateObject(PooledObject<FTPClient> p) throws Exception {

// The default implementation is a no-op.

}

}

3.3 实现FTPClientPool

​ 在commons-pool2中预设了三个可以直接使用的对象池:GenericObjectPool、GenericKeyedObjectPool和SoftReferenceObjectPool,在此使用GenericObjectPool<FTPClient>

/**

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*

*/

package com.tompai.ftp.pool;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.pool2.impl.GenericObjectPool;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

* @desc: demo

* @name: FTPClientPool.java

* @author: tompai

* @email:liinux@qq.com

* @createTime: 2019年12月31日 下午12:54:10

* @history:

* @version: v1.0

*/

public class FTPClientPool {

private static Logger logger =LoggerFactory.getLogger(FTPClientPool.class);

private GenericObjectPool<FTPClient> pool;

private FTPClientFactory factory;

public FTPClientPool(FTPClientFactory clientFactory) {

this.factory = clientFactory;

pool = new GenericObjectPool<FTPClient>(clientFactory, clientFactory.getFtpPoolConfig());

}

public FTPClientFactory getClientFactory() {

return factory;

}

public GenericObjectPool<FTPClient> getPool() {

return pool;

}

/**

* 借 获取一个连接对象

*

* @return

* @throws Exception

*/

public FTPClient borrowObject() throws Exception {

FTPClient client = pool.borrowObject();

if (!client.sendNoOp()) {

// 使池中的对象无效

client.logout();

client.disconnect();

pool.invalidateObject(client);

client = factory.create();

pool.addObject();

}

return client;

}

/**

* 还 归还一个连接对象

*

* @param ftpClient

*/

public void returnObject(FTPClient ftpClient) {

if (ftpClient != null) {

pool.returnObject(ftpClient);

}

}

}

3.4 连接池使用工具类FTPClientHelper

/**

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*

*/

package com.tompai.ftp.pool;

import java.io.IOException;

import java.io.InputStream;

import org.apache.commons.io.IOUtils;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.net.ftp.FTPFile;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

* @desc: demo

* @name: FTPClientHelper.java

* @author: tompai

* @email:liinux@qq.com

* @createTime: 2019年12月31日 下午1:14:20

* @history:

* @version: v1.0

*/

public class FTPClientHelper implements AutoCloseable {

private static Logger logger = LoggerFactory.getLogger(FTPClientHelper.class);

private FTPClientPool pool;

public void setFtpClientPool(FTPClientPool ftpClientPool) {

this.pool = ftpClientPool;

}

/**

* 列出目录下的所有文件

* @author: tompai

* @createTime: 2019年12月31日 下午1:56:02

* @history:

* @param pathname

* @return

* @throws Exception FTPFile[]

*/

public FTPFile[] listFiles(String pathname) {

FTPClient client = getClient();

try {

return client.listFiles(pathname);

} catch (IOException e) {

//TODO Auto-generated catch block

logger.error(e.getMessage());

e.printStackTrace();

}finally {

pool.returnObject(client);

}

return null;

}

/**

* 下载 remote文件流

* @author: tompai

* @createTime: 2019年12月31日 下午1:52:07

* @history:

* @param remote

* @return byte[]

*/

public byte[] retrieveFileStream(String remote) {

FTPClient client = getClient();

InputStream in = null;

//byte[] result=new Byte[]

try {

in = client.retrieveFileStream(remote);

return IOUtils.toByteArray(in);

} catch (IOException e) {

logger.error(e.getMessage());

// TODO Auto-generated catch block

e.printStackTrace();

} finally {

pool.returnObject(client);

}

return null;

}

/**

* 上传文件

* @author: tompai

* @createTime: 2019年12月31日 下午1:53:07

* @history:

* @param remote

* @param local

* @return

* @throws Exception boolean

*/

public boolean storeFile(String remote, InputStream local) throws Exception {

FTPClient client = getClient();

try {

return client.storeFile(remote, local);

} finally {

pool.returnObject(client);

}

}

/**

* 创建目录 单个不可递归

* @author: tompai

* @createTime: 2019年12月31日 下午1:52:24

* @history:

* @param pathname

* @return

* @throws Exception boolean

*/

public boolean makeDirectory(String pathname) throws Exception {

FTPClient client = getClient();

try {

return client.makeDirectory(pathname);

} finally {

pool.returnObject(client);

}

}

/**

* 删除目录,单个不可递归

* @author: tompai

* @createTime: 2019年12月31日 下午1:52:42

* @history:

* @param pathname

* @return

* @throws Exception boolean

*/

public boolean removeDirectory(String pathname) throws Exception {

FTPClient client = getClient();

try {

return client.removeDirectory(pathname);

} finally {

pool.returnObject(client);

}

}

/**

* 删除文件 单个 ,不可递归

* @author: tompai

* @createTime: 2019年12月31日 下午1:52:54

* @history:

* @param pathname

* @return

* @throws Exception boolean

*/

public boolean deleteFile(String pathname) throws Exception {

FTPClient client = getClient();

try {

return client.deleteFile(pathname);

} finally {

pool.returnObject(client);

}

}

/**

* 获取一个连接对象

*

* @author: tompai

* @createTime: 2019年12月31日 下午1:45:46

* @history:

* @return FTPClient

*/

private FTPClient getClient() {

FTPClient client = null;

try {

client = pool.borrowObject();

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

return client;

}

/**

* 释放连接

*

* @author: tompai

* @createTime: 2019年12月31日 下午1:45:03

* @history:

* @param client void

*/

private void releaseClient(FTPClient client) {

if (client == null) {

return;

}

try {

pool.returnObject(client);

} catch (Exception e) {

logger.error("Could not return the ftpClient to the pool", e);

// destoryFtpClient

if (client.isAvailable()) {

try {

client.logout();

client.disconnect();

pool.getPool().invalidateObject(client);

} catch (Exception io) {

logger.error(io.getMessage());

}

}

}

}

@Override

public void close() throws Exception {

// TODO Auto-generated method stub

logger.info("---Resources Closed---.");

}

}

3.5 连接池测试类FTPLinkPoolTest

在此可以自己使用FileZilla Server for Windows搭建一个简单的FTP服务器;

/**

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

* http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*

*/

package com.tompai.ftp.pool;

import org.apache.commons.net.ftp.FTPFile;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**

* @desc: demo

* @name: FTPLinkPoolTest.java

* @author: tompai

* @email:liinux@qq.com

* @createTime: 2019年12月31日 下午1:13:51

* @history:

* @version: v1.0

*/

public class FTPLinkPoolTest {

private static Logger logger =LoggerFactory.getLogger(FTPClientPool.class);

/**

* @author: tompai

* @createTime: 2019年12月31日 下午1:13:51

* @history:

* @param args void

*/

public static void main(String[] args) {

//TODO Auto-generated method stub

FTPClientConfig conf=new FTPClientConfig();

conf.setHost("127.0.0.1");

conf.setUsername("test");

conf.setPassword("test");

FTPClientFactory factory=new FTPClientFactory();

factory.setFtpPoolConfig(conf);

FTPClientPool pool=new FTPClientPool(factory);

FTPClientHelper clientHelper=new FTPClientHelper();

clientHelper.setFtpClientPool(pool);

String pathname="/0821";

FTPFile[] files=clientHelper.listFiles(pathname);

for(FTPFile file:files) {

String name=file.getName();

logger.info("name:{}",name);

}

}

}

3.6 测试结果