diff --git a/src/rust/amadeus-rs/amadeus-lib/src/deamon.rs b/src/rust/amadeus-rs/amadeus-lib/src/deamon.rs deleted file mode 100644 index a2f321ee858b6b3aaf6ed6b5cf0820b71cee00ea..0000000000000000000000000000000000000000 --- a/src/rust/amadeus-rs/amadeus-lib/src/deamon.rs +++ /dev/null @@ -1,195 +0,0 @@ -use lkt_lib::*; -use log::*; -use std::{ - cell::Cell, - io, - sync::{ - atomic::{self, AtomicBool}, - mpsc::{channel, Receiver, Sender}, - Arc, Mutex, - }, - thread, time, -}; - -pub trait Deamon: Sized { - type Channels; - - /// Quit the deamon - fn quit(&self); - - /// Spawn a deamon - fn spawn(hostname: String, port: i16) -> io::Result<(Self::Channels, Self)>; - - /// Returns true when the thread has terminated. - fn should_joined(&self) -> bool; - - /// Join the deamon. - fn join(&self); -} - -pub struct CommandDeamon { - thread: Arc<Mutex<Cell<Option<thread::JoinHandle<()>>>>>, - quit: Arc<AtomicBool>, - joined: Arc<AtomicBool>, -} - -pub type StatusDeamonMessageType = ( - LektorPlaybackStatusResponse, - Option<LektorCurrentKaraResponse>, -); - -pub struct StatusDeamon { - thread: Arc<Mutex<Cell<Option<thread::JoinHandle<()>>>>>, - quit: Arc<AtomicBool>, - joined: Arc<AtomicBool>, -} - -macro_rules! return_when_flagged { - ($arc_atomic: expr, $joined_deamon: expr) => { - if $arc_atomic.load(atomic::Ordering::SeqCst) { - $joined_deamon.store(true, atomic::Ordering::SeqCst); - return; - } - }; -} - -macro_rules! implement_deamon_quit { - () => { - fn quit(&self) { - self.quit.store(true, atomic::Ordering::SeqCst); - } - }; -} - -macro_rules! implement_deamon_joined { - () => { - fn should_joined(&self) -> bool { - return self.joined.load(atomic::Ordering::SeqCst); - } - - fn join(&self) { - let locked = self.thread.lock(); - if locked.is_err() { - error!("Failed to lock the mutex that has the join handler",) - } - let locked = locked.unwrap(); - let thread = locked.replace(None); - match thread { - Some(thread) => { - let _ = thread.join(); - } - None => error!("Nothing to join!"), - } - } - }; -} - -impl Deamon for CommandDeamon { - type Channels = (Sender<LektorQuery>, Receiver<LektorResponse>); - implement_deamon_quit!(); - implement_deamon_joined!(); - - fn spawn(hostname: String, port: i16) -> io::Result<(Self::Channels, Self)> { - let mut connexion = LektorConnexion::new(hostname, port)?; - - let (responses_send, responses_recv) = channel::<LektorResponse>(); - let (commands_send, commands_recv) = channel::<LektorQuery>(); - let quit = Arc::<AtomicBool>::new(AtomicBool::default()); - let joined = Arc::<AtomicBool>::new(AtomicBool::default()); - quit.store(false, atomic::Ordering::SeqCst); - joined.store(false, atomic::Ordering::SeqCst); - let quit_deamon = quit.clone(); - let joined_deamon = joined.clone(); - - let thread = thread::spawn(move || loop { - return_when_flagged!(quit_deamon, joined_deamon); - match commands_recv.recv() { - Ok(command) => { - let res = match connexion.send_query(command) { - Ok(ok) => ok, - Err(e) => { - error!("failed to send query to lektor: {e}"); - continue; - } - }; - if let Err(e) = responses_send.send(res) { - error!("failed to send response to amadeus: {e}") - } - } - Err(e) => error!("failed to get command from amadeus: {e}"), - }; - }); - - let ret = Self { - thread: Arc::new(Mutex::new(Cell::new(Some(thread)))), - quit, - joined, - }; - Ok(((commands_send, responses_recv), ret)) - } -} - -impl Deamon for StatusDeamon { - type Channels = (Receiver<StatusDeamonMessageType>,); - implement_deamon_quit!(); - implement_deamon_joined!(); - - fn spawn(hostname: String, port: i16) -> io::Result<(Self::Channels, StatusDeamon)> { - let mut connexion = LektorConnexion::new(hostname, port)?; - - let (responses_send, responses_recv) = channel(); - let quit = Arc::<AtomicBool>::new(AtomicBool::default()); - let joined = Arc::<AtomicBool>::new(AtomicBool::default()); - quit.store(false, atomic::Ordering::SeqCst); - joined.store(false, atomic::Ordering::SeqCst); - let quit_deamon = quit.clone(); - let joined_deamon = joined.clone(); - - let thread = thread::spawn(move || loop { - return_when_flagged!(quit_deamon, joined_deamon); - thread::sleep(time::Duration::from_secs(1)); - return_when_flagged!(quit_deamon, joined_deamon); - - let status = match connexion.send_query(LektorQuery::PlaybackStatus) { - Ok(LektorResponse::PlaybackStatus(res)) => res, - Ok(_) => { - error!("got invalid response from lektor, not a status..."); - continue; - } - Err(e) => { - error!("failed to send the playback status command to lektor: {e}"); - continue; - } - }; - - let current = match status.state() { - LektorState::Stopped => None, - LektorState::Play(_) | LektorState::Pause(_) => { - match connexion.send_query(LektorQuery::CurrentKara) { - Ok(LektorResponse::CurrentKara(res)) => Some(res), - Ok(_) => { - error!("got invalid response from lektor, not a current kara..."); - None - } - Err(e) => { - error!("failed to send the current kara command to lektor: {e}",); - None - } - } - } - }; - - info!("send {status:?} and {current:?}"); - if let Err(e) = responses_send.send((status, current)) { - error!("Failed to send a status response to amadeus: {e}"); - } - }); - - let ret = Self { - thread: Arc::new(Mutex::new(Cell::new(Some(thread)))), - quit, - joined, - }; - Ok(((responses_recv,), ret)) - } -} diff --git a/src/rust/amadeus-rs/amadeus-lib/src/deamon/command_deamon.rs b/src/rust/amadeus-rs/amadeus-lib/src/deamon/command_deamon.rs new file mode 100644 index 0000000000000000000000000000000000000000..270a9581fd3bba406941b240d988869212543637 --- /dev/null +++ b/src/rust/amadeus-rs/amadeus-lib/src/deamon/command_deamon.rs @@ -0,0 +1,57 @@ +use super::*; + +pub struct CommandDeamon { + thread: Arc<Mutex<Cell<Option<thread::JoinHandle<()>>>>>, + quit: Arc<AtomicBool>, + joined: Arc<AtomicBool>, +} + +impl Deamon for CommandDeamon { + type Channels = (Sender<LektorQuery>, Receiver<LektorResponse>); + implement_deamon_quit!(); + implement_deamon_joined!(); + + fn spawn(hostname: String, port: i16) -> io::Result<(Self::Channels, Self)> { + let mut connexion = LektorConnexion::new(hostname, port)?; + + let (responses_send, responses_recv) = channel::<LektorResponse>(); + let (commands_send, commands_recv) = channel::<LektorQuery>(); + let quit = Arc::<AtomicBool>::new(AtomicBool::default()); + let joined = Arc::<AtomicBool>::new(AtomicBool::default()); + quit.store(false, atomic::Ordering::SeqCst); + joined.store(false, atomic::Ordering::SeqCst); + let quit_deamon = quit.clone(); + let joined_deamon = joined.clone(); + + let thread = thread::spawn(move || loop { + return_when_flagged!(quit_deamon, joined_deamon); + match commands_recv.recv() { + Ok(command) => { + let res = match connexion.send_query(command) { + Ok(ok) => ok, + Err(e) => { + error!("failed to send query to lektor: {e}"); + continue; + } + }; + if let Err(e) = responses_send.send(res) { + error!("failed to send response to amadeus: {e}") + } + } + Err(e) => { + error!("failed to get command from amadeus: {e}"); + info!("quiting the command deamon"); + quit_deamon.store(false, atomic::Ordering::SeqCst); + break; + } + }; + }); + + let ret = Self { + thread: Arc::new(Mutex::new(Cell::new(Some(thread)))), + quit, + joined, + }; + Ok(((commands_send, responses_recv), ret)) + } +} diff --git a/src/rust/amadeus-rs/amadeus-lib/src/deamon/mod.rs b/src/rust/amadeus-rs/amadeus-lib/src/deamon/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..77f590f0865185e1d82c658467a710bca1bfd697 --- /dev/null +++ b/src/rust/amadeus-rs/amadeus-lib/src/deamon/mod.rs @@ -0,0 +1,89 @@ +mod command_deamon; +mod status_deamon; +pub use command_deamon::*; +pub use status_deamon::*; + +use lkt_lib::*; +use log::*; +use std::{ + cell::Cell, + io, + sync::{ + atomic::{self, AtomicBool}, + mpsc::{channel, Receiver, Sender}, + Arc, Mutex, + }, + thread, time, +}; + +/// A common interface for all deamons. +pub trait Deamon: Sized { + type Channels; + + /// Quit the deamon + fn quit(&self); + + /// Tel whever the quit has been issued. + fn should_quit(&self) -> bool; + + /// Spawn a deamon + fn spawn(hostname: String, port: i16) -> io::Result<(Self::Channels, Self)>; + + /// Returns true when the thread has terminated. + fn joined(&self) -> bool; + + /// Join the deamon. + fn join(&self); +} + +#[macro_export] +macro_rules! return_when_flagged { + ($arc_atomic: expr, $joined_deamon: expr) => { + if $arc_atomic.load(atomic::Ordering::SeqCst) { + $joined_deamon.store(true, atomic::Ordering::SeqCst); + return; + } + }; +} + +#[macro_export] +macro_rules! implement_deamon_quit { + () => { + fn quit(&self) { + self.quit.store(true, atomic::Ordering::SeqCst); + } + + fn should_quit(&self) -> bool { + self.quit.load(atomic::Ordering::SeqCst) + } + }; +} + +#[macro_export] +macro_rules! implement_deamon_joined { + () => { + fn joined(&self) -> bool { + return self.joined.load(atomic::Ordering::SeqCst); + } + + fn join(&self) { + let locked = self.thread.lock(); + if locked.is_err() { + error!("Failed to lock the mutex that has the join handler",) + } + let locked = locked.unwrap(); + let thread = locked.replace(None); + match thread { + Some(thread) => { + let _ = thread.join(); + self.joined.store(true, atomic::Ordering::SeqCst); + } + None => error!("Nothing to join!"), + } + } + }; +} + +pub(self) use implement_deamon_joined; +pub(self) use implement_deamon_quit; +pub(self) use return_when_flagged; diff --git a/src/rust/amadeus-rs/amadeus-lib/src/deamon/status_deamon.rs b/src/rust/amadeus-rs/amadeus-lib/src/deamon/status_deamon.rs new file mode 100644 index 0000000000000000000000000000000000000000..42a684ee72bc88d8e6372f0edd75e77bde2a0270 --- /dev/null +++ b/src/rust/amadeus-rs/amadeus-lib/src/deamon/status_deamon.rs @@ -0,0 +1,80 @@ +use super::*; + +pub type StatusDeamonMessageType = ( + LektorPlaybackStatusResponse, + Option<LektorCurrentKaraResponse>, +); + +pub struct StatusDeamon { + thread: Arc<Mutex<Cell<Option<thread::JoinHandle<()>>>>>, + quit: Arc<AtomicBool>, + joined: Arc<AtomicBool>, +} + +impl Deamon for StatusDeamon { + type Channels = (Receiver<StatusDeamonMessageType>,); + implement_deamon_quit!(); + implement_deamon_joined!(); + + fn spawn(hostname: String, port: i16) -> io::Result<(Self::Channels, StatusDeamon)> { + let mut connexion = LektorConnexion::new(hostname, port)?; + + let (responses_send, responses_recv) = channel(); + let quit = Arc::<AtomicBool>::new(AtomicBool::default()); + let joined = Arc::<AtomicBool>::new(AtomicBool::default()); + quit.store(false, atomic::Ordering::SeqCst); + joined.store(false, atomic::Ordering::SeqCst); + let quit_deamon = quit.clone(); + let joined_deamon = joined.clone(); + + let thread = thread::spawn(move || loop { + return_when_flagged!(quit_deamon, joined_deamon); + thread::sleep(time::Duration::from_secs(1)); + return_when_flagged!(quit_deamon, joined_deamon); + + let status = match connexion.send_query(LektorQuery::PlaybackStatus) { + Ok(LektorResponse::PlaybackStatus(res)) => res, + Ok(_) => { + error!("got invalid response from lektor, not a status..."); + continue; + } + Err(e) => { + error!("failed to send the playback status command to lektor: {e}"); + continue; + } + }; + + let current = match status.state() { + LektorState::Stopped => None, + LektorState::Play(_) | LektorState::Pause(_) => { + match connexion.send_query(LektorQuery::CurrentKara) { + Ok(LektorResponse::CurrentKara(res)) => Some(res), + Ok(_) => { + error!("got invalid response from lektor, not a current kara..."); + None + } + Err(e) => { + error!("failed to send the current kara command to lektor: {e}"); + None + } + } + } + }; + + info!("send {status:?} and {current:?}"); + if let Err(e) = responses_send.send((status, current)) { + error!("Failed to send a status response to amadeus: {e}"); + info!("quiting the status deamon"); + quit_deamon.store(false, atomic::Ordering::SeqCst); + break; + } + }); + + let ret = Self { + thread: Arc::new(Mutex::new(Cell::new(Some(thread)))), + quit, + joined, + }; + Ok(((responses_recv,), ret)) + } +} diff --git a/src/rust/amadeus-rs/amadeus/src/amadeus.rs b/src/rust/amadeus-rs/amadeus/src/amadeus.rs index 279803f8c30c30e07431c9f771f4028233d0428b..dd3c9f4098210e4016778b37ce048529d3f57b48 100644 --- a/src/rust/amadeus-rs/amadeus/src/amadeus.rs +++ b/src/rust/amadeus-rs/amadeus/src/amadeus.rs @@ -396,7 +396,7 @@ impl Amadeus<'_> { | DeleteKaraFromPlaylist(_) | AddPlaylistToQueue | InsertPlaylistToQueue => { - debug!("Execute action {act:?} on lektor item with id {id}") + debug!("execute action {act:?} on lektor item with id {id}") } // This should not occure on items because they are global @@ -452,13 +452,13 @@ impl Amadeus<'_> { // Handle the deamon closing process. if let Some((_, status_deamon)) = &self.status_deamon { - if status_deamon.should_joined() { + if status_deamon.should_quit() { status_deamon.join(); self.status_deamon = None; } }; if let Some((_, deamon)) = &self.deamon { - if deamon.should_joined() { + if deamon.should_quit() { deamon.join(); self.deamon = None; } diff --git a/src/rust/amadeus-rs/amadeus/src/logger.rs b/src/rust/amadeus-rs/amadeus/src/logger.rs index e5bb72b86c8bc6da134f91ae6bbc926d41d6312f..dc9bc4e5eb6d075ea04fe23e6ac86b96e0001a66 100644 --- a/src/rust/amadeus-rs/amadeus/src/logger.rs +++ b/src/rust/amadeus-rs/amadeus/src/logger.rs @@ -29,7 +29,12 @@ impl log::Log for SimpleLogger { fn log(&self, record: &Record) { if self.enabled(record.metadata()) { - eprintln!("{} - {}", record.level(), record.args()); + eprintln!( + "{} - {} - {}", + record.target(), + record.level(), + record.args() + ); } }