diff --git a/src/rust/lektor_repo/src/download.rs b/src/rust/lektor_repo/src/download.rs index c792110463bf5a686b029ba2bf6b1b868a2a9288..1d3698e599659df9fc34aa6568a8b7c34767aa37 100644 --- a/src/rust/lektor_repo/src/download.rs +++ b/src/rust/lektor_repo/src/download.rs @@ -1,6 +1,10 @@ use crate::*; use futures::stream::{self, StreamExt}; -use std::path::{Path, PathBuf}; +use lektor_db::models::RemoteKaraId; +use std::{ + num::NonZeroUsize, + path::{Path, PathBuf}, +}; pub struct DownloadBuilder { hosts: Vec<RepoConfig>, @@ -8,6 +12,7 @@ pub struct DownloadBuilder { uri: Option<LktUri>, dry: bool, db: LktLockDbPtr, + concurrent_count: NonZeroUsize, } pub struct Download { @@ -16,6 +21,7 @@ pub struct Download { uri: LktUri, dry: bool, db: LktLockDbPtr, + concurrent_count: NonZeroUsize, } /// TODO: Add an uri type in the kurisu API and add a converter @@ -30,7 +36,7 @@ fn formatter_for_api_version(version: RepoApiVersion) -> fn(&LktUri) -> Result<S Type(ty) => format!("type={ty}"), Language(lang) => format!("lang={lang}"), FuzzySearch(string) | Search(string) => { - format!("search={}", string.join(" ")) + format!("search={}", string.join("%")) } Playlist(plt) => { return Err(format!( @@ -44,13 +50,28 @@ fn formatter_for_api_version(version: RepoApiVersion) -> fn(&LktUri) -> Result<S ret })) }, - RepoApiVersion::V2 => |_uri| todo!(), + + RepoApiVersion::V2 => |uri| { + Ok(match uri { + Id(id) => format!("id/{id}"), + Type(ty) => format!("type/{ty}"), + Playlist(plt) => format!("playlist/{plt}"), + Origin(origin) => format!("origin/{origin}"), + Language(lang) => format!("lang/{lang}"), + KaraMaker(author) => format!("karamaker/{author}"), + FuzzySearch(string) | Search(string) => { + format!("search/{}", string.join("%")) + } + Any => "all".to_string(), + }) + }, } } impl Download { pub fn builder(queue: LktCQueue, hosts: Vec<RepoConfig>, db: LktLockDbPtr) -> DownloadBuilder { DownloadBuilder { + concurrent_count: NonZeroUsize::new(1).expect("oupsy"), uri: None, 
dry: false, queue, @@ -59,6 +80,10 @@ impl Download { } } + pub fn concurrent(&self) -> Option<usize> { + Some(self.concurrent_count.into()) + } + /// Download the metadata of the kara. We update the database with the downloaded metadata. The /// function only returns the karas' id viewed from the repo. async fn download_metadata<'a>( @@ -125,86 +150,108 @@ impl Download { todo!() } + fn find_repo_id(&self) -> Option<i64> { + let mut db = self.db.lock().expect("failed to lock the database..."); + let find = |RepoConfig { name, .. }: &RepoConfig| { + db.get_repo_id(name) + .map_err(|err| log::error!(target: "REPO", "no id found for repo: {err}")) + .ok() + }; + self.hosts.iter().find_map(find) + } + pub async fn download(self) { - let db = &self.db; - let uri = &self.uri; - let dry = self.dry; - let repo_id = { - let mut db = self.db.lock().expect("failed to lock the database..."); - let repo_id = - self.hosts - .iter() - .find_map(|RepoConfig { name, .. }| match db.get_repo_id(name) { - Ok(repo_id) => Some(repo_id), - Err(err) => { - log::error!(target: "REPO", "no id found for repo: {err}"); - None - } - }); - match repo_id { - Some(repo_id) => repo_id, - None => return, - } + let repo_id = match self.find_repo_id() { + Some(repo_id) => repo_id, + None => return, }; stream::iter(&self.hosts) - .for_each_concurrent(2, |RepoConfig { name, api, urls }| async move { - let uri = match formatter_for_api_version(*api)(&uri) { - Ok(uri) => uri, - Err(err) => { - log::error!(target: "REPO", "{err}"); - return; - } - }; - let uri = &uri; - let ok_urls = stream::iter(urls.iter()) - .filter_map(|base | async move { - Download::download_metadata(db.clone(), *api, base, repo_id, &uri) - .await - .map_err(|err| log::error!(target: "REPO", "failed to download metadata with uri `{uri}`: {err}")) - .ok() - }); - let Some((base, karas_id)) = Box::pin(ok_urls).next().await else { - log::error!("can't find uri {uri} in repo {name}"); - return; - }; - - if dry { - log::info!("fetch 
metadata for karas from {name}: {karas_id:?}"); - return; - } + .for_each_concurrent(self.concurrent(), |repo| { + self.download_uri_per_repo(repo_id, repo) + }) + .await; + } - let other_urls = urls.iter().filter_map(|url| (url != base).then_some(url.as_str())); - let search_urls: Vec<_> = Some(base).into_iter().chain(other_urls).collect(); - let search_urls = &search_urls; - let (karas_id, errors): (Vec<_>, Vec<_>) = karas_id - .into_iter() - .map(|repo_kara_id| Download::build_destination_file_path(db.clone(), repo_id, repo_kara_id)) - .partition(Result::is_ok); - for error in errors.into_iter().map(Result::unwrap_err) { - log::error!(target: "REPO", "failed to build a file path for kara from repo {name}: {error}"); - } + async fn download_uri_per_repo(&self, repo_id: i64, repo: &RepoConfig) { + let RepoConfig { name, api, urls } = repo; + let Ok(uri) = formatter_for_api_version(*api)(&self.uri).map_err(|err| log::error!(target: "REPO", "{err}")) else { return; }; + let (uri, urls_stream) = (&uri, stream::iter(urls.iter())); + let ok_urls = urls_stream + .filter_map(|base| async move { + Download::download_metadata(self.db.clone(), *api, base, repo_id, uri) + .await + .map_err(|err| log::error!(target: "REPO", "failed to download metadata with uri `{uri}`: {err}")) + .ok() + }); + let Some((base, karas_id)) = Box::pin(ok_urls).next().await else { + log::error!("can't find uri {uri} in repo {name}"); + return; + }; - stream::iter(karas_id.into_iter().map(Result::unwrap)).for_each_concurrent(2, |(repo_kara_id, destination)| async move { - if let Err(err) = Download::download_kara(db.clone(), *api, &search_urls[..], repo_id, repo_kara_id, &destination).await { - log::error!(target: "REPO", "failed to download file `{}` for kara {repo_kara_id}: {err}", destination.to_string_lossy()); - return; - } - if let Err(err) = db.lock().expect("failed to lock the database...") - .make_kara_available(lektor_db::models::RemoteKaraId { repo_id, repo_kara_id }) { - 
log::error!(target: "REPO", "failed to make kara `{repo_kara_id}` from {name} available: {err}"); - return; - } - log::info!("downloaded kara {repo_kara_id} from {name} at location {}", destination.to_string_lossy()); - }).await; + if self.dry { + log::info!("fetch metadata for karas from {name}: {karas_id:?}"); + return; + } + + let other_urls = urls + .iter() + .filter_map(|url| url.ne(base).then_some(url.as_str())); + let search_urls: Vec<_> = Some(base).into_iter().chain(other_urls).collect(); + let urls = &search_urls[..]; + let (karas_id, errors): (Vec<_>, Vec<_>) = karas_id + .into_iter() + .map(|repo_kara_id| { + Download::build_destination_file_path(self.db.clone(), repo_id, repo_kara_id) + }) + .partition(Result::is_ok); + for error in errors.into_iter().map(Result::unwrap_err) { + log::error!(target: "REPO", "failed to build a file path for kara from repo {name}: {error}"); + } + + stream::iter(karas_id.into_iter().map(Result::unwrap)) + .for_each_concurrent(self.concurrent(), |(repo_kara_id, dest)| { + self.download_kara_into_location(*api, name, repo_id, repo_kara_id, urls, dest) }) .await; } + + async fn download_kara_into_location( + &self, + api: RepoApiVersion, + name: impl AsRef<str>, + repo_id: i64, + repo_kara_id: i64, + search_urls: &[&str], + dest: impl AsRef<Path>, + ) { + let (dest, name, db) = (dest.as_ref(), name.as_ref(), self.db.clone()); + if let Err(err) = + Download::download_kara(db.clone(), api, search_urls, repo_id, repo_kara_id, dest).await + { + log::error!(target: "REPO", "failed to download file `{}` for kara {repo_kara_id}: {err}", dest.to_string_lossy()); + return; + } + let mut db = db.lock().expect("failed to lock the database..."); + let remote_id = RemoteKaraId { + repo_id, + repo_kara_id, + }; + if let Err(err) = db.make_kara_available(remote_id) { + log::error!(target: "REPO", "failed to make kara `{repo_kara_id}` from {name} available: {err}"); + return; + } + log::info!( + "downloaded kara {repo_kara_id} from {name} 
at location {}", + dest.to_string_lossy() + ); + } } impl DownloadBuilder { pub fn build(self) -> Download { Download { + concurrent_count: self.concurrent_count, queue: self.queue, hosts: self.hosts, uri: self.uri.unwrap_or_default(), @@ -213,6 +260,11 @@ impl DownloadBuilder { } } + pub fn concurrent(mut self, concurrent: NonZeroUsize) -> Self { + self.concurrent_count = concurrent; + self + } + pub fn dry(mut self) -> Self { self.dry = true; self diff --git a/src/rust/lektor_repo/src/lib.rs b/src/rust/lektor_repo/src/lib.rs index 4b05c0f5b7faefcd898b6abdfd0af090d60b5062..7b94a71579ed44b472d9700e4859f4cf04ab8a7d 100644 --- a/src/rust/lektor_repo/src/lib.rs +++ b/src/rust/lektor_repo/src/lib.rs @@ -1,11 +1,9 @@ //! The crate responsible of downloading karas from kurisu. pub(crate) use commons::*; -pub(crate) use hashbrown::HashMap; pub(crate) use lektor_c_compat::rs::*; pub(crate) use lektor_config::*; pub(crate) use lektor_db::{connexion::LktDatabaseConnection, uri::LktUri}; -pub(crate) use smallstring::SmallString; pub(crate) use std::sync::{Arc, Mutex}; pub(crate) use tokio::{runtime::Runtime, task::JoinHandle}; diff --git a/src/rust/lektor_repo/src/repo.rs b/src/rust/lektor_repo/src/repo.rs index b236b20321e1302a7b7e2a9335ffd68371889e0e..f187cbacdff0ad488b658ef409e073ced52a21ee 100644 --- a/src/rust/lektor_repo/src/repo.rs +++ b/src/rust/lektor_repo/src/repo.rs @@ -1,4 +1,5 @@ use crate::*; +use std::num::NonZeroUsize; /// The structure responsible to download karas from kurisu. 
We use a pointer /// instead of an Arc because we will manually call drop on instances of the @@ -12,6 +13,11 @@ pub struct LktModuleRepoRs { rt: Runtime, } +fn concurrent() -> NonZeroUsize { + std::thread::available_parallelism() + .expect("failed to get an estimate of the default amount of parallelism") +} + impl LktModuleRepoRs { pub fn new(queue: LktCQueue, hosts: Vec<RepoConfig>, db: LktLockDbPtr) -> Self { let par = std::thread::available_parallelism() @@ -35,6 +41,7 @@ impl LktModuleRepoRs { self.rt.spawn( download::Download::builder(self.queue, self.hosts.clone(), self.db.clone()) .dry() + .concurrent(concurrent()) .build() .download(), ), @@ -46,6 +53,7 @@ impl LktModuleRepoRs { self.handlers.push( self.rt.spawn( download::Download::builder(self.queue, self.hosts.clone(), self.db.clone()) + .concurrent(concurrent()) .build() .download(), ), @@ -58,6 +66,7 @@ impl LktModuleRepoRs { self.rt.spawn( download::Download::builder(self.queue, self.hosts.clone(), self.db.clone()) .uri(uri) + .concurrent(concurrent()) .build() .download(), ),