Skip to content
Extraits de code Groupes Projets
Vérifiée Valider f928cca8 rédigé par Kubat's avatar Kubat
Parcourir les fichiers

AMADEUS: Handle deamon deconnexion by user request without having amadeus hangging

parent b7413672
Aucune branche associée trouvée
Aucune étiquette associée trouvée
1 requête de fusion!193AMADEUS: Implementation of lkt-lib
Pipeline #3238 en échec
......@@ -21,30 +21,38 @@ impl Deamon for CommandDeamon {
quit.store(false, atomic::Ordering::SeqCst);
joined.store(false, atomic::Ordering::SeqCst);
let quit_deamon = quit.clone();
let joined_deamon = joined.clone();
let thread = thread::spawn(move || loop {
return_when_flagged!(quit_deamon, joined_deamon);
match commands_recv.recv() {
Ok(command) => {
let res = match connexion.send_query(command) {
Ok(ok) => ok,
Err(e) => {
error!("failed to send query to lektor: {e}");
continue;
let thread = thread::spawn(move || {
loop {
break_when_flagged!(quit_deamon);
match commands_recv.recv_timeout(Duration::from_secs(1)) {
Ok(command) => {
let res = match connexion.send_query(command) {
Ok(ok) => ok,
Err(e) => {
error!("failed to send query to lektor: {e}");
info!("quiting the status deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
}
};
if let Err(e) = responses_send.send(res) {
error!("failed to send response to amadeus: {e}");
info!("quiting the status deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
}
};
if let Err(e) = responses_send.send(res) {
error!("failed to send response to amadeus: {e}")
}
}
Err(e) => {
error!("failed to get command from amadeus: {e}");
info!("quiting the command deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
}
};
Err(RecvTimeoutError::Timeout) => {}
Err(e) => {
error!("failed to get command from amadeus: {e}");
info!("quiting the command deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
}
};
}
info!("quiting the command deamon!");
});
let ret = Self {
......
......@@ -10,10 +10,11 @@ use std::{
io,
sync::{
atomic::{self, AtomicBool},
mpsc::{channel, Receiver, Sender},
mpsc::{channel, Receiver, RecvTimeoutError, Sender},
Arc, Mutex,
},
thread, time,
thread,
time::Duration,
};
/// A common interface for all deamons.
......@@ -37,11 +38,10 @@ pub trait Deamon: Sized {
}
#[macro_export]
macro_rules! return_when_flagged {
($arc_atomic: expr, $joined_deamon: expr) => {
macro_rules! break_when_flagged {
($arc_atomic: expr) => {
if $arc_atomic.load(atomic::Ordering::SeqCst) {
$joined_deamon.store(true, atomic::Ordering::SeqCst);
return;
break;
}
};
}
......@@ -50,6 +50,7 @@ macro_rules! return_when_flagged {
macro_rules! implement_deamon_quit {
() => {
fn quit(&self) {
debug!("asked to quit a deamon!");
self.quit.store(true, atomic::Ordering::SeqCst);
}
......@@ -84,6 +85,6 @@ macro_rules! implement_deamon_joined {
};
}
pub(self) use break_when_flagged;
pub(self) use implement_deamon_joined;
pub(self) use implement_deamon_quit;
pub(self) use return_when_flagged;
......@@ -25,49 +25,55 @@ impl Deamon for StatusDeamon {
quit.store(false, atomic::Ordering::SeqCst);
joined.store(false, atomic::Ordering::SeqCst);
let quit_deamon = quit.clone();
let joined_deamon = joined.clone();
let thread = thread::spawn(move || loop {
return_when_flagged!(quit_deamon, joined_deamon);
thread::sleep(time::Duration::from_secs(1));
return_when_flagged!(quit_deamon, joined_deamon);
let thread = thread::spawn(move || {
loop {
break_when_flagged!(quit_deamon);
thread::sleep(Duration::from_secs(1));
break_when_flagged!(quit_deamon);
let status = match connexion.send_query(LektorQuery::PlaybackStatus) {
Ok(LektorResponse::PlaybackStatus(res)) => res,
Ok(_) => {
error!("got invalid response from lektor, not a status...");
continue;
}
Err(e) => {
error!("failed to send the playback status command to lektor: {e}");
continue;
}
};
let status = match connexion.send_query(LektorQuery::PlaybackStatus) {
Ok(LektorResponse::PlaybackStatus(res)) => res,
Ok(_) => {
error!("got invalid response from lektor, not a status...");
info!("quiting the status deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
}
Err(e) => {
error!("failed to send the playback status command to lektor: {e}");
info!("quiting the status deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
}
};
let current = match status.state() {
LektorState::Stopped => None,
LektorState::Play(_) | LektorState::Pause(_) => {
match connexion.send_query(LektorQuery::CurrentKara) {
Ok(LektorResponse::CurrentKara(res)) => Some(res),
Ok(_) => {
error!("got invalid response from lektor, not a current kara...");
None
}
Err(e) => {
error!("failed to send the current kara command to lektor: {e}");
None
let current = match status.state() {
LektorState::Stopped => None,
LektorState::Play(_) | LektorState::Pause(_) => {
match connexion.send_query(LektorQuery::CurrentKara) {
Ok(LektorResponse::CurrentKara(res)) => Some(res),
Ok(_) => {
error!("got invalid response from lektor, not a current kara...");
None
}
Err(e) => {
error!("failed to send the current kara command to lektor: {e}");
None
}
}
}
}
};
};
info!("send {status:?} and {current:?}");
if let Err(e) = responses_send.send((status, current)) {
error!("Failed to send a status response to amadeus: {e}");
info!("quiting the status deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
info!("send {status:?} and {current:?}");
if let Err(e) = responses_send.send((status, current)) {
error!("Failed to send a status response to amadeus: {e}");
info!("quiting the status deamon");
quit_deamon.store(false, atomic::Ordering::SeqCst);
break;
}
}
info!("quiting the status deamon!");
});
let ret = Self {
......
......@@ -378,6 +378,28 @@ impl Amadeus<'_> {
}
}
/// Handle quits/exits of the deamons
fn handle_deamons_exit(&mut self) {
// Handle the deamon closing process.
if let Some(((socket,), status_deamon)) = &self.status_deamon {
if status_deamon.should_quit() {
drop(socket);
self.lektord_status = None;
self.lektord_current = None;
status_deamon.join();
self.status_deamon = None;
}
};
if let Some(((socket_out, socket_in), deamon)) = &self.deamon {
if deamon.should_quit() {
drop(socket_out);
drop(socket_in);
deamon.join();
self.deamon = None;
}
};
}
/// Handle actions recieved from the user. May communicate to the deamons
/// and send infos/commands to lektord if they are available.
fn handle_action(&mut self) {
......@@ -434,12 +456,14 @@ impl Amadeus<'_> {
}
}
DisconnectFromLektord => {
debug!("asked to close the deamons");
if let Some((_, status_deamon)) = &self.status_deamon {
status_deamon.quit();
}
if let Some((_, deamon)) = &self.deamon {
deamon.quit();
}
break;
}
PlaybackPrevious | PlaybackPlay | PlaybackPause | PlaybackNext => {
......@@ -449,20 +473,6 @@ impl Amadeus<'_> {
_ => unreachable!(),
}
}
// Handle the deamon closing process.
if let Some((_, status_deamon)) = &self.status_deamon {
if status_deamon.should_quit() {
status_deamon.join();
self.status_deamon = None;
}
};
if let Some((_, deamon)) = &self.deamon {
if deamon.should_quit() {
deamon.join();
self.deamon = None;
}
};
}
fn apply_settings(&mut self) {
......@@ -502,8 +512,9 @@ impl App for Amadeus<'_> {
self.last_render_instant = self.begin_render_instant;
self.handle_action();
self.handle_deamons_events();
self.handle_deamons_exit();
self.handle_action();
if self.has_config_changed {
self.apply_settings();
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter