// Package bot handles communication with matterbridge via WebSocket. package bot import ( "context" "encoding/json" "fmt" "log" "net/http" "sync" "time" "github.com/gorilla/websocket" "musiclink/pkg/config" ) // Message represents a matterbridge message. type Message struct { Text string `json:"text"` Channel string `json:"channel"` Username string `json:"username"` UserID string `json:"userid"` Avatar string `json:"avatar"` Account string `json:"account"` Event string `json:"event"` Protocol string `json:"protocol"` Gateway string `json:"gateway"` ParentID string `json:"parent_id"` Timestamp time.Time `json:"timestamp"` ID string `json:"id"` Extra map[string]interface{} `json:"extra"` } // Bot manages the WebSocket connection to matterbridge. type Bot struct { config config.MatterbridgeConfig conn *websocket.Conn handler MessageHandler mu sync.Mutex done chan struct{} messages chan Message } // MessageHandler is called for each received message. type MessageHandler func(ctx context.Context, msg Message) *Message // New creates a new Bot instance. func New(cfg config.MatterbridgeConfig, handler MessageHandler) *Bot { return &Bot{ config: cfg, handler: handler, done: make(chan struct{}), messages: make(chan Message, 100), } } // connect establishes a WebSocket connection to matterbridge. func (b *Bot) connect(ctx context.Context) error { header := http.Header{} if b.config.Token != "" { header.Set("Authorization", "Bearer "+b.config.Token) } dialer := websocket.Dialer{ HandshakeTimeout: 10 * time.Second, } conn, _, err := dialer.DialContext(ctx, b.config.URL, header) if err != nil { return fmt.Errorf("connecting to matterbridge: %w", err) } b.mu.Lock() b.conn = conn b.mu.Unlock() log.Printf("Connected to matterbridge at %s", b.config.URL) return nil } // Run connects to matterbridge and starts the message processing loop. // It automatically reconnects on connection failures. func (b *Bot) Run(ctx context.Context) error { backoff := time.Second for { select { case <-ctx.Done(): return ctx.Err() default: } // Connect (or reconnect) if err := b.connect(ctx); err != nil { log.Printf("Connection failed: %v (retrying in %v)", err, backoff) select { case <-ctx.Done(): return ctx.Err() case <-time.After(backoff): // Exponential backoff, max 30 seconds backoff = min(backoff*2, 30*time.Second) continue } } // Reset backoff on successful connection backoff = time.Second // Run the message loop until disconnection err := b.runLoop(ctx) // Connection lost, will reconnect if err != nil && err != context.Canceled { log.Printf("Connection lost: %v (reconnecting...)", err) } b.closeConn() if err == context.Canceled || (ctx.Err() != nil) { return ctx.Err() } } } // runLoop processes messages until the connection is lost or context is canceled. func (b *Bot) runLoop(ctx context.Context) error { b.mu.Lock() conn := b.conn b.mu.Unlock() if conn == nil { return fmt.Errorf("connection is nil") } // Create a sub-context for this specific connection's goroutines connCtx, cancel := context.WithCancel(ctx) defer cancel() // Channel to signal read loop exit readDone := make(chan error, 1) // Start reader goroutine go func() { readDone <- b.readLoop(connCtx, conn) }() // Start ping goroutine go b.pingLoop(connCtx, conn) // Process messages for { select { case <-ctx.Done(): return ctx.Err() case err := <-readDone: return err case msg := <-b.messages: // Skip our own messages if msg.Username == b.config.Username { continue } // Skip events that aren't regular messages if msg.Event != "" && msg.Event != "api_connected" { continue } // Handle the message response := b.handler(ctx, msg) if response != nil { if err := b.Send(*response); err != nil { log.Printf("Error sending response: %v", err) } } } } } // readLoop reads messages from the WebSocket connection. func (b *Bot) readLoop(ctx context.Context, conn *websocket.Conn) error { for { select { case <-ctx.Done(): return ctx.Err() default: } _, data, err := conn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure) { return nil } return fmt.Errorf("read error: %w", err) } var msg Message if err := json.Unmarshal(data, &msg); err != nil { log.Printf("Error parsing message: %v", err) continue } select { case b.messages <- msg: case <-ctx.Done(): return ctx.Err() default: log.Printf("Message queue full, dropping message") } } } // pingLoop sends periodic pings to keep the connection alive. func (b *Bot) pingLoop(ctx context.Context, conn *websocket.Conn) { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { log.Printf("Ping failed: %v", err) b.closeConn() return } } } } // Send sends a message to matterbridge. func (b *Bot) Send(msg Message) error { // Set required fields msg.Gateway = b.config.Gateway msg.Username = b.config.Username if b.config.Avatar != "" { msg.Avatar = b.config.Avatar } data, err := json.Marshal(msg) if err != nil { return err } b.mu.Lock() defer b.mu.Unlock() if b.conn == nil { return fmt.Errorf("not connected") } b.conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) return b.conn.WriteMessage(websocket.TextMessage, data) } // closeConn closes the current connection. func (b *Bot) closeConn() { b.mu.Lock() defer b.mu.Unlock() if b.conn != nil { b.conn.Close() b.conn = nil } } // Close closes the WebSocket connection. func (b *Bot) Close() error { b.mu.Lock() defer b.mu.Unlock() if b.conn != nil { err := b.conn.Close() b.conn = nil return err } return nil }