Spring Cloud Gateway 记录请求应答数据日志操作

网友投稿 378 2023-02-16

Spring Cloud Gateway 记录请求应答数据日志操作

我就废话不多说了,大家还是直接看代码吧~

/**
 * Mutable holder for request data captured while a request passes through the gateway.
 *
 * <p>It carries the request path plus either the raw body string or the parsed form fields.
 * {@link #CACHE_GATEWAY_CONTEXT} is the attribute key intended for storing an instance on the
 * exchange.
 */
public class GatewayContext {

    /** Attribute key under which a {@code GatewayContext} is stored. */
    public static final String CACHE_GATEWAY_CONTEXT = "cacheGatewayContext";

    /** Cached JSON body. */
    private String cacheBody;

    /** Cached form data (field name to list of values). */
    private MultiValueMap<String, String> formData;

    /** Cached request path. */
    private String path;

    /** @return the cached body string, or {@code null} if none was set */
    public String getCacheBody() {
        return cacheBody;
    }

    /** @param cacheBody the body string to cache */
    public void setCacheBody(String cacheBody) {
        this.cacheBody = cacheBody;
    }

    /** @return the cached form fields, or {@code null} if none were set */
    public MultiValueMap<String, String> getFormData() {
        return formData;
    }

    /** @param formData the form fields to cache */
    public void setFormData(MultiValueMap<String, String> formData) {
        this.formData = formData;
    }

    /** @return the cached request path, or {@code null} if none was set */
    public String getPath() {
        return path;
    }

    /** @param path the request path to cache */
    public void setPath(String path) {
        this.path = path;
    }
}

import java.io.UnsupportedEncodingException;

import java.net.URLEncoder;

import java.nio.charset.Charset;

import java.nio.charset.StandardCharsets;

import java.util.List;

import java.util.Map;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;

import org.springframework.cloud.gateway.filter.GlobalFilter;

import org.springframework.core.io.ByteArrayResource;

import org.springframework.core.io.buffer.DataBuffer;

import org.springframework.core.io.buffer.DataBufferUtils;

import org.springframework.core.io.buffer.NettyDataBufferFactory;

import org.springframework.http.HttpHeaders;

import org.springframework.http.HttpMethod;

import org.springframework.http.MediaType;

import org.springframework.http.codec.HttpMessageReader;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.http.server.reactive.ServerHttpRequestDecorator;

import org.springframework.stereotype.Component;

import org.springframework.util.MultiValueMap;

import org.springframework.web.reactive.function.server.HandlerStrategies;

import org.springframework.web.reactive.function.server.ServerRequest;

import org.springframework.web.server.ServerWebExchange;

import io.netty.buffer.ByteBufAllocator;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

// https://segmentfault.com/a/1190000017898354

@Component

public class LogRequestGlobalFilter

implements GlobalFilter {

/**

* default HttpMessageReader

*/

private static final List> messageReaders =

HandlerStrategies.withDefaults().messageReaders();

private Logger log = LoggerFactory.getLogger(LogRequestGlobalFilter.class);

@Override

public Mono filter(

ServerWebExchange exchange,

GatewayFilterChain chain) {

/**

* save request path and serviceId into gateway context

*/

ServerHttpRequest request = exchange.getRequest();

String path = request.getPath().pathWithinApplication().value();

GatewayContext gatewayContext = new GatewayContext();

gatewayContext.setPath(path);

/**

* save gateway context into exchange

*/

exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,

gatewayContext);

HttpHeaders headers = request.getHeaders();

MediaType contentType = headers.getContentType();

log.info("start-------------------------------------------------");

log.info("HttpMethod:{},Url:{}", request.getMethod(),

request.getURI().getRawPath());

log.info("Headers token: {}", headers.getFirst("token"));

if (request.getMethod() == HttpMethod.GET) {

log.info("end-------------------------------------------------");

}

if (request.getMethod() == HttpMethod.POST) {

Mono voidMono = null;

if (MediaType.APPLICATION_JSON.equals(contentType)

|| MediaType.APPLICATION_JSON_UTF8.equals(contentType)) {

voidMono =

readBody(exchange, chain, gatewayContext);

}

if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) {

voidMono =

readFormData(exchange, chain, gatewayContext);

}

return voidMono;

}

/* log.debug(

"[GatewayContext]ContentType:{},Gateway context is set with {}",

contentType, gatewayContext);*/

return chain.filter(exchange);

}

/**

* ReadFormData

*

* @param exchange

* @param chain

* @return

*/

private Mono readFormData(

ServerWebExchange exchange,

GatewayFilterChain chain,

GatewayContext gatewayContext) {

final ServerHttpRequest request = exchange.getRequest();

HttpHeaders headers = request.getHeaders();

return exchange.getFormData()

.doOnNext(multiValueMap -> {

gatewayContext.setFormData(multiValueMap);

log.info("Post x-www-form-urlencoded:{}",

multiValueMap);

log.info(

"end-------------------------------------------------");

})

.then(Mono.defer(() -> {

Charset charset = headers.getContentType().getCharset();

charset = charset == null ? StandardCharsets.UTF_8 : charset;

String charsetName = charset.name();

MultiValueMap formData =

gatewayContext.getFormData();

/**

* formData is empty just return

*/

if (null == formData || formData.isEmpty()) {

return chain.filter(exchange);

}

StringBuilder formDataBodyBuilder = new StringBuilder();

MTGMO String entryKey;

List entryValue;

try {

/**

* repackage form data

*/

for (Map.Entry> entry : formData

.entrySet()) {

entryKey = entry.getKey();

entryValue = entry.getValue();

if (entryValue.size() > 1) {

for (String value : entryValue) {

formDataBodyBuilder.append(entryKey).append("=")

.append(

URLEncoder.encode(value, charsetName))

.append("&");

}

} else {

formDataBodyBuilder

.append(entryKey).append("=").append(URLEncoder

.encode(entryValue.get(0), charsetName))

.append("&");

}

}

} catch (UnsupportedEncodingException e) {

// ignore URLEncode Exception

}

/**

* substring with the last char '&'

*/

String formDataBodyString = "";

if (formDataBodyBuilder.length() > 0) {

formDataBodyString = formDataBodyBuilder.substring(0,

formDataBodyBuilder.length() - 1);

}

/**

* get data bytes

*/

byte[] bodyBytes = formDataBodyString.getBytes(charset);

int contentLength = bodyBytes.length;

ServerHttpRequestDecorator decorator =

new ServerHttpRequestDecorator(

request) {

/**

* change content-length

*

* @return

*/

@Override

public HttpHeaders getHeaders() {

HttpHeaders httpHeaders = new HttpHeaders();

httpHeaders.putAll(super.getHeaders());

if (contentLength > 0) {

httpHeaders.setContentLength(contentLength);

} else {

httpHeaders.set(HttpHeaders.TRANSFER_ENCODING,

"chunked");

}

return httpHeaders;

}

/**

* read bytes to Flux

*

* @return

*/

@Override

public Flux getBody() {

return DataBufferUtils

.read(new ByteArrayResource(bodyBytes),

new NettyDataBufferFactory(

ByteBufAllocator.DEFAULT),

contentLength);

}

};

ServerWebExchange mutateExchange =

exchange.mutate().request(decorator).build();

/* log.info("[GatewayContext]Rewrite Form Data :{}",

formDataBodyString);*/

return chain.filter(mutateExchange);

}));

}

/**

* ReadJsonBody

*

* @param exchange

* @param chain

* @return

*/

private Mono readBody(

ServerWebExchange exchange,

GatewayFilterChain chain,

GatewayContext gatewayContext) {

/**

* join the body

*/

return DataBufferUtils.join(exchange.getRequest().getBody())

.flatMap(dataBuffer -> {

/*

* read the body Flux, and release the buffer

* //TODO when SpringCloudGateway Version Release To G.SR2,this can be update with the new version's feature

* see PR https://github.com/spring-cloud/spring-cloud-gateway/pull/1095

*/

byte[] bytes = new byte[dataBuffer.readableByteCount()];

dataBuffer.read(bytes);

DataBufferUtils.release(dataBuffer);

Flux cachedFlux = Flux.defer(() -> {

DataBuffer buffer =

exchange.getResponse().bufferFactory().wrap(bytes);

DataBufferUtils.retain(buffer);

return Mono.just(buffer);

});

/**

* repackage ServerHttpRequest

*/

ServerHttpRequest mutatedRequest =

new ServerHttpRequestDecorator(exchange.getRequest()) {

@Override

public Flux getBody() {

return cachedFlux;

}

};

/**

* mutate exchage with new ServerHttpRequest

*/

ServerWebExchange mutatedExchange =

exchange.mutate().request(mutatedRequest).build();

/**

* read body string with default messageReaders

*/

return ServerRequest.create(mutatedExchange, messageReaders)

.bodyToMono(String.class)

.doOnNext(objectValue -> {

log.info("PostBody:{}", objectValue);

log.info(

"end-------------------------------------------------");

gatewayContext.setCacheBody(objectValue);

/* log.debug("[GatewayContext]Read JsonBody:{}",

objectValue);*/

}).then(chain.filter(mutatedExchange));

});

}

}

import lombok.extern.slf4j.Slf4j;

import org.reactivestreams.Publisher;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;

import org.springframework.cloud.gateway.filter.GlobalFilter;

import org.springframework.core.Ordered;

import org.springframework.core.io.buffer.DataBuffer;

import org.springframework.core.io.buffer.DataBufferFactory;

import org.springframework.http.HttpHeaders;

import org.springframework.http.HttpMethod;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.http.server.reactive.ServerHttpResponse;

import org.springframework.http.server.reactive.ServerHttpResponseDecorator;

import org.springframework.stereotype.Component;

import org.springframework.web.server.ServerWebExchange;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import java.net.InetSocketAddress;

import java.net.URI;

import java.nio.CharBuffer;

import java.nio.charset.Charset;

import java.nio.charset.StandardCharsets;

import java.util.concurrent.atomic.AtomicReference;

@Component

@Slf4j

public class LogResponseGlobalFilter implements GlobalFilter, Ordered {

private static final String REQUEST_PREFIX = "Request Info [ ";

private static final String REQUEST_TAIL = " ]";

private static final String RESPONSE_PREFIX = "Response Info [ ";

private static final String RESPONSE_TAIL = " ]";

private StringBuilder normalMsg = new StringBuilder();

@Override

public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

ServerHttpRequest request = exchange.getRequest();

ServerHttpResponse response = exchange.getResponse();

DataBufferFactory bufferFactory = response.bufferFactory();

normalMsg.append(RESPONSE_PREFIX);

ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(response) {

@Override

public Mono writeWith(Publisher extends DataBuffer> body) {

if (body instanceof Flux) {

Flux extends DataBuffer> fluxBody = (Flux extends DataBuffer>) body;

return super.writeWith(fluxBody.map(dataBuffer -> {

// probably should reuse buffers

byte[] content = new byte[dataBuffer.readableByteCount()];

dataBuffer.read(content);

String responseResult = new String(content, Charset.forName("UTF-8"));

normalMsg.append("status=").append(this.getStatusCode());

normalMsg.append(";header=").append(this.getHeaders());

normalMsg.append(";responseResult=").append(responseResult);

normalMsg.append(RESPONSE_TAIL);

log.info(normalMsg.toString());

return bufferFactory.wrap(content);

}));

}

return super.writeWith(body); // if body is not a flux. never got there.

}

};

return chain.filter(exchange.mutate().response(decoratedResponse).build());

}

@Override

public int getOrder() {

return -2;

}

}

补充知识:Spring Cloud Gateway 2.x 打印 Log

场景

在服务网关层面,需要打印出用户每次的请求body和其他的参数,gateway使用的是Reactor响应式编程,和Zuul网关获取流的写法还有些不同,

不过基本的思路是一样的,都是在filter中读取body流,然后缓存回去,因为body流,框架默认只允许读取一次。

思路

1. 添加一个filter做一次请求的拦截

GatewayConfig.java

添加一个配置类,配置一个高优先级的filter,并且注入一个PayloadServerWebExchangeDecorator 对request和response做包装的类。

package com.demo.gateway2x.config;

import com.demo.gateway2x.decorator.PayloadServerWebExchangeDecorator;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.Ordered;

import org.springframework.core.annotation.Order;

import org.springframework.web.server.WebFilter;

/**
 * Gateway configuration that registers the payload-logging {@link WebFilter}.
 */
@Configuration
public class GatewayConfig {

    /**
     * Creates a filter that wraps each incoming exchange in a
     * {@link PayloadServerWebExchangeDecorator} before handing it to the rest of the chain.
     *
     * @return the wrapping filter, registered with the highest precedence
     */
    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE) // filter ordering
    public WebFilter webFilter() {
        return (exchange, chain) -> {
            PayloadServerWebExchangeDecorator wrappedExchange =
                    new PayloadServerWebExchangeDecorator(exchange);
            return chain.filter(wrappedExchange);
        };
    }
}

PayloadServerWebExchangeDecorator.java

这个类继承了框架的ServerWebExchangeDecorator类,并在构造函数中创建了两个自定义的类:PartnerServerHttpRequestDecorator 和 PartnerServerHttpResponseDecorator,

这两个类用于后面对请求与响应的拦截。

package com.demo.gateway2x.decorator;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.http.server.reactive.ServerHttpResponse;

import org.springframework.web.server.ServerWebExchange;

import org.springframework.web.server.ServerWebExchangeDecorator;

/**
 * Exchange decorator that substitutes the request and response of the wrapped exchange with
 * {@link PartnerServerHttpRequestDecorator} and {@link PartnerServerHttpResponseDecorator}.
 */
public class PayloadServerWebExchangeDecorator extends ServerWebExchangeDecorator {

    /** Wrapper around the delegate's request, created once in the constructor. */
    private final PartnerServerHttpRequestDecorator requestDecorator;

    /** Wrapper around the delegate's response, created once in the constructor. */
    private final PartnerServerHttpResponseDecorator responseDecorator;

    /**
     * Wraps the given exchange together with its request and response.
     *
     * @param delegate the exchange to decorate
     */
    public PayloadServerWebExchangeDecorator(ServerWebExchange delegate) {
        super(delegate);
        this.requestDecorator = new PartnerServerHttpRequestDecorator(delegate.getRequest());
        this.responseDecorator = new PartnerServerHttpResponseDecorator(delegate.getResponse());
    }

    /** @return the decorated request */
    @Override
    public ServerHttpRequest getRequest() {
        return this.requestDecorator;
    }

    /** @return the decorated response */
    @Override
    public ServerHttpResponse getResponse() {
        return this.responseDecorator;
    }
}

2. 在请求进入时,对request做一次拦截

PartnerServerHttpRequestDecorator.java

这个类实现了 ServerHttpRequestDecorator , 并在构造函数中,使用响应式编程,调用了打印log的方法,注意关注 Mono mono = DataBufferUtils.join(flux); ,

这里将Flux合并成了一个Mono,因为如果不这么做,body内容过多时将会被分段打印,这是一个很重要的点。

在RequestParamsHandle.chain打印过日志后,我们又返回了一个dataBuffer,用作向下传递,否则dataBuffer被读取过一次后就不能继续使用了。

package com.demo.gateway2x.decorator;

import lombok.extern.slf4j.Slf4j;

import org.springframework.core.io.buffer.DataBuffer;

import org.springframework.core.io.buffer.DataBufferUtils;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.http.server.reactive.ServerHttpRequestDecorator;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import static reactor.core.scheduler.Schedulers.single;

/**
 * Request decorator that logs the request payload for selected content types.
 *
 * <p>For content types contained in {@code ParamsUtils.CHAIN_MEDIA_TYPE} the body chunks are
 * joined into a single buffer, passed through {@link RequestParamsHandle#chain}, and the buffer
 * returned by that call is what downstream consumers read. All other bodies are passed through
 * unchanged.
 */
@Slf4j
public class PartnerServerHttpRequestDecorator extends ServerHttpRequestDecorator {

    /** The body exposed to downstream consumers. */
    private final Flux<DataBuffer> body;

    /**
     * Decorates the given request.
     *
     * @param delegate the request to wrap
     */
    public PartnerServerHttpRequestDecorator(ServerHttpRequest delegate) {
        super(delegate);
        Flux<DataBuffer> flux = super.getBody();
        // NOTE(review): getContentType() is null when the header is absent; this relies on
        // CHAIN_MEDIA_TYPE.contains(null) not throwing — confirm against ParamsUtils.
        if (ParamsUtils.CHAIN_MEDIA_TYPE.contains(delegate.getHeaders().getContentType())) {
            // Join all chunks first, otherwise a large body would be logged piece by piece.
            Mono<DataBuffer> mono = DataBufferUtils.join(flux);
            body = mono.publishOn(single())
                    .map(dataBuffer -> RequestParamsHandle.chain(delegate, log, dataBuffer))
                    .flux();
        } else {
            body = flux;
        }
    }

    /** @return the (possibly logging) request body */
    @Override
    public Flux<DataBuffer> getBody() {
        return body;
    }
}

RequestParamsHandle.java

这个类主要用来读取dataBuffer并做了日志打印处理,也可以做一些其他的例如参数校验等使用。

package com.demo.gateway2x.decorator;

import com.alibaba.fastjson.JSON;

import org.slf4j.Logger;

import org.springframework.core.io.buffer.DataBuffer;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.util.StringUtils;

import java.util.HashMap;

import java.util.Map;

/**
 * Helper that reads a request body buffer, logs the combined query and body parameters, and
 * returns a buffer to pass downstream.
 */
public class RequestParamsHandle {

    /**
     * Logs the request parameters and returns the buffer provided by the body decorator.
     *
     * @param delegate the request whose query parameters are included in the log
     * @param log the logger to write to
     * @param buffer the joined request body
     * @param <T> the concrete buffer type
     * @return the buffer obtained from {@code ParamsUtils.BodyDecorator#getDataBuffer()}
     */
    @SuppressWarnings("unchecked") // the decorator's buffer is cast back to the caller's type
    public static <T extends DataBuffer> T chain(ServerHttpRequest delegate, Logger log, T buffer) {
        ParamsUtils.BodyDecorator bodyDecorator = ParamsUtils.buildBodyDecorator(buffer);
        // Validate and log the parameters.
        log.info("Payload: {}",
                JSON.toJSONString(validParams(getParams(delegate, bodyDecorator.getBody()))));
        return (T) bodyDecorator.getDataBuffer();
    }

    /**
     * Merges the query parameters and the JSON body into one map; body entries overwrite query
     * parameters with the same name.
     *
     * @param delegate the request providing the query parameters
     * @param body the request body text; may be {@code null} or empty
     * @return a new mutable map holding all parameters
     */
    public static Map<String, Object> getParams(ServerHttpRequest delegate, String body) {
        Map<String, Object> params = new HashMap<>();
        if (delegate.getQueryParams() != null) {
            params.putAll(delegate.getQueryParams());
        }
        // NOTE(review): parseObject presumably fails for a body that is not a JSON object;
        // confirm that CHAIN_MEDIA_TYPE only admits JSON content types.
        if (StringUtils.hasLength(body)) {
            params.putAll(JSON.parseObject(body));
        }
        return params;
    }

    /**
     * Validation hook; currently returns the parameters unchanged.
     *
     * @param params the parameters to validate
     * @return the same map
     */
    public static Map<String, Object> validParams(Map<String, Object> params) {
        // TODO: parameter validation
        return params;
    }
}

3. 在结果返回时,对response做一次拦截

PartnerServerHttpResponseDecorator.java

这个类和上面处理request的类异曲同工:拦截响应流,并做日志记录处理。

package com.demo.gateway2x.decorator;

import lombok.extern.slf4j.Slf4j;

import org.reactivestreams.Publisher;

import org.springframework.core.io.buffer.DataBuffer;

import org.springframework.core.io.buffer.DataBufferUtils;

import org.springframework.http.MediaType;

import org.springframework.http.server.reactive.ServerHttpResponse;

import org.springframework.http.server.reactive.ServerHttpResponseDecorator;

import reactor.core.publisher.Flux;

import reactor.core.publisher.Mono;

import static reactor.core.scheduler.Schedulers.single;

@Slf4j

public class PartnerServerHttpResponseDecorator extends ServerHttpResponseDecorator {

PartnerServerHttpResponseDecorator(ServerHttpResponse delegate) {

super(delegate);

}

@Override

public Mono writeAndFlushWith(Publisher extends Publisher extends DataBuffer>> body) {

return super.writeAndFlushWith(body);

}

@Override

public Mono writeWith(Publisher extends DataBuffer> body) {

final MediaType contentType = super.getHeaders().getContentType();

if (ParamsUtils.CHAIN_MEDIA_TYPE.contains(contentType)) {

if (body instanceof Mono) {

final Mono monoBody = (Mono) body;

return super.writeWith(monoBody.publishOn(single()).map(dataBuffer -> ResponseParamsHandle.chain(log, dataBuffer)));

} else if (body instanceof Flux) {

Mono mono = DataBufferUtils.join(body);

final Flux monoBody = mono.publishOn(single()).map(dataBuffer -> ResponseParamsHandle.chain(log, dataBuffer)).flux();

return super.writeWith(monoBody);

}

}

return super.writeWith(body);

}

}

ResponseParamsHandle.java

响应流的日志打印

package com.demo.gateway2x.decorator;

import org.slf4j.Logger;

import org.springframework.core.io.buffer.DataBuffer;

/**
 * Helper that logs a response body buffer and returns a buffer to write to the client.
 */
public class ResponseParamsHandle {

    /**
     * Logs the response body and returns the buffer provided by the body decorator.
     *
     * @param log the logger to write to
     * @param buffer the response body
     * @param <T> the concrete buffer type
     * @return the buffer obtained from {@code ParamsUtils.BodyDecorator#getDataBuffer()}
     */
    @SuppressWarnings("unchecked") // the decorator's buffer is cast back to the caller's type
    public static <T extends DataBuffer> T chain(Logger log, T buffer) {
        ParamsUtils.BodyDecorator bodyDecorator = ParamsUtils.buildBodyDecorator(buffer);
        // Log the payload.
        log.info("Payload: {}", bodyDecorator.getBody());
        return (T) bodyDecorator.getDataBuffer();
    }
}

下面是实际操作,发送一次http请求:

控制台log结果:

GitHub 源码地址:https://github.com/qiaomengnan16/gateway-2x-log-demo

总结

gateway和zuul打印参数的方式思路是一致的,只是gateway采用的是reactor,写法上与zuul的直接读取流有些不同,这里需要知道的是Flux需要转换为Mono这个地方,如果不转换容易分多批打印。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:机票api接口平台(航班api)
下一篇:基于java实现websocket代码示例
相关文章

 发表评论

暂时没有评论,来抢沙发吧~