Kubernetes Informer基本原理,你明白了吗?( 二 )

首先通过 f.defaultInformer 方法生成 informer,然后通过 f.factory.InformerFor 方法,将 informer 注册到 sharedInformerFactory
3.3、Register event handler这个过程展示如何注册一个回调函数,以及如何触发这个回调函数
### podInformer.AddEventHandler:func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)}func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {...listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(),initialBufferSize)if !s.started {s.processor.addListener(listener)return}...}### s.processor.addListener(listener):func (p *sharedProcessor) addListener(listener *processorListener) {p.addListenerLocked(listener)if p.listenersStarted {p.wg.Start(listener.run)p.wg.Start(listener.pop)}}### listener.run:func (p *processorListener) run() {// this call blocks until the channel is closed.When a panic hAppens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted.This is usually better than the alternative of never// delivering again.stopCh := make(chan struct{})wait.Until(func() {for next := range p.nextCh {switch notification := next.(type) {// 通过next结构体本身的类型来判断事件类型case updateNotification:p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:p.handler.OnAdd(notification.newObj)case deleteNotification:p.handler.OnDelete(notification.oldObj)default:utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))}}// the only way to get here is if the p.nextCh is empty and closedclose(stopCh)}, 1*time.Second, stopCh)}### listener.pop:func (p *processorListener) pop() {var nextCh chan<- interface{}var notification interface{}for {select {case nextCh <- notification:// Notification dispatchedvar ok boolnotification, ok = p.pendingNotifications.ReadOne()if !ok { // Nothing to popnextCh = nil // Disable this select case}case notificationToAdd, ok := <-p.addCh:if !ok {return}if notification == nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotificationsnotification = notificationToAddnextCh = p.nextCh} else { // There is already a notification waiting to be dispatchedp.pendingNotifications.WriteOne(notificationToAdd)}}}}这个过程总结就是:
(1)AddEventHandler 到 sharedProcessor,注册事件回调函数到 sharedProcessor
(2)listener pop 方法里会监听 p.addCh,通过 nextCh = p.nextCh 将 addCh 将事件传递给 p.nextCh
(3)listener run 方法里会监听 p.nextCh,收到信号之后,判断是属于什么类型的方法,并且执行前面注册的 Handler
所以后面需要关注当资源对象发生变更时,是如何将变更信号给 p.addCh , 进一步触发回调函数的
3.4、Start all informers通过 sharedInformers.Start(stopCh)启动所有的 informer,代码如下:
// Start initializes all requested informers.func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {for informerType, informer := range f.informers {if !f.startedInformers[informerType] {go informer.Run(stopCh)f.startedInformers[informerType] = true}}}我们的例子中其实就只启动了 PodInformer,接下来看到 podInformer 的 Run 方法做了什么
### go informer.Run(stopCh):func (s *sharedIndexInformer) Run(stopCh <-chan struct{}){defer utilruntime.HandleCrash()fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{// DeltafifoKnownObjects:s.indexer,EmitDeltaTypeReplaced: true,})cfg := &Config{Queue:fifo,// DeltafifoListerWatcher:s.listerWatcher,// listerWatcherObjectType:s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError:false,ShouldResync:s.processor.shouldResync,// HandleDeltas, added to process, and done in processloopProcess:s.HandleDeltas,WatchErrorHandler: s.watchErrorHandler,}func() {...s.controller = New(cfg)...}s.controller.Run(stopCh)}### s.controller.Run(stopCh)func (c *controller) Run(stopCh <-chan struct{}) {r := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)c.reflector = r// Run reflectorwg.StartWithChannel(stopCh, r.Run)// Run processLoop, pop from deltafifo and do ProcessFunc,// ProcessFunc is the s.HandleDeltas beforewait.Until(c.processLoop, time.Second, stopCh)}可以看到上面的逻辑首先生成一个 DeltaFifo,然后接下来的逻辑分为两块,生产和消费:
(1)生产—r.Run:主要的逻辑就是利用 list and watch 将资源对象包括操作类型压入队列 DeltaFifo
#### r.Run:func (r *Reflector) Run(stopCh <-chan struct{}) {// 执行listAndWatchif err := r.ListAndWatch(stopCh);}// 执行ListAndWatch流程func (r *Reflector)ListAndWatch(stopCh <-chan struct{}) error{// 1、list:// (1)、list pods, 实际调用的是podInformer里的ListFunc方法,// client.CoreV1().Pods(namespace).List(context.TODO(), options)r.listerWatcher.List(opts)// (2)、获取资源版本号,用于watchresourceVersion = listMetaInterface.GetResourceVersion()//(3)、数据转换,转换成列表items, err := meta.ExtractList(list)// (4)、将资源列表中的资源对象和版本号存储到DeltaFifo中r.syncWith(items, resourceVersion);// 2、watch,无限循环去watch apiserver , 当watch到事件的时候,执行watchHandler将event事件压入fifofor {// (1)、watch pods, 实际调用的是podInformer里的WatchFunc方法,// client.CoreV1().Pods(namespace).Watch(context.TODO(), options)w, err := r.listerWatcher.Watch(options)// (2)、watchHandler// watchHandler watches pod,更新DeltaFifo信息,并且更新resourceVersionif err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);}}### r.watchHandler// watchHandler watches w and keeps *resourceVersion up to date.func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {...loop:for {select {case event, ok := <-w.ResultChan():newResourceVersion := meta.GetResourceVersion()switch event.Type {case watch.Added:err := r.store.Add(event.Object)// Add event to srore, store的具体方法在fifo中if err != nil {utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))}...}*resourceVersion = newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}...}### r.store.Add:## 即为deltaFifo的add方法:func (f *DeltaFIFO) Add(obj interface{}) error {...return f.queueActionLocked(Added, obj)...}func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err := f.KeyOf(obj)if err != nil {return KeyError{obj, err}}newDeltas := append(f.items[id], Delta{actionType, obj})newDeltas = dedupDeltas(newDeltas)if len(newDeltas) > 0 {if _, exists := f.items[id]; !exists {f.queue = append(f.queue, id)}f.items[id] = newDeltasf.cond.Broadcast()// 通知所有阻塞住的消费者}...return nil}


推荐阅读