Spring Cloud Feign Internals: Implementation Code Details

Reader submission, 2023-01-13

1. Overview

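Feign is used for service-to-service calls. Internally, it is a **JDK HttpURLConnection (HTTP)** call with Ribbon (load balancing) built in. The call site looks like RPC, but the request that actually goes over the wire is plain HTTP, which is why Feign is often described as a pseudo-RPC call.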
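To make the "looks like RPC, is really HTTP" point concrete, here is a minimal sketch that uses only the JDK. It is not how Feign itself is written: the `UserApi` interface, the `user-service` host name and the method-name-to-path mapping are all hypothetical, and the proxy only builds the request line that a real client would send.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class PseudoRpcSketch {

    // Hypothetical service interface: to the caller it looks like a local method.
    interface UserApi {
        String getUser(long id);
    }

    // Builds a proxy whose method calls are translated into an HTTP request line.
    // A real client would send the request; this sketch only returns its description.
    static UserApi create(String baseUrl) {
        InvocationHandler handler = (proxy, method, args) ->
                "GET " + baseUrl + "/" + method.getName() + "/" + args[0];
        return (UserApi) Proxy.newProxyInstance(
                UserApi.class.getClassLoader(),
                new Class<?>[] {UserApi.class},
                handler);
    }

    public static void main(String[] args) {
        UserApi api = create("http://user-service");
        // The call site reads like RPC, but what comes out is an HTTP request.
        System.out.println(api.getUser(42L));
    }
}
```

The interface call is the only thing the caller sees; everything after it (URL building, server selection, the HTTP exchange) happens behind the proxy, which is the part the rest of this article traces through the real code.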

The internal call flow is as follows; section 2 walks through each step in the code.

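2. Code Details

1) BaseLoadBalancer.java: configuration initialization

Key functions: 1. Initialize the load-balancing strategy 2. Initialize the scheduling strategy for fetching the service registry list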

    void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
        ...
        // Ping once every 30 s
        int pingIntervalTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerPingInterval,
                        Integer.parseInt("30")));
        // Each ping round may take at most 2 s in total
        int maxTotalPingTime = Integer.parseInt(""
                + clientConfig.getProperty(
                        CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                        Integer.parseInt("2")));

        setPingInterval(pingIntervalTime);
        setMaxTotalPingTime(maxTotalPingTime);

        // cross associate with each other
        // i.e. Rule,Ping meet your container LB
        // LB, these are your Ping and Rule guys ...
        // Set the load-balancing rule
        setRule(rule);
        // Initialize the scheduling strategy for fetching the service registry list
        setPing(ping);
        setLoadBalancerStats(stats);
        rule.setLoadBalancer(this);
        ...
    }
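The `Integer.parseInt("" + clientConfig.getProperty(key, default))` idiom above simply means "read an integer setting, or fall back to a default". A minimal sketch of the same idea with `java.util.Properties` is shown below; the keys `ping.interval` and `ping.maxTotalTime` are hypothetical stand-ins, not the real Ribbon property names.

```java
import java.util.Properties;

class PingConfigSketch {

    // Reads an integer setting, falling back to the given default when the key is absent.
    static int intProperty(Properties config, String key, int defaultValue) {
        String raw = config.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("ping.interval", "10"); // hypothetical key, overrides the 30 s default

        int pingIntervalSeconds = intProperty(config, "ping.interval", 30);
        int maxTotalPingSeconds = intProperty(config, "ping.maxTotalTime", 2);

        System.out.println(pingIntervalSeconds + " " + maxTotalPingSeconds);
    }
}
```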

2) Load-balancing strategy initialization

Key functions: 1. Round-robin is used as the default strategy

BaseLoadBalancer.java

    public void setRule(IRule rule) {
        if (rule != null) {
            this.rule = rule;
        } else {
            /* default rule */
            // Round-robin is the default strategy
            this.rule = new RoundRobinRule();
        }
        if (this.rule.getLoadBalancer() != this) {
            this.rule.setLoadBalancer(this);
        }
    }

RoundRobinRule.java

    private AtomicInteger nextServerCyclicCounter;

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Core of the round-robin algorithm
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }
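The compare-and-set loop in `incrementAndGetModulo` can be tried in isolation. The sketch below reproduces just that counter logic, using plain strings in place of Ribbon's `Server` objects; the class name and server names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {

    private final AtomicInteger counter = new AtomicInteger(0);

    // Advances the shared counter with compare-and-set and returns the new index in [0, modulo).
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) {
                return next;
            }
            // Another thread changed the counter first; re-read it and try again.
        }
    }

    // Picks the next server name in rotation; the list must not be empty.
    String choose(List<String> servers) {
        return servers.get(incrementAndGetModulo(servers.size()));
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("a:8080", "b:8080", "c:8080");
        List<String> picks = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            picks.add(rule.choose(servers));
        }
        System.out.println(picks); // [b:8080, c:8080, a:8080, b:8080]
    }
}
```

Because the counter is incremented before it is used, the first pick is index 1 rather than index 0. The rotation is still even, and the lock-free retry loop keeps it safe when many request threads choose a server at once.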

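3) Initializing the scheduling strategy for fetching the service registry list

Key functions: 1. Set the polling interval to once every 30 s

Note: no real network ping is performed here. The task only reads which services are marked alive in the cached registry list, which keeps the check cheap.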

BaseLoadBalancer.java

    public void setPing(IPing ping) {
        if (ping != null) {
            if (!ping.equals(this.ping)) {
                this.ping = ping;
                setupPingTask(); // since ping data changed
            }
        } else {
            this.ping = null;
            // cancel the timer task
            lbTimer.cancel();
        }
    }

    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        // The field's default is once every 10 s, but initialization has already set it to 30 s
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }
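The scheduling pattern in `setupPingTask` (cancel any old timer, start a daemon timer, run immediately and then at a fixed period) can be sketched with the plain `java.util.Timer`. This is only an illustration under that assumption: Ribbon uses its own `ShutdownEnabledTimer`, and the latch, the timer name and the short period here are purely for demonstration.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PingTimerSketch {

    // Runs a periodic task on a daemon timer and waits until it has fired `runs` times
    // or the timeout elapses. Returns true if all runs happened in time.
    static boolean runPeriodically(long periodMillis, int runs, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(runs);
        Timer timer = new Timer("ping-timer-sketch", true); // true = daemon thread
        try {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    latch.countDown(); // stand-in for one ping round
                }
            }, 0, periodMillis); // first run immediately, then once per period
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel(); // stop the timer so no further rounds are scheduled
        }
    }

    public static void main(String[] args) {
        System.out.println(runPeriodically(50, 3, 5_000));
    }
}
```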

    class Pinger {

        private final IPingStrategy pingerStrategy;

        public Pinger(IPingStrategy pingerStrategy) {
            this.pingerStrategy = pingerStrategy;
        }

        public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) {
                return; // Ping in progress - nothing to do
            }

            // we are "in" - we get to Ping
            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
                                name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }
    }

    private static class SerialPingStrategy implements IPingStrategy {

        @Override
        public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];

            logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);

            for (int i = 0; i < numCandidates; i++) {
                results[i] = false; /* Default answer is DEAD. */
                try {
                    // NOTE: IFF we were doing a real ping
                    // assuming we had a large set of servers (say 15)
                    // the logic below will run them serially
                    // hence taking 15 times the amount of time it takes
                    // to ping each server
                    // A better method would be to put this in an executor
                    // pool
                    // But, at the time of this writing, we dont REALLY
                    // use a Real Ping (its mostly in memory eureka call)
                    // hence we can afford to simplify this design and run
                    // this
                    // serially
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception e) {
                    logger.error("Exception while pinging Server: '{}'", servers[i], e);
                }
            }
            return results;
        }
    }
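Taken together, `Pinger.runPinger` and `SerialPingStrategy` do three things: check each server in turn, treat any failure as DEAD, and derive both the new "up" list and the list of servers whose status changed. A stripped-down sketch of that bookkeeping follows, without the locks and listeners; `Node`, `Result` and the predicate-based check are hypothetical simplifications, not Ribbon types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class PingRoundSketch {

    // Minimal stand-in for a server entry with a cached alive flag.
    static final class Node {
        final String id;
        boolean alive;
        Node(String id, boolean alive) { this.id = id; this.alive = alive; }
    }

    // Result of one ping round: who is up now, and whose status flipped.
    static final class Result {
        final List<String> up = new ArrayList<>();
        final List<String> changed = new ArrayList<>();
    }

    // Checks every node one after another; a failing check counts as DEAD.
    static Result pingAll(List<Node> nodes, Predicate<Node> ping) {
        Result result = new Result();
        for (Node node : nodes) {
            boolean isAlive = false; // default answer is DEAD
            try {
                isAlive = ping.test(node);
            } catch (RuntimeException e) {
                // keep DEAD; a real implementation would log the failure
            }
            if (node.alive != isAlive) {
                result.changed.add(node.id);
            }
            node.alive = isAlive;
            if (isAlive) {
                result.up.add(node.id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("a", true));
        nodes.add(new Node("b", true));
        nodes.add(new Node("c", false));
        // Hypothetical check: only "a" and "c" answer.
        Result r = pingAll(nodes, n -> !n.id.equals("b"));
        System.out.println("up=" + r.up + " changed=" + r.changed);
    }
}
```

Running the checks serially is acceptable only because each check is cheap, as the comment in the original source points out. With real network pings, the loop would take roughly the sum of all ping times.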

4) Finally, assemble the complete URL and make the call with JDK HttpURLConnection

SynchronousMethodHandler.java (io.github.openfeign:feign-core:10.10.1, feign.SynchronousMethodHandler)

    Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
        Request request = this.targetRequest(template);
        if (this.logLevel != Level.NONE) {
            this.logger.logRequest(this.metadata.configKey(), this.logLevel, request);
        }

        long start = System.nanoTime();

        Response response;
        try {
            response = this.client.execute(request, options);
            response = response.toBuilder().request(request).requestTemplate(template).build();
        } catch (IOException var13) {
            if (this.logLevel != Level.NONE) {
                this.logger.logIOException(this.metadata.configKey(), this.logLevel, var13, this.elapsedTime(start));
            }

            throw FeignException.errorExecuting(request, var13);
        }
        ...
    }

LoadBalancerFeignClient.java

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            URI asUri = URI.create(request.url());
            String clientName = asUri.getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);

            IClientConfig requestConfig = getClientConfig(options, clientName);
            return lbClient(clientName)
                    .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }

AbstractLoadBalancerAwareClient.java

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        // Resolve the real URL to call, using the chosen server
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        }
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
    }
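The two URL steps above (taking the host of the request URL as the service name, then substituting the chosen server's address) can be illustrated with `java.net.URI` alone. This is a sketch of the idea, not the code of `cleanUrl` or `reconstructURIWithServer`; the service name `user-service` and the address `10.0.0.7:8081` are invented for the example.

```java
import java.net.URI;
import java.net.URISyntaxException;

class UriRewriteSketch {

    // The host part of a Feign-style URL is treated as the logical service name.
    static String serviceName(String url) {
        return URI.create(url).getHost();
    }

    // Replaces the logical service name with a concrete host and port,
    // keeping scheme, path, query and fragment unchanged.
    static URI withServer(String url, String host, int port) {
        URI original = URI.create(url);
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + url, e);
        }
    }

    public static void main(String[] args) {
        String url = "http://user-service/users/42?verbose=true";
        System.out.println(serviceName(url));                  // user-service
        System.out.println(withServer(url, "10.0.0.7", 8081)); // http://10.0.0.7:8081/users/42?verbose=true
    }
}
```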

FeignLoadBalancer.java

    @Override
    public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        Request.Options options;
        if (configOverride != null) {
            RibbonProperties override = RibbonProperties.from(configOverride);
            options = new Request.Options(override.connectTimeout(this.connectTimeout),
                    override.readTimeout(this.readTimeout));
        }
        else {
            options = new Request.Options(this.connectTimeout, this.readTimeout);
        }
        Response response = request.client().execute(request.toRequest(), options);
        return new RibbonResponse(request.getUri(), response);
    }

feign.Client.java (inner class Client.Default)

    @Override
    public Response execute(Request request, Options options) throws IOException {
        HttpURLConnection connection = convertAndSend(request, options);
        return convertResponse(connection, request);
    }

    Response convertResponse(HttpURLConnection connection, Request request) throws IOException {
        // The real HTTP call is made here through HttpURLConnection
        int status = connection.getResponseCode();
        String reason = connection.getResponseMessage();

        if (status < 0) {
            throw new IOException(format("Invalid status(%s) executing %s %s", status,
                    connection.getRequestMethod(), connection.getURL()));
        }

        Map<String, Collection<String>> headers = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {
            // response message
            if (field.getKey() != null) {
                headers.put(field.getKey(), field.getValue());
            }
        }

        Integer length = connection.getContentLength();
        if (length == -1) {
            length = null;
        }
        InputStream stream;
        if (status >= 400) {
            stream = connection.getErrorStream();
        } else {
            stream = connection.getInputStream();
        }
        return Response.builder()
                .status(status)
                .reason(reason)
                .headers(headers)
                .request(request)
                .body(stream, length)
                .build();
    }
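The status handling in `convertResponse` (read the error stream for 4xx/5xx, the input stream otherwise) is easy to try against a throwaway local server. The sketch below assumes a JDK that ships the `com.sun.net.httpserver` package and Java 9+ for `InputStream.readAllBytes`; the `/ok` path, the response bodies and the timeouts are invented for the demonstration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;

class HttpCallSketch {

    // Performs a GET and returns "<status> <body>", reading the error stream for 4xx/5xx.
    static String get(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) URI.create(url).toURL().openConnection();
        connection.setConnectTimeout(2_000);
        connection.setReadTimeout(2_000);
        try {
            int status = connection.getResponseCode(); // sends the request and waits for the status line
            InputStream stream = status >= 400
                    ? connection.getErrorStream()
                    : connection.getInputStream();
            String body = stream == null
                    ? ""
                    : new String(stream.readAllBytes(), StandardCharsets.UTF_8);
            return status + " " + body;
        } finally {
            connection.disconnect();
        }
    }

    // Starts a throwaway local server: "/ok" answers 200, anything else 404.
    static HttpServer startServer() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            boolean ok = exchange.getRequestURI().getPath().equals("/ok");
            byte[] body = (ok ? "hello" : "missing").getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(ok ? 200 : 404, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // Calls one existing and one missing path and joins the two results with " | ".
    static String demo() {
        try {
            HttpServer server = startServer();
            try {
                String base = "http://127.0.0.1:" + server.getAddress().getPort();
                return get(base + "/ok") + " | " + get(base + "/nope");
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Calling `getInputStream()` on a 4xx/5xx response throws an `IOException`, which is why both this sketch and the Feign code above switch to `getErrorStream()` once the status is 400 or higher.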
