微服务从代码到k8s部署应有尽有系列(八、各种队列)

网友投稿 343 2022-09-11

微服务从代码到k8s部署应有尽有系列(八、各种队列)

我们用一个系列来讲解从需求到上线、从代码到k8s部署、从日志到监控等各个方面的微服务完整实践。

整个项目使用了go-zero开发的微服务,基本包含了go-zero以及相关go-zero作者开发的一些中间件,所用到的技术栈基本是go-zero项目组的自研组件,基本是go-zero全家桶了。

实战项目地址:​​github.com/Mikaelemmmm…​​

1、概述

消息队列有很多种,有rabbitmq、rocketmq、kafka等常用的,其中go-queue(​​github.com/zeromicro/g…​​

本项目采用的是go-queue做消息队列,asynq做延迟队列、定时队列

为什么使用asynq的几个原因

直接基于redis,一般项目都有redis,而asynq本身就是基于redis所以可以少维护一个中间件支持消息队列、延迟队列、定时任务调度 , 因为希望项目支持定时任务而asynq直接就支持有webui界面,每个任务都可以暂停、归档、通过ui界面查看成功失败、监控

为什么asynq支持消息队列还在使用go-queue?

kafka的吞吐是业绩出名的,如果前期量不大可以直接用asynq没啥目的,就是想给你们演示一下go-queue

在我们使用go-zero的时候,goctl给我们带了很大的便利,但是目前go-zero只有生成api、rpc,很多同学在群里问定时任务、延迟队列、消息队列如何生成,目录结构该怎样做,其实go-zero是为我们设计好了的,就是serviceGroup,使用serviceGroup管理你的服务。

2、如何使用

在前面订单、消息等场景我们其实已经演示过了,这里再额外单独补充一次

我们还是拿order-mq来举例子,显然使用goctl生成api、rpc不是我们想要的,那我们就自己使用serviceGroup改造,目录结构还是延续api的基本差不多,只是将handler改成了listen , 将logic换成了mqs。

2.1 在main中代码如下

var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")func main() { flag.Parse() var c config.Config conf.MustLoad(*configFile, &c) // log, prometheus, trace, metricsUrl if err := c.SetUp(); err != nil { panic(err) } serviceGroup := service.NewServiceGroup() defer serviceGroup.Stop() for _, mq := range listen.Mqs(c) { serviceGroup.Add(mq) } serviceGroup.Start()}

首先我们要定义配置以及解析配置。其次为什么我们要在这里加SetUp而api、rpc不需要呢?因为api、rpc都是在MustNewServer中已经框架写的,但是我们用serviceGroup管理没有,可以手动点进去SetUp看看,这个方法中包含了log、prometheus、trace、metricsUrl的定义,一个方法可以省很多事情,这样我们直接修改配置文件就可以实现日志、监控、链路追踪了。接下来就是go-zero的serivceGroup管理服务了,serviceGroup是用来管理一组service的,那service其实就是一个接口,代码如下Service (代码在go-zero/core/service/servicegroup.go)​​// Service is the interface that groups Start and Stop methods. Service interface { Starter // Start Stopper // Stop } ​​所以,只要你的服务实现了这两个接口,就可以加入到serviceGroup统一管理那可以看到我们把所有的mq都实现这个接口,然后统一放到都 list.Mqs中,在启动服务即可

2.2 mq分类管理

go-zero-looklook/app/order/cmd/mq/internal/listen目录下代码

该目录下代码是统一管理不同类型mq,因为我们要管理kq、asynq可能后续还有rabbitmq、rocketmq等等,所以在这里做了分类方便维护

统一管理在go-zero-looklook/app/order/cmd/mq/internal/listen/listen.go,然后在main中调用listen.Mqs可以获取所有mq一起start

// 返回所有消费者func Mqs(c config.Config) []service.Service { svcContext := svc.NewServiceContext(c) ctx := context.Background() var services []service.Service // kq :消息队列. services = append(services, KqMqs(c, ctx, svcContext)...) // asynq:延迟队列、定时任务 services = append(services, AsynqMqs(c, ctx, svcContext)...) // other mq .... return services}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的asynq

// asynq// 定时任务、延迟任务func AsynqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service { return []service.Service{ // 监听延迟队列 deferMq.NewAsynqTask(ctx, svcContext), // 监听定时任务 }}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的kq (go-queue的kafka)

// kq// 消息队列func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service { return []service.Service{ // 监听消费流水状态变更 kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)), // ..... }}

2.3 实际业务

编写实际业务,我们就在go-zero-looklook/app/order/cmd/mq/internal/listen/mqs下,这里为了方便维护,也是做了分类

deferMq : 延迟队列kq:消息队列

2.3.1 延迟队列

// 监听关闭订单type AsynqTask struct { ctx context.Context svcCtx *svc.ServiceContext}func NewAsynqTask(ctx context.Context, svcCtx *svc.ServiceContext) *AsynqTask { return &AsynqTask{ ctx: ctx, svcCtx: svcCtx, }}func (l *AsynqTask) Start() { fmt.Println("AsynqTask start ") srv := asynq.NewServer( asynq.RedisClientOpt{Addr: l.svcCtx.Config.Redis.Host, Password: l.svcCtx.Config.Redis.Pass}, asynq.Config{ Concurrency: 10, Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, }, ) mux := asynq.NewServeMux() // 关闭民宿订单任务 mux.HandleFunc(asynqmq.TypeHomestayOrderCloseDelivery, l.closeHomestayOrderStateMqHandler) if err := srv.Run(mux); err != nil { log.Fatalf("could not run server: %v", err) }}func (l *AsynqTask) Stop() { fmt.Println("AsynqTask stop")}

因为 asynq 要先启动,然后定义路由任务,所以我们在asynqTask.go中做了统一的路由管理,之后我们每个业务都单独的在deferMq的文件夹下面定义一个文件(如“延迟关闭订单:closeHomestayOrderState.go”),这样每个业务一个文件,跟go-zero的api、rpc的logic一样,维护很方便

closeHomestayOrderState.go 关闭订单逻辑

package deferMqimport ( "context" "encoding/json" "looklook/app/order/cmd/rpc/order" "looklook/app/order/model" "looklook/common/asynqmq" "looklook/common/xerr" "github.com/hibiken/asynq" "github.com/pkg/errors")func (l *AsynqTask) closeHomestayOrderStateMqHandler(ctx context.Context, t *asynq.Task) error { var p asynqmq.HomestayOrderCloseTaskPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return errors.Wrapf(xerr.NewErrMsg("解析asynq task payload err"), "closeHomestayOrderStateMqHandler payload err:%v, payLoad:%+v", err, t.Payload()) } resp, err := l.svcCtx.OrderRpc.HomestayOrderDetail(ctx, &order.HomestayOrderDetailReq{ Sn: p.Sn, }) if err != nil || resp.HomestayOrder == nil { return errors.Wrapf(xerr.NewErrMsg("获取订单失败"), "closeHomestayOrderStateMqHandler 获取订单失败 or 订单不存在 err:%v, sn:%s ,HomestayOrder : %+v", err, p.Sn, resp.HomestayOrder) } if resp.HomestayOrder.TradeState == model.HomestayOrderTradeStateWaitPay { _, err := l.svcCtx.OrderRpc.UpdateHomestayOrderTradeState(ctx, &order.UpdateHomestayOrderTradeStateReq{ Sn: p.Sn, TradeState: model.HomestayOrderTradeStateCancel, }) if err != nil { return errors.Wrapf(xerr.NewErrMsg("关闭订单失败"), "closeHomestayOrderStateMqHandler 关闭订单失败 err:%v, sn:%s ", err, p.Sn) } } return nil}

2.3.2 kq消息队列

看go-zero-looklook/app/order/cmd/mq/internal/mqs/kq文件夹下,因为kq跟asynq不太一样,它本身就是使用go-zero的Service管理的,已经实现了starter、stopper接口了,所以我们在/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go中直接定义好一个go-queue业务扔给serviceGroup,去交给main启动就好了 , 我们的业务代码只需要实现go-queue的Consumer直接写我们自己业务即可。

1)/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go

func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service { return []service.Service{ // 监听消费流水状态变更 kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)), // ..... }}

可以看到kq.MustNewQueue本身返回就是 queue.MessageQueue , queue.MessageQueue又实现了Start、Stop

2)业务中

/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/mqs/kq/paymentUpdateStatus.go

func (l *PaymentUpdateStatusMq) Consume(_, val string) error { fmt.Printf(" PaymentUpdateStatusMq Consume val : %s \n", val) // 解析数据 var message kqueue.ThirdPaymentUpdatePayStatusNotifyMessage if err := json.Unmarshal([]byte(val), &message); err != nil { logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->Consume Unmarshal err : %v , val : %s", err, val) return err } // 执行业务.. if err := l.execService(message); err != nil { logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->execService err : %v , val : %s , message:%+v", err, val, message) return err } return nil}

我们在paymentUpdateStatus.go中只需要实现接口Consume 就可以接受来自kq传过来的kafka的消息了,我们只管在我们Consumer中处理我们业务即可

3、定时任务

关于定时任务,目前go-zero-looklook没有使用,这里我也说明一下

如果你想简单一点直接使用cron(裸机、k8s都有),如果稍微复杂一点可以使用​​github.com/robfig/cron…​​使用 xxl-job、gocron 分布式定时任务系统接入asynq 的 shedule

这里因为项目用的asynq,我就演示一下asynq的shedule吧

分为client与server , client用来定义调度时间,server是到了时间接受client的消息触发来执行我们写的业务的,实际业务我们应该写在server,client用来定义业务调度时间的

asynqtest/docker-compose.yml

version: '3'services: #asynqmon asynq延迟队列、定时队列的webui asynqmon: image: hibiken/asynqmon:latest container_name: asynqmon_asynq ports: - 8980:8080 command: - '--redis-addr=redis:6379' - '--redis-password=G62m50oigInC30sf' restart: always networks: - asynqtest_net depends_on: - redis #redis容器 redis: image: redis:6.2.5 container_name: redis_asynq ports: - 63779:6379 environment: # 时区上海 TZ: Asia/Shanghai volumes: # 数据文件 - ./data/redis/data:/data:rw command: "redis-server --requirepass G62m50oigInC30sf --appendonly yes" privileged: true restart: always networks: - asynqtest_netnetworks: asynqtest_net: driver: bridge ipam: config: - subnet: 172.22.0.0/16

asynqtest/shedule/client/client.go

package mainimport ( "asynqtest/tpl" "encoding/json" "log" "github.com/hibiken/asynq")const redisAddr = "127.0.0.1:63779"const redisPwd = "G62m50oigInC30sf"func main() { // 周期性任务 scheduler := asynq.NewScheduler( asynq.RedisClientOpt{ Addr: redisAddr, Password: redisPwd, }, nil) payload, err := json.Marshal(tpl.EmailPayload{Email: "546630576@qq.com", Content: "发邮件呀"}) if err != nil { log.Fatal(err) } task := asynq.NewTask(tpl.EMAIL_TPL, payload) // 每隔1分钟同步一次 entryID, err := scheduler.Register("*/1 * * * *", task) if err != nil { log.Fatal(err) } log.Printf("registered an entry: %q\n", entryID) if err := scheduler.Run(); err != nil { log.Fatal(err) }}

asynqtest/shedule/server/server.go

package mainimport ( "context" "encoding/json" "fmt" "log" "asynqtest/tpl" "github.com/hibiken/asynq")func main() { srv := asynq.NewServer( asynq.RedisClientOpt{Addr: "127.0.0.1:63779", Password: "G62m50oigInC30sf"}, asynq.Config{ Concurrency: 10, Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, }, }, ) mux := asynq.NewServeMux() // 关闭民宿订单任务 mux.HandleFunc(tpl.EMAIL_TPL, emailMqHandler) if err := srv.Run(mux); err != nil { log.Fatalf("could not run server: %v", err) }}func emailMqHandler(ctx context.Context, t *asynq.Task) error { var p tpl.EmailPayload if err := json.Unmarshal(t.Payload(), &p); err != nil { return fmt.Errorf("emailMqHandler err:%+v", err) } fmt.Printf("p : %+v \n", p) return nil}

asynqtest/tpl/tpl.go

package tplconst EMAIL_TPL = "schedule:email"type EmailPayload struct { Email string Content string}

启动 ​​server.go​​​、​​client.go​​

浏览器输入​​可以看到所有client定义的任务

浏览器输入​​,kafka可以通过管理工具去查看,至于asynq查看webui在go-zero-looklook/docker-compose-env.yml中我们已经启动好了asynqmon,直接使用​​即可查看

项目地址

​​github.com/zeromicro/g…​​

欢迎使用 ​​go-zero​​ 并 star 支持我们!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:热门好用的空号检测API推荐,空号检测API数据接口
下一篇:微服务从代码到k8s部署应有尽有系列(九、事务精讲)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~