1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
| // HTTP 服务器实现
/**
 * Entry point of a small Netty-based HTTP server bound to a single TCP port.
 *
 * <p>One event loop thread accepts connections; accepted channels are served by a
 * second event loop group and configured by {@code HttpServerInitializer}.
 */
public class HttpServer {
    private final int port;

    /**
     * Creates a server that will listen on the given port once {@link #start()} is called.
     *
     * @param port TCP port to bind
     */
    public HttpServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and blocks the calling thread until the server channel is
     * closed. Both event loop groups are asked to shut down before this method returns,
     * whether it returns normally or by exception.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the
     *                              bind to complete or for the server channel to close
     */
    public void start() throws InterruptedException {
        EventLoopGroup acceptors = new NioEventLoopGroup(1);
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            // Option applied to the listening socket.
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // Options applied to every accepted connection.
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            bootstrap.childHandler(new HttpServerInitializer());

            ChannelFuture bound = bootstrap.bind(port).sync();
            System.out.println("HTTP Server started on http://localhost:" + port);

            // Park here until the server channel is closed.
            bound.channel().closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
            acceptors.shutdownGracefully();
        }
    }

    /**
     * Starts the server on the port given as the first command-line argument, or on
     * 8080 when no argument is supplied.
     *
     * @param args optional single element: the port number
     * @throws InterruptedException if the server thread is interrupted while running
     */
    public static void main(String[] args) throws InterruptedException {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new HttpServer(port).start();
    }
}
// HTTP 服务器初始化器
/**
 * Builds the handler pipeline for every accepted connection.
 *
 * <p>Handlers are installed in this order: HTTP codec, message aggregator (limit of
 * 65536 passed to its constructor), content compressor, and finally the application
 * handler {@code HttpServerHandler}.
 */
class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) {
        // Order matters: each handler is appended after the previous one.
        ch.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(65536),
                new HttpContentCompressor(),
                new HttpServerHandler());
    }
}
// HTTP 服务器处理器
/**
 * Application handler: routes aggregated HTTP requests to a handful of demo endpoints
 * and writes a plain-text or JSON response.
 *
 * <p>Supported paths (the query string, if any, is ignored when routing):
 * <ul>
 *   <li>{@code /}, {@code /hello} - fixed greeting text</li>
 *   <li>{@code /time} - current server time, evaluated per request</li>
 *   <li>{@code /stats} - number of requests counted so far across all connections</li>
 *   <li>{@code /api/echo} - echoes the request body decoded as UTF-8</li>
 *   <li>{@code /api/user/<id>} - small JSON document for the given id</li>
 * </ul>
 * Anything else yields a 404 response.
 */
class HttpServerHandler extends ChannelInboundHandlerAdapter {
    private static final String TEXT_PLAIN = "text/plain; charset=UTF-8";
    private static final String APPLICATION_JSON = "application/json; charset=UTF-8";
    private static final String USER_PREFIX = "/api/user/";

    /** Static routes; populated once in the static initializer and only read afterwards. */
    private static final Map<String, String> routes = new HashMap<>();
    /** Counts every {@code FullHttpRequest} seen by any instance of this handler. */
    private static final AtomicLong requestCount = new AtomicLong(0);

    static {
        // Only truly constant responses live here; "/time" must be computed per request.
        routes.put("/", "Welcome to Netty HTTP Server!");
        routes.put("/hello", "Hello, World!");
        routes.put("/stats", "Request count: ");
    }

    /**
     * Handles one inbound message. Full HTTP requests are counted, logged, answered and
     * then released; any other message type is forwarded to the next handler.
     *
     * @param ctx channel context used to write the response
     * @param msg inbound message from the previous pipeline handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof FullHttpRequest)) {
            // Not ours: pass it on instead of silently dropping (and leaking) it.
            ctx.fireChannelRead(msg);
            return;
        }
        FullHttpRequest request = (FullHttpRequest) msg;
        try {
            long count = requestCount.incrementAndGet();
            System.out.println("Request #" + count + ": " + request.method() + " " + request.uri());
            handleHttpRequest(ctx, request, count);
        } finally {
            // This adapter does not auto-release; the request content is fully copied
            // into Strings before we get here, so it is safe to drop our reference.
            request.release();
        }
    }

    /**
     * Selects the response body, status and content type for the request and sends it.
     *
     * @param ctx     channel context used to write the response
     * @param request the aggregated request being answered
     * @param count   sequence number of this request, reported by {@code /stats}
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request, long count) {
        String uri = request.uri();
        // Route on the path only so "/hello?x=1" still matches "/hello".
        int queryStart = uri.indexOf('?');
        String path = queryStart >= 0 ? uri.substring(0, queryStart) : uri;

        String body;
        String contentType = TEXT_PLAIN;
        HttpResponseStatus status = HttpResponseStatus.OK;

        String fixedText = routes.get(path);
        if (path.equals("/time")) {
            body = "Current time: " + new Date();
        } else if (fixedText != null) {
            body = path.equals("/stats") ? fixedText + count : fixedText;
        } else if (path.equals("/api/echo")) {
            ByteBuf content = request.content();
            body = "Echo: " + content.toString(CharsetUtil.UTF_8);
        } else if (path.startsWith(USER_PREFIX)) {
            // The id comes straight from the request line; escape it before embedding
            // it in a JSON string literal.
            String userId = escapeJson(path.substring(USER_PREFIX.length()));
            body = "{\"userId\": \"" + userId + "\", \"name\": \"User " + userId + "\"}";
            contentType = APPLICATION_JSON;
        } else {
            body = "404 Not Found: " + uri;
            status = HttpResponseStatus.NOT_FOUND;
        }

        sendHttpResponse(ctx, request, body, status, contentType);
    }

    /**
     * Escapes backslashes, double quotes and control characters so the value can be
     * placed between double quotes in a JSON document.
     *
     * @param raw text to escape
     * @return the escaped text
     */
    private static String escapeJson(String raw) {
        StringBuilder out = new StringBuilder(raw.length() + 8);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    /**
     * Writes a complete HTTP/1.1 response and closes the connection afterwards unless
     * the request asked for keep-alive.
     *
     * @param ctx         channel context used to write the response
     * @param request     the request being answered (consulted for keep-alive)
     * @param content     response body text, encoded as UTF-8
     * @param status      HTTP status of the response
     * @param contentType value for the Content-Type header
     */
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request,
                                  String content, HttpResponseStatus status, String contentType) {
        ByteBuf responseContent = Unpooled.copiedBuffer(content, CharsetUtil.UTF_8);
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, status, responseContent);

        response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType);
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, responseContent.readableBytes());
        response.headers().set(HttpHeaderNames.SERVER, "Netty/4.1");
        response.headers().set(HttpHeaderNames.DATE, new Date());

        boolean keepAlive = HttpUtil.isKeepAlive(request);
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

        ChannelFuture future = ctx.writeAndFlush(response);
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Logs the failure and closes the connection.
     *
     * @param ctx   channel context of the failing connection
     * @param cause the exception that reached this handler
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Print the Throwable itself: getMessage() may be null and hides the type.
        System.err.println("HTTP Server exception: " + cause);
        ctx.close();
    }
}
|