A matterbridge bot that detects music links (Spotify, YouTube, Apple Music, etc.) in chat messages and responds with equivalent links on other platforms. Features: - Connects to matterbridge via WebSocket API - Detects links from 7 music services (Spotify, YouTube, Apple, Deezer, etc.) - Uses idonthavespotify API for conversion (no API credentials required) - Automatic reconnection with exponential backoff - Platform setup guide for NixOS deployment Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
299 lines
6.4 KiB
Go
299 lines
6.4 KiB
Go
// Package bot handles communication with matterbridge via WebSocket.
|
|
package bot
|
|
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"

	"musiclink/pkg/config"
)
|
|
|
|
// Message is the JSON payload exchanged with matterbridge. The struct tags
// give the wire name of every field.
type Message struct {
	Text     string `json:"text"`
	Channel  string `json:"channel"`
	Username string `json:"username"` // Send overwrites this with the configured bot name.
	UserID   string `json:"userid"`
	Avatar   string `json:"avatar"` // Send overwrites this when an avatar is configured.
	Account  string `json:"account"`
	Event    string `json:"event"`
	Protocol string `json:"protocol"`
	Gateway  string `json:"gateway"` // Send overwrites this with the configured gateway.
	ParentID string `json:"parent_id"`

	Timestamp time.Time      `json:"timestamp"`
	ID        string         `json:"id"`
	Extra     map[string]any `json:"extra"`
}
|
|
|
|
// Bot manages the WebSocket connection to matterbridge.
|
|
type Bot struct {
|
|
config config.MatterbridgeConfig
|
|
conn *websocket.Conn
|
|
handler MessageHandler
|
|
|
|
mu sync.Mutex
|
|
done chan struct{}
|
|
messages chan Message
|
|
}
|
|
|
|
// MessageHandler is called for each received message.
|
|
type MessageHandler func(ctx context.Context, msg Message) *Message
|
|
|
|
// New creates a new Bot instance.
|
|
func New(cfg config.MatterbridgeConfig, handler MessageHandler) *Bot {
|
|
return &Bot{
|
|
config: cfg,
|
|
handler: handler,
|
|
done: make(chan struct{}),
|
|
messages: make(chan Message, 100),
|
|
}
|
|
}
|
|
|
|
// connect establishes a WebSocket connection to matterbridge.
|
|
func (b *Bot) connect(ctx context.Context) error {
|
|
header := http.Header{}
|
|
if b.config.Token != "" {
|
|
header.Set("Authorization", "Bearer "+b.config.Token)
|
|
}
|
|
|
|
dialer := websocket.Dialer{
|
|
HandshakeTimeout: 10 * time.Second,
|
|
}
|
|
|
|
conn, _, err := dialer.DialContext(ctx, b.config.URL, header)
|
|
if err != nil {
|
|
return fmt.Errorf("connecting to matterbridge: %w", err)
|
|
}
|
|
|
|
b.mu.Lock()
|
|
b.conn = conn
|
|
b.mu.Unlock()
|
|
|
|
log.Printf("Connected to matterbridge at %s", b.config.URL)
|
|
return nil
|
|
}
|
|
|
|
// Run connects to matterbridge and starts the message processing loop.
|
|
// It automatically reconnects on connection failures.
|
|
func (b *Bot) Run(ctx context.Context) error {
|
|
backoff := time.Second
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
// Connect (or reconnect)
|
|
if err := b.connect(ctx); err != nil {
|
|
log.Printf("Connection failed: %v (retrying in %v)", err, backoff)
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(backoff):
|
|
// Exponential backoff, max 30 seconds
|
|
backoff = min(backoff*2, 30*time.Second)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Reset backoff on successful connection
|
|
backoff = time.Second
|
|
|
|
// Run the message loop until disconnection
|
|
err := b.runLoop(ctx)
|
|
if err == context.Canceled {
|
|
return err
|
|
}
|
|
|
|
// Connection lost, will reconnect
|
|
log.Printf("Connection lost: %v (reconnecting...)", err)
|
|
b.closeConn()
|
|
}
|
|
}
|
|
|
|
// runLoop processes messages until the connection is lost or context is canceled.
|
|
func (b *Bot) runLoop(ctx context.Context) error {
|
|
// Channel to signal read loop exit
|
|
readDone := make(chan error, 1)
|
|
|
|
// Start reader goroutine
|
|
go func() {
|
|
readDone <- b.readLoop(ctx)
|
|
}()
|
|
|
|
// Start ping goroutine
|
|
go b.pingLoop(ctx)
|
|
|
|
// Process messages
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case err := <-readDone:
|
|
return err
|
|
case msg := <-b.messages:
|
|
// Skip our own messages
|
|
if msg.Username == b.config.Username {
|
|
continue
|
|
}
|
|
|
|
// Skip events that aren't regular messages
|
|
if msg.Event != "" && msg.Event != "api_connected" {
|
|
continue
|
|
}
|
|
|
|
// Handle the message
|
|
response := b.handler(ctx, msg)
|
|
if response != nil {
|
|
if err := b.Send(*response); err != nil {
|
|
log.Printf("Error sending response: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// readLoop reads messages from the WebSocket connection.
|
|
func (b *Bot) readLoop(ctx context.Context) error {
|
|
for {
|
|
// Check if context is done before blocking on read
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
|
|
b.mu.Lock()
|
|
conn := b.conn
|
|
b.mu.Unlock()
|
|
|
|
if conn == nil {
|
|
return fmt.Errorf("connection closed")
|
|
}
|
|
|
|
// Set read deadline so we can check context periodically
|
|
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
|
|
|
|
_, data, err := conn.ReadMessage()
|
|
if err != nil {
|
|
// Timeout is expected, check context and continue
|
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
|
return nil
|
|
}
|
|
if isTimeout(err) {
|
|
continue
|
|
}
|
|
return fmt.Errorf("read error: %w", err)
|
|
}
|
|
|
|
var msg Message
|
|
if err := json.Unmarshal(data, &msg); err != nil {
|
|
log.Printf("Error parsing message: %v", err)
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case b.messages <- msg:
|
|
default:
|
|
log.Printf("Message queue full, dropping message")
|
|
}
|
|
}
|
|
}
|
|
|
|
// pingLoop sends periodic pings to keep the connection alive.
|
|
func (b *Bot) pingLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
b.mu.Lock()
|
|
conn := b.conn
|
|
b.mu.Unlock()
|
|
|
|
if conn == nil {
|
|
return
|
|
}
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
|
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
log.Printf("Ping failed: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send sends a message to matterbridge.
|
|
func (b *Bot) Send(msg Message) error {
|
|
// Set required fields
|
|
msg.Gateway = b.config.Gateway
|
|
msg.Username = b.config.Username
|
|
if b.config.Avatar != "" {
|
|
msg.Avatar = b.config.Avatar
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
if b.conn == nil {
|
|
return fmt.Errorf("not connected")
|
|
}
|
|
|
|
b.conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
|
|
return b.conn.WriteMessage(websocket.TextMessage, data)
|
|
}
|
|
|
|
// closeConn closes the current connection.
|
|
func (b *Bot) closeConn() {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
if b.conn != nil {
|
|
b.conn.Close()
|
|
b.conn = nil
|
|
}
|
|
}
|
|
|
|
// Close closes the WebSocket connection.
|
|
func (b *Bot) Close() error {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
if b.conn != nil {
|
|
err := b.conn.Close()
|
|
b.conn = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// isTimeout reports whether err, or any error it wraps, is a timeout, i.e.
// has a Timeout() bool method that returns true (as net.Error values do).
// It returns false for a nil error.
func isTimeout(err error) bool {
	// errors.As walks the wrap chain, so a timeout is still recognised after
	// it has been wrapped with %w.
	var te interface{ Timeout() bool }
	return errors.As(err, &te) && te.Timeout()
}
|