From 36a8b52a166ebbea70a98d74ee2dc2b31df0a797 Mon Sep 17 00:00:00 2001 From: Patrick Cleavelin Date: Tue, 4 Nov 2025 16:32:29 -0600 Subject: [PATCH] get db imported --- service/src/lib/domain/ci.rs | 2 + service/src/lib/domain/ci/base_service.rs | 24 +- service/src/lib/domain/ci/interface.rs | 7 +- service/src/lib/domain/ci/models.rs | 1 + service/src/lib/domain/ci/models/repo.rs | 26 + service/src/lib/inbound/http.rs | 15 +- service/src/lib/inbound/http/handlers.rs | 36 ++ service/src/lib/mod.rs | 1 + service/src/lib/outbound.rs | 2 + service/src/lib/outbound/db_custom/mod.rs | 646 ++++++++++++++++++++++ service/src/lib/outbound/resource.rs | 13 + service/src/main.rs | 4 +- 12 files changed, 761 insertions(+), 16 deletions(-) create mode 100644 service/src/lib/domain/ci/models.rs create mode 100644 service/src/lib/domain/ci/models/repo.rs create mode 100644 service/src/lib/outbound.rs create mode 100644 service/src/lib/outbound/db_custom/mod.rs create mode 100644 service/src/lib/outbound/resource.rs diff --git a/service/src/lib/domain/ci.rs b/service/src/lib/domain/ci.rs index 204f714..7b4f54f 100644 --- a/service/src/lib/domain/ci.rs +++ b/service/src/lib/domain/ci.rs @@ -1,4 +1,6 @@ pub mod interface; +pub mod models; + mod base_service; pub use base_service::Service as BaseService; diff --git a/service/src/lib/domain/ci/base_service.rs b/service/src/lib/domain/ci/base_service.rs index c9391dd..988cd45 100644 --- a/service/src/lib/domain/ci/base_service.rs +++ b/service/src/lib/domain/ci/base_service.rs @@ -1,6 +1,24 @@ -use super::interface::CiService; +use super::{interface::CiService, models::repo::Repository}; +use crate::outbound::{ + db_custom::{DatabaseAccessor, Id}, + resource::Resource, +}; #[derive(Clone)] -pub struct Service {} +pub struct Service { + store: DatabaseAccessor, +} -impl CiService for Service {} +impl Service { + pub fn new() -> Self { + Self { + store: DatabaseAccessor::new(Default::default()), + } + } +} + +impl CiService for Service { + fn repos(&self) -> Vec { + self.store.get_all() + } +} diff --git a/service/src/lib/domain/ci/interface.rs b/service/src/lib/domain/ci/interface.rs index a231ddf..54e6fff 100644 --- a/service/src/lib/domain/ci/interface.rs +++ b/service/src/lib/domain/ci/interface.rs @@ -1 +1,6 @@ -pub trait CiService: Send + Sync + Clone + 'static {} +use super::models::repo::Repository; +use crate::outbound::resource::Resource; + +pub trait CiService: Send + Sync + Clone + 'static { + fn repos(&self) -> Vec; +} diff --git a/service/src/lib/domain/ci/models.rs b/service/src/lib/domain/ci/models.rs new file mode 100644 index 0000000..c426b23 --- /dev/null +++ b/service/src/lib/domain/ci/models.rs @@ -0,0 +1 @@ +pub mod repo; diff --git a/service/src/lib/domain/ci/models/repo.rs b/service/src/lib/domain/ci/models/repo.rs new file mode 100644 index 0000000..91d5231 --- /dev/null +++ b/service/src/lib/domain/ci/models/repo.rs @@ -0,0 +1,26 @@ +use crate::outbound::db_custom::{DocumentField, Modifiable, Storable, Value}; + +#[derive(Debug, Clone)] +pub struct Repository { + pub name: String, +} + +impl Storable for Repository { + fn cloned(&self) -> Box { + Box::new(self.clone()) + } + + fn write_modification(&mut self, field: &DocumentField) { + todo!() + } + + fn field(&self, field: &'static str) -> Option { + todo!() + } +} + +impl Modifiable for Repository { + fn modifications_between(&self, other: &Self) -> Vec { + todo!() + } +} diff --git a/service/src/lib/inbound/http.rs b/service/src/lib/inbound/http.rs index 3b6153c..46e606c 100644 --- a/service/src/lib/inbound/http.rs +++ b/service/src/lib/inbound/http.rs @@ -2,15 +2,11 @@ mod handlers; use std::{net::SocketAddr, sync::Arc}; -use axum::{ - routing::{get, post}, -}; -use tower_http::cors::CorsLayer; +use axum::routing::{get, post}; use reqwest::Method; +use tower_http::cors::CorsLayer; -use crate::{ - domain::ci::interface::CiService, -}; +use crate::domain::ci::interface::CiService; #[derive(Clone)] pub(crate) struct ApiState @@ -25,9 +21,7 @@ pub struct HttpServer { } impl HttpServer { - pub fn new( - ci_service: impl CiService, - ) -> anyhow::Result { + pub fn new(ci_service: impl CiService) -> anyhow::Result { let state = ApiState { ci_service: Arc::new(ci_service), }; @@ -62,4 +56,5 @@ where { axum::Router::>::new() .route("/health", get(handlers::health)) + .route("/repos", get(handlers::all_repos)) } diff --git a/service/src/lib/inbound/http/handlers.rs b/service/src/lib/inbound/http/handlers.rs index b116af3..aa9cafa 100644 --- a/service/src/lib/inbound/http/handlers.rs +++ b/service/src/lib/inbound/http/handlers.rs @@ -1,3 +1,39 @@ +use axum::extract::{Json, State}; +use serde::Serialize; + +use crate::domain::ci::interface::CiService; +use crate::domain::ci::models::repo::Repository; +use crate::inbound::http::ApiState; + pub(super) async fn health() -> String { "OK".to_string() } + +#[derive(Serialize)] +pub(super) struct RepositoryListResponse { + repos: Vec, +} +#[derive(Serialize)] +pub(super) struct RepositoryResponse { + name: String, +} +impl From for RepositoryResponse { + fn from(value: Repository) -> Self { + Self { name: value.name } + } +} + +pub(super) async fn all_repos( + State(state): State>, +) -> Json { + let repos = RepositoryListResponse { + repos: state + .ci_service + .repos() + .into_iter() + .map(Into::into) + .collect(), + }; + + Json(repos) +} diff --git a/service/src/lib/mod.rs b/service/src/lib/mod.rs index 4a475ea..a65f027 100644 --- a/service/src/lib/mod.rs +++ b/service/src/lib/mod.rs @@ -1,2 +1,3 @@ pub mod domain; pub mod inbound; +pub mod outbound; diff --git a/service/src/lib/outbound.rs b/service/src/lib/outbound.rs new file mode 100644 index 0000000..f4479a9 --- /dev/null +++ b/service/src/lib/outbound.rs @@ -0,0 +1,2 @@ +pub mod db_custom; +pub mod resource; diff --git a/service/src/lib/outbound/db_custom/mod.rs b/service/src/lib/outbound/db_custom/mod.rs new file mode 100644 index 0000000..df6b545 --- /dev/null +++ b/service/src/lib/outbound/db_custom/mod.rs @@ -0,0 +1,646 @@ +use std::{ + any::{Any, TypeId}, + collections::{HashMap, HashSet}, + marker::PhantomData, + sync::{Arc, Mutex}, +}; + +#[derive(Default, Copy, Clone, PartialOrd, PartialEq, Eq, Hash, Debug)] +struct TransactionId(u64); + +impl From for TransactionId { + fn from(id: u64) -> Self { + Self(id) + } +} + +#[derive(Debug, Copy, Clone)] +pub struct Id { + id: u64, + _type: PhantomData, +} + +impl Id { + pub fn new(id: u64) -> Self { + Self { + id, + _type: PhantomData, + } + } +} + +impl std::cmp::Eq for Id {} +impl std::cmp::PartialEq for Id { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} +impl std::hash::Hash for Id { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} + +#[derive(Default, Copy, Clone, PartialEq, Eq, Hash, Debug)] +struct TypelessId(u64); + +impl From> for TypelessId { + fn from(id: Id) -> Self { + Self(id.id) + } +} + +impl From<&Id> for TypelessId { + fn from(id: &Id) -> Self { + Self(id.id) + } +} + +impl From for TypelessId { + fn from(id: u64) -> Self { + Self(id) + } +} + +#[derive(Debug, Default)] +pub struct Database { + log: Vec, + snapshots: HashMap, +} + +impl Database { + fn build_snapshot(&mut self, timestamp: TransactionId) -> &Snapshot { + let mut snapshot = Snapshot { + timestamp, + cached: Default::default(), + }; + + for t in self.transactions_to(timestamp) { + for write in &t.write_set.0 { + match &write.operation { + DocumentWriteOperation::New(data) => { + snapshot.cached.insert(write.id, data.cloned()); + } + DocumentWriteOperation::Modified(fields) => { + match snapshot.get_typeless_mut(write.id) { + Some(data) => { + for field in fields { + data.write_modification(field); + } + } + None => { + panic!( + "somehow found a modified write for id {:?} at timestamp {:?} that doesn't exist in snapshot at timestamp {:?}", + write.id, t.id, snapshot.timestamp + ) + } + } + } + } + } + } + + self.snapshots.insert(timestamp, snapshot); + self.snapshots.get(×tamp).unwrap() + } + + fn transactions_to(&mut self, timestamp: TransactionId) -> impl Iterator { + self.log.iter().take_while(move |t| t.id <= timestamp) + } + + fn transactions_from( + &mut self, + timestamp: TransactionId, + ) -> impl Iterator { + self.log.iter().skip_while(move |t| t.id < timestamp) + } + + fn apply_transaction(&mut self, candidate: TransactionCandidate) { + let timestamp = (self.log.len() as u64).into(); + + println!("applying transaction to {:?}: {:?}", timestamp, &candidate); + + self.log.push(Transaction::commit(timestamp, candidate)); + self.build_snapshot(timestamp); + } +} + +#[derive(Clone)] +pub struct DatabaseAccessor { + db: Arc>, +} + +impl DatabaseAccessor { + pub fn new(db: Database) -> Self { + Self { + db: Arc::new(Mutex::new(db)), + } + } +} + +impl DatabaseAccessor { + // TODO + pub fn get_all(&self) -> Vec { + vec![] + } + + pub fn transact(&self, f: F) -> O + where + F: Fn(&mut PendingTransaction) -> O, + { + let mut me = self.clone(); + + for i in 0..5 { + println!("Starting transaction attempt {i}"); + + let current_timestamp = self.db.lock().unwrap().log.len() as u64; + + let (candidate, result): (TransactionCandidate, O) = { + let mut pending = PendingTransaction::new(&mut me, current_timestamp.into()); + let result = f(&mut pending); + + (pending.into(), result) + }; + + println!("transaction read set: {:?}", candidate.read_set); + + let mut db = self.db.lock().unwrap(); + + if db + .transactions_from(candidate.timestamp) + .any(|t| candidate.read_set.overlaps_with(&t.write_set)) + { + println!( + "Read set overlaps with write set: {:#?}", + candidate.read_set + ); + + continue; + } + + db.apply_transaction(candidate); + + return result; + } + + panic!("failed to apply transaction after 5 attempts"); + } + + fn get( + &mut self, + id: Id, + timestamp: TransactionId, + ) -> Option { + let db = &mut self.db.lock().unwrap(); + + let snapshot = { + if db.snapshots.contains_key(×tamp) { + db.snapshots.get(×tamp).unwrap() + } else { + db.build_snapshot(timestamp) + } + }; + + Some(snapshot.get(id).unwrap()) + } +} + +#[derive(Debug)] +struct Transaction { + id: TransactionId, + write_set: WriteSet, +} + +impl Transaction { + fn commit(id: TransactionId, candidate: TransactionCandidate) -> Self { + Self { + id, + write_set: candidate.write_set, + } + } +} + +#[derive(Debug, Default)] +struct Snapshot { + timestamp: TransactionId, + cached: HashMap>, +} + +impl Snapshot { + fn get(&self, id: Id) -> Option { + let readable = self.cached.get(&id.into())?; + + if (**readable).type_id() != TypeId::of::() { + panic!( + "Type mismatch: expected {:?}, got {:?}", + TypeId::of::(), + (**readable).type_id() + ); + // None + } else if let Some(data) = (&**readable as &dyn Any).downcast_ref::() { + Some((*data).clone()) + } else { + panic!( + "Type mismatch: expected {:?}, got {:?}", + TypeId::of::(), + (**readable).type_id() + ); + } + } + + fn get_typeless_mut( + &mut self, + id: TypelessId, + ) -> Option<&mut Box> { + self.cached.get_mut(&id) + } +} + +#[derive(Debug)] +struct TransactionCandidate { + timestamp: TransactionId, + read_set: ReadSet, + write_set: WriteSet, +} + +pub struct PendingTransaction<'a> { + accessor: &'a mut DatabaseAccessor, + timestamp: TransactionId, + read_set: ReadSet, + write_set: WriteSet, +} + +impl<'a> From> for TransactionCandidate { + fn from(pending: PendingTransaction) -> Self { + Self { + timestamp: pending.timestamp, + read_set: pending.read_set, + write_set: pending.write_set, + } + } +} + +impl<'a> PendingTransaction<'a> { + fn new(accessor: &'a mut DatabaseAccessor, timestamp: TransactionId) -> Self { + Self { + accessor, + timestamp, + read_set: ReadSet::default(), + write_set: WriteSet::default(), + } + } + pub fn insert(&mut self, id: Id, data: T) { + // FIXME: decide on a way to generate unqiue ids + + self.write_set.push(DocumentWrite::new(id.into(), data)); + } + + pub fn modify( + &mut self, + id: Id, + transform: impl Fn(T) -> T, + ) { + println!("attempting modification in timestamp {:?}", self.timestamp); + + let old_data = self.accessor.get(id.clone(), self.timestamp).unwrap(); + let new_data = transform(old_data.clone()); + + let modifications = old_data.modifications_between(&new_data); + + for modification in &modifications { + self.read_set.push(DocumentRead::Field(modification.field)); + } + + self.write_set + .push(DocumentWrite::modification(id, modifications)); + } + + pub fn get(&mut self, id: Id) -> Option { + self.read_set.push(DocumentRead::Complete); + self.accessor.get(id, self.timestamp) + } + + pub fn get_field( + &mut self, + id: Id, + field: &'static str, + ) -> Option { + self.read_set.push(DocumentRead::Field(field)); + self.accessor + .get(id, self.timestamp) + .and_then(|data| data.field(field)) + } +} + +// TODO: make these actual sets? +#[derive(Debug, Default)] +struct ReadSet(Vec); +#[derive(Debug, Default)] +struct WriteSet(Vec); + +impl ReadSet { + fn push(&mut self, read: DocumentRead) { + self.0.push(read); + } +} + +impl WriteSet { + fn push(&mut self, write: DocumentWrite) { + self.0.push(write); + } +} + +impl ReadSet { + fn overlaps_with(&self, write_set: &WriteSet) -> bool { + if !write_set.0.is_empty() + && self + .0 + .iter() + .any(|read| matches!(read, DocumentRead::Complete)) + { + return true; + } + + let writes = write_set + .0 + .iter() + .filter_map(|write| match &write.operation { + DocumentWriteOperation::New(_) => None, + DocumentWriteOperation::Modified(fields) => Some(fields.iter()), + }) + .flatten() + .map(|field| field.field) + .collect::>(); + let reads = self + .0 + .iter() + .filter_map(|read| match read { + DocumentRead::Complete => None, + DocumentRead::Field(field) => Some(*field), + }) + .collect::>(); + + reads.intersection(&writes).count() > 0 + } +} + +pub trait Storable: Any + std::fmt::Debug { + fn cloned(&self) -> Box; + fn write_modification(&mut self, field: &DocumentField); + fn field(&self, field: &'static str) -> Option; +} +pub trait Modifiable { + fn modifications_between(&self, other: &Self) -> Vec; +} + +#[derive(Debug)] +enum DocumentRead { + Complete, + Field(&'static str), +} + +#[derive(Debug)] +struct DocumentWrite { + id: TypelessId, + type_id: TypeId, + operation: DocumentWriteOperation, +} + +impl DocumentWrite { + fn new(id: TypelessId, data: T) -> Self { + Self { + id, + type_id: TypeId::of::(), + operation: DocumentWriteOperation::New(Box::new(data)), + } + } + + fn modification(id: Id, operation: Vec) -> Self { + Self { + id: id.into(), + type_id: TypeId::of::(), + operation: DocumentWriteOperation::Modified(operation), + } + } +} + +#[derive(Debug)] +pub enum DocumentWriteOperation { + New(Box), + Modified(Vec), +} + +#[derive(Debug, Clone)] +pub struct DocumentField { + field: &'static str, + value: Value, +} + +impl DocumentField { + fn of(field: &'static str, value: Value) -> Self { + Self { field, value } + } +} + +#[derive(Debug, Clone)] +pub enum Value { + String(String), + Int(i64), + Float(f64), + Bool(bool), + Array(Vec), +} + +impl Value { + fn as_string(&self) -> Option<&String> { + match self { + Value::String(s) => Some(s), + _ => None, + } + } + + fn as_int(&self) -> Option<&i64> { + match self { + Value::Int(i) => Some(i), + _ => None, + } + } + + fn as_array(&self) -> Option<&Vec> { + match self { + Value::Array(a) => Some(a), + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + + #[derive(Debug, Clone)] + struct MyTestData { + name: Name, + age: i64, + contacts: HashSet>, + } + + #[derive(Debug, Clone)] + struct Name { + first: String, + last: String, + } + + // TODO: make this a derive macro + impl Storable for MyTestData { + fn cloned(&self) -> Box { + Box::new(self.clone()) + } + fn write_modification(&mut self, field: &DocumentField) { + match field.field { + "name.first" => self.name.first = field.value.as_string().unwrap().to_string(), + "name.last" => self.name.last = field.value.as_string().unwrap().to_string(), + "age" => self.age = *field.value.as_int().unwrap(), + "contacts" => { + self.contacts = field + .value + .as_array() + .unwrap() + .iter() + .map(|id| Id::::new(id.0)) + .collect(); + } + _ => panic!("unknown field"), + } + } + fn field(&self, field: &'static str) -> Option { + match field { + "name.first" => Some(Value::String(self.name.first.clone())), + "name.last" => Some(Value::String(self.name.last.clone())), + "age" => Some(Value::Int(self.age)), + "contacts" => Some(Value::Array( + self.contacts.iter().map(|id| id.into()).collect(), + )), + _ => None, + } + } + } + + // TODO: make this a derive macro + impl Modifiable for MyTestData { + fn modifications_between(&self, other: &Self) -> Vec { + let mut modifications = vec![]; + + if self.name.first != other.name.first { + modifications.push(DocumentField::of( + "name.first", + Value::String(other.name.first.clone()), + )); + } + + if self.name.last != other.name.last { + modifications.push(DocumentField::of( + "name.last", + Value::String(other.name.last.clone()), + )); + } + + if self.age != other.age { + modifications.push(DocumentField::of("age", Value::Int(other.age))); + } + + let contacts_diff = self.contacts.difference(&other.contacts); + if contacts_diff.count() > 0 { + modifications.push(DocumentField::of( + "contacts", + Value::Array(other.contacts.iter().map(|id| id.into()).collect()), + )); + } + + modifications + } + } + + #[test] + fn test() { + let db = Database::default(); + let mut accessor = DatabaseAccessor::new(db); + + let data = MyTestData { + name: Name { + first: "John".to_string(), + last: "Doe".to_string(), + }, + age: 42, + contacts: HashSet::new(), + }; + + println!("initial state: {:#?}", accessor.db.lock().unwrap()); + + accessor.transact(move |t| { + t.insert(Id::new(1), data.clone()); + t.insert( + Id::new(2), + MyTestData { + name: Name { + first: "Oldy".to_string(), + last: "McOlderton".to_string(), + }, + age: 69, + contacts: vec![Id::new(1)].into_iter().collect(), + }, + ); + }); + + let mut cloned_accessor = accessor.clone(); + let handle = std::thread::spawn(move || { + cloned_accessor.transact(|t| { + let Some(Value::String(last_name_of_oldy)) = + t.get_field(Id::::new(2), "name.last") + else { + panic!("expected a string") + }; + + t.modify(Id::::new(1), |mut data| { + data.name.last = format!("Gearbox {}", last_name_of_oldy); + data + }); + }); + }); + + let id = Id::::new(1); + accessor.transact(|t| { + t.modify(id.clone(), |mut data| { + data.name.last = "Not McOlderton Gearbox".to_string(); + + data + }); + }); + + let oldys_contacts = accessor.transact(|t| { + let oldy = t.get(Id::::new(2)).unwrap(); + + oldy.contacts + .into_iter() + .filter_map(|id| t.get(id)) + .map(|data| format!("{} {}", data.name.first, data.name.last)) + .collect::>() + }); + + handle.join().unwrap(); + + let data0 = accessor.get(Id::::new(1), 0.into()); + let data1 = accessor.get(Id::::new(1), 1.into()); + let data2 = accessor.get(Id::::new(1), 2.into()); + let data3 = accessor.get(Id::::new(1), 3.into()); + + println!("data at timestamp 0: {:#?}", data0); + println!("data at timestamp 1: {:#?}", data1); + println!("data at timestamp 2: {:#?}", data2); + println!("data at timestamp 3: {:#?}", data3); + + panic!("oldys contacts: {:#?}", oldys_contacts); + } +} diff --git a/service/src/lib/outbound/resource.rs b/service/src/lib/outbound/resource.rs new file mode 100644 index 0000000..23d9a13 --- /dev/null +++ b/service/src/lib/outbound/resource.rs @@ -0,0 +1,13 @@ +use std::marker::PhantomData; + +pub struct Resource {} + +pub enum DataStore { + Direct, +} + +impl Resource { + pub fn fetch_all() -> Vec { + vec![] + } +} diff --git a/service/src/main.rs b/service/src/main.rs index 9ea52d0..80b8401 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -1,8 +1,8 @@ -use a_ci_lib::{inbound, domain::ci::BaseService}; +use a_ci_lib::{domain::ci::BaseService, inbound}; #[tokio::main] async fn main() -> Result<(), Box> { - let http_server = inbound::http::HttpServer::new(BaseService {})?; + let http_server = inbound::http::HttpServer::new(BaseService::new())?; http_server.run().await;