Java,SpringBoot,Netty,HttpClient,实现代理,请求转发功能

背景说明

开发环境:idea + SpringBoot2.3.3.RELEASE + netty4.1.60.Final + httpclient4.5

实现目标:

实现代码:

<!-- netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.60.Final</version> </dependency> <!-- protobuf-java --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.15.6</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.13</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> <version>4.5.13</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpasyncclient</artifactId> <version>4.1</version> </dependency>

application.yml

gatekeeper: server: local-addr: 0.0.0.0 local-port: 80 proxy: remote-addr: www.baidu.com remote-port: 80 # 日志输出 logging: level: root: info com: gatekeeper: debug pattern: console: %clr(%d{yyyy-MM-dd} [%thread] %-5level %logger{50} - %msg%n) file: %d{yyyy-MM-dd HH:mm:ss.SSS} >>> [%thread] >>> %-5level >>> %logger{50} >>> %msg%n

配置:

package com.gatekeeper.configure; import com.gatekeeper.common.ApplicationContextHelper; import com.gatekeeper.server.GatekeeperHttpConfig; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class GatekeeperConfigure { @Bean(name = "gatekeeperHttpConfig") public GatekeeperHttpConfig gatekeeperHttpConfig() { return new GatekeeperHttpConfig(); } @Bean(name = "applicationContextHelper") public ApplicationContextHelper applicationContextHelper(ApplicationContext applicationContext) { return new ApplicationContextHelper(applicationContext); } } package com.gatekeeper.common; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import java.util.Map; public class ApplicationContextHelper implements ApplicationContextAware { private static ApplicationContext applicationContext; public ApplicationContextHelper(ApplicationContext applicationContext) { this.setApplicationContext(applicationContext); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { ApplicationContextHelper.applicationContext = applicationContext; } /** * 根据Bean的名称获取bean对象 * * @param beanName * @param <T> * @return */ public static <T> T getBean(String beanName) { if (applicationContext.containsBean(beanName)) { return (T) applicationContext.getBean(beanName); } else { return null; } } public static <T> Map<String, T> getBeansOfType(Class<T> baseType) { return applicationContext.getBeansOfType(baseType); } } package com.gatekeeper.server; import lombok.Data; import org.springframework.beans.factory.annotation.Value; @Data public class GatekeeperHttpConfig { @Value("${gatekeeper.server.local-addr}") private String localAddr; @Value("${gatekeeper.server.local-port}") private int localPort; @Value("${gatekeeper.proxy.remote-addr}") private String remoteAddr; @Value("${gatekeeper.proxy.remote-port}") private int remotePort; }

服务类:

package com.gatekeeper.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.net.SocketAddress; @Component @Slf4j @Order(value = 1) public class GatekeeperHttpServer implements ApplicationRunner, DisposableBean { @Autowired private GatekeeperHttpConfig gatekeeperHttpConfig; private Thread serverThread; private EventLoopGroup bossEventGroup; private NioEventLoopGroup workerEventGroup; @Override public void run(ApplicationArguments args) throws Exception { log.debug("run", args); serverThread = new Thread(() -> { try { // 初始化==>用于Acceptor的主"线程池" this.bossEventGroup = new NioEventLoopGroup(); // 初始化==>用于I/O工作的从"线程池" this.workerEventGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); // group方法设置主从线程池 serverBootstrap.group(bossEventGroup, workerEventGroup); // 指定通道channel类型,服务端为:NioServerSocketChannel serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.childHandler(new GatekeeperHttpServerInitializer()); SocketAddress serverAddr = new InetSocketAddress(gatekeeperHttpConfig.getLocalAddr(), gatekeeperHttpConfig.getLocalPort()); // 绑定并侦听端口 ChannelFuture channelFuture = serverBootstrap.bind(serverAddr).sync(); log.debug("server:" + serverAddr + " start"); // 等待服务监听端口关闭 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 优雅退出,释放"线程池" bossEventGroup.shutdownGracefully(); workerEventGroup.shutdownGracefully(); } }); serverThread.setName("gatekeeper-server-thread"); serverThread.setDaemon(true); serverThread.start(); } @Override public void destroy() throws Exception { if (bossEventGroup != null) { bossEventGroup.shutdownGracefully(); } if (workerEventGroup != null) { workerEventGroup.shutdownGracefully(); } } }package com.gatekeeper.server; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.stream.ChunkedWriteHandler; import java.util.Map; /** * netty 实现简单的 http 协议:配置 解码器、handler */ public class GatekeeperHttpServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // http编解码 pipeline.addLast("http-decoder", new HttpRequestDecoder()); // http消息聚合器 pipeline.addLast("http-aggregator", new HttpObjectAggregator(65535)); pipeline.addLast("http-encoder", new HttpResponseEncoder()); pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // 请求处理器 pipeline.addLast("MyNettyHttpServerHandler", new GatekeeperHttpRequestHandler()); // 打印 Map<String, ChannelHandler> handlerMap = pipeline.toMap(); for (String key : handlerMap.keySet()) { // System.out.println(key + "=" + handlerMap.get(key)); } } }package com.gatekeeper.server; import com.gatekeeper.common.ApplicationContextHelper; import com.gatekeeper.exchange.GatekeeperExchanger; import com.gatekeeper.exchange.HttpExchange; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import lombok.extern.slf4j.Slf4j; import java.net.SocketAddress; @Slf4j public class GatekeeperHttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private GatekeeperHttpConfig catekeeperHttpConfig; @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) { log.debug("channelRead0() fullHttpRequest-->" + fullHttpRequest); SocketAddress clientSocketAddress = channelHandlerContext.channel().remoteAddress(); HttpResponseStatus responseStatus = HttpResponseStatus.OK; FullHttpResponse fullHttpResponse = null; if (fullHttpRequest.method() == HttpMethod.GET) { HttpExchange httpExchange = new HttpExchange(getCatekeeperHttpConfig(), channelHandlerContext, fullHttpRequest); httpExchange.setClientSocketAddress(clientSocketAddress); fullHttpResponse = GatekeeperExchanger.exchange(httpExchange); } else if (fullHttpRequest.method() == HttpMethod.POST) { HttpExchange httpExchange = new HttpExchange(getCatekeeperHttpConfig(), channelHandlerContext, fullHttpRequest); httpExchange.setClientSocketAddress(clientSocketAddress); fullHttpResponse = GatekeeperExchanger.exchange(httpExchange); } else { responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR; fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, responseStatus); } if (fullHttpResponse != null) { log.debug("channelRead0() fullHttpResponse-->" + fullHttpResponse); channelHandlerContext.writeAndFlush(fullHttpResponse).addListener(ChannelFutureListener.CLOSE); } } /** * @return */ public GatekeeperHttpConfig getCatekeeperHttpConfig() { if (catekeeperHttpConfig == null) { synchronized (this) { catekeeperHttpConfig = ApplicationContextHelper.getBean("gatekeeperHttpConfig"); } } return catekeeperHttpConfig; } }

信息交换:

package com.gatekeeper.exchange; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpMethod; import org.springframework.util.StringUtils; public final class GatekeeperExchanger { /** * @param httpExchange * @return */ public static FullHttpResponse exchange(HttpExchange httpExchange) { HttpMethod httpMethod = httpExchange.getFullHttpRequest().method(); String strContentType = getRequestContentType(httpExchange); if (HttpMethod.GET == httpMethod) { GetRemoteExecutor.INSTANCE.execute(httpExchange); } else if (HttpMethod.POST == httpMethod) { if (isRequestBodyStringStream(strContentType, httpExchange)) { PostStringStreamExecutor.INSTANCE.execute(httpExchange); } else if (isRequestBodyFormData(strContentType, httpExchange)) { PostByteStreamExecutor.INSTANCE.execute(httpExchange); } else { PostStringStreamExecutor.INSTANCE.execute(httpExchange); } } return httpExchange.getFullHttpResponse(); } /** * @param strContentType * @param httpExchange * @return */ private static boolean isRequestBodyFormData(String strContentType, HttpExchange httpExchange) { if ("multipart/form-data".equalsIgnoreCase(strContentType)) { // 上传 return true; } return false; } /** * @param strContentType * @param httpExchange * @return */ private static boolean isRequestBodyStringStream(String strContentType, HttpExchange httpExchange) { if ("application/x-www-form-urlencoded".equalsIgnoreCase(strContentType)) { // name1=value1&name2=value2 return true; } else if ("application/json".equalsIgnoreCase(strContentType)) { // {name1:value1,name2:value2} return true; } else if ("text/xml".equalsIgnoreCase(strContentType)) { // <xml><xml> return true; } else if ("application/xml".equalsIgnoreCase(strContentType)) { // <xml><xml> return true; } return false; } /** * @param httpExchange * @return */ private static String getRequestContentType(HttpExchange httpExchange) { String strContentType = httpExchange.getHeaderValueFromRequest("Content-type"); if (!StringUtils.isEmpty(strContentType)) { strContentType = strContentType.trim(); } return strContentType; } } package com.gatekeeper.exchange; import com.gatekeeper.server.GatekeeperHttpConfig; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.multipart.*; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.net.SocketAddress; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @Data @Slf4j public final class HttpExchange { private GatekeeperHttpConfig catekeeperHttpConfig; private ChannelHandlerContext channelHandlerContext; private FullHttpRequest fullHttpRequest; private FullHttpResponse fullHttpResponse; private SocketAddress clientSocketAddress; public HttpExchange(GatekeeperHttpConfig catekeeperHttpConfig, ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) { this.catekeeperHttpConfig = catekeeperHttpConfig; this.channelHandlerContext = channelHandlerContext; this.fullHttpRequest = fullHttpRequest; } /** * @param clientSocketAddress */ public void setClientSocketAddress(SocketAddress clientSocketAddress) { this.clientSocketAddress = clientSocketAddress; } /** * @return */ public SocketAddress getClientSocketAddress() { return this.clientSocketAddress; } /** * @return */ public String getClientAddr() { return this.clientSocketAddress.toString(); } /** * 远程执行地址 * * @return */ public String getRemoteUrl() { String url = "http://" + catekeeperHttpConfig.getRemoteAddr() + ":" + catekeeperHttpConfig.getRemotePort(); url = url + fullHttpRequest.uri(); log.debug("remote url " + url); return url; } /** * 从URI中获取请求参数 * * @return */ public Map<String, List<String>> getUriParamMapFromRequest() { Map<String, List<String>> params = new HashMap<>(); QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri()); Map<String, List<String>> paramList = decoder.parameters(); for (Map.Entry<String, List<String>> entry : paramList.entrySet()) { params.put(entry.getKey(), entry.getValue()); } return params; } /** * 从请求中获取头信息 * * @param name * @return */ public String getHeaderValueFromRequest(String name) { HttpHeaders httpHeaders = fullHttpRequest.headers(); if (!httpHeaders.isEmpty()) { return httpHeaders.get(name); } return null; } /** * 从请求中获取头信息 * * @return */ public Map<String, String> getHeaderMapFromRequest() { Map<String, String> headerMap = new HashMap<>(); HttpHeaders httpHeaders = fullHttpRequest.headers(); if (!httpHeaders.isEmpty()) { Iterator<Map.Entry<String, String>> iterator = httpHeaders.iteratorAsString(); while (iterator.hasNext()) { Map.Entry<String, String> entry = iterator.next(); headerMap.put(entry.getKey(), entry.getValue()); } } return headerMap; } /** * @return */ public byte[] getBodyValueFromRequest() { ByteBuf content = fullHttpRequest.content(); byte[] bodyBytes = new byte[content.readableBytes()]; content.readBytes(bodyBytes, 0, content.capacity()); return bodyBytes; } /** * @return */ private Map<String, String> getFormDataFromRequest() { Map<String, String> paramsMap = new HashMap<String, String>(); HttpDataFactory httpDataFactory = new DefaultHttpDataFactory(false); HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(httpDataFactory, fullHttpRequest); List<InterfaceHttpData> postData = decoder.getBodyHttpDatas(); for (InterfaceHttpData data : postData) { if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { MemoryAttribute attribute = (MemoryAttribute) data; paramsMap.put(attribute.getName(), attribute.getValue()); } } return paramsMap; } }package com.gatekeeper.exchange; public interface IRemoteExecutor { public void execute(HttpExchange httpExchange); } package com.gatekeeper.exchange; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.*; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; import java.io.IOException; import java.util.Map; import java.util.Set; public abstract class AbstractRemoteExecutor implements IRemoteExecutor { public static final String CLIENT_REAL_IP = "client-real-ip"; /** * @param url * @param headerMap * @return */ protected FullHttpResponse executeByGet(String url, Map<String, String> headerMap) { CloseableHttpClient httpClient = HttpClientBuilder.create().build(); CloseableHttpResponse response = null; try { HttpGet httpGet = new HttpGet(url); if (headerMap != null) { Set<String> keySet = headerMap.keySet(); for (String key : keySet) { httpGet.addHeader(key, headerMap.get(key)); } } response = httpClient.execute(httpGet); return buildFullHttpResponse(response); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return null; } /** * @param url * @param headerMap * @param bodyBytes * @return */ protected FullHttpResponse executeByPost(String url, Map<String, String> headerMap, byte[] bodyBytes) { CloseableHttpClient httpClient = HttpClientBuilder.create().build(); CloseableHttpResponse response = null; try { HttpPost httpPost = new HttpPost(url); if (headerMap != null) { Set<String> keySet = headerMap.keySet(); for (String key : keySet) { if (key.equalsIgnoreCase("Content-Length")) { continue; } httpPost.addHeader(key, headerMap.get(key)); } } httpPost.setEntity(new ByteArrayEntity(bodyBytes)); response = httpClient.execute(httpPost); return buildFullHttpResponse(response); } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return null; } /** * @param httpResponse * @return * @throws IOException */ private FullHttpResponse buildFullHttpResponse(CloseableHttpResponse httpResponse) throws IOException { HttpEntity responseEntity = httpResponse.getEntity(); // 响应状态 StatusLine statusLine = httpResponse.getStatusLine(); int statusCode = statusLine.getStatusCode(); // 响应类型及长度 Header contentTypeHeader = responseEntity.getContentType(); //String contentTypeName = contentTypeHeader.getName(); //String contentTypeValue = contentTypeHeader.getValue(); //long contentLength = responseEntity.getContentLength(); // 响应头 Header[] responseHeader = httpResponse.getAllHeaders(); HttpHeaders httpHeaders = new DefaultHttpHeaders(); if (responseHeader != null) { for (int i = 0; i < responseHeader.length; i++) { Header header = responseHeader[i]; httpHeaders.add(header.getName(), header.getValue()); } } // 响应内容 byte[] responseBytes = EntityUtils.toByteArray(responseEntity); // =====================================================================// // ===== 构造返回FullHttpResponse // =====================================================================// HttpVersion version = HttpVersion.HTTP_1_1; HttpResponseStatus responseStatus = HttpResponseStatus.valueOf(statusCode); ByteBuf content = Unpooled.copiedBuffer(responseBytes); HttpHeaders headers = httpHeaders; HttpHeaders trailingHeaders = new DefaultHttpHeaders(); return new DefaultFullHttpResponse(version, responseStatus, content, headers, trailingHeaders); } }package com.gatekeeper.exchange; import io.netty.handler.codec.http.FullHttpResponse; import java.util.Map; public class GetRemoteExecutor extends AbstractRemoteExecutor { public static final IRemoteExecutor INSTANCE = new GetRemoteExecutor(); @Override public void execute(HttpExchange httpExchange) { String url = httpExchange.getRemoteUrl(); Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest(); headerMap.put(CLIENT_REAL_IP,httpExchange.getClientAddr()); FullHttpResponse fullHttpResponse = super.executeByGet(url, headerMap); httpExchange.setFullHttpResponse(fullHttpResponse); } } package com.gatekeeper.exchange; import io.netty.handler.codec.http.FullHttpResponse; import java.util.Map; public class PostByteStreamExecutor extends AbstractRemoteExecutor { public static final IRemoteExecutor INSTANCE = new PostByteStreamExecutor(); @Override public void execute(HttpExchange httpExchange) { String url = httpExchange.getRemoteUrl(); Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest(); headerMap.put(CLIENT_REAL_IP, httpExchange.getClientAddr()); byte[] bodyBytes = httpExchange.getBodyValueFromRequest(); FullHttpResponse fullHttpResponse = super.executeByPost(url, headerMap, bodyBytes); httpExchange.setFullHttpResponse(fullHttpResponse); } }package com.gatekeeper.exchange; import io.netty.handler.codec.http.FullHttpResponse; import java.util.Map; public class PostStringStreamExecutor extends AbstractRemoteExecutor { public static final IRemoteExecutor INSTANCE = new PostStringStreamExecutor(); @Override public void execute(HttpExchange httpExchange) { String url = httpExchange.getRemoteUrl(); Map<String, String> headerMap = httpExchange.getHeaderMapFromRequest(); headerMap.put(CLIENT_REAL_IP, httpExchange.getClientAddr()); byte[] bodyBytes = httpExchange.getBodyValueFromRequest(); FullHttpResponse fullHttpResponse = super.executeByPost(url, headerMap, bodyBytes); httpExchange.setFullHttpResponse(fullHttpResponse); } }