在 Informer 中 Store
提供了存储对象的能力,而 Indexer
在 Store
的基础上又提供了可以自定义索引来查询对象的功能
Store
Store
接口提供的功能比较直观,主要是用于对对象的增删改查。
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
type KeyFunc func(obj interface{}) (storeKey string, err error)
func NewStore(keyFunc KeyFunc) Store
NewStore
返回的实例内部使用线程安全的 map 来存储对象,而 map 的 key 便是使用 KeyFunc 函数来计算出来的。
Get
操作实际便相当于先根据 keyFunc(obj)
参数计算出 obj 对应的 key,然后在调用 GeyByKey
方法获取对象。
Get
的obj
参数实际只要可以根据KeyFunc
可以计算出相应的 key 即可,不需要包含多余的数据
对于 NewStore
返回的实例,Resync
实际不会有任何操作,而 Replace
操作则会清空 Store
中所有数据,然后将参数中的所有对象存到 Store
中
MetaNamespaceKeyFunc
MetaNamespaceKeyFunc
通过 namespace 来生成对象的key,返回 namespace/name
,如果 namespace 为空则只返回 name>
func MetaNamespaceKeyFunc (obj interface{}) (string, error) {
// ... 如果 obj 是字符串,那么直接返回该字符串
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("obj has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
Indexer
我们先看一下如何创建一个 Indexer 实例
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer
乍一看一头雾水,怎么参数又是 Indexers
类型,又返回一个 Indexer
,从命名上看 Indexers
像是多个 Indexer
的感觉。
实际上 Indexer
应该叫做 IndexerStore
,而 Indexers
是一组用来根据存储的对象来获取索引的 key 的函数
// <IndexName:IndexFunc>
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)
Indexer
会对添加( Add
, Replace
) 或者更新(Update
) 的对象调用每个 IndexFunc
来计算出一组索引的 key,也就是说同一个对象可以有多种索引方式来提供索引查询,而且每个 IndexFunc
可以返回多个索引 key
当在添加或者更新对象时,使用 IndexFunc
计算索引key 时如果返回 error 会直接 panic
// NewIndexer() 实例的实现
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
// ...
if len(c.items) > 0 {
return fmt.Errorf("cannot add indexers to running index")
}
// ...
}
Indexer
会在内部使用 Indices
来存储索引
// <IndexName:Index>
type Indices map[string]Index
// type sets.String map[string]struct{}
type Index map[string]sets.String
Indices
根据 IndexFunc
的名称对索引进行分类
Index
是 IndexFunc
计算出来的索引 Key 和相应对象在 Store
中存储的 key 的集合
一个索引 Key 可能对应多个对象,因为根据 IndexFunc 不同的 obj 可能会计算出相同的索引 Key
我们现在来看一下 Indexer 提供的能力
type Indexer interface {
Store
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
// indexedValue 实际是指由索引函数计算出来的索引 Key
IndexKeys(indexName, indexedValue string) ([]string, error)
ByIndex(indexName, indexedValue string) ([]interface{}, error)
Index(indexName string, obj interface{}) ([]interface{}, error)
ListIndexFuncValues(indexName string) []string
}
除了提供 Store
相同的操作外,还提供了根据 indexName
(IndexFunc
的名字)来查询数据的功能
GetIndexers
获取 Indexer
的索引函数
AddIndexers
添加新的索引函数,如果 Indexer
中已经有数据了,接口并没有定义该操作的结果,实际上会直接报错
IndexKeys
根据索引函数的名称来获取到相应索引,然后使用索引Key (indexedValue
)来获取相应的存储对象的 key,相当于Indices[indexName][indexedValue]
ByIndex
使用 IndexKeys
返回的 keys 来从存储对象 map 中取出相应的对象
Index
会使用 indexName
对应的索引函数来计算出 obj
参数的索引Key,然后再调用 ByIndex
上述操作如果索引函数返回 error,那么不会像添加或更新对象时一样 panic,而是会返回 error
ListIndexFuncVaues
会返回 IndexName 对应索引的所有索引Key
NamesapaceIndex
tools/cache 提供了一个基于 namespace 来创建索引的函数 MetaNamespaceIndexfunc
const NamespaceIndex string = "namespace"
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
NamespaceIndex
是个非常常用索引函数,比如 ListAllByNamespace
方法便是使用了该索引函数
// ListAllByNamespace used to list items belongs to namespace from Indexer.
func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
// ...
// 如果 NamespaceIndex 不存在的话,就比对所有对象的 namespace
items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
if err != nil {
klog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range indexer.List() {
metadata, err := meta.Accessor(m)
if err != nil {
return err
}
if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
}
}
return nil
}
// ...
}
Indexer 使用
既然 Indexer 具有通过索引函数来自定义索引的能力,那么具体该怎么使用它呢
我们首先定义一个根据 lable 来生成索引的 IndexFunc
// 不区分 namespace
func LabelIndexFunc(obj interface{}) ([]string, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
// ...
return nil, nil
}
var indexKeys []string
for key, value := range metadata.GetLabels() {
indexKeys = append(indexKeys, key, fmt.Sprintf("%s=%s", key, value))
}
return indexKeys, nil
}
我们使用 LabelIndexFunc 和 MetaNamespaceIndexFunc 作为索引函数来初始化 Indexer,然后将 pod 对象存到 Indexer 中
func main() {
indexers := cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
"label": LabelIndexFunc,
}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)
pods := []*v1.Pod{
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "namespace1",
Labels: map[string]string{"label1": "pod1", "label2": "pod1"},
},
},
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "namespace1",
Labels: map[string]string{"label1": "pod2"},
},
},
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod3",
Namespace: "namespace1",
Labels: map[string]string{"label1": "pod3", "label2": "pod3"},
},
},
}
for _, pod := range pods {
indexer.Add(pod)
}
keys, err := indexer.IndexKeys(cache.NamespaceIndex, "namespace1")
if err != nil {
panic(err)
}
fmt.Printf("get key by namespace 'namespace1': %v\n", keys)
keys, _ = indexer.IndexKeys("label", "label1")
fmt.Printf("get key by label 'label1': %v\n", keys)
keys, _ = indexer.IndexKeys("label", "label2")
fmt.Printf("get key by label 'label2': %v\n", keys)
keys, _ = indexer.IndexKeys("label", "label1=pod2")
fmt.Printf("get key by label 'label1=pod2': %v\n", keys)
}
这里为了逻辑简单使用了 IndexKeys
方法来打印出来存储的 key,如果需要获取对象可以使用 Index
或者 ByIndex
$ go run indexer.go
get key by namespace 'namespace1': [namespace1/pod1 namespace1/pod2 namespace1/pod3]
get key by label 'label1': [namespace1/pod1 namespace1/pod2 namespace1/pod3]
get key by label 'label2': [namespace1/pod1 namespace1/pod3]
get key by label 'label1=pod2': [namespace1/pod2]
ThreadStore
我们讲述了 Store 和 Indexer 提供的功能,实际上 Indexer 和 Store 都是基于 ThreadSafeStore
来实现的
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThradSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: KeyFunc,
}
Store
实例和 Indexer
实例初始化的区别,就是没有 Indexers
。Store
算是阉割版的 Indexer
ThreadSafeStore
的操作时需要使用 key 来对对象进行操作的,而 cache
的作用便是使用 cache.keyFunc
来计算对象的 key
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string, obj interface{})
Get(key string)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexname string, obj interface{}) ([]interface{}, error)
IndexKeys(indexname, indexKey string) ([]string, error)
ListIndexFuncValues(name sring) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
}
cache 使用 cache.keyFunc
无法计算 obj 的 key 时会返回 KeyError
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheSgtorage.Add(key, obj)
return nil
}
type KeyError struct {
Obj interface{}
Err error
}
func (k KeyError) Error() string {
return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}
ThreadSafeStore 实现
我们来看一下实现 ThreadSafeStore
接口的 threadSafeMap
对存储对象 map 和 Indices
的操作
type threadSafeMap struct {
lock sync.RWMutex // 保证操作 threadSafeMap 的线程安全
items map[string]interface{}
indexers Indexers // map[string]IndexFunc
indices Indices // map[string]Index
}
threadSafeMap
在添加和更新对象时会更新 indices
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
return
}
func (c *threadSafeMap) updateIndices(oldObj interface{}, newOjb interface{}, key string) {
if oldObj != nil {
// 删除旧的索引
c.deleteFromIndices(oldObj, key)
}
for name, indexFunc := range c.indexers {
// 计算索引的key时,如果索引函数返回 error 那么直接 panic
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 获取相应索引函数的索引
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
// 将对象的key保存到索引函数计算出的索引key下
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
需要注意:计算索引的 key 时,如果索引函数返回 error 那么会直接 panic
除了更新对象,当删除对象时也会在索引中删除该对象
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(obj)
if err != nil {
panic("...")
}
index := c.indices[name]
if index == nil {
continue
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set != nil {
set.Delete(key)
if len(set) == 0{
delete(index, indexValue)
}
}
}
}
}
当一个索引key下没有对象时,在索引中删除该key,防止占用内存
Replace
操作会直接重新创建一个 Indices
,然后添加新的对象和索引
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}
Index
,ByIndex
和 IndexKeys
大致流程都是先判断 indexName
对应的索引函数是否存在,然后从 indices
中获取索引中相应的数据
我们来看一下其中最复杂的 Index
操作
func (c *threadSafeMap) Index(indexname string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 先查询索引函数是否存在
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exists", indexName)
}
// 计算对象的索引key,如果计算失败,并不会 panic,而是返回 error
indexedValue, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
var storeKeySet sets.String
if len(indexedValues) == 1 {
// obj 只对应一个索引key,那么直接获取该索引key下的所有对象key
storeKeySet = index[indexedValues[0]]
} else {
storeKeySet = sets.String{}
// 根据对象所对应的所有索引key,取出保存的所有对象key
for _, indexValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
list := make([]interface{}, 0, storeKeySet.Len())
// 通过对象 key 获取对象
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
}
return list, nil
}