transport

package
v0.0.0-...-bb27e5e Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2026 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Overview

Package transport 定义传输层接口

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the transport package. Each is created with
// errors.New, so callers can compare against them with errors.Is.
//
// NOTE(review): which operations return each error is not visible from this
// declaration — confirm against the transport implementations.
var (
	ErrConnectionClosed = errors.New("connection closed")
	ErrNotConnected     = errors.New("not connected")
	ErrInvalidMessage   = errors.New("invalid message")
	ErrSendFailed       = errors.New("send failed")
)

错误定义

Functions

This section is empty.

Types

type Config

// Config is the transport configuration. It is the parameter type accepted by
// NewGRPCTransport, NewHTTPTransport and NewWebSocketTransport, and
// DefaultConfig returns a *Config.
//
// NOTE(review): the field notes below are inferred from names and types only;
// the code that reads these fields is not visible here — confirm before
// relying on them.
//   - Address: presumably the remote endpoint to connect to.
//   - ConnectTimeout, ReadTimeout, WriteTimeout: presumably per-operation
//     time limits.
//   - HeartbeatInterval: presumably the period between heartbeats.
//   - BufferSize: presumably a buffer size; units not shown.
//   - CompressionEnabled, CompressionThreshold: presumably toggle compression
//     and set the minimum size at which it applies; units not shown.
type Config struct {
	Address              string
	ConnectTimeout       time.Duration
	ReadTimeout          time.Duration
	WriteTimeout         time.Duration
	HeartbeatInterval    time.Duration
	BufferSize           int
	CompressionEnabled   bool
	CompressionThreshold int
}

Config 传输配置

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig 默认配置

type GRPCTransport

type GRPCTransport struct {
	// contains filtered or unexported fields
}

GRPCTransport gRPC 双向流传输实现

func NewGRPCTransport

func NewGRPCTransport(config *Config) *GRPCTransport

NewGRPCTransport 创建 gRPC 传输

func (*GRPCTransport) Close

func (t *GRPCTransport) Close() error

Close 关闭连接

func (*GRPCTransport) Connect

func (t *GRPCTransport) Connect(ctx context.Context) error

Connect 建立 gRPC 连接和双向流

func (*GRPCTransport) IsHealthy

func (t *GRPCTransport) IsHealthy() bool

IsHealthy 健康检查

func (*GRPCTransport) Receive

func (t *GRPCTransport) Receive(ctx context.Context) (*message.Envelope, error)

Receive 接收消息

func (*GRPCTransport) Send

func (t *GRPCTransport) Send(ctx context.Context, envelope *message.Envelope) error

Send 发送消息

func (*GRPCTransport) State

func (t *GRPCTransport) State() State

State 获取状态

type HTTPTransport

type HTTPTransport struct {
	// contains filtered or unexported fields
}

HTTPTransport HTTP 降级传输实现

func NewHTTPTransport

func NewHTTPTransport(config *Config) *HTTPTransport

NewHTTPTransport 创建 HTTP 传输

func (*HTTPTransport) Close

func (t *HTTPTransport) Close() error

Close 关闭连接

func (*HTTPTransport) Connect

func (t *HTTPTransport) Connect(ctx context.Context) error

Connect 建立 HTTP 连接(验证服务器可达性)

func (*HTTPTransport) IsHealthy

func (t *HTTPTransport) IsHealthy() bool

IsHealthy 检查连接健康状态

func (*HTTPTransport) Receive

func (t *HTTPTransport) Receive(ctx context.Context) (*message.Envelope, error)

Receive 接收消息

func (*HTTPTransport) Send

func (t *HTTPTransport) Send(ctx context.Context, envelope *message.Envelope) error

Send 发送消息 (POST)

func (*HTTPTransport) SetAPIKey

func (t *HTTPTransport) SetAPIKey(apiKey string)

SetAPIKey 设置 API Key

func (*HTTPTransport) State

func (t *HTTPTransport) State() State

State 获取当前状态

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager 连接管理器 - 支持 WS+gRPC 负载均衡、自动降级和重连

func NewManager

func NewManager(config *ManagerConfig) *Manager

NewManager 创建连接管理器

func (*Manager) ActiveTransport

func (m *Manager) ActiveTransport() TransportType

ActiveTransport 获取当前活跃传输类型(用于兼容)

func (*Manager) GetConnectedTransports

func (m *Manager) GetConnectedTransports() []TransportType

GetConnectedTransports 获取所有已连接的传输类型

func (*Manager) IsConnected

func (m *Manager) IsConnected() bool

IsConnected 检查是否已连接

func (*Manager) IsDegraded

func (m *Manager) IsDegraded() bool

IsDegraded 是否处于降级状态

func (*Manager) Receive

func (m *Manager) Receive(ctx context.Context) (*message.Envelope, error)

Receive 接收消息

func (*Manager) ReceiveChan

func (m *Manager) ReceiveChan() <-chan *message.Envelope

ReceiveChan 获取接收 channel

func (*Manager) Send

func (m *Manager) Send(ctx context.Context, envelope *message.Envelope) error

Send 发送消息(通过负载均衡选择 transport)

func (*Manager) SendDirect

func (m *Manager) SendDirect(ctx context.Context, t TransportType, envelope *message.Envelope) error

SendDirect 直接通过指定的 transport 发送消息(用于认证等场景)

func (*Manager) SetAuthCallback

func (m *Manager) SetAuthCallback(callback func(TransportType) error)

SetAuthCallback 设置认证回调(重连后调用,传入 transport 类型)

func (*Manager) SetTransportChangeCallback

func (m *Manager) SetTransportChangeCallback(callback func(TransportType))

SetTransportChangeCallback 设置传输切换回调

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start 启动连接管理器

func (*Manager) Stop

func (m *Manager) Stop() error

Stop 停止连接管理器

type ManagerConfig

// ManagerConfig is the configuration for the connection manager. It is the
// parameter type accepted by NewManager, and DefaultManagerConfig returns a
// *ManagerConfig.
type ManagerConfig struct {
	WSAddress           string        // WebSocket address
	GRPCAddress         string        // gRPC address
	HTTPAddress         string        // HTTP address
	APIKey              string        // API key (used by the HTTP transport)
	EnabledTransports   []string      // enabled transport protocols (websocket, grpc, http)
	MaxRetries          int           // maximum number of retries
	RetryDelay          time.Duration // delay between retries
	HealthCheckInterval time.Duration // interval between health checks
	FailoverThreshold   int           // number of consecutive failures that triggers failover
	PingInterval        time.Duration // ping interval
	MaxSendRetries      int           // maximum number of retries when sending a message
}

ManagerConfig 连接管理器配置

func DefaultManagerConfig

func DefaultManagerConfig() *ManagerConfig

DefaultManagerConfig 默认管理器配置

type State

type State uint8

State 连接状态

// Connection states of type State. The values are assigned by iota in
// declaration order, so StateDisconnected (0) is the zero value of State.
const (
	StateDisconnected State = iota // 0
	StateConnecting                // 1
	StateConnected                 // 2
	StateClosed                    // 3
)

type Transport

// Transport is the transport-layer interface. GRPCTransport, HTTPTransport and
// WebSocketTransport each declare methods with these names.
type Transport interface {
	// Connect establishes the connection.
	Connect(ctx context.Context) error
	// Close closes the connection.
	Close() error
	// Send sends a message envelope.
	Send(ctx context.Context, envelope *message.Envelope) error
	// Receive receives a message envelope.
	Receive(ctx context.Context) (*message.Envelope, error)
	// State returns the current connection state.
	State() State
	// IsHealthy reports the result of the transport's health check.
	IsHealthy() bool
}

Transport 传输层接口

type TransportType

type TransportType uint8

TransportType 传输类型

// Transport kinds of type TransportType. The values are assigned by iota in
// declaration order, so TransportAuto (0) is the zero value of TransportType.
const (
	TransportAuto      TransportType = iota // 0
	TransportWebSocket                      // 1
	TransportGRPC                           // 2
	TransportHTTP                           // 3
)

func (TransportType) String

func (t TransportType) String() string

String 返回传输类型的字符串表示

type WebSocketTransport

type WebSocketTransport struct {
	// contains filtered or unexported fields
}

WebSocketTransport WebSocket 传输实现

func NewWebSocketTransport

func NewWebSocketTransport(config *Config) *WebSocketTransport

NewWebSocketTransport 创建 WebSocket 传输

func (*WebSocketTransport) Close

func (t *WebSocketTransport) Close() error

Close 关闭连接

func (*WebSocketTransport) Connect

func (t *WebSocketTransport) Connect(ctx context.Context) error

Connect 建立连接(参考 LG 模式:服务端主导心跳,客户端配合响应)

func (*WebSocketTransport) IsHealthy

func (t *WebSocketTransport) IsHealthy() bool

IsHealthy 健康检查

func (*WebSocketTransport) Receive

func (t *WebSocketTransport) Receive(ctx context.Context) (*message.Envelope, error)

Receive 接收消息(不主动设置 ReadDeadline,由 PingHandler/PongHandler 管理)

func (*WebSocketTransport) Send

func (t *WebSocketTransport) Send(ctx context.Context, envelope *message.Envelope) error

Send 发送消息

func (*WebSocketTransport) State

func (t *WebSocketTransport) State() State

State 获取状态

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL