1use std::{
2 io::{
3 Read as _,
4 Write as _,
5 },
6 path::{
7 Path,
8 PathBuf,
9 },
10 process::{
11 ChildStdin,
12 ChildStdout,
13 },
14 rc::Rc,
15 sync::{
16 Arc,
17 atomic::{
18 AtomicBool,
19 Ordering,
20 },
21 mpsc::sync_channel,
22 },
23 thread::{
24 Builder,
25 park,
26 },
27 time::{
28 Duration,
29 Instant,
30 },
31};
32
33use async_io::Timer;
34use ffmpeg_sidecar::{
35 child::FfmpegChild,
36 command::FfmpegCommand,
37 event::FfmpegEvent,
38};
39use freya_core::prelude::{
40 OwnedTaskHandle,
41 ScopeId,
42 provide_context_for_scope_id,
43 spawn,
44 try_consume_root_context,
45};
46use freya_engine::prelude::{
47 AlphaType,
48 ColorType,
49 Data,
50 ISize,
51 ImageInfo,
52 SkImage,
53 raster_from_data,
54};
55
/// Where a video stream is read from.
#[derive(PartialEq, Eq, Clone, Debug, Hash)]
pub enum VideoSource {
    /// A file on the local filesystem, handed to ffmpeg as the input path.
    Path(PathBuf),
}
61
62impl From<PathBuf> for VideoSource {
63 fn from(path: PathBuf) -> Self {
64 Self::Path(path)
65 }
66}
67
68impl From<&Path> for VideoSource {
69 fn from(path: &Path) -> Self {
70 Self::Path(path.to_path_buf())
71 }
72}
73
74impl From<&str> for VideoSource {
75 fn from(path: &str) -> Self {
76 Self::Path(PathBuf::from(path))
77 }
78}
79
80impl From<String> for VideoSource {
81 fn from(path: String) -> Self {
82 Self::Path(PathBuf::from(path))
83 }
84}
85
/// A single decoded video frame, already converted into a Skia raster image
/// ready for rendering.
#[derive(Clone)]
pub struct VideoFrame {
    // The decoded RGBA frame as a Skia image (see `raw_frame_to_sk_image`).
    image: SkImage,
}
91
impl VideoFrame {
    /// Borrows the underlying Skia image for drawing.
    pub fn image(&self) -> &SkImage {
        &self.image
    }
}
97
impl PartialEq for VideoFrame {
    /// Frames compare equal when they wrap the same underlying Skia image
    /// (by Skia's per-image unique ID) — this is identity equality, not a
    /// pixel-by-pixel comparison.
    fn eq(&self, other: &Self) -> bool {
        self.image.unique_id() == other.image.unique_id()
    }
}
103
/// Max decoded frames buffered between the blocking decoder thread and the
/// async pacing loop; a small bound applies backpressure to ffmpeg.
const FRAME_BUFFER: usize = 2;

/// Capacity of the client-facing `VideoEvent` channel.
const EVENTS_BUFFER: usize = 2;

/// PCM format requested from the audio ffmpeg process (`s16le`, stereo,
/// 48 kHz) — must match what `PcmSource` reports to rodio.
const AUDIO_SAMPLE_RATE: u32 = 48_000;
const AUDIO_CHANNELS: u16 = 2;
112
/// Events emitted by the playback task to the `VideoClient`.
#[derive(Clone)]
pub enum VideoEvent {
    /// Total media duration, as parsed from ffmpeg output (may arrive once,
    /// early in playback).
    Duration(Duration),
    /// A decoded frame together with its absolute position in the media
    /// (start offset + frame timestamp).
    Frame {
        frame: VideoFrame,
        position: Duration,
    },
    /// Decoding finished normally.
    Ended,
    /// ffmpeg failed to spawn or the decoder returned an error.
    Errored,
}
124
/// Handle to a running playback session: receives decoder events and exposes
/// a pause flag shared with the playback task.
pub struct VideoClient {
    // Event stream produced by `run_client`.
    events: async_channel::Receiver<VideoEvent>,
    // Shared pause flag polled by the playback loop.
    paused: Arc<AtomicBool>,
    // Keeps the playback task alive for as long as this client exists.
    _task: OwnedTaskHandle,
}
131
132impl VideoClient {
133 pub fn new(source: VideoSource, start_offset: Duration) -> Self {
135 let (sender, receiver) = async_channel::bounded(EVENTS_BUFFER);
136 let paused = Arc::new(AtomicBool::new(false));
137 let task = spawn(run_client(
138 source,
139 start_offset,
140 Arc::clone(&paused),
141 sender,
142 ))
143 .owned();
144 Self {
145 events: receiver,
146 paused,
147 _task: task,
148 }
149 }
150
151 pub fn events(&self) -> &async_channel::Receiver<VideoEvent> {
153 &self.events
154 }
155
156 pub fn pause(&self) {
158 self.paused.store(true, Ordering::Relaxed);
159 }
160
161 pub fn play(&self) {
163 self.paused.store(false, Ordering::Relaxed);
164 }
165
166 pub fn is_paused(&self) -> bool {
168 self.paused.load(Ordering::Relaxed)
169 }
170}
171
172fn audio_handle() -> Option<Rc<rodio::OutputStreamHandle>> {
174 if let Some(handle) = try_consume_root_context::<Rc<rodio::OutputStreamHandle>>() {
175 return Some(handle);
176 }
177
178 let (sender, receiver) = sync_channel::<Option<rodio::OutputStreamHandle>>(0);
179 Builder::new()
180 .name("freya-audio".into())
181 .spawn(move || match rodio::OutputStream::try_default() {
182 Ok((stream, handle)) => {
183 if sender.send(Some(handle)).is_ok() {
184 let _keepalive = stream;
185 park();
186 }
187 }
188 Err(err) => {
189 tracing::info!(%err, "no audio output device");
190 let _ = sender.send(None);
191 }
192 })
193 .ok()?;
194
195 let handle = Rc::new(receiver.recv().ok().flatten()?);
196 provide_context_for_scope_id(handle.clone(), ScopeId::ROOT);
197 Some(handle)
198}
199
/// One raw RGBA frame as emitted by ffmpeg, before conversion to a Skia image.
struct RawFrame {
    width: u32,
    height: u32,
    // Presentation timestamp in seconds, relative to the decode start.
    timestamp: f32,
    // Tightly packed RGBA bytes (4 bytes per pixel, `rawvideo`/`rgba` output).
    data: Vec<u8>,
}
206
/// Messages sent from the blocking decoder thread to the async pacing loop.
enum DecoderEvent {
    /// Media duration in seconds, parsed from ffmpeg's output.
    Duration(f64),
    /// A decoded raw frame.
    Frame(RawFrame),
}
211
/// Owns ffmpeg's stdin and, on drop, sends the interactive `q` command so the
/// child process shuts down gracefully instead of being orphaned.
struct FfmpegQuitter(ChildStdin);

impl Drop for FfmpegQuitter {
    fn drop(&mut self) {
        // Best effort: the pipe may already be closed if ffmpeg exited first.
        let _ = self.0.write_all(b"q\n");
        let _ = self.0.flush();
    }
}
221
/// Streams signed 16-bit little-endian PCM samples straight out of the audio
/// ffmpeg process's stdout pipe.
struct PcmSource {
    reader: ChildStdout,
}
226
227impl Iterator for PcmSource {
228 type Item = i16;
229
230 fn next(&mut self) -> Option<i16> {
231 let mut buf = [0u8; 2];
232 self.reader.read_exact(&mut buf).ok()?;
233 Some(i16::from_le_bytes(buf))
234 }
235}
236
impl rodio::Source for PcmSource {
    // The stream has no internal frame boundaries; samples flow until EOF.
    fn current_frame_len(&self) -> Option<usize> {
        None
    }

    // Must match the `-ac` argument passed in `spawn_audio_ffmpeg`.
    fn channels(&self) -> u16 {
        AUDIO_CHANNELS
    }

    // Must match the `-ar` argument passed in `spawn_audio_ffmpeg`.
    fn sample_rate(&self) -> u32 {
        AUDIO_SAMPLE_RATE
    }

    // Unknown up front: we read from a live pipe.
    fn total_duration(&self) -> Option<Duration> {
        None
    }
}
254
/// Keeps an audio session alive: the rodio sink plus the ffmpeg child (and
/// its stdin quitter) that feeds it. Dropping this stops audio playback.
struct AudioPlayback {
    // Sends `q` to the audio ffmpeg process when dropped.
    _quitter: Option<FfmpegQuitter>,
    sink: rodio::Sink,
    // Held so the child is not reaped while the sink still reads its stdout.
    _child: FfmpegChild,
}
261
/// Drives one playback session: spawns the video ffmpeg process, decodes its
/// frames on a blocking thread, paces each frame against a wall clock
/// (subtracting time spent paused), and forwards `VideoEvent`s to the client.
async fn run_client(
    source: VideoSource,
    start_offset: Duration,
    paused: Arc<AtomicBool>,
    events: async_channel::Sender<VideoEvent>,
) {
    let mut child = match spawn_ffmpeg(&source, start_offset) {
        Ok(child) => child,
        Err(err) => {
            tracing::error!(%err, "failed to spawn ffmpeg");
            let _ = events.send(VideoEvent::Errored).await;
            return;
        }
    };
    // Held for the duration of this function so ffmpeg receives `q` when the
    // playback task is dropped (e.g. the VideoClient goes away).
    let _quitter = child.take_stdin().map(FfmpegQuitter);

    // Audio runs as an independent ffmpeg process; `None` means video-only.
    let audio = start_audio(&source, start_offset);

    // Bounded channel: decoding applies backpressure instead of buffering
    // unbounded frames ahead of playback.
    let (sender, receiver) = async_channel::bounded::<DecoderEvent>(FRAME_BUFFER);
    let decoder = blocking::unblock(move || decode(child, sender));

    // Wall clock started at the first frame; `paused_for` accumulates time
    // spent paused so pacing stays aligned with frame timestamps.
    let mut wall_start: Option<Instant> = None;
    let mut paused_for = Duration::ZERO;

    while let Ok(event) = receiver.recv().await {
        let raw = match event {
            // Only forward sane durations; NaN/negative values are dropped.
            DecoderEvent::Duration(seconds) if seconds.is_finite() && seconds >= 0.0 => {
                let _ = events
                    .send(VideoEvent::Duration(Duration::from_secs_f64(seconds)))
                    .await;
                continue;
            }
            DecoderEvent::Duration(_) => continue,
            DecoderEvent::Frame(raw) => raw,
        };

        // Busy-wait (with 32 ms sleeps) while paused, keeping the audio sink
        // in step and recording how long the pause lasted.
        if paused.load(Ordering::Relaxed) {
            if let Some(audio) = audio.as_ref() {
                audio.sink.pause();
            }
            let pause_start = Instant::now();
            while paused.load(Ordering::Relaxed) {
                Timer::after(Duration::from_millis(32)).await;
            }
            paused_for += pause_start.elapsed();
            if let Some(audio) = audio.as_ref() {
                audio.sink.play();
            }
        }

        // Shadowed on purpose: initializes the clock on the first frame, then
        // yields the fixed start instant on every iteration.
        let wall_start = *wall_start.get_or_insert_with(Instant::now);
        let frame_offset = Duration::from_secs_f32(raw.timestamp.max(0.0));
        let elapsed = wall_start.elapsed().saturating_sub(paused_for);
        // Sleep until the frame's presentation time; late frames show at once.
        if elapsed < frame_offset {
            Timer::after(frame_offset - elapsed).await;
        }

        // Frames that fail image conversion are skipped, not fatal.
        let Some(image) = raw_frame_to_sk_image(&raw) else {
            continue;
        };
        if events
            .send(VideoEvent::Frame {
                frame: VideoFrame { image },
                // Position is absolute in the media: seek offset + timestamp.
                position: start_offset + frame_offset,
            })
            .await
            .is_err()
        {
            // Receiver dropped: the client is gone, stop pumping frames.
            break;
        }
    }

    // Report how decoding finished once the channel closes or we broke out.
    match decoder.await {
        Ok(()) => {
            let _ = events.send(VideoEvent::Ended).await;
        }
        Err(err) => {
            tracing::error!(%err, "video decoder failed");
            let _ = events.send(VideoEvent::Errored).await;
        }
    }
}
345
346fn raw_frame_to_sk_image(raw: &RawFrame) -> Option<SkImage> {
347 let row_bytes = raw.width.checked_mul(4)? as usize;
348 let info = ImageInfo::new(
349 ISize::new(raw.width as i32, raw.height as i32),
350 ColorType::RGBA8888,
351 AlphaType::Unpremul,
352 None,
353 );
354 raster_from_data(&info, Data::new_copy(&raw.data), row_bytes)
355}
356
357fn spawn_ffmpeg(source: &VideoSource, start_offset: Duration) -> anyhow::Result<FfmpegChild> {
358 let VideoSource::Path(path) = source;
359 let mut cmd = FfmpegCommand::new();
360
361 let start_secs = start_offset.as_secs_f32();
364 if start_secs > 0.0 {
365 cmd.args(["-ss", &start_secs.to_string()]);
366 }
367 cmd.input(path.to_string_lossy().as_ref())
368 .format("rawvideo")
369 .pix_fmt("rgba")
370 .pipe_stdout();
371
372 Ok(cmd.spawn()?)
373}
374
375fn start_audio(source: &VideoSource, start_offset: Duration) -> Option<AudioPlayback> {
377 let handle = audio_handle()?;
378 let mut child = spawn_audio_ffmpeg(source, start_offset)
379 .map_err(|err| tracing::warn!(%err, "failed to spawn audio ffmpeg"))
380 .ok()?;
381 let stdout = child.take_stdout()?;
382 let quitter = child.take_stdin().map(FfmpegQuitter);
383 let sink = rodio::Sink::try_new(&handle)
384 .map_err(|err| tracing::warn!(%err, "failed to create audio sink"))
385 .ok()?;
386 sink.append(PcmSource { reader: stdout });
387 Some(AudioPlayback {
388 _quitter: quitter,
389 sink,
390 _child: child,
391 })
392}
393
394fn spawn_audio_ffmpeg(source: &VideoSource, start_offset: Duration) -> anyhow::Result<FfmpegChild> {
395 let VideoSource::Path(path) = source;
396 let mut cmd = FfmpegCommand::new();
397
398 let start_secs = start_offset.as_secs_f32();
399 if start_secs > 0.0 {
400 cmd.args(["-ss", &start_secs.to_string()]);
401 }
402 cmd.input(path.to_string_lossy().as_ref())
403 .args([
404 "-vn",
405 "-f",
406 "s16le",
407 "-ar",
408 &AUDIO_SAMPLE_RATE.to_string(),
409 "-ac",
410 &AUDIO_CHANNELS.to_string(),
411 ])
412 .pipe_stdout();
413
414 Ok(cmd.spawn()?)
415}
416
417fn decode(
418 mut child: FfmpegChild,
419 sender: async_channel::Sender<DecoderEvent>,
420) -> anyhow::Result<()> {
421 for event in child.iter()? {
422 let message = match event {
423 FfmpegEvent::ParsedDuration(duration) => DecoderEvent::Duration(duration.duration),
424 FfmpegEvent::OutputFrame(frame) => DecoderEvent::Frame(RawFrame {
425 width: frame.width,
426 height: frame.height,
427 timestamp: frame.timestamp,
428 data: frame.data,
429 }),
430 _ => continue,
431 };
432 if sender.send_blocking(message).is_err() {
435 break;
436 }
437 }
438
439 let _ = child.kill();
441 let _ = child.wait();
442
443 Ok(())
444}