Netty Study Notes, Part 1: Receiving a File Uploaded by a Node.js Simulated Form Post

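I haven't written a blog post in a long time, nor any code. A small task came up over the last couple of days; it didn't look hard, and I felt a bit rusty, so I did it myself. I ended up falling into quite a few pits, some new and some old, and it was painful. It is finally more or less working, so I am writing it down right away in the hope of not repeating the pain.

The task is simple: use Node.js to simulate a form submission and upload a file to a Netty server.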

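1. There is plenty of reference material on Netty. There are currently two main versions, Netty 3 and Netty 4. Netty 5 got as far as alpha 2 and then, for whatever reason, stopped being updated; the official site also marks it as unsupported. So I used Netty 4.1.19, the latest release at the time of writing.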

The main references were:

1) http://netty.io/wiki/index.html, the official documentation. It is well written and worth studying; the snoop example in it helped me a lot.

2) https://www.programcreek.com/, a site of sample code.

The Netty code here is copied almost verbatim from the second site, specifically from https://www.programcreek.com/java-api-examples/index.php?source_dir=netty4.0.27Learn-master/example/src/main/java/io/netty/example/http/upload/HttpUploadServer.java. There are three files: HttpUploadServer.java, HttpUploadServerHandler.java and HttpUploadServerInitializer.java.

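2. Node.js itself is fairly simple, but I still spent a good deal of time on it. There are many options for uploading files: form-data, request, or even the built-in API. None of them is complicated. I originally chose form-data but ran into quite a few problems with it, so in the end I used request, which is very easy to use. I mainly referred to the following:

1) http://www.open-open.com/lib/view/open1435301679966.html, in Chinese and fairly detailed.

2) https://github.com/request/request, the official repository, which is the most complete and authoritative source.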

3. Environment details

1) Windows 10 Pro

2) Spring Tool Suite 3.9.1 (Eclipse works just as well)

3) Netty 4.1.19

4) Node.js 8.9.3

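4. Goals

1) Netty program

a) Support both the POST and GET methods.

b) Save the cookies, GET parameters and POST parameters into a map. If a file is uploaded, save it to a temporary directory and return its web address so the client can access it.

2) Node.js

a) Support both the GET and POST methods.

b) Be able to set cookies. Uploading a file certainly requires a login, and the session ID is usually kept in a cookie.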

5. Planned approach

1) Solve the Netty server side first, testing with a browser as the client.

2) Then solve the Node.js side.

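6. The process and the pits I fell into

1) Netty

a) Netty programming is not hard in itself, but it is relatively low-level. People who mostly do web development may find it confusing at first, but it becomes familiar quickly.

A Netty server program usually consists of three parts:

Server: starts the event-loop threads and binds the port.

Initializer: sets up the channel pipeline. The incoming byte stream is first decoded into objects, which are then passed to the later handlers. There is not much here that we need to care about: by the time our handler sees the data, it is already a parsed HTTP object, and we only have to process it as we see fit.

Handler: our own logic. In this example it just parses the objects, builds a map, and saves the file to disk.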

b) First, the pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>io-netty</groupId>
        <artifactId>io-netty-example</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>

        <name>io-netty-example</name>
        <url>http://maven.apache.org</url>

        <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>

        <dependencies>
                <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
                <dependency>
                        <groupId>io.netty</groupId>
                        <artifactId>netty-all</artifactId>
                        <version>4.1.19.Final</version>
                </dependency>
                <dependency>
                        <groupId>com.alibaba</groupId>
                        <artifactId>fastjson</artifactId>
                        <version>1.2.44</version>
                </dependency>
                <dependency>
                        <groupId>junit</groupId>
                        <artifactId>junit</artifactId>
                        <version>3.8.1</version>
                        <scope>test</scope>
                </dependency>
        </dependencies>
</project>

  

c) The Server is very simple and there is not much code:

package io.netty.example.http.upload; 
 
import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.Channel; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.logging.LogLevel; 
import io.netty.handler.logging.LoggingHandler; 
import io.netty.handler.ssl.SslContext; 
import io.netty.handler.ssl.util.SelfSignedCertificate; 
 
/**
 * A HTTP server showing how to use the HTTP multipart package for file uploads and decoding post data. 
 */ 
public final class HttpUploadServer { 
 
    static final boolean SSL = System.getProperty("ssl") != null; 
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8090")); 
 
    public static void main(String[] args) throws Exception { 
        // Configure SSL. 
        final SslContext sslCtx; 
        if (SSL) { 
            SelfSignedCertificate ssc = new SelfSignedCertificate(); 
            sslCtx = SslContext.newServerContext(ssc.certificate(), ssc.privateKey()); 
        } else { 
            sslCtx = null; 
        } 
 
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
        EventLoopGroup workerGroup = new NioEventLoopGroup(); 
        try { 
            ServerBootstrap b = new ServerBootstrap(); 
            b.group(bossGroup, workerGroup); 
            b.channel(NioServerSocketChannel.class); 
            b.handler(new LoggingHandler(LogLevel.INFO)); 
            b.childHandler(new HttpUploadServerInitializer(sslCtx)); // install the Initializer on each accepted channel
 
            Channel ch = b.bind(PORT).sync().channel(); 
 
            System.err.println("Open your web browser and navigate to " + 
                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/'); 
 
            ch.closeFuture().sync(); 
        } finally { 
            bossGroup.shutdownGracefully(); 
            workerGroup.shutdownGracefully(); 
        } 
    } 
}

d) The Initializer code. The part to pay attention to is the chain of handlers in the pipeline:

/*
 * Copyright 2012 The Netty Project 
 * 
 * The Netty Project licenses this file to you under the Apache License, 
 * version 2.0 (the "License"); you may not use this file except in compliance 
 * with the License. You may obtain a copy of the License at: 
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
 * License for the specific language governing permissions and limitations 
 * under the License. 
 */
package io.netty.example.http.upload;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslContext;

public class HttpUploadServerInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    public HttpUploadServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();

        if (sslCtx != null) {
            pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }

        pipeline.addLast(new HttpRequestDecoder()); // decodes inbound bytes into HTTP request objects
        // Uncomment the following line if you don't want to handle HttpChunks.
        //pipeline.addLast(new HttpObjectAggregator(1048576)); // assembles the pieces into a FullHttpRequest
        pipeline.addLast(new HttpResponseEncoder());  // encodes outbound responses

        // Remove the following line if you don't want automatic content compression.
        pipeline.addLast(new HttpContentCompressor());  // compresses response content

        pipeline.addLast(new HttpUploadServerHandler());
    }
}

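One thing to note here: with the HttpRequestDecoder handler, a single request is not delivered as one object. It arrives as an HttpRequest, followed by HttpContent objects, the last of which is a LastHttpContent. Roughly speaking, HttpRequest carries the request line and the headers, which include the parameters sent by GET and the cookies. HttpContent is the message body, that is, the content submitted by a POST form; a large body is delivered as several HttpContent chunks. LastHttpContent is the final piece of the body and signals that the body, and therefore the whole request, is complete.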

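The HttpObjectAggregator handler can assemble these pieces into a single FullHttpRequest. When I tried it, though, the program simply hung, and I don't know why, so I fell back on this clumsier approach. I will look into it later; this will do for now. Note that the commented-out line limits the aggregated body to 1048576 bytes (1 MB), which may matter for larger files.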

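e) The Handler code is rather long, but here it is anyway. Because it was copied from a Netty 4.0 example, it uses several APIs (CookieDecoder, HttpHeaders.Names, getUri() and so on) that are deprecated in 4.1 but still compile.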

/*
 * Copyright 2012 The Netty Project 
 * 
 * The Netty Project licenses this file to you under the Apache License, 
 * version 2.0 (the "License"); you may not use this file except in compliance 
 * with the License. You may obtain a copy of the License at: 
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, software 
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
 * License for the specific language governing permissions and limitations 
 * under the License. 
 */
package io.netty.example.http.upload;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.CookieDecoder;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.ServerCookieEncoder;
import io.netty.handler.codec.http.multipart.Attribute;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.DiskAttribute;
import io.netty.handler.codec.http.multipart.DiskFileUpload;
import io.netty.handler.codec.http.multipart.FileUpload;
import io.netty.handler.codec.http.multipart.HttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder.ErrorDataDecoderException;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;
import io.netty.util.CharsetUtil;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.alibaba.fastjson.JSON;

import static io.netty.buffer.Unpooled.*;
import static io.netty.handler.codec.http.HttpHeaders.Names.*;

public class HttpUploadServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    private static final Logger logger = Logger.getLogger(HttpUploadServerHandler.class.getName());

    private HttpRequest request;

    private boolean readingChunks;

    private final StringBuilder responseContent = new StringBuilder();

    // Fields larger than MINSIZE are stored on disk instead of in memory.
    private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);

    private HttpPostRequestDecoder decoder;

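    private String tempPath = "d:/upload/";  // directory where uploaded files are saved

    private String url_path = "http://localhost/upload/";   // web URL prefix under which saved files are served
    private String errorJson;
    private Map<String, Object> mparams = new HashMap<>();  // collects cookies, query parameters and form fields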

    static {
        DiskFileUpload.deleteOnExitTemporaryFile = true; // should delete file
                                                            // on exit (in normal
                                                            // exit)
        DiskFileUpload.baseDirectory = null; // system temp directory
        DiskAttribute.deleteOnExitTemporaryFile = true; // should delete file on
                                                        // exit (in normal exit)
        DiskAttribute.baseDirectory = null; // system temp directory
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (decoder != null) {
            decoder.cleanFiles();
        }
    }

    // Handles inbound objects. Called once per decoded object: the HttpRequest
    // first, then each HttpContent, the last of which is a LastHttpContent.
    @Override
    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            HttpRequest request = this.request = (HttpRequest) msg;
            // The handler instance is reused on keep-alive connections,
            // so drop the parameters left over from the previous request.
            mparams.clear();
            URI uri = new URI(request.getUri());
            if (!uri.getPath().equals("/formpostmultipart")) {
                errorJson = "{\"code\":-1}";
                writeError(ctx, errorJson);
                // Mark the request as answered so that the trailing
                // LastHttpContent does not trigger a second response.
                this.request = null;
                return;
            }

            // Cookies
            Set<Cookie> cookies;
            String value = request.headers().get(COOKIE);
            if (value == null) {
                cookies = Collections.emptySet();
            } else {
                cookies = CookieDecoder.decode(value);
            }
            for (Cookie cookie : cookies) {
                mparams.put(cookie.getName(), cookie.getValue());
            }
            System.out.println(JSON.toJSONString(mparams));

            // Query-string parameters; each name maps to a list of values.
            QueryStringDecoder decoderQuery = new QueryStringDecoder(request.getUri());
            Map<String, List<String>> uriAttributes = decoderQuery.parameters();
            mparams.putAll(uriAttributes);
            System.out.println(JSON.toJSONString(mparams));

            if (request.getMethod().equals(HttpMethod.GET)) {
                // GET: there is no body, so no HttpPostRequestDecoder is created.
                // The response is written when the LastHttpContent arrives.
                return;
            }
            try {
                decoder = new HttpPostRequestDecoder(factory, request);
            } catch (ErrorDataDecoderException e1) {
                e1.printStackTrace();
                responseContent.append(e1.getMessage());
                errorJson = "{\"code\":-2}";
                // Send the error, then close once the write has completed.
                writeError(ctx, errorJson).addListener(ChannelFutureListener.CLOSE);
                return;
            }

            readingChunks = HttpHeaders.isTransferEncodingChunked(request);
        }

        // A decoder exists only for POST requests; feed it each body chunk.
        if (decoder != null) {
            if (msg instanceof HttpContent) {
                // New chunk is received
                HttpContent chunk = (HttpContent) msg;
                try {
                    decoder.offer(chunk);
                } catch (ErrorDataDecoderException e1) {
                    e1.printStackTrace();
                    errorJson = "{\"code\":-3}";
                    writeError(ctx, errorJson).addListener(ChannelFutureListener.CLOSE);
                    return;
                }
                // Read the fields decoded so far, chunk by chunk
                // (minimizes memory usage thanks to the factory).
                readHttpDataChunkByChunk(ctx);
                // Respond only once the end of the body has been reached.
                if (chunk instanceof LastHttpContent) {
                    writeResponse(ctx.channel());
                    readingChunks = false;
                    reset();
                }
            }
        } else if (request != null) {
            // No decoder: a GET request, answered once its content arrives.
            writeResponse(ctx.channel());
        }
    }

    private void reset() {
        request = null;
        // Destroy the decoder to release all resources.
        decoder.destroy();
        decoder = null;
    }

    /**
     * Reads the POST data chunk by chunk, handling each field as it becomes available.
     *
     * @throws IOException
     */
    private void readHttpDataChunkByChunk(ChannelHandlerContext ctx) throws IOException {
        try {
            while (decoder.hasNext()) {
                InterfaceHttpData data = decoder.next();
                if (data != null) {
                    try {
                        writeHttpData(ctx, data);
                    } finally {
                        data.release();
                    }
                }
            }
        } catch (EndOfDataDecoderException e1) {
            // The decoder reports that there is no more data; recorded as code -2.
            mparams.put("code", "-2");
        }
    }

    // Parses one POST field: attributes go into the map, files are saved to disk.
    private void writeHttpData(ChannelHandlerContext ctx, InterfaceHttpData data) throws IOException {
        if (data.getHttpDataType() == HttpDataType.Attribute) {
            Attribute attribute = (Attribute) data;
            String value;
            try {
                value = attribute.getValue();
            } catch (IOException e1) {
                // Error while reading data from file: report it and stop.
                e1.printStackTrace();
                errorJson = "{\"code\":-4}";
                writeError(ctx, errorJson);
                return;
            }
            // put() overwrites, so a repeated field name keeps only its last value.
            mparams.put(attribute.getName(), value);
            System.out.println(JSON.toJSONString(mparams));
        } else if (data.getHttpDataType() == HttpDataType.FileUpload) {
            FileUpload fileUpload = (FileUpload) data;
            if (fileUpload.isCompleted()) {
                System.out.println(fileUpload.length());
                if (fileUpload.length() > 0) {
                    // Store the file under a random name that keeps the original extension.
                    String orign_name = fileUpload.getFilename();
                    String file_name = UUID.randomUUID() + "." + orign_name.substring(orign_name.lastIndexOf(".") + 1);
                    fileUpload.renameTo(new File(tempPath + file_name));
                    mparams.put(data.getName(), url_path + file_name);
                    System.out.println(JSON.toJSONString(mparams));
                }
            } else {
                errorJson = "{\"code\":-5}";
                writeError(ctx, errorJson);
            }
        }
    }

    // Writes the collected parameters back to the client as JSON.
    private void writeResponse(Channel channel) {
        // Convert the response content to a ByteBuf.
        ByteBuf buf = copiedBuffer(JSON.toJSONString(mparams), CharsetUtil.UTF_8);
        responseContent.setLength(0);

        // Decide whether to close the connection or not.
        boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.headers().get(CONNECTION))
                || (request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
                        && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.headers().get(CONNECTION)));

        // Build the response object.
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");

        if (!close) {
            // There's no need to add a 'Content-Length' header
            // if this is the last response.
            response.headers().set(CONTENT_LENGTH, buf.readableBytes());
        }

        Set<Cookie> cookies;
        String value = request.headers().get(COOKIE);
        if (value == null) {
            cookies = Collections.emptySet();
        } else {
            cookies = CookieDecoder.decode(value);
        }
        if (!cookies.isEmpty()) {
            // Reset the cookies if necessary.
            for (Cookie cookie : cookies) {
                response.headers().add(SET_COOKIE, ServerCookieEncoder.encode(cookie));
            }
        }

        // Write the response.
        ChannelFuture future = channel.writeAndFlush(response);
        // Close the connection after the write operation is done if necessary.
        if (close) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    // Writes an error message to the response; the returned future lets the caller close afterwards.
    private ChannelFuture writeError(ChannelHandlerContext ctx, String errorJson) {
        ByteBuf buf = copiedBuffer(errorJson, CharsetUtil.UTF_8);

        // Build the response object.
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
        response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
        response.headers().set(CONTENT_LENGTH, buf.readableBytes());

        // Write the response.
        return ctx.channel().writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.log(Level.WARNING, responseContent.toString(), cause);
        ctx.channel().close();
    }
}

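Although there is a lot of code, only four methods really need attention:

channelRead0(ChannelHandlerContext ctx, HttpObject msg): handles the inbound objects. It is called once per decoded object, in order: the HttpRequest, then the HttpContent chunks, ending with the LastHttpContent.
readHttpDataChunkByChunk(ChannelHandlerContext ctx): called while HttpContent (the message body) is being decoded; it loops over the fields decoded so far.
writeHttpData(ChannelHandlerContext ctx, InterfaceHttpData data): does the actual work for each field, putting attributes into the map and saving uploaded files.
writeResponse(Channel channel): writes the response, using fastjson to convert the map into a JSON string.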
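The file-saving step in writeHttpData builds the stored name from a random UUID plus whatever follows the last dot of the original file name. When the original name has no dot at all, that expression yields "<uuid>.<whole original name>". Below is a minimal sketch of a more defensive version using only the JDK. The class UploadNames and the method storedName are hypothetical names introduced for illustration and are not part of the Netty example.

```java
import java.util.Locale;
import java.util.UUID;

/** Hypothetical helper (not part of the original handler) for naming stored uploads. */
final class UploadNames {

    private UploadNames() {
    }

    /**
     * Builds "<id>.<ext>" from the client-supplied file name, or just "<id>"
     * when the name carries no usable extension.
     */
    static String storedName(String originalName, UUID id) {
        // Keep only the last path segment: some clients send a full path.
        String base = originalName.replace('\\', '/');
        base = base.substring(base.lastIndexOf('/') + 1);

        int dot = base.lastIndexOf('.');
        // No dot, a leading dot (".bashrc") or a trailing dot ("name.")
        // is treated as "no extension".
        if (dot <= 0 || dot == base.length() - 1) {
            return id.toString();
        }
        String ext = base.substring(dot + 1).toLowerCase(Locale.ROOT);
        return id + "." + ext;
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(storedName("C:\\photos\\cat.PNG", id)); // <uuid>.png
        System.out.println(storedName("README", id));              // <uuid>
    }
}
```

In the handler, the result would take the place of file_name before the call to renameTo. Passing the UUID in as a parameter is only there to make the method easy to test.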

f) The HTML file used for uploading

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Upload test</title>
</head>
<body>
    <form  action="http://127.0.0.1:8090/formpostmultipart?a=张三&b=李四"  method="post"  enctype="multipart/form-data">
       <input type="text" name="name" value="shiyq"/>
       <br>
       <input type="text" name="name" value="历史地理"/>
       <br>
       <input type="file" name="file"/>
       <br>
       <input type="submit" value="Upload"/>
    </form>
</body>
</html>

g) Start HttpUploadServer, then open upload.html in a browser and submit the form. The result is as follows:

{
    "name": "历史地理",
    "a": [
        "张三"
    ],
    "b": [
        "李四"
    ],
    "file": "http://localhost/upload/13d45df8-d6c7-4a7a-8f21-0251efeca240.png"
}

  

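Note that a parameter passed in the address bar comes back as an array, because a query parameter name may be repeated. Form fields come back as a single value: the handler stores them with Map.put, so when a name repeats (as name does in the form above) only the last value survives.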
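If repeated form fields should be kept as well, the values can be accumulated in a list per name, which is the same shape QueryStringDecoder already returns for query parameters. The following is a minimal JDK-only sketch of that idea. MultiValueParams is a hypothetical class introduced for illustration; the handler above does not contain it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: keeps every value of a repeated field, not only the last one. */
final class MultiValueParams {

    private final Map<String, List<String>> values = new LinkedHashMap<>();

    /** Appends a value; repeated names accumulate in arrival order. */
    void add(String name, String value) {
        values.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
    }

    /** Returns a copy of all values received for the name, or an empty list. */
    List<String> getAll(String name) {
        List<String> found = values.get(name);
        return found == null ? new ArrayList<>() : new ArrayList<>(found);
    }

    public static void main(String[] args) {
        MultiValueParams params = new MultiValueParams();
        // Two fields with the same name, as in the form above.
        params.add("name", "shiyq");
        params.add("name", "another");
        System.out.println(params.getAll("name")); // [shiyq, another]
    }
}
```

In writeHttpData, a call such as params.add(attribute.getName(), value) would then replace the plain mparams.put for attributes, at the cost of every form field becoming an array in the JSON output.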

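2) Node.js

Node.js is comparatively simpler, but also more confusing. I ran into two main problems.

a) Request URLs that contain Chinese. This is an old problem and easy to solve, yet it held me up for quite a while; going a long time without programming clearly takes its toll. The fix is simply to URL-encode the address.

b) Setting cookies. The form-data module says nothing about cookies, and the official Node.js API documentation on requests is vague about it too. Fortunately the request module documents it fairly clearly, but cookie support is off by default and has to be enabled. Same old story: a few days without coding and you get rusty.

c) The environment is very simple: only the request module needs to be installed, with npm install request. Try not to install it globally; on my Windows 10 machine a global install led to a module-not-found error, which went away only after I installed it into the current directory. The final code is as follows: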

var fs = require('fs');
var request = require('request').defaults({jar:true});  // remember to run npm install request and to set jar:true, otherwise cookies cannot be set

var file_path="D:/Documents/IMG_20170427_121431.jpg"
var formData = {
    name:"路送双",
    code:"tom",
    my_file:fs.createReadStream(file_path)
}

var url = encodeURI("http://localhost:8090/formpostmultipart?a=王二&a=张三&b=李四"); // percent-encode the Chinese characters
var j = request.jar();
var cookie = request.cookie('key1=value1');
var cookie1 = request.cookie('key2=value2');
j.setCookie(cookie, url);
j.setCookie(cookie1, url);

request.post({url:url, jar:j, formData: formData}, function optionalCallback(err, httpResponse, body) {
  if (err) {
    return console.error('upload failed:', err);
  }
  console.log( body);
});

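Pay attention to how the cookies are set: besides setting jar to true, you have to call setCookie once per cookie and also pass the jar in the request.post options. It is rather cumbersome.

d) The result is as follows: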

{
    "key1": "value1",
    "key2": "value2",
    "a": [
        "王二",
        "张三"
    ],
    "b": [
        "李四"
    ],
    "code": "tom",
    "my_file": "http://localhost/upload/8d8e2f9f-7513-4844-9614-0d7fb7a33a6e.jpg",
    "name": "路送双"
}
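The a and b values in the result above travel over the wire percent-encoded as UTF-8, which is what encodeURI produces on the Node.js side. As a cross-check from the Java side, here is a minimal sketch using only the JDK. QueryEncodingDemo and its two methods are hypothetical names introduced for illustration. The Chinese values are written as Unicode escapes so that the encoding of the source file does not matter. Unlike encodeURI, URLEncoder follows form-encoding rules (a space becomes '+') and is meant for a single value, not a whole URL.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

/** Hypothetical demo class; only JDK calls are used. */
final class QueryEncodingDemo {

    private QueryEncodingDemo() {
    }

    /** Percent-encodes one query value as UTF-8. */
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8.
            throw new IllegalStateException(e);
        }
    }

    /** Reverses encode(), as a server does when it reads the query string. */
    static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String zhangSan = "\u5f20\u4e09"; // the value passed as "a" in the examples
        String liSi = "\u674e\u56db";     // the value passed as "b" in the examples

        // Encode each value separately, then assemble the URL.
        String url = "http://localhost:8090/formpostmultipart"
                + "?a=" + encode(zhangSan)
                + "&b=" + encode(liSi);
        // a=%E5%BC%A0%E4%B8%89 and b=%E6%9D%8E%E5%9B%9B
        System.out.println(url);
    }
}
```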

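7. Conclusion

This was really a simple problem, but working through it took a while and the result is rather clumsy. Using FullHttpRequest would make the code much shorter; I will look into that later.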