【开源品鉴】FRP源码阅读

发布于:2025-07-05 ⋅ 阅读:(15) ⋅ 点赞:(0)
frp 是一款高性能的反向代理应用,专注于内网穿透,支持多种协议和 P2P 通信功能,目前在 GitHub 上已有 80k 的 star。本文将深入探讨其源码,揭示其背后的实现原理。

1. 前言

frp 是一款高性能的反向代理应用,专注于内网穿透。它支持多种协议,包括 TCP、UDP、HTTP、HTTPS 等,并且具备 P2P 通信功能。使用 frp,您可以安全、便捷地将内网服务暴露到公网,通过拥有公网 IP 的节点进行中转,具体场景就是:将客户端部署到你的内网中,然后该客户端与你内网服务网络可达,当客户端与在公网的服务端连接后,我们就可以通过访问服务端的指定端口,去访问到内网服务。

目前 GitHub 已经有 80k 的 star,这么猛的项目,我决定阅读一番源码偷师一波。

2. pkg/auth

这个包负责客户端和服务端认证的代码,这里面一共用到了 2 种验证机制,一种是基于 token,就是预共享密钥,客户端和服务端实现配置一样的字符串密钥,第二种是 OAuth 2.0,依赖第三方授权服务器颁发的访问令牌,然后客户端带着令牌去访问服务端。

这里面有很多技巧值得学习:

2.1. 工厂函数

通过不同的配置生成对应的认证方式。

type Setter interface {
	SetLogin(*msg.Login) error
	SetPing(*msg.Ping) error
	SetNewWorkConn(*msg.NewWorkConn) error
}

// 根据客户端配置创建认证提供者
func NewAuthSetter(cfg v1.AuthClientConfig) (authProvider Setter) {
    switch cfg.Method {
        // token 认证模式
        case v1.AuthMethodToken:
        authProvider = NewTokenAuth(cfg.AdditionalScopes, cfg.Token)
        // openid 认证模式
        case v1.AuthMethodOIDC:
        authProvider = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)
        default:
        panic(fmt.Sprintf(「wrong method: 『%s』」, cfg.Method))
    }
    return authProvider
}

2.2. 常量时间的字符串比较

正常情况来说,token 模式下,两边比较一下字符串是不是相等就完了,但其实这个是有安全隐患的,第一个就是攻击者可以进行重放攻击,一直进行密码爆破,第二个就是攻击者可以进行定时攻击,比如普通比较(如 ==)在发现第一个不匹配字节时会立即返回,攻击者可通过测量响应时间差异推断出匹配的字节位置,ConstantTimeCompare 始终遍历全部字节(即使已发现不匹配),使攻击者无法通过时间差获取敏感信息。

// token 和客户端上线的时间戳组成 key
func GetAuthKey(token string, timestamp int64) (key string) {
	md5Ctx := md5.New()
	md5Ctx.Write([]byte(token))
	md5Ctx.Write([]byte(strconv.FormatInt(timestamp, 10)))
	data := md5Ctx.Sum(nil)
	return hex.EncodeToString(data)
}

// 全量匹配字节
func ConstantTimeCompare(x, y []byte) int {
	if len(x) != len(y) {
		return 0
	}

	var v byte

	for i := 0; i < len(x); i++ {
		v |= x[i] ^ y[i]
	}

	return ConstantTimeByteEq(v, 0)
}

// ConstantTimeByteEq returns 1 if x == y and 0 otherwise.
func ConstantTimeByteEq(x, y uint8) int {
	return int((uint32(x^y) - 1) >> 31)
}

3. pkg/config

config 文件夹是 frp 配置管理的核心模块,涵盖了配置的加载、解析、验证、转换和命令行支持等功能。它确保了 frp 的灵活性和兼容性,同时为用户提供了多种配置方式。

3.1. 使用环境变量进行模板渲染

serverAddr = 「{{ .Envs.FRP_SERVER_ADDR }}」
serverPort = 7000

[[proxies]]
name = 「ssh」
type = 「tcp」
localIP = 「127.0.0.1」
localPort = 22
remotePort = {{ .Envs.FRP_SSH_REMOTE_PORT }}


export FRP_SERVER_ADDR=「x.x.x.x」
export FRP_SSH_REMOTE_PORT=「6000」
./frpc -C ./frpc.toml

这个实现是采用了 template 模板库,其中 Envs 前缀是由字段名 Envs 决定的:

type Values struct {
    Envs map[string]string // 「{{ .Envs.FRP_SERVER_ADDR }}」 Envs 的由来
}

func RenderWithTemplate(in []byte, values *Values) ([]byte, error) {
	tmpl, err := template.New(「frp」).Funcs(template.FuncMap{
		「parseNumberRange」:     parseNumberRange,
		「parseNumberRangePair」: parseNumberRangePair,
	}).Parse(string(in))
	if err != nil {
		return nil, err
	}

	buffer := bytes.NewBufferString(「」)
	if err := tmpl.Execute(buffer, values); err != nil {
		return nil, err
	}
	return buffer.Bytes(), nil
}

// 将端口范围解析为 端口列表
func parseNumberRange(firstRangeStr string) ([]int64, error) {
  ... ... 
}

这里面有一些自定义的解析函数,比如说:

ports = 「{{ parseNumberRange .Envs.PORT_RANGE }}」

export PORT_RANGE = 「1000-1005」

// 这样 ports 就会被 template 的 parseNumberRange 函数解析并渲染为
// ports = 1000, 1001, 1002, 1003, 1004, 1005

3.2. 配置拆分

通过 includes 参数可以在主配置中包含其他配置文件,从而实现将代理配置拆分到多个文件中管理

# frpc.toml
serverAddr = 「x.x.x.x」
serverPort = 7000
includes = [「./confd/*.toml」]

上述配置在 frpc.toml 中通过 includes 额外包含了 ./confd 目录下所有的 toml 文件的代理配置内容,效果等价于将这两个文件合并成一个文件。

这个实现是采用了,循环读取文件内容 + 模板渲染 + 配置合并+ toml 反序列化 的方法:

// 主文件配置,就是 frpc.toml
var content []byte
content, err = GetRenderedConfFromFile(filePath)
if err != nil {
    return
}
configBuffer := bytes.NewBuffer(nil)
configBuffer.Write(content)

... ... 

var buf []byte
// 循环读取 include 的文件
// getIncludeContents
// ->ReadFile
// ->RenderContent
//   ->template.New(「frp」).Parse(string(in))
buf, err = getIncludeContents(cfg.IncludeConfigFiles)
if err != nil {
    err = fmt.Errorf(「getIncludeContents error: %v」, err)
    return
}
configBuffer.WriteString(「
」)
configBuffer.Write(buf)

// 将所有配置合并,然后将 toml 序列化为 type ClientCommonConf struct
代理 Cfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
if err != nil {
    return
}
return

3.3. 配置热加载

frpc reload -C ./frpc.toml 等待一段时间后,客户端将根据新的配置文件创建、更新或删除代理。

这里面也比较简单,主要逻辑在于配置校验,旧配置中与新配置里同名的且代理内容不一样的 proxy 停止,新增的配置的 proxy 再启动,也就是说老配置和新配置完全一样的是不动的

func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {
	xl := xlog.FromContextSafe(pm.ctx)
	proxyCfgsMap := lo.KeyBy(proxyCfgs, func(C v1.ProxyConfigurer) string {
		return C.GetBaseConfig().Name
	})
	pm.mu.Lock()
	defer pm.mu.Unlock()

	delPxyNames := make([]string, 0)
	for name, pxy := range pm.proxies {
		del := false
		cfg, ok := proxyCfgsMap[name]
		if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {
			del = true
		}

		if del {
			delPxyNames = append(delPxyNames, name)
			delete(pm.proxies, name)
			pxy.Stop()
		}
	}
	if len(delPxyNames) > 0 {
		xl.Infof(「proxy removed: %s」, delPxyNames)
	}

	addPxyNames := make([]string, 0)
	for _, cfg := range proxyCfgs {
		name := cfg.GetBaseConfig().Name
		if _, ok := pm.proxies[name]; !ok {
			pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController)
			if pm.inWorkConnCallback != nil {
				pxy.SetInWorkConnCallback(pm.inWorkConnCallback)
			}
			pm.proxies[name] = pxy
			addPxyNames = append(addPxyNames, name)

			pxy.Start()
		}
	}
	if len(addPxyNames) > 0 {
		xl.Infof(「proxy added: %s」, addPxyNames)
	}

4. 监控

frps 服务端支持两种监控系统:指标存在内存中,和指标输出到 Prometheus。主要监控以下指标:

type serverMetrics struct {
    // 记录当前连接到服务端的客户端数量。
    clientCount     Prometheus.Gauge
    // 记录当前代理的数量,按代理类型(如 TCP、HTTP)分类。
    proxyCount      *Prometheus.GaugeVec
    // 记录当前连接的数量,按代理类型(如 TCP、HTTP)分类。
    connectionCount *Prometheus.GaugeVec
    // 记录流入的总流量,按代理类型(如 TCP、HTTP)分类。
    trafficIn       *Prometheus.CounterVec
    // 记录流出的总流量,按代理类型(如 TCP、HTTP)分类。
    trafficOut      *Prometheus.CounterVec
}

内存监控没啥,但统计的增删改,这里用到了原子操作的技巧:

func (C *StandardCounter) Count() int32 {
	return atomic.LoadInt32(&C.count)
}

func (C *StandardCounter) Inc(count int32) {
	atomic.AddInt32(&C.count, count)
}

func (C *StandardCounter) Dec(count int32) {
	atomic.AddInt32(&C.count, -count)
}

对于不同类型的 proxy 的统计,frp 没有使用 syn map,而是用一把读写锁保平安

m.mu.Lock()
	defer m.mu.Unlock()
	counter, ok := m.info.ProxyTypeCounts[proxyType]
	if !ok {
		counter = metric.NewCounter()
	}
counter.Inc(1)

对于如何进行 Prometheus 监控,frp 的使用流程可以借鉴,整体来说分为以下几个步骤:

  1. 编码前,先定义指标,类似于:
Namespace: 「frp」,
Subsystem: 「server」,
Name:      「traffic_out」,
Help:      「The total out traffic」,
  1. frp 注册 Prometheus 指标
trafficOut: Prometheus.NewCounterVec(Prometheus.CounterOpts{
    Namespace: namespace,
    Subsystem: serverSubsystem,
    Name:      「traffic_out」,
    Help:      「The total out traffic」,
}, []string{「name」, 「type」}),
}

Prometheus.MustRegister(m.clientCount)
Prometheus.MustRegister(m.proxyCount)
Prometheus.MustRegister(m.connectionCount)
Prometheus.MustRegister(m.trafficIn)
Prometheus.MustRegister(m.trafficOut)
  1. frp 暴露 HTTP 服务,一般是/metric,promhttp 提供一个 HTTP 处理器,用于暴露所有注册的 Prometheus 指标。
if svr.cfg.EnablePrometheus {
    subRouter.Handle(「/metrics」, promhttp.Handler())
}
  1. 配置 Prometheus 定时抓取这个 HTTP 路径,舒服了
全球:
  scrape_interval: 15s # 每 15 秒抓取一次数据

scrape_configs:
  - job_name: 「frp_server」
    static_configs:
      - targets: [「localhost:8080」] # 替换为 frp 服务端暴露的 /metrics 端点

5. 通信安全

当 frpc 和 frps 之间启用了 TLS 之后,流量会被全局加密,不再需要配置单个代理上的加密,新版本中已经默认启用。每一个代理都可以选择是否启用加密和压缩的功能。

在每一个代理的配置中使用如下参数指定:

[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.useEncryption = true
transport.useCompression = true

5.1. 加密

通过设置 transport.useEncryption = true,将 frpc 与 frps 之间的通信内容加密传输,将会有效防止传输内容被截取。

这个加密它使用了装饰器模式,传入普通的 IO,WithEncryption 后就会得到一个可以加密的 IO

remote, err = libio.WithEncryption(remote, encKey)
if err != nil {
    workConn.
    xl.Errorf(「create encryption stream error: %v」, err)
    return
}

我们接下来看如何加密的:

总体加密算法采用 aes-128-cfb,aes 是一个对称加密,主要靠 key 和 iv 两个值

// pbkdf2 会生成一个用于 aes 加密的 key
// 入参 key 为:配置的 token
// DefaultSalt 为字符串默认值
key = pbkdf2.Key(key, []byte(DefaultSalt), 64, aes.BlockSize, sha1.New)

// iv 是用 rand 函数生成的安全加密的随机数
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
    return nil, err
}

// Reader is a global, shared instance of a cryptographically
// secure random number generator. It is safe for concurrent use.
//
//   - On Linux, FreeBSD, Dragonfly, and Solaris, Reader uses getrandom(2).
//   - On legacy Linux (< 3.17), Reader opens /dev/urandom on first use.
//   - On macOS, iOS, and OpenBSD Reader, uses arc4random_buf(3).
//   - On NetBSD, Reader uses the kern.arandom sysctl.
//   - On Windows, Reader uses the ProcessPrng API.
//   - On js/wasm, Reader uses the Web Crypto API.
//   - On wasi/wasm, Reader uses random_get.
//
// In FIPS 140-3 mode, the output passes through an SP 800-90A Rev. 1
// Deterministic Random Bit Generator (DRBG).
var Reader io.Reader

这样后续的 IO 操作都会自带加密了。

5.2. 压缩

压缩也是同理,搞一个压缩的 IO 装饰器就好了。

如果传输的报文长度较长,通过设置 transport.useCompression = true 对传输内容进行压缩,可以有效减小 frpc 与 frps 之间的网络流量,加快流量转发速度,但是会额外消耗一些 CPU 资源。

压缩算法采用 snappy 库

sr := snappy.NewReader(rwc)
sw := snappy.NewWriter(rwc)
return WrapReadWriteCloser(sr, sw, func() error {
        _ = sw.Close()
        return rwc.Close()
    })
}

5.3. 自定义 TLS

这个其实就是使用自签发的 CA,去生成密钥和证书,然后客户端和服务端加载起来后,可以进行双向或者单向验证,进行 HTTPS 握手,后续流量也是 HTTPS 加密的。

客户端单向校验服务端:

# frpc.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」

服务端单向校验客户端:

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」

# frps.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

双向验证

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

介绍这个之前,我们先回顾以下 TLS 握手的过程,hhh:

okk,那我们看 frp 是如何实现 tls 的:

// 获取 TLS 配置,作为 dial 选项
// tlsConfig, err = transport.NewClientTLSConfig
// tlsConfig, err = transport.NewServerTLSConfig
dialOptions = append(dialOptions, libnet.WithTLSConfig(tlsConfig))

...

// dail tcp 本身就是 tls 的了
conn, err := libnet.DialContext(
    C.ctx,
    net.JoinHostPort(C.cfg.ServerAddr, strconv.Itoa(C.cfg.ServerPort)),
    dialOptions...,
)

// 加载服务端的 ca,证书+key
// 核心是 tls 库 tls.LoadX509KeyPair(certfile, keyfile),去管理证书和 key
func NewServerTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {
    base := &tls.Config{}

    if certPath == «» || keyPath == «» {
        // server will generate tls conf by itself
        cert := newRandomTLSKeyPair()
        base.Certificates = []tls.Certificate{*cert}
    } else {
        // 调的是这个 tlsCert, err := tls.LoadX509KeyPair(certfile, keyfile)
        cert, err := newCustomTLSKeyPair(certPath, keyPath)
        if err != nil {
            return nil, err
        }

        base.Certificates = []tls.Certificate{*cert}
    }

    if caPath != '' {
        // ca 证书
        pool, err := newCertPool(caPath)
        if err != nil {
            return nil, err
        }
        // 校验客户端
        base.ClientAuth = tls.RequireAndVerifyClientCert
        base.ClientCAs = pool
    }

    return base, nil
}

// 加载客户端的 ca,证书+key
func NewClientTLSConfig(certPath, keyPath, caPath, serverName string) (*tls.Config, error) {
    base := &tls.Config{}

    if certPath != '' && keyPath != '' {
        cert, err := newCustomTLSKeyPair(certPath, keyPath)
        if err != nil {
            return nil, err
        }

        base.Certificates = []tls.Certificate{*cert}
    }

    base.ServerName = serverName

    if caPath != '' {
        pool, err := newCertPool(caPath)
        if err != nil {
            return nil, err
        }

        base.RootCAs = pool
        // 校验服务端
        base.InsecureSkipVerify = false
    } else {
        base.InsecureSkipVerify = true
    }

    return base, nil
}

// Only support one ca file to add
func newCertPool(caPath string) (*x509.CertPool, error) {
    pool := x509.NewCertPool()

    cacrt, err := os.ReadFile(caPath)
    if err != nil {
        return nil, err
    }

    pool.AppendCertsFromPEM(caCrt)

    return pool, nil
}

6. 代理配置

6.1. proxy

代理是 frp 的核心,这里详细介绍一下它的流程。

frpc 和 frps 的整体流程,里面可以抽象为 3 种连接,整体我画了一张图:

  1. 用户连接 (User Connection):
  • 这是外部用户连接到 FRP 服务端(frps)特定端口的连接,也就是说想要访问内网服务的,例如,当运维访问 frps.example.com:8080 时建立的连接就是用户连接,它实际访问的是客户侧某个管理平台
  • 在 frps 端,这个连接由 handleUserTCPConnection 函数处理。
  • 工作连接 (Work Connection):
  • 这是 frps 和 frpc 之间预先建立的连接,用于传输用户连接的数据。
  • frps 在需要处理用户连接时会从连接池中获取一个可用的工作连接。
  • 如果池中没有可用的工作连接,frps 会通知 frpc 创建新的工作连接。
  • 工作连接是 frps 和 frpc 之间的隧道,用户数据通过这个隧道在外部用户和内部服务之间传输。
  • 本地连接 (Local Connection):
  • 在 frp 的上下文中,远程连接通常指的是 frpc 连接到内部服务的连接
  • 例如,当 frpc 收到从工作连接传来的数据时,它会创建一个连接到配置中指定的本地服务(如 localhost:80),这个连接就是远程连接。

下面是 FRP 数据流的完整过程:

  1. 外部用户(用户连接) -> frps 监听端口
  2. frps 从工作连接池中获取一个 工作连接(frps <-> frpc)
  3. frps 将用户连接与工作连接绑定(通过双向数据转发)
  4. frpc 接收到来自工作连接的数据,然后建立一个 远程连接(frpc -> 内部服务)
  5. frpc 将工作连接与远程连接绑定(通过双向数据转发)

下面来看看关键代码实现:

// 用户连接 (User Connection):
// frps 侧
// tcp 代理启动
func (pxy *TCPProxy) Run() (string, error) {
    if pxy.cfg.LoadBalancer.Group != «» {
        // 获取组监听器(实际共享端口)
        l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)
        pxy.listeners = append(pxy.listeners, l)
        
        // 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)
        pxy.startCommonTCPListenersHandler() 
    }
    // ...
}
// 用户链接处理
func (pxy *BaseProxy) startCommonTCPListenersHandler() {
    for _, listener := range pxy.listeners {
        Go func(l net.Listener) {
            for {
                conn, err := l.Accept() // 此处调用 TCPGroupListener.Accept()
                Go pxy.handleUserTCPConnection(conn) // 处理连接
            }
        }(listener)
    }
}


// 工作连接 (Work Connection):
// frps 侧
// 从连接池中获取一个已建立的到 FRP 客户端的连接
// 内部实现路径:pxy.GetWorkConn() → pxy.workConnManager.Get()
// 底层通过 FRP 协议发送 NewWorkConn 消息到客户端建立隧道,这部分就是内部服务不一样的地方
// -> GetWorkConn
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {
    return
}
defer workConn.Close()

var local io.ReadWriteCloser = workConn
// 启动双向数据转发 
inCount, outCount, _ := libio.Join(local, userConn)

// 在取出工作连接后,frps 会立即向 frpc 发送 msg.ReqWorkConn 消息,请求新的工作连接。
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
// 如果连接池为空,frps 会等待 frpc 创建新的工作连接并发送过来。
select {
case workConn, ok = <-ctl.workConnCh:
    if !ok {
        err = pkgerr.ErrCtlClosed
        xl.Warnf(「no work connections available, %v」, err)
        return
    }
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
    err = fmt.Errorf(「timeout trying to get work connection」)
    xl.Warnf(「%v」, err)
    return
}


// 本地连接 (Local Connection):
// frpc 侧
// handleReqWorkConn
// HandleWorkConn
// HandleTCPWorkConnection
unc (ctl *Control) handleReqWorkConn(_ msg.Message) {
	xl := ctl.xl
	workConn, err := ctl.connectServer()
	if err != nil {
		xl.Warnf(「start new connection to server error: %v」, err)
		return
	}

	m := &msg.NewWorkConn{
		RunID: ctl.sessionCtx.RunID,
	}
	if err = ctl.sessionCtx.AuthSetter.SetNewWorkConn(m); err != nil {
		xl.Warnf(「error during NewWorkConn authentication: %v」, err)
		workConn.Close()
		return
	}
	if err = msg.WriteMsg(workConn, m); err != nil {
		xl.Warnf(「work connection write to server error: %v」, err)
		workConn.Close()
		return
	}

	var startMsg msg.StartWorkConn
	if err = msg.ReadMsgInto(workConn, &startMsg); err != nil {
		xl.Tracef(「work connection closed before response StartWorkConn message: %v」, err)
		workConn.Close()
		return
	}
	if startMsg.Error != 「」 {
		xl.Errorf(「StartWorkConn contains error: %s」, startMsg.Error)
		workConn.Close()
		return
	}

	// dispatch this work connection to related proxy
	ctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
}

remote = workConn
... ... 
localConn, err := libnet.Dial(
    net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
    libnet.WithTimeout(10*time.Second),
)
... ... 
_, _, errs := libio.Join(localConn, remote)

双向转发的实现灰常简洁,值得学习:

func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64, errors []error) {
	var wait sync.WaitGroup
	recordErrs := make([]error, 2)
	pipe := func(number int, to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {
		defer wait.Done()
		defer CosClose()
		defer from.Close()

		buf := pool.GetBuf(16 * 1024)
		defer pool.PutBuf(buf)
		*count, recordErrs[number] = io.CopyBuffer(to, from, buf)
	}

	wait.Add(2)
	Go pipe(0, c1, c2, &inCount)
	Go pipe(1, c2, c1, &outCount)
	wait.Wait()

	for _, e := range recordErrs {
		if e != nil {
			errors = append(errors, e)
		}
	}
	return
}

6.2. 负载均衡

你可以将多个相同类型的代理加入到同一个 group 中,以实现负载均衡的能力,当用户连接 frps 服务器的 80 端口时,frps 会将接收到的用户连接随机分发给其中一个存活的代理。这可以确保即使一台 frpc 机器挂掉,仍然有其他节点能够提供服务。

# frpc.toml
[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 8080
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」

[[proxies]]
name = 「test2」
type = 「tcp」
localPort = 8081
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」

这个负载均衡的实现的关键结构体是 TCPGroupCtl *group.TCPGroupCtl:

// 管理 TCP 代理的分组逻辑,包括分组的创建、监听、连接分发等功能。
TCPGroupCtl *group.TCPGroupCtl

// 主要有三大功能

// 1. 分组管理:
// 将多个 TCP 代理分组到一起,形成一个逻辑组。
// 每个组可以共享一个端口,分发连接到组内的代理。

// 2. 负载均衡:
// 根据一定的规则随机分发,将链接分发到组内的代理。

// 3. 资源管理:
// 负责监听和关闭组内的连接。
// 管理组的生命周期。
// tcp 代理分组
// 分组内统一监听,共享一个 remote port 的 coon,这个我们叫 remote conn,就是用户 connection
func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string, addr string, port int) (l net.Listener, realPort int, err error) {
    tgc.mu.Lock()
    tcpGroup, ok := tgc.groups[group]
    if !ok {
        tcpGroup = NewTCPGroup(tgc)
        tgc.groups[group] = tcpGroup
    }
    tgc.mu.Unlock()

    return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
}

// 代理加入组
func (tg *TCPGroup) Listen(proxyName, group, groupKey, addr string, port int) (*TCPGroupListener, int, error) {
    tg.mu.Lock()
    defer tg.mu.Unlock()
    
    // 首次加入组:创建真实监听
    if len(tg.lns) == 0 {
        realPort, err := tg.ctl.portManager.Acquire(proxyName, port) // 申请端口
        tcpLn, err := net.Listen(「tcp」, net.JoinHostPort(addr, strconv.Itoa(port)))
        
        tg.realPort = realPort
        tg.tcpLn = tcpLn
        Go tg.worker() // 启动连接分发协程
        ...
        }
}

// 当新连接到达共享端口时,会被放入全局通道(acceptCh),
// 组内所有代理通过竞争机制获取链接,实现负载均衡
func (tg *TCPGroup) worker() {
    for {
        conn, err := tg.tcpLn.Accept() // 接收新连接
        tg.acceptCh <- conn            // 放入全局通道
    }
}
func (ln *TCPGroupListener) Accept() (net.Conn, error) {
    select {
    case <-ln.closeCh:
        return nil, ErrListenerClosed
    case conn := <-ln.group.acceptCh: // 从全局通道竞争获取连接
        return conn, nil
    }
}

// tcp 代理启动
func (pxy *TCPProxy) Run() (string, error) {
    if pxy.cfg.LoadBalancer.Group != 「」 {
        // 获取组监听器(实际共享端口)
        l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)
        pxy.listeners = append(pxy.listeners, l)
        
        // 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)
        pxy.startCommonTCPListenersHandler() 
    }
    // ...
}

6.3. 健康检查

通过给代理配置健康检查参数,可以在要反向代理的服务出现故障时,将该服务从 frps 中摘除。结合负载均衡的功能,这可用于实现高可用架构,避免服务单点故障。

[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 22
remotePort = 6000
# 启用健康检查,类型为 tcp
healthCheck.type = 「tcp」
# 建立连接超时时间为 3 秒
healthCheck.timeoutSeconds = 3
# 连续 3 次检查失败,此 proxy 会被摘除
healthCheck.maxFailed = 3
# 每隔 10 秒进行一次健康检查
healthCheck.intervalSeconds = 10

这个配置被加载到 TCPProxyConfig-》ProxyBaseConfig-》HealthCheckConfig

type HealthCheckConfig struct {
	// Type specifies what protocol to use for health checking.
	// Valid values include 「tcp」, 「HTTP」, and 「」. If this value is 「」, health
	// checking will not be performed.
	//
	// If the type is 「tcp」, a connection will be attempted to the target
	// server. If a connection cannot be established, the health check fails.
	//
	// If the type is 「HTTP」, a GET request will be made to the endpoint
	// specified by HealthCheckURL. If the response is not a 200, the health
	// check fails.
	Type string `json:「type」` // tcp | HTTP
	// TimeoutSeconds specifies the number of seconds to wait for a health
	// check attempt to connect. If the timeout is reached, this counts as a
	// health check failure. By default, this value is 3.
	TimeoutSeconds int `json:「timeoutSeconds,omitempty」`
	// MaxFailed specifies the number of allowed failures before the
	// is stopped. By default, this value is 1.
	MaxFailed int `json:「maxFailed,omitempty」`
	// IntervalSeconds specifies the time in seconds between health
	// checks. By default, this value is 10.
	IntervalSeconds int `json:「intervalSeconds」`
	// Path specifies the path to send health checks to if the
	// health check type is 「HTTP」.
	Path string `json:「path,omitempty」`
	// HTTPHeaders specifies the headers to send with the health request, if
	// the health check type is 「HTTP」.
	HTTPHeaders []HTTPHeader `json:「httpHeaders,omitempty」`
}

这部分代码非常独立,相当于起了一个定时的 monitor,去监控代理的 proxy 是否有效,连续检查失败,此 proxy 会被摘除

func (monitor *Monitor) checkWorker() {
    for
        err := monitor.doCheck(doCtx)
        ... ... 
        time.Sleep(monitor.interval)
    }   
}

func (monitor *Monitor) doCheck(ctx context.Context) error {
	switch monitor.checkType {
	case 「tcp」:
		return monitor.doTCPCheck(ctx)
	case 「HTTP」:
		return monitor.doHTTPCheck(ctx)
	default:
		return ErrHealthCheckType
	}
}

func (monitor *Monitor) doTCPCheck(ctx context.Context) error {
	// if tcp address is not specified, always return nil
	if monitor.addr == 「」 {
		return nil
	}

	var d net.Dialer
	conn, err := d.DialContext(ctx, 「tcp」, monitor.addr)
	if err != nil {
		return err
	}
	conn.Close()
	return nil
}

6.4. 代理限速

# frpc.toml
[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.bandwidthLimit = 「1MB」

核心代码,依然是获取 tcp 连接时,加一个限速的装饰器:

var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {
    limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}

if pxy.GetLimiter() != nil {
    local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
        return local.Close()
    })
}

limit 使用的是原生的 rate 包:

func (r *Reader) Read(p []byte) (n int, err error) {
    // 1. 获取令牌桶的突发容量
	b := r.limiter.Burst()

    // 2. 如果请求的读取量超过突发容量,调整读取大小
	if b < len(p) {
		p = p[:b]
	}

    // 3. 执行实际读取操作
	n, err = r.r.Read(p)
	if err != nil {
         // 4. 如果读取过程中出错,直接返回
		return
	}

     // 5. 根据实际读取的字节数消耗令牌
	err = r.limiter.WaitN(context.Background(), n)
	if err != nil {
		return
	}
	return
}

7. 参考文献

HTTPS://gofrp.org/zh-cn/docs/

HTTPS://blog.csdn.net/u012175637/article/details/84138925

HTTPS://cloud.tencent.com/developer/article/2093328


网站公告

今日签到

点亮在社区的每一天
去签到