深入 Kubernetes Informer -- Store 与 Indexer

Posted by icebergu on 03-28,2021

在 Informer 中 Store 提供了存储对象的能力,而 IndexerStore 的基础上又提供了可以自定义索引来查询对象的功能

Store

Store 接口提供的功能比较直观,主要是用于对对象的增删改查。

type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	Replace([]interface{}, string) error
	Resync() error
}

type KeyFunc func(obj interface{}) (storeKey string, err error)
func NewStore(keyFunc KeyFunc) Store

NewStore 返回的实例内部使用线程安全的 map 来存储对象,而 map 的 key 便是使用 KeyFunc 函数来计算出来的。

Get 操作实际便相当于先根据 keyFunc(obj)参数计算出 obj 对应的 key,然后在调用 GeyByKey 方法获取对象。

Getobj 参数实际只要可以根据 KeyFunc 可以计算出相应的 key 即可,不需要包含多余的数据

对于 NewStore 返回的实例,Resync 实际不会有任何操作,而 Replace 操作则会清空 Store 中所有数据,然后将参数中的所有对象存到 Store

MetaNamespaceKeyFunc

MetaNamespaceKeyFunc 通过 namespace 来生成对象的key,返回 namespace/name,如果 namespace 为空则只返回 name>

func MetaNamespaceKeyFunc (obj interface{}) (string, error) {
	// ... 如果 obj 是字符串,那么直接返回该字符串

	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("obj has no meta: %v", err)
	}
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

Indexer

我们先看一下如何创建一个 Indexer 实例

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer

乍一看一头雾水,怎么参数又是 Indexers 类型,又返回一个 Indexer,从命名上看 Indexers 像是多个 Indexer 的感觉。
实际上 Indexer 应该叫做 IndexerStore,而 Indexers 是一组用来根据存储的对象来获取索引的 key 的函数

// <IndexName:IndexFunc>
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)

Indexer 会对添加( Add, Replace) 或者更新(Update) 的对象调用每个 IndexFunc 来计算出一组索引的 key,也就是说同一个对象可以有多种索引方式来提供索引查询,而且每个 IndexFunc 可以返回多个索引 key
当在添加或者更新对象时,使用 IndexFunc 计算索引key 时如果返回 error 会直接 panic

// NewIndexer() 实例的实现
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
	// ...
	if len(c.items) > 0 {
		return fmt.Errorf("cannot add indexers to running index")
	}
	// ...
}

Indexer 会在内部使用 Indices 来存储索引

// <IndexName:Index>
type Indices map[string]Index
// type sets.String map[string]struct{}
type Index map[string]sets.String

Indices 根据 IndexFunc 的名称对索引进行分类
IndexIndexFunc 计算出来的索引 Key 和相应对象在 Store 中存储的 key 的集合

一个索引 Key 可能对应多个对象,因为根据 IndexFunc 不同的 obj 可能会计算出相同的索引 Key

我们现在来看一下 Indexer 提供的能力

type Indexer interface {
	Store

	GetIndexers() Indexers
	AddIndexers(newIndexers Indexers) error
	
	// indexedValue 实际是指由索引函数计算出来的索引 Key
	IndexKeys(indexName, indexedValue string) ([]string, error)
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	Index(indexName string, obj interface{}) ([]interface{}, error)

	ListIndexFuncValues(indexName string) []string
}

除了提供 Store 相同的操作外,还提供了根据 indexName(IndexFunc 的名字)来查询数据的功能
GetIndexers 获取 Indexer 的索引函数
AddIndexers 添加新的索引函数,如果 Indexer 中已经有数据了,接口并没有定义该操作的结果,实际上会直接报错

IndexKeys 根据索引函数的名称来获取到相应索引,然后使用索引Key (indexedValue)来获取相应的存储对象的 key,相当于Indices[indexName][indexedValue]
ByIndex 使用 IndexKeys 返回的 keys 来从存储对象 map 中取出相应的对象
Index 会使用 indexName 对应的索引函数来计算出 obj 参数的索引Key,然后再调用 ByIndex
上述操作如果索引函数返回 error,那么不会像添加或更新对象时一样 panic,而是会返回 error

ListIndexFuncVaues 会返回 IndexName 对应索引的所有索引Key

NamesapaceIndex

tools/cache 提供了一个基于 namespace 来创建索引的函数 MetaNamespaceIndexfunc

const NamespaceIndex string = "namespace"

func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
        meta, err := meta.Accessor(obj)
        if err != nil {
                return []string{""}, fmt.Errorf("object has no meta: %v", err)
        }
        return []string{meta.GetNamespace()}, nil
}

NamespaceIndex 是个非常常用索引函数,比如 ListAllByNamespace方法便是使用了该索引函数


// ListAllByNamespace used to list items belongs to namespace from Indexer.
func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
		// ...

		// 如果 NamespaceIndex 不存在的话,就比对所有对象的 namespace
        items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
        if err != nil {
                klog.Warningf("can not retrieve list of objects using index : %v", err)
                for _, m := range indexer.List() {
                        metadata, err := meta.Accessor(m)
                        if err != nil {
                                return err
                        }
                        if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) {
                                appendFn(m)
                        }
                }
                return nil
        }
        // ...
}

Indexer 使用

既然 Indexer 具有通过索引函数来自定义索引的能力,那么具体该怎么使用它呢
我们首先定义一个根据 lable 来生成索引的 IndexFunc

// 不区分 namespace
func LabelIndexFunc(obj interface{}) ([]string, error) {
        metadata, err := meta.Accessor(obj)
        if err != nil {
                // ...
                return nil, nil
        }

        var indexKeys []string
        for key, value := range metadata.GetLabels() {
                indexKeys = append(indexKeys, key, fmt.Sprintf("%s=%s", key, value))
        }
        return indexKeys, nil
}

我们使用 LabelIndexFunc 和 MetaNamespaceIndexFunc 作为索引函数来初始化 Indexer,然后将 pod 对象存到 Indexer 中


func main() {
        indexers := cache.Indexers{
                cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
                "label":              LabelIndexFunc,
        }
        indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)

        pods := []*v1.Pod{
                &v1.Pod{
                        ObjectMeta: metav1.ObjectMeta{
                                Name:      "pod1",
                                Namespace: "namespace1",
                                Labels:    map[string]string{"label1": "pod1", "label2": "pod1"},
                        },
                },
                &v1.Pod{
                        ObjectMeta: metav1.ObjectMeta{
                                Name:      "pod2",
                                Namespace: "namespace1",
                                Labels:    map[string]string{"label1": "pod2"},
                        },
                },
                &v1.Pod{
                        ObjectMeta: metav1.ObjectMeta{
                                Name:      "pod3",
                                Namespace: "namespace1",
                                Labels:    map[string]string{"label1": "pod3", "label2": "pod3"},
                        },
                },
        }

        for _, pod := range pods {
                indexer.Add(pod)
        }

        keys, err := indexer.IndexKeys(cache.NamespaceIndex, "namespace1")
        if err != nil {
                panic(err)
        }
        fmt.Printf("get key by namespace 'namespace1': %v\n", keys)

        keys, _ = indexer.IndexKeys("label", "label1")
        fmt.Printf("get key by label 'label1': %v\n", keys)

        keys, _ = indexer.IndexKeys("label", "label2")
        fmt.Printf("get key by label 'label2': %v\n", keys)

        keys, _ = indexer.IndexKeys("label", "label1=pod2")
        fmt.Printf("get key by label 'label1=pod2': %v\n", keys)
}

这里为了逻辑简单使用了 IndexKeys 方法来打印出来存储的 key,如果需要获取对象可以使用 Index 或者 ByIndex

$ go run indexer.go
get key by namespace 'namespace1': [namespace1/pod1 namespace1/pod2 namespace1/pod3]
get key by label 'label1': [namespace1/pod1 namespace1/pod2 namespace1/pod3]
get key by label 'label2': [namespace1/pod1 namespace1/pod3]
get key by label 'label1=pod2': [namespace1/pod2]

ThreadStore

我们讲述了 Store 和 Indexer 提供的功能,实际上 Indexer 和 Store 都是基于 ThreadSafeStore 来实现的

func NewStore(keyFunc KeyFunc) Store {
	return &cache{
		cacheStorage: NewThradSafeStore(Indexers{}, Indices{}),
		keyFunc: keyFunc,
	}
}

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc: KeyFunc,
}

Store实例和 Indexer实例初始化的区别,就是没有 IndexersStore 算是阉割版的 Indexer

ThreadSafeStore 的操作时需要使用 key 来对对象进行操作的,而 cache 的作用便是使用 cache.keyFunc 来计算对象的 key

type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string, obj interface{})
	Get(key string)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	
	Index(indexname string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexname, indexKey string) ([]string, error)
	ListIndexFuncValues(name sring) []string
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	GetIndexers() Indexers
}

cache 使用 cache.keyFunc 无法计算 obj 的 key 时会返回 KeyError

func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheSgtorage.Add(key, obj)
	return nil
}

type KeyError struct {
	Obj interface{}
	Err error
}

func (k KeyError) Error() string {
	return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}

ThreadSafeStore 实现

我们来看一下实现 ThreadSafeStore 接口的 threadSafeMap 对存储对象 map 和 Indices 的操作

type threadSafeMap struct {
	lock sync.RWMutex // 保证操作 threadSafeMap 的线程安全
	items map[string]interface{}

	indexers Indexers  // map[string]IndexFunc
	indices Indices	// map[string]Index
}

threadSafeMap在添加和更新对象时会更新 indices

func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.updateIndices(oldObject, obj, key)
	return
}

func (c *threadSafeMap) updateIndices(oldObj interface{}, newOjb interface{}, key string) {
	if oldObj != nil {
		// 删除旧的索引
		c.deleteFromIndices(oldObj, key)
	}
	for name, indexFunc := range c.indexers {
		// 计算索引的key时,如果索引函数返回 error 那么直接 panic
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		// 获取相应索引函数的索引
		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}
		// 将对象的key保存到索引函数计算出的索引key下
		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key)
		}
	}

需要注意:计算索引的 key 时,如果索引函数返回 error 那么会直接 panic

除了更新对象,当删除对象时也会在索引中删除该对象


func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
	for name, indexFunc := range c.indexers {
		indexValue, err := indexFunc(obj)
		if err != nil {
			panic("...")
		}
		index := c.indices[name]
		if index == nil {
			continue
		}
		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set != nil {
				set.Delete(key)
				if len(set) == 0{
					delete(index, indexValue)
				}
			}
		}
	}
}

当一个索引key下没有对象时,在索引中删除该key,防止占用内存

Replace 操作会直接重新创建一个 Indices,然后添加新的对象和索引

func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.items = items

	c.indices = Indices{}
	for key, item := range c.items {
		c.updateIndices(nil, item, key)
	}
}

IndexByIndexIndexKeys 大致流程都是先判断 indexName 对应的索引函数是否存在,然后从 indices 中获取索引中相应的数据
我们来看一下其中最复杂的 Index 操作

func (c *threadSafeMap) Index(indexname string, obj interface{}) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	//  先查询索引函数是否存在
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exists", indexName)
	}

	// 计算对象的索引key,如果计算失败,并不会 panic,而是返回 error
	indexedValue, err := indexFunc(obj)
	if err != nil {
		return nil, err
	}
	index := c.indices[indexName]

	var storeKeySet sets.String
	if len(indexedValues) == 1 {
		// obj 只对应一个索引key,那么直接获取该索引key下的所有对象key
		storeKeySet = index[indexedValues[0]]
	} else {
		storeKeySet = sets.String{}
		// 根据对象所对应的所有索引key,取出保存的所有对象key
		for _, indexValue := range indexedValues {
			for key := range index[indexedValue] {
				storeKeySet.Insert(key)
			}
		}
	}

	list := make([]interface{}, 0, storeKeySet.Len())
	// 通过对象 key 获取对象
	for storeKey := range storeKeySet {
		list = append(list, c.items[storeKey])
	}
	return list, nil
}