base-golang/pkg/websocket/peer/session_manager.go

120 lines
2.4 KiB
Go
Raw Normal View History

2024-07-23 10:23:43 +08:00
package peer
import (
"sync"
"sync/atomic"
)
// SessionManager keeps track of live sessions by ID and forwards connection
// events to an optional callback.
type SessionManager struct {
	// connIDGen and count are accessed through the sync/atomic functions.
	// They are placed first so that they are 64-bit aligned even on 32-bit
	// platforms, where a misaligned 64-bit atomic operation panics.
	connIDGen int64 // serial number of the most recently generated session ID
	count     int64 // number of sessions currently in use

	sessionList sync.Map           // sessions keyed by their ID
	callback    ConnectionCallBack // may be nil; checked before every use
	Register    chan *Session      // sessions to be added by the run loop
	Unregister  chan int64         // IDs of sessions to be removed by the run loop
}
// SetIDBase atomically overwrites the session ID counter with base. Because
// Add increments the counter before using it, the next ID it hands out is
// base+1.
func (mgr *SessionManager) SetIDBase(base int64) {
	gen := &mgr.connIDGen
	atomic.StoreInt64(gen, base)
}
// Count atomically reads the session counter and returns it as an int.
func (mgr *SessionManager) Count() int {
	n := atomic.LoadInt64(&mgr.count)
	return int(n)
}
// Add registers sess under a freshly generated ID.
//
// The ID counter and the session counter are both bumped atomically, the new
// ID is pushed into the connection through its SetID method, and the session
// is then stored under that ID. The call panics if sess.Conn does not provide
// SetID(int64).
func (mgr *SessionManager) Add(sess *Session) {
	newID := atomic.AddInt64(&mgr.connIDGen, 1)
	atomic.AddInt64(&mgr.count, 1)

	conn := sess.Conn.(interface {
		SetID(int64)
	})
	conn.SetID(newID)

	mgr.sessionList.Store(newID, sess)
}
// Close removes the session registered under id.
//
// When a session is actually removed, the session counter is decremented and,
// if a callback is installed, OnClosed is invoked for that session on a new
// goroutine. Calling Close with an unknown or already-closed id is a no-op, so
// repeated or concurrent calls for the same id cannot drive the counter
// negative or report the same session closed twice.
func (mgr *SessionManager) Close(id int64) {
	// LoadAndDelete makes lookup and removal a single atomic step, so exactly
	// one caller observes the session as present.
	v, ok := mgr.sessionList.LoadAndDelete(id)
	if !ok {
		return
	}
	atomic.AddInt64(&mgr.count, -1)
	if mgr.callback != nil {
		go mgr.callback.OnClosed(v.(*Session))
	}
}
// ProcessMessage hands msg to the callback's OnReceive for the session
// registered under id. The callback runs on its own goroutine; if it returns
// an error, the session's connection is closed. Nothing happens when id is
// unknown or no callback is installed.
func (mgr *SessionManager) ProcessMessage(id int64, msg []byte) {
	entry, found := mgr.sessionList.Load(id)
	if !found || mgr.callback == nil {
		return
	}

	go func() {
		sess := entry.(*Session)
		if err := mgr.callback.OnReceive(sess, msg); err != nil {
			sess.Conn.Close()
		}
	}()
}
// run is the event loop behind the Register and Unregister channels.
//
// A session received on Register gets the next ID, is counted, has the ID
// pushed into its connection via SetID, and is stored. An ID received on
// Unregister removes the matching session, decrements the counter and, if a
// callback is installed, reports the session through OnClosed on a new
// goroutine; unknown IDs are ignored.
//
// The counters are updated with sync/atomic because Add, Close, Count,
// SessionCount and SetIDBase access the same fields atomically from other
// goroutines; plain ++/-- here would race with them.
//
// NOTE(review): the loop has no exit condition, so the goroutine running it
// lives for the rest of the process.
func (mgr *SessionManager) run() {
	for {
		select {
		case client := <-mgr.Register:
			id := atomic.AddInt64(&mgr.connIDGen, 1)
			atomic.AddInt64(&mgr.count, 1)
			client.Conn.(interface {
				SetID(int64)
			}).SetID(id)
			mgr.sessionList.Store(id, client)
		case clientID := <-mgr.Unregister:
			// Remove and fetch in one atomic step so a session is only
			// counted down and reported once.
			v, ok := mgr.sessionList.LoadAndDelete(clientID)
			if !ok {
				continue
			}
			atomic.AddInt64(&mgr.count, -1)
			if mgr.callback != nil {
				go mgr.callback.OnClosed(v.(*Session))
			}
		}
	}
}
// GetSession looks up the session stored under id and returns it, or nil when
// no session is registered under that id.
func (mgr *SessionManager) GetSession(id int64) *Session {
	entry, found := mgr.sessionList.Load(id)
	if !found {
		return nil
	}
	return entry.(*Session)
}
// VisitSession calls callback once for each stored session. Iteration stops
// as soon as callback returns false.
func (mgr *SessionManager) VisitSession(callback func(*Session) bool) {
	visit := func(_, value any) bool {
		sess := value.(*Session)
		return callback(sess)
	}
	mgr.sessionList.Range(visit)
}
// CloseAllSession calls Close on the connection of every stored session. It
// does not itself remove sessions from the manager or touch the session
// counter.
func (mgr *SessionManager) CloseAllSession() {
	mgr.sessionList.Range(func(_, value any) bool {
		value.(*Session).Conn.Close()
		return true
	})
}
// SessionCount atomically reads the session counter and returns it as an
// int64.
func (mgr *SessionManager) SessionCount() int64 {
	total := atomic.LoadInt64(&mgr.count)
	return total
}
// NewSessionMgr builds a SessionManager that reports events to callback,
// creates its unbuffered Register and Unregister channels, and starts the run
// loop on a new goroutine before returning the manager.
func NewSessionMgr(callback ConnectionCallBack) *SessionManager {
	mgr := new(SessionManager)
	mgr.callback = callback
	mgr.Register = make(chan *Session)
	mgr.Unregister = make(chan int64)

	go mgr.run()
	return mgr
}