diff --git a/Request.go b/Request.go index e9b6f87..311eabf 100644 --- a/Request.go +++ b/Request.go @@ -170,20 +170,20 @@ func GenerateMultipleMessagesHandler(c *fiber.Ctx) error { outIcon := `User Image` - go sseChanel.SendEvent( - "swapContent-"+fmt.Sprintf("%d", message.Area.Position), - outContent, - ) - - go sseChanel.SendEvent( - "swapSelectionBtn-"+selectedLLMs[idx].ID.String(), - outBtn, - ) - - go sseChanel.SendEvent( - "swapIcon-"+fmt.Sprintf("%d", message.Area.Position), - outIcon, - ) + go func() { + sendEvent( + "swapContent-"+fmt.Sprintf("%d", message.Area.Position), + outContent, + ) + sendEvent( + "swapSelectionBtn-"+selectedLLMs[idx].ID.String(), + outBtn, + ) + sendEvent( + "swapIcon-"+fmt.Sprintf("%d", message.Area.Position), + outIcon, + ) + }() default: out, err := selectBtnTmpl.Execute(map[string]interface{}{ "message": templateMessage, @@ -195,15 +195,15 @@ func GenerateMultipleMessagesHandler(c *fiber.Ctx) error { } // Replace newline characters to prevent premature termination - out = strings.ReplaceAll(out, "\n", "") - - fmt.Println("Sending event: swapSelectionBtn-" + templateMessage.ModelID) + outBtn := strings.ReplaceAll(out, "\n", "") // Send Content event - sseChanel.SendEvent( - "swapSelectionBtn-"+templateMessage.ModelID, - out, - ) + go func() { + sendEvent( + "swapSelectionBtn-"+selectedLLMs[idx].ID.String(), + outBtn, + ) + }() } } }() diff --git a/main.go b/main.go index e3a5554..f0e8c09 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,9 @@ package main import ( + "bufio" "fmt" + "sync" "github.com/flosch/pongo2" "github.com/gofiber/fiber/v2" @@ -12,7 +14,29 @@ import ( var userTmpl *pongo2.Template var botTmpl *pongo2.Template var selectBtnTmpl *pongo2.Template -var sseChanel *FiberSSEChannel + +// SSE event structure +type SSE struct { + Event string + Data string +} + +// Global channel and mutex +var ( + clients = make(map[chan SSE]bool) + mu sync.Mutex +) + +// Function to send events to all clients +func sendEvent(event, data string) { + fmt.Println("Sending event: " + event) + mu.Lock() + defer mu.Unlock() + + for client := range clients { + client <- SSE{Event: event, Data: data} + } +} func main() { botTmpl = pongo2.Must(pongo2.FromFile("views/partials/message-bot.html")) @@ -28,8 +52,6 @@ func main() { AppName: "JADE 2.0", EnablePrintRoutes: true, }) - sse := NewSSE(app, "/sse") - sseChanel = sse.CreateChannel("sse", "") // Add default logger app.Use(logger.New()) @@ -74,6 +96,39 @@ func main() { return c.SendString("") }) + app.Get("/sse", func(c *fiber.Ctx) error { + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + + events := make(chan SSE) + mu.Lock() + clients[events] = true + mu.Unlock() + + // Create a context copy to use in the goroutine + ctx := c.Context() + + go func() { + <-ctx.Done() + mu.Lock() + delete(clients, events) + mu.Unlock() + close(events) + }() + + c.Context().SetBodyStreamWriter(func(w *bufio.Writer) { + for event := range events { + if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.Event, event.Data); err != nil { + return + } + w.Flush() + } + }) + + return nil + }) + // Start server app.Listen(":8080") } diff --git a/sse.go b/sse.go deleted file mode 100644 index b55947d..0000000 --- a/sse.go +++ /dev/null @@ -1,251 +0,0 @@ -package main - -import ( - "bufio" - "fmt" - "time" - - "github.com/gofiber/fiber/v2" -) - -// SSEEvent is a struct that represents an SSE event -type FiberSSEEvent struct { - Timestamp time.Time `json:"timestamp"` - ID string `json:"id"` - Event string `json:"event"` - Data string `json:"data"` - Retry string `json:"retry"` - OnChannel *FiberSSEChannel -} - -// # TypeDef -// -// Handler for channel events -type FiberSSEEventHandler func(ctx *fiber.Ctx, sseChannel *FiberSSEChannel) - -// # TypeDef -// -// Handler for specific events on a channel -type FiberSSEOnEventHandler func(ctx *fiber.Ctx, sseChannel *FiberSSEChannel, sseEvent *FiberSSEEvent) -type FiberSSEEvents interface { - OnConnect(handlers ...FiberSSEEventHandler) - OnDisconnect(handlers ...FiberSSEEventHandler) - OnEvent(eventName string, handlers ...FiberSSEOnEventHandler) - FireOnEventHandlers(fiberCtx *fiber.Ctx, event string) -} - -/* -A channel with a name, and a sub-base-path -*/ -type FiberSSEChannel struct { - FiberSSEEvents - Name string - Base string - Events chan *FiberSSEEvent - ParentSSEApp *FiberSSEApp - Handlers map[string]([]FiberSSEEventHandler) - EventHandlers map[string]([]FiberSSEOnEventHandler) -} -type FiberSSEHandler func(c *fiber.Ctx, w *bufio.Writer) error - -/* -The SSE Information Structure includes a list of channels and the fiber application -*/ -type FiberSSEApp struct { - IFiberSSEApp - Base string - Router *fiber.Router - Channels map[string]*FiberSSEChannel - FiberApp *fiber.App -} - -// FiberSSEApp Interface -type IFiberSSEApp interface { - ServeHTTP(ctx *fiber.Ctx) error - CreateChannel(name, base string) *FiberSSEChannel - ListChannels() map[string]*FiberSSEChannel - GetChannel(name string) *FiberSSEChannel -} - -/* -New initializes a base SSE route group at `base`. - -The base route is the base path for all channels. - -The channels parameter is a list of channels that will be created. -Each channel has a name, a base route, and a channel for sending events. - - // Create a new SSE app - app := fiber.New() - // Create a new SSE app on the fiber app - sseApp := ssefiber.New(app, "/sse") - // Add a channel to the SSE app - testChan := sseApp.CreateChannel("test", "/test") // Channel at /sse/test - // Events Channel - eventsChan := testChan.Events -*/ -func NewSSE(app *fiber.App, base string) *FiberSSEApp { - // Add the base route - fiberRouter := app.Group(base, func(c *fiber.Ctx) error { - // Set the headers for SSE - c.Set("Cache-Control", "no-cache") - c.Set("Content-Type", "text/event-stream") - c.Set("Connection", "keep-alive") - c.Set("Access-Control-Allow-Origin", "*") - return c.Next() - }) - - // Create a new SSE App - newFSSEApp := &FiberSSEApp{ - Base: base, - Router: &fiberRouter, - FiberApp: app, - Channels: make(map[string]*FiberSSEChannel), - } - return newFSSEApp -} - -/* -CreateChannel creates a new channel with the given name and base path. -Functions as a shortcut for making a new chan each time - -Example: - - app := fiber.New() - sseApp := ssefiber.New(app, "/sse") - chanOne := sseApp.CreateChannel("Channel One", "/one") - chanTwo := sseApp.CreateChannel("Channel Two", "/two") -*/ -func (app *FiberSSEApp) CreateChannel(name, base string) *FiberSSEChannel { - newChannel := &FiberSSEChannel{ - Name: name, - Base: base, - Events: make(chan *FiberSSEEvent, 100), - ParentSSEApp: app, - Handlers: make(map[string][]FiberSSEEventHandler), - EventHandlers: make(map[string][]FiberSSEOnEventHandler), - } - app.Channels[name] = newChannel - // Add the sub-route for the channel - (*app.Router).Get(newChannel.Base, newChannel.ServeHTTP) - return newChannel -} - -// ListChannels returns a list of all the channels and prints them to the console -func (app *FiberSSEApp) ListChannels() map[string]*FiberSSEChannel { - fmt.Println("Listing Channels...") - for _, channel := range app.Channels { - channel.Print() - } - return app.Channels -} - -/* -Create an event and send it to the channel. -*/ -func (channel *FiberSSEChannel) SendEvent(event, data string) { - sseEvent := &FiberSSEEvent{ - Timestamp: time.Now(), - Event: event, - Data: data, - OnChannel: channel, - } - channel.Events <- sseEvent -} - -// Flush the event to the writer `w` - formats according to SSE standard -func (e *FiberSSEEvent) Flush(w *bufio.Writer) error { - fmt.Fprintf(w, "event: %s\ndata: %s\n\n", e.Event, e.Data) - return w.Flush() -} - -// Prints the channel information to the console -func (c *FiberSSEChannel) Print() { - fmt.Printf("==CHANNEL CREATED==\nName: %s\nRoute Endpoint: %s\n===================", c.Name, c.ParentSSEApp.Base+c.Base) -} - -// # Internal Method -// -// ServeHTTP returns a fiber.Handler for the channel. -// -// Use `sseApp.CreateChannel` to create a new channel. -func (fChan *FiberSSEChannel) ServeHTTP(c *fiber.Ctx) error { - - c.Context().SetBodyStreamWriter(func(w *bufio.Writer) { - // Fire OnConnect Event Handlers - - go fChan.FireHandlers(c, "connect") - - for { - event, more := <-fChan.Events - fmt.Println("event: ", string(event.Event), "\ndata: \n\n", string(event.Data)) - // w.Flush() - go event.FireEventHandlers(c) - if err := event.Flush(w); err != nil { - go fChan.FireHandlers(c, "disconnect") - return - } - if !more { - // Fire OnDisconnect Event Handlers - go fChan.FireHandlers(c, "disconnect") - return - } - } - }) - - return nil - -} - -// Cleanup removes all of the channels from the app. Should be used as a defer -func (sseApp *FiberSSEApp) Cleanup() { - for _, channel := range sseApp.Channels { - close(channel.Events) - } - fmt.Println("All Channels Closed - Cleanup Successful") -} - -// Fire the handlers for a given channel event (connect, disconnect) -func (channel *FiberSSEChannel) FireHandlers(fiberCtx *fiber.Ctx, event string) { - for _, handler := range channel.Handlers[event] { - handler(fiberCtx, channel) - } -} - -// Fire the handlers for this event -func (e *FiberSSEEvent) FireEventHandlers(fiberCtx *fiber.Ctx) { - channel := e.OnChannel - for _, handler := range channel.EventHandlers[e.Event] { - fmt.Println("Firing Event Handlers on Channel: " + channel.Name + " - Event: " + e.Event) - handler(fiberCtx, channel, e) - } -} - -// Adds the handlers to the channel for the connect method -func (channel *FiberSSEChannel) OnConnect(handlers ...FiberSSEEventHandler) { - channel.Handlers["connect"] = []FiberSSEEventHandler{} - channel.Handlers["connect"] = append(channel.Handlers["connect"], handlers...) -} - -// Adds the handlers to the channel for the disconnect method -func (channel *FiberSSEChannel) OnDisconnect(handlers ...FiberSSEEventHandler) { - channel.Handlers["disconnect"] = []FiberSSEEventHandler{} - channel.Handlers["disconnect"] = append(channel.Handlers["disconnect"], handlers...) -} - -// Add handlers for the any given event -// -// Example: -// -// channelOne.OnEvent("test", ...) // Fires anytime the event "test" is fired -func (channel *FiberSSEChannel) OnEvent(eventName string, handlers ...FiberSSEOnEventHandler) { - channel.EventHandlers[eventName] = []FiberSSEOnEventHandler{} - channel.EventHandlers[eventName] = append(channel.EventHandlers[eventName], handlers...) - -} - -// Returns a channel by name -func (app *FiberSSEApp) GetChannel(name string) *FiberSSEChannel { - findChan := app.Channels[name] - return findChan -}