2022-09-11
client-go gin simple integration, Part 4: a first look at list-watch
Background:
Part 3 of this simple client-go and gin integration (a further look at listing Pods) is done. If you put a proxy in front, you can see that every request still hits the backend service. So how do we avoid calling the backend apiserver so frequently? The list-watch mechanism is worth a try.
About list-watch:
List: call the resource API with a short-lived HTTP request to fetch the full list.
Watch: keep a long-lived HTTP connection open to watch the resource; every change comes back as a WatchEvent.
client-go informer: the informer object in the k8s.io/client-go/tools/cache package wraps the list-watch mechanism. On startup it calls the List API to get the full list and keeps it in a local cache, then calls the Watch API to watch the resource; when something changes, the cache is updated accordingly, which reduces the pressure on the apiserver.
Articles I found helpful: 《Client-go源码分析之Reflector》, the Huawei Cloud video 《list-watch机制原理详解》, and 《client-go(kubernetes)的ListerWatcher解析》.
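To make the mechanism concrete, here is a minimal sketch of doing list-watch by hand with the clientset (my own addition, assuming lib.K8sClient is the initialized clientset from the earlier parts and a reasonably recent client-go where List/Watch take a context): one List call fetches the full snapshot, then a long-lived Watch streams WatchEvents from that point on. The informer used below automates exactly this, plus the local caching.

package main

import (
	"context"
	"fmt"

	"k8s-demo1/src/lib"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// List: a short-lived HTTP request that returns the full set of pods.
	pods, err := lib.K8sClient.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("initial list:", len(pods.Items), "pods")

	// Watch: a long-lived HTTP connection, resumed from the list's resourceVersion.
	w, err := lib.K8sClient.CoreV1().Pods("default").Watch(context.TODO(),
		metav1.ListOptions{ResourceVersion: pods.ResourceVersion})
	if err != nil {
		panic(err)
	}
	for event := range w.ResultChan() {
		// Each change arrives as a WatchEvent: Added, Modified or Deleted.
		fmt.Println("event:", event.Type)
	}
}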
client-go gin simple integration, Part 4: list-watch
Starting with a simple deployment example
The file is /src/service/test.go and it watches for deployment changes. Should I really set up a separate test package the way I would in Java? To keep things simple I'm being lazy here......
Starting from cache.NewInformer():
cache.NewInformer() takes a ListerWatcher, the object type to watch, a resync period, and a ResourceEventHandler, and returns a Store (the local cache) plus a Controller that drives the list-watch loop.
Browsing the source in GoLand:
Implementing the handler methods:
Only OnUpdate is implemented (it just prints the deployment name); OnAdd and OnDelete are left empty:
type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {}
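These three methods are what make DepHandler satisfy the cache.ResourceEventHandler interface that cache.NewInformer expects. A compile-time assertion (my addition, not in the original; note that newer client-go releases, v0.27 and later, add an isInInitialList argument to OnAdd, so this matches the older interface used here) makes that explicit:

// Compile-time check (sketch): *DepHandler implements cache.ResourceEventHandler,
// i.e. OnAdd, OnUpdate and OnDelete with the signatures above.
var _ cache.ResourceEventHandler = &DepHandler{}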
The final test.go looks like this:
package main

import (
	"fmt"

	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {}

func main() {
	// s is the local Store (cache), c is the Controller that drives the list-watch.
	s, c := cache.NewInformer(
		cache.NewListWatchFromClient(
			lib.K8sClient.AppsV1().RESTClient(), // REST client for the apps/v1 group
			"deployments",                       // resource to watch
			"default",                           // namespace
			fields.Everything(),                 // no field selector
		),
		&v1.Deployment{}, // object type
		0,                // resync period; 0 disables periodic resync
		&DepHandler{},
	)
	// Run blocks until the stop channel is closed, so the s.List() call below
	// is never reached in this example.
	c.Run(wait.NeverStop)
	s.List()
}
Running test.go
Run go run test.go, then manually change the nginx deployment's replica count and check the GoLand output; the handler prints the deployment name for every update event:
[zhangpeng@zhangpeng ~]$ kubectl get deployments
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl get deployment
NAME    READY   UP-TO-DATE   AVAILABLE   AGE
nginx   2/2     2            2           10d
[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled
[zhangpeng@zhangpeng ~]$
Implementing a list-watch for Pods
package main

import (
	"fmt"

	"k8s-demo1/src/lib"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {}

func main() {
	s, c := cache.NewInformer(
		cache.NewListWatchFromClient(
			lib.K8sClient.CoreV1().RESTClient(),
			"pods",
			"default",
			fields.Everything(),
		),
		&corev1.Pod{},
		0,
		&PodHandler{},
	)
	c.Run(wait.NeverStop)
	s.List()
}
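As an aside (my own variation, not from the original post): fields.Everything() means no server-side filtering. A field selector narrows what the list-watch receives, for example only the pods scheduled to one node (the node name below is hypothetical):

// Drop-in replacement for the ListWatch construction in main() above:
lw := cache.NewListWatchFromClient(
	lib.K8sClient.CoreV1().RESTClient(),
	"pods",
	"default",
	fields.OneTermEqualSelector("spec.nodeName", "node-1"), // hypothetical node name
)
_ = lw // pass this to cache.NewInformer instead of the inline ListWatch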
Note that the Pod API lives in corev1. As before, scale the nginx deployment to generate pod events:
[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=3
deployment.apps/nginx scaled
The SharedInformerFactory factory pattern
Think about why the factory pattern is used here (see the SharedInformerFactory references). The code:

package main

import (
	"fmt"

	"k8s-demo1/src/lib"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {}

func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	factory.Start(wait.NeverStop)
	select {}
}
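Why is it called a shared informer factory? The factory caches one informer per resource type, so asking for the same resource twice returns the same instance: every handler registered for Pods shares one list-watch connection and one local cache instead of opening its own. A small sketch of that (my own, assuming lib.K8sClient as before):

package main

import (
	"fmt"

	"k8s-demo1/src/lib"
	"k8s.io/client-go/informers"
)

func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	i1 := factory.Core().V1().Pods().Informer()
	i2 := factory.Core().V1().Pods().Informer()
	// The factory hands back the same shared informer for the same resource type.
	fmt.Println(i1 == i2) // prints true
}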
When I wrote this I assumed the accessor would simply be corev1...... it turns out to be Core().V1(). Why the bare select{}? An empty select blocks forever, which keeps main alive so the informer goroutines started by factory.Start keep running. Now scale the deployment again:
[zhangpeng@zhangpeng ~]$ kubectl scale deployment/nginx --replicas=4
deployment.apps/nginx scaled
package main

import (
	"fmt"

	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {}

func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	select {}
}
Handler OnAdd
Fill in the OnAdd methods so the pod and deployment lists get printed when the informers first sync:
func (p *PodHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*corev1.Pod).Name)
}

func (d *DepHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*v1.Deployment).Name)
}
package main

import (
	"fmt"

	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type PodHandler struct {
}

func (p *PodHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*corev1.Pod).Name)
}
func (p *PodHandler) OnUpdate(oldObj, newObj interface{}) {
	if pods, ok := newObj.(*corev1.Pod); ok {
		fmt.Println(pods.Name)
	}
}
func (p *PodHandler) OnDelete(obj interface{}) {}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {
	fmt.Println(obj.(*v1.Deployment).Name)
}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {}

func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podinformer := factory.Core().V1().Pods()
	podinformer.Informer().AddEventHandler(&PodHandler{})
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)
	select {}
}
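A related sketch (my own, not from the original): instead of printing from OnAdd, the factory's informers also expose listers that read from the same local cache, and factory.WaitForCacheSync blocks until the initial List has populated it. That gives another way to dump the current state without hitting the apiserver again:

package main

import (
	"fmt"

	"k8s-demo1/src/lib"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	podInformer := factory.Core().V1().Pods()
	// Request the informer before Start so the factory knows to run it.
	podInformer.Informer()

	factory.Start(wait.NeverStop)
	// Block until the initial List has filled the local cache.
	factory.WaitForCacheSync(wait.NeverStop)

	// The lister reads from the informer's cache, not from the apiserver.
	pods, err := podInformer.Lister().Pods("default").List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, p := range pods {
		fmt.Println("cached pod:", p.Name)
	}
}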
sync.Map
Why bring in sync.Map? Go's sync.Map is a map built for concurrent use, and that is exactly the concern here: the informer callbacks run in their own goroutines, concurrently with the HTTP handlers that read the cache. Taking deployments as the example, let's print the list of deployments in the develop namespace. test1.go:
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"k8s-demo1/src/lib"
	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
)

type DeploymentMap struct {
	data sync.Map // namespace -> []*v1.Deployment
}

func (depmap *DeploymentMap) Add(dep *v1.Deployment) {
	if list, ok := depmap.data.Load(dep.Namespace); ok {
		list = append(list.([]*v1.Deployment), dep)
		depmap.data.Store(dep.Namespace, list)
	} else {
		depmap.data.Store(dep.Namespace, []*v1.Deployment{dep})
	}
}

type DepHandler struct {
}

func (d *DepHandler) OnAdd(obj interface{}) {
	//fmt.Println(obj.(*v1.Deployment).Name)
	DepMap.Add(obj.(*v1.Deployment))
}
func (d *DepHandler) OnUpdate(oldObj, newObj interface{}) {
	if dep, ok := newObj.(*v1.Deployment); ok {
		fmt.Println(dep.Name)
	}
}
func (d *DepHandler) OnDelete(obj interface{}) {}

var DepMap *DeploymentMap

func init() {
	DepMap = &DeploymentMap{}
}

func main() {
	factory := informers.NewSharedInformerFactory(lib.K8sClient, 0)
	depinformer := factory.Apps().V1().Deployments()
	depinformer.Informer().AddEventHandler(&DepHandler{})
	factory.Start(wait.NeverStop)

	c, _ := context.WithTimeout(context.Background(), time.Second*3)
	select {
	case <-c.Done():
		log.Fatal("time out")
	default:
		r := gin.New()
		r.GET("/", func(c *gin.Context) {
			var res []string
			DepMap.data.Range(func(key, value interface{}) bool {
				// Only collect deployments cached under the develop namespace.
				if key == "develop" {
					for _, item := range value.([]*v1.Deployment) {
						res = append(res, item.Name)
					}
				}
				return true
			})
			c.JSON(200, res)
		})
		r.Run(":8080")
	}
}
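One gap worth noting (my own sketch, not part of the original): OnUpdate and OnDelete don't maintain the map yet, so the cached slice drifts as deployments change or disappear. Methods along these lines, added next to DeploymentMap.Add and called from the corresponding handler callbacks, would keep it consistent:

// Replace the cached entry for an updated deployment.
func (depmap *DeploymentMap) Update(dep *v1.Deployment) {
	if list, ok := depmap.data.Load(dep.Namespace); ok {
		deps := list.([]*v1.Deployment)
		for i := range deps {
			if deps[i].Name == dep.Name {
				deps[i] = dep
				break
			}
		}
		depmap.data.Store(dep.Namespace, deps)
	}
}

// Remove a deleted deployment from the cache.
func (depmap *DeploymentMap) Delete(dep *v1.Deployment) {
	if list, ok := depmap.data.Load(dep.Namespace); ok {
		deps := list.([]*v1.Deployment)
		for i := range deps {
			if deps[i].Name == dep.Name {
				deps = append(deps[:i], deps[i+1:]...)
				break
			}
		}
		depmap.data.Store(dep.Namespace, deps)
	}
}

With the server running, curl http://localhost:8080/ should return the deployment names cached for the develop namespace.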
Summary
The list-watch mechanism, cache informers, and the SharedInformerFactory factory pattern; implementing handlers; sync.Map; type assertions......