共计 13544 个字符,预计需要花费 34 分钟才能阅读完成。
1. JSCH 使用方法
JSCH 使用方法
2. JSCH 工具类
JSCH 工具类
3. 创建连接池
ConnectionPool.java
@Slf4j
public class ConnectionPool {
private String strictHostKeyChecking;
private Integer timeout;
/**
* ip 地址
*/
private String ip = "";
/**
* 端口号
*/
private Integer port = 22;
/**
* 用户名
*/
private String username = "";
/**
* 明码
*/
private String password = "";
/**
* 每次扩容减少几个连贯
*/
private int incrementalConnections = 2;
/**
* 最大连接数
*/
private int maxConnections = 10;
/**
* 最大闲暇连贯
*/
private int maxIdle = 4;
/**
* 最小闲暇连贯
*/
private int minIdel = 2;
private Vector<PooledConnection> connections = null;
@PostConstruct
private void init() {createPool();
}
/**
* 构造方法
*
* @param strictHostKeyChecking 连贯模式
* @param timeout 超时工夫
*/
public ConnectionPool(String strictHostKeyChecking, Integer timeout) {
this.strictHostKeyChecking = strictHostKeyChecking;
this.timeout = timeout;
}
/**
* 构造方法
*
* @param strictHostKeyChecking 连贯模式
* @param timeout 超时工夫
* @param incrementalConnections 增量大小
*/
public ConnectionPool(String strictHostKeyChecking,
Integer timeout,
int incrementalConnections) {
this.strictHostKeyChecking = strictHostKeyChecking;
this.timeout = timeout;
this.incrementalConnections = incrementalConnections;
}
/**
* 构造方法
*
* @param strictHostKeyChecking 连贯模式
* @param timeout 超时工夫
* @param incrementalConnections 增量大小
* @param maxConnections 连接池最大连接数
*/
public ConnectionPool(String strictHostKeyChecking,
Integer timeout,
int incrementalConnections,
int maxConnections) {
this.strictHostKeyChecking = strictHostKeyChecking;
this.timeout = timeout;
this.incrementalConnections = incrementalConnections;
this.maxConnections = maxConnections;
}
/**
* 创立连接池,判断连接池是否创立,如果连接池没有创立则创立连接池
*/
public synchronized void createPool() {if (Objects.nonNull(connections)) {return;}
connections = new Vector<>();
log.info("create shell connectionPool success!");
}
/**
* 创立指定数量的连贯放入连接池中
*
* @param numConnections 创立数量
* @throws JSchException 建设近程连贯异样
*/
private void createConnections(int numConnections) throws JSchException {for (int x = 0; x < numConnections; x++) {
// 判断是否已达连接池最大连贯,如果达到最大连贯数据则不再创立连贯
if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) {break;}
// 在连接池中新增一个连贯
try {connections.addElement(new PooledConnection(newConnection(), ip));
} catch (JSchException e) {log.error("create shell connection failed {}", e.getMessage());
throw new JSchException();}
log.info("Session connected!");
}
}
/**
* 新一个连贯 session
*
* @return 创立的 session
* @throws JSchException 近程连贯异样
*/
private Session newConnection() throws JSchException {
// 创立一个 session
JSch jsch = new JSch();
Session session = jsch.getSession(username, ip, port);
session.setPassword(password);
Properties sshConfig = new Properties();
sshConfig.put("StrictHostKeyChecking", strictHostKeyChecking);
session.setConfig(sshConfig);
session.connect(timeout);
session.setServerAliveInterval(1800);
return session;
}
/**
* 获取一个可用 session
*
* @param ip ip 地址
* @param port 端口号
* @param username 用户名
* @param password 明码
* @return 可用的 session
* @throws JSchException 近程连贯异样
*/
public synchronized Session getConnection(String ip, Integer port, String username, String password) throws JSchException {
this.ip = ip;
this.port = port;
this.username = username;
this.password = password;
// 连接池还没创立,则返回 null
if (Objects.isNull(connections)) {return null;}
// 取得一个可用的数据库连贯
Session session = getFreeConnection();
// 如果目前没有能够应用的连贯,即所有的连贯都在应用中,等一会重试
while (Objects.isNull(session)) {wait(250);
session = getFreeConnection();}
return session;
}
/**
* 获取一个可用 session
*
* @return 返回可用 session
* @throws JSchException 近程连贯异样
*/
private Session getFreeConnection() throws JSchException {Session session = findFreeConnection();
// 如果没有可用连贯,则创立连贯,if (Objects.isNull(session)) {createConnections(incrementalConnections);
session = findFreeConnection();
if (Objects.isNull(session)) {return null;}
}
return session;
}
/**
* 查找可用连贯
*
* @return 返回可用连贯
*/
private Session findFreeConnection() {
Session session = null;
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
// 遍历所有的对象,看是否有可用的连贯
while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
if (!ip.equals(conn.getTag())) {continue;}
if (!conn.isBusy()) {session = conn.getSession();
conn.setBusy(true);
if (!testConnection(session)) {
try {session = newConnection();
} catch (JSchException e) {log.error("create shell connection failed {}", e.getMessage());
return null;
}
conn.setSession(session);
}
break;
}
}
return session;
}
/**
* 测试连贯是否可用
*
* @param session 须要测试的 session
* @return 是否可用
*/
private boolean testConnection(Session session) {boolean connected = session.isConnected();
if (!connected) {closeConnection(session);
return false;
}
return true;
}
/**
* 将 session 放回连接池中
*
* @param session 须要放回连接池中的 session
*/
public synchronized void returnConnection(Session session) {
// 确保连接池存在,如果连贯没有创立(不存在),间接返回
if (Objects.isNull(connections)) {log.error("ConnectionPool does not exist");
return;
}
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
// 遍历连接池中的所有连贯,找到这个要返回的连贯对象,将状态设置为闲暇
while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
if (session.equals(conn.getSession())) {conn.setBusy(false);
}
}
}
/**
* 刷新连接池
*
* @throws JSchException 近程连贯异样
*/
public synchronized void refreshConnections() throws JSchException {
// 确保连接池己翻新存在
if (Objects.isNull(connections)) {log.error("ConnectionPool does not exist");
return;
}
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
if (conn.isBusy()) {wait(5000);
}
closeConnection(conn.getSession());
conn.setSession(newConnection());
conn.setBusy(false);
}
}
/**
* 敞开连接池
*/
public synchronized void closeConnectionPool() {
// 确保连接池存在,如果不存在,返回
if (Objects.isNull(connections)) {log.info("Connection pool does not exist");
return;
}
PooledConnection conn;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
if (conn.isBusy()) {wait(5000);
}
closeConnection(conn.getSession());
connections.removeElement(conn);
}
connections = null;
}
/**
* 敞开 session 会话
*
* @param session 须要敞开的 session
*/
private void closeConnection(Session session) {session.disconnect();
}
/**
* 线程暂停
*
* @param mSeconds 暂停工夫
*/
private void wait(int mSeconds) {
try {Thread.sleep(mSeconds);
} catch (InterruptedException e) {log.error("{} 线程暂停失败 -> {}", Thread.currentThread().getName(), e.getMessage());
}
}
/**
* 对象连接状态
*/
class PooledConnection {
/**
* 近程连贯
*/
Session session;
/**
* 此连贯是否正在应用的标记,默认没有正在应用
*/
boolean busy = false;
/**
* 连贯标记
*/
String tag;
/**
* 构造函数,依据一个 Session 结构一个 PooledConnection 对象
*
* @param session 连贯
* @param tag 连贯标记
*/
public PooledConnection(Session session, String tag) {
this.session = session;
this.tag = tag;
}
public Session getSession() {return session;}
public void setSession(Session session) {this.session = session;}
public boolean isBusy() {return busy;}
public void setBusy(boolean busy) {this.busy = busy;}
public String getTag() {return tag;}
public void setTag(String tag) {this.tag = tag;}
}
public int getIncrementalConnections() {return this.incrementalConnections;}
public void setIncrementalConnections(int incrementalConnections) {this.incrementalConnections = incrementalConnections;}
public int getMaxConnections() {return this.maxConnections;}
public void setMaxConnections(int maxConnections) {this.maxConnections = maxConnections;}
}
4. 改造 ShellUtil
ShellUtil.java
@Slf4j
@Component
@Scope(value = "prototype")
public class ShellUtil {
/**
* ip 地址
*/
private String ip = "";
/**
* 端口号
*/
private Integer port = 22;
/**
* 用户名
*/
private String username = "";
/**
* 明码
*/
private String password = "";
private Session session;
private Channel channel;
private ChannelExec channelExec;
private ChannelSftp channelSftp;
private ChannelShell channelShell;
private ConnectionPool connectionPool;
public ShellUtil(ConnectionPool connectionPool) {this.connectionPool = connectionPool;}
/**
* 初始化
*
* @param ip 近程主机 IP 地址
* @param port 近程主机端口
* @param username 近程主机登陆用户名
* @param password 近程主机登陆密码
* @throws JSchException JSch 异样
*/
public void init(String ip, Integer port, String username, String password) throws JSchException {
this.ip = ip;
this.port = port;
this.username = username;
this.password = password;
}
public void init(String ip, String username, String password) throws JSchException {
this.ip = ip;
this.username = username;
this.password = password;
}
private void getSession() throws JSchException {session = connectionPool.getConnection(ip, port, username, password);
if (Objects.isNull(session)) {connectionPool.refreshConnections();
session = connectionPool.getConnection(ip, port, username, password);
if (Objects.isNull(session)){throw new RuntimeException("无可用连贯");
}
}
}
/**
* 连贯屡次执行命令,执行命令结束后须要执行 close()办法
*
* @param command 须要执行的指令
* @return 执行后果
* @throws Exception 没有执行初始化
*/
public String execCmd(String command) throws Exception {initChannelExec();
log.info("execCmd command - > {}", command);
channelExec.setCommand(command);
channel.setInputStream(null);
channelExec.setErrStream(System.err);
channel.connect();
StringBuilder sb = new StringBuilder(16);
try (InputStream in = channelExec.getInputStream();
InputStreamReader isr = new InputStreamReader(in, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(isr)) {
String buffer;
while ((buffer = reader.readLine()) != null) {sb.append("\n").append(buffer);
}
log.info("execCmd result - > {}", sb);
return sb.toString();}
}
/**
* 执行命令敞开连贯
*
* @param command 须要执行的指令
* @return 执行后果
* @throws Exception 没有执行初始化
*/
public String execCmdAndClose(String command) throws Exception {String result = execCmd(command);
close();
return result;
}
/**
* 执行简单 shell 命令
*
* @param cmds 多条命令
* @return 执行后果
* @throws Exception 连贯异样
*/
public String execCmdByShell(String... cmds) throws Exception {return execCmdByShell(Arrays.asList(cmds));
}
/**
* 执行简单 shell 命令
*
* @param cmds 多条命令
* @return 执行后果
* @throws Exception 连贯异样
*/
public String execCmdByShell(List<String> cmds) throws Exception {
String result = "";
initChannelShell();
InputStream inputStream = channelShell.getInputStream();
channelShell.setPty(true);
channelShell.connect();
OutputStream outputStream = channelShell.getOutputStream();
PrintWriter printWriter = new PrintWriter(outputStream);
for (String cmd : cmds) {printWriter.println(cmd);
}
printWriter.flush();
byte[] tmp = new byte[1024];
while (true) {while (inputStream.available() > 0) {int i = inputStream.read(tmp, 0, 1024);
if (i < 0) {break;}
String s = new String(tmp, 0, i);
if (s.contains("--More--")) {outputStream.write((" ").getBytes());
outputStream.flush();}
System.out.println(s);
}
if (channelShell.isClosed()) {System.out.println("exit-status:" + channelShell.getExitStatus());
break;
}
try {Thread.sleep(1000);
} catch (Exception e) {e.printStackTrace();
}
}
outputStream.close();
inputStream.close();
close();
return result;
}
/**
* SFTP 文件上传
*
* @param src 源地址
* @param dst 目标地址
* @throws Exception 上传文件失败
*/
public void put(String src, String dst) throws Exception {put(src, dst, ChannelSftp.OVERWRITE);
}
/**
* SFTP 文件上传
*
* @param src 源地址
* @param dst 目标地址
* @param mode 上传模式 默认为 ChannelSftp.OVERWRITE
* @throws Exception 上传文件失败
*/
public void put(String src, String dst, int mode) throws Exception {initChannelSftp();
log.info("Upload File {} -> {}", src, dst);
channelSftp.put(src, dst, mode);
log.info("Upload File Success!");
close();}
/**
* SFTP 文件上传并监控上传进度
*
* @param src 源地址
* @param dst 目标地址
* @throws Exception 上传文件失败
*/
public void putMonitorAndClose(String src, String dst) throws Exception {putMonitorAndClose(src, dst, ChannelSftp.OVERWRITE);
}
/**
* SFTP 文件上传并监控上传进度
*
* @param src 源地址
* @param dst 目标地址
* @param mode 上传模式 默认为 ChannelSftp.OVERWRITE
* @throws Exception 上传文件失败
*/
public void putMonitorAndClose(String src, String dst, int mode) throws Exception {initChannelSftp();
FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length());
log.info("Upload File {} -> {}", src, dst);
channelSftp.put(src, dst, monitor, mode);
log.info("Upload File Success!");
close();}
/**
* SFTP 文件下载
*
* @param src 源文件地址
* @param dst 目标地址
* @throws Exception 下载文件失败
*/
public void get(String src, String dst) throws Exception {initChannelSftp();
log.info("Download File {} -> {}", src, dst);
channelSftp.get(src, dst);
log.info("Download File Success!");
close();}
/**
* SFTP 文件下载并监控下载进度
*
* @param src 源文件地址
* @param dst 目标地址
* @throws Exception 下载文件失败
*/
public void getMonitorAndClose(String src, String dst) throws Exception {initChannelSftp();
FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length());
log.info("Download File {} -> {}", src, dst);
channelSftp.get(src, dst, monitor);
log.info("Download File Success!");
close();}
/**
* 删除指定目录文件
*
* @param path 删除门路
* @throws Exception 近程主机连贯异样
*/
public void deleteFile(String path) throws Exception {initChannelSftp();
channelSftp.rm(path);
log.info("Delete File {}", path);
close();}
/**
* 删除指定目录
*
* @param path 删除门路
* @throws Exception 近程主机连贯异样
*/
public void deleteDir(String path) throws Exception {initChannelSftp();
channelSftp.rmdir(path);
log.info("Delete Dir {}", path);
close();}
/**
* 开释资源
*/
public void close() {connectionPool.returnConnection(session);
}
private void initChannelSftp() throws Exception {getSession();
channel = session.openChannel("sftp");
channel.connect(); // 建设 SFTP 通道的连贯
channelSftp = (ChannelSftp) channel;
if (session == null || channel == null || channelSftp == null) {log.error("请先执行 init()");
throw new Exception("请先执行 init()");
}
}
private void initChannelExec() throws Exception {getSession();
// 关上执行 shell 指令的通道
channel = session.openChannel("exec");
channelExec = (ChannelExec) channel;
if (session == null || channel == null || channelExec == null) {log.error("请先执行 init()");
throw new Exception("请先执行 init()");
}
}
private void initChannelShell() throws Exception {getSession();
// 关上执行 shell 指令的通道
channel = session.openChannel("shell");
channelShell = (ChannelShell) channel;
if (session == null || channel == null || channelShell == null) {log.error("请先执行 init()");
throw new Exception("请先执行 init()");
}
}
}
5. 添加配置
PoolConfiguration.java
/**
 * Spring configuration that exposes the shared {@link ConnectionPool} bean, built from
 * the {@code ssh.*} properties with the defaults given in the placeholders below.
 */
@Configuration
public class PoolConfiguration {
    // NOTE(review): the default "no" disables host-key verification; confirm this is
    // acceptable for the target environment.
    @Value("${ssh.strictHostKeyChecking:no}")
    private String strictHostKeyChecking;
    @Value("${ssh.timeout:30000}")
    private Integer timeout;
    @Value("${ssh.incrementalConnections:2}")
    private Integer incrementalConnections;
    @Value("${ssh.maxConnections:10}")
    private Integer maxConnections;

    /**
     * Creates the SSH connection pool. The destroy method is named explicitly because
     * {@code closeConnectionPool} is not one of the method names Spring infers, and
     * without it pooled sessions would stay open when the context shuts down.
     *
     * @return the pool configured from the {@code ssh.*} properties
     */
    @Bean(destroyMethod = "closeConnectionPool")
    public ConnectionPool connectionPool() {
        return new ConnectionPool(strictHostKeyChecking, timeout, incrementalConnections, maxConnections);
    }
}
6. 线程安全问题解决
6.1 使用 ThreadLocal 保存 Session
/**
 * Holds one {@link Session} per thread.
 *
 * <p>The accessors are not synchronized: a {@link ThreadLocal} value is only visible to
 * the thread that set it, so a class-wide lock would only add contention.
 */
public class SessionThreadLocal {
    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<>();

    /**
     * Binds a session to the calling thread.
     *
     * @param session session to bind
     */
    public static void set(Session session) {
        THREAD_LOCAL.set(session);
    }

    /**
     * Returns the session bound to the calling thread.
     *
     * @return the bound session, or {@code null} if none is bound
     */
    public static Session get() {
        return THREAD_LOCAL.get();
    }

    /** Removes the calling thread's binding. */
    public static void remove() {
        THREAD_LOCAL.remove();
    }
}
6.2 使用 Spring Boot 中 bean 的作用域 prototype
使用 @Lookup
注入方式
/**
 * Lookup method for obtaining a {@code ShellUtil}. The body is only a placeholder that
 * returns {@code null}; the method is annotated with {@code @Lookup} so that the
 * container can supply the actual bean.
 *
 * @return {@code null} from this placeholder body
 */
@Lookup
public ShellUtil getshellUtil() {
    return null;
}
/**
 * Demo endpoint: prints the hash code of the {@code ShellUtil} returned by
 * {@code getshellUtil()} to standard output.
 *
 * @throws Exception declared by the original endpoint
 */
@GetMapping("/test")
public void Test() throws Exception {
    System.out.println(getshellUtil().hashCode());
}
正文完
发表至: springboot
2021-02-20