diff --git a/README.md b/README.md
index 9953675..cee6838 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,109 @@
 # Sangria
 
 An adaptive commit protocol for distributed transactions.
+
+
+## Installation guide
+
+```bash
+sudo apt-get update && sudo apt-get install -y \
+    git \
+    cmake \
+    make \
+    clang
+
+
+git clone https://github.com/google/flatbuffers.git && \
+cd flatbuffers && \
+git checkout v23.5.26 && \
+cmake -G "Unix Makefiles" -DCMAKE_BUILD_TYPE=Release && \
+sudo make -j$(nproc) && \
+sudo make install
+
+cd ../
+
+sudo apt-get -qy update && \
+    sudo apt-get -qy install apt-transport-https
+
+sudo apt-get -qy update && \
+sudo apt-get -qy install dos2unix openssh-server pwgen
+```
+
+### install rust
+```bash
+curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
+source ~/.cargo/env
+```
+
+### install docker
+```bash
+sudo apt-get update
+sudo apt-get install ca-certificates curl
+sudo install -m 0755 -d /etc/apt/keyrings
+sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
+sudo chmod a+r /etc/apt/keyrings/docker.asc
+echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
+$(. /etc/os-release && echo "${UBUNTU_CODENAME:-$VERSION_CODENAME}") stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+sudo apt-get update
+sudo apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
+```
+
+### install protobuf
+```bash
+curl -OL https://github.com/google/protobuf/releases/download/v30.2/protoc-30.2-linux-x86_64.zip
+unzip protoc-30.2-linux-x86_64.zip -d protoc3
+
+sudo mv protoc3/bin/* /usr/local/bin/
+sudo mv protoc3/include/* /usr/local/include/
+```
+
+### set up cassandra
+```bash
+sudo docker run -d -p 9042:9042 --name cassandra --ulimit nofile=100000:100000 cassandra:5.0
+cd atomix
+sudo docker exec -i cassandra cqlsh < schema/cassandra/atomix/keyspace.cql
+sudo docker exec -i cassandra cqlsh -k atomix < schema/cassandra/atomix/schema.cql
+```
+
+As you run experiments you may occasionally want to clean up Cassandra to reduce its load and start from a clean slate.
+Use:
+```bash
+sudo docker exec -i cassandra cqlsh -e "TRUNCATE atomix.range_map; TRUNCATE atomix.epoch; TRUNCATE atomix.range_leases; TRUNCATE atomix.records; TRUNCATE atomix.wal; TRUNCATE atomix.transactions; TRUNCATE atomix.keyspaces;"
+```
+
+
+## Run guide
+
+Check out the following script files to see how I automate the execution of the Sangria experiments.
+
+- `workload-generator/scripts/atomix_setup.py`: here you can find (with some effort) the instructions to build and run the servers you need for Sangria.
+Always make sure you build with the `--release` flag.
+
+- `workload-generator/scripts/run_experiments.py`: this is the script I use to run all experiments automatically. Each function (called from `main`; I uncomment the one I want every time) corresponds to a specific experiment.
+As you can see I'm using Ray Tune, an amazing tool that lets you define
+a grid of different configuration parameters for each experiment and then runs all the configurations one after the other sequentially.
+Results are logged in a specific directory and processed and plotted from there.
+
+- `workload-generator/scripts/ray_task.py`
+this is the function called for each instance of an experiment. E.g.
for the experiment `tradeoff_contention_vs_resolver_capacity_experiment`, Ray runs the different combinations of configs specified in run_experiments.py. For each such combination, the `run_workload` function in ray_task.py is called. The Python code in the scripts is generally horrible and was written in a rush, but if you spend some time reading and understanding it you'll familiarize yourself with how to set up and run the experiments, and with how and in which order to spin up the servers.
+
+Focus on the run_experiments.py / atomix_setup.py / ray_task.py files. Understand how Ray works and, as a first step, try to reproduce the first experiment/function in the run_experiments.py file.
+
+
+You'll need 5 big servers to set up Sangria: universe / warden / rangeserver / resolver / frontend. Try to spin them up in that order to avoid errors if you run something manually. Running a workload manually instead of using the Ray automation is useful for debugging, especially if you enable logs/printing messages.
+
+Inside the workload-generator crate you'll find my implementation of a custom/synthetic workload generator. See how I call it in the automated scripts.
+
+Notice that I call it twice. The first call initiates a workload of fake transactions whose only purpose is to overload the resolver and limit its capacity. The second workload is the main one whose metrics we collect in the end. Again, the code in the scripts is horrible, but it's worth trying to understand it.
+
+Once you get to the point where you have a running setup, where you understand how the servers work, how to manually run one workload (i.e. without Ray, in which case you'll see that you need to be careful about the order in which you spin up the servers, when you reset them, and when you clean up Cassandra between runs), and how to run multiple configs for an experiment sequentially and automatically with Ray, then and only then should you start looking into the Resolver code: understand how dependency resolution is implemented, and then look inside rangeserver/impl.rs to see the implementation of strict/pipelined/adaptive 2PC.
+
+
+### advice
+
+- try to isolate the parts of the codebase that you'll need.
+- understand how those parts work deeply even if it feels cumbersome - you'll find it useful eventually.
+- set clear intermediate goals every step of the way,
+e.g. 1) learn how to run the servers, 2) learn how to run the experiments using the scripts, 3) manually run an experiment using the X config files.
+- read as much code as possible - the code is the ultimate source of truth. Get a holistic view of the codebase, then concentrate only on those portions you need -- and dig deeply into those.
+- catching up with all of that without my constant help (sorry, but with work/life and all I don't expect to always be immediately responsive, though I will try to be decent) is going to be overwhelming. Navigating that chaos, making sense of everything mostly on your own, and keeping your cool is part of research and the PhD process, if you ever want to go down that path :p so see it as a test drive
diff --git a/cascading-abort-proposal/cascading-abort-implementation.md b/cascading-abort-proposal/cascading-abort-implementation.md
new file mode 100644
index 0000000..3600d52
--- /dev/null
+++ b/cascading-abort-proposal/cascading-abort-implementation.md
@@ -0,0 +1,461 @@
+# Cascading Abort Implementation - How to Abort Dependent Transactions
+
+## The Problem: What Happens When Dependencies Fail?
+ +Imagine this scenario happening in the current Sangria system: + +``` +Transaction A: WRITE key1 = "hello" (commits successfully) +Transaction B: READ key1, WRITE key2 = "world" (depends on A, commits successfully) +Transaction C: READ key2, WRITE key3 = "!" (depends on B, FAILS during commit) +Transaction D: READ key3, WRITE key4 = "done" (depends on C, still waiting...) +``` + +**Current Behavior**: Transaction D will wait forever for C to commit, but C has already failed! + +**Needed Behavior**: When C fails, D should be automatically aborted and all its pending changes rolled back. + +## How Cascading Abort Should Work + +Let's trace through what should happen when Transaction C fails: + +### Step 1: Transaction C Fails During Group Commit + +**Current Code**: `resolver/src/core/group_commit.rs:285-293` + +```rust +// Current code - panics on failure! +if let Err(e) = tx_state_store_clone + .try_batch_commit_transactions(&tx_ids_vec, 0) + .await +{ + panic!( + "Error committing transactions to tx_state_store {:?}: {:?}", + participant_range_clone, e + ); +} +``` + +**What Actually Happens Now**: The entire system panics and crashes! + +**What Should Happen**: Register the failed transactions for cascading abort. + +### Step 2: Resolver Should Track Failed Transactions + +**Current Code**: `resolver/src/core/resolver.rs:37-41` + +```rust +// Current resolver state +#[derive(Default)] +pub struct State { + info_per_transaction: HashMap, + resolved_transactions: HashSet, // Successfully committed +} +``` + +**What We Need**: Add tracking for aborted transactions so we know which ones failed. + +### Step 3: Find Who Was Waiting for Failed Transaction + +**Current Code**: `resolver/src/core/resolver.rs:219-242` + +```rust +// This code currently only handles successful commits +let dependents = mem::take(&mut transaction_info.dependents); +for dependent in dependents.iter() { + let dependent_transaction_info = state.info_per_transaction.get_mut(&dependent).unwrap(); + dependent_transaction_info.num_dependencies -= 1; + + if dependent_transaction_info.num_dependencies == 0 { + // Dependent becomes ready to commit + new_ready_to_commit.push(dependent_transaction_info.clone()); + } +} +``` + +**What We Need**: Similar logic but for aborts - cascade the abort to all dependents. 
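+
+Before walking through the concrete changes, the propagation itself is easy to state in isolation. The sketch below is not the resolver's code (the real `State` keys transactions by `Uuid` and stores full `TransactionInfo` records); it is a minimal, self-contained illustration of the worklist-style traversal over the existing `dependents` edges, using plain integer ids:
+
+```rust
+use std::collections::{HashMap, HashSet};
+
+/// Given "who is waiting on me" edges, return every transaction that must be
+/// aborted once `failed` aborts (including `failed` itself).
+fn cascade_abort(dependents: &HashMap<u64, HashSet<u64>>, failed: u64) -> HashSet<u64> {
+    let mut aborted = HashSet::new();
+    let mut worklist = vec![failed];
+    while let Some(tx) = worklist.pop() {
+        // Skip transactions we already decided to abort (guards against duplicates).
+        if !aborted.insert(tx) {
+            continue;
+        }
+        // Anyone waiting on `tx` can never commit, so it must be aborted as well.
+        if let Some(waiters) = dependents.get(&tx) {
+            worklist.extend(waiters.iter().copied());
+        }
+    }
+    aborted
+}
+
+fn main() {
+    // A(1) <- B(2) <- C(3) <- D(4), mirroring the example above where C fails.
+    let mut dependents: HashMap<u64, HashSet<u64>> = HashMap::new();
+    dependents.insert(1, HashSet::from([2])); // A's dependents: {B}
+    dependents.insert(2, HashSet::from([3])); // B's dependents: {C}
+    dependents.insert(3, HashSet::from([4])); // C's dependents: {D}
+    // Aborting C must also abort D, but must leave the already-committed A and B alone.
+    assert_eq!(cascade_abort(&dependents, 3), HashSet::from([3, 4]));
+}
+```
+
+The implementation below does exactly this, but inside the resolver's locked state and with the extra bookkeeping needed to wake up coordinators that are blocked waiting on an aborted transaction.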
+ +## Concrete Implementation Walkthrough + +Let's walk through the exact code changes needed to make cascading abort work: + +### Change 1: Add Aborted Transaction Tracking + +**File**: `resolver/src/core/resolver.rs:37-41` + +**Current Code**: +```rust +#[derive(Default)] +pub struct State { + info_per_transaction: HashMap, + resolved_transactions: HashSet, +} +``` + +**New Code**: +```rust +#[derive(Default)] +pub struct State { + info_per_transaction: HashMap, + resolved_transactions: HashSet, // Successfully committed + aborted_transactions: HashSet, // Failed transactions +} +``` + +### Change 2: Implement Cascading Abort Logic + +**File**: `resolver/src/core/resolver.rs` (add new method after line 267) + +**New Code**: +```rust +pub async fn register_aborted_transactions( + resolver: Arc, + aborted_transaction_ids: Vec, +) -> Result, Error> { + let mut cascaded_aborts = Vec::new(); + + { + let mut state = resolver.state.write().await; + let mut aborts_to_propagate = aborted_transaction_ids.clone(); + + // Process each failed transaction and find its dependents + while !aborts_to_propagate.is_empty() { + let abort_tx_id = aborts_to_propagate.pop().unwrap(); + + // Mark this transaction as aborted + state.aborted_transactions.insert(abort_tx_id); + + if let Some(transaction_info) = state.info_per_transaction.get_mut(&abort_tx_id) { + // Get all transactions waiting for this failed one + let dependents = mem::take(&mut transaction_info.dependents); + + for dependent_id in dependents { + // Check if dependent hasn't already been resolved or aborted + if !state.resolved_transactions.contains(&dependent_id) + && !state.aborted_transactions.contains(&dependent_id) { + + // This dependent must also be aborted! + aborts_to_propagate.push(dependent_id); + cascaded_aborts.push(dependent_id); + + info!("Cascading abort from {} to dependent {}", abort_tx_id, dependent_id); + } + } + } + + // Wake up any transactions waiting for the aborted transaction + if let Some(sender) = resolver.waiting_transactions.write().await.remove(&abort_tx_id) { + let _ = sender.send(()); // This will cause waiting transaction to check abort status + } + } + } + + info!("Cascaded abort to {} transactions: {:?}", cascaded_aborts.len(), cascaded_aborts); + Ok(cascaded_aborts) +} +``` + +**What This Does**: +- Marks failed transactions as aborted +- Finds all their dependents using existing `dependents` field +- Recursively aborts dependents (cascading effect) +- Wakes up waiting transactions so they can check if they've been aborted + +### Change 3: Handle Group Commit Failures + +**File**: `resolver/src/core/group_commit.rs:285-293` + +**Current Code**: +```rust +if let Err(e) = tx_state_store_clone + .try_batch_commit_transactions(&tx_ids_vec, 0) + .await +{ + panic!( + "Error committing transactions to tx_state_store {:?}: {:?}", + participant_range_clone, e + ); +} +``` + +**New Code**: +```rust +if let Err(e) = tx_state_store_clone + .try_batch_commit_transactions(&tx_ids_vec, 0) + .await +{ + error!("Batch commit failed for range {:?}: {:?}", participant_range_clone, e); + + // Instead of panicking, register these transactions as aborted + let resolver_clone = resolver.clone(); // Need resolver reference passed to group_commit + let failed_tx_ids = tx_ids_vec.clone(); + + tokio::spawn(async move { + if let Err(abort_err) = Resolver::register_aborted_transactions(resolver_clone, failed_tx_ids).await { + error!("Failed to register aborted transactions: {:?}", abort_err); + } + }); + + return 
Err(Error::GroupCommitFailed); +} +``` + +**What This Does**: Instead of crashing, register failed transactions for cascading abort. + +### Change 4: Check for Aborts in Commit Method + +**File**: `resolver/src/core/resolver.rs:65-144` (modify existing method) + +**Current Code**: +```rust +// Block until the transaction is actually committed +r.await.unwrap(); +info!("Transaction {} finally committed!", transaction_id); +Ok(()) +``` + +**New Code**: +```rust +// Block until the transaction is committed OR aborted +r.await.unwrap(); + +// Check if transaction was aborted while waiting +{ + let state = resolver.state.read().await; + if state.aborted_transactions.contains(&transaction_id) { + info!("Transaction {} was aborted due to cascading abort", transaction_id); + return Err(Error::TransactionAborted(TransactionAbortReason::CascadingAbort)); + } +} + +info!("Transaction {} finally committed!", transaction_id); +Ok(()) +``` + +**What This Does**: After waiting, check if the transaction was aborted due to cascading failure. + +### Change 5: Prevent New Dependencies on Aborted Transactions + +**File**: `resolver/src/core/resolver.rs:82-102` (modify existing loop) + +**Current Code**: +```rust +for dependency in dependencies { + if !state.resolved_transactions.contains(&dependency) { + // Dependency is not yet resolved, so we need to wait for it + num_pending_dependencies += 1; + state.info_per_transaction + .entry(dependency) + .or_insert(TransactionInfo::default(dependency, fake)) + .dependents + .insert(transaction_id); + } +} +``` + +**New Code**: +```rust +for dependency in dependencies { + if state.aborted_transactions.contains(&dependency) { + // Dependency was aborted! This transaction should abort immediately + info!("Transaction {} depends on aborted transaction {}, aborting", + transaction_id, dependency); + return Err(Error::TransactionAborted(TransactionAbortReason::DependencyAborted)); + } + + if !state.resolved_transactions.contains(&dependency) { + // Dependency is not yet resolved, so we need to wait for it + num_pending_dependencies += 1; + state.info_per_transaction + .entry(dependency) + .or_insert(TransactionInfo::default(dependency, fake)) + .dependents + .insert(transaction_id); + } +} +``` + +**What This Does**: Check if any dependency is already aborted before waiting for it. + +## Concrete Example: How Cascading Abort Works + +Let's trace through the exact execution with our example transactions: + +### Initial State +``` +Transaction A: WRITE key1 = "hello" (committed successfully) +Transaction B: READ key1, WRITE key2 = "world" (committed successfully) +Transaction C: READ key2, WRITE key3 = "!" (depends on B, about to commit) +Transaction D: READ key3, WRITE key4 = "done" (depends on C, waiting in resolver) +``` + +**Resolver State**: +```rust +info_per_transaction: { + C: TransactionInfo { id: C, num_dependencies: 0, dependents: {D} }, + D: TransactionInfo { id: D, num_dependencies: 1, dependents: {} } +} +resolved_transactions: {A, B} +aborted_transactions: {} +waiting_transactions: {D: channel} +``` + +### Step 1: Transaction C Fails During Group Commit + +**What Happens**: Range server or transaction state store fails C's commit + +```rust +// In group_commit.rs - C's commit fails +tx_state_store_clone.try_batch_commit_transactions(&[C.id], 0).await // FAILS! 
+ +// New error handling kicks in +tokio::spawn(async move { + Resolver::register_aborted_transactions(resolver_clone, vec![C.id]).await; +}); +``` + +### Step 2: Cascading Abort Processes C's Failure + +```rust +// register_aborted_transactions processes C +let mut aborts_to_propagate = vec![C.id]; + +while !aborts_to_propagate.is_empty() { + let abort_tx_id = aborts_to_propagate.pop().unwrap(); // C.id + + // Mark C as aborted + state.aborted_transactions.insert(C.id); + + // Find C's dependents + let dependents = mem::take(&mut transaction_info.dependents); // dependents = {D} + + for dependent_id in dependents { // For D + if !state.resolved_transactions.contains(&D.id) && !state.aborted_transactions.contains(&D.id) { + // D must be aborted too! + aborts_to_propagate.push(D.id); + cascaded_aborts.push(D.id); + } + } + + // Wake up D so it can see it's been aborted + let sender = resolver.waiting_transactions.write().await.remove(&C.id); + sender.send(()).unwrap(); // D's thread wakes up +} +``` + +### Step 3: Transaction D Wakes Up and Sees It's Aborted + +```rust +// Transaction D's thread wakes up from r.await +r.await.unwrap(); + +// Check abort status +let state = resolver.state.read().await; +if state.aborted_transactions.contains(&D.id) { + return Err(Error::TransactionAborted(TransactionAbortReason::CascadingAbort)); +} +``` + +### Final State After Cascading Abort +```rust +info_per_transaction: { + C: TransactionInfo { id: C, num_dependencies: 0, dependents: {} }, // dependents cleared + D: TransactionInfo { id: D, num_dependencies: 1, dependents: {} } // still marked as waiting +} +resolved_transactions: {A, B} +aborted_transactions: {C, D} // Both C and D are now aborted +waiting_transactions: {} // D was removed and woken up +``` + +## Rollback of Pending Operations + +### How Pending Writes Get Rolled Back + +**Current Transaction Structure**: `coordinator/src/transaction.rs:33-38` + +```rust +struct ParticipantRange { + readset: HashSet, // Keys read by transaction + writeset: HashMap, // Pending writes that need rollback + deleteset: HashSet, // Pending deletes that need rollback + leader_sequence_number: u64, +} +``` + +**When Cascading Abort Happens**: The coordinator's existing `record_abort()` method already handles rollback: + +```rust +// coordinator/src/transaction.rs:170-203 - existing abort handling +async fn record_abort(&mut self) -> Result<(), Error> { + self.state = State::Aborted; + + // Notify all participant ranges to roll back prepared changes + let mut abort_join_set = JoinSet::new(); + for range_id in self.participant_ranges.keys() { + let range_client = self.range_client.clone(); + abort_join_set.spawn_on(async move { + range_client.abort_transaction(transaction_info, &range_id).await + }, &self.runtime); + } + + // Record abort in transaction state store + self.tx_state_store.try_abort_transaction(self.id).await.unwrap(); + + while abort_join_set.join_next().await.is_some() {} + Ok(()) +} +``` + +**What Gets Rolled Back**: +- All pending writes in `participant_ranges[].writeset` +- All pending deletes in `participant_ranges[].deleteset` +- Any locks held by the transaction +- The transaction's prepare records in range servers + +## Integration with Existing Error Handling + +### Coordinator Integration + +**File**: `coordinator/src/transaction.rs:170-203` (modify existing method) + +Add resolver notification to existing abort handling: + +```rust +async fn record_abort(&mut self) -> Result<(), Error> { + self.state = State::Aborted; + + // NEW: 
Notify resolver of abort to trigger cascading + if let Err(e) = self.resolver.register_aborted_transactions(vec![self.id]).await { + error!("Failed to register abort with resolver: {:?}", e); + } + + // Existing abort logic continues unchanged + let mut abort_join_set = JoinSet::new(); + for range_id in self.participant_ranges.keys() { + // ... existing range abort notifications + } + + self.tx_state_store.try_abort_transaction(self.id).await.unwrap(); + while abort_join_set.join_next().await.is_some() {} + Ok(()) +} +``` + +This ensures that any transaction abort (whether from timeout, explicit abort, or commit failure) triggers cascading abort checking. + +## Summary: The Complete Cascading Flow + +1. **Transaction C fails** during group commit (range server error, state store failure, etc.) +2. **Group commit error handler** registers C for cascading abort instead of panicking +3. **Resolver processes cascading abort**: + - Marks C as aborted + - Finds C's dependents (D) using existing dependency graph + - Recursively marks D as aborted + - Wakes up D's waiting thread +4. **Transaction D wakes up** and checks its status, sees it's aborted +5. **D's coordinator calls record_abort()** which: + - Notifies all ranges to rollback D's pending writes + - Records abort in transaction state store + - Cleans up D's locks and prepared state + +The cascading abort leverages Sangria's existing dependency tracking (`dependents` field) and abort infrastructure (`record_abort()` method) to ensure that when ancestor transactions fail, all their dependent children are properly aborted and rolled back. \ No newline at end of file diff --git a/codebase-understanding/01-codebase-overview.md b/codebase-understanding/01-codebase-overview.md new file mode 100644 index 0000000..a7a75ed --- /dev/null +++ b/codebase-understanding/01-codebase-overview.md @@ -0,0 +1,341 @@ +# Sangria Codebase Overview - How It Actually Works + +## What Happens When You Run a Transaction + +Let's trace through what actually happens when you execute a simple transaction like: +``` +START TRANSACTION +PUT key1 = "value1" +GET key2 +COMMIT +``` + +Here's the **actual code flow** through Sangria's distributed system: + +### Step 1: Client Starts Transaction + +**File**: `frontend/src/main.rs` → **gRPC Handler** + +```rust +// frontend/src/server.rs (gRPC service implementation) +async fn start_transaction( + &self, + request: Request, +) -> Result, Status> { + let keyspace_id = request.get_ref().keyspace_id; + + // Create new transaction with unique ID + let transaction_info = Arc::new(TransactionInfo::new()); + let transaction = Transaction::new( + transaction_info.clone(), + self.range_client.clone(), + self.range_assignment_oracle.clone(), + // ... other dependencies + ); + + // Store transaction in coordinator + self.transactions.write().await.insert(transaction_info.id, transaction); + + Ok(Response::new(StartTransactionResponse { + transaction_id: transaction_info.id.to_string(), + })) +} +``` + +**What actually happened**: Frontend creates a `Transaction` object and stores it in memory. This transaction will track all your reads/writes. 
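+
+The registry itself is ordinary shared, locked state. Below is a minimal sketch of that pattern (assuming `tokio` and `uuid` as dependencies); the real frontend stores full coordinator `Transaction` objects rather than the placeholder struct used here:
+
+```rust
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
+use uuid::Uuid;
+
+/// Stand-in for the coordinator-side transaction state (writesets, readsets,
+/// participant ranges, and so on live here in the real code).
+struct OpenTransaction;
+
+#[derive(Clone, Default)]
+struct TransactionRegistry {
+    inner: Arc<RwLock<HashMap<Uuid, OpenTransaction>>>,
+}
+
+impl TransactionRegistry {
+    /// What the StartTransaction handler boils down to: allocate an id,
+    /// remember the transaction, and hand the id back to the client.
+    async fn start(&self) -> Uuid {
+        let id = Uuid::new_v4();
+        self.inner.write().await.insert(id, OpenTransaction);
+        id
+    }
+
+    /// Later Put/Get/Commit RPCs look the transaction up by the id
+    /// the client sends back with every request.
+    async fn is_open(&self, id: &Uuid) -> bool {
+        self.inner.read().await.contains_key(id)
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    let registry = TransactionRegistry::default();
+    let id = registry.start().await;
+    assert!(registry.is_open(&id).await);
+}
+```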
+ +### Step 2: Client Executes PUT Operation + +**File**: `coordinator/src/transaction.rs:152-159` + +```rust +pub async fn put(&mut self, keyspace: &Keyspace, key: Bytes, val: Bytes) -> Result<(), Error> { + self.check_still_running()?; // Make sure transaction hasn't been aborted + + // Figure out which range owns this key + let full_record_key = self.resolve_full_record_key(keyspace, key.clone()).await?; + + // Get or create participant range for this range + let participant_range = self.get_participant_range(full_record_key.range_id); + + // Store the write locally (doesn't hit database yet!) + participant_range.deleteset.remove(&key); // Remove any pending deletes + participant_range.writeset.insert(key, val.clone()); // Add to local writeset + + Ok(()) +} +``` + +**What actually happened**: The PUT doesn't write to the database yet! It just stores `key1 = "value1"` in the transaction's local `writeset`. The actual write happens during commit. + +### Step 3: Client Executes GET Operation + +**File**: `coordinator/src/transaction.rs:103-150` + +```rust +pub async fn get(&mut self, keyspace: &Keyspace, key: Bytes) -> Result, Error> { + self.check_still_running()?; + + let full_record_key = self.resolve_full_record_key(keyspace, key.clone()).await?; + let participant_range = self.get_participant_range(full_record_key.range_id); + + // Read-your-writes: Check if we wrote to this key already + if let Some(v) = participant_range.writeset.get(&key) { + return Ok(Some(v.clone())); // Return our own write + } + if participant_range.deleteset.contains(&key) { + return Ok(None); // We deleted it + } + + // Actually read from the database + let get_result = self.range_client.get( + self.transaction_info.clone(), + &full_record_key.range_id, + vec![key.clone()], + ).await.unwrap(); + + // CRITICAL: Update transaction dependencies! + self.dependencies.extend(get_result.dependencies); + + // Add to readset for conflict detection + participant_range.readset.insert(key.clone()); + + let val = get_result.vals.first().unwrap().clone(); + Ok(val) +} +``` + +**What actually happened**: +1. GET checks local writeset first (read-your-writes) +2. If not found locally, reads from range server +3. **Critically**: Range server returns not just the value, but also `dependencies` - other transactions this read depends on +4. These dependencies will determine commit order later! + +### Step 4: Client Calls COMMIT - The 2PC Begins + +**File**: `coordinator/src/transaction.rs:220-226` + +```rust +pub async fn commit(&mut self, resolver_average_load: f64, num_open_clients: u32) -> Result<(), Error> { + self.check_still_running()?; + + // --- PHASE 1: PREPARE --- + self.state = State::Preparing; + let mut prepare_join_set = JoinSet::new(); + + // Send prepare to ALL ranges we touched + for (range_id, info) in &self.participant_ranges { + let has_reads = !info.readset.is_empty(); + let writes: Vec = info.writeset.iter().map(|(k, v)| Record { + key: k.clone(), val: v.clone() + }).collect(); + let deletes: Vec = info.deleteset.iter().cloned().collect(); + + prepare_join_set.spawn_on(async move { + range_client.prepare_transaction( + transaction_info, + &range_id, + has_reads, + &writes, // Our pending writes + &deletes, // Our pending deletes + resolver_average_load, + num_open_clients, + ).await + }, &self.runtime); + } +``` + +**What actually happened**: The coordinator sends "prepare" requests **in parallel** to every range server that this transaction touched. 
Each prepare request includes: +- All writes this transaction wants to make to that range +- All deletes this transaction wants to make +- Whether this transaction read from that range + +### Step 5: Range Servers Respond to PREPARE + +**File**: `rangeserver/src/range_manager/impl.rs` (prepare_transaction handler) + +Each range server: +1. **Locks the keys** this transaction wants to write/delete +2. **Validates** the transaction can proceed (no conflicts, leader hasn't changed) +3. **Returns more dependencies** based on what it finds +4. **Stores prepare record** but doesn't actually apply the changes yet + +### Step 6: Coordinator Makes Commit Decision + +**File**: `coordinator/src/transaction.rs:290-434` + +```rust +// After all prepare responses come back... +self.dependencies.extend(res.dependencies); // Collect ALL dependencies + +// Now decide how to commit based on strategy +match self.commit_strategy { + CommitStrategy::Adaptive | CommitStrategy::Pipelined => { + if !self.dependencies.is_empty() { + // HAS DEPENDENCIES -> Use resolver pipeline + info!("Delegating commit to resolver for transaction {}", self.id); + let participants_info = self.participant_ranges.iter() + .map(|(range_id, info)| ParticipantRangeInfo::new(*range_id, !info.writeset.is_empty())) + .collect(); + + self.resolver.commit(self.id, self.dependencies.clone(), participants_info).await?; + } else { + // NO DEPENDENCIES -> Direct commit (fast path) + info!("Committing transaction {:?} without Resolver", self.id); + + // 1. Record commit decision in transaction state store + self.tx_state_store.try_commit_transaction(self.id, 0).await.unwrap(); + self.state = State::Committed; + + // 2. Tell all ranges to apply the changes in parallel + let mut commit_join_set = JoinSet::new(); + for (range_id, info) in self.participant_ranges.iter() { + if !info.writeset.is_empty() { // Only notify ranges with writes + commit_join_set.spawn_on(async move { + range_client.commit_transactions(vec![transaction_info.id], &range_id, 0).await + }, &self.runtime); + } + } + while commit_join_set.join_next().await.is_some() {} + } + } +} +``` + +## The Two Commit Paths + +### Fast Path (No Dependencies) +``` +Transaction → Prepare → Decision → Commit → Done +``` +**When**: Transaction doesn't conflict with any ongoing transactions +**Code Path**: Direct to range servers, bypasses resolver entirely + +### Pipeline Path (Has Dependencies) +``` +Transaction → Prepare → Decision → Resolver → Group Commit → Done +``` +**When**: Transaction depends on other transactions (read something another transaction wrote) +**Code Path**: Goes through resolver for dependency ordering + +## Key Data Structures - What They Actually Store + +### Transaction Object (`coordinator/src/transaction.rs:40-54`) +```rust +pub struct Transaction { + id: Uuid, // Unique transaction ID + state: State, // Running/Preparing/Committed/Aborted + participant_ranges: HashMap, // WHO we're talking to + dependencies: HashSet, // WHO we're waiting for + range_client: Arc, // HOW to talk to ranges + resolver: Arc, // HOW to handle dependencies + // ... 
+} + +struct ParticipantRange { + readset: HashSet, // Keys we read (for conflict detection) + writeset: HashMap, // Key-value pairs we want to write + deleteset: HashSet, // Keys we want to delete + leader_sequence_number: u64, // To detect if range leader changed +} +``` + +**In English**: Each transaction keeps track of: +- **What it wants to do**: `writeset`, `deleteset`, `readset` +- **Where it needs to do it**: `participant_ranges` (which database ranges) +- **What it's waiting for**: `dependencies` (other transaction IDs) + +### Resolver State (`resolver/src/core/resolver.rs:37-41`) +```rust +pub struct State { + info_per_transaction: HashMap, // All pending transactions + resolved_transactions: HashSet, // Successfully committed +} + +pub struct TransactionInfo { + id: Uuid, + num_dependencies: u32, // How many transactions I'm waiting for + dependents: HashSet, // Who is waiting for ME + participant_ranges_info: Vec, // Where I need to commit +} +``` + +**In English**: The resolver maintains a **dependency graph**: +- "Transaction B is waiting for Transaction A" +- "When A finishes, wake up B" +- "B has writes to Range 1 and Range 3" + +## Component Architecture - How They Actually Communicate + +``` +[Client] + ↓ gRPC +[Frontend] ←→ [Coordinator] + ↓ gRPC ↓ gRPC +[RangeServer1] [Resolver] ←→ [GroupCommit] +[RangeServer2] ↓ gRPC +[RangeServer3] [TxStateStore] +``` + +### Communication Patterns in Code: + +**Client → Frontend**: Standard gRPC calls +```rust +// Proto definition +service Frontend { + rpc StartTransaction(StartTransactionRequest) returns (StartTransactionResponse); + rpc Put(PutRequest) returns (PutResponse); + rpc Get(GetRequest) returns (GetResponse); + rpc Commit(CommitRequest) returns (CommitResponse); +} +``` + +**Coordinator → RangeServer**: Parallel async calls +```rust +// Multiple ranges contacted simultaneously +for (range_id, info) in &self.participant_ranges { + prepare_join_set.spawn_on(async move { + range_client.prepare_transaction(/* ... */).await + }, &self.runtime); +} +``` + +**Coordinator → Resolver**: Dependency submission +```rust +self.resolver.commit( + self.id, // Who I am + self.dependencies.clone(), // Who I depend on + participants_info // What I want to do +).await?; +``` + +## The Complete Transaction Flow + +1. **Client sends PUT/GET operations** + - Stored locally in transaction's `writeset`/`readset` + - Dependencies collected from range servers during GETs + +2. **Client sends COMMIT** + - Coordinator enters PREPARE phase + - Parallel prepare requests sent to all participant ranges + +3. **Range servers validate and lock** + - Lock requested keys + - Return additional dependencies + - Store prepare records + +4. **Coordinator decides commit strategy** + - **No dependencies**: Fast path (direct commit) + - **Has dependencies**: Pipeline path (through resolver) + +5. **Pipeline processing** (if dependencies exist) + - Transaction waits in resolver for dependencies to clear + - When ready, added to group commit batch + - Batch committed to all ranges simultaneously + +6. **Range servers apply changes** + - Apply prepared writes/deletes to storage + - Release locks + - Update dependency tracking + +This is how a simple PUT/GET/COMMIT transaction actually flows through the entire distributed system! 
\ No newline at end of file diff --git a/codebase-understanding/02-resolver-and-2pc-mechanisms.md b/codebase-understanding/02-resolver-and-2pc-mechanisms.md new file mode 100644 index 0000000..b4885f8 --- /dev/null +++ b/codebase-understanding/02-resolver-and-2pc-mechanisms.md @@ -0,0 +1,377 @@ +# Resolver and 2PC Mechanisms - How Dependencies Actually Work + +## The Problem: Why Do We Need a Resolver? + +Imagine three transactions running simultaneously: +``` +Transaction A: WRITE key1 = "hello" +Transaction B: READ key1, WRITE key2 = "world" +Transaction C: READ key2, WRITE key3 = "!" +``` + +**The Problem**: B depends on A, and C depends on B. If they commit in wrong order (C, A, B), we get inconsistent results! + +**The Solution**: The resolver tracks these dependencies and ensures correct commit ordering. + +## How the Resolver Actually Works + +Let's trace through what happens when Transaction B (depends on A) tries to commit: + +### Step 1: Transaction B Calls Resolver Commit + +**File**: `resolver/src/core/resolver.rs:65-76` + +Transaction B arrives at the resolver saying "I want to commit, but I depend on Transaction A": + +```rust +pub async fn commit( + resolver: Arc, + transaction_id: Uuid, // B's ID + dependencies: HashSet, // {A's ID} + participant_ranges_info: Vec, // Where B wants to write + fake: bool, +) -> Result<(), Error> { + // Quick optimization: read-only transactions don't need dependency tracking + if participant_ranges_info.iter().all(|info| !info.has_writes) { + return Ok(()); + } + + let (s, r) = oneshot::channel(); // Create a "wake me up" channel + let mut num_pending_dependencies = 0; +``` + +### Step 2: Dependency Registration + +**File**: `resolver/src/core/resolver.rs:82-102` + +```rust +{ + let mut state = resolver.state.write().await; // Lock the dependency graph + + for dependency in dependencies { // For each transaction B depends on + if !state.resolved_transactions.contains(&dependency) { + // Transaction A hasn't committed yet, so B must wait + num_pending_dependencies += 1; + + // CRITICAL: Register B as waiting for A + state.info_per_transaction + .entry(dependency) // Get A's info + .or_insert(TransactionInfo::default(dependency, fake)) + .dependents // A's list of who's waiting for it + .insert(transaction_id); // Add B to that list + } else { + info!("Dependency {:?} was already resolved", dependency); + } + } +``` + +**What Actually Happened**: +- Resolver looks up Transaction A: "Has A committed yet? No." +- Resolver adds B to A's `dependents` list: `A.dependents = {B}` +- Now the resolver knows: "When A commits, wake up B" + +### Step 3: Waiting vs. Ready to Commit + +**File**: `resolver/src/core/resolver.rs:104-141` + +```rust + let transaction_info = state.info_per_transaction + .entry(transaction_id) + .or_insert(TransactionInfo::default(transaction_id, fake)); + + transaction_info.num_dependencies = num_pending_dependencies; // B.num_dependencies = 1 + transaction_info.participant_ranges_info = participant_ranges_info; + + resolver.waiting_transactions.write().await.insert(transaction_id, s); // Store B's "wake up" channel + + if num_pending_dependencies == 0 { + // B has no pending dependencies - can commit immediately! 
+ info!("No pending dependencies, committing transaction {:?}", transaction_id); + + resolver.group_commit.add_transactions(&vec![transaction_info.clone()]).await?; + resolver.bg_runtime.spawn(async move { + let _ = Self::trigger_commit(resolver_clone, vec![transaction_info]).await; + }); + } +} + +// Block until transaction is committed (or dependency resolved) +r.await.unwrap(); // Transaction B waits here until A commits +``` + +**What Actually Happened**: +- Transaction B is **blocked** waiting for A to commit +- The resolver stores B's "wake up" channel +- B's thread literally waits at `r.await` until someone signals the channel + +## The Cascade Effect - When Dependencies Resolve + +Now let's see what happens when Transaction A finally commits: + +### Step 1: Transaction A Finishes and Notifies Resolver + +**File**: `resolver/src/core/resolver.rs:195-208` + +When Transaction A successfully commits, the resolver is notified: + +```rust +pub async fn register_committed_transactions( + resolver: Arc, + transaction_ids: Vec, // [A's ID] - A just committed! +) -> Result<(), Error> { + let mut new_ready_to_commit = Vec::new(); + + { + let mut state = resolver.state.write().await; + for transaction_id in transaction_ids { + state.resolved_transactions.insert(transaction_id); // Mark A as resolved + } + + let mut new_resolved_dependencies = vec![A's ID]; +``` + +### Step 2: Find Who Was Waiting for A + +**File**: `resolver/src/core/resolver.rs:211-242` + +The resolver now finds all transactions that were waiting for A: + +```rust + while !new_resolved_dependencies.is_empty() { + let transaction_id = new_resolved_dependencies.pop().unwrap(); // A's ID + + if let Some(transaction_info) = state.info_per_transaction.get_mut(&transaction_id) { + // Get everyone who was waiting for A + let dependents = mem::take(&mut transaction_info.dependents); // dependents = {B} + + for dependent in dependents.iter() { // For each transaction waiting for A + let dependent_transaction_info = state.info_per_transaction.get_mut(&dependent).unwrap(); + + // B was waiting for 1 transaction (A), now it's 0 + dependent_transaction_info.num_dependencies -= 1; + + if dependent_transaction_info.num_dependencies == 0 { + // B now has no pending dependencies! Ready to commit! + new_ready_to_commit.push(dependent_transaction_info.clone()); + new_resolved_dependencies.push(*dependent); // B might wake up others too + } + } + } + } +``` + +**What Actually Happened**: +- A committed, so resolver looks at A's `dependents` list: finds {B} +- B was waiting for 1 transaction, now 0 → B is ready to commit! 
+- If C was waiting for B, it would be added to the next round + +### Step 3: Wake Up Waiting Transactions + +**File**: `resolver/src/core/resolver.rs:245-265` + +```rust + // Add newly ready transactions to group commit + if !new_ready_to_commit.is_empty() { + resolver.group_commit.add_transactions(&new_ready_to_commit).await; + } + } + + // Trigger commit for newly ready transactions + if !new_ready_to_commit.is_empty() { + info!("New ready to commit transactions: {:?}", + new_ready_to_commit.iter().map(|tx| tx.id).collect::>()); + + let resolver_clone = resolver.clone(); + resolver.bg_runtime.spawn(async move { + let _ = Resolver::trigger_commit(resolver_clone, new_ready_to_commit).await; + }); + } +``` + +### Step 4: Group Commit Executes + +**File**: `resolver/src/core/resolver.rs:149-178` + +Now the resolver triggers a group commit for Transaction B: + +```rust +async fn trigger_commit( + resolver: Arc, + transactions: Vec, // [B's info] +) -> Result<(), Error> { + info!("Triggering commit for transactions {:?}", + transactions.iter().map(|tx| tx.id).collect::>()); + + // Execute the actual commit + let finished_transactions = resolver.group_commit.commit().await?; + + // Wake up the waiting transactions + { + let mut waiting_transactions = resolver.waiting_transactions.write().await; + for transaction in finished_transactions { + let sender = waiting_transactions.remove(&transaction.id).unwrap(); + sender.send(()).unwrap(); // This unblocks Transaction B's await! + } + } + + // Register B as committed (might wake up C if C depends on B) + let finished_transaction_ids = finished_transactions.iter().map(|tx| tx.id).collect::>(); + if !finished_transaction_ids.is_empty() { + Self::spawn_register_committed_transactions(resolver, finished_transaction_ids); + } +} +``` + +**What Actually Happened**: +- B gets added to group commit and commits to the database +- B's "wake up" channel gets signaled: `sender.send(())` +- Transaction B's thread unblocks from `r.await` +- Resolver registers B as committed (might wake up C) + +## Group Commit - Batching for Performance + +### The Problem with Individual Commits + +Without group commit, each transaction would commit individually: +``` +Transaction A commits → tells database → waits for ack +Transaction B commits → tells database → waits for ack +Transaction C commits → tells database → waits for ack +``` + +### The Group Commit Solution + +**File**: `resolver/src/core/group_commit.rs:152-170` + +Instead, transactions get batched together: + +```rust +pub async fn add_transactions(&self, transactions: &Vec) -> Result<(), Error> { + // Group transactions by which database ranges they touch + let mut tmp_group_per_participant = HashMap::new(); + + for transaction in transactions { + for participant_range in transaction.participant_ranges_info.iter() { + if participant_range.has_writes { // Only ranges with writes need commits + tmp_group_per_participant + .entry(participant_range.participant_range) // Range ID + .or_insert_with(|| Vec::new()) + .push(transaction.clone()); // Add transaction to this range's batch + } + } + } +``` + +**Example**: If we have: +- Transaction A: writes to Range 1, Range 2 +- Transaction B: writes to Range 2, Range 3 +- Transaction C: writes to Range 1 + +The grouping becomes: +- **Range 1**: [Transaction A, Transaction C] +- **Range 2**: [Transaction A, Transaction B] +- **Range 3**: [Transaction B] + +### Parallel Range Commits + +**File**: `resolver/src/core/group_commit.rs:227-325` + +Now all ranges commit their 
batches in parallel: + +```rust +pub async fn commit(&self) -> Result, Error> { + let mut commit_join_set = JoinSet::>::new(); + + for participant_range in non_empty_groups.iter() { + commit_join_set.spawn(async move { + // Get all transactions for this range + let transactions = std::mem::take(&mut *group_clone); + let tx_ids_vec = transactions.iter().map(|tx| tx.id).collect(); + + // Batch commit to transaction state store + tx_state_store_clone.try_batch_commit_transactions(&tx_ids_vec, 0).await?; + + // Tell range to apply all changes at once + range_client.commit_transactions(tx_ids_vec, &participant_range_clone, 0).await?; + }); + } + + while let Some(res) = commit_join_set.join_next().await {} +``` + +**What Actually Happened**: +``` +Range 1: Batch commits [A, C] simultaneously +Range 2: Batch commits [A, B] simultaneously +Range 3: Batch commits [B] simultaneously + +All ranges commit in parallel! +``` + +## The Complete Dependency Resolution Flow + +Let's put it all together with a concrete example: + +### Initial State +``` +Transaction A: WRITE key1 = "hello" (no dependencies) +Transaction B: READ key1, WRITE key2 = "world" (depends on A) +Transaction C: READ key2, WRITE key3 = "!" (depends on B) +``` + +### Step-by-Step Execution + +**1. Transaction A Commits (No Dependencies)** +- A takes fast path, commits directly +- A notifies resolver: "I'm done" +- Resolver looks up A's dependents: finds {B} +- B becomes ready to commit + +**2. Transaction B Commits (Was Waiting for A)** +- B gets added to group commit batch +- B commits to database +- B notifies resolver: "I'm done" +- Resolver looks up B's dependents: finds {C} +- C becomes ready to commit + +**3. Transaction C Commits (Was Waiting for B)** +- C gets added to group commit batch +- C commits to database +- Done! + +### The Data Structures During Execution + +**Resolver State at Start**: +```rust +info_per_transaction: { + A: TransactionInfo { id: A, num_dependencies: 0, dependents: {B} }, + B: TransactionInfo { id: B, num_dependencies: 1, dependents: {C} }, + C: TransactionInfo { id: C, num_dependencies: 1, dependents: {} } +} +resolved_transactions: {} +waiting_transactions: {B: channel, C: channel} +``` + +**After A Commits**: +```rust +info_per_transaction: { + A: TransactionInfo { id: A, num_dependencies: 0, dependents: {} }, // dependents cleared + B: TransactionInfo { id: B, num_dependencies: 0, dependents: {C} }, // decremented to 0 + C: TransactionInfo { id: C, num_dependencies: 1, dependents: {} } +} +resolved_transactions: {A} +waiting_transactions: {C: channel} // B removed, woken up +``` + +**After B Commits**: +```rust +info_per_transaction: { + A: TransactionInfo { id: A, num_dependencies: 0, dependents: {} }, + B: TransactionInfo { id: B, num_dependencies: 0, dependents: {} }, // dependents cleared + C: TransactionInfo { id: C, num_dependencies: 0, dependents: {} } // decremented to 0 +} +resolved_transactions: {A, B} +waiting_transactions: {} // C removed, woken up +``` + +This is exactly how Sangria ensures that transactions commit in the correct dependency order while maximizing parallelism through batching! \ No newline at end of file diff --git a/codebase-understanding/03-pipeline-2pc-implementation.md b/codebase-understanding/03-pipeline-2pc-implementation.md new file mode 100644 index 0000000..9f28b66 --- /dev/null +++ b/codebase-understanding/03-pipeline-2pc-implementation.md @@ -0,0 +1,390 @@ +# Pipeline 2PC Implementation - How Pipelined 2PC Actually Works + +## What Is "Pipelined" 2PC? 
+ +Traditional 2PC processes transactions one at a time: +``` +Transaction A: Prepare → Commit → Done +Transaction B: Prepare → Commit → Done +Transaction C: Prepare → Commit → Done +``` + +Pipelined 2PC processes multiple transactions simultaneously: +``` +Transaction A: Prepare → Commit → Done +Transaction B: Prepare → Commit → Done +Transaction C: Prepare → Commit → Done +``` + +Multiple transactions flow through different stages of 2PC at the same time, like cars on an assembly line. + +## How Pipelined 2PC Actually Works in Code + +Let's trace through what happens when multiple transactions hit the system simultaneously: + +### Step 1: Transaction Reaches Commit Decision Point + +**File**: `coordinator/src/transaction.rs:356-375` + +When a transaction is ready to commit, it makes a **smart routing decision**: + +```rust +CommitStrategy::Adaptive | CommitStrategy::Pipelined => { + if !self.dependencies.is_empty() { + // HAS DEPENDENCIES -> Go through resolver (pipeline path) + info!("Delegating commit to resolver for transaction {}", self.id); + let participants_info = self.participant_ranges + .iter() + .map(|(range_id, info)| { + ParticipantRangeInfo::new(*range_id, !info.writeset.is_empty()) + }) + .collect(); + + self.resolver.commit(self.id, self.dependencies.clone(), participants_info).await?; + } else { + // NO DEPENDENCIES -> Direct commit (fast path) + info!("Committing transaction {:?} without Resolver", self.id); + // Skip the resolver entirely... + } +} +``` + +**In English**: +- **Has Dependencies**: "I need to wait for other transactions, so use the pipeline system" +- **No Dependencies**: "I'm independent, so take the express lane" + +### Step 2A: Fast Path (No Dependencies) - Parallel Processing + +**File**: `coordinator/src/transaction.rs:376-430` + +Independent transactions can commit **simultaneously**: + +```rust +// NO DEPENDENCIES -> Direct commit (fast path) +// 1. Record commit decision atomically +let _ = self.tx_state_store.try_commit_transaction(self.id, 0).await.unwrap(); +self.state = State::Committed; + +// 2. 
Notify all participant ranges IN PARALLEL +let mut commit_join_set = JoinSet::new(); +for (range_id, info) in self.participant_ranges.iter() { + let range_id = *range_id; + let has_writes = !info.writeset.is_empty(); + if has_writes { + commit_join_set.spawn_on(async move { + range_client.commit_transactions(vec![transaction_info.id], &range_id, 0).await + }, &self.runtime); + } +} +while commit_join_set.join_next().await.is_some() {} +``` + +**What Actually Happens**: +- Multiple independent transactions can execute this code **simultaneously** +- Each transaction commits to state store independently +- Each transaction notifies its ranges in parallel with other transactions +- **No coordination needed** between independent transactions + +### Step 2B: Pipeline Path (Has Dependencies) - Batched Processing + +**File**: `resolver/src/core/resolver.rs:65-144` + +Dependent transactions enter the resolver pipeline: + +```rust +pub async fn commit( + resolver: Arc, + transaction_id: Uuid, + dependencies: HashSet, + participant_ranges_info: Vec, + fake: bool, +) -> Result<(), Error> { + let (s, r) = oneshot::channel(); + + { + let mut state = resolver.state.write().await; + + // Check dependencies and register transaction + for dependency in dependencies { + if !state.resolved_transactions.contains(&dependency) { + num_pending_dependencies += 1; + state.info_per_transaction + .entry(dependency) + .or_insert(TransactionInfo::default(dependency, fake)) + .dependents + .insert(transaction_id); + } + } + + if num_pending_dependencies == 0 { + // Ready to commit immediately! Add to batch. + resolver.group_commit.add_transactions(&vec![transaction_info.clone()]).await?; + resolver.bg_runtime.spawn(async move { + let _ = Self::trigger_commit(resolver_clone, vec![transaction_info]).await; + }); + } + } + + r.await.unwrap(); // Wait for commit to complete +} +``` + +**Key Insight**: Transactions that are ready to commit get **batched together** instead of committing individually! + +## The Pipeline Magic: Group Commit Batching + +### Step 3: Multiple Transactions Get Batched Together + +**File**: `resolver/src/core/group_commit.rs:152-191` + +Instead of committing transactions one by one, the resolver **groups them by range**: + +```rust +pub async fn add_transactions(&self, transactions: &Vec) -> Result<(), Error> { + // Group transactions by participant range + info!("Grouping transactions by participant range"); + let mut tmp_group_per_participant = HashMap::new(); + + for transaction in transactions { + for participant_range in transaction.participant_ranges_info.iter() { + if participant_range.has_writes { + tmp_group_per_participant + .entry(participant_range.participant_range) // Range A, B, C, etc. 
+ .or_insert_with(|| Vec::new()) + .push(transaction.clone()); // Add transaction to this range's batch + } + } + } + + // Add all batches to their respective ranges IN PARALLEL + let mut join_set = JoinSet::<()>::new(); + for (participant_range, transactions) in tmp_group_per_participant.iter() { + let participant_range = participant_range.clone(); + let transactions = transactions.clone(); + + join_set.spawn(async move { + let state = state_clone.read().await; + let mut group = state.group_per_participant + .get(&participant_range) + .unwrap() + .write().await; + group.extend(transactions.iter().cloned()); // Add to range's batch + }); + } + while let Some(_) = join_set.join_next().await {} +} +``` + +**Example**: If we have 5 transactions ready to commit: +- **Transaction 1**: writes to Range A, Range B +- **Transaction 2**: writes to Range B, Range C +- **Transaction 3**: writes to Range A +- **Transaction 4**: writes to Range C +- **Transaction 5**: writes to Range A, Range C + +The batching creates: +- **Range A batch**: [Transaction 1, Transaction 3, Transaction 5] +- **Range B batch**: [Transaction 1, Transaction 2] +- **Range C batch**: [Transaction 2, Transaction 4, Transaction 5] + +### Step 4: Parallel Batch Execution + +**File**: `resolver/src/core/group_commit.rs:227-325` + +Now the real pipeline magic happens - **all ranges commit their batches simultaneously**: + +```rust +pub async fn commit(&self) -> Result, Error> { + let mut commit_join_set = JoinSet::>::new(); + + // For each range with pending transactions + for participant_range in non_empty_groups.iter() { + let participant_range_clone = participant_range.clone(); + let tx_state_store_clone = self.tx_state_store.clone(); + let range_client = self.range_client.clone(); + + // Spawn parallel task for each range + commit_join_set.spawn(async move { + // 1. Get all transactions for this range + let mut group_clone = group_guard_clone.write().await; + let transactions = std::mem::take(&mut *group_clone); // Extract batch + drop(group_clone); + + if transactions.is_empty() { + return Ok(()); + } + + let tx_ids_vec = transactions.iter().map(|tx| tx.id).collect(); + + // 2. Batch commit to transaction state store + tx_state_store_clone.try_batch_commit_transactions(&tx_ids_vec, 0).await?; + + // 3. Notify range to apply all changes at once + range_client.commit_transactions(tx_ids_vec, &participant_range_clone, 0).await?; + + Ok(()) + }); + } + + // Wait for all ranges to complete their batches + while let Some(res) = commit_join_set.join_next().await {} +} +``` + +**What Actually Happens**: +``` +Time → +Range A: Batch commits [Tx1, Tx3, Tx5] ←─┐ +Range B: Batch commits [Tx1, Tx2] ←─┼─ All happening in parallel! +Range C: Batch commits [Tx2, Tx4, Tx5] ←─┘ +``` + +## Pipeline Flow Example + +Let's trace through a concrete example with 3 transactions: + +### Initial State +``` +Transaction A: WRITE key1="hello" (no dependencies, ready immediately) +Transaction B: WRITE key2="world" (no dependencies, ready immediately) +Transaction C: WRITE key3="!" 
(depends on A, must wait) +``` + +### Pipeline Execution Timeline + +**T=0: All transactions start committing** +- **Transaction A**: Takes fast path (no dependencies) +- **Transaction B**: Takes fast path (no dependencies) +- **Transaction C**: Goes to resolver, waits for A + +**T=1: Fast path transactions execute in parallel** +```rust +// A and B execute simultaneously +Transaction A: tx_state_store.try_commit_transaction(A.id, 0).await +Transaction B: tx_state_store.try_commit_transaction(B.id, 0).await + +// Both notify their ranges in parallel +Transaction A: range_client.commit_transactions([A.id], range_1, 0).await +Transaction B: range_client.commit_transactions([B.id], range_2, 0).await +``` + +**T=2: Transaction A finishes, wakes up C** +```rust +// A registers as committed +resolver.register_committed_transactions(vec![A.id]).await; + +// C becomes ready (A was its only dependency) +new_ready_to_commit = vec![C]; +resolver.group_commit.add_transactions(&new_ready_to_commit).await; +``` + +**T=3: Pipeline processes C** +```rust +// C gets added to group commit batch (might batch with other newly ready transactions) +resolver.trigger_commit(resolver_clone, vec![C]).await; +``` + +### The "Pipeline" Visualization + +``` +Timeline → + +Fast Path (A,B): [Prepare] → [Commit] → [Done] + [Prepare] → [Commit] → [Done] + +Pipeline Path (C): [Wait] → [Batch] → [Commit] → [Done] + +Dependency Flow: A finishes → Wakes C → C batches with others → C commits +``` + +## Key Pipeline Optimizations in Code + +### 1. Early Lock Release + +**File**: `coordinator/src/transaction.rs:422-430` + +```rust +if any_early_lock_releases { + info!("At least one early lock release happened, registering transaction as committed in the resolver"); + // Spawn async and don't wait for it to complete. + let resolver = self.resolver.clone(); + let tx_id = self.id; + self.runtime.spawn(async move { + let _ = resolver.register_committed_transactions(vec![tx_id]).await; + }); +} +``` + +**What This Means**: Range servers can release locks **before** getting final confirmation, improving concurrency. The resolver is notified asynchronously to maintain dependency tracking. + +### 2. Batch State Store Operations + +**File**: `resolver/src/core/group_commit.rs:285-287` + +```rust +// Instead of individual commits: +// tx_state_store.try_commit_transaction(tx1.id, 0).await; +// tx_state_store.try_commit_transaction(tx2.id, 0).await; +// tx_state_store.try_commit_transaction(tx3.id, 0).await; + +// Batch multiple transactions together: +tx_state_store_clone.try_batch_commit_transactions(&tx_ids_vec, 0).await?; +``` + +**Performance Gain**: One database round-trip instead of N round-trips for N transactions. + +### 3. Parallel Range Processing + +**File**: `resolver/src/core/group_commit.rs:190-213` + +```rust +// Add transactions to ALL ranges simultaneously +let mut join_set = JoinSet::<()>::new(); +for (participant_range, transactions) in tmp_group_per_participant.iter() { + join_set.spawn(async move { + // Each range processes its batch concurrently + let mut group = state.group_per_participant.get(&participant_range).unwrap().write().await; + group.extend(transactions.iter().cloned()); + }); +} +while let Some(_) = join_set.join_next().await {} +``` + +**Concurrency Gain**: Lock acquisition for different ranges happens in parallel instead of sequentially. + +### 4. 
Dependency-Based Smart Routing + +The routing decision happens **before** any expensive operations: + +```rust +if !self.dependencies.is_empty() { + // Route to pipeline (will batch with others) +} else { + // Route to fast path (immediate execution) +} +``` + +**Efficiency**: Independent transactions skip the resolver entirely, while dependent transactions get proper ordering through batching. + +## Why Pipelined 2PC Is Faster + +### Traditional 2PC Bottlenecks +1. **Serial Processing**: One transaction at a time +2. **Individual State Store Writes**: One round-trip per transaction +3. **Range Notification Delays**: Each transaction notifies ranges separately + +### Pipeline 2PC Solutions +1. **Parallel Processing**: Independent transactions execute simultaneously +2. **Batch State Store Writes**: Multiple transactions per round-trip +3. **Batch Range Notifications**: Multiple transactions notify each range together +4. **Smart Routing**: Bypass pipeline for independent transactions + +### Performance Numbers (Conceptual) +``` +Traditional: 3 transactions × 2 phases × 10ms = 60ms total +Pipelined: 3 transactions ÷ 2 batch size × 10ms = 15ms total +``` + +The pipeline keeps the system busy with multiple transactions in different phases instead of idle time between individual transaction commits. + +This is how Sangria achieves high throughput while maintaining the correctness guarantees of 2PC! \ No newline at end of file diff --git a/codebase-understanding/04-test-architecture.md b/codebase-understanding/04-test-architecture.md new file mode 100644 index 0000000..4e49247 --- /dev/null +++ b/codebase-understanding/04-test-architecture.md @@ -0,0 +1,372 @@ +# Test Architecture and Testing Infrastructure + +## Overview + +Sangria employs a sophisticated testing infrastructure that simulates a complete distributed database system within single-process test environments. The testing architecture combines comprehensive mocking, real storage integration, and protocol-faithful message passing to validate the 2PC implementation and transaction semantics. + +## Test Organization Structure + +### Integration Tests +- **Frontend**: `frontend/tests/integration_tests.rs` - End-to-end transaction flow testing +- **RangeClient**: `rangeclient/tests/integration_tests.rs` - 2PC protocol and storage integration +- **Universe**: `universe/tests/integration_tests.rs` - Keyspace management testing + +### Unit Tests +- Embedded within component modules using `#[cfg(test)]` blocks +- Found in: `rangeserver/src/storage/cassandra.rs`, `rangeserver/src/range_manager/impl.rs`, etc. 
+- Use Tokio's async test framework (`#[tokio::test]`) + +### Test Frameworks +- **Primary**: Tokio async test framework +- **Utilities**: `test-case = "3"` for parameterized testing +- **Runtime**: Custom `tokio::runtime::Runtime` instances for test isolation + +## Mock Infrastructure + +### Frontend Mock Components (`frontend/src/for_testing/`) + +#### MockEpochPublisher (`mock_epoch_publisher.rs`) +```rust +pub struct MockEpochPublisher { + cancellation_token: CancellationToken, + current_epoch: Arc, + network: UdpFastNetwork, +} +``` + +**Features:** +- Simulates epoch management system for distributed versioning +- Listens for `ReadEpochRequest` messages via UDP Fast Network +- Maintains atomic epoch counter (starts at 1) +- FlatBuffer serialization/deserialization for protocol fidelity +- Graceful shutdown via cancellation tokens + +**Key Operations:** +```rust +// Epoch request handling +let epoch_request = ReadEpochRequest::follow(bb, None); +let current_epoch = self.current_epoch.load(Ordering::SeqCst); +let response = ReadEpochResponse::create(&mut builder, &ReadEpochResponseArgs { + epoch: current_epoch, +}); +``` + +#### MockRangeServer (`mock_rangeserver.rs`) +```rust +pub struct MockRangeServer { + data: Arc>>, + pending_prepare_records: Arc>>, + epoch_reader: Arc, + network: UdpFastNetwork, +} +``` + +**Capabilities:** +- **2PC Operations**: Complete prepare, commit, abort protocol implementation +- **Data Operations**: CRUD operations with read-your-writes semantics +- **State Management**: In-memory key-value store with pending transaction records +- **Epoch Integration**: Uses EpochReader for epoch lease management +- **Thread Safety**: RwLock for data, Mutex for pending records + +**2PC Protocol Implementation:** +```rust +// Prepare phase handling +RangeServerRequest::PrepareTransactionReq(req) => { + // Validate epoch lease + // Store prepare records + // Return dependencies and epoch information +} + +// Commit phase handling +RangeServerRequest::CommitTransactionsReq(req) => { + // Apply prepared writes to storage + // Clean up pending records + // Update dependencies +} +``` + +#### MockUniverse (`mock_universe.rs`) +- **Purpose**: Simulates keyspace management service +- **Features**: gRPC Universe service implementation, keyspace CRUD operations +- **Storage**: In-memory keyspace registry with automatic range ID generation + +### RangeServer Mock Components (`rangeserver/src/for_testing/`) + +#### MockWarden (`mock_warden.rs`) +- **Purpose**: Simulates cluster coordination and range assignment +- **Features**: Range server registration, assignment management, gRPC streaming + +#### EpochSupplier (`epoch_supplier.rs`) +- **Purpose**: Controllable epoch progression for testing +- **Features**: Manual epoch setting, waiter pattern for epoch advancement + +## Integration Test Implementation + +### Frontend Integration Tests (`frontend/tests/integration_tests.rs`) + +#### Full End-to-End Transaction Flow +```rust +#[tokio::test] +async fn test_frontend() { + // 1. Setup comprehensive test configuration + let config = Config { + frontend_proto_address: frontend_address, + rangeserver_proto_address: mock_rangeserver_address, + // ... all service addresses + }; + + // 2. Start mock services + let epoch_publisher = MockEpochPublisher::start(config.clone()).await?; + let range_server = MockRangeServer::start(config.clone()).await?; + let universe = MockUniverse::start(config.clone()).await?; + let frontend = FrontendServer::start(config.clone()).await?; + + // 3. 
Execute transaction workflow + let frontend_client = FrontendClient::new(config.clone()).await?; + + // Create keyspace + let keyspace_id = frontend_client.create_keyspace(...).await?; + + // Start transaction + let transaction = frontend_client.start_transaction(keyspace_id).await?; + + // Write operation + transaction.put(key, value).await?; + + // Read-your-writes validation + let read_value = transaction.get(key).await?; + assert_eq!(read_value, Some(value)); + + // Commit transaction (triggers 2PC) + transaction.commit().await?; + + // Verify persistence in new transaction + let new_transaction = frontend_client.start_transaction(keyspace_id).await?; + let committed_value = new_transaction.get(key).await?; + assert_eq!(committed_value, Some(value)); +} +``` + +**Test Coverage:** +- Keyspace creation and management +- Transaction lifecycle (start, operations, commit) +- Read-your-writes semantics +- 2PC protocol execution +- Cross-transaction persistence +- Delete operations and abort handling + +### RangeClient Integration Tests (`rangeclient/tests/integration_tests.rs`) + +#### 2PC Protocol Testing +```rust +#[tokio::test] +async fn read_modify_write() { + // Setup with real Cassandra storage + let cassandra_session = create_test_session().await; + let range_server = RangeServer::start_with_storage(cassandra_session).await; + + // Create transaction info + let transaction_info = TransactionInfo::new(); + + // Read operation (establishes dependencies) + let get_result = range_client.get( + transaction_info.clone(), + &range_id, + vec![key.clone()] + ).await?; + + // Write operation (prepare phase) + let prepare_result = range_client.prepare_transaction( + transaction_info.clone(), + &range_id, + true, // has_reads + &[Record { key: key.clone(), val: value.clone() }], + &[], // no deletes + 0.0, // resolver_load + 1 // num_clients + ).await?; + + // Commit phase + range_client.commit_transactions( + vec![transaction_info.id], + &range_id, + 0 // epoch + ).await?; + + // Verify persistence + let final_result = range_client.get( + new_transaction_info, + &range_id, + vec![key.clone()] + ).await?; + assert_eq!(final_result.vals[0], Some(value)); +} +``` + +**Test Scenarios:** +- **Unknown Range**: Error handling for non-existent ranges +- **Read Initial**: Reading from empty ranges +- **No-Write Commits**: Read-only transaction commits +- **Prefetching**: Read optimization testing + +### Universe Integration Tests (`universe/tests/integration_tests.rs`) + +#### Keyspace Management Testing +```rust +#[tokio::test] +async fn test_create_and_list_keyspace_handlers() { + // Real Cassandra integration + let cassandra_session = create_session().await; + let universe_service = UniverseService::new(cassandra_session); + + // Test keyspace creation + let keyspace_req = CreateKeyspaceRequest { + namespace: "test_namespace".to_string(), + name: "test_keyspace".to_string(), + base_key_ranges: create_base_ranges(), + }; + + let keyspace_id = universe_service.create_keyspace(keyspace_req).await?; + + // Test keyspace listing + let keyspaces = universe_service.list_keyspaces().await?; + assert!(keyspaces.iter().any(|ks| ks.id == keyspace_id)); + + // Test keyspace lookup + let found_keyspace = universe_service.get_keyspace_by_id(keyspace_id).await?; + assert_eq!(found_keyspace.namespace, "test_namespace"); +} +``` + +## Test Configuration and Setup + +### Configuration Patterns +```rust +let config = Config { + // Service endpoints + frontend_proto_address: frontend_address, + rangeserver_proto_address: 
mock_rangeserver_address, + universe_proto_address: mock_universe_address, + + // Network configuration + frontend_fast_network_address: frontend_fast_address, + rangeserver_fast_network_address: mock_rangeserver_fast_address, + + // Resource allocation + frontend_fast_network_dedicated_core: Some(2), + rangeserver_fast_network_dedicated_core: Some(3), + + // Timing configuration + transaction_timeout: Duration::from_secs(30), + operation_timeout: Duration::from_secs(10), + + // Storage configuration + cassandra_endpoints: vec!["127.0.0.1:9042".to_string()], + cassandra_keyspace: "test_keyspace".to_string(), +}; +``` + +### Dependencies and Setup +- **Cassandra**: Required for storage-layer integration tests +- **Port Management**: Dynamic port allocation to avoid conflicts +- **Runtime Isolation**: Separate Tokio runtimes for component isolation +- **Network Setup**: UDP/TCP sockets with proper addressing + +## Testing Patterns + +### Distributed System Simulation +```rust +// Multi-component setup pattern +async fn setup_distributed_system() -> TestEnvironment { + // Start services in dependency order + let epoch_publisher = MockEpochPublisher::start(config.clone()).await?; + let universe = MockUniverse::start(config.clone()).await?; + let range_server = MockRangeServer::start(config.clone()).await?; + let frontend = FrontendServer::start(config.clone()).await?; + + // Wait for service readiness + wait_for_service_ready(&frontend_address).await?; + + TestEnvironment { + epoch_publisher, + universe, + range_server, + frontend, + config, + } +} +``` + +### Correctness Verification +```rust +// State assertion patterns +async fn verify_transaction_state(range_server: &MockRangeServer, expected_state: &TransactionState) { + let data = range_server.data.read().await; + let pending = range_server.pending_prepare_records.lock().await; + + // Verify committed data + for (key, expected_value) in &expected_state.committed_data { + assert_eq!(data.get(key), Some(expected_value)); + } + + // Verify no pending records for committed transactions + for tx_id in &expected_state.committed_transactions { + assert!(!pending.values().any(|(_, id, _)| id == tx_id)); + } +} +``` + +### Timing and Concurrency +```rust +// Proper async coordination +async fn coordinate_distributed_operation() { + let (tx, rx) = oneshot::channel(); + + // Background task for service + tokio::spawn(async move { + let result = perform_distributed_operation().await; + tx.send(result).unwrap(); + }); + + // Wait with timeout + let result = tokio::time::timeout( + Duration::from_secs(10), + rx + ).await??; + + verify_operation_result(result); +} +``` + +## Test Coverage Analysis + +### 2PC Protocol Coverage +- **Prepare Phase**: Transaction preparation with epoch validation +- **Commit Phase**: Group commit execution and persistence +- **Abort Phase**: Transaction rollback and cleanup +- **Dependency Handling**: Cross-transaction ordering + +### Component Coverage +- **Frontend**: Complete transaction coordinator testing +- **RangeClient**: 2PC participant protocol testing +- **Universe**: Keyspace management testing +- **Storage**: Cassandra integration testing + +### Gaps and Limitations +- **Resolver Testing**: Limited dedicated resolver integration tests +- **Failure Injection**: No systematic failure scenario testing +- **Performance Testing**: No load or stress test infrastructure +- **Multi-Node Testing**: Single-process simulation only +- **Network Partition**: Limited network failure simulation + +## Key Testing 
Infrastructure Benefits + +### Strengths +1. **Protocol Fidelity**: Tests use actual message formats and network protocols +2. **Real Storage Integration**: Cassandra integration provides realistic storage testing +3. **Comprehensive Mocking**: Well-designed mock infrastructure simulates distributed components +4. **Isolation**: Proper test isolation with separate runtimes and clean state +5. **Async Correctness**: Proper async/await patterns with cancellation and timeouts + +### Architecture Quality +The Sangria testing infrastructure demonstrates sophisticated distributed systems testing practices with proper abstractions for network simulation, comprehensive component mocking, and integration with real storage systems. While coverage could be enhanced in areas like failure injection and resolver testing, the existing infrastructure effectively validates core 2PC protocol implementation and transaction semantics. \ No newline at end of file diff --git a/codebase-understanding/README.md b/codebase-understanding/README.md new file mode 100644 index 0000000..837076b --- /dev/null +++ b/codebase-understanding/README.md @@ -0,0 +1,83 @@ +# Sangria Codebase Understanding + +This directory contains comprehensive documentation of the Sangria distributed database system, focusing on the core working parts including resolver code, 2PC mechanisms, pipeline implementations, and test architecture. + +## Documentation Structure + +### [01 - Codebase Overview](./01-codebase-overview.md) +- **Project Structure**: Main modules and their responsibilities +- **Architecture Overview**: High-level system design and components +- **Key Entry Points**: Server main functions and client libraries +- **Testing Infrastructure**: Overview of test organization + +### [02 - Resolver and 2PC Mechanisms](./02-resolver-and-2pc-mechanisms.md) +- **Resolver Core Implementation**: Transaction dependency tracking and resolution +- **2PC Implementation**: Two-phase commit protocol in the coordinator +- **Dependency Resolution Flow**: How transactions wait for dependencies +- **Transaction Operations**: Read, write, delete operations and their semantics +- **Error Handling**: Abort scenarios and recovery mechanisms +- **Key Optimizations**: Group commit, early lock release, adaptive strategies + +### [03 - Pipeline 2PC Implementation](./03-pipeline-2pc-implementation.md) +- **Group Commit Architecture**: Batching optimization for transaction throughput +- **Pipeline Batching Mechanism**: How transactions are grouped and committed +- **Commit Strategy Implementation**: Traditional vs Adaptive/Pipelined strategies +- **Pipeline Optimizations**: Early lock release, dependency routing, batch operations +- **Concurrency Management**: Fine-grained locking and parallel processing +- **Performance Benefits**: Throughput improvements and latency considerations + +### [04 - Test Architecture](./04-test-architecture.md) +- **Test Organization**: Integration and unit test structure +- **Mock Infrastructure**: Comprehensive mocking of distributed components +- **Integration Test Implementation**: End-to-end transaction flow testing +- **Test Configuration**: Setup patterns and dependency management +- **Testing Patterns**: Distributed system simulation and correctness verification +- **Coverage Analysis**: What's tested and what gaps exist + +## Quick Reference + +### Core Components +- **Frontend** (`frontend/`): Client-facing transaction coordinator +- **Coordinator** (`coordinator/`): 2PC transaction coordination logic +- **Resolver** 
(`resolver/`): Dependency resolution and group commit +- **RangeServer** (`rangeserver/`): Data storage and range management +- **Universe** (`universe/`): Cluster membership and keyspace management + +### Key Files +- `coordinator/src/transaction.rs` - Main 2PC implementation +- `resolver/src/core/resolver.rs` - Dependency resolution logic +- `resolver/src/core/group_commit.rs` - Pipeline batching mechanism +- `frontend/tests/integration_tests.rs` - End-to-end testing +- `frontend/src/for_testing/` - Mock infrastructure + +### Transaction Flow +1. **Start Transaction**: Frontend creates transaction context +2. **Operations**: Read/write operations collect dependencies +3. **Prepare Phase**: Parallel prepare requests to all participant ranges +4. **Dependency Resolution**: Resolver orders transactions by dependencies +5. **Commit Phase**: Group commit executes batched transactions +6. **Completion**: Participants apply changes and release locks + +### Commit Strategies +- **Traditional**: Standard 2PC with state store first, then participants +- **Adaptive**: Routes based on dependencies (direct path vs resolver) +- **Pipelined**: Optimized with batching and early lock release + +### Testing Approach +- **Comprehensive Mocking**: Full distributed system simulation +- **Real Storage**: Cassandra integration for storage layer testing +- **Protocol Fidelity**: Actual message formats and network protocols +- **Async Correctness**: Proper concurrency and cancellation patterns + +## System Architecture Summary + +Sangria implements an adaptive distributed transaction system with: + +1. **Multi-modal 2PC**: Traditional, pipelined, and adaptive commit strategies +2. **Dependency-based Ordering**: Resolver tracks transaction dependencies for serializable execution +3. **Range-based Sharding**: Data partitioned across ranges managed by range servers +4. **Group Commit Optimization**: Batches commits for better throughput +5. **Epoch-based Versioning**: Consistent snapshots across the distributed system +6. **Cassandra Backend**: Persistent storage for transaction state and data + +The system is designed for high-performance distributed transactions with adaptive optimization based on workload characteristics and system load. \ No newline at end of file diff --git a/workload-generator/scripts/run_experiments.py b/workload-generator/scripts/run_experiments.py index e0c07cd..7284d48 100644 --- a/workload-generator/scripts/run_experiments.py +++ b/workload-generator/scripts/run_experiments.py @@ -353,11 +353,11 @@ def main(): if BUILD_ATOMIX: atomix_setup.build_servers() - # tradeoff_contention_vs_resolver_capacity_experiment(ray_logs_dir) + tradeoff_contention_vs_resolver_capacity_experiment(ray_logs_dir) # runtime_variations_contention_experiment(ray_logs_dir) # runtime_variations_resolver_capacity_experiment(ray_logs_dir) # mixed_workload_experiment(ray_logs_dir) - ycsb_experiment(ray_logs_dir) + #ycsb_experiment(ray_logs_dir) ray.shutdown()
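
The hunk above switches `main()` from the YCSB experiment to `tradeoff_contention_vs_resolver_capacity_experiment(ray_logs_dir)`. For orientation, the sketch below shows the general shape of such a Ray Tune experiment: declare a grid of configuration values, hand each grid point to a trainable that runs one workload and reports a metric, and let results land under the log directory for later plotting. This is only an illustrative sketch, not the repo's code; `run_one_config`, `simulate_run`, `contention_level`, and `resolver_capacity` are hypothetical names.

```python
# Illustrative sketch only: hypothetical parameter names and a stubbed
# workload runner stand in for the real experiment wiring.
import ray
from ray import tune


def simulate_run(config):
    # Stand-in for spinning up the servers and running the workload generator;
    # returns a fake throughput number so the sketch runs end to end.
    return config["resolver_capacity"] / (1.0 + config["contention_level"])


def run_one_config(config):
    # One Ray Tune trial corresponds to one experiment configuration.
    # Returning a dict from a function trainable reports it as the trial result.
    return {"throughput": simulate_run(config)}


def tradeoff_contention_vs_resolver_capacity_sketch(ray_logs_dir):
    ray.init(ignore_reinit_error=True)
    tune.run(
        run_one_config,
        config={
            # grid_search makes Ray Tune enumerate every combination of values
            "contention_level": tune.grid_search([0.1, 0.5, 0.9]),
            "resolver_capacity": tune.grid_search([1_000, 5_000, 10_000]),
        },
        local_dir=ray_logs_dir,  # per-trial results end up here for plotting
    )
    ray.shutdown()
```

The real `tradeoff_contention_vs_resolver_capacity_experiment` in `run_experiments.py` is more involved, but the grid-of-configs structure is the part worth internalizing before trying to reproduce it.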