musiclink/internal/bot/bot.go

290 lines
6.4 KiB
Go

// Package bot handles communication with matterbridge via WebSocket.
package bot
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"musiclink/pkg/config"
)
// Message represents a matterbridge message.
type Message struct {
Text string `json:"text"`
Channel string `json:"channel"`
Username string `json:"username"`
UserID string `json:"userid"`
Avatar string `json:"avatar"`
Account string `json:"account"`
Event string `json:"event"`
Protocol string `json:"protocol"`
Gateway string `json:"gateway"`
ParentID string `json:"parent_id"`
Timestamp time.Time `json:"timestamp"`
ID string `json:"id"`
Extra map[string]interface{} `json:"extra"`
}
// Bot manages the WebSocket connection to matterbridge.
type Bot struct {
config config.MatterbridgeConfig
conn *websocket.Conn
handler MessageHandler
mu sync.Mutex
done chan struct{}
messages chan Message
}
// MessageHandler is called for each received message.
type MessageHandler func(ctx context.Context, msg Message) *Message
// New creates a new Bot instance.
func New(cfg config.MatterbridgeConfig, handler MessageHandler) *Bot {
return &Bot{
config: cfg,
handler: handler,
done: make(chan struct{}),
messages: make(chan Message, 100),
}
}
// connect establishes a WebSocket connection to matterbridge.
func (b *Bot) connect(ctx context.Context) error {
header := http.Header{}
if b.config.Token != "" {
header.Set("Authorization", "Bearer "+b.config.Token)
}
dialer := websocket.Dialer{
HandshakeTimeout: 10 * time.Second,
}
conn, _, err := dialer.DialContext(ctx, b.config.URL, header)
if err != nil {
return fmt.Errorf("connecting to matterbridge: %w", err)
}
// Increase read limit to 10MB to handle potentially huge Matrix messages with previews
conn.SetReadLimit(10 * 1024 * 1024)
b.mu.Lock()
b.conn = conn
b.mu.Unlock()
log.Printf("Connected to matterbridge at %s", b.config.URL)
return nil
}
// Run connects to matterbridge and starts the message processing loop.
// It automatically reconnects on connection failures.
func (b *Bot) Run(ctx context.Context) error {
backoff := time.Second
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Connect (or reconnect)
if err := b.connect(ctx); err != nil {
log.Printf("Connection failed: %v (retrying in %v)", err, backoff)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(backoff):
// Exponential backoff, max 30 seconds
backoff = min(backoff*2, 30*time.Second)
continue
}
}
// Reset backoff on successful connection
backoff = time.Second
// Run the message loop until disconnection
err := b.runLoop(ctx)
// Connection lost, will reconnect
if err != nil && err != context.Canceled {
log.Printf("Connection lost: %v (reconnecting...)", err)
}
b.closeConn()
if err == context.Canceled || (ctx.Err() != nil) {
return ctx.Err()
}
}
}
// runLoop processes messages until the connection is lost or context is canceled.
func (b *Bot) runLoop(ctx context.Context) error {
b.mu.Lock()
conn := b.conn
b.mu.Unlock()
if conn == nil {
return fmt.Errorf("connection is nil")
}
// Create a sub-context for this specific connection's goroutines
connCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Channel to signal read loop exit
readDone := make(chan error, 1)
// Start reader goroutine
go func() {
readDone <- b.readLoop(connCtx, conn)
}()
// Start ping goroutine
go b.pingLoop(connCtx, conn)
// Process messages
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-readDone:
return err
case msg := <-b.messages:
// Skip our own messages
if msg.Username == b.config.Username {
continue
}
// Skip events that aren't regular messages
if msg.Event != "" && msg.Event != "api_connected" {
continue
}
// Handle the message
response := b.handler(ctx, msg)
if response != nil {
if err := b.Send(*response); err != nil {
log.Printf("Error sending response: %v", err)
}
}
}
}
}
// readLoop reads messages from the WebSocket connection.
func (b *Bot) readLoop(ctx context.Context, conn *websocket.Conn) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
_, data, err := conn.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return nil
}
return fmt.Errorf("read error: %w", err)
}
if len(data) > 1024 {
log.Printf("Received large message: %d bytes", len(data))
}
var msg Message
if err := json.Unmarshal(data, &msg); err != nil {
log.Printf("Error parsing message: %v", err)
continue
}
select {
case b.messages <- msg:
case <-ctx.Done():
return ctx.Err()
default:
log.Printf("Message queue full, dropping message")
}
}
}
// pingLoop sends periodic pings to keep the connection alive.
func (b *Bot) pingLoop(ctx context.Context, conn *websocket.Conn) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
log.Printf("Ping failed: %v", err)
b.closeConn()
return
}
}
}
}
// Send sends a message to matterbridge.
func (b *Bot) Send(msg Message) error {
// Set required fields
msg.Gateway = b.config.Gateway
msg.Username = b.config.Username
if b.config.Avatar != "" {
msg.Avatar = b.config.Avatar
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
log.Printf("Sending message payload size: %d bytes", len(data))
b.mu.Lock()
defer b.mu.Unlock()
if b.conn == nil {
return fmt.Errorf("not connected")
}
b.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
return b.conn.WriteMessage(websocket.TextMessage, data)
}
// closeConn closes the current connection.
func (b *Bot) closeConn() {
b.mu.Lock()
defer b.mu.Unlock()
if b.conn != nil {
b.conn.Close()
b.conn = nil
}
}
// Close closes the WebSocket connection.
func (b *Bot) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.conn != nil {
err := b.conn.Close()
b.conn = nil
return err
}
return nil
}