pikadick/util/
encoder_task.rs

1use anyhow::Context;
2use std::{
3    ffi::OsString,
4    sync::Arc,
5};
6use tokio::sync::oneshot;
7use tokio_stream::{
8    wrappers::ReceiverStream,
9    Stream,
10    StreamExt,
11};
12use tracing::info;
13
14/// A message for the encoder task
15enum Message {
16    /// Get encoders available to the application
17    GetEncoders {
18        /// Whether to validate the encoder.
19        ///
20        /// If this is false, false positives will show up in the output.
21        validate: bool,
22
23        /// The response
24        tx: oneshot::Sender<anyhow::Result<Vec<tokio_ffmpeg_cli::Encoder>>>,
25    },
26
27    /// Request an encode
28    Encode {
29        /// The options for the encode
30        builder: Box<tokio_ffmpeg_cli::Builder>,
31        /// The notification for when the task is processed, as well as a handle to the download event stream
32        tx: oneshot::Sender<
33            anyhow::Result<
34                tokio::sync::mpsc::Receiver<
35                    Result<tokio_ffmpeg_cli::Event, tokio_ffmpeg_cli::Error>,
36                >,
37            >,
38        >,
39    },
40
41    /// Request a shutdown.
42    ///
43    /// the task will drain the channel until it is empty after recieving this.
44    /// the task will still accept new messages until it processes this one.
45    Close {
46        /// The notification for when the task processes this message
47        tx: oneshot::Sender<()>,
48    },
49}
50
51/// A task to re-encode things
52#[derive(Debug, Clone)]
53pub struct EncoderTask {
54    handle: Arc<parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>>,
55    tx: tokio::sync::mpsc::Sender<Message>,
56}
57
58impl EncoderTask {
59    /// Make a new encoder task
60    pub fn new() -> Self {
61        let (tx, rx) = tokio::sync::mpsc::channel(1);
62        let handle = tokio::spawn(encoder_task_impl(rx));
63
64        Self {
65            handle: Arc::new(parking_lot::Mutex::new(Some(handle))),
66            tx,
67        }
68    }
69
70    /// Get encoders
71    pub async fn get_encoders(
72        &self,
73        validate: bool,
74    ) -> anyhow::Result<Vec<tokio_ffmpeg_cli::Encoder>> {
75        let (tx, rx) = oneshot::channel();
76        self.tx
77            .try_send(Message::GetEncoders { validate, tx })
78            .ok()
79            .context("failed to send message")?;
80
81        rx.await.context("task crashed")?
82    }
83
84    /// Create a builder for an encode request
85    pub fn encode(&self) -> EncoderTaskEncodeBuilder<'_> {
86        EncoderTaskEncodeBuilder::new(self)
87    }
88
89    /// Request this task to close
90    pub async fn close(&self) -> anyhow::Result<()> {
91        let (tx, rx) = oneshot::channel();
92
93        self.tx
94            .send(Message::Close { tx })
95            .await
96            .ok()
97            .context("task is gone")?;
98
99        rx.await.context("task crashed")
100    }
101
102    /// Join this task, waiting for it to exit.
103    ///
104    /// This will NOT send a shutdown request, that must be done beforehand.
105    /// Also, this function can only be called once. Future calls will return an error.
106    pub async fn join(&self) -> anyhow::Result<()> {
107        let handle = self.handle.lock().take().context("missing handle")?;
108        handle.await.context("task panicked")
109    }
110
111    /// Shutdown the task, sending a close request can joining the task.
112    ///
113    /// This calls `join` under the hood, so it has the same restrictions as close:
114    /// Either shutdown or close can only be called once.
115    pub async fn shutdown(&self) -> anyhow::Result<()> {
116        self.close().await.context("failed to send close request")?;
117        self.join().await.context("failed to join task")?;
118        Ok(())
119    }
120}
121
122/// Impl for the encoder task
123async fn encoder_task_impl(mut rx: tokio::sync::mpsc::Receiver<Message>) {
124    while let Some(msg) = rx.recv().await {
125        match msg {
126            Message::Close { tx } => {
127                rx.close();
128
129                // We don't care if the user doesn't care about the result.
130                let _ = tx.send(()).is_ok();
131            }
132            Message::Encode { mut builder, tx } => {
133                let maybe_stream = builder.spawn().context("failed to spawn FFMpeg");
134
135                match maybe_stream {
136                    Ok(mut stream) => {
137                        let (event_tx, event_rx) = tokio::sync::mpsc::channel(128);
138
139                        // TODO: Consider stopping download if the user does not care anymore.
140                        let _ = tx.send(Ok(event_rx)).is_ok();
141
142                        // We manage the stream here so that downloads are not cancelable.
143                        // Also, it gives us a concrete stream type.
144                        while let Some(event) = stream.next().await {
145                            // TODO: Consider cancelling download if user stopped caring
146                            let _ = event_tx.send(event).await.is_ok();
147                        }
148                    }
149                    Err(e) => {
150                        // If the stopped caring, we don't care since it was an error anyways
151                        let _ = tx.send(Err(e)).is_ok();
152                    }
153                }
154            }
155            Message::GetEncoders { validate, tx } => {
156                let result = async {
157                    // Get all encoders
158                    let raw_encoders = tokio_ffmpeg_cli::get_encoders()
159                        .await
160                        .context("failed to get ffmpeg encoders")?;
161
162                    if validate {
163                        // If we are validating, allocate a new buffer and move valid entries to it was we validate it.
164                        let mut encoders = Vec::with_capacity(raw_encoders.len());
165
166                        // TODO: We only support sanity checks for video output, so the output will only be video
167                        // In the future, we should edit the sanity check based on encoder type
168                        // TODO: We filter out anything that isn't 264 as thats all we need right now.
169                        // In the future, we should expose an api to configure this filter.
170                        // TODO: Maybe this should be run in parallel.
171                        for encoder in raw_encoders
172                            .into_iter()
173                            .filter(|encoder| encoder.is_video())
174                            .filter(|encoder| encoder.name.contains("264"))
175                        {
176                            // Run a basic transcoding sanity check
177                            let status = tokio_ffmpeg_cli::Builder::new()
178                                .input("nullsrc")
179                                .input_format("lavfi")
180                                .output("-")
181                                .output_format("null")
182                                .video_codec(&*encoder.name)
183                                .video_frames(1_u64)
184                                .ffmpeg_output()
185                                .await?
186                                .status;
187
188                            // If it passed, add it to the output
189                            if status.success() {
190                                encoders.push(encoder);
191                            } else {
192                                info!("skipping \"{}\" as it failed a sanity check", encoder.name);
193                            }
194                        }
195
196                        Ok(encoders)
197                    } else {
198                        // If we are not validating, just return
199                        Ok(raw_encoders)
200                    }
201                }
202                .await;
203
204                // Don't care if the user hung up,
205                // but it is a bit sad they asked for expensive data they didn't want
206                let _ = tx.send(result).is_ok();
207            }
208        }
209    }
210}
211
212impl Default for EncoderTask {
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218/// A builder for encoding messages
219#[derive(Debug)]
220pub struct EncoderTaskEncodeBuilder<'a> {
221    builder: Box<tokio_ffmpeg_cli::Builder>,
222
223    task: &'a EncoderTask,
224}
225
226impl<'a> EncoderTaskEncodeBuilder<'a> {
227    /// Make a new [`EncoderTaskEncodeBuilder`]
228    pub fn new(task: &'a EncoderTask) -> Self {
229        Self {
230            builder: Box::new(tokio_ffmpeg_cli::Builder::new()),
231            task,
232        }
233    }
234
235    /// Set the file input
236    pub fn input(&mut self, input: impl Into<OsString>) -> &mut Self {
237        self.builder.input(input);
238        self
239    }
240
241    /// Set the file output
242    pub fn output(&mut self, output: impl Into<OsString>) -> &mut Self {
243        self.builder.output(output);
244        self
245    }
246
247    /// Set the audio codec
248    pub fn audio_codec(&mut self, audio_codec: impl Into<String>) -> &mut Self {
249        self.builder.audio_codec(audio_codec);
250        self
251    }
252
253    /// Set the video codec
254    pub fn video_codec(&mut self, video_codec: impl Into<String>) -> &mut Self {
255        self.builder.video_codec(video_codec);
256        self
257    }
258
259    /// Set the video bitrate
260    pub fn video_bitrate(&mut self, video_bitrate: impl Into<String>) -> &mut Self {
261        self.builder.video_bitrate(video_bitrate);
262        self
263    }
264
265    /// Set the input format
266    pub fn input_format(&mut self, input_format: impl Into<String>) -> &mut Self {
267        self.builder.input_format(input_format);
268        self
269    }
270
271    /// Set the output format
272    pub fn output_format(&mut self, output_format: impl Into<String>) -> &mut Self {
273        self.builder.output_format(output_format);
274        self
275    }
276
277    /// Set the # of video frames from the input
278    pub fn video_frames(&mut self, video_frames: impl Into<u64>) -> &mut Self {
279        self.builder.video_frames(video_frames);
280        self
281    }
282
283    /// Set the video profile
284    pub fn video_profile(&mut self, video_profile: impl Into<String>) -> &mut Self {
285        self.builder.video_profile(video_profile);
286        self
287    }
288
289    /// Set the preset
290    pub fn preset(&mut self, preset: impl Into<String>) -> &mut Self {
291        self.builder.preset(preset);
292        self
293    }
294
295    /// Try to send the message, exiting it it is at capacity
296    pub async fn try_send(
297        &self,
298    ) -> anyhow::Result<impl Stream<Item = Result<tokio_ffmpeg_cli::Event, tokio_ffmpeg_cli::Error>>>
299    {
300        let (tx, rx) = oneshot::channel();
301        self.task
302            .tx
303            .try_send(Message::Encode {
304                builder: self.builder.clone(),
305                tx,
306            })
307            .ok()
308            .context("failed to send message")?;
309
310        rx.await
311            .context("encode task crashed")?
312            .map(ReceiverStream::new)
313    }
314}