从一段代码谈起浅谈JavaIO接口

51次阅读

共计 7377 个字符,预计需要花费 19 分钟才能阅读完成。

本文首发于泊浮目的专栏:https://segmentfault.com/blog…

1. 前言

前阵子休息天日常在寻找项目里不好的代码, 看到了这样的一段代码:

    private Result sshSameExec(Session session, String cmd) {if (log.isDebugEnabled()) {log.debug("shell command: {}", cmd);
        }
        UserInfo ui = getUserInfo();
        session.setUserInfo(ui);
        int exitStatus = 0;
        StringBuilder builder = new StringBuilder();
        ChannelExec channel;
        InputStream in;
        InputStream err;
        try {session.connect(connectTimeout);
            channel = (ChannelExec) session.openChannel("exec");
            channel.setCommand(cmd);
            in = channel.getInputStream();
            err = channel.getErrStream();
            channel.connect();} catch (Exception e) {throw new CloudRuntimeException(e);
        }

        try {
            long lastRead = Long.MAX_VALUE;
            byte[] tmp = new byte[1024];
            while (true) {while (in.available() > 0 || err.available() > 0) {
                    int i = 0;
                    if (in.available() > 0) {i = in.read(tmp, 0, 1024);
                    } else if (err.available() > 0) {i = err.read(tmp, 0, 1024);
                    }
                    if (i < 0) {break;}
                    lastRead = System.currentTimeMillis();
                    builder.append(new String(tmp, 0, i));
                }
                if (channel.isClosed()) {if (in.available() > 0) {continue;}
                    exitStatus = channel.getExitStatus();
                    break;
                }
                if (System.currentTimeMillis() - lastRead > exeTimeout) {break;}
            }
        } catch (IOException e) {throw new CloudRuntimeException(e);
        } finally {channel.disconnect();
            session.disconnect();}

        if (0 != exitStatus) {return Result.createByError(ErrorData.builder()
                    .errorCode(ResultCode.EXECUTE_SSH_FAIL.getCode())
                    .detail(builder.toString())
                    .title(ResultCode.EXECUTE_SSH_FAIL.toString())
                    .build());
        } else {return Result.createBySuccess(builder.toString());
        }
    }

简单解释一下这段代码——即通过 ssh 到一台机器上, 然后执行一些命令. 对命令输出的东西, 开了一个循环, 每一次读一定的位置, 然后以字节流的形式读回来.

这段代码有点丑, 于是我闻到了学习的味道.

首先是对两个 Stream 的消费, 很显然, 在多核环境下, 我们同时也只能够消费其中一个 Stream. 其次, 这代码太挫了, 自己定义一个 tmp, 然后 1024、1024 这样的去取出来.

在改良之前, 我们先来回顾一下 JavaIO 的接口定义.

2.JavaIO 接口知识回顾

2.1 低级抽象接口:InputStream 和 OutputStream

这里有同学可能问了, 为啥叫它低抽象接口呢? 因为它离底层太近了, 计算机本来就是处理二进制的, 而这两个接口正是用来处理二进制数据流的.

先简单看一眼这两个接口:

  • InputStream
**
 * This abstract class is the superclass of all classes representing
 * an input stream of bytes.
 *
 * <p> Applications that need to define a subclass of <code>InputStream</code>
 * must always provide a method that returns the next byte of input.
 *
 * @author  Arthur van Hoff
 * @see     java.io.BufferedInputStream
 * @see     java.io.ByteArrayInputStream
 * @see     java.io.DataInputStream
 * @see     java.io.FilterInputStream
 * @see     java.io.InputStream#read()
 * @see     java.io.OutputStream
 * @see     java.io.PushbackInputStream
 * @since   JDK1.0
 */
public abstract class InputStream implements Closeable {.....}
  • OutputStream
/**
 * This abstract class is the superclass of all classes representing
 * an output stream of bytes. An output stream accepts output bytes
 * and sends them to some sink.
 * <p>
 * Applications that need to define a subclass of
 * <code>OutputStream</code> must always provide at least a method
 * that writes one byte of output.
 *
 * @author  Arthur van Hoff
 * @see     java.io.BufferedOutputStream
 * @see     java.io.ByteArrayOutputStream
 * @see     java.io.DataOutputStream
 * @see     java.io.FilterOutputStream
 * @see     java.io.InputStream
 * @see     java.io.OutputStream#write(int)
 * @since   JDK1.0
 */
public abstract class OutputStream implements Closeable, Flushable {...}

我们可以发现, 它们都实现了 Closeable 的接口. 因此大家在使用这些原生类时, 要注意在结束时调用 Close 方法哦.

这两个接口的常用实现类有:
– FileInputStreamFileOutputStream

  • DataInputStreamDataOutputStream
  •  ObjectInputStreamObjectOutputStream

2.2 高级抽象接口——Writer 和 Reader

为啥说它是高级抽象接口呢? 我们先来看看它们的注释:

  • Writer
/**
 * Abstract class for writing to character streams.  The only methods that a
 * subclass must implement are write(char[], int, int), flush(), and close().
 * Most subclasses, however, will override some of the methods defined here in
 * order to provide higher efficiency, additional functionality, or both.
 *
 * @see Writer
 * @see   BufferedWriter
 * @see   CharArrayWriter
 * @see   FilterWriter
 * @see   OutputStreamWriter
 * @see     FileWriter
 * @see   PipedWriter
 * @see   PrintWriter
 * @see   StringWriter
 * @see Reader
 *
 * @author      Mark Reinhold
 * @since       JDK1.1
 */

public abstract class Writer implements Appendable, Closeable, Flushable {
  • Reader
/**
 * Abstract class for reading character streams.  The only methods that a
 * subclass must implement are read(char[], int, int) and close().  Most
 * subclasses, however, will override some of the methods defined here in order
 * to provide higher efficiency, additional functionality, or both.
 *
 *
 * @see BufferedReader
 * @see   LineNumberReader
 * @see CharArrayReader
 * @see InputStreamReader
 * @see   FileReader
 * @see FilterReader
 * @see   PushbackReader
 * @see PipedReader
 * @see StringReader
 * @see Writer
 *
 * @author      Mark Reinhold
 * @since       JDK1.1
 */

public abstract class Reader implements Readable, Closeable {

我们可以看到, 这个抽象类是用来面向 character 的, 也就是字符. 字符的抽象等级必然比字节高, 因为字符靠近上层, 即人类.

2.3 优化输入和输出——Buffered

如果我们直接使用上述实现类去打开一个文件 (如FileWriter FileReader FileInputStream FileOutputStream ), 对其对象调用readwritereadLine 等, 每个请求都是由基础 OS 直接处理的, 这会使一个程序效率低得多——因为它们都会引发磁盘访问 or 网络请求等.

为了减少这种开销,Java 平台实现缓冲 I/O 流。缓冲输入流从被称为缓冲区(buffer)的存储器区域读出数据; 仅当缓冲区是空时,本地输入 API 才被调用。同样,缓冲输出流,将数据写入到缓存区,只有当缓冲区已满才调用本机输出 API。

用于包装非缓存流的缓冲流类有 4 个:BufferedInputStreamBufferedOutputStream·用于创建字节缓冲字节流, BufferedReaderBufferedWriter` 用于创建字符缓冲字节流.

3. 着手优化

之前, 我们提到了这段代码写得搓的地方:

  • 首先是对两个 Stream 的消费, 很显然, 在多核环境下, 我们同时也只能够消费其中一个 Stream.
  • 其次, 这代码太挫了, 自己定义一个 tmp, 然后 1024、1024 这样的去取出来.

故此, 我们可以考虑对每个 Stream 都进行包装, 支持用线程去消费, 其次我们可以用高级抽象分接口去 适配 Byte, 然后去 装饰 成 Buffer.

接下来, 我们来看一段 ZStack 里的工具类ShellUtils, 为了节省篇幅, 我们仅仅截出它在 IDE 里的Structure:

run 方法的核心:

我们可以看到 StreamConsumer 这个类, 我们来看一下它的代码:

    private static class StreamConsumer extends Thread {
        final InputStream in;
        final PrintWriter out;
        final boolean flush;

        StreamConsumer(InputStream in, PrintWriter out, boolean flushEveryWrite) {
            this.in = in;
            this.out = out;
            flush = flushEveryWrite;
        }

        @Override
        public void run() {
            BufferedReader br = null;
            try {br = new BufferedReader(new InputStreamReader(in));
                String line;
                while ((line = br.readLine()) != null) {out.println(line);
                    if (flush) {out.flush();
                    }
                }
            } catch (Exception e) {logger.warn(e.getMessage(), e);
            } finally {
                try {if (br != null) {br.close();
                    }
                } catch (IOException e) {logger.warn(e.getMessage(), e);
                }
            }
        }
    }

这段代码已经达到了我们的理想状态: 线程消费, 高级抽象.

3.1 使用 Kotlin

3.1.1 Kotlin IO

闲话不多说, 先贴代码为敬:

import java.io.InputStream
import java.io.InputStreamReader

class StreamGobbler(private val inputStream: InputStream, private var result: StringBuilder) : Runnable {override fun run() {val reader = InputStreamReader(inputStream).buffered()
        reader.lines().forEach { result.append(it) }
        reader.close()}

}

还是一样熟悉的配方, 我们逐行来解读:

  1. 定义一个类, 并且要求构造函数必须传入 InputStream 和一个 StringBuilder. 且实现了 Runnable 接口, 这意味着它可以被线程消费.
  2. 覆写 run 方法. 我们可以看到 InputStream 被适配成了InputStreamReader, 这意味着它可以输出字符流了, 然后我们使用了 Kotlin 的接口将其装饰成了 Buffer.
  3. 读每一行 buffer, 并 appned 到 result 这个 StringBuilder 里去.
  4. 读完就可以告辞了,close.

3.1.2 Kotlin Coroutine

先看一下上面的图, 我们都知道内核态线程是由 OS 调度的, 但当一个线程拿到时间片时, 却调到了阻塞 IO, 那么只能等在那边, 浪费时间.

而协程则可以解决这个问题, 当一个Jobhang 住的时候, 可以去做别的事情, 绕开阻塞. 更好的利用时间片.

最后, 我们来看一下成品代码:

    override fun sshExecWithCoroutine(session: Session, cmd: String): SimpleResult<out String> {val ui = InnerUserInfo()
        session.userInfo = ui
        val exitStatus: Int
        var channel = ChannelExec()
        val inputBuilder = StringBuilder()
        val errorBuilder = StringBuilder()
        try {session.connect(connectTimeout)
            channel = session.openChannel("exec") as ChannelExec
            channel.setCommand(cmd)
            channel.connect()
            val inputStream = StreamGobbler(channel.inputStream, inputBuilder)
            val errStream = StreamGobbler(channel.errStream, errorBuilder)

            val customJob = GlobalScope.launch {customStream(inputStream, errStream)
            }

            while (!customJob.isCompleted) {// wait job be done}

            exitStatus = channel.exitStatus
        } catch (e: IOException) {throw java.lang.RuntimeException(e)
        } finally {if (channel.isConnected) {channel.disconnect()
            }
            if (session.isConnected) {session.disconnect()
            }
        }

        return if (0 != exitStatus) {return SimpleResult.createByError(ErrorData.Builder()
                    .errorCode(ResultCode.EXECUTE_SSH_FAIL.value)
                    .detail(errorBuilder.toString())
                    .title(ResultCode.EXECUTE_SSH_FAIL.toString())
                    .build())

        } else {SimpleResult.createBySuccess(inputBuilder.toString())
        }
    }


    private suspend fun customStream(inputStream: StreamGobbler, errorStream: StreamGobbler) {
        val inputDeferred = GlobalScope.async {inputStream.run()
        }
        val errorDeferred = GlobalScope.async {errorStream.run()
        }

        inputDeferred.join()
        errorDeferred.join()}

正文完
 0