
Adding a nacos provider to Traefik to read service addresses

Networking and Architecture - This article is part of a series.

Software version

Traefik version: 2.4.0

Background

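Traefik is a high-performance API gateway written in Go. It can load configuration from several sources, including file, k8s, and consul.

We previously used the file provider. The route configuration file holds each application's routing rules and its load-balancing targets. The configuration for one service looks something like this: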

http:
  services:
    dev-service:
      loadBalancer:
        servers:
        - url: http://aliyun-slb-service:8080
        passHostHeader: true
  routers:
    dev:
      entryPoints:
      - test
      service: dev-service
      rule: PathPrefix(`/path`)

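The service URL points at the load balancer in front of the service (an Alibaba Cloud SLB); instances register with and deregister from the SLB as they go online and offline. This means every externally exposed service needs its own load balancer in front of it, which adds complexity and lengthens the call chain. It would be better for the gateway to pull each service's address list from the registry automatically, so that no extra load balancer is needed in front of the service.

Our backend registry is nacos, which the current Traefik version does not support, so the source needs a small change: adding a nacos provider.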

Consul provider

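Traefik has no nacos implementation, but it does implement other registries, such as consul. Before adding a new provider, it is worth looking at how Traefik's consul provider works.

With consul, service information is read directly from the registered services, while routing and other configuration is carried in tags, as in the image below: every configuration field needs its own tag.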

https://void.oss-cn-beijing.aliyuncs.com/img/20210714190352.jpg

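(Image found through a web search.)

As a result, every service has to register a lot of tags in consul, which is inconvenient to use and hard to maintain or search quickly.

To keep maintenance simple and stay compatible with the existing file configuration, the goal is to keep reading routing information from files and to read service addresses from nacos, then merge the two. For compatibility, services defined in the file are still loaded; if nacos has a service with the same name, the nacos one overrides the file one.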
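The override rule described above can be sketched in a few lines of Go. This is only an illustration: `Service` and `mergeServices` are hypothetical names, and `Service` is a simplified stand-in for Traefik's `dynamic.Service`, not the real type.

```go
package main

import "fmt"

// Service is a simplified, hypothetical stand-in for Traefik's dynamic.Service.
type Service struct {
	Servers []string
}

// mergeServices returns a new map holding every file-defined service,
// with nacos-discovered services overriding entries of the same name.
func mergeServices(fromFile, fromNacos map[string]Service) map[string]Service {
	merged := make(map[string]Service, len(fromFile)+len(fromNacos))
	for name, svc := range fromFile {
		merged[name] = svc
	}
	for name, svc := range fromNacos {
		merged[name] = svc // nacos wins on a name clash
	}
	return merged
}

func main() {
	fromFile := map[string]Service{
		"dev-service": {Servers: []string{"http://aliyun-slb-service:8080"}},
		"legacy":      {Servers: []string{"http://legacy-slb:8080"}},
	}
	fromNacos := map[string]Service{
		"dev-service": {Servers: []string{"http://10.0.0.1:8080", "http://10.0.0.2:8080"}},
	}
	merged := mergeServices(fromFile, fromNacos)
	fmt.Println(len(merged), merged["dev-service"].Servers)
}
```

Building a fresh map on every merge, rather than writing into the file map, means a service that disappears from nacos also disappears from the next merged result.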

Final configuration files

Once the code changes are in place, the configuration looks like this.

Provider section of the static configuration

providers:
  nacos:
    file:
      watch: true
      filename: "/Users/d/depend/config/route/test.yaml"
      debugLogGeneratedTemplate: true
    endpoint:
      host: devnacos.inc.com
      port: 8848
      group: aaa
      logDir: "/tmp/nacos/log"
      cacheDir: "/tmp/nacos/cache"

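The configuration has two parts:

  • The first part is file, which follows the same logic as the built-in file provider
  • The second part is endpoint, which holds the settings for the nacos SDK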

Route configuration

http:
  routers:
    dev:
      entryPoints:
      - test
      service: nacos-service-name  # placeholder: the service name registered in nacos
      rule: PathPrefix(`/path`)
      middlewares:
      - test-retry
  middlewares:
    test-retry:
      retry:
        attempts: 3
        initialInterval: 10ms

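The services section is gone; services are read from nacos, and a router's service field is the service name in nacos.

The retry middleware is there mainly for failover: when an instance is briefly unavailable, the request is retried on the next instance (for example, when an instance has gone offline but the gateway has not yet received the notification and still sends a request to it).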

Implementation

Next, the implementation: Traefik needs a new provider called nacos.

Main logic

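First, add a nacos folder under pkg/provider and create a nacos.go file in it; most of the logic lives in this file.

Start with the code that reads configuration from files. Most of it can reuse Traefik's built-in file provider.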

// Parse the file(s) into a configuration, reusing Traefik's existing code.
fileProvider := file.Provider{
	Directory:                 p.File.Directory,
	Watch:                     p.File.Watch,
	Filename:                  p.File.Filename,
	DebugLogGeneratedTemplate: p.File.DebugLogGeneratedTemplate,
}

fileConfiguration, err := fileProvider.BuildConfiguration()

// Register the file watcher. The callback logic differs from the file provider's, so it is implemented here.
if p.File.Watch {
	var watchItem string

	switch {
	case len(p.File.Directory) > 0:
		watchItem = p.File.Directory
	case len(p.File.Filename) > 0:
		watchItem = filepath.Dir(p.File.Filename)
	default:
		return nil, errors.New("error using file configuration provider, neither filename or directory defined")
	}

	if err := p.addWatcher(pool, watchItem, configurationChan, p.watcherCallback); err != nil {
		return nil, err
	}
}

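The callback code is omitted here; see the complete code below.

Next comes the nacos code.

First install the SDK:

go get -u github.com/nacos-group/nacos-sdk-go

The interaction with nacos consists mainly of fetching instance information and subscribing to service changes.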
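The watcher logic above boils down to two decisions: which path to watch, and which events should trigger a reload. The sketch below isolates both using only the standard library. `watchTarget` and `isRelevantEvent` are hypothetical helper names, not functions from Traefik or from the provider.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
)

// watchTarget returns the path to hand to the file watcher: the configured
// directory, or the parent directory of the configured file.
func watchTarget(directory, filename string) (string, error) {
	switch {
	case directory != "":
		return directory, nil
	case filename != "":
		return filepath.Dir(filename), nil
	default:
		return "", errors.New("neither filename nor directory defined")
	}
}

// isRelevantEvent reports whether an event on eventPath should trigger a reload.
func isRelevantEvent(directory, filename, eventPath string) bool {
	if directory != "" {
		return true // every file in a watched directory counts
	}
	// Only the configured file matters when a single file is configured.
	return filepath.Base(eventPath) == filepath.Base(filename)
}

func main() {
	target, err := watchTarget("", "/etc/traefik/route/test.yaml")
	fmt.Println(target, err)
	fmt.Println(isRelevantEvent("", "/etc/traefik/route/test.yaml", "/etc/traefik/route/other.yaml"))
}
```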

// First create the nacos client.
p.client, err = createClient(p.EndPoint)

// Fetch the services and look up the addresses of each one.
// This is simplified: it fetches every service in the group.
services, err := p.fetchServices()
for _, service := range services {
	instances, err := p.client.SelectAllInstances(vo.SelectAllInstancesParam{
		ServiceName: service,
		GroupName:   p.EndPoint.Group,
		//HealthyOnly: true,
	})

	// ...

	// Subscribe to changes of this service.
	err = p.client.Subscribe(&vo.SubscribeParam{
		ServiceName: service,
		GroupName:   p.EndPoint.Group,
		SubscribeCallback: func(services []model.SubscribeService, err error) {
			p.nacosCallback(&services, configurationChan)
		},
	})

	// ...
}
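The step elided by `// ...` turns each nacos instance into a server URL. A standalone sketch of that conversion follows. `Instance` here is a hypothetical local struct that only mirrors the `Ip`, `Port`, `Healthy`, and `Enable` fields of the SDK's instance model, and skipping unhealthy or disabled instances is an assumption of this sketch, not something the excerpt above does.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// Instance is a hypothetical stand-in carrying only the instance fields
// that matter for building server URLs.
type Instance struct {
	Ip      string
	Port    uint64
	Healthy bool
	Enable  bool
}

// serverURLs builds one http URL per usable instance.
func serverURLs(instances []Instance) []string {
	urls := make([]string, 0, len(instances))
	for _, in := range instances {
		if !in.Healthy || !in.Enable {
			continue // assumption: do not route to unhealthy or disabled instances
		}
		// JoinHostPort also brackets IPv6 addresses correctly.
		hostPort := net.JoinHostPort(in.Ip, strconv.FormatUint(in.Port, 10))
		urls = append(urls, "http://"+hostPort)
	}
	return urls
}

func main() {
	instances := []Instance{
		{Ip: "10.0.0.1", Port: 8080, Healthy: true, Enable: true},
		{Ip: "10.0.0.2", Port: 8080, Healthy: false, Enable: true},
		{Ip: "::1", Port: 9000, Healthy: true, Enable: true},
	}
	fmt.Println(serverURLs(instances))
}
```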

A periodic task is also needed to pull the latest service state at a fixed interval, as a fallback.

pool.GoCtx(func(routineCtx context.Context) {
	ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "nacos"))
	logger := log.FromContext(ctxLog)

	operation := func() error {
		ticker := time.NewTicker(15 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// Use a local err so the goroutine does not write to a variable of the enclosing function.
				if _, err := p.nacosProvide(configurationChan); err != nil {
					logger.Errorf("error get nacos service data, %v", err)
					return err
				}
				sendConfigToChannel(configurationChan, p.buildConfiguration())
			case <-routineCtx.Done():
				return nil
			}
		}
	}

	// delay is the wait before the next retry.
	notify := func(err error, delay time.Duration) {
		logger.Errorf("Provider connection error %+v, retrying in %s", err, delay)
	}

	err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), notify)
	if err != nil {
		logger.Errorf("Cannot connect to nacos server %+v", err)
	}
})

The final nacos.go file:

package nacos

import (
	"context"
	"errors"
	"fmt"
	"github.com/cenkalti/backoff/v4"
	"github.com/nacos-group/nacos-sdk-go/clients"
	"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
	"github.com/nacos-group/nacos-sdk-go/common/constant"
	"github.com/nacos-group/nacos-sdk-go/common/logger"
	"github.com/nacos-group/nacos-sdk-go/model"
	"github.com/nacos-group/nacos-sdk-go/vo"
	"github.com/traefik/traefik/v2/pkg/config/dynamic"
	"github.com/traefik/traefik/v2/pkg/job"
	"github.com/traefik/traefik/v2/pkg/log"
	"github.com/traefik/traefik/v2/pkg/provider/file"
	"github.com/traefik/traefik/v2/pkg/safe"
	"gopkg.in/fsnotify.v1"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"
)

// Provider loads routers from files and service addresses from nacos.
type Provider struct {
	File     *FileConfig     `description:"Nacos file settings" json:"file,omitempty" toml:"file,omitempty" yaml:"file,omitempty" export:"true"`
	EndPoint *EndpointConfig `description:"Nacos endpoint settings" json:"endpoint,omitempty" toml:"endpoint,omitempty" yaml:"endpoint,omitempty" export:"true"`

	client        naming_client.INamingClient
	fileConfig    *dynamic.Configuration       // latest configuration parsed from the file(s)
	nacosServices *map[string]*dynamic.Service // latest services built from nacos instances
}

type FileConfig struct {
	Directory                 string `description:"Load dynamic configuration from one or more .toml or .yml files in a directory." json:"directory,omitempty" toml:"directory,omitempty" yaml:"directory,omitempty" export:"true"`
	Watch                     bool   `description:"Watch provider." json:"watch,omitempty" toml:"watch,omitempty" yaml:"watch,omitempty" export:"true"`
	Filename                  string `description:"Load dynamic configuration from a file." json:"filename,omitempty" toml:"filename,omitempty" yaml:"filename,omitempty" export:"true"`
	DebugLogGeneratedTemplate bool   `description:"Enable debug logging of generated configuration template." json:"debugLogGeneratedTemplate,omitempty" toml:"debugLogGeneratedTemplate,omitempty" yaml:"debugLogGeneratedTemplate,omitempty" export:"true"`
}

type EndpointConfig struct {
	Host     string `description:"The host of the Nacos server" json:"host,omitempty" toml:"host,omitempty" yaml:"host,omitempty" export:"true"`
	Port     uint64 `description:"The port of the Nacos server" json:"port,omitempty" toml:"port,omitempty" yaml:"port,omitempty" export:"true"`
	Group    string `description:"The Nacos group to read services from" json:"group,omitempty" toml:"group,omitempty" yaml:"group,omitempty" export:"true"`
	LogDir   string `description:"Local directory for Nacos client logs" json:"logDir,omitempty" toml:"logDir,omitempty" yaml:"logDir,omitempty" export:"true"`
	CacheDir string `description:"Local directory for the Nacos client cache" json:"cacheDir,omitempty" toml:"cacheDir,omitempty" yaml:"cacheDir,omitempty" export:"true"`
}

func (p *Provider) Init() error {
	return nil
}

func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
	//logger := log.WithoutContext().WithField(log.ProviderName, "nacos")

	_, err := p.fileProvide(configurationChan, pool)
	if err != nil {
		return err
	}

	p.client, err = createClient(p.EndPoint)
	if err != nil {
		return fmt.Errorf("error create nacos client, %w", err)
	}

	_, err = p.nacosProvide(configurationChan)
	if err != nil {
		return err
	}
	configuration := p.buildConfiguration()
	sendConfigToChannel(configurationChan, configuration)

	pool.GoCtx(func(routineCtx context.Context) {
		ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "nacos"))
		logger := log.FromContext(ctxLog)

		operation := func() error {
			ticker := time.NewTicker(15 * time.Second)
			defer ticker.Stop()

			for {
				select {
				case <-ticker.C:
					// Use a local err so the goroutine does not write to Provide's err variable.
					if _, err := p.nacosProvide(configurationChan); err != nil {
						logger.Errorf("error get nacos service data, %v", err)
						return err
					}
					sendConfigToChannel(configurationChan, p.buildConfiguration())
				case <-routineCtx.Done():
					return nil
				}
			}
		}

		// delay is the wait before the next retry.
		notify := func(err error, delay time.Duration) {
			logger.Errorf("Provider connection error %+v, retrying in %s", err, delay)
		}

		err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), notify)
		if err != nil {
			logger.Errorf("Cannot connect to nacos server %+v", err)
		}
	})

	return nil
}

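// buildConfiguration merges the file configuration with the services
// discovered in nacos; on a name clash the nacos service wins.
func (p *Provider) buildConfiguration() *dynamic.Configuration {
	// Work on a copy so services removed from nacos do not linger in p.fileConfig.
	config := p.fileConfig.DeepCopy()
	if p.nacosServices == nil {
		return config
	}
	for k, v := range *p.nacosServices {
		config.HTTP.Services[k] = v
	}

	return config
}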

func (p *Provider) nacosProvide(configurationChan chan<- dynamic.Message) (*map[string]*dynamic.Service, error) {
	services, err := p.fetchServices()
	if err != nil {
		return nil, err
	}

	serviceMap := make(map[string]*dynamic.Service)
	for _, service := range services {
		instances, err := p.client.SelectAllInstances(vo.SelectAllInstancesParam{
			ServiceName: service,
			GroupName:   p.EndPoint.Group,
			//HealthyOnly: true,
		})

		if err != nil {
			logger.Errorf("Skip item %s: %v", service, err)
			continue
		}
		
		err = p.client.Subscribe(&vo.SubscribeParam{
			ServiceName: service,
			GroupName:   p.EndPoint.Group,
			SubscribeCallback: func(services []model.SubscribeService, err error) {
				p.nacosCallback(&services, configurationChan)
			},
		})

		if err != nil {
			logger.Errorf("Subscribe to %s failed: %v", service, err)
			return nil, err
		}

		var servers []dynamic.Server
		for _, instance := range instances {
			server := dynamic.Server{
				URL: "http://" + instance.Ip + ":" + strconv.FormatUint(instance.Port, 10),
			}
			servers = append(servers, server)
		}

		b := true
		serviceMap[service] = &dynamic.Service{
			LoadBalancer: &dynamic.ServersLoadBalancer{
				Sticky:             nil,
				Servers:            servers,
				HealthCheck:        nil,
				PassHostHeader:     &b,
				ResponseForwarding: nil,
			},
		}
	}

	p.nacosServices = &serviceMap

	return &serviceMap, nil
}

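func (p *Provider) nacosCallback(services *[]model.SubscribeService, configurationChan chan<- dynamic.Message) {
	serviceSet := make(map[string]bool)
	for _, service := range *services {
		// ServiceName is expected to look like "group@@name"; keep only the name
		// and fall back to the raw value instead of panicking on another format.
		serviceName := service.ServiceName
		if i := strings.LastIndex(serviceName, "@@"); i >= 0 {
			serviceName = serviceName[i+2:]
		}

		serviceSet[serviceName] = true
	}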

	for s := range serviceSet {
		nacosServices := *p.nacosServices
		serviceName := s

		instances, err := p.client.SelectInstances(vo.SelectInstancesParam{
			ServiceName: serviceName,
			GroupName:   p.EndPoint.Group,
			HealthyOnly: true,
		})

		if err != nil {
			logger.Errorf("Skip item %s: %v", s, err)
			continue
		}

		var servers []dynamic.Server
		for _, instance := range instances {
			server := dynamic.Server{
				URL: "http://" + instance.Ip + ":" + strconv.FormatUint(instance.Port, 10),
			}
			servers = append(servers, server)
		}

		serviceLB := nacosServices[serviceName]
		if serviceLB == nil {
			b := true
			nacosServices[serviceName] = &dynamic.Service{
				LoadBalancer: &dynamic.ServersLoadBalancer{
					Sticky:             nil,
					Servers:            servers,
					HealthCheck:        nil,
					PassHostHeader:     &b,
					ResponseForwarding: nil,
				},
			}
		} else {
			nacosServices[serviceName].LoadBalancer.Servers = servers
		}
	}

	configuration := p.buildConfiguration()
	sendConfigToChannel(configurationChan, configuration)
}

func (p *Provider) fileProvide(configurationChan chan<- dynamic.Message, pool *safe.Pool) (*dynamic.Configuration, error) {
	fileProvider := file.Provider{
		Directory: p.File.Directory,
		Watch: p.File.Watch,
		Filename: p.File.Filename,
		DebugLogGeneratedTemplate: p.File.DebugLogGeneratedTemplate,
	}

	fileConfiguration, err := fileProvider.BuildConfiguration()
	if err != nil {
		return nil, err
	}
	//fileConfiguration.HTTP.Services = make(map[string]*dynamic.Service)
	//fileConfiguration.TCP.Services = make(map[string]*dynamic.TCPService)
	//fileConfiguration.UDP.Services = make(map[string]*dynamic.UDPService)
	p.fileConfig = fileConfiguration

	if p.File.Watch {
		var watchItem string

		switch {
		case len(p.File.Directory) > 0:
			watchItem = p.File.Directory
		case len(p.File.Filename) > 0:
			watchItem = filepath.Dir(p.File.Filename)
		default:
			return nil, errors.New("error using file configuration provider, neither filename or directory defined")
		}

		if err := p.addWatcher(pool, watchItem, configurationChan, p.watcherCallback); err != nil {
			return nil, err
		}
	}

	return fileConfiguration, nil
}

func (p *Provider) addWatcher(pool *safe.Pool, directory string, configurationChan chan<- dynamic.Message, callback func(chan<- dynamic.Message, fsnotify.Event)) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("error creating file watcher: %w", err)
	}

	err = watcher.Add(directory)
	if err != nil {
		return fmt.Errorf("error adding file watcher: %w", err)
	}

	// Process events
	pool.GoCtx(func(ctx context.Context) {
		defer watcher.Close()
		for {
			select {
			case <-ctx.Done():
				return
			case evt := <-watcher.Events:
				if p.File.Directory == "" {
					_, evtFileName := filepath.Split(evt.Name)
					_, confFileName := filepath.Split(p.File.Filename)
					if evtFileName == confFileName {
						callback(configurationChan, evt)
					}
				} else {
					callback(configurationChan, evt)
				}
			case err := <-watcher.Errors:
				log.WithoutContext().WithField(log.ProviderName, "nacos").Errorf("Watcher event error: %s", err)
			}
		}
	})
	return nil
}

func (p *Provider) watcherCallback(configurationChan chan<- dynamic.Message, event fsnotify.Event) {
	watchItem := p.File.Filename
	if len(p.File.Directory) > 0 {
		watchItem = p.File.Directory
	}

	logger := log.WithoutContext().WithField(log.ProviderName, "nacos")

	if _, err := os.Stat(watchItem); err != nil {
		logger.Errorf("Unable to watch %s : %v", watchItem, err)
		return
	}

	fileProvider := file.Provider{
		Directory: p.File.Directory,
		Watch: p.File.Watch,
		Filename: p.File.Filename,
		DebugLogGeneratedTemplate: p.File.DebugLogGeneratedTemplate,
	}
	configuration, err := fileProvider.BuildConfiguration()
	if err != nil {
		logger.Errorf("Error occurred during watcher callback: %s", err)
		return
	}

	p.fileConfig = configuration

	sendConfigToChannel(configurationChan, p.buildConfiguration())
}

func createClient(cfg *EndpointConfig) (naming_client.INamingClient, error) {
	// server
	sc := []constant.ServerConfig{
		*constant.NewServerConfig(cfg.Host, cfg.Port),
	}

	// client
	cc := *constant.NewClientConfig(
		//constant.WithNamespaceId("e525eafa-f7d7-4029-83d9-008937f9d468"),
		constant.WithTimeoutMs(5000),
		constant.WithNotLoadCacheAtStart(true),
		constant.WithLogDir(cfg.LogDir),
		constant.WithCacheDir(cfg.CacheDir),
		constant.WithRotateTime("1h"),
		constant.WithMaxAge(3),
		constant.WithLogLevel("info"),
	)

	namingClient, err := clients.NewNamingClient(
		vo.NacosClientParam{
			ClientConfig:  &cc,
			ServerConfigs: sc,
		},
	)

	if err != nil {
		return nil, err
	}

	return namingClient, nil
}

// fetchServices pages through nacos and returns the names of all services in the configured group.
func (p *Provider) fetchServices() ([]string, error) {
	var names []string
	for page := uint32(1); ; page++ {
		serviceInfos, err := p.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
			GroupName: p.EndPoint.Group,
			PageNo:    page,
			PageSize:  100,
		})
		if err != nil {
			return nil, err
		}

		// Also stop on an empty page, in case Count reports the overall total rather than the page size.
		if serviceInfos.Count == 0 || len(serviceInfos.Doms) == 0 {
			break
		}

		names = append(names, serviceInfos.Doms...)
	}

	return names, nil
}

func sendConfigToChannel(configurationChan chan<- dynamic.Message, configuration *dynamic.Configuration) {
	configurationChan <- dynamic.Message{
		ProviderName:  "nacos",
		Configuration: configuration,
	}
}

Configuration parsing

Then add the code that parses the static configuration.

In pkg/config/static/static_config.go, add the nacos configuration to the Providers struct:

type Providers struct {
	// existing code omitted

	Nacos     *nacos.Provider `description:"Enable Nacos backend with default settings." json:"nacos,omitempty" toml:"nacos,omitempty" yaml:"nacos,omitempty"`
}

In pkg/provider/aggregator/aggregator.go, register the nacos provider:

func NewProviderAggregator(conf static.Providers) ProviderAggregator {
	// existing code omitted

	if conf.Nacos != nil {
		p.quietAddProvider(conf.Nacos)
	}

	return p
}

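Conclusion

With these changes Traefik pulls instance information from the registry automatically and load-balances across it, which is much more convenient. The example above is simplified in places. For instance, it fetches every service from nacos, which is inefficient when there are many services; it could fetch only the services that are actually used. Route changes could also be moved to a configuration center, or the UI could be modified to allow visual editing. Adapt it to your own situation in practice.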
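As a rough illustration of "fetch only the services that are actually used", the set of names to look up in nacos could be derived from the routers in the file configuration. The sketch below assumes a trimmed-down, hypothetical `Router` type rather than Traefik's `dynamic.Router`.

```go
package main

import (
	"fmt"
	"sort"
)

// Router is a hypothetical, trimmed-down stand-in for a router entry
// in the file configuration.
type Router struct {
	Service string
}

// referencedServices returns the distinct service names that routers point at,
// sorted for stable output.
func referencedServices(routers map[string]Router) []string {
	seen := make(map[string]bool)
	var names []string
	for _, r := range routers {
		if r.Service == "" || seen[r.Service] {
			continue
		}
		seen[r.Service] = true
		names = append(names, r.Service)
	}
	sort.Strings(names)
	return names
}

func main() {
	routers := map[string]Router{
		"dev":   {Service: "order-service"},
		"dev-2": {Service: "order-service"},
		"user":  {Service: "user-service"},
	}
	fmt.Println(referencedServices(routers))
}
```

Only these names would then need an instance lookup and a subscription, instead of every service in the group.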
