Spring Cloud Gateway 获取请求体(Request Body)的多种方法

网友投稿 363 2023-02-10

Spring Cloud Gateway 获取请求体(Request Body)的多种方法

一、直接在全局拦截器中获取,伪代码如下

private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest){

Flux body = serverHttpRequest.getBody();

AtomicReference bodyRef = new AtomicReference<>();

body.subscribe(buffer -> {

CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());

DataBufferUtils.release(buffer);

bodyRef.set(charBuffer.toString());

});

return bodyRef.get();

}

存在的缺陷:其他拦截器无法再通过该方式获取请求体(因为请求体已被消费),并且会抛出异常

Only one connection receive subscriber allowed.Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.

异常原因:实际上spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request body,而request body只能读取一次。因此就出现了上面的错误。

再者受版本限制

这种方法在spring-boot-starter-parent 2.0.6.RELEASE + Spring Cloud Finchley.SR2 body 中生效,

但是在spring-boot-starter-parent 2.1.0.RELEASE + Spring Cloud Greenwich.M3 body 中不生效,总是为空

二、先在全局过滤器中获取,然后再把request重新包装,继续向下传递传递

@Override

public GatewayFilter apply(NameValueConfig nameValueConfig) {

return (exchange, chain) -> {

URI uri = exchange.getRequest().getURI();

URI ex = UriComponentsBuilder.fromUri(uri).build(true).toUri();

ServerHttpRequest request = exchange.getRequest().mutate().uri(ex).build();

if("POST".equalsIgnoreCase(request.getMethodValue())){//判断是否为POST请求

Flux body = request.getBody();

AtomicReference bodyRef = new AtomicReference<>();

body.subscribe(dataBuffer -> {

CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer());

DataBufferUtils.release(dataBuffer);

bodyRef.set(charBuffer.toString());

});//读取request body到缓存

String bodyStr = bodyRef.get();//获取request body

System.out.println(bodyStr);//这里是我们需要做的操作

DataBuffer bodyDataBuffer = stringBuffer(bodyStr);

Flux bodyFlux = Flhttp://ux.just(bodyDataBuffer);

request = new ServerHttpRequestDecorator(request){

@Override

public Flux getBody() {

return bodyFlux;

}

};//封装我们的request

}

return chain.filter(exchange.mutate().request(request).build());

};

}

 protected DataBuffer stringBuffer(String value) {

byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);

DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);

buffer.write(bytes);

return buffer;

}

该方案的缺陷:request body获取不完整(因为异步原因),只能获取1024B的数据。并且请求体超过1024B,会出现响应超慢(因为我是开启了熔断)。

三、过滤器加路线定位器

翻查源码发现ReadBodyPredicateFactory里面缓存了request body的信息,于是在自定义router中配置了ReadBodyPredicateFactory,然后在filter中通过cachedRequestBodyObject缓存字段获取request body信息。

/**

* @description: 获取POST请求的请求体

* ReadBodyPredicateFactory 发现里面缓存了request body的信息,

* 于是在自定义router中配置了ReadBodyPredicateFactory

* @modified:

*/

@EnableAutoConfiguration

@Configuration

public class RouteLocatorRequestBoby{

   //自定义过滤器

@Resource

private ReqTrxSooutlaceFilter reqTraceFilter;

  

@Resource

private RibbonLoadBalancerClient ribbonLoadBalancerClient;

private static final String SERVICE = "/leap/**";

private static final String HTTP_PREFIX = "http://";

private static final String COLON = ":";

@Bean

public RouteLocator myRoutes(RouteLocatorBuilder builder) {

//通过负载均衡获取服务实例

ServiceInstance instance = ribbonLoadBalancerClient.choose("PLATFORM-SERVICE");

//拼接路径

StringBuilder forwardAddress = new StringBuilder(HTTP_PREFIX);

forwardAddress.append(instance.getHost())

.append(COLON)

.append(instance.getPort());

return builder.routes()

//拦截请求类型为POST Content-Type application/json application/json;charset=UTF-8

.route(r -> r

.header(HttpHeaders.CONTENT_TYPE,

MediaType.APPLICATION_JSON_VALUE + MediaType.APPLICATION_JSON_UTF8_VALUE)

.and()

.method(HttpMethod.POST)

.and()

//获取缓存中的请求体

.readBody(Object.class, readBody -> {

return true;

})

.and()

.path(SERVICE)

//把请求体传递给拦截器reqTraceFilter

.filters(f -> {

f.filter(reqTraceFilter);

return f;

})

.uri(forwardAddress.toString())).build();

}

/**

* @description: 过滤器,用于获取请求体,和处理请求体业务,列如记录日志

* @modified:

*/

@Component

public class ReqTraceFilter implements GlobalFilter, GatewayFilter,Ordered {

private static final String CONTENT_TYPE = "Content-Type";

private static final String CONTENT_TYPE_JSON = "application/json";

  

//获取请求路由详细信息Route route = exchange.getAttribute(GATEWAY_ROUTE_BEAN)

private static final String GATEWAY_ROUTE_BEAN = "org.springframework.cloud.gateway.support.ServerWebExchangeUtils.gatewayRoute";

private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";

@Override

public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

ServerHttpRequest request = exchange.getRequest();

//判断过滤器是否执行

String requestUrl = RequestUtils.getCurrentRequest(request);

if (!RequestUtils.isFilter(requestUrl)) {

String bodyStr = "";

String contentType = request.getHeaders().getFirst(CONTENT_TYPE);

String method = request.getMethodValue();

//判断是否为POST请求

if (null != contentType && HttpMethod.POST.name().equalsIgnoreCase(method) && contentType.contains(CONTENT_TYPE_JSON)) {

Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);

if(null != cachedBody){

bodyStr = cachedBody.toString();

}

}

if (HttpMethod.GET.name().equalsIgnoreCase(method)) {

bodyStr = request.getQueryParams().toString();

}

log.info("请求体内容:{}",bodyStr);

}

return chain.filter(exchange);

}

@Override

public int getOrder() {

return 5;

}

}

该方案优点:这种解决,一不会带来重复读取问题,二不会带来requestbody取不全问题。三在低版本的Spring Cloud Finchley.SR2也可以运行。

缺点:不支持multipart/form-data(异常415),这个致命。

四、通过org.springframework.cloud.gateway.filter.factory.rewrite包下有个ModifyRequestBodyGatewayFilterFactory,顾名思义,这就是修改 Request Body 的过滤器工厂类。

@Component

@Slf4j

public class ReqTraceFilter implements GlobalFilter, GatewayFilter, Ordered {

@Resource

private IPlatformFeignClient platformFeignClient;

/**

* httpheader,traceId的key名称

*/

private static final String REQUESTID = "traceId";

private static final String CONTENT_TYPE = "Content-Type";

private static final String CONTENT_TYPE_JSON = "application/json";

private static final String GATEWAY_ROUTE_BEAN = "org.springframework.cloud.gateway.support.ServerWebExchangeUtils.gatewayRoute";

@Override

public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

ServerHttpRequest request = exchange.getRequest();

//判断过滤器是否执行

String requestUrl = RequestUtils.getCurrentRequest(request);

if (!RequestUtils.isFilter(requestUrl)) {

String bodyStr = "";

String contentType = request.getHeaders().getFirst(CONTENT_TYPE);

String method = request.getMethodValue();

//判断是否为POST请求

if (null != contentType && HttpMethod.POST.name().equalsIgnoreCase(method) && contentType.contains(CONTENT_TYPE_JSON)) {

ServerRequest serverRequest = new DefaultServerRequest(exchange);

List list = new ArrayList<>();

// 读取请求体

Mono modifiedBody = serverRequest.bodyToMono(String.class)

.flatMap(body -> {

//记录请求体日志

final String nId = saveRequestOperLog(exchange, body);

//记录日志id

list.add(nId);

return Mono.just(body);

});

BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);

HttpHeaders headers = new HttpHeaders();

headers.putAll(exchange.getRequest().getHeaders());

headers.remove(HttpHeaders.CONTENT_LENGTH);

CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);

return bodyInserter.insert(outputMessage, new BodyInserterContext())

.then(Mono.defer(() -> {

ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(

exchange.getRequest()) {

@Override

public HttpHeaders getHeaders() {

long contentLength = headers.getContentLength();

HttpHeaders httpHeaders = new HttpHeaders();

httpHeaders.putAll(super.getHeaders());

httpHeaders.put(REQUESTID,list);

if (contentLength > 0) {

httpHeaders.setContentLength(contentLength);

} else {

httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");

}

return httpHeaders;

}

@Override

public Flux getBody() {

return outputMessage.getBody();

}

};

return chain.filter(exchange.mutate().request(decorator).build());

}));

}

if (HttpMethod.GET.name().equalsIgnoreCase(method)) {

bodyStr = request.getQueryParams().toString();

String nId = saveRequestOperLog(exchange, bodyStr);

ServerHttpRequest userInfo = exchange.getRequest().mutate()

.header(REQUESTID, nId).build();

return chain.filter(exchange.mutate().request(userInfo).build());

}

}

return chain.filter(exchange);

}

/**

* 保存请求日志

*

* @param exchange

* @param requestParameters

* @return

*/

private String saveRequestOperLog(ServerWebExchange exchange, String requestParameters) {

log.debug("接口请求参数:{}", requestParameters);

ServerHttpRequest request = exchange.getRequest();

String ip = Objects.requireNonNull(request.getRemoteAddress()).getAddress().getHostAddress();

SaveOperLogVO vo = new SaveOperLogVO();

vo.setIp(ip);

vo.setReqUrl(RequestUtils.getCurrentRequest(request));

vo.setReqMethod(request.getMethodValue());

vo.setRequestParameters(requestParameters);

Route route = exchange.getAttribute(GATEWAY_ROUTE_BEAN);

//是否配置路由

if (route != null) {

vo.setSubsystem(route.getId());

}

ResEntity res = platformFeignClient.saveOperLog(vo);

log.debug("当前请求ID返回的数据:{}", res);

return res.getData();

}

@Override

public int getOrder() {

return 5;

}

}

该方案:完美解决以上所有问题

参考文档:https://codercto.com/a/52970.html

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:智能读图信息聚合(智能识图文)
下一篇:外卖聚合数据(外卖消费数据)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~