Verified commit 542198aa authored by Kubat

REPO: Reorganize the code...

parent 18173fbd
use crate::*;
use futures::stream::{self, StreamExt};
-use std::path::{Path, PathBuf};
+use lektor_db::models::RemoteKaraId;
+use std::{
+    num::NonZeroUsize,
+    path::{Path, PathBuf},
+};

pub struct DownloadBuilder {
    hosts: Vec<RepoConfig>,
@@ -8,6 +12,7 @@ pub struct DownloadBuilder {
    uri: Option<LktUri>,
    dry: bool,
    db: LktLockDbPtr,
+    concurrent_count: NonZeroUsize,
}

pub struct Download {
@@ -16,6 +21,7 @@ pub struct Download {
    uri: LktUri,
    dry: bool,
    db: LktLockDbPtr,
+    concurrent_count: NonZeroUsize,
}

/// TODO: Add an uri type in the kurisu API and add a converter
@@ -30,7 +36,7 @@ fn formatter_for_api_version(version: RepoApiVersion) -> fn(&LktUri) -> Result<S
                Type(ty) => format!("type={ty}"),
                Language(lang) => format!("lang={lang}"),
                FuzzySearch(string) | Search(string) => {
-                    format!("search={}", string.join(" "))
+                    format!("search={}", string.join("%"))
                }
                Playlist(plt) => {
                    return Err(format!(
@@ -44,13 +50,28 @@ fn formatter_for_api_version(version: RepoApiVersion) -> fn(&LktUri) -> Result<S
                    ret
                }))
        },
-        RepoApiVersion::V2 => |_uri| todo!(),
+        RepoApiVersion::V2 => |uri| {
+            Ok(match uri {
+                Id(id) => format!("id/{id}"),
+                Type(ty) => format!("type/{ty}"),
+                Playlist(plt) => format!("playlist/{plt}"),
+                Origin(origin) => format!("origin/{origin}"),
+                Language(lang) => format!("lang/{lang}"),
+                KaraMaker(author) => format!("karamaker/{author}"),
+                FuzzySearch(string) | Search(string) => {
+                    format!("search/{}", string.join("%"))
+                }
+                Any => "all".to_string(),
+            })
+        },
    }
}

impl Download {
    pub fn builder(queue: LktCQueue, hosts: Vec<RepoConfig>, db: LktLockDbPtr) -> DownloadBuilder {
        DownloadBuilder {
+            concurrent_count: NonZeroUsize::new(1).expect("oupsy"),
            uri: None,
            dry: false,
            queue,
@@ -59,6 +80,10 @@ impl Download {
        }
    }

+    pub fn concurrent(&self) -> Option<usize> {
+        Some(self.concurrent_count.into())
+    }
+
    /// Download the metadata of the kara. We update the database with the downloaded metadata. The
    /// function only returns the karas' id viewed from the repo.
    async fn download_metadata<'a>(
@@ -125,41 +150,36 @@ impl Download {
        todo!()
    }

-    pub async fn download(self) {
-        let db = &self.db;
-        let uri = &self.uri;
-        let dry = self.dry;
-        let repo_id = {
-            let mut db = self.db.lock().expect("failed to lock the database...");
-            let repo_id =
-                self.hosts
-                    .iter()
-                    .find_map(|RepoConfig { name, .. }| match db.get_repo_id(name) {
-                        Ok(repo_id) => Some(repo_id),
-                        Err(err) => {
-                            log::error!(target: "REPO", "no id found for repo: {err}");
-                            None
-                        }
-                    });
-            match repo_id {
-                Some(repo_id) => repo_id,
-                None => return,
-            }
-        };
-        stream::iter(&self.hosts)
-            .for_each_concurrent(2, |RepoConfig { name, api, urls }| async move {
-                let uri = match formatter_for_api_version(*api)(&uri) {
-                    Ok(uri) => uri,
-                    Err(err) => {
-                        log::error!(target: "REPO", "{err}");
-                        return;
-                    }
-                };
-                let uri = &uri;
-                let ok_urls = stream::iter(urls.iter())
+    fn find_repo_id(&self) -> Option<i64> {
+        let mut db = self.db.lock().expect("failed to lock the database...");
+        let find = |RepoConfig { name, .. }: &RepoConfig| {
+            db.get_repo_id(name)
+                .map_err(|err| log::error!(target: "REPO", "no id found for repo: {err}"))
+                .ok()
+        };
+        self.hosts.iter().find_map(find)
+    }
+
+    pub async fn download(self) {
+        let repo_id = match self.find_repo_id() {
+            Some(repo_id) => repo_id,
+            None => return,
+        };
+        stream::iter(&self.hosts)
+            .for_each_concurrent(self.concurrent(), |repo| {
+                self.download_uri_per_repo(repo_id, repo)
+            })
+            .await;
+    }
+
+    async fn download_uri_per_repo(&self, repo_id: i64, repo: &RepoConfig) {
+        let RepoConfig { name, api, urls } = repo;
+        let Ok(uri) = formatter_for_api_version(*api)(&self.uri).map_err(|err| log::error!(target: "REPO", "{err}")) else { return; };
+        let (uri, urls_stream) = (&uri, stream::iter(urls.iter()));
+        let ok_urls = urls_stream
            .filter_map(|base| async move {
-                Download::download_metadata(db.clone(), *api, base, repo_id, &uri)
+                Download::download_metadata(self.db.clone(), *api, base, repo_id, uri)
                    .await
                    .map_err(|err| log::error!(target: "REPO", "failed to download metadata with uri `{uri}`: {err}"))
                    .ok()
@@ -169,42 +189,69 @@ impl Download {
                return;
            };
-        if dry {
+        if self.dry {
            log::info!("fetch metadata for karas from {name}: {karas_id:?}");
            return;
        }
-        let other_urls = urls.iter().filter_map(|url| (url != base).then_some(url.as_str()));
+        let other_urls = urls
+            .iter()
+            .filter_map(|url| url.ne(base).then_some(url.as_str()));
        let search_urls: Vec<_> = Some(base).into_iter().chain(other_urls).collect();
-        let search_urls = &search_urls;
+        let urls = &search_urls[..];
        let (karas_id, errors): (Vec<_>, Vec<_>) = karas_id
            .into_iter()
-            .map(|repo_kara_id| Download::build_destination_file_path(db.clone(), repo_id, repo_kara_id))
+            .map(|repo_kara_id| {
+                Download::build_destination_file_path(self.db.clone(), repo_id, repo_kara_id)
+            })
            .partition(Result::is_ok);
        for error in errors.into_iter().map(Result::unwrap_err) {
            log::error!(target: "REPO", "failed to build a file path for kara from repo {name}: {error}");
        }
-        stream::iter(karas_id.into_iter().map(Result::unwrap)).for_each_concurrent(2, |(repo_kara_id, destination)| async move {
-            if let Err(err) = Download::download_kara(db.clone(), *api, &search_urls[..], repo_id, repo_kara_id, &destination).await {
-                log::error!(target: "REPO", "failed to download file `{}` for kara {repo_kara_id}: {err}", destination.to_string_lossy());
-                return;
-            }
-            if let Err(err) = db.lock().expect("failed to lock the database...")
-                .make_kara_available(lektor_db::models::RemoteKaraId { repo_id, repo_kara_id }) {
-                log::error!(target: "REPO", "failed to make kara `{repo_kara_id}` from {name} available: {err}");
-                return;
-            }
-            log::info!("downloaded kara {repo_kara_id} from {name} at location {}", destination.to_string_lossy());
-        }).await;
-        })
-        .await;
+        stream::iter(karas_id.into_iter().map(Result::unwrap))
+            .for_each_concurrent(self.concurrent(), |(repo_kara_id, dest)| {
+                self.download_kara_into_location(*api, name, repo_id, repo_kara_id, urls, dest)
+            })
+            .await;
+    }
+
+    async fn download_kara_into_location(
+        &self,
+        api: RepoApiVersion,
+        name: impl AsRef<str>,
+        repo_id: i64,
+        repo_kara_id: i64,
+        search_urls: &[&str],
+        dest: impl AsRef<Path>,
+    ) {
+        let (dest, name, db) = (dest.as_ref(), name.as_ref(), self.db.clone());
+        if let Err(err) =
+            Download::download_kara(db.clone(), api, search_urls, repo_id, repo_kara_id, dest).await
+        {
+            log::error!(target: "REPO", "failed to download file `{}` for kara {repo_kara_id}: {err}", dest.to_string_lossy());
            return;
        }
+        let mut db = db.lock().expect("failed to lock the database...");
+        let remote_id = RemoteKaraId {
+            repo_id,
+            repo_kara_id,
+        };
+        if let Err(err) = db.make_kara_available(remote_id) {
            log::error!(target: "REPO", "failed to make kara `{repo_kara_id}` from {name} available: {err}");
            return;
        }
+        log::info!(
+            "downloaded kara {repo_kara_id} from {name} at location {}",
+            dest.to_string_lossy()
+        );
    }
}

impl DownloadBuilder {
    pub fn build(self) -> Download {
        Download {
+            concurrent_count: self.concurrent_count,
            queue: self.queue,
            hosts: self.hosts,
            uri: self.uri.unwrap_or_default(),
@@ -213,6 +260,11 @@ impl DownloadBuilder {
        }
    }

+    pub fn concurrent(mut self, concurrent: NonZeroUsize) -> Self {
+        self.concurrent_count = concurrent;
+        self
+    }
+
    pub fn dry(mut self) -> Self {
        self.dry = true;
        self
......
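The hunk above replaces the `RepoApiVersion::V2 => |_uri| todo!()` stub: the v1 formatter builds query-string fragments (search terms joined with `%` into `search=...`), while the new v2 formatter builds path segments (`search/...`, with `all` as the catch-all). A minimal standalone sketch of the two output shapes, using a simplified stand-in for `LktUri`; the variant set, the v1 error messages and the v1 handling of `Any` are illustrative assumptions, not the crate's actual code.

// Simplified stand-in for the crate's `LktUri` (which lives in `lektor_db::uri`).
#[allow(dead_code)]
enum Uri {
    Id(u64),
    Playlist(String),
    Search(Vec<String>),
    Any,
}

// v1 shape: `key=value` fragments, search terms joined with `%`.
fn format_v1(uri: &Uri) -> Result<String, String> {
    Ok(match uri {
        Uri::Id(id) => format!("id={id}"),
        Uri::Search(words) => format!("search={}", words.join("%")),
        // The diff shows v1 rejecting playlist URIs; this message is made up.
        Uri::Playlist(plt) => return Err(format!("can't address playlist `{plt}` with the v1 API")),
        // Not visible in the diff, treated as an error only for this sketch.
        Uri::Any => return Err("catch-all query not shown for v1".to_string()),
    })
}

// v2 shape: path segments, with `all` as the catch-all, as in the new match arms.
fn format_v2(uri: &Uri) -> Result<String, String> {
    Ok(match uri {
        Uri::Id(id) => format!("id/{id}"),
        Uri::Playlist(plt) => format!("playlist/{plt}"),
        Uri::Search(words) => format!("search/{}", words.join("%")),
        Uri::Any => "all".to_string(),
    })
}

fn main() {
    let uri = Uri::Search(vec!["white".into(), "album".into()]);
    assert_eq!(format_v1(&uri).as_deref(), Ok("search=white%album"));
    assert_eq!(format_v2(&uri).as_deref(), Ok("search/white%album"));
    println!("both formatters join search terms with `%`");
}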
//! The crate responsible of downloading karas from kurisu.

pub(crate) use commons::*;
pub(crate) use hashbrown::HashMap;
pub(crate) use lektor_c_compat::rs::*;
pub(crate) use lektor_config::*;
pub(crate) use lektor_db::{connexion::LktDatabaseConnection, uri::LktUri};
pub(crate) use smallstring::SmallString;
pub(crate) use std::sync::{Arc, Mutex};
pub(crate) use tokio::{runtime::Runtime, task::JoinHandle};
......
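The reorganized `Download::download` above hands `self.concurrent()` to `for_each_concurrent`, which accepts `impl Into<Option<usize>>` and treats `None` as no limit; since the builder stores a `NonZeroUsize`, the limit is always `Some(n)` here. A minimal sketch of that pattern, assuming only the `futures` and `tokio` crates (with tokio's `macros` and `rt-multi-thread` features) rather than the crate's own types.

use std::num::NonZeroUsize;

use futures::stream::{self, StreamExt};

// Stand-in for `Download::concurrent()`: expose a stored `NonZeroUsize` as the
// `Option<usize>` limit expected by `for_each_concurrent` (`None` would mean
// no limit at all, which the builder never produces).
fn concurrency_limit(count: NonZeroUsize) -> Option<usize> {
    Some(count.into())
}

#[tokio::main]
async fn main() {
    let limit = concurrency_limit(NonZeroUsize::new(2).expect("2 is non-zero"));
    // At most `limit` closure bodies run at the same time.
    stream::iter(1..=4u32)
        .for_each_concurrent(limit, |id| async move {
            // A real implementation would download kara `id` here.
            println!("handling item {id}");
        })
        .await;
}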
use crate::*;
+use std::num::NonZeroUsize;

/// The structure responsible to download karas from kurisu. We use a a pointer
/// instead of an Arc because we will call manually drop on instances of the
@@ -12,6 +13,11 @@ pub struct LktModuleRepoRs {
    rt: Runtime,
}

+fn concurrent() -> NonZeroUsize {
+    std::thread::available_parallelism()
+        .expect("failed to get an estimate of the default amount of parallelism")
+}
+
impl LktModuleRepoRs {
    pub fn new(queue: LktCQueue, hosts: Vec<RepoConfig>, db: LktLockDbPtr) -> Self {
        let par = std::thread::available_parallelism()
@@ -35,6 +41,7 @@ impl LktModuleRepoRs {
            self.rt.spawn(
                download::Download::builder(self.queue, self.hosts.clone(), self.db.clone())
                    .dry()
+                    .concurrent(concurrent())
                    .build()
                    .download(),
            ),
@@ -46,6 +53,7 @@ impl LktModuleRepoRs {
        self.handlers.push(
            self.rt.spawn(
                download::Download::builder(self.queue, self.hosts.clone(), self.db.clone())
+                    .concurrent(concurrent())
                    .build()
                    .download(),
            ),
@@ -58,6 +66,7 @@ impl LktModuleRepoRs {
            self.rt.spawn(
                download::Download::builder(self.queue, self.hosts.clone(), self.db.clone())
                    .uri(uri)
+                    .concurrent(concurrent())
                    .build()
                    .download(),
            ),
......
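In the module above, the default concurrency comes from `std::thread::available_parallelism()`, which returns an `io::Result<NonZeroUsize>` and is unwrapped with `expect`. A possible non-panicking variant, shown only as a sketch of an alternative and not what the commit does, falls back to a single concurrent download when the estimate is unavailable.

use std::num::NonZeroUsize;

// Sketch of a fallback instead of the commit's `expect`: `NonZeroUsize::MIN` is 1,
// so an unavailable estimate degrades to sequential downloads.
fn default_concurrency() -> NonZeroUsize {
    std::thread::available_parallelism().unwrap_or(NonZeroUsize::MIN)
}

fn main() {
    println!("default download concurrency: {}", default_concurrency());
}

Either way, the builder chain stays as in the diff: `download::Download::builder(self.queue, self.hosts.clone(), self.db.clone()).concurrent(concurrent()).build().download()`.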