// Package sse implements a small Server-Sent Events hub: HTTP handlers
// subscribe a connection under a user key, and Push/Broadcast deliver named
// events to the subscribed connections.
package sse

import (
	"fmt"
	"io"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"gitea.bvbej.com/bvbej/base-golang/pkg/mux"
	"gitea.bvbej.com/bvbej/base-golang/pkg/ticker"
	"github.com/gin-gonic/gin"
)

var _ Server = (*event)(nil)

// Server is the public surface of the SSE hub.
type Server interface {
	// HandlerFunc returns a mux handler that streams events to the caller,
	// keyed by the value returned from the mux context's Auth().
	HandlerFunc() mux.HandlerFunc
	// GinHandlerFunc returns a gin handler that streams events to the caller,
	// keyed by the string returned from auth.
	GinHandlerFunc(auth func(c *gin.Context) (string, error)) gin.HandlerFunc
	// Push sends one event to the session registered for user and reports
	// whether that session accepted it.
	Push(user any, name, msg string) bool
	// Broadcast sends one event to every registered session.
	Broadcast(name, msg string)
}

// clientChan is one live SSE session.
type clientChan struct {
	// User is the key the session is registered under. It is used as a
	// sync.Map key, so it must be a hashable value.
	User any
	// Chan carries events to the streaming handler. It is never closed, so a
	// concurrent sender can never hit "send on closed channel".
	Chan chan msgChan
	// Done is closed by the listen goroutine (and only by it) when the session
	// is unregistered or replaced by a newer session for the same user.
	Done chan struct{}
}

// msgChan is a single event: its SSE event name and its payload.
type msgChan struct {
	Name    string
	Message string
}

// event is the Server implementation. SessionList is only mutated by the
// listen goroutine; Push and Broadcast read it concurrently.
type event struct {
	SessionList sync.Map // user key -> clientChan
	Count       atomic.Int32
	Register    chan clientChan
	Unregister  chan any
	Ticker      ticker.Ticker
}

// NewServer creates the hub, starts its registration loop and starts a ticker
// (constructed with a 3 second interval) whose callback broadcasts a "ping"
// event carrying the current Unix timestamp.
func NewServer() Server {
	e := &event{
		Register:   make(chan clientChan),
		Unregister: make(chan any),
		Ticker:     ticker.New(time.Second * 3),
	}

	go e.Ticker.Process(func() {
		e.Broadcast("ping", fmt.Sprintf("%d", time.Now().Unix()))
	})
	go e.listen()

	return e
}

// listen serialises every change to the session table. It never blocks on a
// session, so handlers can always register and unregister.
func (e *event) listen() {
	for {
		select {
		case client := <-e.Register:
			if prev, ok := e.SessionList.Load(client.User); ok {
				// Same user connected again: end the previous session so its
				// handler returns instead of waiting forever on a channel
				// nobody sends to anymore.
				close(prev.(clientChan).Done)
			} else {
				e.Count.Add(1)
			}
			e.SessionList.Store(client.User, client)
		case leaving := <-e.Unregister:
			e.drop(leaving)
		}
	}
}

// drop removes a session. leaving is normally the clientChan that is going
// away; a bare user key is also accepted and removes whatever session is
// registered for it. Must only be called from listen.
func (e *event) drop(leaving any) {
	key := leaving
	sess, bySession := leaving.(clientChan)
	if bySession {
		key = sess.User
	}

	value, ok := e.SessionList.Load(key)
	if !ok {
		return
	}
	current := value.(clientChan)
	if bySession && current.Done != sess.Done {
		// The user has reconnected since; the stale handler must not evict
		// the newer session (its own Done was closed when it was replaced).
		return
	}

	close(current.Done)
	e.SessionList.Delete(key)
	e.Count.Add(-1)
}

// subscribe creates a session for user and registers it with listen.
func (e *event) subscribe(user any) clientChan {
	sess := clientChan{
		User: user,
		Chan: make(chan msgChan),
		Done: make(chan struct{}),
	}
	e.Register <- sess
	return sess
}

// deliver hands m to the session's handler. It blocks until the handler takes
// the message (true) or the session ends (false).
func (c clientChan) deliver(m msgChan) bool {
	select {
	case c.Chan <- m:
		return true
	case <-c.Done:
		return false
	}
}

// next waits for the next event. ok is false when the session has ended or
// gone is closed (the HTTP request was cancelled).
func (c clientChan) next(gone <-chan struct{}) (m msgChan, ok bool) {
	select {
	case m = <-c.Chan:
		return m, true
	case <-c.Done:
		return msgChan{}, false
	case <-gone:
		return msgChan{}, false
	}
}

// setStreamHeaders writes the response headers used for the event stream.
func setStreamHeaders(h http.Header) {
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	h.Set("Connection", "keep-alive")
	h.Set("Transfer-Encoding", "chunked")
}

// HandlerFunc returns a mux handler that registers the caller under the value
// returned by c.Auth() and streams events until the session ends or the
// request is cancelled. It aborts with 400 Bad Request when Auth() is nil.
func (e *event) HandlerFunc() mux.HandlerFunc {
	return func(c mux.Context) {
		auth := c.Auth()
		if auth == nil {
			c.Context().AbortWithStatus(http.StatusBadRequest)
			return
		}

		sess := e.subscribe(auth)
		defer func() { e.Unregister <- sess }()

		setStreamHeaders(c.Context().Writer.Header())

		// NOTE(review): assumes c.Context() is the underlying *gin.Context.
		gone := c.Context().Request.Context().Done()
		c.Context().Stream(func(_ io.Writer) bool {
			msg, ok := sess.next(gone)
			if ok {
				c.Context().SSEvent(msg.Name, msg.Message)
			}
			return ok
		})
	}
}

// GinHandlerFunc returns a gin handler that registers the caller under the
// string returned by auth and streams events until the session ends or the
// request is cancelled. It aborts with 400 Bad Request when auth fails.
func (e *event) GinHandlerFunc(auth func(c *gin.Context) (string, error)) gin.HandlerFunc {
	return func(c *gin.Context) {
		user, err := auth(c)
		if err != nil {
			c.AbortWithStatus(http.StatusBadRequest)
			return
		}

		sess := e.subscribe(user)
		defer func() { e.Unregister <- sess }()

		setStreamHeaders(c.Writer.Header())

		gone := c.Request.Context().Done()
		c.Stream(func(_ io.Writer) bool {
			msg, ok := sess.next(gone)
			if ok {
				c.SSEvent(msg.Name, msg.Message)
			}
			return ok
		})
	}
}

// Push sends the event to the session registered for user. It returns true
// once the session's handler has accepted the event, and false when no session
// is registered for user or the session ended before accepting it.
func (e *event) Push(user any, name, msg string) bool {
	value, ok := e.SessionList.Load(user)
	if !ok {
		return false
	}
	return value.(clientChan).deliver(msgChan{Name: name, Message: msg})
}

// Broadcast sends the event to every registered session, one after another.
// Sessions that end while waiting are skipped.
func (e *event) Broadcast(name, msg string) {
	m := msgChan{Name: name, Message: msg}
	e.SessionList.Range(func(_, value any) bool {
		value.(clientChan).deliver(m)
		return true
	})
}