乐趣区

关于 Spring Boot:关于 JSch 使用自定义连接池的说明

1. JSch 使用方法

JSch 使用方法

2. JSCH 工具类

JSCH 工具类

3. 创建连接池

ConnectionPool.java

@Slf4j
public class ConnectionPool {

    private String strictHostKeyChecking;

    private Integer timeout;


    /**
     * ip 地址
     */
    private String ip = "";

    /**
     * 端口号
     */
    private Integer port = 22;

    /**
     * 用户名
     */
    private String username = "";

    /**
     * 明码
     */
    private String password = "";

    /**
     * 每次扩容减少几个连贯
     */
    private int incrementalConnections = 2;

    /**
     * 最大连接数
     */
    private int maxConnections = 10;

    /**
     * 最大闲暇连贯
     */
    private int maxIdle = 4;

    /**
     * 最小闲暇连贯
     */
    private int minIdel = 2;


    private Vector<PooledConnection> connections = null;

    @PostConstruct
    private void init() {createPool();
    }

    /**
     * 构造方法
     *
     * @param strictHostKeyChecking 连贯模式
     * @param timeout               超时工夫
     */
    public ConnectionPool(String strictHostKeyChecking, Integer timeout) {
        this.strictHostKeyChecking = strictHostKeyChecking;
        this.timeout = timeout;
    }

    /**
     * 构造方法
     *
     * @param strictHostKeyChecking  连贯模式
     * @param timeout                超时工夫
     * @param incrementalConnections 增量大小
     */
    public ConnectionPool(String strictHostKeyChecking,
                          Integer timeout,
                          int incrementalConnections) {
        this.strictHostKeyChecking = strictHostKeyChecking;
        this.timeout = timeout;
        this.incrementalConnections = incrementalConnections;
    }

    /**
     * 构造方法
     *
     * @param strictHostKeyChecking  连贯模式
     * @param timeout                超时工夫
     * @param incrementalConnections 增量大小
     * @param maxConnections         连接池最大连接数
     */
    public ConnectionPool(String strictHostKeyChecking,
                          Integer timeout,
                          int incrementalConnections,
                          int maxConnections) {
        this.strictHostKeyChecking = strictHostKeyChecking;
        this.timeout = timeout;
        this.incrementalConnections = incrementalConnections;
        this.maxConnections = maxConnections;
    }

    /**
     * 创立连接池,判断连接池是否创立,如果连接池没有创立则创立连接池
     */
    public synchronized void createPool() {if (Objects.nonNull(connections)) {return;}
        connections = new Vector<>();
        log.info("create shell connectionPool success!");
    }


    /**
     * 创立指定数量的连贯放入连接池中
     *
     * @param numConnections 创立数量
     * @throws JSchException 建设近程连贯异样
     */
    private void createConnections(int numConnections) throws JSchException {for (int x = 0; x < numConnections; x++) {

            // 判断是否已达连接池最大连贯,如果达到最大连贯数据则不再创立连贯
            if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) {break;}

            // 在连接池中新增一个连贯
            try {connections.addElement(new PooledConnection(newConnection(), ip));
            } catch (JSchException e) {log.error("create shell connection failed {}", e.getMessage());
                throw new JSchException();}
            log.info("Session connected!");
        }
    }

    /**
     * 新一个连贯 session
     *
     * @return 创立的 session
     * @throws JSchException 近程连贯异样
     */
    private Session newConnection() throws JSchException {
        // 创立一个 session
        JSch jsch = new JSch();
        Session session = jsch.getSession(username, ip, port);
        session.setPassword(password);
        Properties sshConfig = new Properties();
        sshConfig.put("StrictHostKeyChecking", strictHostKeyChecking);
        session.setConfig(sshConfig);
        session.connect(timeout);
        session.setServerAliveInterval(1800);
        return session;
    }

    /**
     * 获取一个可用 session
     *
     * @param ip       ip 地址
     * @param port     端口号
     * @param username 用户名
     * @param password 明码
     * @return 可用的 session
     * @throws JSchException 近程连贯异样
     */
    public synchronized Session getConnection(String ip, Integer port, String username, String password) throws JSchException {
        this.ip = ip;
        this.port = port;
        this.username = username;
        this.password = password;

        // 连接池还没创立,则返回 null
        if (Objects.isNull(connections)) {return null;}

        // 取得一个可用的数据库连贯
        Session session = getFreeConnection();

        // 如果目前没有能够应用的连贯,即所有的连贯都在应用中,等一会重试
        while (Objects.isNull(session)) {wait(250);
            session = getFreeConnection();}

        return session;
    }

    /**
     * 获取一个可用 session
     *
     * @return 返回可用 session
     * @throws JSchException 近程连贯异样
     */
    private Session getFreeConnection() throws JSchException {Session session = findFreeConnection();
        // 如果没有可用连贯,则创立连贯,if (Objects.isNull(session)) {createConnections(incrementalConnections);
            session = findFreeConnection();
            if (Objects.isNull(session)) {return null;}
        }
        return session;
    }

    /**
     * 查找可用连贯
     *
     * @return 返回可用连贯
     */
    private Session findFreeConnection() {
        Session session = null;
        PooledConnection conn;

        Enumeration<PooledConnection> enumerate = connections.elements();

        // 遍历所有的对象,看是否有可用的连贯
        while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
            if (!ip.equals(conn.getTag())) {continue;}
            if (!conn.isBusy()) {session = conn.getSession();
                conn.setBusy(true);
                if (!testConnection(session)) {
                    try {session = newConnection();
                    } catch (JSchException e) {log.error("create shell connection failed {}", e.getMessage());
                        return null;
                    }
                    conn.setSession(session);
                }
                break;
            }
        }
        return session;
    }

    /**
     * 测试连贯是否可用
     *
     * @param session 须要测试的 session
     * @return 是否可用
     */
    private boolean testConnection(Session session) {boolean connected = session.isConnected();
        if (!connected) {closeConnection(session);
            return false;
        }
        return true;
    }

    /**
     * 将 session 放回连接池中
     *
     * @param session 须要放回连接池中的 session
     */
    public synchronized void returnConnection(Session session) {
        // 确保连接池存在,如果连贯没有创立(不存在),间接返回
        if (Objects.isNull(connections)) {log.error("ConnectionPool does not exist");
            return;
        }
        PooledConnection conn;
        Enumeration<PooledConnection> enumerate = connections.elements();

        // 遍历连接池中的所有连贯,找到这个要返回的连贯对象,将状态设置为闲暇
        while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
            if (session.equals(conn.getSession())) {conn.setBusy(false);
            }
        }

    }


    /**
     * 刷新连接池
     *
     * @throws JSchException 近程连贯异样
     */
    public synchronized void refreshConnections() throws JSchException {
        // 确保连接池己翻新存在
        if (Objects.isNull(connections)) {log.error("ConnectionPool does not exist");
            return;
        }
        PooledConnection conn;
        Enumeration<PooledConnection> enumerate = connections.elements();
        while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
            if (conn.isBusy()) {wait(5000);
            }
            closeConnection(conn.getSession());
            conn.setSession(newConnection());
            conn.setBusy(false);
        }
    }

    /**
     * 敞开连接池
     */
    public synchronized void closeConnectionPool() {
        // 确保连接池存在,如果不存在,返回
        if (Objects.isNull(connections)) {log.info("Connection pool does not exist");
            return;
        }
        PooledConnection conn;
        Enumeration<PooledConnection> enumerate = connections.elements();
        while (enumerate.hasMoreElements()) {conn = enumerate.nextElement();
            if (conn.isBusy()) {wait(5000);
            }
            closeConnection(conn.getSession());
            connections.removeElement(conn);
        }
        connections = null;
    }

    /**
     * 敞开 session 会话
     *
     * @param session 须要敞开的 session
     */
    private void closeConnection(Session session) {session.disconnect();
    }

    /**
     * 线程暂停
     *
     * @param mSeconds 暂停工夫
     */
    private void wait(int mSeconds) {
        try {Thread.sleep(mSeconds);
        } catch (InterruptedException e) {log.error("{} 线程暂停失败 -> {}", Thread.currentThread().getName(), e.getMessage());
        }
    }

    /**
     * 对象连接状态
     */
    class PooledConnection {

        /**
         * 近程连贯
         */
        Session session;

        /**
         * 此连贯是否正在应用的标记,默认没有正在应用
         */
        boolean busy = false;

        /**
         * 连贯标记
         */
        String tag;

        /**
         * 构造函数,依据一个 Session 结构一个 PooledConnection 对象
         *
         * @param session 连贯
         * @param tag     连贯标记
         */
        public PooledConnection(Session session, String tag) {
            this.session = session;
            this.tag = tag;
        }

        public Session getSession() {return session;}

        public void setSession(Session session) {this.session = session;}

        public boolean isBusy() {return busy;}

        public void setBusy(boolean busy) {this.busy = busy;}

        public String getTag() {return tag;}

        public void setTag(String tag) {this.tag = tag;}
    }

    public int getIncrementalConnections() {return this.incrementalConnections;}

    public void setIncrementalConnections(int incrementalConnections) {this.incrementalConnections = incrementalConnections;}

    public int getMaxConnections() {return this.maxConnections;}

    public void setMaxConnections(int maxConnections) {this.maxConnections = maxConnections;}
}

4. 改造 ShellUtil

ShellUtil.java

@Slf4j
@Component
@Scope(value = "prototype")
public class ShellUtil {

    /**
     * ip 地址
     */
    private String ip = "";

    /**
     * 端口号
     */
    private Integer port = 22;

    /**
     * 用户名
     */
    private String username = "";

    /**
     * 明码
     */
    private String password = "";

    private Session session;

    private Channel channel;

    private ChannelExec channelExec;

    private ChannelSftp channelSftp;

    private ChannelShell channelShell;

    private ConnectionPool connectionPool;

    public ShellUtil(ConnectionPool connectionPool) {this.connectionPool = connectionPool;}

    /**
     * 初始化
     *
     * @param ip       近程主机 IP 地址
     * @param port     近程主机端口
     * @param username 近程主机登陆用户名
     * @param password 近程主机登陆密码
     * @throws JSchException JSch 异样
     */
    public void init(String ip, Integer port, String username, String password) throws JSchException {
        this.ip = ip;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    public void init(String ip, String username, String password) throws JSchException {
        this.ip = ip;
        this.username = username;
        this.password = password;
    }

    private void getSession() throws JSchException {session = connectionPool.getConnection(ip, port, username, password);
        if (Objects.isNull(session)) {connectionPool.refreshConnections();
            session = connectionPool.getConnection(ip, port, username, password);
            if (Objects.isNull(session)){throw new RuntimeException("无可用连贯");
            }
        }
    }

    /**
     * 连贯屡次执行命令,执行命令结束后须要执行 close()办法
     *
     * @param command 须要执行的指令
     * @return 执行后果
     * @throws Exception 没有执行初始化
     */
    public String execCmd(String command) throws Exception {initChannelExec();
        log.info("execCmd command - > {}", command);
        channelExec.setCommand(command);
        channel.setInputStream(null);
        channelExec.setErrStream(System.err);
        channel.connect();
        StringBuilder sb = new StringBuilder(16);
        try (InputStream in = channelExec.getInputStream();
             InputStreamReader isr = new InputStreamReader(in, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr)) {
            String buffer;
            while ((buffer = reader.readLine()) != null) {sb.append("\n").append(buffer);
            }
            log.info("execCmd result - > {}", sb);
            return sb.toString();}

    }


    /**
     * 执行命令敞开连贯
     *
     * @param command 须要执行的指令
     * @return 执行后果
     * @throws Exception 没有执行初始化
     */
    public String execCmdAndClose(String command) throws Exception {String result = execCmd(command);
        close();
        return result;
    }

    /**
     * 执行简单 shell 命令
     *
     * @param cmds 多条命令
     * @return 执行后果
     * @throws Exception 连贯异样
     */
    public String execCmdByShell(String... cmds) throws Exception {return execCmdByShell(Arrays.asList(cmds));
    }

    /**
     * 执行简单 shell 命令
     *
     * @param cmds 多条命令
     * @return 执行后果
     * @throws Exception 连贯异样
     */
    public String execCmdByShell(List<String> cmds) throws Exception {
        String result = "";
        initChannelShell();
        InputStream inputStream = channelShell.getInputStream();
        channelShell.setPty(true);
        channelShell.connect();

        OutputStream outputStream = channelShell.getOutputStream();
        PrintWriter printWriter = new PrintWriter(outputStream);
        for (String cmd : cmds) {printWriter.println(cmd);
        }
        printWriter.flush();

        byte[] tmp = new byte[1024];
        while (true) {while (inputStream.available() > 0) {int i = inputStream.read(tmp, 0, 1024);
                if (i < 0) {break;}
                String s = new String(tmp, 0, i);
                if (s.contains("--More--")) {outputStream.write((" ").getBytes());
                    outputStream.flush();}
                System.out.println(s);
            }
            if (channelShell.isClosed()) {System.out.println("exit-status:" + channelShell.getExitStatus());
                break;
            }
            try {Thread.sleep(1000);
            } catch (Exception e) {e.printStackTrace();
            }

        }
        outputStream.close();
        inputStream.close();
        close();
        return result;
    }

    /**
     * SFTP 文件上传
     *
     * @param src  源地址
     * @param dst  目标地址
     * @throws Exception 上传文件失败
     */
    public void put(String src, String dst) throws Exception {put(src, dst, ChannelSftp.OVERWRITE);
    }

    /**
     * SFTP 文件上传
     *
     * @param src  源地址
     * @param dst  目标地址
     * @param mode 上传模式 默认为 ChannelSftp.OVERWRITE
     * @throws Exception 上传文件失败
     */
    public void put(String src, String dst, int mode) throws Exception {initChannelSftp();
        log.info("Upload File {} -> {}", src, dst);
        channelSftp.put(src, dst, mode);
        log.info("Upload File Success!");
        close();}

    /**
     * SFTP 文件上传并监控上传进度
     *
     * @param src 源地址
     * @param dst 目标地址
     * @throws Exception 上传文件失败
     */
    public void putMonitorAndClose(String src, String dst) throws Exception {putMonitorAndClose(src, dst, ChannelSftp.OVERWRITE);
    }

    /**
     * SFTP 文件上传并监控上传进度
     *
     * @param src  源地址
     * @param dst  目标地址
     * @param mode 上传模式 默认为 ChannelSftp.OVERWRITE
     * @throws Exception 上传文件失败
     */
    public void putMonitorAndClose(String src, String dst, int mode) throws Exception {initChannelSftp();
        FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length());
        log.info("Upload File {} -> {}", src, dst);
        channelSftp.put(src, dst, monitor, mode);
        log.info("Upload File Success!");
        close();}

    /**
     * SFTP 文件下载
     *
     * @param src 源文件地址
     * @param dst 目标地址
     * @throws Exception 下载文件失败
     */
    public void get(String src, String dst) throws Exception {initChannelSftp();
        log.info("Download File {} -> {}", src, dst);
        channelSftp.get(src, dst);
        log.info("Download File Success!");
        close();}

    /**
     * SFTP 文件下载并监控下载进度
     *
     * @param src 源文件地址
     * @param dst 目标地址
     * @throws Exception 下载文件失败
     */
    public void getMonitorAndClose(String src, String dst) throws Exception {initChannelSftp();
        FileProgressMonitor monitor = new FileProgressMonitor(new File(src).length());
        log.info("Download File {} -> {}", src, dst);
        channelSftp.get(src, dst, monitor);
        log.info("Download File Success!");
        close();}

    /**
     * 删除指定目录文件
     *
     * @param path 删除门路
     * @throws Exception 近程主机连贯异样
     */
    public void deleteFile(String path) throws Exception {initChannelSftp();
        channelSftp.rm(path);
        log.info("Delete File {}", path);
        close();}

    /**
     * 删除指定目录
     *
     * @param path 删除门路
     * @throws Exception 近程主机连贯异样
     */
    public void deleteDir(String path) throws Exception {initChannelSftp();
        channelSftp.rmdir(path);
        log.info("Delete Dir {}", path);
        close();}

    /**
     * 开释资源
     */
    public void close() {connectionPool.returnConnection(session);
    }

    private void initChannelSftp() throws Exception {getSession();
        channel = session.openChannel("sftp");
        channel.connect(); // 建设 SFTP 通道的连贯
        channelSftp = (ChannelSftp) channel;
        if (session == null || channel == null || channelSftp == null) {log.error("请先执行 init()");
            throw new Exception("请先执行 init()");
        }
    }

    private void initChannelExec() throws Exception {getSession();
        // 关上执行 shell 指令的通道
        channel = session.openChannel("exec");
        channelExec = (ChannelExec) channel;
        if (session == null || channel == null || channelExec == null) {log.error("请先执行 init()");
            throw new Exception("请先执行 init()");
        }
    }

    private void initChannelShell() throws Exception {getSession();
        // 关上执行 shell 指令的通道
        channel = session.openChannel("shell");
        channelShell = (ChannelShell) channel;
        if (session == null || channel == null || channelShell == null) {log.error("请先执行 init()");
            throw new Exception("请先执行 init()");
        }
    }


}

5. 增加配置

PoolConfiguration.java

/**
 * Spring configuration that exposes the SSH {@link ConnectionPool} as a bean.
 * All settings are read from {@code ssh.*} properties and fall back to the defaults shown below.
 */
@Configuration
public class PoolConfiguration {

    /** Value for the JSch {@code StrictHostKeyChecking} option. */
    @Value("${ssh.strictHostKeyChecking:no}")
    private String strictHostKeyChecking;

    /** Connect timeout handed to the pool. */
    @Value("${ssh.timeout:30000}")
    private Integer timeout;

    /** Number of sessions the pool opens per growth step. */
    @Value("${ssh.incrementalConnections:2}")
    private Integer incrementalConnections;

    /** Maximum number of pooled sessions. */
    @Value("${ssh.maxConnections:10}")
    private Integer maxConnections;

    /**
     * Creates the pool bean.
     *
     * <p>The destroy method is named explicitly because {@code closeConnectionPool} is not one of the
     * method names Spring infers ({@code close} / {@code shutdown}); without it the pooled SSH sessions
     * would stay open when the application context shuts down.
     *
     * @return the connection pool
     */
    @Bean(destroyMethod = "closeConnectionPool")
    public ConnectionPool connectionPool() {
        return new ConnectionPool(strictHostKeyChecking, timeout, incrementalConnections, maxConnections);
    }

}

6. 线程安全问题解决

6.1 使用 ThreadLocal 保存 Session

/**
 * Holds the {@link Session} bound to the current thread.
 *
 * <p>The accessors are intentionally not {@code synchronized}: a {@link ThreadLocal} value is only
 * visible to the thread that set it, so a class-wide lock would add contention between threads
 * without protecting anything.
 */
public class SessionThreadLocal {

    /** Per-thread session slot. */
    private static final ThreadLocal<Session> SESSION_HOLDER = new ThreadLocal<>();

    /**
     * Binds a session to the current thread.
     *
     * @param session session to bind
     */
    public static void set(Session session) {
        SESSION_HOLDER.set(session);
    }

    /**
     * Returns the session bound to the current thread.
     *
     * @return the bound session, or {@code null} if none is bound
     */
    public static Session get() {
        return SESSION_HOLDER.get();
    }

    /**
     * Removes the session bound to the current thread; call it when the thread is done with the
     * session so pooled threads do not keep a stale reference.
     */
    public static void remove() {
        SESSION_HOLDER.remove();
    }
}

6.2 使用 Spring Boot 中 bean 的作用域 prototype

使用 @Lookup 注入方式

 /**
  * Lookup method for a {@code ShellUtil} bean.
  * The body is a placeholder: with {@code @Lookup}, Spring overrides the method so that each call
  * fetches the bean from the container.
  *
  * @return {@code null} in this placeholder body
  */
 @Lookup
 public ShellUtil getshellUtil() {
     return null;
 }

    /**
     * Demo endpoint: prints the hash code of the {@code ShellUtil} obtained through the lookup method,
     * which makes it visible whether successive requests receive different instances.
     *
     * @throws Exception declared by the original signature
     */
    @GetMapping("/test")
    public void Test() throws Exception {
        System.out.println(getshellUtil().hashCode());
    }
退出移动版