pikadick/util/
encoder_task.rs1use anyhow::Context;
2use std::{
3 ffi::OsString,
4 sync::Arc,
5};
6use tokio::sync::oneshot;
7use tokio_stream::{
8 wrappers::ReceiverStream,
9 Stream,
10 StreamExt,
11};
12use tracing::info;
13
14enum Message {
16 GetEncoders {
18 validate: bool,
22
23 tx: oneshot::Sender<anyhow::Result<Vec<tokio_ffmpeg_cli::Encoder>>>,
25 },
26
27 Encode {
29 builder: Box<tokio_ffmpeg_cli::Builder>,
31 tx: oneshot::Sender<
33 anyhow::Result<
34 tokio::sync::mpsc::Receiver<
35 Result<tokio_ffmpeg_cli::Event, tokio_ffmpeg_cli::Error>,
36 >,
37 >,
38 >,
39 },
40
41 Close {
46 tx: oneshot::Sender<()>,
48 },
49}
50
51#[derive(Debug, Clone)]
53pub struct EncoderTask {
54 handle: Arc<parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>>,
55 tx: tokio::sync::mpsc::Sender<Message>,
56}
57
58impl EncoderTask {
59 pub fn new() -> Self {
61 let (tx, rx) = tokio::sync::mpsc::channel(1);
62 let handle = tokio::spawn(encoder_task_impl(rx));
63
64 Self {
65 handle: Arc::new(parking_lot::Mutex::new(Some(handle))),
66 tx,
67 }
68 }
69
70 pub async fn get_encoders(
72 &self,
73 validate: bool,
74 ) -> anyhow::Result<Vec<tokio_ffmpeg_cli::Encoder>> {
75 let (tx, rx) = oneshot::channel();
76 self.tx
77 .try_send(Message::GetEncoders { validate, tx })
78 .ok()
79 .context("failed to send message")?;
80
81 rx.await.context("task crashed")?
82 }
83
84 pub fn encode(&self) -> EncoderTaskEncodeBuilder<'_> {
86 EncoderTaskEncodeBuilder::new(self)
87 }
88
89 pub async fn close(&self) -> anyhow::Result<()> {
91 let (tx, rx) = oneshot::channel();
92
93 self.tx
94 .send(Message::Close { tx })
95 .await
96 .ok()
97 .context("task is gone")?;
98
99 rx.await.context("task crashed")
100 }
101
102 pub async fn join(&self) -> anyhow::Result<()> {
107 let handle = self.handle.lock().take().context("missing handle")?;
108 handle.await.context("task panicked")
109 }
110
111 pub async fn shutdown(&self) -> anyhow::Result<()> {
116 self.close().await.context("failed to send close request")?;
117 self.join().await.context("failed to join task")?;
118 Ok(())
119 }
120}
121
122async fn encoder_task_impl(mut rx: tokio::sync::mpsc::Receiver<Message>) {
124 while let Some(msg) = rx.recv().await {
125 match msg {
126 Message::Close { tx } => {
127 rx.close();
128
129 let _ = tx.send(()).is_ok();
131 }
132 Message::Encode { mut builder, tx } => {
133 let maybe_stream = builder.spawn().context("failed to spawn FFMpeg");
134
135 match maybe_stream {
136 Ok(mut stream) => {
137 let (event_tx, event_rx) = tokio::sync::mpsc::channel(128);
138
139 let _ = tx.send(Ok(event_rx)).is_ok();
141
142 while let Some(event) = stream.next().await {
145 let _ = event_tx.send(event).await.is_ok();
147 }
148 }
149 Err(e) => {
150 let _ = tx.send(Err(e)).is_ok();
152 }
153 }
154 }
155 Message::GetEncoders { validate, tx } => {
156 let result = async {
157 let raw_encoders = tokio_ffmpeg_cli::get_encoders()
159 .await
160 .context("failed to get ffmpeg encoders")?;
161
162 if validate {
163 let mut encoders = Vec::with_capacity(raw_encoders.len());
165
166 for encoder in raw_encoders
172 .into_iter()
173 .filter(|encoder| encoder.is_video())
174 .filter(|encoder| encoder.name.contains("264"))
175 {
176 let status = tokio_ffmpeg_cli::Builder::new()
178 .input("nullsrc")
179 .input_format("lavfi")
180 .output("-")
181 .output_format("null")
182 .video_codec(&*encoder.name)
183 .video_frames(1_u64)
184 .ffmpeg_output()
185 .await?
186 .status;
187
188 if status.success() {
190 encoders.push(encoder);
191 } else {
192 info!("skipping \"{}\" as it failed a sanity check", encoder.name);
193 }
194 }
195
196 Ok(encoders)
197 } else {
198 Ok(raw_encoders)
200 }
201 }
202 .await;
203
204 let _ = tx.send(result).is_ok();
207 }
208 }
209 }
210}
211
212impl Default for EncoderTask {
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218#[derive(Debug)]
220pub struct EncoderTaskEncodeBuilder<'a> {
221 builder: Box<tokio_ffmpeg_cli::Builder>,
222
223 task: &'a EncoderTask,
224}
225
226impl<'a> EncoderTaskEncodeBuilder<'a> {
227 pub fn new(task: &'a EncoderTask) -> Self {
229 Self {
230 builder: Box::new(tokio_ffmpeg_cli::Builder::new()),
231 task,
232 }
233 }
234
235 pub fn input(&mut self, input: impl Into<OsString>) -> &mut Self {
237 self.builder.input(input);
238 self
239 }
240
241 pub fn output(&mut self, output: impl Into<OsString>) -> &mut Self {
243 self.builder.output(output);
244 self
245 }
246
247 pub fn audio_codec(&mut self, audio_codec: impl Into<String>) -> &mut Self {
249 self.builder.audio_codec(audio_codec);
250 self
251 }
252
253 pub fn video_codec(&mut self, video_codec: impl Into<String>) -> &mut Self {
255 self.builder.video_codec(video_codec);
256 self
257 }
258
259 pub fn video_bitrate(&mut self, video_bitrate: impl Into<String>) -> &mut Self {
261 self.builder.video_bitrate(video_bitrate);
262 self
263 }
264
265 pub fn input_format(&mut self, input_format: impl Into<String>) -> &mut Self {
267 self.builder.input_format(input_format);
268 self
269 }
270
271 pub fn output_format(&mut self, output_format: impl Into<String>) -> &mut Self {
273 self.builder.output_format(output_format);
274 self
275 }
276
277 pub fn video_frames(&mut self, video_frames: impl Into<u64>) -> &mut Self {
279 self.builder.video_frames(video_frames);
280 self
281 }
282
283 pub fn video_profile(&mut self, video_profile: impl Into<String>) -> &mut Self {
285 self.builder.video_profile(video_profile);
286 self
287 }
288
289 pub fn preset(&mut self, preset: impl Into<String>) -> &mut Self {
291 self.builder.preset(preset);
292 self
293 }
294
295 pub async fn try_send(
297 &self,
298 ) -> anyhow::Result<impl Stream<Item = Result<tokio_ffmpeg_cli::Event, tokio_ffmpeg_cli::Error>>>
299 {
300 let (tx, rx) = oneshot::channel();
301 self.task
302 .tx
303 .try_send(Message::Encode {
304 builder: self.builder.clone(),
305 tx,
306 })
307 .ok()
308 .context("failed to send message")?;
309
310 rx.await
311 .context("encode task crashed")?
312 .map(ReceiverStream::new)
313 }
314}