From be7b2bf15ed44cbf29a1d907ecf9dad892063c27 Mon Sep 17 00:00:00 2001 From: bvbej Date: Sat, 7 Sep 2024 11:25:30 +0800 Subject: [PATCH] =?UTF-8?q?[=F0=9F=9A=80]=20sse?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/sse/server.go | 70 +++++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/pkg/sse/server.go b/pkg/sse/server.go index 01e3d61..208ea4e 100644 --- a/pkg/sse/server.go +++ b/pkg/sse/server.go @@ -1,6 +1,7 @@ package sse import ( + "gitea.bvbej.com/bvbej/base-golang/pkg/mux" "github.com/gin-gonic/gin" "io" "net/http" @@ -11,13 +12,14 @@ import ( var _ Server = (*event)(nil) type Server interface { - HandlerFunc(auth func(c *gin.Context) (string, error)) gin.HandlerFunc + GinHandle(ctx *gin.Context, user any) + HandlerFunc() mux.HandlerFunc Push(user, name, msg string) bool Broadcast(name, msg string) } type clientChan struct { - User string + User any Chan chan msgChan } @@ -31,7 +33,7 @@ type event struct { Count atomic.Int32 Register chan clientChan - Unregister chan string + Unregister chan any } func NewServer() Server { @@ -39,7 +41,7 @@ func NewServer() Server { SessionList: sync.Map{}, Count: atomic.Int32{}, Register: make(chan clientChan), - Unregister: make(chan string), + Unregister: make(chan any), } go e.listen() @@ -65,38 +67,40 @@ func (stream *event) listen() { } } -func (stream *event) HandlerFunc(auth func(c *gin.Context) (string, error)) gin.HandlerFunc { - return func(c *gin.Context) { - user, err := auth(c) - if err != nil { - c.AbortWithStatus(http.StatusBadRequest) - return +func (stream *event) GinHandle(ctx *gin.Context, user any) { + if user == nil { + ctx.AbortWithStatus(http.StatusUnauthorized) + return + } + e := make(chan msgChan) + client := clientChan{ + User: user, + Chan: e, + } + stream.Register <- client + defer func() { + stream.Unregister <- user + }() + + ctx.Writer.Header().Set("Content-Type", "text/event-stream") + ctx.Writer.Header().Set("Cache-Control", "no-cache") + ctx.Writer.Header().Set("Connection", "keep-alive") + ctx.Writer.Header().Set("Transfer-Encoding", "chunked") + + ctx.Stream(func(w io.Writer) bool { + if msg, ok := <-e; ok { + ctx.SSEvent(msg.Name, msg.Message) + return true } + return false + }) - e := make(chan msgChan) - client := clientChan{ - User: user, - Chan: e, - } - stream.Register <- client - defer func() { - stream.Unregister <- user - }() + ctx.Next() +} - c.Writer.Header().Set("Content-Type", "text/event-stream") - c.Writer.Header().Set("Cache-Control", "no-cache") - c.Writer.Header().Set("Connection", "keep-alive") - c.Writer.Header().Set("Transfer-Encoding", "chunked") - - c.Stream(func(w io.Writer) bool { - if msg, ok := <-e; ok { - c.SSEvent(msg.Name, msg.Message) - return true - } - return false - }) - - c.Next() +func (stream *event) HandlerFunc() mux.HandlerFunc { + return func(c mux.Context) { + stream.GinHandle(c.Context(), c.Auth()) } }