diff --git a/service/Cargo.lock b/service/Cargo.lock index 80632a5..1516f64 100644 --- a/service/Cargo.lock +++ b/service/Cargo.lock @@ -8,6 +8,7 @@ version = "0.0.0" dependencies = [ "anyhow", "axum", + "bincode", "dotenv", "reqwest", "serde", @@ -90,6 +91,26 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -1248,6 +1269,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "url" version = "2.5.7" @@ -1278,6 +1305,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "want" version = "0.3.1" diff --git a/service/Cargo.toml b/service/Cargo.toml index 16625e2..1c68dfc 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -13,6 +13,7 @@ path = "src/lib/mod.rs" [dependencies] anyhow = "1.0.100" axum = { version = "0.6.9", features = ["headers"] } +bincode = "2.0.1" dotenv = "0.15.0" reqwest = "0.11.14" serde = "1.0.152" diff --git a/service/src/lib/domain/ci/models/repo.rs b/service/src/lib/domain/ci/models/repo.rs index 91d5231..024ee37 100644 --- a/service/src/lib/domain/ci/models/repo.rs +++ b/service/src/lib/domain/ci/models/repo.rs @@ -1,4 +1,50 @@ -use crate::outbound::db_custom::{DocumentField, Modifiable, Storable, Value}; +use crate::outbound::db_custom::{ + Value, + write_set::{Diffable, Storable}, +}; + +#[derive(Debug)] +struct DbRepository { + name: String, + deprecated_field: i32, +} + +impl Storable for DbRepository { + fn as_full_write_op(&self) -> crate::outbound::db_custom::write_set::WriteOperation { + todo!() + } + + fn as_partial_write_op( + &self, + other: &Self, + ) -> crate::outbound::db_custom::write_set::WriteOperation { + todo!() + } + + fn apply_partial_write(&mut self, op: &crate::outbound::db_custom::write_set::PartialWrite) { + todo!() + } + + fn field(&self, field_ident: &str) -> Option + where + Self: Sized, + { + todo!() + } + + fn set_field(&mut self, field_ident: &str, value: Value) + where + Self: Sized, + { + todo!() + } +} + +impl Diffable for DbRepository { + fn diff(&self, other: &Self) -> Vec { + todo!() + } +} #[derive(Debug, Clone)] pub struct Repository { @@ -6,21 +52,47 @@ pub struct Repository { } impl Storable for Repository { - fn cloned(&self) -> Box { - Box::new(self.clone()) - } - - fn write_modification(&mut self, field: &DocumentField) { + fn as_full_write_op(&self) -> crate::outbound::db_custom::write_set::WriteOperation + where + Self: Sized, + { todo!() } - fn field(&self, field: &'static str) -> Option { + fn as_partial_write_op( + &self, + other: &Self, + ) -> crate::outbound::db_custom::write_set::WriteOperation + where + Self: Sized, + { + todo!() + } + + fn apply_partial_write(&mut self, op: &crate::outbound::db_custom::write_set::PartialWrite) { + todo!() + } + + fn set_field(&mut self, field_ident: &str, value: Value) + where + Self: Sized, + { + todo!() + } + + fn field(&self, field_ident: &str) -> Option + where + Self: Sized, + { todo!() } } -impl Modifiable for Repository { - fn modifications_between(&self, other: &Self) -> Vec { +impl Diffable for Repository { + fn diff(&self, other: &Self) -> Vec + where + Self: Sized, + { todo!() } } diff --git a/service/src/lib/outbound/db_custom/mod.rs b/service/src/lib/outbound/db_custom/mod.rs index df6b545..29951cf 100644 --- a/service/src/lib/outbound/db_custom/mod.rs +++ b/service/src/lib/outbound/db_custom/mod.rs @@ -1,3 +1,5 @@ +pub mod write_set; + use std::{ any::{Any, TypeId}, collections::{HashMap, HashSet}, @@ -5,6 +7,8 @@ use std::{ sync::{Arc, Mutex}, }; +use crate::outbound::db_custom::write_set::{Diffable, Storable, Write, WriteOperation, WriteSet}; + #[derive(Default, Copy, Clone, PartialOrd, PartialEq, Eq, Hash, Debug)] struct TransactionId(u64); @@ -29,6 +33,43 @@ impl Id { } } +impl bincode::Encode for Id { + fn encode( + &self, + encoder: &mut E, + ) -> Result<(), bincode::error::EncodeError> { + bincode::enc::Encode::encode(&self.id, encoder)?; + + Ok(()) + } +} + +impl bincode::Decode for Id { + fn decode>( + decoder: &mut D, + ) -> Result { + let id = bincode::de::Decode::decode(decoder)?; + + Ok(Self { + id, + _type: PhantomData, + }) + } +} + +impl<'de, T, C> bincode::BorrowDecode<'de, C> for Id { + fn borrow_decode>( + decoder: &mut D, + ) -> Result { + let id = bincode::de::Decode::decode(decoder)?; + + Ok(Self { + id, + _type: PhantomData, + }) + } +} + impl std::cmp::Eq for Id {} impl std::cmp::PartialEq for Id { fn eq(&self, other: &Self) -> bool { @@ -41,7 +82,7 @@ impl std::hash::Hash for Id { } } -#[derive(Default, Copy, Clone, PartialEq, Eq, Hash, Debug)] +#[derive(Default, Copy, Clone, PartialEq, Eq, Hash, Debug, bincode::Encode, bincode::Decode)] struct TypelessId(u64); impl From> for TypelessId { @@ -66,9 +107,28 @@ impl From for TypelessId { pub struct Database { log: Vec, snapshots: HashMap, + registered_types: HashMap Box>, } impl Database { + fn register_type( + &mut self, + name: impl ToString, + decode_fn: fn(&[u8]) -> Box, + ) { + self.registered_types.insert(name.to_string(), decode_fn); + } + + fn decode(&self, data: &[u8]) -> Result, anyhow::Error> { + let (name, len): (String, usize) = + bincode::decode_from_slice(data, bincode::config::standard()) + .expect("decode MUST succeed"); + + let create_storable = self.registered_types.get(&name).expect("aalkdjhfl"); + + Ok(create_storable(&data[len..])) + } + fn build_snapshot(&mut self, timestamp: TransactionId) -> &Snapshot { let mut snapshot = Snapshot { timestamp, @@ -76,26 +136,26 @@ impl Database { }; for t in self.transactions_to(timestamp) { - for write in &t.write_set.0 { + for write in &t.write_set.writes { match &write.operation { - DocumentWriteOperation::New(data) => { - snapshot.cached.insert(write.id, data.cloned()); + WriteOperation::Full(data) => { + snapshot + .cached + .insert(write.id, self.decode(data).expect("decode MUST succeed")); } - DocumentWriteOperation::Modified(fields) => { - match snapshot.get_typeless_mut(write.id) { - Some(data) => { - for field in fields { - data.write_modification(field); - } - } - None => { - panic!( - "somehow found a modified write for id {:?} at timestamp {:?} that doesn't exist in snapshot at timestamp {:?}", - write.id, t.id, snapshot.timestamp - ) + WriteOperation::Partial(partial) => match snapshot.get_typeless_mut(write.id) { + Some(data) => { + for op in partial { + data.apply_partial_write(op); } } - } + None => { + panic!( + "somehow found a partial write for id {:?} at timestamp {:?} that doesn't exist in snapshot at timestamp {:?}", + write.id, t.id, snapshot.timestamp + ) + } + }, } } } @@ -104,14 +164,11 @@ impl Database { self.snapshots.get(×tamp).unwrap() } - fn transactions_to(&mut self, timestamp: TransactionId) -> impl Iterator { + fn transactions_to(&self, timestamp: TransactionId) -> impl Iterator { self.log.iter().take_while(move |t| t.id <= timestamp) } - fn transactions_from( - &mut self, - timestamp: TransactionId, - ) -> impl Iterator { + fn transactions_from(&self, timestamp: TransactionId) -> impl Iterator { self.log.iter().skip_while(move |t| t.id < timestamp) } @@ -292,10 +349,13 @@ impl<'a> PendingTransaction<'a> { pub fn insert(&mut self, id: Id, data: T) { // FIXME: decide on a way to generate unqiue ids - self.write_set.push(DocumentWrite::new(id.into(), data)); + self.write_set.writes.push(Write { + id: id.into(), + operation: data.as_full_write_op(), + }); } - pub fn modify( + pub fn modify( &mut self, id: Id, transform: impl Fn(T) -> T, @@ -305,14 +365,21 @@ impl<'a> PendingTransaction<'a> { let old_data = self.accessor.get(id.clone(), self.timestamp).unwrap(); let new_data = transform(old_data.clone()); - let modifications = old_data.modifications_between(&new_data); + let operation = old_data.as_partial_write_op(&new_data); - for modification in &modifications { - self.read_set.push(DocumentRead::Field(modification.field)); + match &operation { + WriteOperation::Full(_) => self.read_set.push(DocumentRead::Complete), + WriteOperation::Partial(partial_writes) => self.read_set.0.extend( + partial_writes + .iter() + .map(|w| DocumentRead::Field(w.field_ident.clone())), + ), } - self.write_set - .push(DocumentWrite::modification(id, modifications)); + self.write_set.writes.push(Write { + id: (&id).into(), + operation, + }); } pub fn get(&mut self, id: Id) -> Option { @@ -325,7 +392,7 @@ impl<'a> PendingTransaction<'a> { id: Id, field: &'static str, ) -> Option { - self.read_set.push(DocumentRead::Field(field)); + self.read_set.push(DocumentRead::Field(field.to_string())); self.accessor .get(id, self.timestamp) .and_then(|data| data.field(field)) @@ -335,8 +402,6 @@ impl<'a> PendingTransaction<'a> { // TODO: make these actual sets? #[derive(Debug, Default)] struct ReadSet(Vec); -#[derive(Debug, Default)] -struct WriteSet(Vec); impl ReadSet { fn push(&mut self, read: DocumentRead) { @@ -344,15 +409,9 @@ impl ReadSet { } } -impl WriteSet { - fn push(&mut self, write: DocumentWrite) { - self.0.push(write); - } -} - impl ReadSet { fn overlaps_with(&self, write_set: &WriteSet) -> bool { - if !write_set.0.is_empty() + if !write_set.writes.is_empty() && self .0 .iter() @@ -362,21 +421,21 @@ impl ReadSet { } let writes = write_set - .0 + .writes .iter() .filter_map(|write| match &write.operation { - DocumentWriteOperation::New(_) => None, - DocumentWriteOperation::Modified(fields) => Some(fields.iter()), + WriteOperation::Full(_) => None, + WriteOperation::Partial(partial) => Some(partial.iter()), }) .flatten() - .map(|field| field.field) + .map(|field| field.field_ident.as_str()) .collect::>(); let reads = self .0 .iter() .filter_map(|read| match read { DocumentRead::Complete => None, - DocumentRead::Field(field) => Some(*field), + DocumentRead::Field(field) => Some(field.as_ref()), }) .collect::>(); @@ -384,65 +443,13 @@ impl ReadSet { } } -pub trait Storable: Any + std::fmt::Debug { - fn cloned(&self) -> Box; - fn write_modification(&mut self, field: &DocumentField); - fn field(&self, field: &'static str) -> Option; -} -pub trait Modifiable { - fn modifications_between(&self, other: &Self) -> Vec; -} - #[derive(Debug)] enum DocumentRead { Complete, - Field(&'static str), + Field(String), } -#[derive(Debug)] -struct DocumentWrite { - id: TypelessId, - type_id: TypeId, - operation: DocumentWriteOperation, -} - -impl DocumentWrite { - fn new(id: TypelessId, data: T) -> Self { - Self { - id, - type_id: TypeId::of::(), - operation: DocumentWriteOperation::New(Box::new(data)), - } - } - - fn modification(id: Id, operation: Vec) -> Self { - Self { - id: id.into(), - type_id: TypeId::of::(), - operation: DocumentWriteOperation::Modified(operation), - } - } -} - -#[derive(Debug)] -pub enum DocumentWriteOperation { - New(Box), - Modified(Vec), -} - -#[derive(Debug, Clone)] -pub struct DocumentField { - field: &'static str, - value: Value, -} - -impl DocumentField { - fn of(field: &'static str, value: Value) -> Self { - Self { field, value } - } -} - -#[derive(Debug, Clone)] +#[derive(Debug, Clone, bincode::Encode, bincode::Decode)] pub enum Value { String(String), Int(i64), @@ -472,51 +479,121 @@ impl Value { _ => None, } } + + fn as_bytes(&self) -> Vec { + bincode::encode_to_vec(self, bincode::config::standard()).expect("encode MUST succeed") + } + + fn from_bytes(data: &[u8]) -> Self { + let (value, _): (Self, _) = bincode::decode_from_slice(data, bincode::config::standard()) + .expect("decode MUST succeed"); + + value + } } #[cfg(test)] mod tests { use std::collections::HashSet; + use crate::outbound::db_custom::write_set::FieldDiff; + use super::*; - #[derive(Debug, Clone)] + #[derive(Debug, Clone, bincode::Encode, bincode::Decode)] struct MyTestData { name: Name, age: i64, contacts: HashSet>, } - #[derive(Debug, Clone)] + #[derive(Debug, Clone, bincode::Encode, bincode::Decode)] struct Name { first: String, last: String, } - // TODO: make this a derive macro - impl Storable for MyTestData { - fn cloned(&self) -> Box { - Box::new(self.clone()) - } - fn write_modification(&mut self, field: &DocumentField) { - match field.field { - "name.first" => self.name.first = field.value.as_string().unwrap().to_string(), - "name.last" => self.name.last = field.value.as_string().unwrap().to_string(), - "age" => self.age = *field.value.as_int().unwrap(), - "contacts" => { - self.contacts = field - .value - .as_array() - .unwrap() - .iter() - .map(|id| Id::::new(id.0)) - .collect(); - } - _ => panic!("unknown field"), + #[derive(bincode::Encode)] + struct A { + name: String, + data: T, + } + + impl A { + fn with(name: impl ToString, data: T) -> Self { + Self { + name: name.to_string(), + data, } } - fn field(&self, field: &'static str) -> Option { - match field { + } + + impl Storable for MyTestData { + fn as_full_write_op(&self) -> WriteOperation + where + Self: Sized, + { + let data = + bincode::encode_to_vec(A::with("MyTestData", self), bincode::config::standard()) + .expect("encode MUST succeed"); + + WriteOperation::Full(data) + } + + fn as_partial_write_op(&self, other: &Self) -> WriteOperation + where + Self: Sized, + { + WriteOperation::Partial(self.diff(other).into_iter().map(Into::into).collect()) + } + + fn apply_partial_write(&mut self, op: &write_set::PartialWrite) { + let value = Value::from_bytes(&op.data); + self.set_field(&op.field_ident, value); + } + + fn set_field(&mut self, field_ident: &str, value: Value) + where + Self: Sized, + { + match field_ident { + "name.first" => { + let Value::String(name) = value else { + panic!("expected 'name.first' to be a 'String'"); + }; + + self.name.first = name; + } + "name.last" => { + let Value::String(name) = value else { + panic!("expected 'name.last' to be a 'String'"); + }; + + self.name.last = name; + } + "age" => { + let Value::Int(age) = value else { + panic!("expected 'age' to be a 'Int'"); + }; + + self.age = age; + } + "contacts" => { + let Value::Array(contacts) = value else { + panic!("expected 'contacts' to be a 'Vec'"); + }; + + self.contacts = contacts.into_iter().map(|id| Id::<_>::new(id.0)).collect(); + } + _ => panic!("invalid field '{field_ident}'"), + }; + } + + fn field(&self, field_ident: &str) -> Option + where + Self: Sized, + { + match field_ident { "name.first" => Some(Value::String(self.name.first.clone())), "name.last" => Some(Value::String(self.name.last.clone())), "age" => Some(Value::Int(self.age)), @@ -528,32 +605,34 @@ mod tests { } } - // TODO: make this a derive macro - impl Modifiable for MyTestData { - fn modifications_between(&self, other: &Self) -> Vec { + impl Diffable for MyTestData { + fn diff(&self, other: &Self) -> Vec + where + Self: Sized, + { let mut modifications = vec![]; if self.name.first != other.name.first { - modifications.push(DocumentField::of( + modifications.push(FieldDiff::of( "name.first", Value::String(other.name.first.clone()), )); } if self.name.last != other.name.last { - modifications.push(DocumentField::of( + modifications.push(FieldDiff::of( "name.last", Value::String(other.name.last.clone()), )); } if self.age != other.age { - modifications.push(DocumentField::of("age", Value::Int(other.age))); + modifications.push(FieldDiff::of("age", Value::Int(other.age))); } let contacts_diff = self.contacts.difference(&other.contacts); if contacts_diff.count() > 0 { - modifications.push(DocumentField::of( + modifications.push(FieldDiff::of( "contacts", Value::Array(other.contacts.iter().map(|id| id.into()).collect()), )); @@ -565,7 +644,15 @@ mod tests { #[test] fn test() { - let db = Database::default(); + let mut db = Database::default(); + db.register_type("MyTestData", |data| { + let (data, _): (MyTestData, usize) = + bincode::decode_from_slice(data, bincode::config::standard()) + .expect("decode MUST succeed"); + + Box::new(data) + }); + let mut accessor = DatabaseAccessor::new(db); let data = MyTestData { @@ -594,7 +681,7 @@ mod tests { ); }); - let mut cloned_accessor = accessor.clone(); + let cloned_accessor = accessor.clone(); let handle = std::thread::spawn(move || { cloned_accessor.transact(|t| { let Some(Value::String(last_name_of_oldy)) = diff --git a/service/src/lib/outbound/db_custom/write_set.rs b/service/src/lib/outbound/db_custom/write_set.rs new file mode 100644 index 0000000..cdd3741 --- /dev/null +++ b/service/src/lib/outbound/db_custom/write_set.rs @@ -0,0 +1,77 @@ +use bincode::{Decode, Encode}; + +use crate::outbound::db_custom::{TypelessId, Value}; + +pub trait Storable: Diffable + std::fmt::Debug + std::any::Any { + fn as_full_write_op(&self) -> WriteOperation + where + Self: Sized; + fn as_partial_write_op(&self, other: &Self) -> WriteOperation + where + Self: Sized; + + fn apply_partial_write(&mut self, op: &PartialWrite); + + fn set_field(&mut self, field_ident: &str, value: Value) + where + Self: Sized; + + fn field(&self, field_ident: &str) -> Option + where + Self: Sized; +} + +pub trait Diffable { + fn diff(&self, other: &Self) -> Vec + where + Self: Sized; +} + +pub struct FieldDiff { + pub(super) field_ident: String, + pub(super) value: Value, +} + +impl FieldDiff { + pub fn of(field_ident: impl ToString, value: Value) -> Self { + Self { + field_ident: field_ident.to_string(), + value, + } + } +} + +impl From for PartialWrite { + fn from(value: FieldDiff) -> Self { + let FieldDiff { field_ident, value } = value; + + PartialWrite { + field_ident, + data: value.as_bytes(), + } + } +} + +#[derive(Debug, Default, Encode, Decode)] +pub struct WriteSet { + // TODO: make an actual set + pub(super) writes: Vec, +} + +#[derive(Debug, Encode, Decode)] +pub struct Write { + pub(super) id: TypelessId, + pub(super) operation: WriteOperation, +} + +#[derive(Debug, Encode, Decode)] +pub enum WriteOperation { + Full(Vec), + Partial(Vec), +} + +#[derive(Debug, Encode, Decode)] +pub struct PartialWrite { + pub(super) field_ident: String, + pub(super) data: Vec, +}