使用 go-control-plane 自定义服务网格控制面

发布于:2024-07-01 ⋅ 阅读:(17) ⋅ 点赞:(0)

写在前面

阅读本文需要最起码了解envoy相关的概念

本文只是一个类似于demo的测试,只为了学习istio,更好的理解istio中的控制面和数据面(pilot -> proxy)是如何交互的,下图的蓝色虚线
在这里插入图片描述
先说go-control-plane是什么,它是一个用于实现xDS APIgolang库,xDS APIEnvoy用于动态配置的协议。我们实现了go-control-plane就可以做到

  1. 动态配置管理:允许控制面动态更新数据面代理的配置
  2. 支持多种 xDS 资源
  3. 缓存和版本管理:提供快照缓存机制,管理配置的版本和更新

使用go-control-plane我们可以创建一个自定义控制面来管理·Envoy`,从而实现动态的服务发现、负载均衡和路由等等…

xds则是一系列服务发现的总称

  • lds 监听器服务发现
  • rds 路由服务发现
  • cds 集群服务发现
  • eds 端点服务发现
  • ads 聚合服务发现

等等等等,还有一些其他的服务发现,本文不涉及就没有说到,如果不理解这些概念,建议先去官网了解一下 https://www.envoyproxy.io

实现 go-control-plane

功能描述

在这里插入图片描述
上文是envoy代理的架构,程序中的逻辑我使用倒叙的方式描述

  • 创建endpoint地址是www.envoyproxy.io端口是443
  • 创建cluster叫做xds_demo_cluster它的端点就是上面创建的
  • 创建路由在filter_chins下的http_connection_manager中名称叫做xds_demo_route,没有任何的路由规则,路由的cluster名称(请求转发的目的地)叫做xds_demo_cluster
  • 最后创建listener名称是listener_xds_demo监听的地址是0.0.0.0:12000

整体就是当我们访问localhost:12000的时候会将我们的请求转发到www.envoyproxy.io
如果运行过默认的envoy的用户可能就会发现我程序中下发的配置就是默认运行envoy时的配置,只不过默认运行envoy静态配置文件的方式就是所有的配置都写在envoy.yaml中,而本文是动态的方式。

envoy有多种运行方式本文不做赘述

功能实现

package main

import (
	"context"
	"log"
	"net"
	"time"

	cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
	routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
	http_connection_manager "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
	tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
	clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
	discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
	listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
	routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
	"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
	"github.com/envoyproxy/go-control-plane/pkg/server/v3"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/durationpb"
)

func main() {
	ctx := context.Background()
	lis, err := net.Listen("tcp", ":18000")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	sc := cache.NewSnapshotCache(true, cache.IDHash{}, nil)
	srv := server.NewServer(ctx, sc, nil)
	// new grpc server
	gs := grpc.NewServer()
	clusterservice.RegisterClusterDiscoveryServiceServer(gs, srv)
	endpointservice.RegisterEndpointDiscoveryServiceServer(gs, srv)
	listenerservice.RegisterListenerDiscoveryServiceServer(gs, srv)
	routeservice.RegisterRouteDiscoveryServiceServer(gs, srv)
	discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, srv)

	err = setSnapshot(ctx, "xds-node-id", sc)
	if err != nil {
		log.Fatalf("set snapshot error: %v", err)
	} else {
		log.Println("set snapshot success")

	}

	log.Println("Starting control plane server...")
	if err := gs.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

func setSnapshot(ctx context.Context, nodeId string, sc cache.SnapshotCache) error {
	clusterName := "xds_demo_cluster"
	manager := buildHttpManager(clusterName)
	fcs := buildFilterChain(manager)
	serviceListener := buildListener("0.0.0.0", 12000, fcs)

	serviceEndpoint := buildEndpoint()
	serviceCluster := buildCluster(clusterName, serviceEndpoint)
	rs := map[resource.Type][]types.Resource{
		resource.ClusterType:  {serviceCluster},
		resource.EndpointType: {serviceEndpoint},
		resource.ListenerType: {serviceListener},
		resource.RouteType:    {manager},
	}
	snapshot, err := cache.NewSnapshot("1", rs)
	if err != nil {
		log.Fatalf("new snapshot error: %v", err)
	}
	return sc.SetSnapshot(ctx, nodeId, snapshot)
}

func buildFilterChain(manager *http_connection_manager.HttpConnectionManager) []*listener.FilterChain {
	managerPB, err := anypb.New(manager)
	if err != nil {
		log.Fatalf("Failed to marshal HttpConnectionManager: %v", err)
	}

	fcs := make([]*listener.FilterChain, 0, 0)
	fcs = append(fcs, &listener.FilterChain{
		Filters: []*listener.Filter{
			{
				Name: "envoy.filters.network.http_connection_manager",
				ConfigType: &listener.Filter_TypedConfig{
					TypedConfig: managerPB,
				},
			},
		},
	})
	return fcs
}

func buildHttpManager(clusterName string) *http_connection_manager.HttpConnectionManager {
	xdsRoute := &route.RouteConfiguration{
		Name: "xds_demo_route",
		VirtualHosts: []*route.VirtualHost{
			{
				Name:    "xds_demo_service",
				Domains: []string{"*"},
				Routes: []*route.Route{
					{
						Match: &route.RouteMatch{
							PathSpecifier: &route.RouteMatch_Prefix{
								Prefix: "/",
							},
						},
						Action: &route.Route_Route{
							Route: &route.RouteAction{
								HostRewriteSpecifier: &route.RouteAction_HostRewriteLiteral{
									HostRewriteLiteral: "www.envoyproxy.io",
								},
								// 集群要去下文一致
								ClusterSpecifier: &route.RouteAction_Cluster{
									Cluster: clusterName,
								},
							},
						},
					},
				},
			},
		},
	}
	routerConfig, _ := anypb.New(&routerv3.Router{})
	// http 链接管理器
	manager := &http_connection_manager.HttpConnectionManager{
		StatPrefix: "ingress_http",
		RouteSpecifier: &http_connection_manager.HttpConnectionManager_RouteConfig{
			RouteConfig: xdsRoute,
		},
		HttpFilters: []*http_connection_manager.HttpFilter{
			{
				Name: "envoy.filters.http.router",
				ConfigType: &http_connection_manager.HttpFilter_TypedConfig{
					TypedConfig: routerConfig,
				},
			},
		},
		SchemeHeaderTransformation: &corev3.SchemeHeaderTransformation{
			Transformation: &corev3.SchemeHeaderTransformation_SchemeToOverwrite{
				SchemeToOverwrite: "https",
			},
		},
	}
	return manager
}

func buildEndpoint() *endpoint.LbEndpoint {
	epTarget := &endpoint.LbEndpoint{
		HostIdentifier: &endpoint.LbEndpoint_Endpoint{
			Endpoint: &endpoint.Endpoint{
				Address: &corev3.Address{
					Address: &corev3.Address_SocketAddress{
						SocketAddress: &corev3.SocketAddress{
							Address: "www.envoyproxy.io",
							PortSpecifier: &corev3.SocketAddress_PortValue{
								PortValue: 443,
							},
						},
					},
				},
			},
		},
	}
	return epTarget
}

func buildCluster(clusterName string, ep *endpoint.LbEndpoint) *cluster.Cluster {
	serviceCluster := &cluster.Cluster{
		Name:           clusterName,
		ConnectTimeout: durationpb.New(time.Second * 3),
		ClusterDiscoveryType: &cluster.Cluster_Type{
			Type: cluster.Cluster_STRICT_DNS,
		},
		DnsLookupFamily: cluster.Cluster_V4_ONLY,
		LbPolicy:        cluster.Cluster_ROUND_ROBIN,
		LoadAssignment: &endpoint.ClusterLoadAssignment{
			ClusterName: clusterName,
			Endpoints: []*endpoint.LocalityLbEndpoints{
				{
					LbEndpoints: []*endpoint.LbEndpoint{ep},
				},
			},
		},
		TransportSocket: &corev3.TransportSocket{
			Name:       "envoy.transport_sockets.tls",
			ConfigType: nil,
		},
	}
	us := &tlsv3.UpstreamTlsContext{
		Sni: "www.envoyproxy.io",
	}
	tlsConfig, _ := anypb.New(us)
	serviceCluster.TransportSocket.ConfigType = &corev3.TransportSocket_TypedConfig{
		TypedConfig: tlsConfig,
	}
	return serviceCluster
}

func buildListener(ip string, port uint32, fcs []*listener.FilterChain) *listener.Listener {
	return &listener.Listener{
		Name: "listener_xds_demo",
		Address: &corev3.Address{
			Address: &corev3.Address_SocketAddress{
				SocketAddress: &corev3.SocketAddress{
					Address: ip,
					PortSpecifier: &corev3.SocketAddress_PortValue{
						PortValue: port,
					},
				},
			},
		},
		// 过滤器链
		FilterChains: fcs,
	}
}
  • cache.NewSnapshotCache返回一个SnapshotCachego-control-plane中的一个核心组件,用于管理和存储Envoy所需的 xDS资源的快照,并且向Envoy实例推送更新
  • server.NewServer创建xDS服务实例
  • sc.SetSnapshot用于将生成的配置快照设置到指定的节点上,是动态配置和更新Envoy代理的行为,入参有一个id 是下文envoy引导配置中的id

截止到现在我们就可以启动这个服务,我们要记住当前服务监听的地址,因为在envoy.yaml中会需要使用到的

为 envoy 设置引导配置

引导配置(bootstrap configuration),引导配置文件通常指定控制面地址(如xDS服务器地址)、监听器、集群、管理接口等基本信息。

node:
  id: xds-node-id
  cluster: xds-node-cluster

dynamic_resources:
  ads_config:
    api_type: GRPC
    grpc_services:
    - envoy_grpc:
        cluster_name: xds_cluster
  cds_config:
    ads: {}
  lds_config:
    ads: {}

static_resources:
  clusters:
  - name: xds_cluster
    connect_timeout: 1s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    http2_protocol_options: {}
    load_assignment:
      cluster_name: xds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 18000

admin:
  access_log_path: /dev/null
  address:
    socket_address:
      address: 0.0.0.0
      port_value: 9901

解释

  1. static_resources
    1. 定义静态集群xds_cluster,用于与xDS服务器通信。这里的xDS服务器运行在127.0.0.1:18000就是我们上面服务监听的地址
  2. dynamic_resources
    1. ads_config: 配置聚合发现服务(ADS)来动态获取配置,使用gRPC服务与xds_cluster进行通信
    2. cds_configlds_config分别表示使用ADS来获取配置
  3. admin
    1. 定义管理接口,监听0.0.0.0:9901,用于查看Envoy的状态和配置
  4. node
    1. id 节点的唯一标识符,用于在xDS服务器中区分不同的Envoy实例
    2. cluster 节点所属的集群名称。

然后启动envoy,从输出的日志中我们可以看到通过控制面下发的配置,数据面已经加载成功了。

[2024-06-27 07:47:53.524][1][info][main] [source/server/server.cc:977] starting main dispatch loop
[2024-06-27 07:47:53.526][1][info][upstream] [source/common/upstream/cds_api_helper.cc:32] cds: add 1 cluster(s), remove 1 cluster(s)
[2024-06-27 07:47:53.527][1][info][upstream] [source/common/upstream/cds_api_helper.cc:71] cds: added/updated 1 cluster(s), skipped 0 unmodified cluster(s)
[2024-06-27 07:47:53.544][1][info][upstream] [source/common/upstream/cluster_manager_impl.cc:240] cm init: all clusters initialized
[2024-06-27 07:47:53.544][1][info][main] [source/server/server.cc:957] all clusters initialized. initializing init manager
[2024-06-27 07:47:53.546][1][info][upstream] [source/common/listener_manager/lds_api.cc:106] lds: add/update listener 'listener_xds_demo'
[2024-06-27 07:47:53.546][1][info][config] [source/common/listener_manager/listener_manager_impl.cc:930] all dependencies initialized. starting workers

下一步我们可以访问一下监听的地址,可以看到成功转发到了envoy的官方网站。

~ $ curl http://localhost:12000 | grep title
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
 19 15571   19  3098    0     0   5286      0  0:00:02 --:--:--  0:00:02  5286    <title>Envoy proxy - home</title>
100 15571  100 15571    0     0  21685      0 --:--:-- --:--:-- --:--:-- 21686

我们还可以通过 ~ $ curl http://localhost:9901/config_dump 来查看envoy的实时配置

写在最后

动态配置的方式是在内存中加载配置,不会更新到静态的文件中。

更高级、复杂的用法可以参考istio;具体来说pilot watch集群中的服务、端点、配置等资源的变化。当检测到这些资源的变化时,pilot会生成新的配置,并通过xDS API将更新推送到相应的Envoy实例,从而实现动态配置和管理服务网格中的流量控制和路由规则。这样可以确保 Envoy始终具有最新的服务发现信息和路由配置。

源码目录 https://github.com/istio/istio/tree/master/pilot


网站公告

今日签到

点亮在社区的每一天
去签到