Skip to content
Extraits de code Groupes Projets
Non vérifiée Valider c2706d1c rédigé par Kubat's avatar Kubat
Parcourir les fichiers

MISC: Apply some of the remarks

parent 60374fcc
Aucune branche associée trouvée
Aucune étiquette associée trouvée
2 requêtes de fusion!206Use incremental upgrades when syncing epochs,!197Draft: Refactor the whole code.
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
use crate::*; use crate::*;
use hashbrown::hash_map; use hashbrown::hash_map;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ffi::OsStr, path::Path}; use std::{ffi::OsStr, mem, path::Path};
/// The epoch contains all available karas at a certain point in time. It can be submitted and /// The epoch contains all available karas at a certain point in time. It can be submitted and
/// available to all readers of the database or unsubmited and only one writter can edit it. /// available to all readers of the database or unsubmited and only one writter can edit it.
...@@ -105,9 +105,7 @@ impl Epoch { ...@@ -105,9 +105,7 @@ impl Epoch {
/// Replace the karas in the epoch by the provided new value /// Replace the karas in the epoch by the provided new value
/// Return the previous karas value /// Return the previous karas value
pub fn data_replace(&mut self, new_data: EpochData) -> EpochData { pub fn data_replace(&mut self, new_data: EpochData) -> EpochData {
let ret = self.data.clone(); mem::replace(&mut self.data, new_data)
self.data = new_data;
ret
} }
/// Get the number of the epoch /// Get the number of the epoch
......
...@@ -105,6 +105,13 @@ impl From<PoolSerDeProxy> for Pool { ...@@ -105,6 +105,13 @@ impl From<PoolSerDeProxy> for Pool {
} }
} }
impl From<Pool> for PoolSerDeProxy {
fn from(value: Pool) -> Self {
let Pool { kara_mapping, .. } = value;
Self { kara_mapping }
}
}
impl Default for Pool { impl Default for Pool {
fn default() -> Self { fn default() -> Self {
Self::from_iter(std::iter::empty::<(KId, RemoteKId, KaraState)>()) Self::from_iter(std::iter::empty::<(KId, RemoteKId, KaraState)>())
......
...@@ -253,15 +253,8 @@ impl<'a, Storage: DatabaseStorage> UpdateHandler<'a, Storage> { ...@@ -253,15 +253,8 @@ impl<'a, Storage: DatabaseStorage> UpdateHandler<'a, Storage> {
.await .await
} }
pub async fn delete_kara_v2<'b>(&'b self, repo: &str, rkid: u64) -> Result<KId> pub async fn delete_kara_v2(&self, repo: &str, rkid: u64) -> Result<KId> {
where if let (Some(id), _) = (self.pool.read().await).get_from_remote(RemoteKId::new(rkid, repo))
'b: 'a,
{
if let (Some(id), _) = self
.pool
.read()
.await
.get_from_remote(RemoteKId::new(rkid, repo))
{ {
self.new_epoch.borrow_mut().content().data_mut().remove(&id); self.new_epoch.borrow_mut().content().data_mut().remove(&id);
Ok(id) Ok(id)
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
use crate::database::pool::Pool; use crate::database::pool::Pool;
use crate::{database::epoch::*, id::*, storage::DatabaseStorage, Playlist}; use crate::{database::epoch::*, id::*, storage::DatabaseStorage, Playlist};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use futures::future;
use kurisu_api::SHA256; use kurisu_api::SHA256;
use tokio::sync::RwLock; use tokio::sync::RwLock;
......
...@@ -22,6 +22,13 @@ pub struct Repo { ...@@ -22,6 +22,13 @@ pub struct Repo {
updating: AtomicU8, updating: AtomicU8,
} }
#[derive(Debug, Default, Eq, PartialEq)]
pub struct UpdateCounts {
pub updated: usize,
pub deleted: usize,
pub downloaded: usize,
}
impl Repo { impl Repo {
/// Create a new repo downloader from the config. /// Create a new repo downloader from the config.
pub fn new(config: Vec<LektorRepoConfig>) -> Self { pub fn new(config: Vec<LektorRepoConfig>) -> Self {
...@@ -75,25 +82,24 @@ impl Repo { ...@@ -75,25 +82,24 @@ impl Repo {
let updated_list = match download_kara_list(&repo_config).await { let updated_list = match download_kara_list(&repo_config).await {
Err(err) => { Err(err) => {
log::error!("failed to download kara list for {}: {err}", repo_name); log::error!("failed to download kara list for {}: {err}", repo_name);
vec![] Default::default()
} }
Ok(KaraList { Ok(KaraList {
dbepoch, dbepoch: epoch,
dbepoch_validity, dbepoch_validity: validity,
karas: _, ..
}) if dbepoch != repo_info.epoch() }) if epoch != repo_info.epoch() || validity != repo_info.epoch_validity() => {
|| dbepoch_validity != repo_info.epoch_validity() => log::error!(
{ "epoch or epoch_validity not matching those in infos, got ({epoch},{validity}), expected ({},{}). {}",
log::error!("dbepoch or dbepoch_validity from karalist not matching those in dbinfo: got ({dbepoch},{dbepoch_validity}), expected ({},{}). Either it took dbinfo and karalist from 2 different repos, and then you need to fix your config, or the repo was modified between our 2 calls, and then you can retry the update operation now", repo_info.epoch(), repo_info.epoch_validity()); repo_info.epoch(),
vec![] repo_info.epoch_validity(),
"You may want to check out if the repo was modified live, or you may need to fix your config"
);
Default::default()
} }
Ok(KaraList { Ok(KaraList { karas, .. }) => karas,
dbepoch: _,
dbepoch_validity: _,
karas,
}) => karas,
}; };
self.update_epochs_from_karas(handler, updated_list, repo_name) self.update_epochs_from_karas(handler, updated_list, repo_name)
...@@ -129,7 +135,11 @@ impl Repo { ...@@ -129,7 +135,11 @@ impl Repo {
|| got_since != since || got_since != since
|| got_dbepoch != repo_info.epoch() => || got_dbepoch != repo_info.epoch() =>
{ {
log::error!("dbepoch or dbepoch_validity from karalist not matching those in changediff: got ({got_dbepoch},{got_dbepoch_validity},{got_since}), expected ({},{},{since}). Either it took dbinfo and changediff from 2 different repos, and then you need to fix your config, or the repo was modified between our 2 calls, and then you can retry the update operation now", repo_info.epoch(), repo_info.epoch_validity()); log::error!(
"dbepoch or dbepoch_validity from karalist not matching those in changediff: got ({got_dbepoch},{got_dbepoch_validity},{got_since}), expected ({},{},{since}). Either it took dbinfo and changediff from 2 different repos, and then you need to fix your config, or the repo was modified between our 2 calls, and then you can retry the update operation now",
repo_info.epoch(),
repo_info.epoch_validity()
);
return (vec![], vec![]); return (vec![], vec![]);
} }
...@@ -202,66 +212,59 @@ impl Repo { ...@@ -202,66 +212,59 @@ impl Repo {
.collect() .collect()
} }
/// Update the database throu the handler and returns the number of downloaded karas. /// Update the database through the handler and returns the number of downloaded karas.
pub async fn update_with<'a, Storage: DatabaseStorage + 'static>( pub async fn update_with<'a, Storage: DatabaseStorage + 'static>(
&self, &self,
handler: &'a UpdateHandler<'a, Storage>, handler: &'a UpdateHandler<'a, Storage>,
) -> Result<Option<(usize, usize, usize)>> { ) -> Result<Option<UpdateCounts>> {
match self match self
.updating .updating
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) .compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed)
{ {
Err(1) => bail!("repo is already updating"), Err(1) => bail!("repo is already updating"),
Ok(0) => { Ok(0) => {
log::info!("begin the download process"); let (config, infos) = get_freshest_repo(&self.configs)
.await
.context("could not find any valid repo to update, aborting")?;
if let Some((repo_config, repo_info)) = get_freshest_repo(&self.configs).await {
let repo_name = repo_config.name.as_str();
log::info!( log::info!(
"selecting repo {} because it is the most up to date (epoch = {})", "selecting repo {} because it is the most up to date (epoch = {})",
repo_name, config.name.as_str(),
repo_info.epoch(), infos.epoch(),
); );
if handler.is_up_to_date(repo_info.clone().into()).await { if handler.is_up_to_date(infos.clone().into()).await {
log::info!("already up to date against remote repo"); log::info!("already up to date against remote repo");
return Ok(None); return Ok(None);
} }
handler.set_epoch_infos(repo_info.clone().into()).await; handler.set_epoch_infos(infos.clone().into()).await;
let (updated, deleted) = let (updated, deleted) = match handler.recoverable_since(infos.clone().into()).await
match handler.recoverable_since(repo_info.clone().into()).await { {
Some(since) => { Some(since) => {
let _ = handler (handler.copy_last_epoch_data().await)
.copy_last_epoch_data() .context("unable to copy data from last epoch")?;
.await
.context("unable to copy data from last epoch");
self.do_epoch_with_changediff( self.do_epoch_with_changediff(handler, config.clone(), infos, since)
handler,
repo_config.clone(),
repo_info,
since,
)
.await .await
} }
None => ( None => (
self.do_epoch_from_scratch(handler, repo_config.clone(), repo_info) self.do_epoch_from_scratch(handler, config.clone(), infos)
.await, .await,
vec![], vec![],
), ),
}; };
let updated_count = updated.len(); let counts = UpdateCounts {
let downloaded = self.do_downloads(updated, handler, repo_config).await; updated: updated.len(),
deleted: deleted.len(),
downloaded: self.do_downloads(updated, handler, config).await.len(),
};
self.updating.store(0, Ordering::SeqCst); self.updating.store(0, Ordering::SeqCst);
Ok(Some((updated_count, deleted.len(), downloaded.len()))) Ok(Some(counts))
} else {
bail!("Could not find any valid repo, aborting");
}
} }
state => panic!("repo is in incoherent state, update flag is {state:?}"), state => panic!("repo is in incoherent state, update flag is {state:?}"),
} }
......
...@@ -16,6 +16,7 @@ use axum::{ ...@@ -16,6 +16,7 @@ use axum::{
}; };
use lektor_nkdb::*; use lektor_nkdb::*;
use lektor_payloads::*; use lektor_payloads::*;
use lektor_repo::UpdateCounts;
use lektor_search::KaraStore; use lektor_search::KaraStore;
use lektor_utils::decode_base64_json; use lektor_utils::decode_base64_json;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
...@@ -276,8 +277,7 @@ pub(crate) async fn adm_update( ...@@ -276,8 +277,7 @@ pub(crate) async fn adm_update(
local.spawn_local(async move { local.spawn_local(async move {
match tokio::task::spawn_local(async move { match tokio::task::spawn_local(async move {
let handle = state.database.update().await; let handle = state.database.update().await;
let updated = state.repo.update_with(&handle).await; match state.repo.update_with(&handle).await {
match updated {
Ok(Some(counts)) => { Ok(Some(counts)) => {
handle.finished().await; handle.finished().await;
Ok::<_, anyhow::Error>(counts) Ok::<_, anyhow::Error>(counts)
...@@ -285,7 +285,7 @@ pub(crate) async fn adm_update( ...@@ -285,7 +285,7 @@ pub(crate) async fn adm_update(
Ok(None) => { Ok(None) => {
log::info!("update had nothing to do, no need to write updated info"); log::info!("update had nothing to do, no need to write updated info");
Ok::<_, anyhow::Error>((0, 0, 0)) Ok::<_, anyhow::Error>(Default::default())
}, },
Err(err) => Err(err) Err(err) => Err(err)
...@@ -294,7 +294,9 @@ pub(crate) async fn adm_update( ...@@ -294,7 +294,9 @@ pub(crate) async fn adm_update(
.await .await
.map_err(|err| anyhow!("{err}")) .map_err(|err| anyhow!("{err}"))
{ {
Ok(Ok((updated, deleted, downloaded))) => log::info!("finished updating database by updating {updated} karas: ({downloaded} of which were downloaded) and deleting {deleted} karas"), Ok(Ok(UpdateCounts { updated, deleted, downloaded })) => {
log::info!("finished updating {updated} karas - {downloaded} of which were downloaded - and deleting {deleted} karas")
},
Err(err) | Ok(Err(err)) => log::error!("{err}"), Err(err) | Ok(Err(err)) => log::error!("{err}"),
} }
}); });
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Veuillez vous inscrire ou vous pour commenter