diff --git a/lektor_nkdb/src/database/epoch.rs b/lektor_nkdb/src/database/epoch.rs index 059a569ef38daf2097e89c4133c6a97eeffea665..27ddba1952df02035043c7e470d4e786fd61e6a2 100644 --- a/lektor_nkdb/src/database/epoch.rs +++ b/lektor_nkdb/src/database/epoch.rs @@ -1,6 +1,8 @@ //! An epoch is a complete representation of the database at a point in time. It should be valid //! and sufficient to load the database and update it (well, appart from the files...) +use hashbrown::hash_map; + use crate::*; /// The epoch contains all available karas at a certain point in time. It can be submitted and @@ -11,6 +13,9 @@ pub struct Epoch(EpochData, u64); /// Represent the data contained in an epoch. pub type EpochData = HashMap<KId, Kara>; +/// Represent the keys of the epoch data, e.g. the ids of the kara in this epoch. +pub type EpochIds<'a> = hash_map::Keys<'a, KId, Kara>; + impl From<(EpochData, u64)> for Epoch { fn from((data, num): (EpochData, u64)) -> Self { Self(data, num) @@ -18,7 +23,12 @@ impl From<(EpochData, u64)> for Epoch { } impl Epoch { - /// Get all the tuples (KId, RemoteKId) for the kara present in the epoch. + /// Get the local ids ([KId]) from this epoch. + pub fn local_ids(&self) -> EpochIds { + self.0.keys() + } + + /// Get all the tuples ([KId], [RemoteKId]) for the kara present in the epoch. pub fn ids(&self) -> impl Iterator<Item = (KId, RemoteKId)> + '_ { self.0.values().map(|kara| (kara.id, kara.remote.clone())) } @@ -52,4 +62,14 @@ impl Epoch { pub fn num(&self) -> u64 { self.1 } + + /// Get the number of karas in the epoch. + pub fn len(&self) -> usize { + self.0.len() + } + + /// Tells wether if the epoch is empty or not. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } } diff --git a/lektor_nkdb/src/lib.rs b/lektor_nkdb/src/lib.rs index 16ab58f9664db8466b37b84326863683f5b9ca0a..73d585f9bcbb38ec835b1805d629d1c8085779c7 100644 --- a/lektor_nkdb/src/lib.rs +++ b/lektor_nkdb/src/lib.rs @@ -1,7 +1,10 @@ //! A new implementation of a database to store informations about karas, playlists and such. pub use crate::{ - database::{epoch::Epoch, update::UpdateHandler}, + database::{ + epoch::{Epoch, EpochIds}, + update::UpdateHandler, + }, id::{KId, RemoteKId}, kara::{ status::KaraStatus, @@ -9,7 +12,10 @@ pub use crate::{ timestamps::KaraTimeStamps, Kara, }, - playlists::playlist::{Playlist, PlaylistInfo}, + playlists::{ + playlist::{Playlist, PlaylistInfo}, + PlaylistsHandle, + }, storage::{DatabaseDiskStorage, DatabaseStorage}, }; pub use kurisu_api::v2::{SongOrigin, SongType, SONGORIGIN_LENGTH, SONGTYPE_LENGTH}; @@ -18,7 +24,7 @@ use crate::database::{epoch::EpochData, pool::Pool}; use anyhow::Context as _; use hashbrown::HashMap; use lektor_utils::pushvec::*; -use playlists::{Playlists, PlaylistsHandle}; +use playlists::Playlists; mod database; mod id; diff --git a/lektor_search/src/batch.rs b/lektor_search/src/batch.rs index bfc7a10b1f33760ae75e0f6a01051fe259a936fd..d46c7274cfafe2bbd2a02dab1bcb25cbc771e872 100644 --- a/lektor_search/src/batch.rs +++ b/lektor_search/src/batch.rs @@ -85,6 +85,12 @@ impl<const SIZE: usize, Item: Copy> Batch<SIZE, Item> { } } +impl<const SIZE: usize, Item: Copy> Default for Batch<SIZE, Item> { + fn default() -> Self { + Self::from_array_maybe([None; SIZE]) + } +} + impl<const SIZE: usize, Item: Copy> From<[Option<Item>; SIZE]> for Batch<SIZE, Item> { fn from(value: [Option<Item>; SIZE]) -> Self { Self::from_array_maybe(value) diff --git a/lektor_search/src/lib.rs b/lektor_search/src/lib.rs index 5c7878a45eca949ade901f86c3ccda34b32079d1..1b16a0340da685744eb0be241e4e4f96e17b980a 100644 --- a/lektor_search/src/lib.rs +++ b/lektor_search/src/lib.rs @@ -18,7 +18,7 @@ pub async fn search<const BATCH_SIZE: usize>( return Default::default(); }; - stream::unfold(extractor, |state| async move { + stream::unfold(extractor, |mut state| async move { if state.is_empty().await { return None; } diff --git a/lektor_search/src/traits.rs b/lektor_search/src/traits.rs index b1fa5a02b0f9509f718fec176d4e72f92bd1ae9a..94ae174dc5b52828232f9a5a9fa0510ba1aa9b8a 100644 --- a/lektor_search/src/traits.rs +++ b/lektor_search/src/traits.rs @@ -4,10 +4,10 @@ use lektor_payloads::{KId, Kara}; #[allow(async_fn_in_trait)] pub trait KaraIdExtractor { /// Get the next kara id. - async fn next_id(&self) -> Option<KId>; + async fn next_id(&mut self) -> Option<KId>; /// Get a next batch of kara id, to reduce any lock usage. - async fn next_id_batch<const SIZE: usize>(&self) -> Batch<SIZE, KId>; + async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId>; /// Get the number of karas to process until the extractor is empty. async fn count(&self) -> usize; diff --git a/lektord/Cargo.toml b/lektord/Cargo.toml index 84f35257ca013827d591b905935da3c521b41d54..a4e54f9cb1445f790f6d8d835a58f790405c8b3e 100644 --- a/lektord/Cargo.toml +++ b/lektord/Cargo.toml @@ -1,29 +1,30 @@ [package] -name = "lektord" -version.workspace = true -edition.workspace = true -authors.workspace = true -license.workspace = true -rust-version.workspace = true +name = "lektord" description = "The lektord daemon" +rust-version.workspace = true +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true + [dependencies] -serde.workspace = true +serde.workspace = true serde_json.workspace = true -log.workspace = true -rand.workspace = true -anyhow.workspace = true +log.workspace = true +rand.workspace = true +anyhow.workspace = true hashbrown.workspace = true -futures.workspace = true - -axum.workspace = true -tokio.workspace = true +futures.workspace = true +tokio.workspace = true tokio-stream.workspace = true -hyper.workspace = true + +axum.workspace = true +hyper.workspace = true hyper-util.workspace = true -tower.workspace = true +tower.workspace = true clap.workspace = true diff --git a/lektord/src/app/mod.rs b/lektord/src/app/mod.rs index a9ad12953c92c6bc853e60bf68968c4a6acdd6e6..d020f169bc1aa131ecd7e5cfb243c53848137add 100644 --- a/lektord/src/app/mod.rs +++ b/lektord/src/app/mod.rs @@ -6,6 +6,7 @@ mod mpris; mod play_state; mod queue; mod routes; +mod search_adaptors; use crate::LektorConfig; use anyhow::{Context, Result}; diff --git a/lektord/src/app/search_adaptors.rs b/lektord/src/app/search_adaptors.rs new file mode 100644 index 0000000000000000000000000000000000000000..dd414fe8471d1c63c0b55efb973b75ef438bcd8c --- /dev/null +++ b/lektord/src/app/search_adaptors.rs @@ -0,0 +1,213 @@ +use super::LektorState; +use lektor_nkdb::{Epoch, EpochIds, KId, Kara}; +use lektor_search::{Batch, KaraIdExtractor}; +use std::ops::Bound; + +/// Get a kara store, to get [Kara] by its [KId]. +pub fn kara_store(state: &LektorState) -> impl lektor_search::KaraStore + use<'_> { + KaraStore(&state.database) +} + +/// Get a thing to iterate over all the kara ids. +pub async fn database_extractor(state: &LektorState) -> impl KaraIdExtractor + use<'_> { + let epoch = (state.database.last_epoch()).await; + Database( + epoch.map(Epoch::local_ids).unwrap_or_default(), + epoch.map(Epoch::len).unwrap_or_default(), + ) +} + +/// Get a thing to iterate over all the queue. +pub async fn queue_extractor(state: &LektorState) -> impl KaraIdExtractor + use<'_> { + Queue(state, 0) +} + +/// Get a thing to iterate over all the history. +pub async fn history_extractor(state: &LektorState) -> impl KaraIdExtractor + use<'_> { + History(state, 0) +} + +/// Get a thing to iterate over all the kara ids of a playlist. +pub async fn playlist_extractor(state: &LektorState, plt: KId) -> impl KaraIdExtractor + use<'_> { + Playlist { + handle: state.database.playlists(), + playlist: plt, + index: 0, + } +} + +/// A database wrapper to implement [lektor_search::KaraStore]. +struct KaraStore<'a>(&'a lektor_nkdb::Database); + +/// A database wrapper to implement [lektor_search::KaraStore] to iterate over the history. +struct History<'a>(&'a LektorState, usize); + +/// A database wrapper to implement [lektor_search::KaraStore] to iterate over the queue. +struct Queue<'a>(&'a LektorState, usize); + +/// A database wrapper to implement [KaraIdExtractor] for a whole epoch. +struct Database<'a>(EpochIds<'a>, usize); + +/// A database wrapper to implement [KaraIdExtractor] for a playlist. +struct Playlist<'a, Storage: lektor_nkdb::DatabaseStorage> { + handle: lektor_nkdb::PlaylistsHandle<'a, Storage>, + playlist: KId, + index: usize, +} + +impl<'a> KaraIdExtractor for Database<'a> { + async fn next_id(&mut self) -> Option<KId> { + self.0.next().inspect(|_| self.1 -= 1).copied() + } + + async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> { + std::iter::from_fn(|| self.0.next().inspect(|_| self.1 -= 1).copied()) + .enumerate() + .take(SIZE) + .fold([None; SIZE], |mut array, (idx, id)| { + array[idx] = Some(id); + array + }) + .into() + } + + async fn count(&self) -> usize { + self.1 + } +} + +impl<'a> KaraIdExtractor for History<'a> { + async fn next_id(&mut self) -> Option<KId> { + let item = (self.0.queue()) + .read(|content, _| content.history((Bound::Included(self.1), Bound::Included(self.1)))) + .await; + + match &item[..] { + [] => None, + [id] => { + self.1 += 1; + Some(*id) + } + + _ => unreachable!(), + } + } + + async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> { + (self.0.queue()) + .read(|content, _| { + content.history((Bound::Included(self.1), Bound::Excluded(self.1 + SIZE))) + }) + .await + .into_iter() + .take(SIZE) + .enumerate() + .fold([None; SIZE], |mut array, (idx, id)| { + self.1 += 1; + array[idx] = Some(id); + array + }) + .into() + } + + async fn count(&self) -> usize { + (self.0.queue()) + .read(|content, _| content.history_count() - self.1) + .await + } +} + +impl<'a> KaraIdExtractor for Queue<'a> { + async fn next_id(&mut self) -> Option<KId> { + let item = (self.0.queue()) + .read(|content, _| content.get((Bound::Included(self.1), Bound::Included(self.1)))) + .await; + + match &item[..] { + [] => None, + [(_, id)] => { + self.1 += 1; + Some(*id) + } + + _ => unreachable!(), + } + } + + async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> { + (self.0.queue()) + .read(|content, _| { + content.get((Bound::Included(self.1), Bound::Excluded(self.1 + SIZE))) + }) + .await + .into_iter() + .take(SIZE) + .enumerate() + .fold([None; SIZE], |mut array, (idx, (_, id))| { + self.1 += 1; + array[idx] = Some(id); + array + }) + .into() + } + + async fn count(&self) -> usize { + (self.0.queue()) + .read(|content, _| content.count() - self.1) + .await + } +} + +impl<'a, Storage: lektor_nkdb::DatabaseStorage> KaraIdExtractor for Playlist<'a, Storage> { + async fn next_id(&mut self) -> Option<KId> { + self.handle + .read(self.playlist, |plt| plt.iter_seq_content().nth(self.index)) + .await + .inspect(|_| self.index += 1) + .map_err(|err| log::error!("{err}")) + .unwrap_or_default() + } + + async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> { + self.handle + .read(self.playlist, |plt| { + plt.iter_seq_content() + .skip(self.index) + .take(SIZE) + .enumerate() + .fold([None; SIZE], |mut array, (idx, id)| { + self.index += 1; + array[idx] = Some(id); + array + }) + }) + .await + .map(Batch::from) + .unwrap_or_default() + } + + async fn count(&self) -> usize { + self.handle + .read(self.playlist, |plt| plt.iter_seq_content().count()) + .await + .map_err(|err| log::error!("{err}")) + .unwrap_or_default() + } +} + +impl<'a> lektor_search::KaraStore for KaraStore<'a> { + async fn get_kara(&self, id: KId) -> Option<&Kara> { + self.0.get_kara_by_kid(id).await.ok() + } + + async fn get_kara_batch<const SIZE: usize>( + &self, + batch: Batch<SIZE, KId>, + ) -> Batch<SIZE, &Kara> { + (self.0.last_epoch().await).map_or_else(Batch::default, |epoch| { + (batch.into_array()) + .map(|maybe| maybe.and_then(|id| epoch.get_kara_by_kid(id))) + .into() + }) + } +}