【开源品鉴】FRP源码阅读

frp 是一款高性能的反向代理应用,专注于内网穿透,支持多种协议和 P2P 通信功能,目前在 GitHub 上已有 80k 的 star。本文将深入探讨其源码,揭示其背后的实现原理。

1. 前言

frp 是一款高性能的反向代理应用,专注于内网穿透。它支持多种协议,包括 TCP、UDP、HTTP、HTTPS 等,并且具备 P2P 通信功能。使用 frp,您可以安全、便捷地将内网服务暴露到公网,通过拥有公网 IP 的节点进行中转,具体场景就是:将客户端部署到你的内网中,然后该客户端与你内网服务网络可达,当客户端与在公网的服务端连接后,我们就可以通过访问服务端的指定端口,去访问到内网服务。

目前 GitHub 已经有 80k 的 star,这么猛的项目,我决定阅读一番源码偷师一波。

2. pkg/auth

这个包负责客户端和服务端认证的代码,这里面一共用到了 2 种验证机制,一种是基于 token,就是预共享密钥,客户端和服务端实现配置一样的字符串密钥,第二种是 OAuth 2.0,依赖第三方授权服务器颁发的访问令牌,然后客户端带着令牌去访问服务端。

这里面有很多技巧值得学习:

2.1. 工厂函数

通过不同的配置生成对应的认证方式。

type Setter interface {SetLogin(*msg.Login) errorSetPing(*msg.Ping) errorSetNewWorkConn(*msg.NewWorkConn) error
}// 根据客户端配置创建认证提供者
func NewAuthSetter(cfg v1.AuthClientConfig) (authProvider Setter) {switch cfg.Method {// token 认证模式case v1.AuthMethodToken:authProvider = NewTokenAuth(cfg.AdditionalScopes, cfg.Token)// openid 认证模式case v1.AuthMethodOIDC:authProvider = NewOidcAuthSetter(cfg.AdditionalScopes, cfg.OIDC)default:panic(fmt.Sprintf(「wrong method: 『%s』」, cfg.Method))}return authProvider
}

2.2. 常量时间的字符串比较

正常情况来说,token 模式下,两边比较一下字符串是不是相等就完了,但其实这个是有安全隐患的,第一个就是攻击者可以进行重放攻击,一直进行密码爆破,第二个就是攻击者可以进行定时攻击,比如普通比较(如 ==)在发现第一个不匹配字节时会立即返回,攻击者可通过测量响应时间差异推断出匹配的字节位置,ConstantTimeCompare 始终遍历全部字节(即使已发现不匹配),使攻击者无法通过时间差获取敏感信息。

// token 和客户端上线的时间戳组成 key
func GetAuthKey(token string, timestamp int64) (key string) {md5Ctx := md5.New()md5Ctx.Write([]byte(token))md5Ctx.Write([]byte(strconv.FormatInt(timestamp, 10)))data := md5Ctx.Sum(nil)return hex.EncodeToString(data)
}// 全量匹配字节
func ConstantTimeCompare(x, y []byte) int {if len(x) != len(y) {return 0}var v bytefor i := 0; i < len(x); i++ {v |= x[i] ^ y[i]}return ConstantTimeByteEq(v, 0)
}// ConstantTimeByteEq returns 1 if x == y and 0 otherwise.
func ConstantTimeByteEq(x, y uint8) int {return int((uint32(x^y) - 1) >> 31)
}

3. pkg/config

config 文件夹是 frp 配置管理的核心模块,涵盖了配置的加载、解析、验证、转换和命令行支持等功能。它确保了 frp 的灵活性和兼容性,同时为用户提供了多种配置方式。

3.1. 使用环境变量进行模板渲染

serverAddr = 「{{ .Envs.FRP_SERVER_ADDR }}」
serverPort = 7000[[proxies]]
name = 「ssh」
type = 「tcp」
localIP = 「127.0.0.1」
localPort = 22
remotePort = {{ .Envs.FRP_SSH_REMOTE_PORT }}export FRP_SERVER_ADDR=「x.x.x.x」
export FRP_SSH_REMOTE_PORT=「6000」
./frpc -C ./frpc.toml

这个实现是采用了 template 模板库,其中 Envs 前缀是由字段名 Envs 决定的:

type Values struct {Envs map[string]string // 「{{ .Envs.FRP_SERVER_ADDR }}」 Envs 的由来
}func RenderWithTemplate(in []byte, values *Values) ([]byte, error) {tmpl, err := template.New(「frp」).Funcs(template.FuncMap{「parseNumberRange」:     parseNumberRange,「parseNumberRangePair」: parseNumberRangePair,}).Parse(string(in))if err != nil {return nil, err}buffer := bytes.NewBufferString(「」)if err := tmpl.Execute(buffer, values); err != nil {return nil, err}return buffer.Bytes(), nil
}// 将端口范围解析为 端口列表
func parseNumberRange(firstRangeStr string) ([]int64, error) {... ... 
}

这里面有一些自定义的解析函数,比如说:

ports = 「{{ parseNumberRange .Envs.PORT_RANGE }}」export PORT_RANGE = 「1000-1005」// 这样 ports 就会被 template 的 parseNumberRange 函数解析并渲染为
// ports = 1000, 1001, 1002, 1003, 1004, 1005

3.2. 配置拆分

通过 includes 参数可以在主配置中包含其他配置文件,从而实现将代理配置拆分到多个文件中管理

# frpc.toml
serverAddr = 「x.x.x.x」
serverPort = 7000
includes = [「./confd/*.toml」]

上述配置在 frpc.toml 中通过 includes 额外包含了 ./confd 目录下所有的 toml 文件的代理配置内容,效果等价于将这两个文件合并成一个文件。

这个实现是采用了,循环读取文件内容 + 模板渲染 + 配置合并+ toml 反序列化 的方法:

// 主文件配置,就是 frpc.toml
var content []byte
content, err = GetRenderedConfFromFile(filePath)
if err != nil {return
}
configBuffer := bytes.NewBuffer(nil)
configBuffer.Write(content)... ... var buf []byte
// 循环读取 include 的文件
// getIncludeContents
// ->ReadFile
// ->RenderContent
//   ->template.New(「frp」).Parse(string(in))
buf, err = getIncludeContents(cfg.IncludeConfigFiles)
if err != nil {err = fmt.Errorf(「getIncludeContents error: %v」, err)return
}
configBuffer.WriteString(「
」)
configBuffer.Write(buf)// 将所有配置合并,然后将 toml 序列化为 type ClientCommonConf struct
代理 Cfgs, visitorCfgs, err = LoadAllProxyConfsFromIni(cfg.User, configBuffer.Bytes(), cfg.Start)
if err != nil {return
}
return

3.3. 配置热加载

frpc reload -C ./frpc.toml 等待一段时间后,客户端将根据新的配置文件创建、更新或删除代理。

这里面也比较简单,主要逻辑在于配置校验,旧配置中与新配置里同名的且代理内容不一样的 proxy 停止,新增的配置的 proxy 再启动,也就是说老配置和新配置完全一样的是不动的

func (pm *Manager) UpdateAll(proxyCfgs []v1.ProxyConfigurer) {xl := xlog.FromContextSafe(pm.ctx)proxyCfgsMap := lo.KeyBy(proxyCfgs, func(C v1.ProxyConfigurer) string {return C.GetBaseConfig().Name})pm.mu.Lock()defer pm.mu.Unlock()delPxyNames := make([]string, 0)for name, pxy := range pm.proxies {del := falsecfg, ok := proxyCfgsMap[name]if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {del = true}if del {delPxyNames = append(delPxyNames, name)delete(pm.proxies, name)pxy.Stop()}}if len(delPxyNames) > 0 {xl.Infof(「proxy removed: %s」, delPxyNames)}addPxyNames := make([]string, 0)for _, cfg := range proxyCfgs {name := cfg.GetBaseConfig().Nameif _, ok := pm.proxies[name]; !ok {pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.msgTransporter, pm.vnetController)if pm.inWorkConnCallback != nil {pxy.SetInWorkConnCallback(pm.inWorkConnCallback)}pm.proxies[name] = pxyaddPxyNames = append(addPxyNames, name)pxy.Start()}}if len(addPxyNames) > 0 {xl.Infof(「proxy added: %s」, addPxyNames)}

4. 监控

frps 服务端支持两种监控系统:指标存在内存中,和指标输出到 Prometheus。主要监控以下指标:

type serverMetrics struct {// 记录当前连接到服务端的客户端数量。clientCount     Prometheus.Gauge// 记录当前代理的数量,按代理类型(如 TCP、HTTP)分类。proxyCount      *Prometheus.GaugeVec// 记录当前连接的数量,按代理类型(如 TCP、HTTP)分类。connectionCount *Prometheus.GaugeVec// 记录流入的总流量,按代理类型(如 TCP、HTTP)分类。trafficIn       *Prometheus.CounterVec// 记录流出的总流量,按代理类型(如 TCP、HTTP)分类。trafficOut      *Prometheus.CounterVec
}

内存监控没啥,但统计的增删改,这里用到了原子操作的技巧:

func (C *StandardCounter) Count() int32 {return atomic.LoadInt32(&C.count)
}func (C *StandardCounter) Inc(count int32) {atomic.AddInt32(&C.count, count)
}func (C *StandardCounter) Dec(count int32) {atomic.AddInt32(&C.count, -count)
}

对于不同类型的 proxy 的统计,frp 没有使用 syn map,而是用一把读写锁保平安

m.mu.Lock()defer m.mu.Unlock()counter, ok := m.info.ProxyTypeCounts[proxyType]if !ok {counter = metric.NewCounter()}
counter.Inc(1)

对于如何进行 Prometheus 监控,frp 的使用流程可以借鉴,整体来说分为以下几个步骤:

  1. 编码前,先定义指标,类似于:
Namespace: 「frp」,
Subsystem: 「server」,
Name:      「traffic_out」,
Help:      「The total out traffic」,
  1. frp 注册 Prometheus 指标
trafficOut: Prometheus.NewCounterVec(Prometheus.CounterOpts{Namespace: namespace,Subsystem: serverSubsystem,Name:      「traffic_out」,Help:      「The total out traffic」,
}, []string{「name」, 「type」}),
}Prometheus.MustRegister(m.clientCount)
Prometheus.MustRegister(m.proxyCount)
Prometheus.MustRegister(m.connectionCount)
Prometheus.MustRegister(m.trafficIn)
Prometheus.MustRegister(m.trafficOut)
  1. frp 暴露 HTTP 服务,一般是/metric,promhttp 提供一个 HTTP 处理器,用于暴露所有注册的 Prometheus 指标。
if svr.cfg.EnablePrometheus {subRouter.Handle(「/metrics」, promhttp.Handler())
}
  1. 配置 Prometheus 定时抓取这个 HTTP 路径,舒服了
全球:scrape_interval: 15s # 每 15 秒抓取一次数据scrape_configs:- job_name: 「frp_server」static_configs:- targets: [「localhost:8080」] # 替换为 frp 服务端暴露的 /metrics 端点

5. 通信安全

当 frpc 和 frps 之间启用了 TLS 之后,流量会被全局加密,不再需要配置单个代理上的加密,新版本中已经默认启用。每一个代理都可以选择是否启用加密和压缩的功能。

在每一个代理的配置中使用如下参数指定:

[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.useEncryption = true
transport.useCompression = true

5.1. 加密

通过设置 transport.useEncryption = true,将 frpc 与 frps 之间的通信内容加密传输,将会有效防止传输内容被截取。

这个加密它使用了装饰器模式,传入普通的 IO,WithEncryption 后就会得到一个可以加密的 IO

remote, err = libio.WithEncryption(remote, encKey)
if err != nil {workConn.xl.Errorf(「create encryption stream error: %v」, err)return
}

我们接下来看如何加密的:

总体加密算法采用 aes-128-cfb,aes 是一个对称加密,主要靠 key 和 iv 两个值

// pbkdf2 会生成一个用于 aes 加密的 key
// 入参 key 为:配置的 token
// DefaultSalt 为字符串默认值
key = pbkdf2.Key(key, []byte(DefaultSalt), 64, aes.BlockSize, sha1.New)// iv 是用 rand 函数生成的安全加密的随机数
if _, err := io.ReadFull(rand.Reader, iv); err != nil {return nil, err
}// Reader is a global, shared instance of a cryptographically
// secure random number generator. It is safe for concurrent use.
//
//   - On Linux, FreeBSD, Dragonfly, and Solaris, Reader uses getrandom(2).
//   - On legacy Linux (< 3.17), Reader opens /dev/urandom on first use.
//   - On macOS, iOS, and OpenBSD Reader, uses arc4random_buf(3).
//   - On NetBSD, Reader uses the kern.arandom sysctl.
//   - On Windows, Reader uses the ProcessPrng API.
//   - On js/wasm, Reader uses the Web Crypto API.
//   - On wasi/wasm, Reader uses random_get.
//
// In FIPS 140-3 mode, the output passes through an SP 800-90A Rev. 1
// Deterministic Random Bit Generator (DRBG).
var Reader io.Reader

这样后续的 IO 操作都会自带加密了。

5.2. 压缩

压缩也是同理,搞一个压缩的 IO 装饰器就好了。

如果传输的报文长度较长,通过设置 transport.useCompression = true 对传输内容进行压缩,可以有效减小 frpc 与 frps 之间的网络流量,加快流量转发速度,但是会额外消耗一些 CPU 资源。

压缩算法采用 snappy 库

sr := snappy.NewReader(rwc)
sw := snappy.NewWriter(rwc)
return WrapReadWriteCloser(sr, sw, func() error {_ = sw.Close()return rwc.Close()})
}

5.3. 自定义 TLS

这个其实就是使用自签发的 CA,去生成密钥和证书,然后客户端和服务端加载起来后,可以进行双向或者单向验证,进行 HTTPS 握手,后续流量也是 HTTPS 加密的。

客户端单向校验服务端:

# frpc.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」

服务端单向校验客户端:

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」# frps.toml
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

双向验证

# frpc.toml
transport.tls.certFile = 「/to/cert/path/client.crt」
transport.tls.keyFile = 「/to/key/path/client.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」# frps.toml
transport.tls.certFile = 「/to/cert/path/server.crt」
transport.tls.keyFile = 「/to/key/path/server.key」
transport.tls.trustedCaFile = 「/to/ca/path/ca.crt」

介绍这个之前,我们先回顾以下 TLS 握手的过程,hhh:

okk,那我们看 frp 是如何实现 tls 的:

// 获取 TLS 配置,作为 dial 选项
// tlsConfig, err = transport.NewClientTLSConfig
// tlsConfig, err = transport.NewServerTLSConfig
dialOptions = append(dialOptions, libnet.WithTLSConfig(tlsConfig))...// dail tcp 本身就是 tls 的了
conn, err := libnet.DialContext(C.ctx,net.JoinHostPort(C.cfg.ServerAddr, strconv.Itoa(C.cfg.ServerPort)),dialOptions...,
)// 加载服务端的 ca,证书+key
// 核心是 tls 库 tls.LoadX509KeyPair(certfile, keyfile),去管理证书和 key
func NewServerTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {base := &tls.Config{}if certPath == «» || keyPath == «» {// server will generate tls conf by itselfcert := newRandomTLSKeyPair()base.Certificates = []tls.Certificate{*cert}} else {// 调的是这个 tlsCert, err := tls.LoadX509KeyPair(certfile, keyfile)cert, err := newCustomTLSKeyPair(certPath, keyPath)if err != nil {return nil, err}base.Certificates = []tls.Certificate{*cert}}if caPath != '' {// ca 证书pool, err := newCertPool(caPath)if err != nil {return nil, err}// 校验客户端base.ClientAuth = tls.RequireAndVerifyClientCertbase.ClientCAs = pool}return base, nil
}// 加载客户端的 ca,证书+key
func NewClientTLSConfig(certPath, keyPath, caPath, serverName string) (*tls.Config, error) {base := &tls.Config{}if certPath != '' && keyPath != '' {cert, err := newCustomTLSKeyPair(certPath, keyPath)if err != nil {return nil, err}base.Certificates = []tls.Certificate{*cert}}base.ServerName = serverNameif caPath != '' {pool, err := newCertPool(caPath)if err != nil {return nil, err}base.RootCAs = pool// 校验服务端base.InsecureSkipVerify = false} else {base.InsecureSkipVerify = true}return base, nil
}// Only support one ca file to add
func newCertPool(caPath string) (*x509.CertPool, error) {pool := x509.NewCertPool()cacrt, err := os.ReadFile(caPath)if err != nil {return nil, err}pool.AppendCertsFromPEM(caCrt)return pool, nil
}

6. 代理配置

6.1. proxy

代理是 frp 的核心,这里详细介绍一下它的流程。

frpc 和 frps 的整体流程,里面可以抽象为 3 种连接,整体我画了一张图:

  1. 用户连接 (User Connection):
  • 这是外部用户连接到 FRP 服务端(frps)特定端口的连接,也就是说想要访问内网服务的,例如,当运维访问 frps.example.com:8080 时建立的连接就是用户连接,它实际访问的是客户侧某个管理平台
  • 在 frps 端,这个连接由 handleUserTCPConnection 函数处理。
  • 工作连接 (Work Connection):
  • 这是 frps 和 frpc 之间预先建立的连接,用于传输用户连接的数据。
  • frps 在需要处理用户连接时会从连接池中获取一个可用的工作连接。
  • 如果池中没有可用的工作连接,frps 会通知 frpc 创建新的工作连接。
  • 工作连接是 frps 和 frpc 之间的隧道,用户数据通过这个隧道在外部用户和内部服务之间传输。
  • 本地连接 (Local Connection):
  • 在 frp 的上下文中,远程连接通常指的是 frpc 连接到内部服务的连接
  • 例如,当 frpc 收到从工作连接传来的数据时,它会创建一个连接到配置中指定的本地服务(如 localhost:80),这个连接就是远程连接。

下面是 FRP 数据流的完整过程:

  1. 外部用户(用户连接) -> frps 监听端口
  2. frps 从工作连接池中获取一个 工作连接(frps <-> frpc)
  3. frps 将用户连接与工作连接绑定(通过双向数据转发)
  4. frpc 接收到来自工作连接的数据,然后建立一个 远程连接(frpc -> 内部服务)
  5. frpc 将工作连接与远程连接绑定(通过双向数据转发)

下面来看看关键代码实现:

// 用户连接 (User Connection):
// frps 侧
// tcp 代理启动
func (pxy *TCPProxy) Run() (string, error) {if pxy.cfg.LoadBalancer.Group != «» {// 获取组监听器(实际共享端口)l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)pxy.listeners = append(pxy.listeners, l)// 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)pxy.startCommonTCPListenersHandler() }// ...
}
// 用户链接处理
func (pxy *BaseProxy) startCommonTCPListenersHandler() {for _, listener := range pxy.listeners {Go func(l net.Listener) {for {conn, err := l.Accept() // 此处调用 TCPGroupListener.Accept()Go pxy.handleUserTCPConnection(conn) // 处理连接}}(listener)}
}// 工作连接 (Work Connection):
// frps 侧
// 从连接池中获取一个已建立的到 FRP 客户端的连接
// 内部实现路径:pxy.GetWorkConn() → pxy.workConnManager.Get()
// 底层通过 FRP 协议发送 NewWorkConn 消息到客户端建立隧道,这部分就是内部服务不一样的地方
// -> GetWorkConn
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {return
}
defer workConn.Close()var local io.ReadWriteCloser = workConn
// 启动双向数据转发 
inCount, outCount, _ := libio.Join(local, userConn)// 在取出工作连接后,frps 会立即向 frpc 发送 msg.ReqWorkConn 消息,请求新的工作连接。
_ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
// 如果连接池为空,frps 会等待 frpc 创建新的工作连接并发送过来。
select {
case workConn, ok = <-ctl.workConnCh:if !ok {err = pkgerr.ErrCtlClosedxl.Warnf(「no work connections available, %v」, err)return}
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):err = fmt.Errorf(「timeout trying to get work connection」)xl.Warnf(「%v」, err)return
}// 本地连接 (Local Connection):
// frpc 侧
// handleReqWorkConn
// HandleWorkConn
// HandleTCPWorkConnection
unc (ctl *Control) handleReqWorkConn(_ msg.Message) {xl := ctl.xlworkConn, err := ctl.connectServer()if err != nil {xl.Warnf(「start new connection to server error: %v」, err)return}m := &msg.NewWorkConn{RunID: ctl.sessionCtx.RunID,}if err = ctl.sessionCtx.AuthSetter.SetNewWorkConn(m); err != nil {xl.Warnf(「error during NewWorkConn authentication: %v」, err)workConn.Close()return}if err = msg.WriteMsg(workConn, m); err != nil {xl.Warnf(「work connection write to server error: %v」, err)workConn.Close()return}var startMsg msg.StartWorkConnif err = msg.ReadMsgInto(workConn, &startMsg); err != nil {xl.Tracef(「work connection closed before response StartWorkConn message: %v」, err)workConn.Close()return}if startMsg.Error != 「」 {xl.Errorf(「StartWorkConn contains error: %s」, startMsg.Error)workConn.Close()return}// dispatch this work connection to related proxyctl.pm.HandleWorkConn(startMsg.ProxyName, workConn, &startMsg)
}remote = workConn
... ... 
localConn, err := libnet.Dial(net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),libnet.WithTimeout(10*time.Second),
)
... ... 
_, _, errs := libio.Join(localConn, remote)

双向转发的实现灰常简洁,值得学习:

func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64, errors []error) {var wait sync.WaitGrouprecordErrs := make([]error, 2)pipe := func(number int, to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {defer wait.Done()defer CosClose()defer from.Close()buf := pool.GetBuf(16 * 1024)defer pool.PutBuf(buf)*count, recordErrs[number] = io.CopyBuffer(to, from, buf)}wait.Add(2)Go pipe(0, c1, c2, &inCount)Go pipe(1, c2, c1, &outCount)wait.Wait()for _, e := range recordErrs {if e != nil {errors = append(errors, e)}}return
}

6.2. 负载均衡

你可以将多个相同类型的代理加入到同一个 group 中,以实现负载均衡的能力,当用户连接 frps 服务器的 80 端口时,frps 会将接收到的用户连接随机分发给其中一个存活的代理。这可以确保即使一台 frpc 机器挂掉,仍然有其他节点能够提供服务。

# frpc.toml
[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 8080
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」[[proxies]]
name = 「test2」
type = 「tcp」
localPort = 8081
remotePort = 80
loadBalancer.group = 「web」
loadBalancer.groupKey = 「123」

这个负载均衡的实现的关键结构体是 TCPGroupCtl *group.TCPGroupCtl:

// 管理 TCP 代理的分组逻辑,包括分组的创建、监听、连接分发等功能。
TCPGroupCtl *group.TCPGroupCtl// 主要有三大功能// 1. 分组管理:
// 将多个 TCP 代理分组到一起,形成一个逻辑组。
// 每个组可以共享一个端口,分发连接到组内的代理。// 2. 负载均衡:
// 根据一定的规则随机分发,将链接分发到组内的代理。// 3. 资源管理:
// 负责监听和关闭组内的连接。
// 管理组的生命周期。
// tcp 代理分组
// 分组内统一监听,共享一个 remote port 的 coon,这个我们叫 remote conn,就是用户 connection
func (tgc *TCPGroupCtl) Listen(proxyName string, group string, groupKey string, addr string, port int) (l net.Listener, realPort int, err error) {tgc.mu.Lock()tcpGroup, ok := tgc.groups[group]if !ok {tcpGroup = NewTCPGroup(tgc)tgc.groups[group] = tcpGroup}tgc.mu.Unlock()return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
}// 代理加入组
func (tg *TCPGroup) Listen(proxyName, group, groupKey, addr string, port int) (*TCPGroupListener, int, error) {tg.mu.Lock()defer tg.mu.Unlock()// 首次加入组:创建真实监听if len(tg.lns) == 0 {realPort, err := tg.ctl.portManager.Acquire(proxyName, port) // 申请端口tcpLn, err := net.Listen(「tcp」, net.JoinHostPort(addr, strconv.Itoa(port)))tg.realPort = realPorttg.tcpLn = tcpLnGo tg.worker() // 启动连接分发协程...}
}// 当新连接到达共享端口时,会被放入全局通道(acceptCh),
// 组内所有代理通过竞争机制获取链接,实现负载均衡
func (tg *TCPGroup) worker() {for {conn, err := tg.tcpLn.Accept() // 接收新连接tg.acceptCh <- conn            // 放入全局通道}
}
func (ln *TCPGroupListener) Accept() (net.Conn, error) {select {case <-ln.closeCh:return nil, ErrListenerClosedcase conn := <-ln.group.acceptCh: // 从全局通道竞争获取连接return conn, nil}
}// tcp 代理启动
func (pxy *TCPProxy) Run() (string, error) {if pxy.cfg.LoadBalancer.Group != 「」 {// 获取组监听器(实际共享端口)l, realBindPort, err := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.LoadBalancer.Group, ...)pxy.listeners = append(pxy.listeners, l)// 启动连接处理器(最终调用 BaseProxy.startCommonTCPListenersHandler)pxy.startCommonTCPListenersHandler() }// ...
}

6.3. 健康检查

通过给代理配置健康检查参数,可以在要反向代理的服务出现故障时,将该服务从 frps 中摘除。结合负载均衡的功能,这可用于实现高可用架构,避免服务单点故障。

[[proxies]]
name = 「test1」
type = 「tcp」
localPort = 22
remotePort = 6000
# 启用健康检查,类型为 tcp
healthCheck.type = 「tcp」
# 建立连接超时时间为 3 秒
healthCheck.timeoutSeconds = 3
# 连续 3 次检查失败,此 proxy 会被摘除
healthCheck.maxFailed = 3
# 每隔 10 秒进行一次健康检查
healthCheck.intervalSeconds = 10

这个配置被加载到 TCPProxyConfig-》ProxyBaseConfig-》HealthCheckConfig

type HealthCheckConfig struct {// Type specifies what protocol to use for health checking.// Valid values include 「tcp」, 「HTTP」, and 「」. If this value is 「」, health// checking will not be performed.//// If the type is 「tcp」, a connection will be attempted to the target// server. If a connection cannot be established, the health check fails.//// If the type is 「HTTP」, a GET request will be made to the endpoint// specified by HealthCheckURL. If the response is not a 200, the health// check fails.Type string `json:「type」` // tcp | HTTP// TimeoutSeconds specifies the number of seconds to wait for a health// check attempt to connect. If the timeout is reached, this counts as a// health check failure. By default, this value is 3.TimeoutSeconds int `json:「timeoutSeconds,omitempty」`// MaxFailed specifies the number of allowed failures before the// is stopped. By default, this value is 1.MaxFailed int `json:「maxFailed,omitempty」`// IntervalSeconds specifies the time in seconds between health// checks. By default, this value is 10.IntervalSeconds int `json:「intervalSeconds」`// Path specifies the path to send health checks to if the// health check type is 「HTTP」.Path string `json:「path,omitempty」`// HTTPHeaders specifies the headers to send with the health request, if// the health check type is 「HTTP」.HTTPHeaders []HTTPHeader `json:「httpHeaders,omitempty」`
}

这部分代码非常独立,相当于起了一个定时的 monitor,去监控代理的 proxy 是否有效,连续检查失败,此 proxy 会被摘除

func (monitor *Monitor) checkWorker() {forerr := monitor.doCheck(doCtx)... ... time.Sleep(monitor.interval)}   
}func (monitor *Monitor) doCheck(ctx context.Context) error {switch monitor.checkType {case 「tcp」:return monitor.doTCPCheck(ctx)case 「HTTP」:return monitor.doHTTPCheck(ctx)default:return ErrHealthCheckType}
}func (monitor *Monitor) doTCPCheck(ctx context.Context) error {// if tcp address is not specified, always return nilif monitor.addr == 「」 {return nil}var d net.Dialerconn, err := d.DialContext(ctx, 「tcp」, monitor.addr)if err != nil {return err}conn.Close()return nil
}

6.4. 代理限速

# frpc.toml
[[proxies]]
name = 「ssh」
type = 「tcp」
localPort = 22
remotePort = 6000
transport.bandwidthLimit = 「1MB」

核心代码,依然是获取 tcp 连接时,加一个限速的装饰器:

var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}if pxy.GetLimiter() != nil {local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {return local.Close()})
}

limit 使用的是原生的 rate 包:

func (r *Reader) Read(p []byte) (n int, err error) {// 1. 获取令牌桶的突发容量b := r.limiter.Burst()// 2. 如果请求的读取量超过突发容量,调整读取大小if b < len(p) {p = p[:b]}// 3. 执行实际读取操作n, err = r.r.Read(p)if err != nil {// 4. 如果读取过程中出错,直接返回return}// 5. 根据实际读取的字节数消耗令牌err = r.limiter.WaitN(context.Background(), n)if err != nil {return}return
}

7. 参考文献

HTTPS://gofrp.org/zh-cn/docs/

HTTPS://blog.csdn.net/u012175637/article/details/84138925

HTTPS://cloud.tencent.com/developer/article/2093328

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87952.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87952.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day048-系统负载高排查流程与前后端分离项目

文章目录 0. 老男孩思想1. 系统负载高排查流程1.1 进程/线程相关命令1.1.1 jps1.1.2 jstack1.1.3 jmap1.1.4 top -Hp pid 1.2 排查流程图 2. 前后端分离项目2.1 项目说明2.2 负载均衡2.3 数据库配置2.3.1 安装数据库服务2.3.2 配置数据库环境 2.4 后端配置2.5 四层负载均衡配置…

Spring Boot 牵手EasyExcel:解锁高效数据处理姿势

引言 在日常的 Java 开发中&#xff0c;处理 Excel 文件是一个极为常见的需求。无论是数据的导入导出&#xff0c;还是报表的生成&#xff0c;Excel 都扮演着重要的角色。例如&#xff0c;在企业的财务管理系统中&#xff0c;需要将每月的财务数据导出为 Excel 报表&#xff0…

【ARM AMBA AXI 入门 21 -- AXI partial 访问和 narrow 访问的区别】

文章目录 Overview一、定义区别二、AXI 信号层面对比三、举例说明示例一:Partial Access示例二:Narrow Access四、硬件/系统处理角度五、AXI 总线接口信号举例对比Partial Write 事务:Narrow Write 事务(32-bit Master on 64-bit Bus):六、总结对比表七,软件判断判断 Pa…

使用Ideal创建一个spring boot的helloWorld项目

说明&#xff1a;本篇将介绍如何使用Ideal2024.2.1去创建一个spring boot的helloWorld项目&#xff0c;本篇将包含创建的详细步骤以及spring boot项目的目录结构说明&#xff0c;创建过程中的选项说明等。详细步骤如下&#xff1a;第一步&#xff1a;点击文件——新建——项目&…

国内Ubuntu访问不了github等外网

各位小伙伴们&#xff0c;大家好呀。 大家是不是经常遇到访问不了外网的情况呀。 在Ubuntu中可以这样做。 访问这个网站网站测速-Ping检测-Trace查询-Dig查询-路由跟踪查询-tools.ipip.net&#xff0c; 对于github.com&#xff0c;在这个网站输入github.com&#xff0c;会返…

PDF转换工具,即开即用

在办公室里&#xff0c;这句话被反复验证。每天面对成堆的Word和Excel文件&#xff0c;将它们转换成PDF格式是常有的事。可之前用过的工具&#xff0c;不是一次只能转一个&#xff0c;就是操作繁琐得让人头疼。记得有次赶项目&#xff0c;需要把二十多个文档转成PDF&#xff0c…

2. 你可以说一下 http 版本的发展过程吗

你可以说一下 http 版本的发展过程吗 总结&#xff1a;0.9&#xff1a;只能发送 get&#xff0c;无状态。1.0&#xff1a;新增 post&#xff0c;请求头&#xff0c;状态码&#xff0c;cookie。1.1&#xff1a;新增 put/delete/options/patch&#xff0c;keep-alive&#xff0c…

04-Linux驱动模块的自动加载

概述 上一节&#xff0c;我们讲述了Linux驱动开发的基本的模块代码编写和手动执行模块加载的操作&#xff0c; 这一节&#xff0c;我们讲述嵌入式设备上使用Sysvint引导方式下如何开机自动加载模块的步骤。感兴趣的同学看下使用systemd引导方式的开启自动加载模块的步骤 操作…

【牛客算法】游游的整数切割

文章目录 一、题目介绍1.1 题目链接1.2 题目描述1.3 输入描述1.4 输出描述1.5 示例二、解题思路2.1 核心算法设计2.2 性能优化关键2.3 算法流程图三、解法实现3.1 解法一:基础遍历法3.1.1 初级版本分析3.2 解法二:奇偶预统计法(推荐)3.2.1 优化版本分析四、总结与拓展4.1 关…

笔记本电脑忽亮忽暗问题

关于笔记本电脑忽亮忽暗的问题这个问题困扰了我大半年&#xff0c;最后忽然找到解决方法了---主要的话有三种可能性1.关闭显示器自动调亮的功能2.关闭节能模式自动调亮功能3.调整显卡的功率&#xff0c;关闭自动调亮功能一开始一直都是尝试的第一种方法&#xff0c;没解决。。。…

Qt的顶部工具栏在多个界面使用

Qt的工具栏在多个界面使用1、前言2、创建一个工具栏类2.1 新建一个工具栏类3、提升工具栏类3.1登录界面添加工具栏3.2 创建工具栏对象4、总结1、前言 今天遇到了个问题&#xff0c;顶部的工具栏&#xff0c;像软键盘&#xff0c;时间显示和退出按钮那些&#xff0c;想在多个界…

C#和SQL Server连接常用通讯方式

C#和SQL Server连接通讯 在 C# 中与 SQL Server 建立数据库连接&#xff0c;主要通过 ADO.NET 技术实现。以下是几种常见的连接方式及相关实践&#xff1a; ADO.NET 全面指南&#xff1a;C# 数据库访问核心技术 ADO.NET 是 .NET Framework 中用于数据访问的核心组件&#xf…

安卓10.0系统修改定制化____实现自动开启 USB 调试​的步骤解析 列举常用的几种修改方法

对于安卓开发者、测试人员,甚至是喜欢折腾手机的数码爱好者来说,USB 调试是一个非常重要的功能。它能让手机与电脑相连,实现应用安装、系统调试、数据传输等操作。但每次连接手机都要手动去设置里开启 USB 调试,实在麻烦。其实,通过修改安卓 10.0 的 ROM,就能让手机自动开…

Redisson详细教程 - 从入门到精通

目录 1. 什么是Redisson 2. 为什么要用Redisson 3. 环境准备和配置 4. 基础使用方法 5. 分布式数据结构 6. 分布式锁详解 7. 分布式服务 8. 实际应用场景 9. 最佳实践 10. 常见问题解答 总结 1. 什么是Redisson 简单理解 想象一下,Redis就像一个超级强大的"内…

动态规划VS记忆化搜索(2)

luoguP1434滑雪 题目描述 Michael 喜欢滑雪。这并不奇怪&#xff0c;因为滑雪的确很刺激。可是为了获得速度&#xff0c;滑的区域必须向下倾斜&#xff0c;而且当你滑到坡底&#xff0c;你不得不再次走上坡或者等待升降机来载你。Michael 想知道在一个区域中最长的滑坡。区域由…

如何将服务守护进程化

进程组 什么是进程组 之前我们提到了进程的概念&#xff0c; 其实每一个进程除了有一个进程 ID(PID)之外 还属于一个进程组。进程组是一个或者多个进程的集合&#xff0c; 一个进程组可以包含多个进程。 每一个进程组也有一个唯一的进程组 ID(PGID)&#xff0c; 并且这个 PGID …

【跟着PMP学习项目管理】项目管理 之 范围管理知识点

目录 一、收集需求 1、知识点汇总 2、输入 3、工具 4、输出 二、定义范围 1、知识点汇总 2、输入 3、工具 4、输出 三、创作工作分解结构 1、知识点汇总 2、输入 3、工具 4、输出 四、核实范围 1、知识点汇总 2、输入 3、工具 4、输出 五、控制范围 1、知…

AIX 环境磁盘空间管理指南

AIX 环境磁盘空间管理指南 在AIX环境中&#xff0c;磁盘空间的监控、管理与扩展是运维人员必备的技能。本文通过实际案例&#xff0c;系统地介绍如何查询磁盘信息、卷组(VG)、逻辑卷(LV)信息&#xff0c;以及在磁盘空间不足时的扩容方案&#xff0c;帮助读者掌握磁盘空间管理的…

k8s将service的IP对应的不同端口分配到不同的pod上

在Kubernetes中&#xff0c;Service是一种抽象层&#xff0c;它将请求路由到一组Pod。当你需要将Service的不同端口映射到不同的Pod时&#xff0c;可以通过以下两种主要方式实现&#xff1a; 方法一&#xff1a;使用单个Service的多端口配置 如果不同的Pod提供不同的服务&…

aic8800M40低功耗sdio wifi在arm-linux平台调试经验

背景 好多年没有搞过wifi相关的内容了,最近也被安排上了,把一颗低功耗aic8800M40的芯片在arm-linux开发板上做bring up,记录一下SDIO wifi调试的过程和经验,SDIO驱动这里需要改动一些linux内核HOST驱动代码,会在文章中贴出来: AIC8800M40芯片简介 这个wifi芯片是一颗低…