tokio_ffmpeg_cli/
builder.rs1use crate::{
2 progress_event::ProgressEventLineBuilder,
3 Error,
4 Event,
5};
6use futures::future::FutureExt;
7use once_cell::sync::Lazy;
8use regex::Regex;
9use std::{
10 ffi::OsString,
11 process::Stdio,
12};
13use tokio::io::{
14 AsyncBufReadExt,
15 BufReader,
16};
17use tokio_stream::{
18 wrappers::LinesStream,
19 Stream,
20 StreamExt,
21};
22use tracing::trace;
23
24static FILE_EXISTS_REGEX: Lazy<Regex> = Lazy::new(|| {
26 Regex::new("File '.*' already exists\\. Exiting\\.")
27 .expect("failed to compile FILE_EXISTS_REGEX")
28});
29
/// A builder for an ffmpeg command line.
///
/// Every option is optional except `input` and `output`, which must be set
/// before the command can be built.
#[derive(Debug, Clone)]
pub struct Builder {
    /// The audio codec, passed as `-codec:a`.
    pub audio_codec: Option<String>,

    /// The video codec, passed as `-codec:v`.
    pub video_codec: Option<String>,

    /// The video bitrate, passed as `-b:v`.
    pub video_bitrate: Option<String>,

    /// The input, passed as `-i`. Required.
    pub input: Option<OsString>,

    /// The output, passed as the final positional argument. Required.
    pub output: Option<OsString>,

    /// The input format, passed as `-f` before the input.
    pub input_format: Option<String>,

    /// The output format, passed as `-f` before the output.
    pub output_format: Option<String>,

    /// The pass number, passed as `-pass`.
    pub pass: Option<u8>,

    /// The number of video frames, passed as `-frames:v`.
    pub video_frames: Option<u64>,

    /// The video profile, passed as `-profile:v`.
    pub video_profile: Option<String>,

    /// The encoder preset, passed as `-preset`.
    pub preset: Option<String>,

    /// Whether an existing output may be overwritten.
    ///
    /// `true` passes `-y`, `false` passes `-n`.
    pub overwrite: bool,
}
69
70impl Builder {
71 pub fn new() -> Self {
73 Self {
74 audio_codec: None,
75 video_codec: None,
76
77 video_bitrate: None,
78
79 input: None,
80 output: None,
81
82 input_format: None,
83 output_format: None,
84
85 pass: None,
86
87 video_frames: None,
88
89 video_profile: None,
90
91 preset: None,
92
93 overwrite: false,
94 }
95 }
96
97 pub fn audio_codec(&mut self, audio_codec: impl Into<String>) -> &mut Self {
99 self.audio_codec = Some(audio_codec.into());
100 self
101 }
102
103 pub fn video_codec(&mut self, video_codec: impl Into<String>) -> &mut Self {
105 self.video_codec = Some(video_codec.into());
106 self
107 }
108
109 pub fn video_bitrate(&mut self, video_bitrate: impl Into<String>) -> &mut Self {
111 self.video_bitrate = Some(video_bitrate.into());
112 self
113 }
114
115 pub fn input(&mut self, input: impl Into<OsString>) -> &mut Self {
117 self.input = Some(input.into());
118 self
119 }
120
121 pub fn output(&mut self, output: impl Into<OsString>) -> &mut Self {
123 self.output = Some(output.into());
124 self
125 }
126
127 pub fn input_format(&mut self, input_format: impl Into<String>) -> &mut Self {
129 self.input_format = Some(input_format.into());
130 self
131 }
132
133 pub fn output_format(&mut self, output_format: impl Into<String>) -> &mut Self {
135 self.output_format = Some(output_format.into());
136 self
137 }
138
139 pub fn pass(&mut self, pass: u8) -> &mut Self {
141 self.pass = Some(pass);
142 self
143 }
144
145 pub fn video_frames(&mut self, video_frames: impl Into<u64>) -> &mut Self {
147 self.video_frames = Some(video_frames.into());
148 self
149 }
150
151 pub fn video_profile(&mut self, video_profile: impl Into<String>) -> &mut Self {
153 self.video_profile = Some(video_profile.into());
154 self
155 }
156
157 pub fn preset(&mut self, preset: impl Into<String>) -> &mut Self {
159 self.preset = Some(preset.into());
160 self
161 }
162
163 pub fn overwrite(&mut self, overwrite: bool) -> &mut Self {
165 self.overwrite = overwrite;
166 self
167 }
168
169 fn build_command(&mut self) -> Result<tokio::process::Command, Error> {
171 let audio_codec = self.audio_codec.take();
176
177 let video_codec = self.video_codec.take();
178 let video_bitrate = self.video_bitrate.take();
179
180 let input = self.input.take();
181 let output = self.output.take();
182
183 let input_format = self.input_format.take();
184 let output_format = self.output_format.take();
185
186 let pass = self.pass.take();
187
188 let video_frames = self.video_frames.take();
189
190 let video_profile = self.video_profile.take();
191
192 let preset = self.preset.take();
193
194 let overwrite = std::mem::take(&mut self.overwrite);
195
196 let mut command = tokio::process::Command::new("ffmpeg");
197 command.arg("-hide_banner");
198 command.arg("-nostdin");
199
200 if let Some(input_format) = input_format.as_deref() {
201 command.args(["-f", input_format]);
202 }
203
204 let input = input.ok_or(Error::MissingInput)?;
205 command.args(["-i".as_ref(), input.as_os_str()]);
206
207 if let Some(video_frames) = video_frames {
208 command.args(["-frames:v", &video_frames.to_string()]);
210 }
211
212 if let Some(audio_codec) = audio_codec.as_deref() {
213 command.args(["-codec:a", audio_codec]);
214 }
215
216 if let Some(video_codec) = video_codec.as_deref() {
217 command.args(["-codec:v", video_codec]);
218 }
219
220 if let Some(video_bitrate) = video_bitrate.as_deref() {
221 command.args(["-b:v", video_bitrate]);
222 }
223
224 if let Some(video_profile) = video_profile.as_deref() {
225 command.args(["-profile:v", video_profile]);
226 }
227
228 if let Some(preset) = preset.as_deref() {
229 command.args(["-preset", preset]);
230 }
231
232 if let Some(pass) = pass {
233 command.args(["-pass", &pass.to_string()]);
234 }
235
236 command.args(["-progress", "-"]);
237 command.arg(if overwrite { "-y" } else { "-n" });
238
239 if let Some(output_format) = output_format.as_deref() {
240 command.args(["-f", output_format]);
241 }
242
243 let output = output.ok_or(Error::MissingOutput)?;
244 command.arg(output.as_os_str());
245
246 Ok(command)
247 }
248
249 pub async fn ffmpeg_status(&mut self) -> Result<std::process::ExitStatus, Error> {
253 self.build_command()?.status().await.map_err(Error::Io)
254 }
255
256 pub async fn ffmpeg_output(&mut self) -> Result<std::process::Output, Error> {
260 self.build_command()?.output().await.map_err(Error::Io)
261 }
262
263 pub fn spawn(&mut self) -> Result<impl Stream<Item = Result<Event, Error>> + Unpin, Error> {
265 let mut command = self.build_command()?;
266 command
267 .kill_on_drop(true)
268 .stdout(Stdio::piped())
269 .stdin(Stdio::null())
270 .stderr(Stdio::piped());
271
272 trace!("built ffmpeg command for spawning: {:?}", command);
273
274 let mut child = command.spawn().map_err(Error::ProcessSpawn)?;
275
276 let stdout = child.stdout.take().expect("missing stdout");
278 let stdout_buf_reader = BufReader::new(stdout);
279 let stdout_stream = LinesStream::new(stdout_buf_reader.lines());
280
281 let stderr = child.stderr.take().expect("missing stderr");
283 let stderr_buf_reader = BufReader::new(stderr);
284 let stderr_stream = LinesStream::new(stderr_buf_reader.lines());
285
286 let exit_status_stream = Box::pin(async move { child.wait().await })
288 .into_stream()
289 .map(|maybe_exit_status| maybe_exit_status.map(Event::ExitStatus).map_err(Error::Io));
290
291 let mut builder = ProgressEventLineBuilder::new();
293 let stdout_event_stream = stdout_stream.filter_map(move |maybe_line| {
294 let maybe_event = maybe_line
295 .map(|line| {
296 builder
297 .push(&line)
298 .transpose()
299 .map(|e| e.map(Event::Progress).map_err(From::from))
300 })
301 .transpose()?;
302
303 Some(maybe_event.unwrap_or_else(|e| Err(Error::Io(e))))
304 });
305
306 let stderr_event_stream = stderr_stream.map(|maybe_line| {
308 let line = maybe_line.map_err(Error::Io)?;
309
310 if FILE_EXISTS_REGEX.is_match(&line) {
311 Err(Error::OutputAlreadyExists)
312 } else {
313 Ok(Event::Unknown(line))
314 }
315 });
316
317 Ok(stdout_event_stream
318 .merge(stderr_event_stream)
319 .chain(exit_status_stream))
320 }
321}
322
323impl Default for Builder {
324 fn default() -> Self {
325 Self::new()
326 }
327}