diff --git a/pkg/router/sse.go b/pkg/broker/broker.go similarity index 82% rename from pkg/router/sse.go rename to pkg/broker/broker.go index 29ef8e0..3675b90 100644 --- a/pkg/router/sse.go +++ b/pkg/broker/broker.go @@ -1,4 +1,4 @@ -package router +package broker import ( "encoding/json" @@ -33,7 +33,9 @@ type Broker struct { // messages chan Message - handler *Handler + // This function is called every time a new clients connects. + // Its output is sent to the client as initial connect message. + init func(c echo.Context) (interface{}, error) } type MessageKind string @@ -50,12 +52,15 @@ type Message struct { Data interface{} `json:"data"` } -func NewBroker() *Broker { - return &Broker{ +var broker *Broker + +func Init(init func(c echo.Context) (interface{}, error)) { + broker = &Broker{ clients: make(map[chan Message]bool), newClients: make(chan (chan Message)), defunctClients: make(chan (chan Message)), messages: make(chan Message), + init: init, } } @@ -63,10 +68,10 @@ func NewBroker() *Broker { // the addition & removal of clients, as well as the broadcasting // of messages out to clients that are currently attached. // -func (b *Broker) Start() { +func Start() { - if b.handler == nil { - panic("handler must not be nil!") + if broker.init == nil { + panic("Init function must not be nil!") } // Start a goroutine @@ -81,37 +86,37 @@ func (b *Broker) Start() { // three following channels. select { - case s := <-b.newClients: + case s := <-broker.newClients: // There is a new client attached and we // want to start sending them messages. - b.clients[s] = true + broker.clients[s] = true log.Debug("Added new client") - case s := <-b.defunctClients: + case s := <-broker.defunctClients: // A client has dettached and we want to // stop sending them messages. - delete(b.clients, s) + delete(broker.clients, s) close(s) log.Debug("Removed client") - case msg := <-b.messages: + case msg := <-broker.messages: // There is a new message to send. For each // attached client, push the new message // into the client's message channel. - for s := range b.clients { + for s := range broker.clients { s <- msg } - log.Printf("Broadcast message to %d clients", len(b.clients)) + log.Printf("Broadcast message to %d clients", len(broker.clients)) } } }() } -func (b *Broker) Serve(c echo.Context) error { +func Serve(c echo.Context) error { rw := c.Response().Writer @@ -128,7 +133,7 @@ func (b *Broker) Serve(c echo.Context) error { // Add this client to the map of those that should // receive updates - b.newClients <- messageChan + broker.newClients <- messageChan // Listen to the closing of the http connection via the CloseNotifier // FIXME: use Done() @@ -137,7 +142,7 @@ func (b *Broker) Serve(c echo.Context) error { <-notify // Remove this client from the map of attached clients // when `EventHandler` exits. - b.defunctClients <- messageChan + broker.defunctClients <- messageChan log.Debug("HTTP connection closed.") }() @@ -148,7 +153,7 @@ func (b *Broker) Serve(c echo.Context) error { rw.Header().Set("Transfer-Encoding", "chunked") // Push the initial list to the user - data, err := b.handler.readAll(c) + data, err := broker.init(c) if err != nil { log.Errorf("Error getting initial data: %s", err) } @@ -178,6 +183,10 @@ func (b *Broker) Serve(c echo.Context) error { return nil } +func SendMessage(m Message) { + broker.messages <- m +} + func sendJSONMessage(m *Message, f http.Flusher, rw http.ResponseWriter) { jsonMessage, err := json.Marshal(m.Data) if err != nil { diff --git a/pkg/router/handler.go b/pkg/router/handler.go index 05d4723..903d893 100644 --- a/pkg/router/handler.go +++ b/pkg/router/handler.go @@ -1,6 +1,7 @@ package router import ( + "git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/broker" "net/http" "strconv" @@ -11,7 +12,7 @@ import ( type Handler struct { str func() models.Managable - broker *Broker + broker *broker.Broker } type UpdatedMessage struct { @@ -55,10 +56,10 @@ func (h *Handler) Create(c echo.Context) error { } // Notify the broker - h.broker.messages <- Message{ - Kind: KindCreate, + broker.SendMessage(broker.Message{ + Kind: broker.KindCreate, Data: str, - } + }) return c.JSON(http.StatusOK, "success") } @@ -75,10 +76,10 @@ func (h *Handler) Delete(c echo.Context) error { } // Notify the broker - h.broker.messages <- Message{ - Kind: KindDelete, + broker.SendMessage(broker.Message{ + Kind: broker.KindDelete, Data: str, - } + }) return c.JSON(http.StatusOK, "success") } @@ -97,10 +98,10 @@ func (h *Handler) Update(c echo.Context) error { } // Notify the broker - h.broker.messages <- Message{ - Kind: KindUpdate, + broker.SendMessage(broker.Message{ + Kind: broker.KindUpdate, Data: str, - } + }) return c.JSON(http.StatusOK, UpdatedMessage{ Message: "success", diff --git a/pkg/router/router.go b/pkg/router/router.go index cc3c4b4..5046695 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -1,6 +1,7 @@ package router import ( + "git.kolaente.de/konrad/Konfi-Castle-Kasino/pkg/broker" "html/template" "net/http" @@ -65,11 +66,9 @@ func RegisterRoutes(e *echo.Echo) { } // Fancy message broker with SSE - b := NewBroker() - handler.broker = b - b.handler = &handler - b.Start() - e.GET("/events", b.Serve) + broker.Init(handler.readAll) + broker.Start() + e.GET("/events", broker.Serve) e.GET("/list", handler.ReadAll) // Routes with auth