package connect import ( "errors" "time" "gitea.bvbej.com/bvbej/base-golang/pkg/websocket/peer" "github.com/gorilla/websocket" ) const ( writeWait = 20 * time.Second pongWait = 60 * time.Second pingPeriod = (pongWait * 9) / 10 maxFrameMessageLen = 16 * 1024 //4 * 4096 maxSendBuffer = 16 ) var ( ErrBrokenPipe = errors.New("send to broken pipe") ErrBufferPoolExceed = errors.New("send buffer exceed") ) type wsConnection struct { peer.ConnectionIdentify pm *peer.SessionManager conn *websocket.Conn send chan []byte running bool } func NewConnection(conn *websocket.Conn, p *peer.SessionManager) *wsConnection { wsc := &wsConnection{ conn: conn, pm: p, send: make(chan []byte, maxSendBuffer), running: true, } go wsc.acceptLoop() go wsc.sendLoop() return wsc } func (ws *wsConnection) Peer() *peer.SessionManager { return ws.pm } func (ws *wsConnection) Raw() any { if ws.conn == nil { return nil } return ws.conn } func (ws *wsConnection) RemoteAddr() string { if ws.conn == nil { return "" } return ws.conn.RemoteAddr().String() } func (ws *wsConnection) Close() { _ = ws.conn.Close() ws.running = false } func (ws *wsConnection) Send(msg []byte) (err error) { defer func() { if e := recover(); e != nil { err = ErrBrokenPipe } }() if !ws.running { return ErrBrokenPipe } if len(ws.send) >= maxSendBuffer { return ErrBufferPoolExceed } if len(msg) > maxFrameMessageLen { return } ws.send <- msg return nil } func (ws *wsConnection) acceptLoop() { defer func() { ws.pm.Unregister <- ws.ID() _ = ws.conn.Close() ws.running = false }() _ = ws.conn.SetReadDeadline(time.Now().Add(pongWait)) ws.conn.SetPongHandler(func(string) error { _ = ws.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) for ws.conn != nil { _, data, err := ws.conn.ReadMessage() if err != nil { break } ws.pm.ProcessMessage(ws.ID(), data) } } func (ws *wsConnection) sendLoop() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() _ = ws.conn.Close() ws.running = false close(ws.send) }() for { select { case msg := <-ws.send: _ = ws.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := ws.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil { return } case <-ticker.C: _ = ws.conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := ws.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } func (ws *wsConnection) IsClosed() bool { return !ws.running }