2022-09-11
k8s Source Code Study: The EventBroadcaster Event Manager
A Kubernetes Event is a resource object used to show what is happening inside the cluster. Each component of the Kubernetes system reports the events it encounters at runtime to the Kubernetes API Server: for example, what decision the scheduler made, or why a Pod was evicted from a node. Events can be inspected with kubectl get events or kubectl describe pod.
Note: the Event here is the Event resource object managed by Kubernetes, not a callback event produced by the etcd watch mechanism; be careful to distinguish the two.
Because Kubernetes events are resource objects, they are stored in the etcd cluster behind the Kubernetes API Server. To keep them from filling up disk space, a retention policy is enforced: events are deleted one hour after they last occurred.
The Kubernetes system is centered on the Pod resource; Deployment, StatefulSet, ReplicaSet, DaemonSet, CronJob, and so on all ultimately create Pods. Kubernetes events therefore revolve around Pods as well, and event messages are produced at the key steps of a Pod's lifecycle. The Event resource structure is defined in the core API group. Code path: vendor/k8s.io/api/core/v1/types.go
```go
// Event is a report of an event somewhere in the cluster. Events
// have a limited retention time and triggers and messages may evolve
// with time. Event consumers should not rely on the timing of an event
// with a given Reason reflecting a consistent underlying trigger, or the
// continued existence of events with that Reason. Events should be
// treated as informative, best-effort, supplemental data.
type Event struct {
	metav1.TypeMeta `json:",inline"`

	// Standard object's metadata.
	// More info:
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

	// The object that this event is about.
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

	// This should be a short, machine understandable string that gives the reason
	// for the transition into the object's current status.
	// TODO: provide exact specification for format.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

	// A human-readable description of the status of this operation.
	// TODO: decide on maximum length.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

	// The component reporting this event. Should be a short machine understandable string.
	// +optional
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

	// The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
	// +optional
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

	// The time at which the most recent occurrence of this event was recorded.
	// +optional
	LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

	// The number of times this event has occurred.
	// +optional
	Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

	// Type of this event (Normal, Warning), new types could be added in the future
	// +optional
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

	// Time when this Event was first observed.
	// +optional
	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

	// Data about the Event series this event represents or nil if it's a singleton Event.
	// +optional
	Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

	// What action was taken/failed regarding to the Regarding object.
	// +optional
	Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

	// Optional secondary object for more complex actions.
	// +optional
	Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

	// Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
	// +optional
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

	// ID of the controller instance, e.g. `kubelet-xyzf`.
	// +optional
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}
```
The Event resource structure describes the key events that occurred during the current time window. There are two event types, Normal and Warning: the former is a normal event, the latter a warning event. Code example:
```go
// Valid values for event types (new types could be added in future)
const (
	// Information only and will not cause any problems
	EventTypeNormal string = "Normal"
	// These events are to warn that something might go wrong
	EventTypeWarning string = "Warning"
)
```
How the EventBroadcaster Event Management Mechanism Works
An Actor can be any component in the Kubernetes system. When a key event occurs inside a component, the component records it through an EventRecorder. The EventBroadcaster event management mechanism breaks down into the following parts.
● EventRecorder: the event producer, also called the event recorder. Kubernetes system components record key events through the EventRecorder.
● EventBroadcaster: the event consumer, also called the event broadcaster. The EventBroadcaster consumes the events recorded by the EventRecorder and distributes them to all currently connected broadcasterWatchers. Distribution uses one of two mechanisms: non-blocking (Non-Blocking) distribution and blocking (Blocking) distribution.
● broadcasterWatcher: the watcher (Watcher), which defines how events are handled, for example by reporting them to the Kubernetes API Server.
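The three roles above can be sketched as a self-contained toy pipeline (miniBroadcaster, event, and all names below are hypothetical stand-ins, not the real client-go types): a producer writes into an incoming channel, a loop goroutine distributes each event to per-watcher buffered channels, and a full watcher buffer silently drops the event.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a toy stand-in for the real Event resource object.
type event struct {
	Type, Reason, Message string
}

// miniBroadcaster mirrors the shape of the mechanism: an incoming queue,
// a distributing loop, and a set of per-watcher result channels.
type miniBroadcaster struct {
	incoming chan event
	mu       sync.Mutex
	watchers []chan event
	done     chan struct{}
}

func newMiniBroadcaster(queueLen int) *miniBroadcaster {
	b := &miniBroadcaster{
		incoming: make(chan event, queueLen),
		done:     make(chan struct{}),
	}
	go b.loop() // analogous to the m.loop goroutine started at construction
	return b
}

// loop consumes the incoming queue and distributes to all watchers.
func (b *miniBroadcaster) loop() {
	for ev := range b.incoming {
		b.mu.Lock()
		for _, w := range b.watchers {
			select {
			case w <- ev: // queued on this watcher
			default: // non-blocking: drop if the watcher's buffer is full
			}
		}
		b.mu.Unlock()
	}
	close(b.done)
}

// watch registers a new watcher with its own buffered result channel.
func (b *miniBroadcaster) watch(buf int) chan event {
	b.mu.Lock()
	defer b.mu.Unlock()
	w := make(chan event, buf)
	b.watchers = append(b.watchers, w)
	return w
}

// action plays the role of the EventRecorder: it only writes to the queue.
func (b *miniBroadcaster) action(ev event) { b.incoming <- ev }

// shutdown closes the queue and waits for the loop to drain it.
func (b *miniBroadcaster) shutdown() {
	close(b.incoming)
	<-b.done
}

func main() {
	b := newMiniBroadcaster(10)
	w := b.watch(10)
	b.action(event{"Normal", "Scheduled", "pod assigned to node-1"})
	b.shutdown()
	fmt.Println((<-w).Reason) // Scheduled
}
```

The real implementation adds watcher lifecycle management, stop channels, and aggregation, but the producer/queue/distributor split is the same.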
1. EventRecorder
The EventRecorder event recorder interface provides the following recording methods. Code path: vendor/k8s.io/client-go/tools/record/event.go
```go
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference-- or you may also
	// pass a reference to the object directly.
	// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like eventf, but with annotations attached
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
```
● Event: records an event that has just occurred.
● Eventf: like Event, but formats the message with fmt.Sprintf.
● PastEventf: allows a custom event timestamp, for recording messages about events that have already happened (this method existed in older client-go versions and has since been removed, so it does not appear in the interface shown above).
● AnnotatedEventf: like Eventf, but with an attached annotations (Annotations) field.
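The relationship between Event and Eventf can be illustrated with a toy recorder (fakeRecorder and recordedEvent below are hypothetical, not part of client-go): Eventf simply formats the message with fmt.Sprintf and delegates to Event.

```go
package main

import "fmt"

// recordedEvent captures what a recorder call produced.
type recordedEvent struct {
	Type, Reason, Message string
}

// fakeRecorder stores recorded events in memory instead of queueing them.
type fakeRecorder struct {
	events []recordedEvent
}

// Event records an event with a fixed, pre-formatted message.
func (r *fakeRecorder) Event(object interface{}, eventtype, reason, message string) {
	r.events = append(r.events, recordedEvent{eventtype, reason, message})
}

// Eventf is Event with Sprintf applied to the message, mirroring client-go.
func (r *fakeRecorder) Eventf(object interface{}, eventtype, reason, messageFmt string, args ...interface{}) {
	r.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...))
}

func main() {
	r := &fakeRecorder{}
	r.Eventf(nil, "Normal", "Pulling", "Pulling image %q", "nginx:1.25")
	fmt.Println(r.events[0].Message) // Pulling image "nginx:1.25"
}
```

In real code the object parameter is a runtime.Object the event refers to; it is elided here to keep the sketch dependency-free.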
Taking the Event method as an example, recording a currently occurring event follows the call chain Event → recorder.generateEvent → recorder.Action. Code path: vendor/k8s.io/apimachinery/pkg/watch/mux.go
```go
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}
```
The Action function decouples production from consumption: it writes the event into the m.incoming channel, completing the event production process, while a separate goroutine consumes the channel asynchronously.
2. EventBroadcaster
The EventBroadcaster consumes the events recorded by the EventRecorder and distributes them to all currently connected broadcasterWatchers. An EventBroadcaster is instantiated through the NewBroadcaster function:
Code path: vendor/k8s.io/client-go/tools/record/event.go
```go
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}
```
During instantiation, the watch.NewLongQueueBroadcaster function internally starts a goroutine (the m.loop function) that watches m.incoming and hands each observed event to the m.distribute function, which distributes it to all connected broadcasterWatchers. Distribution comes in two flavors, non-blocking and blocking: non-blocking distribution uses the DropIfChannelFull flag, blocking distribution uses the WaitIfChannelFull flag, and DropIfChannelFull is the default. Code path: vendor/k8s.io/apimachinery/pkg/watch/mux.go
```go
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}
```
In the distribution code, the DropIfChannelFull path uses a select statement with a default case, making the send non-blocking: when the w.result buffer is full, the event is simply lost. The WaitIfChannelFull path uses the same select without a default case, so when the w.result buffer is full, distribution blocks and waits.
Note: events differ from other Kubernetes resources in one important respect: they are allowed to be lost. As a Kubernetes cluster grows, more and more events are reported, and every report means a read/write against the etcd cluster, which puts considerable pressure on etcd. Losing an individual event does not affect the cluster's normal operation, and events matter far less than cluster stability; that is why the source code drops an event when the w.result buffer is full under the non-blocking distribution mechanism.
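The difference between the two flags can be demonstrated with a small self-contained sketch (deliver is a hypothetical helper, not client-go code): with the drop behavior, a send to a full buffer falls through to the default case and the event is lost instead of blocking the distributor.

```go
package main

import "fmt"

// deliver tries to queue ev on result. With drop=true it uses the
// select/default pattern from distribute and reports whether ev was kept.
func deliver(result chan string, ev string, drop bool) bool {
	if drop {
		select {
		case result <- ev:
			return true
		default: // channel full: drop the event (DropIfChannelFull)
			return false
		}
	}
	result <- ev // WaitIfChannelFull: blocks until there is room
	return true
}

func main() {
	w := make(chan string, 2) // a watcher with a 2-slot buffer
	kept := 0
	for _, ev := range []string{"e1", "e2", "e3", "e4"} {
		if deliver(w, ev, true) {
			kept++
		}
	}
	fmt.Println(kept) // 2: e3 and e4 were dropped rather than blocked on
}
```

With drop=false the third send would block forever here, since nothing is consuming from w; that is exactly the trade-off the note above describes.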
3. broadcasterWatcher
A broadcasterWatcher is each Kubernetes component's own way of handling events, for example reporting them to the Kubernetes API Server. Every broadcasterWatcher has two kinds of event-handling functions, described below.
● StartLogging: writes events to the log.
● StartRecordingToSink: reports events to the Kubernetes API Server, which stores them in the etcd cluster.
Take the kube-scheduler component as an example. Acting as a broadcasterWatcher, it writes events to klog's standard output through the StartLogging function and reports key events to the Kubernetes API Server through the StartRecordingToSink function. Code path: cmd/kube-scheduler/app/server.go
```go
// Prepare the event broadcaster.
cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
```
Both StartLogging and StartRecordingToSink rely on the StartEventWatcher function, which internally runs a goroutine that continually watches the EventBroadcaster for events and invokes the relevant handler function for each one.
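The shape of that pattern, reduced to a self-contained sketch (startEventWatcher below is a hypothetical stand-in, not the real client-go function): a goroutine ranges over an event channel and applies whatever handler the caller supplies. StartLogging and StartRecordingToSink differ only in the handler they pass, a logging function versus an uploading function.

```go
package main

import (
	"fmt"
	"sync"
)

// startEventWatcher mimics the shape of StartEventWatcher: it starts a
// goroutine that invokes eventHandler for every event until ch is closed.
func startEventWatcher(ch <-chan string, eventHandler func(string)) *sync.WaitGroup {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for ev := range ch {
			eventHandler(ev)
		}
	}()
	return &wg
}

func main() {
	ch := make(chan string, 3)
	var logged []string
	// A logging handler, playing the role StartLogging would fill.
	wg := startEventWatcher(ch, func(ev string) { logged = append(logged, ev) })
	ch <- "Scheduled"
	ch <- "Pulled"
	close(ch)
	wg.Wait()
	fmt.Println(len(logged)) // 2
}
```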
StartRecordingToSink deserves a closer look. The kube-scheduler component uses v1core.EventSinkImpl as its custom event-reporting sink. Events can be reported in three ways: Create (an HTTP Post), Update (an HTTP Put), and Patch (an HTTP Patch). Taking Create as an example, the call chain is Create → e.Interface.CreateWithEventNamespace:
Code path: vendor/k8s.io/client-go/kubernetes/typed/core/v1/event_expansion.go
```go
// CreateWithEventNamespace makes a new event. Returns the copy of the event the server returns,
// or an error. The namespace to create the event within is deduced from the
// event; it must either match this event client's namespace, or this event
// client must have been created with the "" namespace.
func (e *events) CreateWithEventNamespace(event *v1.Event) (*v1.Event, error) {
	if e.ns != "" && event.Namespace != e.ns {
		return nil, fmt.Errorf("can't create an event with namespace '%v' in namespace '%v'", event.Namespace, e.ns)
	}
	result := &v1.Event{}
	err := e.client.Post().
		NamespaceIfScoped(event.Namespace, len(event.Namespace) > 0).
		Resource("events").
		Body(event).
		Do(context.TODO()).
		Into(result)
	return result, err
}
```
The report is a Post request sent through the RESTClient to the Kubernetes API Server, and the event is ultimately stored in the etcd cluster.