Make DocHandle !Sync #57

Closed · wants to merge 1 commit
3 changes: 1 addition & 2 deletions Cargo.toml
@@ -42,8 +42,7 @@ tempfile = "3.6.0"
[dev-dependencies]
clap = { version = "4.2.5", features = ["derive"] }
reqwest = { version = "0.11.17", features = ["json", "blocking"], default-features = false }
-axum = { version = "0.6.18" }
-axum-macros = "0.3.7"
+axum = { version = "0.6.18", features = ["macros"] }
tokio = { version = "1.27", features = ["full"] }
tokio-serde = {version = "0.8.0", features = ["json"]}
tokio-tungstenite = { version = "0.20.1", features = ["connect"] }
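For context on the dependency change above: axum 0.6's `macros` feature re-exports the `#[debug_handler]` attribute from the main crate, so the standalone `axum-macros` dependency is no longer needed. A minimal sketch of a handler compiled against that feature (the `AppState` here is a stand-in, not the examples' real state type):

```rust
use std::sync::Arc;

use axum::{debug_handler, extract::State, routing::get, Json, Router};

// Hypothetical shared state, standing in for the examples' real AppState.
struct AppState {
    customer_id: String,
}

// With the "macros" feature, `#[debug_handler]` comes from `axum` itself and
// turns confusing trait-bound errors into pointed diagnostics when a handler
// signature is wrong.
#[debug_handler]
async fn customer_id(State(state): State<Arc<AppState>>) -> Json<String> {
    Json(state.customer_id.clone())
}

fn router(state: Arc<AppState>) -> Router {
    Router::new()
        .route("/customer_id", get(customer_id))
        .with_state(state)
}
```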
46 changes: 28 additions & 18 deletions examples/distributed_bakery.rs
@@ -1,5 +1,6 @@
use automerge_repo::{ConnDirection, DocHandle, DocumentId, Repo, Storage, StorageError};
use autosurgeon::{hydrate, reconcile, Hydrate, Reconcile};
+use axum::debug_handler;
use axum::extract::State;
use axum::routing::get;
use axum::{Json, Router};
@@ -12,13 +13,15 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::Handle;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::oneshot::{channel as oneshot, Sender as OneShotSender};
-use tokio::sync::Semaphore;
+use tokio::sync::{Mutex, Semaphore};
use tokio::time::{sleep, Duration};

+#[debug_handler]
async fn get_doc_id(State(state): State<Arc<AppState>>) -> Json<DocumentId> {
-Json(state.doc_handle.document_id())
+Json(state.doc_handle.lock().await.document_id())
}

+#[debug_handler]
async fn increment(State(state): State<Arc<AppState>>) -> Result<Json<u32>, ()> {
let permit = state.handler_sem.acquire().await;
if permit.is_err() {
@@ -43,8 +46,8 @@ async fn increment(State(state): State<Arc<AppState>>) -> Result<Json<u32>, ()>
Err(())
}

-async fn increment_output(doc_handle: &DocHandle) -> Result<u32, ()> {
-let (latest, closing) = doc_handle.with_doc_mut(|doc| {
+async fn increment_output(doc_handle: &Mutex<DocHandle>) -> Result<u32, ()> {
+let (latest, closing) = doc_handle.lock().await.with_doc_mut(|doc| {
let mut bakery: Bakery = hydrate(doc).unwrap();
bakery.output += 1;
let mut tx = doc.transaction();
@@ -60,8 +63,8 @@ async fn increment_output(doc_handle: &DocHandle) -> Result<u32, ()> {
Ok(latest)
}

-async fn run_bakery_algorithm(doc_handle: &DocHandle, customer_id: &String) {
-let (our_number, closing) = doc_handle.with_doc_mut(|doc| {
+async fn run_bakery_algorithm(doc_handle: &Mutex<DocHandle>, customer_id: &String) {
+let (our_number, closing) = doc_handle.lock().await.with_doc_mut(|doc| {
Review comment from the author:
I think the locking behaviour here and elsewhere is probably wrong, but I'm not sure what to do...

@gterzian if you could advise?

// At the start of the algorithm,
// pick a number that is higher than all others.
let mut bakery: Bakery = hydrate(doc).unwrap();
@@ -86,11 +89,15 @@ async fn run_bakery_algorithm(doc_handle: &DocHandle, customer_id: &String) {
return;
}
loop {
-doc_handle.changed().await.unwrap();
+let doc_handle_changed = doc_handle.lock().await.changed();
+doc_handle_changed.await.unwrap();

// Perform reads outside of closure,
// to avoid holding read lock.
-let bakery: Bakery = doc_handle.with_doc(|doc| hydrate(doc).unwrap());
+let bakery: Bakery = doc_handle
+.lock()
+.await
+.with_doc(|doc| hydrate(doc).unwrap());

if bakery.closing {
return;
@@ -174,7 +181,8 @@ async fn acknowlegde_changes(doc_handle: DocHandle, customer_id: String) {
}

loop {
-doc_handle.changed().await.unwrap();
+let doc_handle_changed = doc_handle.changed();
+doc_handle_changed.await.unwrap();

// Perform reads outside of closure,
// to avoid holding read lock.
@@ -215,8 +223,8 @@ async fn acknowlegde_changes(doc_handle: DocHandle, customer_id: String) {
}
}

-async fn start_outside_the_bakery(doc_handle: &DocHandle, customer_id: &String) {
-doc_handle.with_doc_mut(|doc| {
+async fn start_outside_the_bakery(doc_handle: &Mutex<DocHandle>, customer_id: &String) {
+doc_handle.lock().await.with_doc_mut(|doc| {
let mut bakery: Bakery = hydrate(doc).unwrap();
let our_info = bakery.customers.get_mut(customer_id).unwrap();
our_info.number = 0;
@@ -268,7 +276,7 @@ struct Args {
}

struct AppState {
-doc_handle: DocHandle,
+doc_handle: Mutex<DocHandle>,
customer_id: String,
handler_sem: Semaphore,
}
@@ -433,15 +441,17 @@ async fn main() {
let (increment_stop_tx, increment_stop_rx) = mpsc::channel(1);

let app_state = Arc::new(AppState {
-doc_handle: doc_handle.clone(),
+doc_handle: Mutex::new(doc_handle.clone()),
customer_id: customer_id.clone(),
handler_sem: Semaphore::new(1),
});

-let doc_handle_clone = doc_handle.clone();
-handle.spawn(async move {
-// Continuously request new increments.
-request_increment(doc_handle_clone, http_addrs, increment_stop_rx).await;
+handle.spawn({
+let doc_handle = doc_handle.clone();
+async move {
+// Continuously request new increments.
+request_increment(doc_handle, http_addrs, increment_stop_rx).await;
+}
});

handle.spawn(async move {
@@ -469,7 +479,7 @@ async fn main() {
// which acts as a shutdown signal
// to tasks reading the doc.
// Note: this prevents peers from re-joining after shutdown.
-app_state.doc_handle.with_doc_mut(|doc| {
+app_state.doc_handle.lock().await.with_doc_mut(|doc| {
let mut bakery: Bakery = hydrate(doc).unwrap();
bakery.closing = true;
let mut tx = doc.transaction();
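The author's comment above questions the locking behaviour, so it is worth spelling out what the new pattern does: the `MutexGuard` returned by `lock().await` is only a temporary inside the `let` statement, so it is dropped at the semicolon, before the subsequent `.await` on the returned future. The sketch below contrasts that with binding the guard to a name, which keeps the document locked for the whole wait; the `Doc` and `watch` types are stand-ins, not the automerge-repo API.

```rust
use std::sync::Arc;
use tokio::sync::{watch, Mutex};

// Stand-in for a document handle that can signal changes.
struct Doc {
    changed_rx: watch::Receiver<u64>,
}

impl Doc {
    // Returns something that can be awaited without borrowing `self`.
    fn changed(&self) -> watch::Receiver<u64> {
        self.changed_rx.clone()
    }
}

async fn wait_without_holding_the_lock(doc: Arc<Mutex<Doc>>) {
    // The guard is a temporary of this statement: it is dropped at the
    // semicolon, so no other task is blocked while we wait below.
    let mut rx = doc.lock().await.changed();
    let _ = rx.changed().await;
}

async fn wait_while_holding_the_lock(doc: Arc<Mutex<Doc>>) {
    // Binding the guard keeps the mutex locked across the await: every other
    // task that needs `doc` stalls until this wait completes.
    let guard = doc.lock().await;
    let mut rx = guard.changed();
    let _ = rx.changed().await;
    drop(guard);
}
```

Whether a shared `Mutex<DocHandle>` is the right design at all is exactly what the comment leaves open; the sketch only shows how the lock scope differs between the two forms.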
2 changes: 1 addition & 1 deletion examples/tcp-example.rs
@@ -2,11 +2,11 @@ use automerge::transaction::Transactable;
use automerge::{AutoSerde, ReadDoc};
use automerge_repo::{ConnDirection, Repo, RepoHandle, Storage};
use automerge_repo::{DocumentId, StorageError};
+use axum::debug_handler;
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};
-use axum_macros::debug_handler;
use clap::Parser;
use futures::future::{BoxFuture, TryFutureExt};
use futures::FutureExt;
4 changes: 4 additions & 0 deletions src/dochandle.rs
@@ -3,6 +3,8 @@ use crate::repo::{new_repo_future_with_resolver, RepoError, RepoEvent, RepoFutur
use automerge::{Automerge, ChangeHash};
use crossbeam_channel::Sender;
use parking_lot::{Mutex, RwLock};
+use std::cell::Cell;
+use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

@@ -30,6 +32,7 @@ pub struct DocHandle {
/// that doesn't require a mutable reference to the handle.
/// Note: the mutex is not shared between clones of the same handle.
last_heads: Mutex<Vec<ChangeHash>>,
+_not_sync: PhantomData<Cell<&'static ()>>,
}

impl Clone for DocHandle {
@@ -79,6 +82,7 @@ impl DocHandle {
handle_count,
local_repo_id,
last_heads: Mutex::new(last_heads),
+_not_sync: PhantomData,
}
}

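The core of this change is the `PhantomData<Cell<&'static ()>>` marker field: `Cell<T>` is never `Sync`, and a struct only gets the `Sync` auto trait when every field is `Sync`, so the zero-sized marker opts `DocHandle` out of `Sync` while leaving it `Send`. A standalone illustration of the technique (not the real `DocHandle` definition):

```rust
use std::cell::Cell;
use std::marker::PhantomData;

struct NotSyncHandle {
    value: u32,
    // Zero-sized and free at runtime. `Cell<&'static ()>` is Send but !Sync,
    // so the containing struct stays Send while losing Sync.
    _not_sync: PhantomData<Cell<&'static ()>>,
}

fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn main() {
    assert_send::<NotSyncHandle>(); // compiles: the type is still Send
    // assert_sync::<NotSyncHandle>(); // does not compile: the type is no longer Sync
    let handle = NotSyncHandle { value: 1, _not_sync: PhantomData };
    println!("value = {}", handle.value);
}
```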
14 changes: 8 additions & 6 deletions tests/document_changed.rs
@@ -21,7 +21,7 @@ fn test_document_changed_over_sync() {
let expected_repo_id = repo_handle_2.get_repo_id().clone();

// Create a document for one repo.
-let document_handle_1 = repo_handle_1.new_document();
+let doc_handle_1 = repo_handle_1.new_document();

// Add network adapters
let mut peers = HashMap::new();
@@ -48,7 +48,7 @@

// Spawn a task that awaits the requested doc handle,
// and then edits the document.
-let doc_id = document_handle_1.document_id();
+let doc_id = doc_handle_1.document_id();
rt.spawn(async move {
// Request the document.
let doc_handle = repo_handle_2.request_document(doc_id).await.unwrap();
@@ -69,16 +69,17 @@
let repo_id = repo_handle_1.get_repo_id().clone();
rt.spawn(async move {
// Edit the document.
-document_handle_1.with_doc_mut(|doc| {
+doc_handle_1.with_doc_mut(|doc| {
let mut tx = doc.transaction();
tx.put(automerge::ROOT, "repo_id", format!("{}", repo_id))
.expect("Failed to change the document.");
tx.commit();
});
loop {
// Await changes until the edit comes through over sync.
-document_handle_1.changed().await.unwrap();
-let equals = document_handle_1.with_doc(|doc| {
+let doc_handle_1_changed = doc_handle_1.changed();
+doc_handle_1_changed.await.unwrap();
+let equals = doc_handle_1.with_doc(|doc| {
let val = doc
.get(automerge::ROOT, "repo_id")
.expect("Failed to read the document.")
@@ -156,7 +157,8 @@ fn test_document_changed_locally() {
rt.spawn(async move {
start_wait_sender.send(()).await.unwrap();
// Await the local change.
-doc_handle.changed().await.unwrap();
+let doc_handle_changed = doc_handle.changed();
+doc_handle_changed.await.unwrap();
doc_handle.with_doc(|doc| {
let val = doc
.get(automerge::ROOT, "repo_id")
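The test changes split `doc_handle.changed().await` into a bind followed by an await. The likely reason: with `DocHandle` now `!Sync`, a shared borrow `&DocHandle` is not `Send`, and keeping that borrow alive across an `.await` inside a spawned task can make the whole task future fail tokio's `Send` bound. Binding the returned future to a variable ends the borrow at the semicolon, before the suspension point. A hedged sketch of the pattern, assuming (as the change implies) that `changed()` returns an owned future that does not borrow the handle:

```rust
use std::cell::Cell;
use std::future::Future;
use std::marker::PhantomData;

// Stand-in for a handle that is Send but !Sync, so `&Handle` is !Send.
struct Handle {
    _not_sync: PhantomData<Cell<&'static ()>>,
}

impl Handle {
    // Assumed: the returned future is owned and does not keep borrowing `self`.
    fn changed(&self) -> impl Future<Output = ()> + Send + 'static {
        async {}
    }
}

fn spawn_watcher(handle: Handle) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Bind first: the `&Handle` borrow ends at this semicolon and is not
        // captured across the await, so the spawned future stays Send.
        let changed = handle.changed();
        // Written as `handle.changed().await` in one expression, the borrow
        // can instead be held across the await and trip the Send check.
        changed.await;
    })
}
```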
6 changes: 2 additions & 4 deletions tests/simple_tests.rs
@@ -111,11 +111,9 @@ fn test_simple_sync() {
rt.spawn(async move {
let mut synced = 0;
for doc_handle in documents {
+let doc_id = doc_handle.document_id();
for repo_handle in repo_handles_clone.iter() {
-repo_handle
-.request_document(doc_handle.document_id())
-.await
-.unwrap();
+repo_handle.request_document(doc_id.clone()).await.unwrap();
synced += 1;
}
}