// freya_video/client.rs
1use std::{
2    io::{
3        Read as _,
4        Write as _,
5    },
6    path::{
7        Path,
8        PathBuf,
9    },
10    process::{
11        ChildStdin,
12        ChildStdout,
13    },
14    rc::Rc,
15    sync::{
16        Arc,
17        atomic::{
18            AtomicBool,
19            Ordering,
20        },
21        mpsc::sync_channel,
22    },
23    thread::{
24        Builder,
25        park,
26    },
27    time::{
28        Duration,
29        Instant,
30    },
31};
32
33use async_io::Timer;
34use ffmpeg_sidecar::{
35    child::FfmpegChild,
36    command::FfmpegCommand,
37    event::FfmpegEvent,
38};
39use freya_core::prelude::{
40    OwnedTaskHandle,
41    ScopeId,
42    provide_context_for_scope_id,
43    spawn,
44    try_consume_root_context,
45};
46use freya_engine::prelude::{
47    AlphaType,
48    ColorType,
49    Data,
50    ISize,
51    ImageInfo,
52    SkImage,
53    raster_from_data,
54};
55
/// Source of a video to decode.
///
/// Currently only filesystem paths are supported; the variant set is expected
/// to grow (e.g. URLs), which is why this is an enum rather than a `PathBuf`.
#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum VideoSource {
    /// Video file on the local filesystem, handed verbatim to ffmpeg.
    Path(PathBuf),
}
61
62impl From<PathBuf> for VideoSource {
63    fn from(path: PathBuf) -> Self {
64        Self::Path(path)
65    }
66}
67
68impl From<&Path> for VideoSource {
69    fn from(path: &Path) -> Self {
70        Self::Path(path.to_path_buf())
71    }
72}
73
74impl From<&str> for VideoSource {
75    fn from(path: &str) -> Self {
76        Self::Path(PathBuf::from(path))
77    }
78}
79
80impl From<String> for VideoSource {
81    fn from(path: String) -> Self {
82        Self::Path(PathBuf::from(path))
83    }
84}
85
86/// Single decoded frame, backed by a Skia image.
87#[derive(Clone)]
88pub struct VideoFrame {
89    image: SkImage,
90}
91
92impl VideoFrame {
93    pub fn image(&self) -> &SkImage {
94        &self.image
95    }
96}
97
98impl PartialEq for VideoFrame {
99    fn eq(&self, other: &Self) -> bool {
100        self.image.unique_id() == other.image.unique_id()
101    }
102}
103
/// Max decoded frames buffered ahead of the pacing loop.
const FRAME_BUFFER: usize = 2;

/// Max outgoing events buffered before the pacing loop blocks.
const EVENTS_BUFFER: usize = 2;

// PCM format requested from the audio ffmpeg process; must match what
// `PcmSource` reports to rodio.
const AUDIO_SAMPLE_RATE: u32 = 48_000;
const AUDIO_CHANNELS: u16 = 2;
112
/// Event emitted by a [`VideoClient`].
#[derive(Clone)]
pub enum VideoEvent {
    /// Total duration of the video, parsed from ffmpeg output.
    Duration(Duration),
    /// A decoded frame, paced to its presentation time.
    Frame {
        frame: VideoFrame,
        /// Absolute position in the video (start offset + frame timestamp).
        position: Duration,
    },
    /// The decoder finished cleanly.
    Ended,
    /// ffmpeg could not be spawned or the decoder failed.
    Errored,
}
124
/// Decoding pipeline for one video. Drop to stop.
pub struct VideoClient {
    // Receiving side of the pipeline's event stream.
    events: async_channel::Receiver<VideoEvent>,
    // Shared pause flag, polled by the pacing loop.
    paused: Arc<AtomicBool>,
    // Owns the pacing task; dropping it tears the pipeline down.
    _task: OwnedTaskHandle,
}
131
132impl VideoClient {
133    /// Start decoding `source` at `start_offset`.
134    pub fn new(source: VideoSource, start_offset: Duration) -> Self {
135        let (sender, receiver) = async_channel::bounded(EVENTS_BUFFER);
136        let paused = Arc::new(AtomicBool::new(false));
137        let task = spawn(run_client(
138            source,
139            start_offset,
140            Arc::clone(&paused),
141            sender,
142        ))
143        .owned();
144        Self {
145            events: receiver,
146            paused,
147            _task: task,
148        }
149    }
150
151    /// Stream of decoded frames and lifecycle events.
152    pub fn events(&self) -> &async_channel::Receiver<VideoEvent> {
153        &self.events
154    }
155
156    /// Pause playback.
157    pub fn pause(&self) {
158        self.paused.store(true, Ordering::Relaxed);
159    }
160
161    /// Resume playback.
162    pub fn play(&self) {
163        self.paused.store(false, Ordering::Relaxed);
164    }
165
166    /// Whether playback is currently paused.
167    pub fn is_paused(&self) -> bool {
168        self.paused.load(Ordering::Relaxed)
169    }
170}
171
172/// Shared audio output handle. `None` if the host has no audio device.
173fn audio_handle() -> Option<Rc<rodio::OutputStreamHandle>> {
174    if let Some(handle) = try_consume_root_context::<Rc<rodio::OutputStreamHandle>>() {
175        return Some(handle);
176    }
177
178    let (sender, receiver) = sync_channel::<Option<rodio::OutputStreamHandle>>(0);
179    Builder::new()
180        .name("freya-audio".into())
181        .spawn(move || match rodio::OutputStream::try_default() {
182            Ok((stream, handle)) => {
183                if sender.send(Some(handle)).is_ok() {
184                    let _keepalive = stream;
185                    park();
186                }
187            }
188            Err(err) => {
189                tracing::info!(%err, "no audio output device");
190                let _ = sender.send(None);
191            }
192        })
193        .ok()?;
194
195    let handle = Rc::new(receiver.recv().ok().flatten()?);
196    provide_context_for_scope_id(handle.clone(), ScopeId::ROOT);
197    Some(handle)
198}
199
/// One raw RGBA frame as produced by the ffmpeg decode thread.
struct RawFrame {
    // Frame dimensions in pixels.
    width: u32,
    height: u32,
    // Presentation timestamp in seconds, relative to the (possibly seeked)
    // start of the output stream.
    timestamp: f32,
    // Tightly packed RGBA8888 pixels, 4 bytes per pixel.
    data: Vec<u8>,
}
206
/// Message from the blocking decode thread to the async pacing loop.
enum DecoderEvent {
    /// Total duration in seconds, as parsed from ffmpeg's output.
    Duration(f64),
    /// A decoded frame ready for pacing and conversion.
    Frame(RawFrame),
}
211
/// Asks ffmpeg to exit gracefully when dropped.
struct FfmpegQuitter(ChildStdin);

impl Drop for FfmpegQuitter {
    fn drop(&mut self) {
        // ffmpeg treats `q` on stdin as a quit request. Best effort only:
        // the process may already have exited, so errors are ignored.
        let _ = self.0.write_all(b"q\n");
        let _ = self.0.flush();
    }
}
221
/// PCM audio samples streamed from an ffmpeg process.
///
/// NOTE(review): the iterator reads two bytes at a time directly from the
/// child's stdout pipe — presumably a syscall per sample. Wrapping the reader
/// in a `BufReader` (together with the construction site in `start_audio`)
/// looks like a cheap win; confirm before changing.
struct PcmSource {
    reader: ChildStdout,
}
226
227impl Iterator for PcmSource {
228    type Item = i16;
229
230    fn next(&mut self) -> Option<i16> {
231        let mut buf = [0u8; 2];
232        self.reader.read_exact(&mut buf).ok()?;
233        Some(i16::from_le_bytes(buf))
234    }
235}
236
237impl rodio::Source for PcmSource {
238    fn current_frame_len(&self) -> Option<usize> {
239        None
240    }
241
242    fn channels(&self) -> u16 {
243        AUDIO_CHANNELS
244    }
245
246    fn sample_rate(&self) -> u32 {
247        AUDIO_SAMPLE_RATE
248    }
249
250    fn total_duration(&self) -> Option<Duration> {
251        None
252    }
253}
254
/// Audio side of a running playback.
struct AudioPlayback {
    // Held so ffmpeg is asked to quit when playback is dropped.
    _quitter: Option<FfmpegQuitter>,
    // rodio sink driving the PCM stream; used for pause/play.
    sink: rodio::Sink,
    // Keeps the audio ffmpeg process handle alive alongside the sink.
    _child: FfmpegChild,
}
261
262async fn run_client(
263    source: VideoSource,
264    start_offset: Duration,
265    paused: Arc<AtomicBool>,
266    events: async_channel::Sender<VideoEvent>,
267) {
268    let mut child = match spawn_ffmpeg(&source, start_offset) {
269        Ok(child) => child,
270        Err(err) => {
271            tracing::error!(%err, "failed to spawn ffmpeg");
272            let _ = events.send(VideoEvent::Errored).await;
273            return;
274        }
275    };
276    let _quitter = child.take_stdin().map(FfmpegQuitter);
277
278    let audio = start_audio(&source, start_offset);
279
280    let (sender, receiver) = async_channel::bounded::<DecoderEvent>(FRAME_BUFFER);
281    let decoder = blocking::unblock(move || decode(child, sender));
282
283    let mut wall_start: Option<Instant> = None;
284    let mut paused_for = Duration::ZERO;
285
286    while let Ok(event) = receiver.recv().await {
287        let raw = match event {
288            DecoderEvent::Duration(seconds) if seconds.is_finite() && seconds >= 0.0 => {
289                let _ = events
290                    .send(VideoEvent::Duration(Duration::from_secs_f64(seconds)))
291                    .await;
292                continue;
293            }
294            DecoderEvent::Duration(_) => continue,
295            DecoderEvent::Frame(raw) => raw,
296        };
297
298        // While paused, bank the elapsed time so resume pacing stays correct.
299        if paused.load(Ordering::Relaxed) {
300            if let Some(audio) = audio.as_ref() {
301                audio.sink.pause();
302            }
303            let pause_start = Instant::now();
304            while paused.load(Ordering::Relaxed) {
305                Timer::after(Duration::from_millis(32)).await;
306            }
307            paused_for += pause_start.elapsed();
308            if let Some(audio) = audio.as_ref() {
309                audio.sink.play();
310            }
311        }
312
313        let wall_start = *wall_start.get_or_insert_with(Instant::now);
314        let frame_offset = Duration::from_secs_f32(raw.timestamp.max(0.0));
315        let elapsed = wall_start.elapsed().saturating_sub(paused_for);
316        if elapsed < frame_offset {
317            Timer::after(frame_offset - elapsed).await;
318        }
319
320        let Some(image) = raw_frame_to_sk_image(&raw) else {
321            continue;
322        };
323        if events
324            .send(VideoEvent::Frame {
325                frame: VideoFrame { image },
326                position: start_offset + frame_offset,
327            })
328            .await
329            .is_err()
330        {
331            break;
332        }
333    }
334
335    match decoder.await {
336        Ok(()) => {
337            let _ = events.send(VideoEvent::Ended).await;
338        }
339        Err(err) => {
340            tracing::error!(%err, "video decoder failed");
341            let _ = events.send(VideoEvent::Errored).await;
342        }
343    }
344}
345
346fn raw_frame_to_sk_image(raw: &RawFrame) -> Option<SkImage> {
347    let row_bytes = raw.width.checked_mul(4)? as usize;
348    let info = ImageInfo::new(
349        ISize::new(raw.width as i32, raw.height as i32),
350        ColorType::RGBA8888,
351        AlphaType::Unpremul,
352        None,
353    );
354    raster_from_data(&info, Data::new_copy(&raw.data), row_bytes)
355}
356
357fn spawn_ffmpeg(source: &VideoSource, start_offset: Duration) -> anyhow::Result<FfmpegChild> {
358    let VideoSource::Path(path) = source;
359    let mut cmd = FfmpegCommand::new();
360
361    // `-ss` before `-i` = fast keyframe-aligned input seek; output timestamps
362    // reset to 0, which is what the pacing loop expects.
363    let start_secs = start_offset.as_secs_f32();
364    if start_secs > 0.0 {
365        cmd.args(["-ss", &start_secs.to_string()]);
366    }
367    cmd.input(path.to_string_lossy().as_ref())
368        .format("rawvideo")
369        .pix_fmt("rgba")
370        .pipe_stdout();
371
372    Ok(cmd.spawn()?)
373}
374
375/// Start audio playback. `None` if audio can't be started.
376fn start_audio(source: &VideoSource, start_offset: Duration) -> Option<AudioPlayback> {
377    let handle = audio_handle()?;
378    let mut child = spawn_audio_ffmpeg(source, start_offset)
379        .map_err(|err| tracing::warn!(%err, "failed to spawn audio ffmpeg"))
380        .ok()?;
381    let stdout = child.take_stdout()?;
382    let quitter = child.take_stdin().map(FfmpegQuitter);
383    let sink = rodio::Sink::try_new(&handle)
384        .map_err(|err| tracing::warn!(%err, "failed to create audio sink"))
385        .ok()?;
386    sink.append(PcmSource { reader: stdout });
387    Some(AudioPlayback {
388        _quitter: quitter,
389        sink,
390        _child: child,
391    })
392}
393
394fn spawn_audio_ffmpeg(source: &VideoSource, start_offset: Duration) -> anyhow::Result<FfmpegChild> {
395    let VideoSource::Path(path) = source;
396    let mut cmd = FfmpegCommand::new();
397
398    let start_secs = start_offset.as_secs_f32();
399    if start_secs > 0.0 {
400        cmd.args(["-ss", &start_secs.to_string()]);
401    }
402    cmd.input(path.to_string_lossy().as_ref())
403        .args([
404            "-vn",
405            "-f",
406            "s16le",
407            "-ar",
408            &AUDIO_SAMPLE_RATE.to_string(),
409            "-ac",
410            &AUDIO_CHANNELS.to_string(),
411        ])
412        .pipe_stdout();
413
414    Ok(cmd.spawn()?)
415}
416
417fn decode(
418    mut child: FfmpegChild,
419    sender: async_channel::Sender<DecoderEvent>,
420) -> anyhow::Result<()> {
421    for event in child.iter()? {
422        let message = match event {
423            FfmpegEvent::ParsedDuration(duration) => DecoderEvent::Duration(duration.duration),
424            FfmpegEvent::OutputFrame(frame) => DecoderEvent::Frame(RawFrame {
425                width: frame.width,
426                height: frame.height,
427                timestamp: frame.timestamp,
428                data: frame.data,
429            }),
430            _ => continue,
431        };
432        // Parks the thread when the bounded channel is full, which backpressures
433        // ffmpeg via its stdout pipe. Err = receiver dropped (decode cancelled).
434        if sender.send_blocking(message).is_err() {
435            break;
436        }
437    }
438
439    // Reap regardless of how we exited the iter (ffmpeg-sidecar#72).
440    let _ = child.kill();
441    let _ = child.wait();
442
443    Ok(())
444}