Skip to content

Commit 5ed08db

Browse files
committed
refactor music pkg, fixed parsers fallback
1 parent 3ffe8c9 commit 5ed08db

26 files changed

Lines changed: 450 additions & 264 deletions

File tree

internal/command/music/common/status_listener.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,25 @@ import (
1616
// Updates after the first use the guild's stored message (edit), so they work beyond token expiry.
1717
const StatusListenTimeout = 15 * time.Minute
1818

19+
func statusEmoji(status player.Status) string {
20+
switch status {
21+
case player.StatusPlaying:
22+
return "▶️"
23+
case player.StatusAdded:
24+
return "🎶"
25+
case player.StatusStopped:
26+
return "⏹"
27+
case player.StatusPaused:
28+
return "⏸"
29+
case player.StatusResumed:
30+
return "▶️"
31+
case player.StatusError:
32+
return "❌"
33+
default:
34+
return ""
35+
}
36+
}
37+
1938
func ListenPlayerStatusSlash(session *discordgo.Session, event *discordgo.InteractionCreate, p *player.Player, bot discord.VoiceAPI, guildID string, appLog zerolog.Logger) {
2039
go func() {
2140
ctx, cancel := context.WithTimeout(context.Background(), StatusListenTimeout)
@@ -52,7 +71,7 @@ func ListenPlayerStatusSlash(session *discordgo.Session, event *discordgo.Intera
5271
}
5372

5473
if err := bot.UpdatePlaybackStatus(session, event, guildID, &discordgo.MessageEmbed{
55-
Title: player.StatusPlaying.StringEmoji() + " Now Playing",
74+
Title: statusEmoji(player.StatusPlaying) + " Now Playing",
5675
Description: desc,
5776
Color: discordreply.EmbedColor,
5877
}); err != nil {
@@ -62,7 +81,7 @@ func ListenPlayerStatusSlash(session *discordgo.Session, event *discordgo.Intera
6281

6382
case player.StatusAdded:
6483
if err := bot.UpdatePlaybackStatus(session, event, guildID, &discordgo.MessageEmbed{
65-
Title: player.StatusAdded.StringEmoji() + " Track(s) Added",
84+
Title: statusEmoji(player.StatusAdded) + " Track(s) Added",
6685
Description: "Added to queue",
6786
Color: discordreply.EmbedColor,
6887
}); err != nil {
@@ -74,4 +93,3 @@ func ListenPlayerStatusSlash(session *discordgo.Session, event *discordgo.Intera
7493
}
7594
}()
7695
}
77-

pkg/music/README.md

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,40 @@
22

33
Queue-based music playback library for Go with pluggable audio sinks and track resolvers. Resolves URLs and search queries (YouTube, SoundCloud, radio), opens PCM streams via multiple parsers (yt-dlp, kkdai, ffmpeg), and plays through a sink of your choice (e.g. speaker or custom Discord voice).
44

5+
## How it works (high level)
6+
7+
At runtime the system is a pipeline:
8+
9+
- **Resolve** user input → `sources.TrackInfo` (URL, title, available parsers)
10+
- **Enqueue** resolved tracks into a FIFO queue
11+
- **Open stream** using one of the available parsers (PCM `s16le` @ 48kHz stereo)
12+
- **Stream to sink** (speaker / Discord / custom) until the track ends or fails
13+
- **Recover** when possible (parser fallback on instant-open failures; reopen on early EOF; special handling for voice transport)
14+
15+
```mermaid
16+
flowchart TD
17+
A[User input<br/>URL / search] --> B[Resolver<br/>Resolve()]
18+
B --> C[TrackInfo + AvailableParsers]
19+
C --> D[Player.Enqueue()]
20+
D --> E[Player.PlayNext()]
21+
E --> F[RecoveryStream.Open()]
22+
F --> G{Open ok?}
23+
G -- no --> F
24+
G -- yes --> H[Sink.Stream(rs)]
25+
H --> I{Read error?}
26+
I -- no --> H
27+
I -- io.EOF early --> J[RecoveryStream.reopen()]
28+
J --> F
29+
I -- instant fail (first read) --> K[Advance parserIndex]
30+
K --> F
31+
I -- voice transport error --> L[ReopenAfterTransportFailure()]
32+
L --> F
33+
I -- other error --> M[Stop track + PlayNext()]
34+
M --> E
35+
H --> N[Track ended]
36+
N --> M
37+
```
38+
539
## Install
640

741
```bash
@@ -16,15 +50,96 @@ Create a sink provider (e.g. speaker for local playback), a resolver, and a play
1650
provider := sink.NewSpeakerProvider()
1751
defer provider.Close()
1852

19-
res := resolver.New()
53+
res := resolve.New()
2054
p := player.New(provider, res)
2155

2256
// Enqueue a URL or search query, then start playback
2357
_ = p.Enqueue("https://www.youtube.com/watch?v=...", "", "")
2458
_ = p.PlayNext("") // "" for local; use voice channel ID for Discord
2559
```
2660

27-
Listen to `p.PlayerStatus` for status updates (Playing, Added, Stopped, Error). See [examples/cli_speaker](examples/cli_speaker) for a full runnable CLI.
61+
Listen to `p.PlayerStatus` for status updates (Playing, Added, Stopped, Error). See [examples/clispeaker](examples/clispeaker) for a full runnable CLI.
62+
63+
## Algorithms (by stage)
64+
65+
### 1) Resolve (input → TrackInfo)
66+
67+
Goal: convert user input into canonical metadata + a parser preference list.
68+
69+
- **Input**: URL or search query + optional `source`/`parser` hints.
70+
- **Output**: `[]sources.TrackInfo` where `TrackInfo.AvailableParsers` is ordered by preference.
71+
72+
The resolver is intentionally pluggable; the player does not care *how* a track was discovered, only that it has a URL + parsers list.
73+
74+
### 2) Enqueue (TrackInfo → queue)
75+
76+
Goal: turn `TrackInfo` into `parsers.TrackParse` and append to the FIFO queue.
77+
78+
- Tracks without `AvailableParsers` are rejected/skipped.
79+
- `CurrentParser` starts as the first entry in `AvailableParsers` (will be updated later by recovery/open logic).
80+
81+
### 3) Start playback (dequeue → open resilient stream)
82+
83+
Performed by `Player.PlayNext()`:
84+
85+
- If something is playing, stop it.
86+
- Pop the next track from the queue.
87+
- Create `stream.RecoveryStream(track)` and call `rs.Open(seek=0)`.
88+
- If open fails for all parsers, skip the track and try the next.
89+
90+
### 4) Open stream (choose parser)
91+
92+
Performed inside `RecoveryStream.Open(seek)`:
93+
94+
- Starting at `parserIndex`, iterate through `track.SourceInfo.AvailableParsers`.
95+
- For each parser:
96+
- if `retries[parser] >= maxRecoveryAttempts` → skip
97+
- try `openWithParser(track, parser, seek)`
98+
- on success:
99+
- set `parserIndex` to that parser’s index
100+
- set `track.CurrentParser = parser`
101+
- reset `firstRead = true`
102+
- store cleanup + current seek
103+
104+
### 5) Media recovery (parser/ffmpeg level)
105+
106+
Recovery is intentionally conservative to avoid false-positive “fallback” when a track naturally ends.
107+
108+
**A) Instant failure right after open**
109+
110+
If the very first `Read()` on the opened stream returns any error (including an EOF-like failure from ffmpeg), it is treated as an *instant fail*:
111+
112+
- close/cleanup current stream
113+
- `parserIndex++`
114+
- open again at the current `seekSec` using the next parser
115+
116+
This is designed for cases like “ffmpeg opened, then immediately 403/forbidden and closed stdout”.
117+
118+
**B) Early EOF (mid-track)**
119+
120+
If `Read()` returns `io.EOF` with `n==0` and the track is far from its expected duration, recovery attempts to reopen:
121+
122+
- close/cleanup current stream
123+
- reopen at the current approximate `seekSec`
124+
- retries are bounded by `maxRecoveryAttempts` per parser
125+
126+
If duration is unknown, early-EOF recovery is only attempted at the beginning (`firstRead` or `seekSec < 1.0`).
127+
128+
### 6) Sink streaming + voice transport recovery
129+
130+
The sink drives the read loop via `AudioSink.Stream(reader, stopCh)`:
131+
132+
- On normal completion: the track ends → player advances to the next track.
133+
- On `stream.ErrVoiceTransport` (Discord transport issues):
134+
- the player can invalidate/rejoin the sink (hard) or retry without rejoin (soft mode)
135+
- then calls `rs.ReopenAfterTransportFailure()` to reopen media at the current seek
136+
- On user stop/skip: playback stops cleanly.
137+
138+
## Key extension points
139+
140+
- **Custom resolver**: implement `player.Resolver` to support new sources or search.
141+
- **Custom sink**: implement `sink.AudioSink` / `sink.Provider` to support new outputs.
142+
- **New parser**: implement `parsers.Streamer` and register it in `stream.Registry`.
28143

29144
## Requirements
30145

@@ -35,7 +150,7 @@ Listen to `p.PlayerStatus` for status updates (Playing, Added, Stopped, Error).
35150
## Documentation
36151

37152
- [player](player) — Queue-based playback engine
38-
- [resolver](resolver) — Resolve URLs and search to track metadata
153+
- [resolve](resolve) — Resolve URLs and search to track metadata
39154
- [sink](sink) — Audio sink interfaces and speaker implementation
40155
- [sources](sources) — Source interface and track types
41156
- [parsers](parsers) — Streamer interface and track type

pkg/music/parsers/ffmpeg/link.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ func ffmpegLink(url string) (io.ReadCloser, func(), error) {
2828
return nil, nil, fmt.Errorf("command start error: %w", err)
2929
}
3030

31+
pr := NewProcessReadCloser(cmd, reader)
3132
cleanup := func() {
3233
_ = cmd.Process.Kill()
33-
_ = cmd.Wait()
34+
_ = pr.WaitErr()
3435
}
3536

36-
return reader, cleanup, nil
37+
return pr, cleanup, nil
3738
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package ffmpeg
2+
3+
import (
4+
"io"
5+
"os/exec"
6+
"sync"
7+
)
8+
9+
// ProcessReadCloser wraps a process stdout pipe and turns a "silent" io.EOF into
10+
// a process error when the command exits non-zero.
11+
//
12+
// Motivation: ffmpeg often fails immediately (e.g. 403) and closes stdout, so the
13+
// first Read() returns io.EOF. Callers must see an error to trigger recovery.
14+
type ProcessReadCloser struct {
15+
r io.ReadCloser
16+
cmd *exec.Cmd
17+
18+
waitOnce sync.Once
19+
waitErr error
20+
done chan struct{}
21+
}
22+
23+
func NewProcessReadCloser(cmd *exec.Cmd, stdout io.ReadCloser) *ProcessReadCloser {
24+
p := &ProcessReadCloser{
25+
r: stdout,
26+
cmd: cmd,
27+
done: make(chan struct{}),
28+
}
29+
go func() {
30+
p.waitOnce.Do(func() { p.waitErr = cmd.Wait() })
31+
close(p.done)
32+
}()
33+
return p
34+
}
35+
36+
func (p *ProcessReadCloser) Read(b []byte) (int, error) {
37+
n, err := p.r.Read(b)
38+
if err == io.EOF {
39+
<-p.done
40+
if p.waitErr != nil {
41+
return n, p.waitErr
42+
}
43+
}
44+
return n, err
45+
}
46+
47+
func (p *ProcessReadCloser) Close() error {
48+
return p.r.Close()
49+
}
50+
51+
func (p *ProcessReadCloser) WaitErr() error {
52+
<-p.done
53+
return p.waitErr
54+
}

pkg/music/parsers/kkdai/link.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111

1212
"github.com/keshon/melodix/pkg/music/parsers"
13+
ffmpegparser "github.com/keshon/melodix/pkg/music/parsers/ffmpeg"
1314

1415
"github.com/kkdai/youtube/v2"
1516
)
@@ -106,10 +107,11 @@ func kkdaiLink(track *parsers.TrackParse, seekSec float64) (io.ReadCloser, func(
106107
return nil, nil, fmt.Errorf("command start error: %w", err)
107108
}
108109

110+
pr := ffmpegparser.NewProcessReadCloser(ffmpeg, reader)
109111
cleanup := func() {
110112
_ = ffmpeg.Process.Kill()
111-
_ = ffmpeg.Wait()
113+
_ = pr.WaitErr()
112114
}
113115

114-
return reader, cleanup, nil
116+
return pr, cleanup, nil
115117
}

pkg/music/parsers/kkdai/pipe.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os/exec"
99

1010
"github.com/keshon/melodix/pkg/music/parsers"
11+
ffmpegparser "github.com/keshon/melodix/pkg/music/parsers/ffmpeg"
1112

1213
"github.com/kkdai/youtube/v2"
1314
)
@@ -61,11 +62,12 @@ func kkdaiPipe(track *parsers.TrackParse, seekSec float64) (io.ReadCloser, func(
6162
return nil, nil, fmt.Errorf("ffmpeg start error: %w", err)
6263
}
6364

65+
pr := ffmpegparser.NewProcessReadCloser(ffmpeg, reader)
6466
cleanup := func() {
6567
stream.Close()
6668
_ = ffmpeg.Process.Kill()
67-
_ = ffmpeg.Wait()
69+
_ = pr.WaitErr()
6870
}
6971

70-
return reader, cleanup, nil
72+
return pr, cleanup, nil
7173
}

0 commit comments

Comments
 (0)