From 36afb6c75a90eb1a3185defadf7458d9e2a683f2 Mon Sep 17 00:00:00 2001 From: Daniel Axtens Date: Thu, 17 Jul 2025 18:56:00 +1000 Subject: [PATCH 1/7] `PartialEq` and `Eq` for `RelaxedAtomic` --- src/relaxed_atomic.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/relaxed_atomic.rs b/src/relaxed_atomic.rs index 59011b368..ac0aed300 100644 --- a/src/relaxed_atomic.rs +++ b/src/relaxed_atomic.rs @@ -59,3 +59,11 @@ impl From for RelaxedAtomic { Self::new(value) } } + +impl PartialEq for RelaxedAtomic { + fn eq(&self, other: &Self) -> bool { + self.get() == other.get() + } +} + +impl Eq for RelaxedAtomic {} From 3a02af0d171821917612e20705d80ee8bbad6623 Mon Sep 17 00:00:00 2001 From: Daniel Axtens Date: Mon, 23 Jun 2025 01:26:31 +1000 Subject: [PATCH 2/7] Replace the linked list ordering for `Rav1dTask` with a sorted `Vec` This is both simpler to follow and also seems to be a bit faster. --- src/internal.rs | 94 ++++++++++----- src/thread_task.rs | 281 +++++++++++---------------------------------- 2 files changed, 131 insertions(+), 244 deletions(-) diff --git a/src/internal.rs b/src/internal.rs index 09b6fde97..b2f123906 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -1,11 +1,11 @@ use std::ffi::{c_int, c_uint}; -use std::mem; use std::ops::{Deref, Range}; -use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32}; use std::sync::{Arc, OnceLock}; use std::thread::JoinHandle; +use std::{cmp, mem}; -use atomig::{Atom, Atomic}; +use atomig::Atom; use libc::ptrdiff_t; use parking_lot::{Condvar, Mutex, RwLock, RwLockReadGuard}; use strum::FromRepr; @@ -50,7 +50,7 @@ use crate::recon::{ }; use crate::refmvs::{Rav1dRefmvsDSPContext, RefMvsFrame, RefMvsTemporalBlock, RefmvsTile}; use crate::relaxed_atomic::RelaxedAtomic; -use crate::thread_task::{Rav1dTaskIndex, Rav1dTasks}; +use crate::thread_task::Rav1dTasks; #[derive(Default)] pub struct Rav1dDSPContext { @@ -391,7 +391,7 @@ unsafe impl Send for Rav1dContext {} // See discussion in https://github.com/memorysafety/rav1d/pull/1329 unsafe impl Sync for Rav1dContext {} -#[derive(Default)] +#[derive(Default, Clone, PartialEq, Eq)] #[repr(C)] pub struct Rav1dTask { // frame thread id @@ -406,8 +406,6 @@ pub struct Rav1dTask { pub recon_progress: c_int, pub deblock_progress: c_int, pub deps_skip: RelaxedAtomic, - // only used in task queue - pub next: Atomic, } impl Rav1dTask { @@ -421,28 +419,69 @@ impl Rav1dTask { ..Default::default() } } +} - pub fn next(&self) -> Rav1dTaskIndex { - self.next.load(Ordering::SeqCst) - } - - pub fn set_next(&self, next: Rav1dTaskIndex) { - self.next.store(next, Ordering::SeqCst) +impl PartialOrd for Rav1dTask { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } -impl Rav1dTask { - pub fn without_next(&self) -> Self { - Self { - frame_idx: self.frame_idx, - tile_idx: self.tile_idx, - type_0: self.type_0, - sby: self.sby, - recon_progress: self.recon_progress, - deblock_progress: self.deblock_progress, - deps_skip: self.deps_skip.clone(), - next: Default::default(), +impl Ord for Rav1dTask { + /// We want `sort` to put tasks in priority order. + /// To do that, we return: + /// - `Less` if `self` is of higher priority than `other` + /// - `Greater` if `self` is of lower priority than `other` + /// + /// This is farily straightforwardly translated from `insert_tasks` in `task_thread.c`, + /// and as such inherits the limitations of that function. Specifically, + /// it requires that there are no Init, InitCdf or film grain tasks, + /// and that there are no duplicate tasks. + fn cmp(&self, other: &Self) -> cmp::Ordering { + // entropy coding precedes other steps + if other.type_0 == TaskType::TileEntropy { + if self.type_0 > TaskType::TileEntropy { + return cmp::Ordering::Greater; + } + // both are entropy + if self.sby > other.sby { + return cmp::Ordering::Greater; + } + if self.sby < other.sby { + return cmp::Ordering::Less; + } + // same sby + } else { + if self.type_0 == TaskType::TileEntropy { + return cmp::Ordering::Less; + } + if self.sby > other.sby { + return cmp::Ordering::Greater; + } + if self.sby < other.sby { + return cmp::Ordering::Less; + } + // same sby + if self.type_0 > other.type_0 { + return cmp::Ordering::Greater; + } + if self.type_0 < other.type_0 { + return cmp::Ordering::Less; + } + // same task type + } + + // sort by tile-id + assert!( + self.type_0 == TaskType::TileReconstruction || self.type_0 == TaskType::TileEntropy + ); + assert!(self.type_0 == other.type_0); + assert!(other.sby == self.sby); + assert!(self.tile_idx != other.tile_idx); + if self.tile_idx < other.tile_idx { + return cmp::Ordering::Less; } + return cmp::Ordering::Greater; } } @@ -664,13 +703,6 @@ pub struct Rav1dFrameContextLf { pub restore_planes: LrRestorePlanes, } -#[derive(Default)] -#[repr(C)] -pub struct Rav1dFrameContextTaskThreadPendingTasks { - pub head: Rav1dTaskIndex, - pub tail: Rav1dTaskIndex, -} - #[derive(Default)] #[repr(C)] pub(crate) struct Rav1dFrameContextTaskThread { diff --git a/src/thread_task.rs b/src/thread_task.rs index 9caf374d5..0ac623f81 100644 --- a/src/thread_task.rs +++ b/src/thread_task.rs @@ -1,12 +1,10 @@ use std::ffi::{c_int, c_uint}; -use std::num::NonZeroU32; -use std::ops::{Add, AddAssign, Deref}; +use std::ops::Deref; use std::process::abort; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; use std::{cmp, mem, thread}; -use atomig::{Atom, Atomic}; use parking_lot::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::cdf::rav1d_cdf_thread_update; @@ -30,7 +28,6 @@ use crate::internal::{ TaskType, }; use crate::iter::wrapping_iter; -use crate::relaxed_atomic::RelaxedAtomic; pub const FRAME_ERROR: u32 = u32::MAX - 1; pub const TILE_ERROR: i32 = i32::MAX - 1; @@ -51,7 +48,7 @@ pub const TILE_ERROR: i32 = i32::MAX - 1; fn reset_task_cur(c: &Rav1dContext, ttd: &TaskThreadData, mut frame_idx: c_uint) -> c_int { fn curr_found(c: &Rav1dContext, ttd: &TaskThreadData, first: usize) -> c_int { for fc in wrapping_iter(c.fc.iter(), first + ttd.cur.get() as usize) { - fc.task_thread.tasks.cur_prev.set(Rav1dTaskIndex::NONE); + fc.task_thread.tasks.looked.store(false, Ordering::Relaxed); } return 1; } @@ -69,9 +66,9 @@ fn reset_task_cur(c: &Rav1dContext, ttd: &TaskThreadData, mut frame_idx: c_uint) && c.fc[first as usize] .task_thread .tasks - .cur_prev - .get() - .is_none() + .looked + .load(Ordering::Relaxed) + == false { return 0 as c_int; } @@ -101,9 +98,8 @@ fn reset_task_cur(c: &Rav1dContext, ttd: &TaskThreadData, mut frame_idx: c_uint) if c.fc[((first + ttd.cur.get()) as usize) % c.fc.len()] .task_thread .tasks - .head - .load(Ordering::SeqCst) - .is_some() + .is_empty() + == false { break; } @@ -143,152 +139,49 @@ pub struct Rav1dTasks { pending_tasks: Mutex>, pending_tasks_merge: AtomicBool, - pub head: Atomic, - // Points to the task directly before the cur pointer in the queue. - // This cur pointer is theoretical here, we actually keep track of the - // "prev_t" variable. This is needed to not loose the tasks in - // [head;cur-1] when picking one for execution. - pub cur_prev: RelaxedAtomic, + // Have we looked at any tasks in this set of tasks? + pub looked: AtomicBool, } impl Rav1dTasks { - fn insert_tasks_between( - &self, - c: &Rav1dContext, - first: Rav1dTaskIndex, - last: Rav1dTaskIndex, - a: Rav1dTaskIndex, - b: Rav1dTaskIndex, - cond_signal: c_int, - ) { + fn insert_tasks(&self, c: &Rav1dContext, frame_idx: c_uint, cond_signal: c_int) { let ttd = &*c.task_thread; if c.flush.load(Ordering::SeqCst) { return; } - if a.is_some() { - assert_eq!(self.index(a).next(), b); - self.index(a).set_next(first); - } else { - self.head.store(first, Ordering::SeqCst); - } - self.index(last).set_next(b); - reset_task_cur(c, ttd, self.index(first).frame_idx); + reset_task_cur(c, ttd, frame_idx); if cond_signal != 0 && ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } } - fn insert_tasks( - &self, - c: &Rav1dContext, - first: Rav1dTaskIndex, - last: Rav1dTaskIndex, - cond_signal: c_int, - ) { - // insert task back into task queue - let mut prev_t = Rav1dTaskIndex::NONE; - let mut t = self.head.load(Ordering::SeqCst); - while t.is_some() { - 'next: { - // entropy coding precedes other steps - if self.index(t).type_0 == TaskType::TileEntropy { - if self.index(first).type_0 > TaskType::TileEntropy { - break 'next; - } - // both are entropy - if self.index(first).sby > self.index(t).sby { - break 'next; - } - if self.index(first).sby < self.index(t).sby { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - // same sby - } else { - if self.index(first).type_0 == TaskType::TileEntropy { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - if self.index(first).sby > self.index(t).sby { - break 'next; - } - if self.index(first).sby < self.index(t).sby { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - // same sby - if self.index(first).type_0 > self.index(t).type_0 { - break 'next; - } - if (self.index(first).type_0) < self.index(t).type_0 { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - // same task type - } - - // sort by tile-id - assert!( - self.index(first).type_0 == TaskType::TileReconstruction - || self.index(first).type_0 == TaskType::TileEntropy - ); - assert!(self.index(first).type_0 == self.index(t).type_0); - assert!(self.index(t).sby == self.index(first).sby); - let t_tile_idx = self.index(first).tile_idx; - let p_tile_idx = self.index(t).tile_idx; - assert!(t_tile_idx != p_tile_idx); - if !(t_tile_idx > p_tile_idx) { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - } - // next: - prev_t = t; - t = self.index(t).next(); + fn push(&self, c: &Rav1dContext, task: Rav1dTask, cond_signal: c_int) { + let frame_idx = task.frame_idx; + { + let mut tasks = self.tasks.try_write().unwrap(); + let idx = match tasks.binary_search(&task) { + Ok(_) => panic!("Tried to add an identical task!"), + Err(idx) => idx, + }; + tasks.insert(idx, task); } - self.insert_tasks_between(c, first, last, prev_t, Rav1dTaskIndex::NONE, cond_signal); - } - - fn push(&self, task: Rav1dTask) -> Rav1dTaskIndex { - let mut tasks = self.tasks.try_write().unwrap(); - tasks.push(task); - // 1-based index into tasks, so we use length after pushing - Rav1dTaskIndex(NonZeroU32::new(tasks.len() as u32)) + self.insert_tasks(c, frame_idx, cond_signal); } pub fn clear(&self) { self.tasks.try_write().unwrap().clear(); self.pending_tasks.try_lock().unwrap().clear(); self.pending_tasks_merge.store(false, Ordering::SeqCst); - self.head.store(Default::default(), Ordering::Relaxed); - self.cur_prev.set(Default::default()); + self.looked.store(false, Ordering::Relaxed); } - pub fn remove(&self, t: Rav1dTaskIndex, prev_t: Rav1dTaskIndex) -> Option { - let next_t = self.index(t).next(); - if prev_t.is_some() { - self.index(prev_t) - .next - .compare_exchange(t, next_t, Ordering::SeqCst, Ordering::SeqCst) - .ok()?; - } else { - self.head - .compare_exchange(t, next_t, Ordering::SeqCst, Ordering::SeqCst) - .ok()?; - } - self.index(t).set_next(Rav1dTaskIndex::NONE); - Some(self.index(t).without_next()) + pub fn remove(&self, t: usize) -> Rav1dTask { + self.tasks.try_write().unwrap().remove(t) } #[inline] - fn index<'a>(&'a self, index: Rav1dTaskIndex) -> impl Deref + 'a { - if let Some(index) = index.raw_index() { - RwLockReadGuard::map(self.tasks.try_read().unwrap(), |tasks| { - &tasks[index as usize] - }) - } else { - panic!("Cannot index with None"); - } + fn index<'a>(&'a self, index: usize) -> impl Deref + 'a { + RwLockReadGuard::map(self.tasks.try_read().unwrap(), |tasks| &tasks[index]) } #[inline] @@ -301,68 +194,43 @@ impl Rav1dTasks { fn merge_pending_frame(&self, c: &Rav1dContext) -> bool { let merge = self.pending_tasks_merge.swap(false, Ordering::SeqCst); if merge { - let mut pending_tasks = self.pending_tasks.lock(); - let range = { + let new_start = self.len(); + { + let mut pending_tasks = self.pending_tasks.lock(); let mut tasks = self.tasks.try_write().unwrap(); - if self.head.load(Ordering::Relaxed).is_none() { - tasks.clear(); - } - let start = tasks.len() as u32; tasks.extend(pending_tasks.drain(..)); - start..tasks.len() as u32 - }; - - for i in range { - // 1-based index, so we have to add 1 - let task_idx = Rav1dTaskIndex(NonZeroU32::new(i + 1)); - self.insert_tasks(c, task_idx, task_idx, 0); + } + { + // take a read lock because reset_task_cur takes a read lock also + let tasks = self.tasks.try_read().unwrap(); + for t in tasks[new_start..].iter() { + self.insert_tasks(c, t.frame_idx, 0); + } + } + { + // take a write lock to sort + let mut tasks = self.tasks.try_write().unwrap(); + tasks.sort(); } } - merge - } -} - -impl Rav1dFrameContextTaskThread { - fn insert_task(&self, c: &Rav1dContext, task: Rav1dTask, cond_signal: c_int) -> Rav1dTaskIndex { - let idx = self.tasks.push(task); - self.tasks.insert_tasks(c, idx, idx, cond_signal); - idx - } -} -/// 1-based index into the task queue vector. 0 is reserved for None. -#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Debug, Atom)] -#[repr(transparent)] -pub struct Rav1dTaskIndex(Option); - -impl Rav1dTaskIndex { - pub const NONE: Self = Self(None); - - // Return the zero-based index into the task queue vector or `None` - pub fn raw_index(self) -> Option { - self.0.map(|i| i.get() - 1) - } - - pub fn is_none(self) -> bool { - self.0.is_none() + merge } - pub fn is_some(self) -> bool { - self.0.is_some() + /// How many (non-pending) tasks do we have in the task list? + pub fn len(&self) -> usize { + self.tasks.try_read().unwrap().len() } -} -impl Add for Rav1dTaskIndex { - type Output = Self; - - fn add(self, rhs: u32) -> Self::Output { - Self(self.0.and_then(|i| NonZeroU32::new(i.get() + rhs))) + /// Are there non-pending tasks in the task list? + pub fn is_empty(&self) -> bool { + self.tasks.try_read().unwrap().is_empty() } } -impl AddAssign for Rav1dTaskIndex { - fn add_assign(&mut self, rhs: u32) { - *self = *self + rhs; +impl Rav1dFrameContextTaskThread { + fn insert_task(&self, c: &Rav1dContext, task: Rav1dTask, cond_signal: c_int) { + self.tasks.push(c, task, cond_signal); } } @@ -448,7 +316,6 @@ pub(crate) fn rav1d_task_create_tile_sbrow( }, frame_idx: fc.index as c_uint, tile_idx: tile_idx as c_uint, - next: Default::default(), }; pending_tasks.push(t); } @@ -512,7 +379,7 @@ fn ensure_progress<'l, 'ttd: 'l>( type_0, recon_progress: 0, deblock_progress: t.sby, - ..t.without_next() + ..t.clone() }; f.task_thread.tasks.add_pending(t); *task_thread_lock = Some(ttd.lock.lock()); @@ -798,7 +665,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { continue 'outer; } - let (fc, t_idx, prev_t) = 'found: { + let (fc, t_idx) = 'found: { if c.fc.len() > 1 { // run init tasks second 'init_tasks: for fc in @@ -808,13 +675,14 @@ pub fn rav1d_worker_task(task_thread: Arc) { if fc.task_thread.init_done.load(Ordering::SeqCst) != 0 { continue 'init_tasks; } - let t_idx = tasks.head.load(Ordering::SeqCst); - if t_idx.is_none() { + if tasks.is_empty() { continue 'init_tasks; } + let t_idx = 0; + let t = tasks.index(t_idx); if t.type_0 == TaskType::Init { - break 'found (fc, t_idx, Rav1dTaskIndex::NONE); + break 'found (fc, t_idx); } if t.type_0 == TaskType::InitCdf { // XXX This can be a simple else, if adding tasks of both @@ -834,7 +702,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { fc.task_thread .error .fetch_or((p1 == TILE_ERROR) as c_int, Ordering::SeqCst); - break 'found (fc, t_idx, Rav1dTaskIndex::NONE); + break 'found (fc, t_idx); } } } @@ -845,13 +713,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { let fc = &c.fc[(first + ttd.cur.get()) as usize % c.fc.len()]; let tasks = &fc.task_thread.tasks; tasks.merge_pending_frame(c); - let mut prev_t = tasks.cur_prev.get(); - let mut t_idx = if prev_t.is_some() { - tasks.index(prev_t).next() - } else { - tasks.head.load(Ordering::SeqCst) - }; - while t_idx.is_some() { + for t_idx in 0..tasks.len() { let t = tasks.index(t_idx); 'next: { if t.type_0 == TaskType::InitCdf { @@ -871,7 +733,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { // if not bottom sbrow of tile, this task will be re-added // after it's finished if check_tile(&f, &fc.task_thread, &t, (c.fc.len() > 1) as c_int) == 0 { - break 'found (fc, t_idx, prev_t); + break 'found (fc, t_idx); } } else if t.recon_progress != 0 { // We need to block here because we are seeing rare @@ -908,7 +770,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { let next_t = Rav1dTask { sby: t.sby + 1, recon_progress: t.sby + 2, - ..t.without_next() + ..t.clone() }; let ntr = f.frame_thread.next_tile_row[p as usize].get() + 1; let start = frame_hdr.tiling.row_start_sb[ntr as usize] as c_int; @@ -918,13 +780,13 @@ pub fn rav1d_worker_task(task_thread: Arc) { drop(t); fc.task_thread.insert_task(c, next_t, 0); } - break 'found (fc, t_idx, prev_t); + break 'found (fc, t_idx); } else if t.type_0 == TaskType::Cdef { let p1_1 = fc.frame_thread_progress.copy_lpf.try_read().unwrap() [(t.sby - 1 >> 5) as usize] .load(Ordering::SeqCst); if p1_1 as c_uint & (1 as c_uint) << (t.sby - 1 & 31) != 0 { - break 'found (fc, t_idx, prev_t); + break 'found (fc, t_idx); } } else { if t.deblock_progress == 0 { @@ -935,14 +797,12 @@ pub fn rav1d_worker_task(task_thread: Arc) { fc.task_thread .error .fetch_or((p1_2 == TILE_ERROR) as c_int, Ordering::SeqCst); - break 'found (fc, t_idx, prev_t); + break 'found (fc, t_idx); } } } // next: - prev_t = t_idx; - t_idx = t.next(); - tasks.cur_prev.set(prev_t); + tasks.looked.store(true, Ordering::Relaxed); } ttd.cur.update(|cur| cur + 1); } @@ -957,14 +817,9 @@ pub fn rav1d_worker_task(task_thread: Arc) { }; // found: // remove t from list - let Some(mut t) = fc.task_thread.tasks.remove(t_idx, prev_t) else { - // Another thread already consumed the task - eprintln!("Task {t_idx:?} already consumed"); - continue 'outer; - }; - if t.type_0 > TaskType::InitCdf - && fc.task_thread.tasks.head.load(Ordering::SeqCst).is_none() - { + let mut t = fc.task_thread.tasks.remove(t_idx); + + if t.type_0 > TaskType::InitCdf && fc.task_thread.tasks.is_empty() { ttd.cur.update(|cur| cur + 1); } // we don't need to check cond_signaled here, since we found a task From ce1f0a42cfa51065194292dda8a7d85ba9d448d0 Mon Sep 17 00:00:00 2001 From: Daniel Axtens Date: Sun, 6 Jul 2025 15:59:39 +1000 Subject: [PATCH 3/7] Do fewer `reset_task_cur` calls in `merge_pending` --- src/thread_task.rs | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/src/thread_task.rs b/src/thread_task.rs index 0ac623f81..96eeafaf7 100644 --- a/src/thread_task.rs +++ b/src/thread_task.rs @@ -197,15 +197,44 @@ impl Rav1dTasks { let new_start = self.len(); { let mut pending_tasks = self.pending_tasks.lock(); + if pending_tasks.len() == 0 { + // This is a 'spurious' merge: that is, `merge` is set but the vector is empty. + // + // We always populate the vector under the lock, release the lock, and then set `merge`. + // We do not synchronise the vector and the flag. So it can fall out of alignment, e.g., + // if we have 2 threads: + // 1. say we begin with `merge` set and tasks in the `pending_tasks` vector. + // 2. thread A begins `merge_pending_frame`, and clears `pending_tasks_merge`. + // 3. thread B goes to add a pending task and takes the vector lock. + // 4. thread A waits on the vector lock. + // 5. thread B inserts some tasks, releases the lock, and sets `pending_tasks_merge`. + // 6. thread A can now take the lock, and drain the vector. + // We now have an empty vector: A has drained both the original contents, + // and the task added by thread B - but `pending_tasks_merge` is true, + // because it was set by B. + // + // So, if `merge_pending_frame` is called before another pending task is added, + // we will get this spurious merge. + return false; + } let mut tasks = self.tasks.try_write().unwrap(); tasks.extend(pending_tasks.drain(..)); } + + // Unlike in the C code, we only do a single `insert_tasks`. There are two reasons: + // + // - in the C code, the `insert_tasks` is what creates the sorted linked list. + // In the Rust code, the sorting is done via sorting the `Vec` explictly. + // + // - The key remaining function of is updating our position in the frame contexts, + // by updating the `reset_task_cur` and `cur` variables. Here, as in C, + // every task we're merging is for the same frame context, so the arguments are always the same, + // and nothing is changed by trying to repeatedly do the same update. { // take a read lock because reset_task_cur takes a read lock also let tasks = self.tasks.try_read().unwrap(); - for t in tasks[new_start..].iter() { - self.insert_tasks(c, t.frame_idx, 0); - } + let frame_idx = tasks[new_start].frame_idx; + self.insert_tasks(c, frame_idx, 0); } { // take a write lock to sort From bfb6e6c8c45dd7a35d64f812c9d2854863d84843 Mon Sep 17 00:00:00 2001 From: Daniel Axtens Date: Tue, 22 Jul 2025 19:29:25 +1000 Subject: [PATCH 4/7] Extract common logic to advance `first` into its own function No behavioural change. --- src/decode.rs | 18 +----------------- src/internal.rs | 39 ++++++++++++++++++++++++++++++++++++++- src/lib.rs | 17 +---------------- src/obu.rs | 18 +----------------- 4 files changed, 41 insertions(+), 51 deletions(-) diff --git a/src/decode.rs b/src/decode.rs index 6369b4866..9b4fc9de8 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -4848,23 +4848,7 @@ pub fn rav1d_submit_frame(c: &Rav1dContext, state: &mut Rav1dState) -> Rav1dResu } let out_delayed = &mut state.frame_thread.out_delayed[next as usize]; if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 { - let first = c.task_thread.first.load(Ordering::SeqCst); - if first as usize + 1 < c.fc.len() { - c.task_thread.first.fetch_add(1, Ordering::SeqCst); - } else { - c.task_thread.first.store(0, Ordering::SeqCst); - } - let _ = c.task_thread.reset_task_cur.compare_exchange( - first, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - // `cur` is not actually mutated from multiple threads concurrently - let cur = c.task_thread.cur.get(); - if cur != 0 && (cur as usize) < c.fc.len() { - c.task_thread.cur.update(|cur| cur - 1); - } + c.task_thread.advance_first(c.fc.len()); } let error = &mut *fc.task_thread.retval.try_lock().unwrap(); if error.is_some() { diff --git a/src/internal.rs b/src/internal.rs index b2f123906..786780e16 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -1,6 +1,6 @@ use std::ffi::{c_int, c_uint}; use std::ops::{Deref, Range}; -use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32}; +use std::sync::atomic::{self, AtomicBool, AtomicI32, AtomicU32}; use std::sync::{Arc, OnceLock}; use std::thread::JoinHandle; use std::{cmp, mem}; @@ -272,6 +272,43 @@ pub(crate) struct TaskThreadData { pub delayed_fg: RwLock, } +impl TaskThreadData { + /// When we finish a frame we advance [`Self::first`], and we must also adjust [`Self::cur`], + /// because `cur` is an offset of `first`. + /// We may also need to adjust [`Self::reset_task_cur`]. + /// + /// The task thread lock, [`Self::lock`], must be held and there must be more than 1 frame context. + pub(crate) fn advance_first(&self, n_fc: usize) { + // This read-modify-write doesn't need to be an atomic CAS: + // modifications are protected by the lock that we're holding. + let first = self.first.load(atomic::Ordering::SeqCst); + if first as usize + 1 < n_fc { + self.first.fetch_add(1, atomic::Ordering::SeqCst); + } else { + self.first.store(0, atomic::Ordering::SeqCst); + } + + let _ = self.reset_task_cur.compare_exchange( + first, + u32::MAX, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ); + + // We've advanced `first`, so because `cur` is indexed by `first`, we need to retreat it. + // There are two exceptions: + // - if `cur` is already 0, we can't go any further back + // - if `cur` is at the end of the frame contexts (`n_fc`), don't move it, + // because we haven't created any new tasks yet. + // `cur` is not mutated from multiple threads (it's protected by the lock), + // so we don't need an atomic CAS for this read-modify-write. + let cur = self.cur.get(); + if cur != 0 && (cur as usize) < n_fc { + self.cur.set(cur - 1); + } + } +} + #[derive(Default)] #[repr(C)] pub(crate) struct Rav1dContextRefs { diff --git a/src/lib.rs b/src/lib.rs index f7b4083a5..779a35d5b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -552,22 +552,7 @@ fn drain_picture(c: &Rav1dContext, state: &mut Rav1dState, out: &mut Rav1dPictur } let out_delayed = &mut state.frame_thread.out_delayed[next as usize]; if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 { - let first = c.task_thread.first.load(Ordering::SeqCst); - if first as usize + 1 < c.fc.len() { - c.task_thread.first.fetch_add(1, Ordering::SeqCst); - } else { - c.task_thread.first.store(0, Ordering::SeqCst); - } - let _ = c.task_thread.reset_task_cur.compare_exchange( - first, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - let cur = c.task_thread.cur.get(); - if cur != 0 && (cur as usize) < c.fc.len() { - c.task_thread.cur.set(cur - 1); - } + c.task_thread.advance_first(c.fc.len()); drained = true; } else if drained { break; diff --git a/src/obu.rs b/src/obu.rs index 59d99bcc3..6d23b3050 100644 --- a/src/obu.rs +++ b/src/obu.rs @@ -2499,23 +2499,7 @@ fn parse_obus( let out_delayed = &mut state.frame_thread.out_delayed[next as usize]; if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 { - let first = c.task_thread.first.load(Ordering::SeqCst); - if first as usize + 1 < c.fc.len() { - c.task_thread.first.fetch_add(1, Ordering::SeqCst); - } else { - c.task_thread.first.store(0, Ordering::SeqCst); - } - let _ = c.task_thread.reset_task_cur.compare_exchange( - first, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - if c.task_thread.cur.get() != 0 - && (c.task_thread.cur.get() as usize) < c.fc.len() - { - c.task_thread.cur.update(|cur| cur - 1); - } + c.task_thread.advance_first(c.fc.len()); } let error = &mut *fc.task_thread.retval.try_lock().unwrap(); if error.is_some() { From dc12f1ad154f984a8fa9f27e18cf103dff43027c Mon Sep 17 00:00:00 2001 From: Daniel Axtens Date: Sat, 5 Jul 2025 18:17:16 +1000 Subject: [PATCH 5/7] Replace `reset_task_cur(_async)` with simpler, more documented functions We keep largely the same algorithm and variable meanings, but we make things a lot simpler in how they're coded up, and we document a lot more. --- src/internal.rs | 195 ++++++++++++++++++++++++++++++++++++++++++--- src/thread_task.rs | 153 +++++------------------------------ 2 files changed, 206 insertions(+), 142 deletions(-) diff --git a/src/internal.rs b/src/internal.rs index 786780e16..a329e4e4f 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -258,13 +258,37 @@ unsafe impl Sync for TaskThreadDataDelayedFg {} pub(crate) struct TaskThreadData { pub lock: Mutex<()>, pub cond: Condvar, + + /// Where we have multiple frame contexts, we keep track of which frame is 'first', + /// that is, which of the frames we're working on has the lowest frame number, + /// or is closest to the start of the data. + /// + /// Written only under the task thread lock, [`Self::lock`]. + /// Read under the lock, except in [`Self::mark_new_tasks_for_index`]. pub first: AtomicU32, + + /// Say we have several frame contexts. Each context may or may not have tasks, + /// and each task may or may not be blocked. Checking if tasks are blocked is not free, + /// so if a context has only blocked tasks, we use this variable to note that, + /// and then we can start looking from the first unblocked context. + /// + /// When decoding progress allows us to look again at an earlier context again, + /// [`Self::reset_task_cur`] is updated. When we run out of frame contexts, + /// or we wake from park, we update this from [`Self::reset_task_cur`]. + /// + /// `cur` is indexed from `first`: if `first = 1` and `cur = 2`, + /// then we start looking at frame context 3. + /// + /// Can only be modified or read under the task thread lock, [`Self::lock`]: + /// the atomic is for interior mutability rather than concurrency. pub cur: RelaxedAtomic, - /// This is used for delayed reset of the task cur pointer when - /// such operation is needed but the thread doesn't enter a critical - /// section (typically when executing the next sbrow task locklessly). - /// See [`crate::thread_task::reset_task_cur`]. + + /// When we note progress, we track where [`Self::cur`] should start from next time. + /// + /// Can be set without holding the task thread lock, [`Self::lock`]. + /// Only used under the task thread lock. pub reset_task_cur: AtomicU32, + pub cond_signaled: AtomicI32, pub delayed_fg_exec: RelaxedAtomic, pub delayed_fg_cond: Condvar, @@ -282,15 +306,40 @@ impl TaskThreadData { // This read-modify-write doesn't need to be an atomic CAS: // modifications are protected by the lock that we're holding. let first = self.first.load(atomic::Ordering::SeqCst); - if first as usize + 1 < n_fc { - self.first.fetch_add(1, atomic::Ordering::SeqCst); + let new_first = if first as usize + 1 < n_fc { + self.first.fetch_add(1, atomic::Ordering::SeqCst) + 1 } else { self.first.store(0, atomic::Ordering::SeqCst); - } - + 0 + }; + // `reset_task_cur` is used to mark the first frame context where, we hope, + // there is probably work to be done. When we use it to set `cur`, + // we unconditionally subtract `first`, so it may be more than the number of fcs. + // + // If `reset_task_cur` was the old `first`, it means the old target `cur` value was zero. + // The C code detects this and swaps in `u32::MAX`, + // which means that we will no longer swap in an updated `cur`. + // + // The general rule is that if we have potential progress for `fc_i`, + // we might also have progress for `fc_i + 1`, `fc_i + 2`, etc. + // + // If `first` has just incremented, and we subtract the new first from the old + // `reset_task_cur`, we will get -1, which equals `u32::MAX` (but needs a wrapping subtract). + // So doing nothing should lead to the same behaviour. If we wanted to keep `cur` at 0, + // we would need to increment `reset_task_cur` as well. + // + // If `first` is now zero, `reset_task_cur - first` might be beyond the number of fcs. + // This also doesn't actually directly cause a problem: when we reset `cur`, + // we only swap in a new `cur` if it's better than our existing `cur`. + // If the potential new value is greater than the number of frame contexts, + // it will never be swapped in, so the behaviour is identical to setting `u32::MAX`. + // + // However, the downside of not resetting `cur` is that it makes thread parking more likely, + // and if it's at all possible for a thread to be doing something that would be better! + // So if `reset_task_cur == first`, swap in `new_first` instead. let _ = self.reset_task_cur.compare_exchange( first, - u32::MAX, + new_first, atomic::Ordering::SeqCst, atomic::Ordering::SeqCst, ); @@ -307,6 +356,134 @@ impl TaskThreadData { self.cur.set(cur - 1); } } + + /// We probably have several frame contexts. Each context may or may not have tasks, + /// and each task may or may not be blocked. Checking if tasks are blocked is not free, + /// So when we're looking for a task to do, we try to skip frame contexts that have only blocked tasks. + /// We use [`Self::cur`], which is indexed from [`Self::first`]. + /// + /// This marks a frame context as able to do a task. + /// The general rule is that a notification for `fc_i` unlocks `fc_i, fc_i+1, fc_i+2 ... n_fc` + /// + /// The task thread lock, [`Self::lock`], should *not* be held. + pub(crate) fn mark_new_tasks_for_index(&self, mut frame_idx: u32, n_fc: u32) { + let mut reset_task_cur = self.reset_task_cur.load(atomic::Ordering::SeqCst); + loop { + let first = self.first.load(atomic::Ordering::SeqCst); + + // `cur` is indexed from `first`: `cur = 0` means we should start looking at the `first` frame context. + // `reset_task_cur` is _not_. It is set such that in `reset_cur_index`, + // we can unconditionally subtract `first` and get `cur`. To ensure that the property holds, + // we may need to add `n_fc` . + if frame_idx < first { + frame_idx += n_fc; + } + + // Only save a new frame index if it's 'better' (that is, closer to `first`) + // that what we've saved before. + if frame_idx < reset_task_cur { + reset_task_cur = match self.reset_task_cur.compare_exchange( + reset_task_cur, + frame_idx, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ) { + Ok(_) => { + // Check to see if `first` has moved, and if so do the same check as in `advance_first`. + let new_first = self.first.load(atomic::Ordering::SeqCst); + let _ = self.reset_task_cur.compare_exchange( + first, + new_first, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ); + return; + } + Err(c) => c, + }; + } else { + break; + } + } + } + + /// Potentially resets [`Self::cur`], if [`Self::reset_task_cur`] indicates that more tasks are available. + /// Resets [`Self::reset_task_cur`] to indicate that no new tasks are available. + /// Returns true if [`Self::cur`] was moved and thus that more tasks are likely available. + /// + /// The task thread lock, [`Self::lock`], must be held. + pub(crate) fn reset_cur_index(&self) -> bool { + // `reset_task_cur` is either an index + `first` if a new task has been marked, + // or it's `u32::MAX` if we haven't had any tasks added since the last reset. + let new_cur = self.reset_task_cur.swap(u32::MAX, atomic::Ordering::SeqCst); + + // If it's `u32::MAX`, then bail without moving `cur`. + if new_cur == u32::MAX { + return false; + } + + // `cur` is atomic for interior mutability not concurrency, it's ok to cache. + let cur = self.cur.get(); + + // This is a common case optimisation. If `cur` is at the start, we cannot do better. + if cur == 0 { + return false; + } + + // It should always be the case that `reset_task_cur >= first`. However, + // we're not mutating the two variables under the one lock, + // nor are we doing something like a 16-byte CAS to keep them consistent, + // so it is possible for them to briefly desynchronise. + // Handle that with a wrapping subtract, which will create a huge positive u32 value, + // which will never be used because it will always be worse than a real `cur`. + // Later on, the delayed fg task of the frame, or the init task of the new frame, + // will cause new tasks to be created, or another running task will signal progress, + // any of which will end up getting our threads going again, + // so there shouldn't be any risk of a hang. + let new_cur = new_cur.wrapping_sub(self.first.load(atomic::Ordering::SeqCst)); + + // If it's better than our existing index, then reset `cur` and say we did something. + if new_cur < cur { + self.cur.set(new_cur); + true + } else { + false + } + } + + /// Equivalent to [`Self::mark_new_tasks_for_index`], + /// then [`Self::reset_cur_index`], but more efficient. + /// + /// The task thread lock, [`Self::lock`], must be held. + pub(crate) fn mark_and_reset_cur(&self, mut frame_idx: u32, n_fc: u32) { + // We want to reset `cur` to the better of `frame_idx` or `reset_task_cur`, + // if that's an improvement. + + // We must always reset this even if we are about to bail. + let reset_cur = self.reset_task_cur.swap(u32::MAX, atomic::Ordering::SeqCst); + + // `cur` is atomic for interior mutability not concurrency, it's ok to cache. + let cur = self.cur.get(); + + // This is a common case optimisation. If `cur` is at the start, we cannot do better. + if cur == 0 { + return; + } + + let first = self.first.load(atomic::Ordering::SeqCst); + + // This will never be called with `u32::MAX`, so we do not need to special case that. + if frame_idx < first { + frame_idx += n_fc; + } + + let min_reset_cur = u32::min(frame_idx, reset_cur); + // See `reset_cur_index` for why `wrapping_sub` is right here. + let new_cur = min_reset_cur.wrapping_sub(first); + if new_cur < cur { + self.cur.set(new_cur); + } + } } #[derive(Default)] diff --git a/src/thread_task.rs b/src/thread_task.rs index 96eeafaf7..6abf4079d 100644 --- a/src/thread_task.rs +++ b/src/thread_task.rs @@ -32,115 +32,12 @@ use crate::iter::wrapping_iter; pub const FRAME_ERROR: u32 = u32::MAX - 1; pub const TILE_ERROR: i32 = i32::MAX - 1; -/// This function resets the cur pointer to the first frame theoretically -/// executable after a task completed (ie. each time we update some progress or -/// insert some tasks in the queue). -/// When frame_idx is set, it can be either from a completed task, or from tasks -/// inserted in the queue, in which case we have to make sure the cur pointer -/// isn't past this insert. -/// The special case where frame_idx is UINT_MAX is to handle the reset after -/// completing a task and locklessly signaling progress. In this case we don't -/// enter a critical section, which is needed for this function, so we set an -/// atomic for a delayed handling, happening here. Meaning we can call this -/// function without any actual update other than what's in the atomic, hence -/// this special case. -#[inline] -fn reset_task_cur(c: &Rav1dContext, ttd: &TaskThreadData, mut frame_idx: c_uint) -> c_int { - fn curr_found(c: &Rav1dContext, ttd: &TaskThreadData, first: usize) -> c_int { - for fc in wrapping_iter(c.fc.iter(), first + ttd.cur.get() as usize) { - fc.task_thread.tasks.looked.store(false, Ordering::Relaxed); - } - return 1; - } - let min_frame_idx: c_uint; - let cur_frame_idx: c_uint; - let first = ttd.first.load(Ordering::SeqCst); - let mut reset_frame_idx: c_uint = ttd.reset_task_cur.swap(u32::MAX, Ordering::SeqCst); - if reset_frame_idx < first { - if frame_idx == u32::MAX { - return 0 as c_int; - } - reset_frame_idx = u32::MAX; - } - if ttd.cur.get() == 0 - && c.fc[first as usize] - .task_thread - .tasks - .looked - .load(Ordering::Relaxed) - == false - { - return 0 as c_int; - } - if reset_frame_idx != u32::MAX { - if frame_idx == u32::MAX { - if reset_frame_idx > first.wrapping_add(ttd.cur.get()) { - return 0 as c_int; - } - ttd.cur.set(reset_frame_idx.wrapping_sub(first)); - return curr_found(c, ttd, first as usize); - } - } else { - if frame_idx == u32::MAX { - return 0 as c_int; - } - } - if frame_idx < first { - frame_idx += c.fc.len() as c_uint; - } - min_frame_idx = cmp::min(reset_frame_idx, frame_idx); - cur_frame_idx = first.wrapping_add(ttd.cur.get()); - if (ttd.cur.get() as usize) < c.fc.len() && cur_frame_idx < min_frame_idx { - return 0 as c_int; - } - ttd.cur.set(min_frame_idx.wrapping_sub(first)); - while (ttd.cur.get() as usize) < c.fc.len() { - if c.fc[((first + ttd.cur.get()) as usize) % c.fc.len()] - .task_thread - .tasks - .is_empty() - == false - { - break; - } - ttd.cur.update(|cur| cur + 1); - } - return curr_found(c, ttd, first as usize); -} - -#[inline] -fn reset_task_cur_async(ttd: &TaskThreadData, mut frame_idx: c_uint, n_frames: c_uint) { - let first = ttd.first.load(Ordering::SeqCst); - if frame_idx < first { - frame_idx += n_frames; - } - let mut last_idx = frame_idx; - loop { - frame_idx = last_idx; - last_idx = ttd.reset_task_cur.swap(frame_idx, Ordering::SeqCst); - if !(last_idx < frame_idx) { - break; - } - } - if frame_idx == first && ttd.first.load(Ordering::SeqCst) != first { - let _ = ttd.reset_task_cur.compare_exchange( - frame_idx, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - } -} - #[derive(Default)] pub struct Rav1dTasks { // TODO: probably should be a VecDeque, we need to empty this and I don't think we do yet. tasks: RwLock>, pending_tasks: Mutex>, pending_tasks_merge: AtomicBool, - - // Have we looked at any tasks in this set of tasks? - pub looked: AtomicBool, } impl Rav1dTasks { @@ -149,7 +46,7 @@ impl Rav1dTasks { if c.flush.load(Ordering::SeqCst) { return; } - reset_task_cur(c, ttd, frame_idx); + ttd.mark_and_reset_cur(frame_idx, c.fc.len() as u32); if cond_signal != 0 && ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } @@ -172,7 +69,6 @@ impl Rav1dTasks { self.tasks.try_write().unwrap().clear(); self.pending_tasks.try_lock().unwrap().clear(); self.pending_tasks_merge.store(false, Ordering::SeqCst); - self.looked.store(false, Ordering::Relaxed); } pub fn remove(&self, t: usize) -> Rav1dTask { @@ -195,6 +91,8 @@ impl Rav1dTasks { let merge = self.pending_tasks_merge.swap(false, Ordering::SeqCst); if merge { let new_start = self.len(); + let mut tasks = self.tasks.try_write().unwrap(); + { let mut pending_tasks = self.pending_tasks.lock(); if pending_tasks.len() == 0 { @@ -217,7 +115,6 @@ impl Rav1dTasks { // we will get this spurious merge. return false; } - let mut tasks = self.tasks.try_write().unwrap(); tasks.extend(pending_tasks.drain(..)); } @@ -230,17 +127,9 @@ impl Rav1dTasks { // by updating the `reset_task_cur` and `cur` variables. Here, as in C, // every task we're merging is for the same frame context, so the arguments are always the same, // and nothing is changed by trying to repeatedly do the same update. - { - // take a read lock because reset_task_cur takes a read lock also - let tasks = self.tasks.try_read().unwrap(); - let frame_idx = tasks[new_start].frame_idx; - self.insert_tasks(c, frame_idx, 0); - } - { - // take a write lock to sort - let mut tasks = self.tasks.try_write().unwrap(); - tasks.sort(); - } + let frame_idx = tasks[new_start].frame_idx; + self.insert_tasks(c, frame_idx, 0); + tasks.sort(); } merge @@ -666,7 +555,6 @@ pub fn rav1d_worker_task(task_thread: Arc) { let ttd = &*ttd_clone; fn park<'ttd>( - c: &Rav1dContext, tc: &mut Rav1dTaskContext, ttd: &TaskThreadData, task_thread_lock: &mut MutexGuard<'ttd, ()>, @@ -677,13 +565,13 @@ pub fn rav1d_worker_task(task_thread: Arc) { ttd.cond_signaled.store(0, Ordering::SeqCst); ttd.cond.wait(task_thread_lock); tc.task_thread.flushed.set(false); - reset_task_cur(c, ttd, u32::MAX); + ttd.reset_cur_index(); } let mut task_thread_lock = Some(ttd.lock.lock()); 'outer: while !tc.task_thread.die.get() { if c.flush.load(Ordering::SeqCst) { - park(c, &mut tc, ttd, task_thread_lock.as_mut().unwrap()); + park(&mut tc, ttd, task_thread_lock.as_mut().unwrap()); continue 'outer; } @@ -831,17 +719,16 @@ pub fn rav1d_worker_task(task_thread: Arc) { } } // next: - tasks.looked.store(true, Ordering::Relaxed); } ttd.cur.update(|cur| cur + 1); } - if reset_task_cur(c, ttd, u32::MAX) != 0 { + if ttd.reset_cur_index() { continue 'outer; } if merge_pending(c) != 0 { continue 'outer; } - park(c, &mut tc, ttd, task_thread_lock.as_mut().unwrap()); + park(&mut tc, ttd, task_thread_lock.as_mut().unwrap()); continue 'outer; }; // found: @@ -881,7 +768,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { assert!(task_thread_lock.is_none(), "thread lock should not be held"); task_thread_lock = Some(ttd.lock.lock()); abort_frame(c, fc, res.and_then(|_| Err(Rav1dError::InvalidArgument))); - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); } else { t.type_0 = TaskType::InitCdf; if p1_3 != 0 { @@ -973,7 +860,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { assert!(task_thread_lock.is_none(), "thread lock should not be held"); task_thread_lock = Some(ttd.lock.lock()); abort_frame(c, fc, res_0); - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); fc.task_thread.init_done.store(1, Ordering::SeqCst); } continue 'outer; @@ -1006,7 +893,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { t.deps_skip = 0.into(); if check_tile(&f, &fc.task_thread, &t, uses_2pass) == 0 { ts.progress[p_1 as usize].store(progress, Ordering::SeqCst); - reset_task_cur_async(ttd, t.frame_idx, c.fc.len() as u32); + ttd.mark_new_tasks_for_index(t.frame_idx, c.fc.len() as u32); if ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } @@ -1021,7 +908,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { assert!(task_thread_lock.is_none(), "thread lock should not be held"); task_thread_lock = Some(ttd.lock.lock()); ts.progress[p_1 as usize].store(progress, Ordering::SeqCst); - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); error_0 = fc.task_thread.error.load(Ordering::SeqCst); let frame_hdr = &***f.frame_hdr.as_ref().unwrap(); if frame_hdr.refresh_context != 0 @@ -1113,7 +1000,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { if error_0 != 0 { TILE_ERROR } else { sby + 1 }, Ordering::SeqCst, ); - reset_task_cur_async(ttd, t.frame_idx, c.fc.len() as u32); + ttd.mark_new_tasks_for_index(t.frame_idx, c.fc.len() as u32); if ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } @@ -1152,7 +1039,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { (f.bd_fn().filter_sbrow_cdef)(c, &f, &mut tc, sby); } drop(f); - reset_task_cur_async(ttd, t.frame_idx, c.fc.len() as u32); + ttd.mark_new_tasks_for_index(t.frame_idx, c.fc.len() as u32); if ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } @@ -1222,7 +1109,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { task_thread_lock = Some(ttd.lock.lock()); let num_tasks = fc.task_thread.task_counter.fetch_sub(1, Ordering::SeqCst) - 1; if (sby + 1) < sbh && num_tasks != 0 { - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); continue 'outer; } if num_tasks == 0 @@ -1243,7 +1130,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { ); fc.task_thread.cond.notify_one(); } - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); continue 'outer; } // t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS @@ -1277,7 +1164,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { task_thread_lock = Some(ttd.lock.lock()); let num_tasks_0 = fc.task_thread.task_counter.fetch_sub(1, Ordering::SeqCst) - 1; if (sby + 1) < sbh && num_tasks_0 != 0 { - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); continue 'outer; } if num_tasks_0 == 0 @@ -1298,7 +1185,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { ); fc.task_thread.cond.notify_one(); } - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); break 'found_unlocked; } From 4d25cb04e4c60b432d33d91fdf5d12853d2073ff Mon Sep 17 00:00:00 2001 From: Daniel Axtens Date: Sun, 20 Jul 2025 23:54:14 +1000 Subject: [PATCH 6/7] Remove the now-unnecessary `'next` block When iterating tasks, we used to need to do certain things at the end of the loop. We no longer need to do those, so we can get rid of a block and reduce indentation and improve clarity. Broken out from the other changes to make it clearer that the bulk of the change is whitespace rather than substantive. --- src/thread_task.rs | 159 ++++++++++++++++++++++----------------------- 1 file changed, 78 insertions(+), 81 deletions(-) diff --git a/src/thread_task.rs b/src/thread_task.rs index 6abf4079d..99111844a 100644 --- a/src/thread_task.rs +++ b/src/thread_task.rs @@ -630,95 +630,92 @@ pub fn rav1d_worker_task(task_thread: Arc) { let fc = &c.fc[(first + ttd.cur.get()) as usize % c.fc.len()]; let tasks = &fc.task_thread.tasks; tasks.merge_pending_frame(c); - for t_idx in 0..tasks.len() { + 'tasks: for t_idx in 0..tasks.len() { let t = tasks.index(t_idx); - 'next: { - if t.type_0 == TaskType::InitCdf { - break 'next; - } - if matches!( - t.type_0, - TaskType::TileEntropy | TaskType::TileReconstruction - ) { - // We need to block here because we are seeing rare - // contention. The fields we access out of - // `Rav1dFrameData` are probably ok to read - // concurrently with other tasks writing, but we - // haven't separated out these fields. - let f = fc.data.read(); + if t.type_0 == TaskType::InitCdf { + continue 'tasks; + } + if matches!( + t.type_0, + TaskType::TileEntropy | TaskType::TileReconstruction + ) { + // We need to block here because we are seeing rare + // contention. The fields we access out of + // `Rav1dFrameData` are probably ok to read + // concurrently with other tasks writing, but we + // haven't separated out these fields. + let f = fc.data.read(); - // if not bottom sbrow of tile, this task will be re-added - // after it's finished - if check_tile(&f, &fc.task_thread, &t, (c.fc.len() > 1) as c_int) == 0 { - break 'found (fc, t_idx); - } - } else if t.recon_progress != 0 { - // We need to block here because we are seeing rare - // contention. - let f = fc.data.read(); - let p = t.type_0 == TaskType::EntropyProgress; - let error = fc.task_thread.error.load(Ordering::SeqCst); - let done = fc.task_thread.done[p as usize].load(Ordering::SeqCst); - assert!(done == 0 || error != 0, "done: {done}, error: {error}"); - let frame_hdr = fc.frame_hdr(); - let tile_row_base = frame_hdr.tiling.cols as c_int - * f.frame_thread.next_tile_row[p as usize].get(); - if p { - let p1_0 = fc.frame_thread_progress.entropy.load(Ordering::SeqCst); - if p1_0 < t.sby { - break 'next; - } - fc.task_thread - .error - .fetch_or((p1_0 == TILE_ERROR) as c_int, Ordering::SeqCst); - } - for tc_0 in 0..frame_hdr.tiling.cols { - let ts = &f.ts[(tile_row_base + tc_0 as c_int) as usize]; - let p2 = ts.progress[p as usize].load(Ordering::SeqCst); - if p2 < t.recon_progress { - break 'next; - } - fc.task_thread - .error - .fetch_or((p2 == TILE_ERROR) as c_int, Ordering::SeqCst); - } - if (t.sby + 1) < f.sbh { - // add sby+1 to list to replace this one - let next_t = Rav1dTask { - sby: t.sby + 1, - recon_progress: t.sby + 2, - ..t.clone() - }; - let ntr = f.frame_thread.next_tile_row[p as usize].get() + 1; - let start = frame_hdr.tiling.row_start_sb[ntr as usize] as c_int; - if next_t.sby == start { - f.frame_thread.next_tile_row[p as usize].set(ntr); - } - drop(t); - fc.task_thread.insert_task(c, next_t, 0); - } + // if not bottom sbrow of tile, this task will be re-added + // after it's finished + if check_tile(&f, &fc.task_thread, &t, (c.fc.len() > 1) as c_int) == 0 { break 'found (fc, t_idx); - } else if t.type_0 == TaskType::Cdef { - let p1_1 = fc.frame_thread_progress.copy_lpf.try_read().unwrap() - [(t.sby - 1 >> 5) as usize] - .load(Ordering::SeqCst); - if p1_1 as c_uint & (1 as c_uint) << (t.sby - 1 & 31) != 0 { - break 'found (fc, t_idx); + } + } else if t.recon_progress != 0 { + // We need to block here because we are seeing rare + // contention. + let f = fc.data.read(); + let p = t.type_0 == TaskType::EntropyProgress; + let error = fc.task_thread.error.load(Ordering::SeqCst); + let done = fc.task_thread.done[p as usize].load(Ordering::SeqCst); + assert!(done == 0 || error != 0, "done: {done}, error: {error}"); + let frame_hdr = fc.frame_hdr(); + let tile_row_base = frame_hdr.tiling.cols as c_int + * f.frame_thread.next_tile_row[p as usize].get(); + if p { + let p1_0 = fc.frame_thread_progress.entropy.load(Ordering::SeqCst); + if p1_0 < t.sby { + continue 'tasks; } - } else { - if t.deblock_progress == 0 { - unreachable!(); + fc.task_thread + .error + .fetch_or((p1_0 == TILE_ERROR) as c_int, Ordering::SeqCst); + } + for tc_0 in 0..frame_hdr.tiling.cols { + let ts = &f.ts[(tile_row_base + tc_0 as c_int) as usize]; + let p2 = ts.progress[p as usize].load(Ordering::SeqCst); + if p2 < t.recon_progress { + continue 'tasks; } - let p1_2 = fc.frame_thread_progress.deblock.load(Ordering::SeqCst); - if p1_2 >= t.deblock_progress { - fc.task_thread - .error - .fetch_or((p1_2 == TILE_ERROR) as c_int, Ordering::SeqCst); - break 'found (fc, t_idx); + fc.task_thread + .error + .fetch_or((p2 == TILE_ERROR) as c_int, Ordering::SeqCst); + } + if (t.sby + 1) < f.sbh { + // add sby+1 to list to replace this one + let next_t = Rav1dTask { + sby: t.sby + 1, + recon_progress: t.sby + 2, + ..t.clone() + }; + let ntr = f.frame_thread.next_tile_row[p as usize].get() + 1; + let start = frame_hdr.tiling.row_start_sb[ntr as usize] as c_int; + if next_t.sby == start { + f.frame_thread.next_tile_row[p as usize].set(ntr); } + drop(t); + fc.task_thread.insert_task(c, next_t, 0); + } + break 'found (fc, t_idx); + } else if t.type_0 == TaskType::Cdef { + let p1_1 = fc.frame_thread_progress.copy_lpf.try_read().unwrap() + [(t.sby - 1 >> 5) as usize] + .load(Ordering::SeqCst); + if p1_1 as c_uint & (1 as c_uint) << (t.sby - 1 & 31) != 0 { + break 'found (fc, t_idx); + } + } else { + if t.deblock_progress == 0 { + unreachable!(); + } + let p1_2 = fc.frame_thread_progress.deblock.load(Ordering::SeqCst); + if p1_2 >= t.deblock_progress { + fc.task_thread + .error + .fetch_or((p1_2 == TILE_ERROR) as c_int, Ordering::SeqCst); + break 'found (fc, t_idx); } } - // next: } ttd.cur.update(|cur| cur + 1); } From e1c16d05222b21a19f8f454080e1bea63e091637 Mon Sep 17 00:00:00 2001 From: Daniel Axtens Date: Mon, 21 Jul 2025 00:12:40 +1000 Subject: [PATCH 7/7] Take the task vector read lock less Even though `.try_read().unwrap()` in `.index()` and `.len()` is guaranteed to succeed, it's not free. Try to do it less. This requries that we have slightly more involved handling for when we have to insert a new task as a result of finding a task, but it means that we don't have to take the read lock every iteration of the loop. --- src/thread_task.rs | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/thread_task.rs b/src/thread_task.rs index 99111844a..7f60deda9 100644 --- a/src/thread_task.rs +++ b/src/thread_task.rs @@ -90,8 +90,8 @@ impl Rav1dTasks { fn merge_pending_frame(&self, c: &Rav1dContext) -> bool { let merge = self.pending_tasks_merge.swap(false, Ordering::SeqCst); if merge { - let new_start = self.len(); let mut tasks = self.tasks.try_write().unwrap(); + let new_start = tasks.len(); { let mut pending_tasks = self.pending_tasks.lock(); @@ -135,11 +135,6 @@ impl Rav1dTasks { merge } - /// How many (non-pending) tasks do we have in the task list? - pub fn len(&self) -> usize { - self.tasks.try_read().unwrap().len() - } - /// Are there non-pending tasks in the task list? pub fn is_empty(&self) -> bool { self.tasks.try_read().unwrap().is_empty() @@ -582,7 +577,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { continue 'outer; } - let (fc, t_idx) = 'found: { + let (fc, t_idx, new_t) = 'found: { if c.fc.len() > 1 { // run init tasks second 'init_tasks: for fc in @@ -599,7 +594,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { let t = tasks.index(t_idx); if t.type_0 == TaskType::Init { - break 'found (fc, t_idx); + break 'found (fc, t_idx, None); } if t.type_0 == TaskType::InitCdf { // XXX This can be a simple else, if adding tasks of both @@ -619,7 +614,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { fc.task_thread .error .fetch_or((p1 == TILE_ERROR) as c_int, Ordering::SeqCst); - break 'found (fc, t_idx); + break 'found (fc, t_idx, None); } } } @@ -630,8 +625,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { let fc = &c.fc[(first + ttd.cur.get()) as usize % c.fc.len()]; let tasks = &fc.task_thread.tasks; tasks.merge_pending_frame(c); - 'tasks: for t_idx in 0..tasks.len() { - let t = tasks.index(t_idx); + 'tasks: for (t_idx, t) in tasks.tasks.try_read().unwrap().iter().enumerate() { if t.type_0 == TaskType::InitCdf { continue 'tasks; } @@ -649,7 +643,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { // if not bottom sbrow of tile, this task will be re-added // after it's finished if check_tile(&f, &fc.task_thread, &t, (c.fc.len() > 1) as c_int) == 0 { - break 'found (fc, t_idx); + break 'found (fc, t_idx, None); } } else if t.recon_progress != 0 { // We need to block here because we are seeing rare @@ -693,16 +687,15 @@ pub fn rav1d_worker_task(task_thread: Arc) { if next_t.sby == start { f.frame_thread.next_tile_row[p as usize].set(ntr); } - drop(t); - fc.task_thread.insert_task(c, next_t, 0); + break 'found (fc, t_idx, Some(next_t)); } - break 'found (fc, t_idx); + break 'found (fc, t_idx, None); } else if t.type_0 == TaskType::Cdef { let p1_1 = fc.frame_thread_progress.copy_lpf.try_read().unwrap() [(t.sby - 1 >> 5) as usize] .load(Ordering::SeqCst); if p1_1 as c_uint & (1 as c_uint) << (t.sby - 1 & 31) != 0 { - break 'found (fc, t_idx); + break 'found (fc, t_idx, None); } } else { if t.deblock_progress == 0 { @@ -713,7 +706,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { fc.task_thread .error .fetch_or((p1_2 == TILE_ERROR) as c_int, Ordering::SeqCst); - break 'found (fc, t_idx); + break 'found (fc, t_idx, None); } } } @@ -732,6 +725,11 @@ pub fn rav1d_worker_task(task_thread: Arc) { // remove t from list let mut t = fc.task_thread.tasks.remove(t_idx); + // insert new task into list if one was given + if let Some(new_t) = new_t { + fc.task_thread.insert_task(c, new_t, 0); + } + if t.type_0 > TaskType::InitCdf && fc.task_thread.tasks.is_empty() { ttd.cur.update(|cur| cur + 1); }