跬步 On Coding

k8s云原生程序实现CRD的搜索查询分页

前言

来新公司还是在做云原生平台的开发,基本的业务逻辑就是通过一系列的CRD资源,走Operator的模式来实现平台的各种业务部署。但是CRD在apiserver中是以key-value的形式存储在etcd中,虽然labels可以实现简单的查询,但是并不能满足复杂的查询条件。在我来之前CRD资源的列表查询都是直接全量返回由前端自行筛选,随着业务量的增加,查询的效率越来越低。

本来这个项目的前任开发者决策使用MySQL来双写CRD资源,来实现辅助查询,代码也写了很多了,双写带来了一些代码结构上的耦合,但是其实也不是个问题。更恶心的问题是有一个客户由于信创方面的政治问题,要求我们不能引入MySQL!虽然客户有建议用他们自己的数据库,但是从我们的角度来说,我们不希望自己的服务依赖外部客户组件,所以就必须考虑下其它的方案了,比如不用数据库服务。

基于以往在SQLite上的学习经验,以当前CRD资源的体量完全可以使用SQLite来实现CRD资源的辅助查询,那么是不是要引入持久存储呢?其实还是不用,因为CRD本身已经存储在etcd中了,我们只需要在程序启动时通过k8s的List-Watch的方式将CRD资源同步到SQLite中,这样就可以实现CRD资源的辅助查询了。具体到实现上完全可以使用k8s client中的informer来实现CRD资源的同步,这样可以使用到informer的缓存机制减少apiserver的查询压力。

实现

synchronizer的接口定义

首先我们定一个synchronizer接口,针对每种需要同步的CRD资源,需要实现synchronizer接口。这个接口会处理informer的add、update、delete事件,将CRD资源同步到SQLite中。每一个资源可以创建一张在SQLite中创建的表,将CRD资源同步到表中。

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
)

type synchronizer interface {
	gvr() schema.GroupVersionResource

	add(obj interface{})
	update(oldObj, newObj interface{})
	delete(obj interface{})
}

// 以下是一个实现接口的例子
type templateSynchronizerImpl struct {
	manager dao.TemplateManager // SQLite数据库操作
}

func (s *templateSynchronizerImpl) gvr() schema.GroupVersionResource {
	return schema.GroupVersionResource{
		Group:    "apps.titanide.cn",
		Version:  "v1alpha1",
		Resource: "templates",
	}
}

// 分别处理informer的add、update、delete事件,把数据同步到SQLite的表中
func (s *templateSynchronizerImpl) add(obj interface{}) {
}

func (s *templateSynchronizerImpl) update(oldObj, newObj interface{}) {
}

func (s *templateSynchronizerImpl) delete(obj interface{}) {
}

同步逻辑实现

然后我们使用k8s的dynamicclient的informer来实现CRD资源的同步,因为这段代码在项目中的层级比较低,没能直接创建带scheme的client,所以只能使用dynamicclient。如果你的项目代码层级比较高,可以直接使用带scheme的client。initSynchronizer函数必须在服务启动时调用,等待同步完成后再启动服务。

import (
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/pkg/errors"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	appsv1alpha1 "project/api/apis/apps/v1alpha1"
	"project/pkg/database"
	"project/pkg/database/dao"
)

func initSynchronizer() {
	client, err := GetKubeDynamicClient()
	if err != nil {
		panic(err)
	}

  sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	stopCh := make(chan struct{})
	// 启动一个goroutine,等待接收信号
	go func() {
		<-sigCh
		close(stopCh)
	}()

	// 启动同步逻辑
	db := database.GetDB()

  // 初始化具体的同步对象
	templateSynchronizer := newTemplateSynchronizer(dao.NewTemplateManager(db))
	projectSynchronizer := newProjectSynchronizer(dao.NewProjectManager(db))
	pluginSynchronizer := newPluginSynchronizer(dao.NewPluginManager(db))

	err = synchronize(client, []synchronizer{templateSynchronizer, projectSynchronizer, pluginSynchronizer}, stopCh)
	if err != nil {
		panic(err)
	}
}

func synchronize(dynamicClient dynamic.Interface, syncs []synchronizer, stopCh chan struct{}) error {
	// 创建 Informer 工厂, 10分钟全量同步一次
	factory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, time.Minute*10)

	hasSyncedFuncs := make([]cache.InformerSynced, 0, len(syncs))
	for _, sync := range syncs {
		// 创建 informer
		gvr := sync.gvr()

		informer := factory.ForResource(gvr).Informer()

		// 设置事件处理程序
		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc:    sync.add,
			UpdateFunc: sync.update,
			DeleteFunc: sync.delete,
		})

		// 启动 informer
		go informer.Run(stopCh)

		hasSyncedFuncs = append(hasSyncedFuncs, informer.HasSynced)
	}

	// 等待缓存同步
	if !cache.WaitForCacheSync(stopCh, hasSyncedFuncs...) {
		return errors.New("failed to wait for cache sync")
	}

	return nil
}

// GetKubeDynamicClient 函数用于获取 Kubernetes 客户端集,返回一个指向 *kubernetes.Clientset 的指针和 error 对象
func GetKubeDynamicClient() (dynamic.Interface, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		err = errors.Wrap(err, "failed to get cluster config")
		return nil, err
	}

	dynamicClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		err = errors.Wrap(err, "failed to create dynamicClient")
		return nil, err
	}

	return dynamicClient, nil
}

其它

通过以上机制,我们就可以实现CRD资源的辅助查询了。这种同步机制也摆脱了需要双写的复杂逻辑,在程序启动时List同步,启动后Watch动态更新,在查询时直接从SQLite中查询,写入时直接写CRD。从某种意义上来说,这种机制实现了读写分离,也解决了双写带来的耦合问题。

在封装db的dao层时,基于以往项目的经验还是建议直接使用sqlx+squirrel来实现SQL的生成和执行,这样在代码结构上会更加清晰。没有引入ORM的原因是以前踩过坑。

在使用SQLite时建议开启一些配置,来优化性能:

db.MustExec("PRAGMA journal_mode = WAL;")
db.MustExec("PRAGMA synchronous=NORMAL;")
db.MustExec("PRAGMA mmap_size = 134217728;")
db.MustExec("PRAGMA journal_size_limit = 27103364;")
db.MustExec("PRAGMA cache_size=2000;")

总结

在实际项目中我们会碰到各种问题,解决问题的思路和方法多种多样。需要根据项目的实际情况与项目未来的发展方向来决策使用什么方法来解决问题。以我当前的项目为例,客户的政治原因导致不能引入MySQL,只能考虑其它的方式来解决问题。在引入SQLite的同时还解决了需要双写带来的耦合问题,这里的代码不多,但是提供一种解决CRD资源通用查询的思路,有需要可以参考。