在 Informer 中 `Store` 提供了存储对象的能力,而 `Indexer` 在 `Store` 的基础上又提供了可以自定义索引来查询对象的功能
## Store
`Store` 接口提供的功能比较直观,主要是用于对对象的增删改查。
```go
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
Replace([]interface{}, string) error
Resync() error
}
type KeyFunc func(obj interface{}) (storeKey string, err error)
func NewStore(keyFunc KeyFunc) Store
```
`NewStore` 返回的实例内部使用线程安全的 map 来存储对象,而 map 的 key 便是使用 KeyFunc 函数来计算出来的。
`Get` 操作实际便相当于先根据 `keyFunc(obj)`参数计算出 obj 对应的 key,然后在调用 `GeyByKey` 方法获取对象。
> `Get` 的 `obj` 参数实际只要可以根据 `KeyFunc` 可以计算出相应的 key 即可,不需要包含多余的数据
对于 `NewStore` 返回的实例,`Resync` 实际不会有任何操作,而 `Replace` 操作则会清空 `Store` 中所有数据,然后将参数中的所有对象存到 `Store` 中
#### MetaNamespaceKeyFunc
`MetaNamespaceKeyFunc` 通过 namespace 来生成对象的key,返回 `namespace/name`,如果 namespace 为空则只返回 `name>`
```go
func MetaNamespaceKeyFunc (obj interface{}) (string, error) {
// ... 如果 obj 是字符串,那么直接返回该字符串
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("obj has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}
```
## Indexer
我们先看一下如何创建一个 Indexer 实例
```go
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer
```
乍一看一头雾水,怎么参数又是 `Indexers` 类型,又返回一个 `Indexer`,从命名上看 `Indexers` 像是多个 `Indexer` 的感觉。
实际上 `Indexer` 应该叫做 `IndexerStore`,而 `Indexers` 是一组用来根据存储的对象来获取索引的 key 的函数
```go
// <IndexName:IndexFunc>
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)
```
`Indexer` 会对添加( `Add`, `Replace`) 或者更新(`Update`) 的对象调用每个 `IndexFunc` 来计算出一组索引的 key,也就是说同一个对象可以有多种索引方式来提供索引查询,而且每个 `IndexFunc` 可以返回多个索引 key
**当在添加或者更新对象时,使用 `IndexFunc` 计算索引key 时如果返回 error 会直接 panic**
```go
// NewIndexer() 实例的实现
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
// ...
if len(c.items) > 0 {
return fmt.Errorf("cannot add indexers to running index")
}
// ...
}
```
`Indexer` 会在内部使用 `Indices` 来存储索引
```go
// <IndexName:Index>
type Indices map[string]Index
// type sets.String map[string]struct{}
type Index map[string]sets.String
```
`Indices` 根据 `IndexFunc` 的名称对索引进行分类
`Index` 是 `IndexFunc` 计算出来的索引 Key 和相应对象在 `Store` 中存储的 key 的集合
> 一个索引 Key 可能对应多个对象,因为根据 IndexFunc 不同的 obj 可能会计算出相同的索引 Key
我们现在来看一下 Indexer 提供的能力
```go
type Indexer interface {
Store
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
// indexedValue 实际是指由索引函数计算出来的索引 Key
IndexKeys(indexName, indexedValue string) ([]string, error)
ByIndex(indexName, indexedValue string) ([]interface{}, error)
Index(indexName string, obj interface{}) ([]interface{}, error)
ListIndexFuncValues(indexName string) []string
}
```
除了提供 `Store` 相同的操作外,还提供了根据 `indexName`(`IndexFunc` 的名字)来查询数据的功能
`GetIndexers` 获取 `Indexer` 的索引函数
`AddIndexers` 添加新的索引函数,如果 `Indexer` 中已经有数据了,接口并没有定义该操作的结果,实际上会直接报错
`IndexKeys` 根据索引函数的名称来获取到相应索引,然后使用索引Key (`indexedValue`)来获取相应的存储对象的 key,相当于`Indices[indexName][indexedValue]`
`ByIndex` 使用 `IndexKeys` 返回的 keys 来从存储对象 map 中取出相应的对象
`Index` 会使用 `indexName` 对应的索引函数来计算出 `obj` 参数的索引Key,然后再调用 `ByIndex`
**上述操作如果索引函数返回 error,那么不会像添加或更新对象时一样 panic,而是会返回 error**
`ListIndexFuncVaues` 会返回 IndexName 对应索引的所有索引Key
#### NamesapaceIndex
tools/cache 提供了一个基于 namespace 来创建索引的函数 `MetaNamespaceIndexfunc`
```go
const NamespaceIndex string = "namespace"
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
```
`NamespaceIndex` 是个非常常用索引函数,比如 `ListAllByNamespace`方法便是使用了该索引函数
```go
// ListAllByNamespace used to list items belongs to namespace from Indexer.
func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error {
// ...
// 如果 NamespaceIndex 不存在的话,就比对所有对象的 namespace
items, err := indexer.Index(NamespaceIndex, &metav1.ObjectMeta{Namespace: namespace})
if err != nil {
klog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range indexer.List() {
metadata, err := meta.Accessor(m)
if err != nil {
return err
}
if metadata.GetNamespace() == namespace && selector.Matches(labels.Set(metadata.GetLabels())) {
appendFn(m)
}
}
return nil
}
// ...
}
```
#### Indexer 使用
既然 Indexer 具有通过索引函数来自定义索引的能力,那么具体该怎么使用它呢
我们首先定义一个根据 lable 来生成索引的 `IndexFunc`
```go
// 不区分 namespace
func LabelIndexFunc(obj interface{}) ([]string, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
// ...
return nil, nil
}
var indexKeys []string
for key, value := range metadata.GetLabels() {
indexKeys = append(indexKeys, key, fmt.Sprintf("%s=%s", key, value))
}
return indexKeys, nil
}
```
我们使用 LabelIndexFunc 和 MetaNamespaceIndexFunc 作为索引函数来初始化 Indexer,然后将 pod 对象存到 Indexer 中
```go
func main() {
indexers := cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
"label": LabelIndexFunc,
}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)
pods := []*v1.Pod{
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "namespace1",
Labels: map[string]string{"label1": "pod1", "label2": "pod1"},
},
},
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "namespace1",
Labels: map[string]string{"label1": "pod2"},
},
},
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod3",
Namespace: "namespace1",
Labels: map[string]string{"label1": "pod3", "label2": "pod3"},
},
},
}
for _, pod := range pods {
indexer.Add(pod)
}
keys, err := indexer.IndexKeys(cache.NamespaceIndex, "namespace1")
if err != nil {
panic(err)
}
fmt.Printf("get key by namespace 'namespace1': %v\n", keys)
keys, _ = indexer.IndexKeys("label", "label1")
fmt.Printf("get key by label 'label1': %v\n", keys)
keys, _ = indexer.IndexKeys("label", "label2")
fmt.Printf("get key by label 'label2': %v\n", keys)
keys, _ = indexer.IndexKeys("label", "label1=pod2")
fmt.Printf("get key by label 'label1=pod2': %v\n", keys)
}
```
这里为了逻辑简单使用了 `IndexKeys` 方法来打印出来存储的 key,如果需要获取对象可以使用 `Index` 或者 `ByIndex`
```bash
$ go run indexer.go
get key by namespace 'namespace1': [namespace1/pod1 namespace1/pod2 namespace1/pod3]
get key by label 'label1': [namespace1/pod1 namespace1/pod2 namespace1/pod3]
get key by label 'label2': [namespace1/pod1 namespace1/pod3]
get key by label 'label1=pod2': [namespace1/pod2]
```
## ThreadStore
我们讲述了 Store 和 Indexer 提供的功能,实际上 Indexer 和 Store 都是基于 `ThreadSafeStore` 来实现的
```go
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThradSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
}
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: KeyFunc,
}
```
`Store`实例和 `Indexer`实例初始化的区别,就是没有 `Indexers`。`Store` 算是阉割版的 `Indexer`
`ThreadSafeStore` 的操作时需要使用 key 来对对象进行操作的,而 `cache` 的作用便是使用 `cache.keyFunc` 来计算对象的 key
```go
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string, obj interface{})
Get(key string)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexname string, obj interface{}) ([]interface{}, error)
IndexKeys(indexname, indexKey string) ([]string, error)
ListIndexFuncValues(name sring) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
}
```
cache 使用 `cache.keyFunc` 无法计算 obj 的 key 时会返回 `KeyError`
```go
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheSgtorage.Add(key, obj)
return nil
}
type KeyError struct {
Obj interface{}
Err error
}
func (k KeyError) Error() string {
return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}
```
#### ThreadSafeStore 实现
我们来看一下实现 `ThreadSafeStore` 接口的 `threadSafeMap` 对存储对象 map 和 `Indices` 的操作
```go
type threadSafeMap struct {
lock sync.RWMutex // 保证操作 threadSafeMap 的线程安全
items map[string]interface{}
indexers Indexers // map[string]IndexFunc
indices Indices // map[string]Index
}
```
`threadSafeMap`在添加和更新对象时会更新 `indices`
```go
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj, key)
return
}
func (c *threadSafeMap) updateIndices(oldObj interface{}, newOjb interface{}, key string) {
if oldObj != nil {
// 删除旧的索引
c.deleteFromIndices(oldObj, key)
}
for name, indexFunc := range c.indexers {
// 计算索引的key时,如果索引函数返回 error 那么直接 panic
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
// 获取相应索引函数的索引
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
// 将对象的key保存到索引函数计算出的索引key下
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
```
**需要注意:计算索引的 key 时,如果索引函数返回 error 那么会直接 panic**
除了更新对象,当删除对象时也会在索引中删除该对象
```go
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(obj)
if err != nil {
panic("...")
}
index := c.indices[name]
if index == nil {
continue
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set != nil {
set.Delete(key)
if len(set) == 0{
delete(index, indexValue)
}
}
}
}
}
```
当一个索引key下没有对象时,在索引中删除该key,防止占用内存
`Replace` 操作会直接重新创建一个 `Indices`,然后添加新的对象和索引
```go
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}
```
`Index`,`ByIndex` 和 `IndexKeys` 大致流程都是先判断 `indexName` 对应的索引函数是否存在,然后从 `indices` 中获取索引中相应的数据
我们来看一下其中最复杂的 `Index` 操作
```go
func (c *threadSafeMap) Index(indexname string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 先查询索引函数是否存在
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exists", indexName)
}
// 计算对象的索引key,如果计算失败,并不会 panic,而是返回 error
indexedValue, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
var storeKeySet sets.String
if len(indexedValues) == 1 {
// obj 只对应一个索引key,那么直接获取该索引key下的所有对象key
storeKeySet = index[indexedValues[0]]
} else {
storeKeySet = sets.String{}
// 根据对象所对应的所有索引key,取出保存的所有对象key
for _, indexValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
list := make([]interface{}, 0, storeKeySet.Len())
// 通过对象 key 获取对象
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
}
return list, nil
}
```
深入 Kubernetes Informer -- Store 与 Indexer