Working new event system

This commit is contained in:
Adrien Bouvais 2024-05-17 11:25:26 +02:00
parent 19ed54fbf3
commit e896225a4b
3 changed files with 79 additions and 275 deletions

View File

@ -170,20 +170,20 @@ func GenerateMultipleMessagesHandler(c *fiber.Ctx) error {
outIcon := `<img src="` + selectedLLMs[idx].Model.Company.Icon + `" alt="User Image">`
go sseChanel.SendEvent(
"swapContent-"+fmt.Sprintf("%d", message.Area.Position),
outContent,
)
go sseChanel.SendEvent(
"swapSelectionBtn-"+selectedLLMs[idx].ID.String(),
outBtn,
)
go sseChanel.SendEvent(
"swapIcon-"+fmt.Sprintf("%d", message.Area.Position),
outIcon,
)
go func() {
sendEvent(
"swapContent-"+fmt.Sprintf("%d", message.Area.Position),
outContent,
)
sendEvent(
"swapSelectionBtn-"+selectedLLMs[idx].ID.String(),
outBtn,
)
sendEvent(
"swapIcon-"+fmt.Sprintf("%d", message.Area.Position),
outIcon,
)
}()
default:
out, err := selectBtnTmpl.Execute(map[string]interface{}{
"message": templateMessage,
@ -195,15 +195,15 @@ func GenerateMultipleMessagesHandler(c *fiber.Ctx) error {
}
// Replace newline characters to prevent premature termination
out = strings.ReplaceAll(out, "\n", "")
fmt.Println("Sending event: swapSelectionBtn-" + templateMessage.ModelID)
outBtn := strings.ReplaceAll(out, "\n", "")
// Send Content event
sseChanel.SendEvent(
"swapSelectionBtn-"+templateMessage.ModelID,
out,
)
go func() {
sendEvent(
"swapSelectionBtn-"+selectedLLMs[idx].ID.String(),
outBtn,
)
}()
}
}
}()

61
main.go
View File

@ -1,7 +1,9 @@
package main
import (
"bufio"
"fmt"
"sync"
"github.com/flosch/pongo2"
"github.com/gofiber/fiber/v2"
@ -12,7 +14,29 @@ import (
var userTmpl *pongo2.Template
var botTmpl *pongo2.Template
var selectBtnTmpl *pongo2.Template
var sseChanel *FiberSSEChannel
// SSE event structure
type SSE struct {
Event string
Data string
}
// Global channel and mutex
var (
clients = make(map[chan SSE]bool)
mu sync.Mutex
)
// Function to send events to all clients
func sendEvent(event, data string) {
fmt.Println("Sending event: " + event)
mu.Lock()
defer mu.Unlock()
for client := range clients {
client <- SSE{Event: event, Data: data}
}
}
func main() {
botTmpl = pongo2.Must(pongo2.FromFile("views/partials/message-bot.html"))
@ -28,8 +52,6 @@ func main() {
AppName: "JADE 2.0",
EnablePrintRoutes: true,
})
sse := NewSSE(app, "/sse")
sseChanel = sse.CreateChannel("sse", "")
// Add default logger
app.Use(logger.New())
@ -74,6 +96,39 @@ func main() {
return c.SendString("")
})
app.Get("/sse", func(c *fiber.Ctx) error {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
events := make(chan SSE)
mu.Lock()
clients[events] = true
mu.Unlock()
// Create a context copy to use in the goroutine
ctx := c.Context()
go func() {
<-ctx.Done()
mu.Lock()
delete(clients, events)
mu.Unlock()
close(events)
}()
c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
for event := range events {
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.Event, event.Data); err != nil {
return
}
w.Flush()
}
})
return nil
})
// Start server
app.Listen(":8080")
}

251
sse.go
View File

@ -1,251 +0,0 @@
package main
import (
"bufio"
"fmt"
"time"
"github.com/gofiber/fiber/v2"
)
// SSEEvent is a struct that represents an SSE event
type FiberSSEEvent struct {
Timestamp time.Time `json:"timestamp"`
ID string `json:"id"`
Event string `json:"event"`
Data string `json:"data"`
Retry string `json:"retry"`
OnChannel *FiberSSEChannel
}
// # TypeDef
//
// Handler for channel events
type FiberSSEEventHandler func(ctx *fiber.Ctx, sseChannel *FiberSSEChannel)
// # TypeDef
//
// Handler for specific events on a channel
type FiberSSEOnEventHandler func(ctx *fiber.Ctx, sseChannel *FiberSSEChannel, sseEvent *FiberSSEEvent)
type FiberSSEEvents interface {
OnConnect(handlers ...FiberSSEEventHandler)
OnDisconnect(handlers ...FiberSSEEventHandler)
OnEvent(eventName string, handlers ...FiberSSEOnEventHandler)
FireOnEventHandlers(fiberCtx *fiber.Ctx, event string)
}
/*
A channel with a name, and a sub-base-path
*/
type FiberSSEChannel struct {
FiberSSEEvents
Name string
Base string
Events chan *FiberSSEEvent
ParentSSEApp *FiberSSEApp
Handlers map[string]([]FiberSSEEventHandler)
EventHandlers map[string]([]FiberSSEOnEventHandler)
}
type FiberSSEHandler func(c *fiber.Ctx, w *bufio.Writer) error
/*
The SSE Information Structure includes a list of channels and the fiber application
*/
type FiberSSEApp struct {
IFiberSSEApp
Base string
Router *fiber.Router
Channels map[string]*FiberSSEChannel
FiberApp *fiber.App
}
// FiberSSEApp Interface
type IFiberSSEApp interface {
ServeHTTP(ctx *fiber.Ctx) error
CreateChannel(name, base string) *FiberSSEChannel
ListChannels() map[string]*FiberSSEChannel
GetChannel(name string) *FiberSSEChannel
}
/*
New initializes a base SSE route group at `base`.
The base route is the base path for all channels.
The channels parameter is a list of channels that will be created.
Each channel has a name, a base route, and a channel for sending events.
// Create a new SSE app
app := fiber.New()
// Create a new SSE app on the fiber app
sseApp := ssefiber.New(app, "/sse")
// Add a channel to the SSE app
testChan := sseApp.CreateChannel("test", "/test") // Channel at /sse/test
// Events Channel
eventsChan := testChan.Events
*/
func NewSSE(app *fiber.App, base string) *FiberSSEApp {
// Add the base route
fiberRouter := app.Group(base, func(c *fiber.Ctx) error {
// Set the headers for SSE
c.Set("Cache-Control", "no-cache")
c.Set("Content-Type", "text/event-stream")
c.Set("Connection", "keep-alive")
c.Set("Access-Control-Allow-Origin", "*")
return c.Next()
})
// Create a new SSE App
newFSSEApp := &FiberSSEApp{
Base: base,
Router: &fiberRouter,
FiberApp: app,
Channels: make(map[string]*FiberSSEChannel),
}
return newFSSEApp
}
/*
CreateChannel creates a new channel with the given name and base path.
Functions as a shortcut for making a new chan each time
Example:
app := fiber.New()
sseApp := ssefiber.New(app, "/sse")
chanOne := sseApp.CreateChannel("Channel One", "/one")
chanTwo := sseApp.CreateChannel("Channel Two", "/two")
*/
func (app *FiberSSEApp) CreateChannel(name, base string) *FiberSSEChannel {
newChannel := &FiberSSEChannel{
Name: name,
Base: base,
Events: make(chan *FiberSSEEvent, 100),
ParentSSEApp: app,
Handlers: make(map[string][]FiberSSEEventHandler),
EventHandlers: make(map[string][]FiberSSEOnEventHandler),
}
app.Channels[name] = newChannel
// Add the sub-route for the channel
(*app.Router).Get(newChannel.Base, newChannel.ServeHTTP)
return newChannel
}
// ListChannels returns a list of all the channels and prints them to the console
func (app *FiberSSEApp) ListChannels() map[string]*FiberSSEChannel {
fmt.Println("Listing Channels...")
for _, channel := range app.Channels {
channel.Print()
}
return app.Channels
}
/*
Create an event and send it to the channel.
*/
func (channel *FiberSSEChannel) SendEvent(event, data string) {
sseEvent := &FiberSSEEvent{
Timestamp: time.Now(),
Event: event,
Data: data,
OnChannel: channel,
}
channel.Events <- sseEvent
}
// Flush the event to the writer `w` - formats according to SSE standard
func (e *FiberSSEEvent) Flush(w *bufio.Writer) error {
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", e.Event, e.Data)
return w.Flush()
}
// Prints the channel information to the console
func (c *FiberSSEChannel) Print() {
fmt.Printf("==CHANNEL CREATED==\nName: %s\nRoute Endpoint: %s\n===================", c.Name, c.ParentSSEApp.Base+c.Base)
}
// # Internal Method
//
// ServeHTTP returns a fiber.Handler for the channel.
//
// Use `sseApp.CreateChannel` to create a new channel.
func (fChan *FiberSSEChannel) ServeHTTP(c *fiber.Ctx) error {
c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
// Fire OnConnect Event Handlers
go fChan.FireHandlers(c, "connect")
for {
event, more := <-fChan.Events
fmt.Println("event: ", string(event.Event), "\ndata: \n\n", string(event.Data))
// w.Flush()
go event.FireEventHandlers(c)
if err := event.Flush(w); err != nil {
go fChan.FireHandlers(c, "disconnect")
return
}
if !more {
// Fire OnDisconnect Event Handlers
go fChan.FireHandlers(c, "disconnect")
return
}
}
})
return nil
}
// Cleanup removes all of the channels from the app. Should be used as a defer
func (sseApp *FiberSSEApp) Cleanup() {
for _, channel := range sseApp.Channels {
close(channel.Events)
}
fmt.Println("All Channels Closed - Cleanup Successful")
}
// Fire the handlers for a given channel event (connect, disconnect)
func (channel *FiberSSEChannel) FireHandlers(fiberCtx *fiber.Ctx, event string) {
for _, handler := range channel.Handlers[event] {
handler(fiberCtx, channel)
}
}
// Fire the handlers for this event
func (e *FiberSSEEvent) FireEventHandlers(fiberCtx *fiber.Ctx) {
channel := e.OnChannel
for _, handler := range channel.EventHandlers[e.Event] {
fmt.Println("Firing Event Handlers on Channel: " + channel.Name + " - Event: " + e.Event)
handler(fiberCtx, channel, e)
}
}
// Adds the handlers to the channel for the connect method
func (channel *FiberSSEChannel) OnConnect(handlers ...FiberSSEEventHandler) {
channel.Handlers["connect"] = []FiberSSEEventHandler{}
channel.Handlers["connect"] = append(channel.Handlers["connect"], handlers...)
}
// Adds the handlers to the channel for the disconnect method
func (channel *FiberSSEChannel) OnDisconnect(handlers ...FiberSSEEventHandler) {
channel.Handlers["disconnect"] = []FiberSSEEventHandler{}
channel.Handlers["disconnect"] = append(channel.Handlers["disconnect"], handlers...)
}
// Add handlers for the any given event
//
// Example:
//
// channelOne.OnEvent("test", ...) // Fires anytime the event "test" is fired
func (channel *FiberSSEChannel) OnEvent(eventName string, handlers ...FiberSSEOnEventHandler) {
channel.EventHandlers[eventName] = []FiberSSEOnEventHandler{}
channel.EventHandlers[eventName] = append(channel.EventHandlers[eventName], handlers...)
}
// Returns a channel by name
func (app *FiberSSEApp) GetChannel(name string) *FiberSSEChannel {
findChan := app.Channels[name]
return findChan
}