关于java:厉害了Netty-轻松实现文件上传

49次阅读

共计 21851 个字符,预计需要花费 55 分钟才能阅读完成。

作者:rickiyang

出处:www.cnblogs.com/rickiyang/p/11074222.html

明天咱们来实现一个应用 netty 进行文件传输的工作。在理论我的项目中,文件传输通常采纳 FTP 或者 HTTP 附件的形式。事实上通过 TCP Socket+File 的形式进行文件传输也有肯定的利用场景,只管不是支流,然而把握这种文件传输方式还是比拟重要的,特地是针对两个跨主机的 JVM 过程之间进行长久化数据的相互交换。

而应用 netty 来进行文件传输也是利用 netty 人造的劣势:零拷贝性能。很多同学都据说过 netty 的”零拷贝”性能,然而具体体现在哪里又不晓得,上面咱们就简要介绍下:

Netty 的“零拷贝”次要体现在如下三个方面:

1) Netty 的接管和发送 ByteBuffer 采纳 DIRECT BUFFERS,应用堆外间接内存进行 Socket 读写,不须要进行字节缓冲区的二次拷贝。如果应用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到间接内存中,而后才写入 Socket 中。相比于堆外间接内存,音讯在发送过程中多了一次缓冲区的内存拷贝。

2) Netty 提供了组合 Buffer 对象,能够聚合多个 ByteBuffer 对象,用户能够像操作一个 Buffer 那样不便的对组合 Buffer 进行操作,防止了传统通过内存拷贝的形式将几个小 Buffer 合并成一个大的 Buffer。

3) Netty 的文件传输采纳了 transferTo 办法,它能够间接将文件缓冲区的数据发送到指标 Channel,防止了传统通过循环 write 形式导致的内存拷贝问题。

具体的剖析在此就不多做介绍,有趣味的能够查阅相干文档。咱们还是把重点放在文件传输上。Netty 作为高性能的服务器端异步 IO 框架必然也离不开文件读写性能,咱们能够应用 netty 模仿 http 的模式通过网页上传文件写入服务器,当然要应用 http 的模式那你也用不着 netty!大材小用。

netty4 中如果想应用 http 模式上传文件你还得借助第三方 jar 包:okhttp。应用该 jar 实现 http 申请的发送。然而在 netty5 中曾经为咱们写好了,咱们能够间接调用 netty5 的 API 就能够实现。所以 netty4 和 5 的差异还是挺大的,至于应用哪个,那就看你们公司抉择哪一个了!本文目前应用 netty4 来实现文件上传性能。上面咱们上代码:

pom 文件:

<dependency>
      <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
      <version>4.1.5.Final</version>
</dependency>

server 端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class FileUploadServer {public void bind(int port) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); // 最大长度
                    ch.pipeline().addLast(new FileUploadServerHandler());
                }
            });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {e.printStackTrace();
            }
        }
        try {new FileUploadServer().bind(port);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

server 端 handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.File;
import java.io.RandomAccessFile;

public class FileUploadServerHandler extends ChannelInboundHandlerAdapter {
    private int byteRead;
    private volatile int start = 0;
    private String file_dir = "D:";

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FileUploadFile) {FileUploadFile ef = (FileUploadFile) msg;
            byte[] bytes = ef.getBytes();
            byteRead = ef.getEndPos();
            String md5 = ef.getFile_md5();// 文件名
            String path = file_dir + File.separator + md5;
            File file = new File(path);
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
            randomAccessFile.seek(start);
            randomAccessFile.write(bytes);
            start = start + byteRead;
            if (byteRead > 0) {ctx.writeAndFlush(start);
            } else {randomAccessFile.close();
                ctx.close();}
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();
        ctx.close();}
}

client 端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.io.File;

public class FileUploadClient {public void connect(int port, String host, final FileUploadFile fileUploadFile) throws Exception {EventLoopGroup group = new NioEventLoopGroup();
        try {Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));
                    ch.pipeline().addLast(new FileUploadClientHandler(fileUploadFile));
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();} finally {group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {e.printStackTrace();
            }
        }
        try {FileUploadFile uploadFile = new FileUploadFile();
            File file = new File("c:/1.txt");
            String fileMd5 = file.getName();// 文件名
            uploadFile.setFile(file);
            uploadFile.setFile_md5(fileMd5);
            uploadFile.setStarPos(0);// 文件开始地位
            new FileUploadClient().connect(port, "127.0.0.1", uploadFile);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

client 端 handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
    private int byteRead;
    private volatile int start = 0;
    private volatile int lastLength = 0;
    public RandomAccessFile randomAccessFile;
    private FileUploadFile fileUploadFile;

    public FileUploadClientHandler(FileUploadFile ef) {if (ef.getFile().exists()) {if (!ef.getFile().isFile()) {System.out.println("Not a file :" + ef.getFile());
                return;
            }
        }
        this.fileUploadFile = ef;
    }

    public void channelActive(ChannelHandlerContext ctx) {
        try {randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
            randomAccessFile.seek(fileUploadFile.getStarPos());
            lastLength = (int) randomAccessFile.length() / 10;
            byte[] bytes = new byte[lastLength];
            if ((byteRead = randomAccessFile.read(bytes)) != -1) {fileUploadFile.setEndPos(byteRead);
                fileUploadFile.setBytes(bytes);
                ctx.writeAndFlush(fileUploadFile);
            } else {System.out.println("文件曾经读完");
            }
        } catch (FileNotFoundException e) {e.printStackTrace();
        } catch (IOException i) {i.printStackTrace();
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof Integer) {start = (Integer) msg;
            if (start != -1) {randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
                randomAccessFile.seek(start);
                System.out.println("块儿长度:" + (randomAccessFile.length() / 10));
                System.out.println("长度:" + (randomAccessFile.length() - start));
                int a = (int) (randomAccessFile.length() - start);
                int b = (int) (randomAccessFile.length() / 10);
                if (a < b) {lastLength = a;}
                byte[] bytes = new byte[lastLength];
                System.out.println("-----------------------------" + bytes.length);
                if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length() - start) > 0) {System.out.println("byte 长度:" + bytes.length);
                    fileUploadFile.setEndPos(byteRead);
                    fileUploadFile.setBytes(bytes);
                    try {ctx.writeAndFlush(fileUploadFile);
                    } catch (Exception e) {e.printStackTrace();
                    }
                } else {randomAccessFile.close();
                    ctx.close();
                    System.out.println("文件曾经读完 --------" + byteRead);
                }
            }
        }
    }

    // @Override
    // public void channelRead(ChannelHandlerContext ctx, Object msg) throws
    // Exception {// System.out.println("Server is speek:"+msg.toString());
    // FileRegion filer = (FileRegion) msg;
    // String path = "E://Apk//APKMD5.txt";
    // File fl = new File(path);
    // fl.createNewFile();
    // RandomAccessFile rdafile = new RandomAccessFile(path, "rw");
    // FileRegion f = new DefaultFileRegion(rdafile.getChannel(), 0,
    // rdafile.length());
    //
    // System.out.println("This is" + ++counter + "times receive server:["
    // + msg + "]");
    // }

    // @Override
    // public void channelReadComplete(ChannelHandlerContext ctx) throws
    // Exception {// ctx.flush();
    // }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();
        ctx.close();}
    // @Override
    // protected void channelRead0(ChannelHandlerContext ctx, String msg)
    // throws Exception {
    // String a = msg;
    // System.out.println("This is"+
    // ++counter+"times receive server:["+msg+"]");
    // }
}

咱们还自定义了一个对象,用于统计文件上传进度的:

import java.io.File;
import java.io.Serializable;

public class FileUploadFile implements Serializable {


    private static final long serialVersionUID = 1L;
    private File file;// 文件
    private String file_md5;// 文件名
    private int starPos;// 开始地位
    private byte[] bytes;// 文件字节数组
    private int endPos;// 结尾地位

    public int getStarPos() {return starPos;}

    public void setStarPos(int starPos) {this.starPos = starPos;}

    public int getEndPos() {return endPos;}

    public void setEndPos(int endPos) {this.endPos = endPos;}

    public byte[] getBytes() {return bytes;}

    public void setBytes(byte[] bytes) {this.bytes = bytes;}

    public File getFile() {return file;}

    public void setFile(File file) {this.file = file;}

    public String getFile_md5() {return file_md5;}

    public void setFile_md5(String file_md5) {this.file_md5 = file_md5;}
}

输入为:

 块儿长度:894
长度:8052
-----------------------------894
byte 长度:894
块儿长度:894
长度:7158
-----------------------------894
byte 长度:894
块儿长度:894
长度:6264
-----------------------------894
byte 长度:894
块儿长度:894
长度:5370
-----------------------------894
byte 长度:894
块儿长度:894
长度:4476
-----------------------------894
byte 长度:894
块儿长度:894
长度:3582
-----------------------------894
byte 长度:894
块儿长度:894
长度:2688
-----------------------------894
byte 长度:894
块儿长度:894
长度:1794
-----------------------------894
byte 长度:894
块儿长度:894
长度:900
-----------------------------894
byte 长度:894
块儿长度:894
长度:6
-----------------------------6
byte 长度:6
块儿长度:894
长度:0
-----------------------------0
文件曾经读完 --------0

Process finished with exit code 0

这样就实现了服务器端文件的上传,当然咱们也能够应用 http 的模式。

server 端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class HttpFileServer implements Runnable {
    private int port;

    public HttpFileServer(int port) {super();
        this.port = port;
    }

    @Override
    public void run() {EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        //serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
        serverBootstrap.childHandler(new HttpChannelInitlalizer());
        try {ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }

    public static void main(String[] args) {HttpFileServer b = new HttpFileServer(9003);
        new Thread(b).start();}
}

Server 端 initializer:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;


public class HttpChannelInitlalizer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpChannelHandler());
    }

}

server 端 hadler:

import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.SystemPropertyUtil;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Pattern;

import javax.activation.MimetypesFileTypeMap;

public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
    public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
    public static final int HTTP_CACHE_SECONDS = 60;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // 监测解码状况
        if (!request.getDecoderResult().isSuccess()) {sendError(ctx, BAD_REQUEST);
            return;
        }
        final String uri = request.getUri();
        final String path = sanitizeUri(uri);
        System.out.println("get file:"+path);
        if (path == null) {sendError(ctx, FORBIDDEN);
            return;
        }
        // 读取要下载的文件
        File file = new File(path);
        if (file.isHidden() || !file.exists()) {sendError(ctx, NOT_FOUND);
            return;
        }
        if (!file.isFile()) {sendError(ctx, FORBIDDEN);
            return;
        }
        RandomAccessFile raf;
        try {raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException ignore) {sendError(ctx, NOT_FOUND);
            return;
        }
        long fileLength = raf.length();
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpHeaders.setContentLength(response, fileLength);
        setContentTypeHeader(response, file);
        //setDateAndCacheHeaders(response, file);
        if (HttpHeaders.isKeepAlive(request)) {response.headers().set("CONNECTION", HttpHeaders.Values.KEEP_ALIVE);
        }

        // Write the initial line and the header.
        ctx.write(response);

        // Write the content.
        ChannelFuture sendFileFuture =
        ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), ctx.newProgressivePromise());
        //sendFuture 用于监督发送数据的状态
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {if (total < 0) { // total unknown
                    System.err.println(future.channel() + "Transfer progress:" + progress);
                } else {System.err.println(future.channel() + "Transfer progress:" + progress + "/" + total);
                }
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture future) {System.err.println(future.channel() + "Transfer complete.");
            }
        });

        // Write the end marker
        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);

        // Decide whether to close the connection or not.
        if (!HttpHeaders.isKeepAlive(request)) {
            // Close the connection when the whole content is written out.
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();
        if (ctx.channel().isActive()) {sendError(ctx, INTERNAL_SERVER_ERROR);
        }
        ctx.close();}

    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");

    private static String sanitizeUri(String uri) {
        // Decode the path.
        try {uri = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {throw new Error(e);
        }

        if (!uri.startsWith("/")) {return null;}

        // Convert file separators.
        uri = uri.replace('/', File.separatorChar);

        // Simplistic dumb security check.
        // You will have to do something serious in the production environment.
        if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".")
                || INSECURE_URI.matcher(uri).matches()) {return null;}

        // Convert to absolute path.
        return SystemPropertyUtil.get("user.dir") + File.separator + uri;
    }


    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure:" + status + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");

        // Close the connection as soon as the error message is sent.
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Sets the content type header for the HTTP Response
     *
     * @param response
     *            HTTP response
     * @param file
     *            file to extract content type
     */
    private static void setContentTypeHeader(HttpResponse response, File file) {MimetypesFileTypeMap m = new MimetypesFileTypeMap();
        String contentType = m.getContentType(file.getPath());
        if (!contentType.equals("application/octet-stream")) {contentType += "; charset=utf-8";}
        response.headers().set(CONTENT_TYPE, contentType);
    }

}

client 端:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.net.URI;


public class HttpDownloadClient {
    /**
     * 下载 http 资源 向服务器下载间接填写要下载的文件的相对路径
     *(↑↑↑倡议只应用字母和数字对特殊字符对字符进行局部过滤可能导致异样↑↑↑)*        向互联网下载输出残缺门路
     * @param host 目标主机 ip 或域名
     * @param port 指标主机端口
     * @param url 文件门路
     * @param local 本地存储门路
     * @throws Exception
     */
    public void connect(String host, int port, String url, final String local) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChildChannelHandler(local));

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            URI uri = new URI(url);
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());

            // 构建 http 申请
            request.headers().set(HttpHeaders.Names.HOST, host);
            request.headers().set(HttpHeaders.Names.CONNECTION,
                    HttpHeaders.Values.KEEP_ALIVE);
            request.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
                    request.content().readableBytes());
            // 发送 http 申请
            f.channel().write(request);
            f.channel().flush();
            f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();
        }

    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        String local;
        public ChildChannelHandler(String local) {this.local = local;}

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 客户端接管到的是 httpResponse 响应,所以要应用 HttpResponseDecoder 进行解码
            ch.pipeline().addLast(new HttpResponseDecoder());
            // 客户端发送的是 httprequest,所以要应用 HttpRequestEncoder 进行编码
            ch.pipeline().addLast(new HttpRequestEncoder());
            ch.pipeline().addLast(new ChunkedWriteHandler());
            ch.pipeline().addLast(new HttpDownloadHandler(local));
        }

    }
    public static void main(String[] args) throws Exception {HttpDownloadClient client = new HttpDownloadClient();
        //client.connect("127.0.0.1", 9003,"/file/pppp/1.doc","1.doc");
//        client.connect("zlysix.gree.com", 80, "http://zlysix.gree.com/HelloWeb/download/20m.apk", "20m.apk");
        client.connect("www.ghost64.com", 80, "http://www.ghost64.com/qqtupian/zixunImg/local/2017/05/27/1495855297602.jpg", "1495855297602.jpg");

    }
}

client 端 handler:

import java.io.File;
import java.io.FileOutputStream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
//import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.internal.SystemPropertyUtil;
/**
 * @Author:yangyue
 * @Description:
 * @Date: Created in 9:15 on 2017/5/28.
 */

public class HttpDownloadHandler extends ChannelInboundHandlerAdapter {
    private boolean readingChunks = false; // 分块读取开关
    private FileOutputStream fOutputStream = null;// 文件输入流
    private File localfile = null;// 下载文件的本地对象
    private String local = null;// 待下载文件名
    private int succCode;// 状态码

    public HttpDownloadHandler(String local) {this.local = local;}

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {if (msg instanceof HttpResponse) {// response 头信息
            HttpResponse response = (HttpResponse) msg;
            succCode = response.getStatus().code();
            if (succCode == 200) {setDownLoadFile();// 设置下载文件
                readingChunks = true;
            }
            // System.out.println("CONTENT_TYPE:"
            // + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
        }
        if (msg instanceof HttpContent) {// response 体信息
            HttpContent chunk = (HttpContent) msg;
            if (chunk instanceof LastHttpContent) {readingChunks = false;}

            ByteBuf buffer = chunk.content();
            byte[] dst = new byte[buffer.readableBytes()];
            if (succCode == 200) {while (buffer.isReadable()) {buffer.readBytes(dst);
                    fOutputStream.write(dst);
                    buffer.release();}
                if (null != fOutputStream) {fOutputStream.flush();
                }
            }

        }
        if (!readingChunks) {if (null != fOutputStream) {System.out.println("Download done->"+ localfile.getAbsolutePath());
                fOutputStream.flush();
                fOutputStream.close();
                localfile = null;
                fOutputStream = null;
            }
            ctx.channel().close();
        }
    }

    /**
     * 配置本地参数,筹备下载
     */
    private void setDownLoadFile() throws Exception {if (null == fOutputStream) {local = SystemPropertyUtil.get("user.dir") + File.separator +local;
            //System.out.println(local);
            localfile = new File(local);
            if (!localfile.exists()) {localfile.createNewFile();
            }
            fOutputStream = new FileOutputStream(localfile);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {System.out.println("管道异样:" + cause.getMessage());
        cause.printStackTrace();
        ctx.channel().close();
    }
}

这里客户端我放的是网络连接,下载的是一副图片,启动服务端和客户端就能够看到这个图片被下载到了工程的根目录下。

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿 (2021 最新版)

2. 终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3. 阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!

4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

正文完
 0