Unverified commit 95d34c8d, authored by Kubat

Merge remote-tracking branch 'origin/rust-misc-debug' into rust-misc-debug

# Conflicts:
#	lektor_nkdb/src/database/pool.rs
#	lektor_nkdb/src/database/update.rs
#	lektor_nkdb/src/lib.rs
#	lektor_nkdb/src/storage/disk_storage.rs
#	lektor_utils/src/dkmap.rs
#	lektor_utils/tests/dkmap.rs
Parents: 1a2f7347 49dcbf21

Merge requests: !205 misc debug and hardening, mostly related to repo syncing; !197 Draft: Refactor the whole code.
@@ -23,6 +23,10 @@ impl From<(EpochData, u64)> for Epoch {
 }

 impl Epoch {
+    pub fn epoch_id(&self) -> u64 {
+        self.1
+    }
+
     /// Get the local ids ([KId]) from this epoch.
     pub fn local_ids(&self) -> EpochIds {
         self.0.keys()
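The new `epoch_id` accessor simply exposes the epoch number that already rides along in the tuple struct, as the `From<(EpochData, u64)>` impl above suggests. A minimal sketch of the shape involved, with a placeholder standing in for the crate's real `EpochData`:

use std::collections::HashMap;

// Placeholder for the crate's real EpochData type (assumption for this sketch).
type EpochData = HashMap<u64, String>;

// Epoch pairs the epoch contents (field 0) with its numeric id (field 1).
struct Epoch(EpochData, u64);

impl Epoch {
    // The new accessor: expose the id without leaking the tuple layout.
    pub fn epoch_id(&self) -> u64 {
        self.1
    }
}

fn main() {
    let epoch = Epoch(EpochData::new(), 42);
    assert_eq!(epoch.epoch_id(), 42);
}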
@@ -5,11 +5,10 @@
 //! specific things like the [u64] to [Kid] / [Kara] mappings, do that in the epoch struct.

 use crate::{kara::status::KaraState, strings, EpochData, KId, Kara, KaraStatus, RemoteKId};
-use anyhow::{Context, Result};
+use anyhow::{Context as _, Result};
 use futures::prelude::*;
 use lektor_utils::dkmap::DKMap;
 use serde::{Deserialize, Serialize};
-use std::sync::atomic::{AtomicU64, Ordering};

 /// ID mapping from local to remote. Note that there is a one-to-one relation between
 /// [RemoteKId] and [KId], even with metadata updates or file update.
@@ -87,25 +86,19 @@ pub struct Pool {
     kara_mapping: KaraMapping,

     /// The next local ID.
-    next_id: AtomicU64,
+    next_id: u64,
 }

 impl From<PoolSerDeProxy> for Pool {
     fn from(proxy: PoolSerDeProxy) -> Self {
-        let next_id = AtomicU64::new(
-            proxy
-                .kara_mapping
-                .clone()
-                .into_iter()
-                .map(|(KId(id), _, _)| id + 1)
-                .max()
-                .unwrap_or(1),
-        );
-        Self {
-            next_id,
-            kara_mapping: proxy.kara_mapping,
-        }
+        let PoolSerDeProxy { kara_mapping } = proxy;
+        let next_id = kara_mapping
+            .iter()
+            .map(|(KId(id), _, _)| *id + 1)
+            .max()
+            .unwrap_or(1);
+        Self { next_id, kara_mapping }
     }
 }
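The rewrite drops the `clone()` of the whole mapping: destructuring the proxy moves `kara_mapping` once, and `next_id` is derived by iterating over it by reference. The computation itself is max-plus-one with 1 as the empty-pool default; a standalone sketch with plain `u64` ids standing in for `KId`:

fn next_id_after(ids: &[u64]) -> u64 {
    // One past the highest existing id; 1 when the pool is empty.
    ids.iter().map(|id| *id + 1).max().unwrap_or(1)
}

fn main() {
    assert_eq!(next_id_after(&[3, 7, 5]), 8);
    assert_eq!(next_id_after(&[]), 1);
}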
@@ -125,10 +118,8 @@ impl<M: IntoMapping> FromIterator<M> for Pool {
             })
             .unzip();

-        let next_id = ids.into_iter().max().unwrap_or(0) + 1;
-
         Self {
-            next_id: AtomicU64::new(next_id),
+            next_id: ids.into_iter().max().unwrap_or(0) + 1,
             kara_mapping: DKMap::from_iter(mappings),
         }
     }
@@ -142,8 +133,10 @@ impl Pool {
     }

     /// Get a new and unique [KId]
-    pub(crate) fn next_kid(&self) -> KId {
-        KId(self.next_id.fetch_add(1, Ordering::AcqRel))
+    pub(crate) fn next_kid(&mut self) -> KId {
+        let ret = self.next_id;
+        self.next_id += 1;
+        KId(ret)
     }

     pub(crate) fn add_mapping(&mut self, mapping: impl IntoMapping) {
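Switching `next_kid` from `&self` plus `AtomicU64::fetch_add` to `&mut self` plus a plain `u64` trades lock-free interior mutability for compiler-checked exclusivity: a mutable borrow rules out concurrent callers, so no atomicity is needed. Callers now need exclusive access, which fits the pool living behind an `RwLock` as other hunks in this commit suggest. A minimal sketch of the two equivalent shapes, with simplified stand-ins for `Pool`:

use std::sync::atomic::{AtomicU64, Ordering};

struct AtomicCounter { next_id: AtomicU64 }

impl AtomicCounter {
    // Old shape: shared reference, atomic read-modify-write.
    fn next(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::AcqRel)
    }
}

struct PlainCounter { next_id: u64 }

impl PlainCounter {
    // New shape: exclusive reference, ordinary post-increment.
    fn next(&mut self) -> u64 {
        let ret = self.next_id;
        self.next_id += 1;
        ret
    }
}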
@@ -56,14 +56,14 @@ impl<Storage: DatabaseStorage> Database<Storage> {
         let mut last_epoch = epochs
             .push(storage.read_last_epoch().await?.unwrap_or_default().into())
             .await;
-        let mut pool = Pool::from_iter(last_epoch.content().data().values());
+        let pool = storage.get_pool().await?;

         log::info!("factorize content in the last_epoch and in the playlists");
         pool.factorize_epoch_data(last_epoch.content().data_mut())
             .await;
-        log::info!("database loaded from disk");
         last_epoch.finished();
+        log::info!("database loaded");

         Ok(Self {
             playlists: Playlists::new(
@@ -72,7 +72,7 @@ impl<Storage: DatabaseStorage> Database<Storage> {
                 .await
                 .context("failed to read playlists")?,
             ),
-            pool: RwLock::new(pool),
+            pool,
             epochs,
             storage,
         })
@@ -16,7 +16,7 @@ use std::{
 use tokio::{
     fs,
     io::AsyncWriteExt as _,
-    sync::mpsc::{channel, Receiver, Sender},
+    sync::mpsc::{Receiver, Sender, channel},
 };

 #[allow(clippy::doc_overindented_list_items)]
@@ -41,7 +41,6 @@ pub struct DatabaseDiskStorage {
     prefix: PathBuf,
     next_epoch: AtomicU64,
     playlist_pipeline: Sender<PlaylistWriteEvent>,
-    pool: Pool,
     pool_pipeline: Sender<PoolWriteEvent>,
 }
@@ -141,6 +140,8 @@ impl DatabaseStorage for DatabaseDiskStorage {
     type File = (PathBuf, fs::File);

     async fn load_from_prefix(prefix: PathBuf) -> anyhow::Result<Self> {
+        log::info!("load storage from disk");
+
         // Prepare folders.
         for folder in ["data", "epoch", "playlists"].map(|folder| prefix.join(folder)) {
             if let Err(err) = fs::create_dir_all(&folder).await {
@@ -173,7 +174,6 @@ impl DatabaseStorage for DatabaseDiskStorage {
             prefix,
             next_epoch: AtomicU64::new(last_epoch + 1),
             playlist_pipeline: send_playlist,
-            pool,
             pool_pipeline: send_pool,
         })
     }
@@ -215,6 +215,21 @@ impl DatabaseStorage for DatabaseDiskStorage {
         Ok(Some((data.into_inner(), id)))
     }

+    async fn read_playlists(&self) -> anyhow::Result<Vec<Playlist>> {
+        FolderReader::new(self.prefix.join("playlists"))
+            .await?
+            .unfold_entries::<KId>()
+            .then(|id| async move {
+                let path = self.path_from_root(format!("playlists/{id}.json"));
+                log::info!("try to load playlist from path: {}", path.display());
+                Ok(serde_json::from_slice(&fs::read(path).await?)?)
+            })
+            .collect::<FuturesUnordered<_>>()
+            .await
+            .into_iter()
+            .collect::<anyhow::Result<Vec<Playlist>>>()
+    }
+
     async fn write_epoch(&self, data: &EpochData) -> anyhow::Result<()> {
         let num = self.next_epoch();
         write_json_from_root(
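The new `read_playlists` walks the `playlists/` folder via the crate's own `FolderReader`/`unfold_entries` helpers (not shown in this diff) and deserializes one `{id}.json` file per playlist. A simpler sequential sketch of the same idea, generic over any `DeserializeOwned` payload since `Playlist` itself lives elsewhere in the crate (the helper name here is hypothetical):

use serde::de::DeserializeOwned;
use std::path::Path;

// Read and decode `<dir>/<id>.json` for every given id, one after another.
async fn read_all_json<T: DeserializeOwned>(
    dir: &Path,
    ids: &[String],
) -> anyhow::Result<Vec<T>> {
    let mut out = Vec::with_capacity(ids.len());
    for id in ids {
        let path = dir.join(format!("{id}.json"));
        out.push(serde_json::from_slice(&tokio::fs::read(&path).await?)?);
    }
    Ok(out)
}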
@@ -227,9 +242,21 @@ impl DatabaseStorage for DatabaseDiskStorage {
         Ok(())
     }

+    async fn get_pool(&self) -> anyhow::Result<Pool> {
+        match &fs::read(self.prefix.join("pool.json")).await {
+            Ok(content) => Ok(serde_json::from_slice::<PoolSerDeProxy>(content)
+                .context("unable to deserialize pool.json from disk")?
+                .into()),
+            Err(err) => {
+                log::info!("seems like there is no pool on disk ({err}), so start from scratch...");
+                Ok(Pool::from_iter(std::iter::empty::<(id::KId, id::RemoteKId, KaraState)>()))
+            }
+        }
+    }
+
     async fn write_pool(&self, data: &Pool) -> anyhow::Result<()> {
-        // FIXME: here, self.pool is not updated to match what is in data
-        let data = serde_json::to_string(&data.to_serialize_proxy())
+        let data = serde_json::to_string(&data.to_serialize_proxy().await)
             .context("error while serializing pool")?;
         self.pool_pipeline
             .send(PoolWriteEvent::Write(data))
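`get_pool` tries `pool.json` under the storage prefix and falls back to an empty pool when the read fails, which is the expected case on a fresh install with no file yet. The read-or-default pattern in isolation, as a hedged sketch with a generic payload (the helper is illustrative, not part of the repo):

use serde::de::DeserializeOwned;
use std::path::Path;

// Read a JSON file, or fall back to Default when it cannot be read.
async fn load_or_default<T: DeserializeOwned + Default>(path: &Path) -> anyhow::Result<T> {
    match tokio::fs::read(path).await {
        Ok(bytes) => Ok(serde_json::from_slice(&bytes)?),
        Err(err) => {
            // No file on disk (or unreadable): start from scratch.
            log::info!("no data at {} ({err}), using default", path.display());
            Ok(T::default())
        }
    }
}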
@@ -320,7 +347,7 @@ impl DatabaseStorage for DatabaseDiskStorage {
         match status {
             KaraStatus::Virtual => true,
             KaraStatus::Physical => match self.get_kara_cached_checksum(id).await {
-                Ok(known) => match self.pool.get_state(id) {
+                Ok(known) => match self.pool.get_state(id).await {
                     Some(KaraState::PhysicalAvailable { hash, .. }) => known == hash,
                     _ => false,
                 },
@@ -37,13 +37,16 @@ pub trait DatabaseStorage: Sized + std::fmt::Debug {
     // =========== //
     // Epoch stuff //
     // =========== //

     /// Read the last valid epoch from disk/memory/network/like-the-implementation-does.
     async fn read_last_epoch(&self) -> Result<Option<(EpochData, u64)>>;

+    async fn get_epochs(&self) -> PushVec<Epoch>;
+
     /// Write the data from an epoch to the disk.
     async fn write_epoch(&self, epoch: &EpochData) -> Result<()>;

+    async fn get_pool(&self) -> Result<RwLock<Pool>>;
+
     /// Write the pool to the disk.
     async fn write_pool(&self, data: &Pool) -> Result<()>;
 use anyhow::{Context as _, Result};
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Deserializer, Serialize};
 use std::cmp::Eq;
 use std::collections::HashMap;
 use std::hash::Hash;
+use std::mem;

-#[derive(Debug, Serialize, Deserialize, Default, Clone)]
+#[derive(Debug, Serialize, Clone, Default)]
 pub struct DKMap<K1: Hash + Eq, K2: Hash + Eq, Values> {
+    #[serde(skip)]
     primary_key_mapping: HashMap<K1, usize>,
+    #[serde(skip)]
     secondary_key_mapping: HashMap<K2, Vec<usize>>,
     values: Vec<Values>,
 }
+
+#[derive(Debug, Deserialize, Clone)]
+struct DKMapDeserializeProxy<Values> {
+    values: Vec<Values>,
+}
@@ -105,3 +112,18 @@ where
         self.values.into_iter()
     }
 }
+
+impl<'de, K1, K2, Values> Deserialize<'de> for DKMap<K1, K2, (K1, K2, Values)>
+where
+    K1: Hash + Eq + Copy + Deserialize<'de>,
+    K2: Hash + Eq + Clone + Deserialize<'de>,
+    Values: Copy + Deserialize<'de>,
+{
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let proxy = DKMapDeserializeProxy::<(K1, K2, Values)>::deserialize(deserializer)?;
+        Ok(DKMap::new(proxy.values))
+    }
+}
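The proxy-based `Deserialize` pairs with the `#[serde(skip)]` attributes above: only `values` is persisted, and both key mappings are rebuilt at load time by `DKMap::new`, which the impl relies on to index the `(K1, K2, Value)` triples. A hypothetical round-trip to illustrate the on-disk shape this implies (assuming `DKMap` is exposed as in the diff):

use lektor_utils::dkmap::DKMap;

fn main() {
    // With both mappings skipped, the wire format is just `{ "values": [...] }`;
    // Rust tuples serialize as JSON arrays. Keys are rebuilt on deserialization.
    let json = r#"{ "values": [[1, "a", 10], [2, "b", 20]] }"#;
    let map: DKMap<u64, String, (u64, String, u32)> =
        serde_json::from_str(json).expect("valid DKMap payload");
    let _ = map;
}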
@@ -8,8 +8,8 @@ type Value = (PrimaryKey, SecondaryKey, u64);
 type TestDKMap = DKMap<PrimaryKey, SecondaryKey, Value>;

-#[tokio::test]
-async fn test_from_iter() {
+#[test]
+fn test_from_iter() {
     let map = TestDKMap::from_iter(std::iter::empty());
     assert_eq!(map.get_k1(&1), None);
     assert_eq!(map.get_k2(&1), None);
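Side note on the test change: with no `.await` left in the body, the `#[tokio::test]` attribute was only spinning up an async runtime for nothing; a plain `#[test]` exercises the now fully synchronous DKMap API directly.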