diff --git a/lektor_repo/src/lib.rs b/lektor_repo/src/lib.rs index 5e0ccebcc515f0df0cd70491494859da01e35edd..48aea004b15b43f57639f1e34a105f97fab6ec76 100644 --- a/lektor_repo/src/lib.rs +++ b/lektor_repo/src/lib.rs @@ -41,19 +41,22 @@ impl Repo { self.updating.load(Ordering::SeqCst) != 0 } - /// Do the update for all repos and returned the total of updated karas. + /// Do the update for all repos and returnes the number of repos it updated from async fn perform_update<'a, Storage: DatabaseStorage + 'static>( &self, handler: &'a UpdateHandler<'a, Storage>, - ) -> usize { - let count = stream::iter(&self.configs) + ) -> Result<usize> { + let infos = stream::iter(&self.configs) .then(|cfg| async move { (cfg, download_kara_list(cfg).await) }) - .then(|(cfg, res)| async move { - let list = res.unwrap_or_else(|err| { + .filter_map(|(cfg, res)| async move { match res { + Ok(l) => Some((cfg, l)), + Err(err) => { log::error!("failed to download kara list for {}: {err}", cfg.name); - vec![] - }); - stream::iter(list).then(move |kara| async move { + None + } + }}) + .then(|(cfg, res)| async move { + let _ = stream::iter(res).take(2).then(move |kara| async move { log::trace!("got kara from {}: {kara:#?}", cfg.name); if let Some(to_dl) = handler.add_kara_v2(&cfg.name, kara).await { if let Err(err) = download_kara(cfg, handler, to_dl).await { @@ -61,19 +64,22 @@ impl Repo { } } }) + .collect::<FuturesUnordered<_>>() + .await; + cfg }) - .flatten() - .count(); - let infos = stream::iter(&self.configs) .then(download_dbinfos) - .collect::<FuturesUnordered<_>>(); - let (count, infos) = tokio::join!(count, infos); - let infos: Vec<_> = infos.into_iter().filter_map(Result::ok).collect(); - log::info!("infos for repos: {infos:#?}"); - count + .collect::<FuturesUnordered<_>>() + .await; + + log::info!("infos for repos: {:#?}", infos); + match infos.len() { + l if l <= 0 => bail!("No valid repos to update karas from"), + l => Ok(l) + } } - /// Update the database throu the handler and returns the number of downloaded karas. + /// Update the database throu the handler and returns the number of repos it updated from pub async fn update_with<'a, Storage: DatabaseStorage + 'static>( &self, handler: &'a UpdateHandler<'a, Storage>, @@ -87,7 +93,7 @@ impl Repo { log::info!("begin the download process"); let download_count = self.perform_update(handler).await; self.updating.store(0, Ordering::SeqCst); - Ok(download_count) + download_count } state => unreachable!("repo is in incoherent state, update flag is {state:?}"), } diff --git a/lektord/src/app/routes.rs b/lektord/src/app/routes.rs index cf23152101c32ba784c1e5849cb6fdc3c31e69d6..4311e1aa9b2dcdb2ad3ee64ac1d927cd629671c9 100644 --- a/lektord/src/app/routes.rs +++ b/lektord/src/app/routes.rs @@ -283,7 +283,7 @@ pub(crate) async fn adm_update( .await .map_err(|err| anyhow!("{err}")) { - Ok(Ok(count)) => log::info!("finished updating database with karas: {count}"), + Ok(Ok(count)) => log::info!("finished updating database with karas from {count} repos"), Err(err) | Ok(Err(err)) => log::error!("{err}"), } });