diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 93fb1fbac712f4ca041e9ac4347ebce823c01457..273e8158bb992a69898c8b15b70210d305232415 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -28,6 +28,7 @@ dependencies = [ name = "amalib" version = "0.1.0" dependencies = [ + "async-trait", "commons", "getset", "hashbrown 0.13.2", @@ -43,6 +44,17 @@ version = "1.0.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61" +[[package]] +name = "async-trait" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eff18d764974428cf3a9328e23fc5c986f5fbed46e6cd4cdf42544df5d297ec1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index faa282f7895534a7a5234f04d95b9298c35aea17..af6306cbfce869444440b3bea2803aa94974d768 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -30,6 +30,7 @@ license = "MIT" libc = "0.2" lazy_static = "^1" thiserror = "^1" +async-trait = "^0.1" hashbrown = { version = "^0.13", features = ["serde"] } smallvec = { version = "^1", default-features = false, features = [ diff --git a/src/rust/amalib/Cargo.toml b/src/rust/amalib/Cargo.toml index bbd3b6c18114a3c7a2b0b605c129fe7bfc7f7a45..e54b728ff75a538a5ed6c4e58d577f9462e6f9f2 100644 --- a/src/rust/amalib/Cargo.toml +++ b/src/rust/amalib/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true serde.workspace = true tokio.workspace = true hashbrown.workspace = true +async-trait.workspace = true smallstring = { path = "../smallstring" } commons = { path = "../commons" } diff --git a/src/rust/amalib/src/amadeus.rs b/src/rust/amalib/src/amadeus.rs new file mode 100644 index 0000000000000000000000000000000000000000..bbb2448e67586a49b4a975784695647ef6bc1030 --- /dev/null +++ b/src/rust/amalib/src/amadeus.rs @@ -0,0 +1,119 @@ +//! Contains the amadeus client, has the caching and the overrides. + +use tokio::net::TcpStream; + +use crate::*; +use std::{ + net::SocketAddr, + path::{Path, PathBuf}, +}; + +declare_err_type!(pub AmaClientError { + #[error("tcp client creation error")] + TCPClientCreateError, + + #[error("unix socket client creation error")] + UNIXClientCreateError, + + #[error("tcp client error on {0} -> {1}")] + TCPClientError(SocketAddr, SocketAddr), + + #[error("unix socket client error on {0} -> {1}")] + UNIXClientError(SocketAddr, SocketAddr), + + #[error("not implemented")] + NotImplemented, +}); + +/// The [AmaClient] is just a [LektorConnexion] with a cache behind it to make +/// queries all the time. +#[allow(dead_code)] +pub struct AmaClient { + connexion: Box<dyn LektorStream>, + cache: AmaDB, +} + +/// A builder for the [AmaClient] +pub struct AmaClientBuilder { + address: Option<SmallString>, + socket: Option<PathBuf>, + port: Option<i16>, + cache: Option<PathBuf>, + sockty: LektorSocketType, +} + +impl AmaClient { + pub fn builder() -> AmaClientBuilder { + AmaClientBuilder::default() + } +} + +impl Default for AmaClientBuilder { + fn default() -> Self { + Self { + port: Default::default(), + cache: Default::default(), + socket: Default::default(), + address: Default::default(), + sockty: LektorSocketType::TCP, + } + } +} + +impl AmaClientBuilder { + pub fn unix_socket(mut self, path: impl AsRef<Path>) -> Self { + self.socket = Some(path.as_ref().into()); + self.sockty = LektorSocketType::UNIX; + self + } + + pub fn tcp_socket(mut self, address: impl AsRef<str>, port: i16) -> Self { + self.address = Some(address.as_ref().into()); + self.port = Some(port); + self.sockty = LektorSocketType::TCP; + self + } + + pub fn load_cache(mut self, path: impl AsRef<Path>) -> Self { + self.cache = Some(path.as_ref().into()); + self + } + + pub async fn build(self) -> StackedResult<AmaClient, AmaClientError> { + if let Some(cache) = self.cache { + log::warn!(target: "INIT", "the path to the cache dump file `{}` is ignored for now...", cache.to_string_lossy()) + } + + let cache = AmaDB::new(); + + let connexion = Box::new(match self.sockty { + LektorSocketType::TCP => { + let (Some(hostname), Some(port)) = (self.address, self.port) else { + unreachable!("the state is invalid, with TCP socket we should have a hostname and a port") + }; + let address = format!("{hostname}:{port}"); + let connexion = LektorConnexion::<TcpStream>::new(address); + err_ctx!(connexion.await => AmaClientError::TCPClientCreateError) + } + + #[cfg(unix)] + LektorSocketType::UNIX => { + let Some(path) = self.socket else { + unreachable!("the state is invalid, with UNIX socket we should have a path") + }; + let address = path + .canonicalize() + .map_err(|e| { + err_report!(AmaClientError::UNIXClientCreateError => [ + format!("path has no canonical name: {e}") + ]) + })? + .to_string_lossy(); + let connexion = LektorConnexion::<UnixStream, String>::new(address); + err_ctx!(connexion.await => AmaClientError::UNIXClientCreateError) + } + }?); + + Ok(AmaClient { connexion, cache }) + } +} diff --git a/src/rust/amalib/src/connexion.rs b/src/rust/amalib/src/connexion.rs deleted file mode 100644 index e202f2856a5054a352fe028ab15c1720825139c1..0000000000000000000000000000000000000000 --- a/src/rust/amalib/src/connexion.rs +++ /dev/null @@ -1,272 +0,0 @@ -//! Connexion to the lektord server. - -use crate::*; -use tokio::net::TcpStream; - -pub struct LektorConnexion { - version: String, - stream: TcpStream, -} - -pub struct LektorIdleConnexion { - version: String, - stream: TcpStream, - idle_list: Vec<String>, -} - -/// Write a string into the socket. The buffer being send must be valid, -/// i.e. must not exed the [constants::LKT_MESSAGE_MAX] counting the -/// ending new line `\n`. -async fn write_string( - stream: &mut TcpStream, - buffer: impl AsRef<str>, -) -> StackedResult<(), LektorCommError> { - use LektorCommError::*; - let buffer = buffer.as_ref(); - let size = buffer.len(); - if size >= constants::LKT_MESSAGE_MAX { - err_report!(err BufferTooBig(size)) - } else if !buffer.ends_with('\n') { - err_report!(err BufferDontEndWithLF) - } else { - loop { - trace!("try to write {} bytes...", buffer.len()); - stream.writable().await.map_err(Io)?; - match stream.try_write(buffer.as_bytes()) { - Ok(n) if n != size => break err_report!(err IncorrectWriteSize(size, n)), - Ok(_) => break Ok(()), - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, - Err(e) => break err_report!(err Io(e)), - } - } - } -} - -/// Read replies from the lektord server. If the `exit_on_would_block` is set, -/// and if nothing is available returns nothing! -async fn read_replies( - stream: &mut TcpStream, - exit_on_would_block: bool, -) -> StackedResult<(Vec<String>, Option<usize>), LektorCommError> { - let (mut ret, mut continuation) = (Vec::new(), None); - let peer_addr = stream.peer_addr().expect("failed to get peer address"); - loop { - trace!("try to read from remote..."); - stream.readable().await.map_err(|e| { - err_report!(LektorCommError::Io(e) => [ - format!("failed to read from {peer_addr}") ]) - })?; - let mut line = [0; constants::LKT_MESSAGE_MAX]; - match stream.try_read(&mut line) { - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - either!(exit_on_would_block => return Ok((vec![], None)); continue) - } - Err(e) => return err_report!(err LektorCommError::Io(e)), - Ok(0) => return Ok((ret, continuation)), - Ok(size) => { - let lines = std::str::from_utf8(&line[..size]).map_err(|e| - err_report!(LektorCommError::Utf8(e) => [ - format!("the lektord server at {peer_addr} returned an invalid utf8 string") - ]) - )?.trim().lines().map(|line| line.trim()); - for msg in lines { - trace!("got line: {msg}"); - match LektorQueryLineType::from_str(msg) { - Ok(LektorQueryLineType::Ok) => return Ok((ret, continuation)), - Ok(LektorQueryLineType::Ack(msg)) => { - return err_report!(err LektorCommError::MpdAck => [ msg ]) - } - Ok(LektorQueryLineType::Data) => ret.push(msg.to_string()), - Ok(LektorQueryLineType::ListOk) => continue, - Ok(LektorQueryLineType::Continuation(cont)) => continuation = Some(cont), - Err(()) => { - return err_report!(err LektorCommError::QueryError => [ "unknown query line type" ]) - } - } - } - } - } - } -} - -declare_err_type!(pub LektorCommError { - #[error("lektord server didn't return OK code")] - MpdAck, - - #[error("got an io error: {0}")] - Io(std::io::Error), - - #[error("invalid utf8 string found: {0}")] - Utf8(std::str::Utf8Error), - - #[error("invalid buffer of length {0}: too big")] - BufferTooBig(usize), - - #[error("invalid buffer: don't end with LF `\\n`")] - BufferDontEndWithLF, - - #[error("failed to write all the buffer of size {0}, only wrote {1} bytes")] - IncorrectWriteSize(usize, usize), - - #[error("query failed validation test")] - InvalidQuery, - - #[error("failed to convert the unformated response into a formated one")] - ToFormatedResponse, - - #[error("the lektord server returned an error with the sent query(ies)")] - QueryError, -}); - -impl LektorConnexion { - /// Create a new connexion to a lektord server. If the versions mismatch we - /// log an error but continue... - pub async fn new(hostname: impl AsRef<str>, port: i16) -> StackedResult<Self, LektorCommError> { - let addr = format!("{}:{port}", hostname.as_ref()); - let stream: TcpStream = TcpStream::connect(&addr).await.map_err( - |e| err_report!(LektorCommError::Io(e) => [ format!("faild to connect to {addr}") ]), - )?; - info!( - "connected to {addr} from local address {:?}", - stream.local_addr().map_err( - |e| err_report!(LektorCommError::Io(e) => [ format!("faild to connect to {addr}") ]) - )? - ); - - let version: String = loop { - stream.readable().await.map_err(|e| err_report!(LektorCommError::Io(e) => [ format!("failed to get MPD version from {addr}") ]))?; - let mut version = [0; constants::LKT_MESSAGE_MAX]; - match stream.try_read(&mut version) { - Ok(n) => { - break std::str::from_utf8(&version[..n]) - .map_err(|e| err_report!(LektorCommError::Utf8(e) => [ format!("failed to get MPD version from {addr}") ]))? - .trim() - .to_string() - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, - Err(e) => return err_report!(err LektorCommError::Io(e)), - } - }; - - either!(version == format!("OK MPD {}", constants::MPD_VERSION) - => info!("connecting to lektord with MPD version {version}") - ; error!("got MPD version {version} from {addr}, but amalib is compatible with {}", constants::MPD_VERSION) - ); - - Ok(Self { version, stream }) - } - - pub fn version(&self) -> &str { - &self.version - } - - /// Send a query to the lektord server. - pub async fn send( - &mut self, - query: LektorQuery, - ) -> StackedResult<LektorResponse, LektorCommError> { - debug!( - "send query to server {}: {query:?}", - self.stream - .peer_addr() - .expect("failed to get remote address") - ); - let mut res: Vec<String> = Vec::new(); - query - .verify() - .map_err(|e| err_report!(LektorCommError::InvalidQuery => [ e ]))?; - let builder = query.get_response_type(); - - self.send_query_inner(query, &mut res).await?; - let formated = LektorFormatedResponse::try_from(res) - .map_err(|e| err_report!(LektorCommError::QueryError => [ e ]))?; - builder(formated).map_err(|e| err_report!(LektorCommError::ToFormatedResponse => [ e ])) - } - - /// The inner helper function to send queries to the lektord server. - async fn send_query_inner( - &mut self, - query: LektorQuery, - previous_ret: &mut Vec<String>, - ) -> StackedResult<(), LektorCommError> { - write_string(&mut self.stream, query.format_query()).await?; - loop { - match read_replies(&mut self.stream, false).await { - Err(e) => return Err(e), - Ok((res, _)) if res.is_empty() => return Ok(()), - Ok((res, None)) => { - trace!("got no continuation and return: {res:?}"); - previous_ret.extend(res); - return Ok(()); - } - Ok((res, Some(cont))) => { - trace!("got continuation {cont} and return: {res:?}"); - previous_ret.extend(res); - let query = LektorQuery::create_continuation(query.clone(), cont); - write_string(&mut self.stream, query.format_query()).await?; - } - } - } - } -} - -impl LektorIdleConnexion { - pub async fn idle( - connexion: LektorConnexion, - idle_list: impl IntoIterator<Item = LektorIdleNotification>, - ) -> StackedResult<Self, LektorCommError> { - let idle_list: Vec<_> = idle_list - .into_iter() - .map(|notification| format!("{notification}")) - .collect(); - let idle_list_buffer = format!("idle {}\n", idle_list.join(" ")); - let LektorConnexion { - version, - mut stream, - } = connexion; - - write_string(&mut stream, idle_list_buffer).await?; - - let (mut reply, _) = read_replies(&mut stream, false).await?; - either!(reply.is_empty() - => Ok(Self { version, stream, idle_list }) - ; { - reply.iter_mut().for_each(|msg| msg.insert_str(0, " - ")); - reply.insert(0, "failed to idle the connexion, got the following messages from the server:".to_string()); - err_report!(err LektorCommError::MpdAck => reply) - } - ) - } - - pub async fn get_notifications(&mut self) -> Vec<LektorIdleNotification> { - match read_replies(&mut self.stream, true).await { - Ok((notifications, _)) => notifications - .iter() - .filter_map(|msg| msg.strip_prefix("changed: ")) - .flat_map(|notifications| notifications.split(&[',', ' '][..])) - .filter_map( - |notification| match LektorIdleNotification::try_from(notification) { - Ok(notification) => Some(notification), - Err(()) => { - error!("the notification `{notification}` is invalid..."); - None - } - }, - ) - .collect(), - Err(e) => { - let e = err_attach!(err_ctx!(e => LektorCommError::MpdAck) => [ "failed to read idle replies" ]); - error!("{e}"); - vec![] - } - } - } - - pub fn version(&self) -> &str { - &self.version - } - - pub fn idle_list(&self) -> &[String] { - &self.idle_list - } -} diff --git a/src/rust/amalib/src/connexion/helpers.rs b/src/rust/amalib/src/connexion/helpers.rs new file mode 100644 index 0000000000000000000000000000000000000000..eea9ec5f9d838bcb45decb29d590a51e307810a8 --- /dev/null +++ b/src/rust/amalib/src/connexion/helpers.rs @@ -0,0 +1,125 @@ +use super::*; + +pub(super) async fn send<Stream>( + stream: &mut Stream, + query: LektorQuery, +) -> StackedResult<LektorResponse, LektorCommError> +where + Stream: AsyncRead + AsyncWrite + StreamReadWrite, +{ + debug!( + "send query to server {}: {query:?}", + stream.peer_addr().expect("failed to get remote address") + ); + let mut ret: Vec<String> = Vec::new(); + query + .verify() + .map_err(|e| err_report!(LektorCommError::InvalidQuery => [ e ]))?; + let builder = query.get_response_type(); + + // self.send_query_inner(query, &mut res).await?; + + write_string(stream, query.format_query()).await?; + loop { + match read_replies(stream, false).await { + Err(e) => return Err(e), + Ok((res, _)) if res.is_empty() => break, + Ok((res, None)) => { + trace!("got no continuation and return: {res:?}"); + ret.extend(res); + break; + } + Ok((res, Some(cont))) => { + trace!("got continuation {cont} and return: {res:?}"); + ret.extend(res); + let query = LektorQuery::create_continuation(query.clone(), cont); + write_string(stream, query.format_query()).await?; + } + } + } + + let formated = LektorFormatedResponse::try_from(ret) + .map_err(|e| err_report!(LektorCommError::QueryError => [ e ]))?; + builder(formated).map_err(|e| err_report!(LektorCommError::ToFormatedResponse => [ e ])) +} + +/// Write a string into the socket. The buffer being send must be valid, +/// i.e. must not exed the [constants::LKT_MESSAGE_MAX] counting the +/// ending new line `\n`. +pub(super) async fn write_string<Stream>( + stream: &mut Stream, + buffer: impl AsRef<str>, +) -> StackedResult<(), LektorCommError> +where + Stream: AsyncRead + AsyncWrite + StreamReadWrite, +{ + use LektorCommError::*; + let buffer = buffer.as_ref(); + let size = buffer.len(); + if size >= constants::LKT_MESSAGE_MAX { + err_report!(err BufferTooBig(size)) + } else if !buffer.ends_with('\n') { + err_report!(err BufferDontEndWithLF) + } else { + loop { + trace!("try to write {} bytes...", buffer.len()); + stream.writable().await.map_err(Io)?; + match stream.try_write(buffer.as_bytes()) { + Ok(n) if n != size => break err_report!(err IncorrectWriteSize(size, n)), + Ok(_) => break Ok(()), + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => break err_report!(err Io(e)), + } + } + } +} + +/// Read replies from the lektord server. If the `exit_on_would_block` is set, +/// and if nothing is available returns nothing! +pub(super) async fn read_replies<Stream>( + stream: &mut Stream, + exit_on_would_block: bool, +) -> StackedResult<(Vec<String>, Option<usize>), LektorCommError> +where + Stream: AsyncRead + AsyncWrite + StreamReadWrite, +{ + let (mut ret, mut continuation) = (Vec::new(), None); + let peer_addr = stream.peer_addr().expect("failed to get peer address"); + loop { + trace!("try to read from remote..."); + stream.readable().await.map_err(|e| { + err_report!(LektorCommError::Io(e) => [ + format!("failed to read from {peer_addr}") ]) + })?; + let mut line = [0; constants::LKT_MESSAGE_MAX]; + match stream.try_read(&mut line) { + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + either!(exit_on_would_block => return Ok((vec![], None)); continue) + } + Err(e) => return err_report!(err LektorCommError::Io(e)), + Ok(0) => return Ok((ret, continuation)), + Ok(size) => { + let lines = std::str::from_utf8(&line[..size]).map_err(|e| + err_report!(LektorCommError::Utf8(e) => [ + format!("the lektord server at {peer_addr} returned an invalid utf8 string") + ]) + )?.trim().lines().map(|line| line.trim()); + for msg in lines { + trace!("got line: {msg}"); + match LektorQueryLineType::from_str(msg) { + Ok(LektorQueryLineType::Ok) => return Ok((ret, continuation)), + Ok(LektorQueryLineType::Ack(msg)) => { + return err_report!(err LektorCommError::MpdAck => [ msg ]) + } + Ok(LektorQueryLineType::Data) => ret.push(msg.to_string()), + Ok(LektorQueryLineType::ListOk) => continue, + Ok(LektorQueryLineType::Continuation(cont)) => continuation = Some(cont), + Err(()) => { + return err_report!(err LektorCommError::QueryError => [ "unknown query line type" ]) + } + } + } + } + } + } +} diff --git a/src/rust/amalib/src/connexion/mod.rs b/src/rust/amalib/src/connexion/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..a7c2ec634b885260a680eb83c06605e01250faa5 --- /dev/null +++ b/src/rust/amalib/src/connexion/mod.rs @@ -0,0 +1,175 @@ +//! Connexion to the lektord server. + +mod helpers; +mod traits; + +use self::{helpers::*, traits::*}; +use crate::*; +use tokio::io::{AsyncRead, AsyncWrite}; + +pub use traits::LektorStream; + +declare_err_type!(pub LektorCommError { + #[error("lektord server didn't return OK code")] + MpdAck, + + #[error("got an io error: {0}")] + Io(std::io::Error), + + #[error("invalid utf8 string found: {0}")] + Utf8(std::str::Utf8Error), + + #[error("invalid buffer of length {0}: too big")] + BufferTooBig(usize), + + #[error("invalid buffer: don't end with LF `\\n`")] + BufferDontEndWithLF, + + #[error("failed to write all the buffer of size {0}, only wrote {1} bytes")] + IncorrectWriteSize(usize, usize), + + #[error("query failed validation test")] + InvalidQuery, + + #[error("failed to convert the unformated response into a formated one")] + ToFormatedResponse, + + #[error("the lektord server returned an error with the sent query(ies)")] + QueryError, +}); + +pub enum LektorSocketType { + TCP, + + #[cfg(unix)] + UNIX, +} + +pub struct LektorConnexion<Stream> +where + Stream: StreamConnect<Stream> + StreamReadWrite, +{ + version: String, + pub(super) stream: Stream, +} + +/// The idle TCP client. +pub struct LektorIdleConnexion<Stream> +where + Stream: StreamConnect<Stream> + StreamReadWrite, +{ + version: String, + stream: Stream, + idle_list: Vec<String>, +} + +impl<Stream> LektorConnexion<Stream> +where + Stream: StreamConnect<Stream> + StreamReadWrite, +{ + /// Create a new connexion to a lektord server. If the versions mismatch we + /// log an error but continue... + pub async fn new(addr: impl ToString) -> StackedResult<Self, LektorCommError> { + let addr = addr.to_string(); + let stream = Stream::connect(addr.clone()).await.map_err( + |e| err_report!(LektorCommError::Io(e) => [ format!("faild to connect to {addr}") ]), + )?; + info!( + "connected to {addr} from local address {:?}", + stream.local_addr().map_err( + |e| err_report!(LektorCommError::Io(e) => [ format!("faild to connect to {addr}") ]) + )? + ); + + let version: String = loop { + stream.readable().await.map_err(|e| err_report!(LektorCommError::Io(e) => [ format!("failed to get MPD version from {addr}") ]))?; + let mut version = [0; constants::LKT_MESSAGE_MAX]; + match stream.try_read(&mut version) { + Ok(n) => { + break std::str::from_utf8(&version[..n]) + .map_err(|e| err_report!(LektorCommError::Utf8(e) => [ format!("failed to get MPD version from {addr}") ]))? + .trim() + .to_string() + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => return err_report!(err LektorCommError::Io(e)), + } + }; + + either!(version == format!("OK MPD {}", constants::MPD_VERSION) + => info!("connecting to lektord with MPD version {version}") + ; error!("got MPD version {version} from {addr}, but amalib is compatible with {}", constants::MPD_VERSION) + ); + + Ok(Self { version, stream }) + } + + pub fn version(&self) -> &str { + &self.version + } +} + +impl<Stream> LektorIdleConnexion<Stream> +where + Stream: StreamConnect<Stream> + StreamReadWrite, +{ + pub async fn idle( + connexion: LektorConnexion<Stream>, + idle_list: impl IntoIterator<Item = LektorIdleNotification>, + ) -> StackedResult<Self, LektorCommError> { + let idle_list: Vec<_> = idle_list + .into_iter() + .map(|notification| format!("{notification}")) + .collect(); + let idle_list_buffer = format!("idle {}\n", idle_list.join(" ")); + let LektorConnexion { + version, + mut stream, + .. + } = connexion; + + write_string(&mut stream, idle_list_buffer).await?; + + let (mut reply, _) = read_replies(&mut stream, false).await?; + either!(reply.is_empty() + => Ok(Self { version, stream, idle_list }) + ; { + reply.iter_mut().for_each(|msg| msg.insert_str(0, " - ")); + reply.insert(0, "failed to idle the connexion, got the following messages from the server:".to_string()); + err_report!(err LektorCommError::MpdAck => reply) + } + ) + } + + pub async fn get_notifications(&mut self) -> Vec<LektorIdleNotification> { + match read_replies(&mut self.stream, true).await { + Ok((notifications, _)) => notifications + .iter() + .filter_map(|msg| msg.strip_prefix("changed: ")) + .flat_map(|notifications| notifications.split(&[',', ' '][..])) + .filter_map( + |notification| match LektorIdleNotification::try_from(notification) { + Ok(notification) => Some(notification), + Err(()) => { + error!("the notification `{notification}` is invalid..."); + None + } + }, + ) + .collect(), + Err(e) => { + let e = err_attach!(err_ctx!(e => LektorCommError::MpdAck) => [ "failed to read idle replies" ]); + error!("{e}"); + vec![] + } + } + } + + pub fn version(&self) -> &str { + &self.version + } + + pub fn idle_list(&self) -> &[String] { + &self.idle_list + } +} diff --git a/src/rust/amalib/src/connexion/traits.rs b/src/rust/amalib/src/connexion/traits.rs new file mode 100644 index 0000000000000000000000000000000000000000..324981f3518db8ca9a0252c7bcb422bccab02e52 --- /dev/null +++ b/src/rust/amalib/src/connexion/traits.rs @@ -0,0 +1,178 @@ +use crate::*; +use async_trait::async_trait; +use core::{future::Future, pin::Pin}; +use tokio::io::{AsyncRead, AsyncWrite}; + +#[async_trait] +pub trait LektorStream { + /// Send a query to the lektord server. + async fn send(&mut self, query: LektorQuery) -> StackedResult<LektorResponse, LektorCommError>; +} + +#[async_trait] +pub trait StreamConnect<Stream: StreamReadWrite> { + /// Connect to a lektord server. + async fn connect(addr: impl ToString) -> std::io::Result<Stream>; +} + +#[async_trait] +pub trait StreamReadWrite: AsyncRead + AsyncWrite { + async fn writable(&self) -> std::io::Result<()>; + async fn readable(&self) -> std::io::Result<()>; + + fn try_read(&self, buf: &mut [u8]) -> std::io::Result<usize>; + fn try_write(&self, buf: &[u8]) -> std::io::Result<usize>; + + fn peer_addr(&self) -> std::io::Result<String>; + fn local_addr(&self) -> std::io::Result<String>; +} + +#[cfg(unix)] +mod unix { + use super::*; + use tokio::net::UnixStream; + + impl StreamConnect<UnixStream> for UnixStream { + fn connect<'async_trait>( + addr: impl ToString, + ) -> Pin<Box<dyn Future<Output = std::io::Result<Self>> + Send + 'async_trait>> { + Box::pin(UnixStream::connect(addr.to_string())) + } + } + + impl StreamReadWrite for UnixStream { + fn writable<'life0, 'async_trait>( + &'life0 self, + ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(UnixStream::writable(&self)) + } + + fn readable<'life0, 'async_trait>( + &'life0 self, + ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(UnixStream::readable(&self)) + } + + fn try_read(&self, buf: &mut [u8]) -> std::io::Result<usize> { + UnixStream::try_read(&self, buf) + } + + fn try_write(&self, buf: &[u8]) -> std::io::Result<usize> { + UnixStream::try_write(&self, buf) + } + + fn peer_addr(&self) -> std::io::Result<String> { + use std::io::{Error, ErrorKind}; + let addr = UnixStream::peer_addr(&self)?; + addr.as_pathname() + .ok_or_else(|| Error::new(ErrorKind::Other, "the socket is not path based")) + .map(|path| path.to_string_lossy().to_string()) + } + + fn local_addr(&self) -> std::io::Result<String> { + use std::io::{Error, ErrorKind}; + let addr = UnixStream::local_addr(&self)?; + addr.as_pathname() + .ok_or_else(|| Error::new(ErrorKind::Other, "the socket is not path based")) + .map(|path| path.to_string_lossy().to_string()) + } + } + + impl LektorStream for LektorConnexion<UnixStream> { + fn send<'life0, 'async_trait>( + &'life0 mut self, + query: LektorQuery, + ) -> Pin< + Box< + dyn Future<Output = StackedResult<LektorResponse, LektorCommError>> + + Send + + 'async_trait, + >, + > + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(super::super::send(&mut self.stream, query)) + } + } +} + +mod tcp { + use super::*; + use tokio::net::TcpStream; + + impl StreamReadWrite for TcpStream { + fn writable<'life0, 'async_trait>( + &'life0 self, + ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(TcpStream::writable(&self)) + } + + fn readable<'life0, 'async_trait>( + &'life0 self, + ) -> Pin<Box<dyn Future<Output = std::io::Result<()>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(TcpStream::readable(&self)) + } + + fn try_read(&self, buf: &mut [u8]) -> std::io::Result<usize> { + TcpStream::try_read(&self, buf) + } + + fn try_write(&self, buf: &[u8]) -> std::io::Result<usize> { + TcpStream::try_write(&self, buf) + } + + fn peer_addr(&self) -> std::io::Result<String> { + TcpStream::peer_addr(&self).map(|addr| format!("{addr}")) + } + + fn local_addr(&self) -> std::io::Result<String> { + TcpStream::local_addr(&self).map(|addr| format!("{addr}")) + } + } + + impl LektorStream for LektorConnexion<TcpStream> { + fn send<'life0, 'async_trait>( + &'life0 mut self, + query: LektorQuery, + ) -> Pin< + Box< + dyn Future<Output = StackedResult<LektorResponse, LektorCommError>> + + Send + + 'async_trait, + >, + > + where + 'life0: 'async_trait, + Self: 'async_trait, + { + Box::pin(super::super::send(&mut self.stream, query)) + } + } + + impl StreamConnect<TcpStream> for TcpStream { + fn connect<'async_trait>( + addr: impl ToString, + ) -> Pin<Box<dyn Future<Output = std::io::Result<TcpStream>> + Send + 'async_trait>> + { + Box::pin(TcpStream::connect(addr.to_string())) + } + } +} diff --git a/src/rust/amalib/src/lib.rs b/src/rust/amalib/src/lib.rs index 22338b5cdd0cf5e5a363ab293e58372388c1b042..9561264942992dbade821216d1a5169707ccd797 100644 --- a/src/rust/amalib/src/lib.rs +++ b/src/rust/amalib/src/lib.rs @@ -4,6 +4,7 @@ //! communicate with the lektord server and elements to store and organise the //! queried informations. +mod amadeus; mod connexion; mod constants; mod db_cache; @@ -14,6 +15,7 @@ mod query; mod response; mod uri; +pub use amadeus::*; pub use connexion::*; pub use db_cache::*; pub use db_kara::*; diff --git a/src/rust/lkt/src/main.rs b/src/rust/lkt/src/main.rs index 0644a1ef2b29cb43102694b025d69633aee2d330..1447ecec6c3db5ddd710288db749e30ddcccb554 100644 --- a/src/rust/lkt/src/main.rs +++ b/src/rust/lkt/src/main.rs @@ -8,8 +8,9 @@ use crate::{ args::*, config::{LktConfig, LktHostPort}, }; -use amalib::{LektorConnexion, LektorQuery, LektorResponse, LektorState}; +use amalib::{LektorConnexion, LektorQuery, LektorResponse, LektorState, LektorStream}; use commons::log; +use tokio::net::TcpStream; #[tokio::main(worker_threads = 2)] async fn main() { @@ -25,9 +26,11 @@ async fn handle_cmd(config: LktConfig, cmd: LktCommand) { log::debug!("{config:#?}\ncmd = {cmd:#?}"); let conn = match config.host.socket { LktHostPort::UNIX => unimplemented!("connexion to unix socket is not implemented"), - LktHostPort::TCP(port) => LektorConnexion::new(&config.host.address, port) - .await - .expect("failed to connect to the lektord server"), + LktHostPort::TCP(port) => { + LektorConnexion::<TcpStream>::new(format!("{}:{}", config.host.address, port)) + .await + .expect("failed to connect to the lektord server") + } }; match cmd { LktCommand::Queue(cmd) => handle_cmd_queue(config, conn, cmd).await, @@ -50,7 +53,7 @@ macro_rules! send { }}; } -async fn handle_cmd_queue(config: LktConfig, mut conn: LektorConnexion, cmd: LktQueueCommand) { +async fn handle_cmd_queue(config: LktConfig, mut conn: impl LektorStream, cmd: LktQueueCommand) { match cmd { LktQueueCommand::ShowCurrent => { send!(conn => LektorQuery::CurrentKara; LektorResponse::CurrentKara(kara) => { @@ -149,7 +152,7 @@ async fn handle_cmd_queue(config: LktConfig, mut conn: LektorConnexion, cmd: Lkt } } -async fn handle_cmd_search(_: LktConfig, mut conn: LektorConnexion, cmd: LktSearchCommand) { +async fn handle_cmd_search(_: LktConfig, mut conn: impl LektorStream, cmd: LktSearchCommand) { match cmd { LktSearchCommand::Database { query } => { send!(conn => LektorQuery::SearchKaraDatabase(query); LektorResponse::KaraSet(karas) => { @@ -181,7 +184,7 @@ async fn handle_cmd_search(_: LktConfig, mut conn: LektorConnexion, cmd: LktSear async fn handle_cmd_playlist( config: LktConfig, - mut conn: LektorConnexion, + mut conn: impl LektorStream, cmd: LktPlaylistCommand, ) { let user = config.user.user; @@ -214,7 +217,7 @@ async fn handle_cmd_playlist( } } -async fn handle_cmd_admin(config: LktConfig, mut conn: LektorConnexion, cmd: LktAdminCommand) { +async fn handle_cmd_admin(config: LktConfig, mut conn: impl LektorStream, cmd: LktAdminCommand) { match config.user.admin { Some(password) => { let cmd = Box::new(match cmd {