use crate::{
progress_event::ProgressEventLineBuilder,
Error,
Event,
};
use futures::future::FutureExt;
use once_cell::sync::Lazy;
use regex::Regex;
use std::{
ffi::OsString,
process::Stdio,
};
use tokio::io::{
AsyncBufReadExt,
BufReader,
};
use tokio_stream::{
wrappers::LinesStream,
Stream,
StreamExt,
};
use tracing::trace;
static FILE_EXISTS_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new("File '.*' already exists\\. Exiting\\.")
.expect("failed to compile FILE_EXISTS_REGEX")
});
#[derive(Debug, Clone)]
pub struct Builder {
pub audio_codec: Option<String>,
pub video_codec: Option<String>,
pub video_bitrate: Option<String>,
pub input: Option<OsString>,
pub output: Option<OsString>,
pub input_format: Option<String>,
pub output_format: Option<String>,
pub pass: Option<u8>,
pub video_frames: Option<u64>,
pub video_profile: Option<String>,
pub preset: Option<String>,
pub overwrite: bool,
}
impl Builder {
pub fn new() -> Self {
Self {
audio_codec: None,
video_codec: None,
video_bitrate: None,
input: None,
output: None,
input_format: None,
output_format: None,
pass: None,
video_frames: None,
video_profile: None,
preset: None,
overwrite: false,
}
}
pub fn audio_codec(&mut self, audio_codec: impl Into<String>) -> &mut Self {
self.audio_codec = Some(audio_codec.into());
self
}
pub fn video_codec(&mut self, video_codec: impl Into<String>) -> &mut Self {
self.video_codec = Some(video_codec.into());
self
}
pub fn video_bitrate(&mut self, video_bitrate: impl Into<String>) -> &mut Self {
self.video_bitrate = Some(video_bitrate.into());
self
}
pub fn input(&mut self, input: impl Into<OsString>) -> &mut Self {
self.input = Some(input.into());
self
}
pub fn output(&mut self, output: impl Into<OsString>) -> &mut Self {
self.output = Some(output.into());
self
}
pub fn input_format(&mut self, input_format: impl Into<String>) -> &mut Self {
self.input_format = Some(input_format.into());
self
}
pub fn output_format(&mut self, output_format: impl Into<String>) -> &mut Self {
self.output_format = Some(output_format.into());
self
}
pub fn pass(&mut self, pass: u8) -> &mut Self {
self.pass = Some(pass);
self
}
pub fn video_frames(&mut self, video_frames: impl Into<u64>) -> &mut Self {
self.video_frames = Some(video_frames.into());
self
}
pub fn video_profile(&mut self, video_profile: impl Into<String>) -> &mut Self {
self.video_profile = Some(video_profile.into());
self
}
pub fn preset(&mut self, preset: impl Into<String>) -> &mut Self {
self.preset = Some(preset.into());
self
}
pub fn overwrite(&mut self, overwrite: bool) -> &mut Self {
self.overwrite = overwrite;
self
}
fn build_command(&mut self) -> Result<tokio::process::Command, Error> {
let audio_codec = self.audio_codec.take();
let video_codec = self.video_codec.take();
let video_bitrate = self.video_bitrate.take();
let input = self.input.take();
let output = self.output.take();
let input_format = self.input_format.take();
let output_format = self.output_format.take();
let pass = self.pass.take();
let video_frames = self.video_frames.take();
let video_profile = self.video_profile.take();
let preset = self.preset.take();
let overwrite = std::mem::take(&mut self.overwrite);
let mut command = tokio::process::Command::new("ffmpeg");
command.arg("-hide_banner");
command.arg("-nostdin");
if let Some(input_format) = input_format.as_deref() {
command.args(["-f", input_format]);
}
let input = input.ok_or(Error::MissingInput)?;
command.args(["-i".as_ref(), input.as_os_str()]);
if let Some(video_frames) = video_frames {
command.args(["-frames:v", &video_frames.to_string()]);
}
if let Some(audio_codec) = audio_codec.as_deref() {
command.args(["-codec:a", audio_codec]);
}
if let Some(video_codec) = video_codec.as_deref() {
command.args(["-codec:v", video_codec]);
}
if let Some(video_bitrate) = video_bitrate.as_deref() {
command.args(["-b:v", video_bitrate]);
}
if let Some(video_profile) = video_profile.as_deref() {
command.args(["-profile:v", video_profile]);
}
if let Some(preset) = preset.as_deref() {
command.args(["-preset", preset]);
}
if let Some(pass) = pass {
command.args(["-pass", &pass.to_string()]);
}
command.args(["-progress", "-"]);
command.arg(if overwrite { "-y" } else { "-n" });
if let Some(output_format) = output_format.as_deref() {
command.args(["-f", output_format]);
}
let output = output.ok_or(Error::MissingOutput)?;
command.arg(output.as_os_str());
Ok(command)
}
pub async fn ffmpeg_status(&mut self) -> Result<std::process::ExitStatus, Error> {
self.build_command()?.status().await.map_err(Error::Io)
}
pub async fn ffmpeg_output(&mut self) -> Result<std::process::Output, Error> {
self.build_command()?.output().await.map_err(Error::Io)
}
pub fn spawn(&mut self) -> Result<impl Stream<Item = Result<Event, Error>> + Unpin, Error> {
let mut command = self.build_command()?;
command
.kill_on_drop(true)
.stdout(Stdio::piped())
.stdin(Stdio::null())
.stderr(Stdio::piped());
trace!("built ffmpeg command for spawning: {:?}", command);
let mut child = command.spawn().map_err(Error::ProcessSpawn)?;
let stdout = child.stdout.take().expect("missing stdout");
let stdout_buf_reader = BufReader::new(stdout);
let stdout_stream = LinesStream::new(stdout_buf_reader.lines());
let stderr = child.stderr.take().expect("missing stderr");
let stderr_buf_reader = BufReader::new(stderr);
let stderr_stream = LinesStream::new(stderr_buf_reader.lines());
let exit_status_stream = Box::pin(async move { child.wait().await })
.into_stream()
.map(|maybe_exit_status| maybe_exit_status.map(Event::ExitStatus).map_err(Error::Io));
let mut builder = ProgressEventLineBuilder::new();
let stdout_event_stream = stdout_stream.filter_map(move |maybe_line| {
let maybe_event = maybe_line
.map(|line| {
builder
.push(&line)
.transpose()
.map(|e| e.map(Event::Progress).map_err(From::from))
})
.transpose()?;
Some(maybe_event.unwrap_or_else(|e| Err(Error::Io(e))))
});
let stderr_event_stream = stderr_stream.map(|maybe_line| {
let line = maybe_line.map_err(Error::Io)?;
if FILE_EXISTS_REGEX.is_match(&line) {
Err(Error::OutputAlreadyExists)
} else {
Ok(Event::Unknown(line))
}
});
Ok(stdout_event_stream
.merge(stderr_event_stream)
.chain(exit_status_stream))
}
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}