From 6f95eb567c606580ad06a27cacb4a39927983ebd Mon Sep 17 00:00:00 2001
From: Kubat <maelle.martin@proton.me>
Date: Fri, 18 Oct 2024 20:55:25 +0200
Subject: [PATCH] SEARCH: Implement search adaptors in lektord

---
 lektor_nkdb/src/database/epoch.rs  |  22 ++-
 lektor_nkdb/src/lib.rs             |  12 +-
 lektor_search/src/batch.rs         |   6 +
 lektor_search/src/lib.rs           |   2 +-
 lektor_search/src/traits.rs        |   4 +-
 lektord/Cargo.toml                 |  33 ++---
 lektord/src/app/mod.rs             |   1 +
 lektord/src/app/search_adaptors.rs | 213 +++++++++++++++++++++++++++++
 8 files changed, 270 insertions(+), 23 deletions(-)
 create mode 100644 lektord/src/app/search_adaptors.rs

diff --git a/lektor_nkdb/src/database/epoch.rs b/lektor_nkdb/src/database/epoch.rs
index 059a569e..27ddba19 100644
--- a/lektor_nkdb/src/database/epoch.rs
+++ b/lektor_nkdb/src/database/epoch.rs
@@ -1,6 +1,8 @@
 //! An epoch is a complete representation of the database at a point in time. It should be valid
 //! and sufficient to load the database and update it (well, appart from the files...)
 
+use hashbrown::hash_map;
+
 use crate::*;
 
 /// The epoch contains all available karas at a certain point in time. It can be submitted and
@@ -11,6 +13,9 @@ pub struct Epoch(EpochData, u64);
 /// Represent the data contained in an epoch.
 pub type EpochData = HashMap<KId, Kara>;
 
+/// Represent the keys of the epoch data, e.g. the ids of the kara in this epoch.
+pub type EpochIds<'a> = hash_map::Keys<'a, KId, Kara>;
+
 impl From<(EpochData, u64)> for Epoch {
     fn from((data, num): (EpochData, u64)) -> Self {
         Self(data, num)
@@ -18,7 +23,12 @@ impl From<(EpochData, u64)> for Epoch {
 }
 
 impl Epoch {
-    /// Get all the tuples (KId, RemoteKId) for the kara present in the epoch.
+    /// Get the local ids ([KId]) from this epoch.
+    pub fn local_ids(&self) -> EpochIds {
+        self.0.keys()
+    }
+
+    /// Get all the tuples ([KId], [RemoteKId]) for the kara present in the epoch.
     pub fn ids(&self) -> impl Iterator<Item = (KId, RemoteKId)> + '_ {
         self.0.values().map(|kara| (kara.id, kara.remote.clone()))
     }
@@ -52,4 +62,14 @@ impl Epoch {
     pub fn num(&self) -> u64 {
         self.1
     }
+
+    /// Get the number of karas in the epoch.
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Tells whether the epoch is empty or not.
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
 }
diff --git a/lektor_nkdb/src/lib.rs b/lektor_nkdb/src/lib.rs
index 16ab58f9..73d585f9 100644
--- a/lektor_nkdb/src/lib.rs
+++ b/lektor_nkdb/src/lib.rs
@@ -1,7 +1,10 @@
 //! A new implementation of a database to store informations about karas, playlists and such.
 
 pub use crate::{
-    database::{epoch::Epoch, update::UpdateHandler},
+    database::{
+        epoch::{Epoch, EpochIds},
+        update::UpdateHandler,
+    },
     id::{KId, RemoteKId},
     kara::{
         status::KaraStatus,
@@ -9,7 +12,10 @@ pub use crate::{
         timestamps::KaraTimeStamps,
         Kara,
     },
-    playlists::playlist::{Playlist, PlaylistInfo},
+    playlists::{
+        playlist::{Playlist, PlaylistInfo},
+        PlaylistsHandle,
+    },
     storage::{DatabaseDiskStorage, DatabaseStorage},
 };
 pub use kurisu_api::v2::{SongOrigin, SongType, SONGORIGIN_LENGTH, SONGTYPE_LENGTH};
@@ -18,7 +24,7 @@ use crate::database::{epoch::EpochData, pool::Pool};
 use anyhow::Context as _;
 use hashbrown::HashMap;
 use lektor_utils::pushvec::*;
-use playlists::{Playlists, PlaylistsHandle};
+use playlists::Playlists;
 
 mod database;
 mod id;
diff --git a/lektor_search/src/batch.rs b/lektor_search/src/batch.rs
index bfc7a10b..d46c7274 100644
--- a/lektor_search/src/batch.rs
+++ b/lektor_search/src/batch.rs
@@ -85,6 +85,12 @@ impl<const SIZE: usize, Item: Copy> Batch<SIZE, Item> {
     }
 }
 
+impl<const SIZE: usize, Item: Copy> Default for Batch<SIZE, Item> {
+    fn default() -> Self {
+        Self::from_array_maybe([None; SIZE]) // every one of the SIZE slots starts as `None`
+    }
+}
+
 impl<const SIZE: usize, Item: Copy> From<[Option<Item>; SIZE]> for Batch<SIZE, Item> {
     fn from(value: [Option<Item>; SIZE]) -> Self {
         Self::from_array_maybe(value)
diff --git a/lektor_search/src/lib.rs b/lektor_search/src/lib.rs
index 5c7878a4..1b16a034 100644
--- a/lektor_search/src/lib.rs
+++ b/lektor_search/src/lib.rs
@@ -18,7 +18,7 @@ pub async fn search<const BATCH_SIZE: usize>(
         return Default::default();
     };
 
-    stream::unfold(extractor, |state| async move {
+    stream::unfold(extractor, |mut state| async move {
         if state.is_empty().await {
             return None;
         }
diff --git a/lektor_search/src/traits.rs b/lektor_search/src/traits.rs
index b1fa5a02..94ae174d 100644
--- a/lektor_search/src/traits.rs
+++ b/lektor_search/src/traits.rs
@@ -4,10 +4,10 @@ use lektor_payloads::{KId, Kara};
 #[allow(async_fn_in_trait)]
 pub trait KaraIdExtractor {
     /// Get the next kara id.
-    async fn next_id(&self) -> Option<KId>;
+    async fn next_id(&mut self) -> Option<KId>;
 
     /// Get a next batch of kara id, to reduce any lock usage.
-    async fn next_id_batch<const SIZE: usize>(&self) -> Batch<SIZE, KId>;
+    async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId>;
 
     /// Get the number of karas to process until the extractor is empty.
     async fn count(&self) -> usize;
diff --git a/lektord/Cargo.toml b/lektord/Cargo.toml
index 84f35257..a4e54f9c 100644
--- a/lektord/Cargo.toml
+++ b/lektord/Cargo.toml
@@ -1,29 +1,30 @@
 [package]
-name = "lektord"
-version.workspace = true
-edition.workspace = true
-authors.workspace = true
-license.workspace = true
-rust-version.workspace = true
+name        = "lektord"
 description = "The lektord daemon"
 
+rust-version.workspace = true
+version.workspace      = true
+edition.workspace      = true
+authors.workspace      = true
+license.workspace      = true
+
 [dependencies]
-serde.workspace = true
+serde.workspace      = true
 serde_json.workspace = true
 
-log.workspace = true
-rand.workspace = true
-anyhow.workspace = true
+log.workspace       = true
+rand.workspace      = true
+anyhow.workspace    = true
 hashbrown.workspace = true
 
-futures.workspace = true
-
-axum.workspace = true
-tokio.workspace = true
+futures.workspace      = true
+tokio.workspace        = true
 tokio-stream.workspace = true
-hyper.workspace = true
+
+axum.workspace       = true
+hyper.workspace      = true
 hyper-util.workspace = true
-tower.workspace = true
+tower.workspace      = true
 
 clap.workspace = true
 
diff --git a/lektord/src/app/mod.rs b/lektord/src/app/mod.rs
index a9ad1295..d020f169 100644
--- a/lektord/src/app/mod.rs
+++ b/lektord/src/app/mod.rs
@@ -6,6 +6,7 @@ mod mpris;
 mod play_state;
 mod queue;
 mod routes;
+mod search_adaptors;
 
 use crate::LektorConfig;
 use anyhow::{Context, Result};
diff --git a/lektord/src/app/search_adaptors.rs b/lektord/src/app/search_adaptors.rs
new file mode 100644
index 00000000..dd414fe8
--- /dev/null
+++ b/lektord/src/app/search_adaptors.rs
@@ -0,0 +1,213 @@
+use super::LektorState;
+use lektor_nkdb::{Epoch, EpochIds, KId, Kara};
+use lektor_search::{Batch, KaraIdExtractor};
+use std::ops::Bound;
+
+/// Wrap the database of the state into a kara store, to get a [Kara] by its [KId].
+pub fn kara_store(state: &LektorState) -> impl lektor_search::KaraStore + use<'_> {
+    KaraStore(&state.database)
+}
+
+/// Get an extractor over the kara ids of the last epoch (its count is 0 if there is none).
+pub async fn database_extractor(state: &LektorState) -> impl KaraIdExtractor + use<'_> {
+    let epoch = (state.database.last_epoch()).await;
+    Database(
+        epoch.map(Epoch::local_ids).unwrap_or_default(),
+        epoch.map(Epoch::len).unwrap_or_default(),
+    )
+}
+
+/// Get an extractor over the kara ids of the queue, starting at index 0.
+pub async fn queue_extractor(state: &LektorState) -> impl KaraIdExtractor + use<'_> {
+    Queue(state, 0)
+}
+
+/// Get an extractor over the kara ids of the history, starting at index 0.
+pub async fn history_extractor(state: &LektorState) -> impl KaraIdExtractor + use<'_> {
+    History(state, 0)
+}
+
+/// Get an extractor over the kara ids of the playlist `plt`, starting at its first entry.
+pub async fn playlist_extractor(state: &LektorState, plt: KId) -> impl KaraIdExtractor + use<'_> {
+    Playlist {
+        handle: state.database.playlists(),
+        playlist: plt,
+        index: 0,
+    }
+}
+
+/// A database wrapper to implement [lektor_search::KaraStore].
+struct KaraStore<'a>(&'a lektor_nkdb::Database);
+
+/// A state wrapper to implement [KaraIdExtractor] over the history, with the next index to read.
+struct History<'a>(&'a LektorState, usize);
+
+/// A state wrapper to implement [KaraIdExtractor] over the queue, with the next index to read.
+struct Queue<'a>(&'a LektorState, usize);
+
+/// An epoch ids iterator, with the count of ids left, to implement [KaraIdExtractor].
+struct Database<'a>(EpochIds<'a>, usize);
+
+/// A playlists handle wrapper to implement [KaraIdExtractor] for a playlist.
+struct Playlist<'a, Storage: lektor_nkdb::DatabaseStorage> {
+    handle: lektor_nkdb::PlaylistsHandle<'a, Storage>,
+    playlist: KId,
+    index: usize,
+}
+
+impl<'a> KaraIdExtractor for Database<'a> {
+    async fn next_id(&mut self) -> Option<KId> {
+        self.0.next().inspect(|_| self.1 -= 1).copied() // one id out, one less to process
+    }
+
+    async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> {
+        std::iter::from_fn(|| self.0.next().inspect(|_| self.1 -= 1).copied())
+            .enumerate()
+            .take(SIZE) // never take more ids than the batch has slots
+            .fold([None; SIZE], |mut array, (idx, id)| {
+                array[idx] = Some(id);
+                array
+            })
+            .into()
+    }
+
+    async fn count(&self) -> usize {
+        self.1 // the ids not yielded yet
+    }
+}
+
+impl<'a> KaraIdExtractor for History<'a> {
+    /// Get the id at the cursor and advance it; `None` when nothing is found at the cursor.
+    async fn next_id(&mut self) -> Option<KId> {
+        let item = (self.0.queue())
+            .read(|content, _| content.history((Bound::Included(self.1), Bound::Included(self.1))))
+            .await;
+        match &item[..] {
+            [] => None,
+            [id] => {
+                self.1 += 1;
+                Some(*id)
+            }
+            _ => unreachable!(),
+        }
+    }
+
+    async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> {
+        (self.0.queue())
+            .read(|content, _| {
+                content.history((Bound::Included(self.1), Bound::Excluded(self.1 + SIZE)))
+            })
+            .await
+            .into_iter()
+            .take(SIZE)
+            .enumerate()
+            .fold([None; SIZE], |mut array, (idx, id)| {
+                self.1 += 1;
+                array[idx] = Some(id);
+                array
+            })
+            .into()
+    }
+
+    /// Get the number of ids after the cursor, 0 when the history shrank below the cursor.
+    async fn count(&self) -> usize {
+        (self.0.queue())
+            .read(|content, _| content.history_count().saturating_sub(self.1))
+            .await
+    }
+}
+
+impl<'a> KaraIdExtractor for Queue<'a> {
+    /// Get the id at the cursor and advance it; `None` when nothing is found at the cursor.
+    async fn next_id(&mut self) -> Option<KId> {
+        let item = (self.0.queue())
+            .read(|content, _| content.get((Bound::Included(self.1), Bound::Included(self.1))))
+            .await;
+        match &item[..] {
+            [] => None,
+            [(_, id)] => {
+                self.1 += 1;
+                Some(*id)
+            }
+            _ => unreachable!(),
+        }
+    }
+
+    async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> {
+        (self.0.queue())
+            .read(|content, _| {
+                content.get((Bound::Included(self.1), Bound::Excluded(self.1 + SIZE)))
+            })
+            .await
+            .into_iter()
+            .take(SIZE)
+            .enumerate()
+            .fold([None; SIZE], |mut array, (idx, (_, id))| {
+                self.1 += 1;
+                array[idx] = Some(id);
+                array
+            })
+            .into()
+    }
+
+    /// Get the number of ids after the cursor, 0 when the queue shrank below the cursor.
+    async fn count(&self) -> usize {
+        (self.0.queue())
+            .read(|content, _| content.count().saturating_sub(self.1))
+            .await
+    }
+}
+
+impl<'a, Storage: lektor_nkdb::DatabaseStorage> KaraIdExtractor for Playlist<'a, Storage> {
+    async fn next_id(&mut self) -> Option<KId> {
+        self.handle
+            .read(self.playlist, |plt| plt.iter_seq_content().nth(self.index))
+            .await
+            .inspect(|id| self.index += usize::from(id.is_some())) // stay put at the end
+            .map_err(|err| log::error!("{err}"))
+            .unwrap_or_default()
+    }
+
+    /// Get up to `SIZE` ids from the cursor; an empty batch on a (logged) read error.
+    async fn next_id_batch<const SIZE: usize>(&mut self) -> Batch<SIZE, KId> {
+        self.handle
+            .read(self.playlist, |plt| {
+                let mut array = [None; SIZE];
+                for (slot, id) in array.iter_mut().zip(plt.iter_seq_content().skip(self.index)) {
+                    *slot = Some(id);
+                    self.index += 1;
+                }
+                array
+            })
+            .await
+            .map_err(|err| log::error!("{err}"))
+            .map(Batch::from)
+            .unwrap_or_default()
+    }
+
+    /// Get the number of ids left after the cursor; 0 on a (logged) read error.
+    async fn count(&self) -> usize {
+        self.handle
+            .read(self.playlist, |plt| plt.iter_seq_content().skip(self.index).count())
+            .await
+            .map_err(|err| log::error!("{err}"))
+            .unwrap_or_default()
+    }
+}
+
+impl<'a> lektor_search::KaraStore for KaraStore<'a> {
+    async fn get_kara(&self, id: KId) -> Option<&Kara> {
+        self.0.get_kara_by_kid(id).await.ok() // a failed lookup gives `None`
+    }
+
+    async fn get_kara_batch<const SIZE: usize>(
+        &self,
+        batch: Batch<SIZE, KId>,
+    ) -> Batch<SIZE, &Kara> {
+        (self.0.last_epoch().await).map_or_else(Batch::default, |epoch| {
+            (batch.into_array())
+                .map(|maybe| maybe.and_then(|id| epoch.get_kara_by_kid(id))) // slot by slot
+                .into()
+        })
+    }
+}
-- 
GitLab