diff --git a/src/decode.rs b/src/decode.rs index 6369b4866..9b4fc9de8 100644 --- a/src/decode.rs +++ b/src/decode.rs @@ -4848,23 +4848,7 @@ pub fn rav1d_submit_frame(c: &Rav1dContext, state: &mut Rav1dState) -> Rav1dResu } let out_delayed = &mut state.frame_thread.out_delayed[next as usize]; if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 { - let first = c.task_thread.first.load(Ordering::SeqCst); - if first as usize + 1 < c.fc.len() { - c.task_thread.first.fetch_add(1, Ordering::SeqCst); - } else { - c.task_thread.first.store(0, Ordering::SeqCst); - } - let _ = c.task_thread.reset_task_cur.compare_exchange( - first, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - // `cur` is not actually mutated from multiple threads concurrently - let cur = c.task_thread.cur.get(); - if cur != 0 && (cur as usize) < c.fc.len() { - c.task_thread.cur.update(|cur| cur - 1); - } + c.task_thread.advance_first(c.fc.len()); } let error = &mut *fc.task_thread.retval.try_lock().unwrap(); if error.is_some() { diff --git a/src/internal.rs b/src/internal.rs index 09b6fde97..a329e4e4f 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -1,11 +1,11 @@ use std::ffi::{c_int, c_uint}; -use std::mem; use std::ops::{Deref, Range}; -use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, Ordering}; +use std::sync::atomic::{self, AtomicBool, AtomicI32, AtomicU32}; use std::sync::{Arc, OnceLock}; use std::thread::JoinHandle; +use std::{cmp, mem}; -use atomig::{Atom, Atomic}; +use atomig::Atom; use libc::ptrdiff_t; use parking_lot::{Condvar, Mutex, RwLock, RwLockReadGuard}; use strum::FromRepr; @@ -50,7 +50,7 @@ use crate::recon::{ }; use crate::refmvs::{Rav1dRefmvsDSPContext, RefMvsFrame, RefMvsTemporalBlock, RefmvsTile}; use crate::relaxed_atomic::RelaxedAtomic; -use crate::thread_task::{Rav1dTaskIndex, Rav1dTasks}; +use crate::thread_task::Rav1dTasks; #[derive(Default)] pub struct Rav1dDSPContext { @@ -258,13 +258,37 @@ unsafe impl Sync for 
TaskThreadDataDelayedFg {} pub(crate) struct TaskThreadData { pub lock: Mutex<()>, pub cond: Condvar, + + /// Where we have multiple frame contexts, we keep track of which frame is 'first', + /// that is, which of the frames we're working on has the lowest frame number, + /// or is closest to the start of the data. + /// + /// Written only under the task thread lock, [`Self::lock`]. + /// Read under the lock, except in [`Self::mark_new_tasks_for_index`]. pub first: AtomicU32, + + /// Say we have several frame contexts. Each context may or may not have tasks, + /// and each task may or may not be blocked. Checking if tasks are blocked is not free, + /// so if a context has only blocked tasks, we use this variable to note that, + /// and then we can start looking from the first unblocked context. + /// + /// When decoding progress allows us to look again at an earlier context, + /// [`Self::reset_task_cur`] is updated. When we run out of frame contexts, + /// or we wake from park, we update this from [`Self::reset_task_cur`]. + /// + /// `cur` is indexed from `first`: if `first = 1` and `cur = 2`, + /// then we start looking at frame context 3. + /// + /// Can only be modified or read under the task thread lock, [`Self::lock`]: + /// the atomic is for interior mutability rather than concurrency. pub cur: RelaxedAtomic, - /// This is used for delayed reset of the task cur pointer when - /// such operation is needed but the thread doesn't enter a critical - /// section (typically when executing the next sbrow task locklessly). - /// See [`crate::thread_task::reset_task_cur`]. + + /// When we note progress, we track where [`Self::cur`] should start from next time. + /// + /// Can be set without holding the task thread lock, [`Self::lock`]. + /// Only used under the task thread lock. 
pub reset_task_cur: AtomicU32, + pub cond_signaled: AtomicI32, pub delayed_fg_exec: RelaxedAtomic, pub delayed_fg_cond: Condvar, @@ -272,6 +296,196 @@ pub(crate) struct TaskThreadData { pub delayed_fg: RwLock, } +impl TaskThreadData { + /// When we finish a frame we advance [`Self::first`], and we must also adjust [`Self::cur`], + /// because `cur` is an offset of `first`. + /// We may also need to adjust [`Self::reset_task_cur`]. + /// + /// The task thread lock, [`Self::lock`], must be held and there must be more than 1 frame context. + pub(crate) fn advance_first(&self, n_fc: usize) { + // This read-modify-write doesn't need to be an atomic CAS: + // modifications are protected by the lock that we're holding. + let first = self.first.load(atomic::Ordering::SeqCst); + let new_first = if first as usize + 1 < n_fc { + self.first.fetch_add(1, atomic::Ordering::SeqCst) + 1 + } else { + self.first.store(0, atomic::Ordering::SeqCst); + 0 + }; + // `reset_task_cur` is used to mark the first frame context where, we hope, + // there is probably work to be done. When we use it to set `cur`, + // we unconditionally subtract `first`, so it may be more than the number of fcs. + // + // If `reset_task_cur` was the old `first`, it means the old target `cur` value was zero. + // The C code detects this and swaps in `u32::MAX`, + // which means that we will no longer swap in an updated `cur`. + // + // The general rule is that if we have potential progress for `fc_i`, + // we might also have progress for `fc_i + 1`, `fc_i + 2`, etc. + // + // If `first` has just incremented, and we subtract the new first from the old + // `reset_task_cur`, we will get -1, which equals `u32::MAX` (but needs a wrapping subtract). + // So doing nothing should lead to the same behaviour. If we wanted to keep `cur` at 0, + // we would need to increment `reset_task_cur` as well. + // + // If `first` is now zero, `reset_task_cur - first` might be beyond the number of fcs. 
+ // This also doesn't actually directly cause a problem: when we reset `cur`, + // we only swap in a new `cur` if it's better than our existing `cur`. + // If the potential new value is greater than the number of frame contexts, + // it will never be swapped in, so the behaviour is identical to setting `u32::MAX`. + // + // However, the downside of not resetting `cur` is that it makes thread parking more likely, + // and if it's at all possible for a thread to be doing something, that would be better! + // So if `reset_task_cur == first`, swap in `new_first` instead. + let _ = self.reset_task_cur.compare_exchange( + first, + new_first, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ); + + // We've advanced `first`, so because `cur` is indexed by `first`, we need to retreat it. + // There are two exceptions: + // - if `cur` is already 0, we can't go any further back + // - if `cur` is at the end of the frame contexts (`n_fc`), don't move it, + // because we haven't created any new tasks yet. + // `cur` is not mutated from multiple threads (it's protected by the lock), + // so we don't need an atomic CAS for this read-modify-write. + let cur = self.cur.get(); + if cur != 0 && (cur as usize) < n_fc { + self.cur.set(cur - 1); + } + } + + /// We probably have several frame contexts. Each context may or may not have tasks, + /// and each task may or may not be blocked. Checking if tasks are blocked is not free, + /// so when we're looking for a task to do, we try to skip frame contexts that have only blocked tasks. + /// We use [`Self::cur`], which is indexed from [`Self::first`]. + /// + /// This marks a frame context as able to do a task. + /// The general rule is that a notification for `fc_i` unlocks `fc_i, fc_i+1, fc_i+2 ... n_fc` + /// + /// The task thread lock, [`Self::lock`], should *not* be held. 
+ pub(crate) fn mark_new_tasks_for_index(&self, mut frame_idx: u32, n_fc: u32) { + let mut reset_task_cur = self.reset_task_cur.load(atomic::Ordering::SeqCst); + loop { + let first = self.first.load(atomic::Ordering::SeqCst); + + // `cur` is indexed from `first`: `cur = 0` means we should start looking at the `first` frame context. + // `reset_task_cur` is _not_. It is set such that in `reset_cur_index`, + // we can unconditionally subtract `first` and get `cur`. To ensure that the property holds, + // we may need to add `n_fc`. + if frame_idx < first { + frame_idx += n_fc; + } + + // Only save a new frame index if it's 'better' (that is, closer to `first`) + // than what we've saved before. + if frame_idx < reset_task_cur { + reset_task_cur = match self.reset_task_cur.compare_exchange( + reset_task_cur, + frame_idx, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ) { + Ok(_) => { + // Check to see if `first` has moved, and if so do the same check as in `advance_first`. + let new_first = self.first.load(atomic::Ordering::SeqCst); + let _ = self.reset_task_cur.compare_exchange( + first, + new_first, + atomic::Ordering::SeqCst, + atomic::Ordering::SeqCst, + ); + return; + } + Err(c) => c, + }; + } else { + break; + } + } + } + + /// Potentially resets [`Self::cur`], if [`Self::reset_task_cur`] indicates that more tasks are available. + /// Resets [`Self::reset_task_cur`] to indicate that no new tasks are available. + /// Returns true if [`Self::cur`] was moved and thus that more tasks are likely available. + /// + /// The task thread lock, [`Self::lock`], must be held. + pub(crate) fn reset_cur_index(&self) -> bool { + // `reset_task_cur` is either an index + `first` if a new task has been marked, + // or it's `u32::MAX` if we haven't had any tasks added since the last reset. + let new_cur = self.reset_task_cur.swap(u32::MAX, atomic::Ordering::SeqCst); + + // If it's `u32::MAX`, then bail without moving `cur`. 
+ if new_cur == u32::MAX { + return false; + } + + // `cur` is atomic for interior mutability not concurrency, it's ok to cache. + let cur = self.cur.get(); + + // This is a common case optimisation. If `cur` is at the start, we cannot do better. + if cur == 0 { + return false; + } + + // It should always be the case that `reset_task_cur >= first`. However, + // we're not mutating the two variables under the one lock, + // nor are we doing something like a 16-byte CAS to keep them consistent, + // so it is possible for them to briefly desynchronise. + // Handle that with a wrapping subtract, which will create a huge positive u32 value, + // which will never be used because it will always be worse than a real `cur`. + // Later on, the delayed fg task of the frame, or the init task of the new frame, + // will cause new tasks to be created, or another running task will signal progress, + // any of which will end up getting our threads going again, + // so there shouldn't be any risk of a hang. + let new_cur = new_cur.wrapping_sub(self.first.load(atomic::Ordering::SeqCst)); + + // If it's better than our existing index, then reset `cur` and say we did something. + if new_cur < cur { + self.cur.set(new_cur); + true + } else { + false + } + } + + /// Equivalent to [`Self::mark_new_tasks_for_index`], + /// then [`Self::reset_cur_index`], but more efficient. + /// + /// The task thread lock, [`Self::lock`], must be held. + pub(crate) fn mark_and_reset_cur(&self, mut frame_idx: u32, n_fc: u32) { + // We want to reset `cur` to the better of `frame_idx` or `reset_task_cur`, + // if that's an improvement. + + // We must always reset this even if we are about to bail. + let reset_cur = self.reset_task_cur.swap(u32::MAX, atomic::Ordering::SeqCst); + + // `cur` is atomic for interior mutability not concurrency, it's ok to cache. + let cur = self.cur.get(); + + // This is a common case optimisation. If `cur` is at the start, we cannot do better. 
+ if cur == 0 { + return; + } + + let first = self.first.load(atomic::Ordering::SeqCst); + + // This will never be called with `u32::MAX`, so we do not need to special case that. + if frame_idx < first { + frame_idx += n_fc; + } + + let min_reset_cur = u32::min(frame_idx, reset_cur); + // See `reset_cur_index` for why `wrapping_sub` is right here. + let new_cur = min_reset_cur.wrapping_sub(first); + if new_cur < cur { + self.cur.set(new_cur); + } + } +} + #[derive(Default)] #[repr(C)] pub(crate) struct Rav1dContextRefs { @@ -391,7 +605,7 @@ unsafe impl Send for Rav1dContext {} // See discussion in https://github.com/memorysafety/rav1d/pull/1329 unsafe impl Sync for Rav1dContext {} -#[derive(Default)] +#[derive(Default, Clone, PartialEq, Eq)] #[repr(C)] pub struct Rav1dTask { // frame thread id @@ -406,8 +620,6 @@ pub struct Rav1dTask { pub recon_progress: c_int, pub deblock_progress: c_int, pub deps_skip: RelaxedAtomic, - // only used in task queue - pub next: Atomic, } impl Rav1dTask { @@ -421,28 +633,69 @@ impl Rav1dTask { ..Default::default() } } +} - pub fn next(&self) -> Rav1dTaskIndex { - self.next.load(Ordering::SeqCst) - } - - pub fn set_next(&self, next: Rav1dTaskIndex) { - self.next.store(next, Ordering::SeqCst) +impl PartialOrd for Rav1dTask { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) } } -impl Rav1dTask { - pub fn without_next(&self) -> Self { - Self { - frame_idx: self.frame_idx, - tile_idx: self.tile_idx, - type_0: self.type_0, - sby: self.sby, - recon_progress: self.recon_progress, - deblock_progress: self.deblock_progress, - deps_skip: self.deps_skip.clone(), - next: Default::default(), +impl Ord for Rav1dTask { + /// We want `sort` to put tasks in priority order. 
+ /// To do that, we return: + /// - `Less` if `self` is of higher priority than `other` + /// - `Greater` if `self` is of lower priority than `other` + /// + /// This is fairly straightforwardly translated from `insert_tasks` in `task_thread.c`, + /// and as such inherits the limitations of that function. Specifically, + /// it requires that there are no Init, InitCdf or film grain tasks, + /// and that there are no duplicate tasks. + fn cmp(&self, other: &Self) -> cmp::Ordering { + // entropy coding precedes other steps + if other.type_0 == TaskType::TileEntropy { + if self.type_0 > TaskType::TileEntropy { + return cmp::Ordering::Greater; + } + // both are entropy + if self.sby > other.sby { + return cmp::Ordering::Greater; + } + if self.sby < other.sby { + return cmp::Ordering::Less; + } + // same sby + } else { + if self.type_0 == TaskType::TileEntropy { + return cmp::Ordering::Less; + } + if self.sby > other.sby { + return cmp::Ordering::Greater; + } + if self.sby < other.sby { + return cmp::Ordering::Less; + } + // same sby + if self.type_0 > other.type_0 { + return cmp::Ordering::Greater; + } + if self.type_0 < other.type_0 { + return cmp::Ordering::Less; + } + // same task type + } + + // sort by tile-id + assert!( + self.type_0 == TaskType::TileReconstruction || self.type_0 == TaskType::TileEntropy + ); + assert!(self.type_0 == other.type_0); + assert!(other.sby == self.sby); + assert!(self.tile_idx != other.tile_idx); + if self.tile_idx < other.tile_idx { + return cmp::Ordering::Less; + } + return cmp::Ordering::Greater; } } @@ -664,13 +917,6 @@ pub struct Rav1dFrameContextLf { pub restore_planes: LrRestorePlanes, } -#[derive(Default)] -#[repr(C)] -pub struct Rav1dFrameContextTaskThreadPendingTasks { - pub head: Rav1dTaskIndex, - pub tail: Rav1dTaskIndex, -} - #[derive(Default)] #[repr(C)] pub(crate) struct Rav1dFrameContextTaskThread { diff --git a/src/lib.rs b/src/lib.rs index f7b4083a5..779a35d5b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -552,22 +552,7 
@@ fn drain_picture(c: &Rav1dContext, state: &mut Rav1dState, out: &mut Rav1dPictur } let out_delayed = &mut state.frame_thread.out_delayed[next as usize]; if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 { - let first = c.task_thread.first.load(Ordering::SeqCst); - if first as usize + 1 < c.fc.len() { - c.task_thread.first.fetch_add(1, Ordering::SeqCst); - } else { - c.task_thread.first.store(0, Ordering::SeqCst); - } - let _ = c.task_thread.reset_task_cur.compare_exchange( - first, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - let cur = c.task_thread.cur.get(); - if cur != 0 && (cur as usize) < c.fc.len() { - c.task_thread.cur.set(cur - 1); - } + c.task_thread.advance_first(c.fc.len()); drained = true; } else if drained { break; diff --git a/src/obu.rs b/src/obu.rs index 59d99bcc3..6d23b3050 100644 --- a/src/obu.rs +++ b/src/obu.rs @@ -2499,23 +2499,7 @@ fn parse_obus( let out_delayed = &mut state.frame_thread.out_delayed[next as usize]; if out_delayed.p.data.is_some() || fc.task_thread.error.load(Ordering::SeqCst) != 0 { - let first = c.task_thread.first.load(Ordering::SeqCst); - if first as usize + 1 < c.fc.len() { - c.task_thread.first.fetch_add(1, Ordering::SeqCst); - } else { - c.task_thread.first.store(0, Ordering::SeqCst); - } - let _ = c.task_thread.reset_task_cur.compare_exchange( - first, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - if c.task_thread.cur.get() != 0 - && (c.task_thread.cur.get() as usize) < c.fc.len() - { - c.task_thread.cur.update(|cur| cur - 1); - } + c.task_thread.advance_first(c.fc.len()); } let error = &mut *fc.task_thread.retval.try_lock().unwrap(); if error.is_some() { diff --git a/src/relaxed_atomic.rs b/src/relaxed_atomic.rs index 59011b368..ac0aed300 100644 --- a/src/relaxed_atomic.rs +++ b/src/relaxed_atomic.rs @@ -59,3 +59,11 @@ impl From for RelaxedAtomic { Self::new(value) } } + +impl PartialEq for RelaxedAtomic { + fn eq(&self, other: &Self) -> bool { + 
self.get() == other.get() + } +} + +impl Eq for RelaxedAtomic {} diff --git a/src/thread_task.rs b/src/thread_task.rs index 9caf374d5..7f60deda9 100644 --- a/src/thread_task.rs +++ b/src/thread_task.rs @@ -1,12 +1,10 @@ use std::ffi::{c_int, c_uint}; -use std::num::NonZeroU32; -use std::ops::{Add, AddAssign, Deref}; +use std::ops::Deref; use std::process::abort; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::Arc; use std::{cmp, mem, thread}; -use atomig::{Atom, Atomic}; use parking_lot::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::cdf::rav1d_cdf_thread_update; @@ -30,265 +28,56 @@ use crate::internal::{ TaskType, }; use crate::iter::wrapping_iter; -use crate::relaxed_atomic::RelaxedAtomic; pub const FRAME_ERROR: u32 = u32::MAX - 1; pub const TILE_ERROR: i32 = i32::MAX - 1; -/// This function resets the cur pointer to the first frame theoretically -/// executable after a task completed (ie. each time we update some progress or -/// insert some tasks in the queue). -/// When frame_idx is set, it can be either from a completed task, or from tasks -/// inserted in the queue, in which case we have to make sure the cur pointer -/// isn't past this insert. -/// The special case where frame_idx is UINT_MAX is to handle the reset after -/// completing a task and locklessly signaling progress. In this case we don't -/// enter a critical section, which is needed for this function, so we set an -/// atomic for a delayed handling, happening here. Meaning we can call this -/// function without any actual update other than what's in the atomic, hence -/// this special case. 
-#[inline] -fn reset_task_cur(c: &Rav1dContext, ttd: &TaskThreadData, mut frame_idx: c_uint) -> c_int { - fn curr_found(c: &Rav1dContext, ttd: &TaskThreadData, first: usize) -> c_int { - for fc in wrapping_iter(c.fc.iter(), first + ttd.cur.get() as usize) { - fc.task_thread.tasks.cur_prev.set(Rav1dTaskIndex::NONE); - } - return 1; - } - let min_frame_idx: c_uint; - let cur_frame_idx: c_uint; - let first = ttd.first.load(Ordering::SeqCst); - let mut reset_frame_idx: c_uint = ttd.reset_task_cur.swap(u32::MAX, Ordering::SeqCst); - if reset_frame_idx < first { - if frame_idx == u32::MAX { - return 0 as c_int; - } - reset_frame_idx = u32::MAX; - } - if ttd.cur.get() == 0 - && c.fc[first as usize] - .task_thread - .tasks - .cur_prev - .get() - .is_none() - { - return 0 as c_int; - } - if reset_frame_idx != u32::MAX { - if frame_idx == u32::MAX { - if reset_frame_idx > first.wrapping_add(ttd.cur.get()) { - return 0 as c_int; - } - ttd.cur.set(reset_frame_idx.wrapping_sub(first)); - return curr_found(c, ttd, first as usize); - } - } else { - if frame_idx == u32::MAX { - return 0 as c_int; - } - } - if frame_idx < first { - frame_idx += c.fc.len() as c_uint; - } - min_frame_idx = cmp::min(reset_frame_idx, frame_idx); - cur_frame_idx = first.wrapping_add(ttd.cur.get()); - if (ttd.cur.get() as usize) < c.fc.len() && cur_frame_idx < min_frame_idx { - return 0 as c_int; - } - ttd.cur.set(min_frame_idx.wrapping_sub(first)); - while (ttd.cur.get() as usize) < c.fc.len() { - if c.fc[((first + ttd.cur.get()) as usize) % c.fc.len()] - .task_thread - .tasks - .head - .load(Ordering::SeqCst) - .is_some() - { - break; - } - ttd.cur.update(|cur| cur + 1); - } - return curr_found(c, ttd, first as usize); -} - -#[inline] -fn reset_task_cur_async(ttd: &TaskThreadData, mut frame_idx: c_uint, n_frames: c_uint) { - let first = ttd.first.load(Ordering::SeqCst); - if frame_idx < first { - frame_idx += n_frames; - } - let mut last_idx = frame_idx; - loop { - frame_idx = last_idx; - last_idx = 
ttd.reset_task_cur.swap(frame_idx, Ordering::SeqCst); - if !(last_idx < frame_idx) { - break; - } - } - if frame_idx == first && ttd.first.load(Ordering::SeqCst) != first { - let _ = ttd.reset_task_cur.compare_exchange( - frame_idx, - u32::MAX, - Ordering::SeqCst, - Ordering::SeqCst, - ); - } -} - #[derive(Default)] pub struct Rav1dTasks { // TODO: probably should be a VecDeque, we need to empty this and I don't think we do yet. tasks: RwLock>, pending_tasks: Mutex>, pending_tasks_merge: AtomicBool, - - pub head: Atomic, - // Points to the task directly before the cur pointer in the queue. - // This cur pointer is theoretical here, we actually keep track of the - // "prev_t" variable. This is needed to not loose the tasks in - // [head;cur-1] when picking one for execution. - pub cur_prev: RelaxedAtomic, } impl Rav1dTasks { - fn insert_tasks_between( - &self, - c: &Rav1dContext, - first: Rav1dTaskIndex, - last: Rav1dTaskIndex, - a: Rav1dTaskIndex, - b: Rav1dTaskIndex, - cond_signal: c_int, - ) { + fn insert_tasks(&self, c: &Rav1dContext, frame_idx: c_uint, cond_signal: c_int) { let ttd = &*c.task_thread; if c.flush.load(Ordering::SeqCst) { return; } - if a.is_some() { - assert_eq!(self.index(a).next(), b); - self.index(a).set_next(first); - } else { - self.head.store(first, Ordering::SeqCst); - } - self.index(last).set_next(b); - reset_task_cur(c, ttd, self.index(first).frame_idx); + ttd.mark_and_reset_cur(frame_idx, c.fc.len() as u32); if cond_signal != 0 && ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } } - fn insert_tasks( - &self, - c: &Rav1dContext, - first: Rav1dTaskIndex, - last: Rav1dTaskIndex, - cond_signal: c_int, - ) { - // insert task back into task queue - let mut prev_t = Rav1dTaskIndex::NONE; - let mut t = self.head.load(Ordering::SeqCst); - while t.is_some() { - 'next: { - // entropy coding precedes other steps - if self.index(t).type_0 == TaskType::TileEntropy { - if self.index(first).type_0 > TaskType::TileEntropy 
{ - break 'next; - } - // both are entropy - if self.index(first).sby > self.index(t).sby { - break 'next; - } - if self.index(first).sby < self.index(t).sby { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - // same sby - } else { - if self.index(first).type_0 == TaskType::TileEntropy { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - if self.index(first).sby > self.index(t).sby { - break 'next; - } - if self.index(first).sby < self.index(t).sby { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - // same sby - if self.index(first).type_0 > self.index(t).type_0 { - break 'next; - } - if (self.index(first).type_0) < self.index(t).type_0 { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - // same task type - } - - // sort by tile-id - assert!( - self.index(first).type_0 == TaskType::TileReconstruction - || self.index(first).type_0 == TaskType::TileEntropy - ); - assert!(self.index(first).type_0 == self.index(t).type_0); - assert!(self.index(t).sby == self.index(first).sby); - let t_tile_idx = self.index(first).tile_idx; - let p_tile_idx = self.index(t).tile_idx; - assert!(t_tile_idx != p_tile_idx); - if !(t_tile_idx > p_tile_idx) { - self.insert_tasks_between(c, first, last, prev_t, t, cond_signal); - return; - } - } - // next: - prev_t = t; - t = self.index(t).next(); + fn push(&self, c: &Rav1dContext, task: Rav1dTask, cond_signal: c_int) { + let frame_idx = task.frame_idx; + { + let mut tasks = self.tasks.try_write().unwrap(); + let idx = match tasks.binary_search(&task) { + Ok(_) => panic!("Tried to add an identical task!"), + Err(idx) => idx, + }; + tasks.insert(idx, task); } - self.insert_tasks_between(c, first, last, prev_t, Rav1dTaskIndex::NONE, cond_signal); - } - - fn push(&self, task: Rav1dTask) -> Rav1dTaskIndex { - let mut tasks = self.tasks.try_write().unwrap(); - tasks.push(task); - // 1-based index into tasks, so 
we use length after pushing - Rav1dTaskIndex(NonZeroU32::new(tasks.len() as u32)) + self.insert_tasks(c, frame_idx, cond_signal); } pub fn clear(&self) { self.tasks.try_write().unwrap().clear(); self.pending_tasks.try_lock().unwrap().clear(); self.pending_tasks_merge.store(false, Ordering::SeqCst); - self.head.store(Default::default(), Ordering::Relaxed); - self.cur_prev.set(Default::default()); } - pub fn remove(&self, t: Rav1dTaskIndex, prev_t: Rav1dTaskIndex) -> Option { - let next_t = self.index(t).next(); - if prev_t.is_some() { - self.index(prev_t) - .next - .compare_exchange(t, next_t, Ordering::SeqCst, Ordering::SeqCst) - .ok()?; - } else { - self.head - .compare_exchange(t, next_t, Ordering::SeqCst, Ordering::SeqCst) - .ok()?; - } - self.index(t).set_next(Rav1dTaskIndex::NONE); - Some(self.index(t).without_next()) + pub fn remove(&self, t: usize) -> Rav1dTask { + self.tasks.try_write().unwrap().remove(t) } #[inline] - fn index<'a>(&'a self, index: Rav1dTaskIndex) -> impl Deref + 'a { - if let Some(index) = index.raw_index() { - RwLockReadGuard::map(self.tasks.try_read().unwrap(), |tasks| { - &tasks[index as usize] - }) - } else { - panic!("Cannot index with None"); - } + fn index<'a>(&'a self, index: usize) -> impl Deref + 'a { + RwLockReadGuard::map(self.tasks.try_read().unwrap(), |tasks| &tasks[index]) } #[inline] @@ -301,68 +90,60 @@ impl Rav1dTasks { fn merge_pending_frame(&self, c: &Rav1dContext) -> bool { let merge = self.pending_tasks_merge.swap(false, Ordering::SeqCst); if merge { - let mut pending_tasks = self.pending_tasks.lock(); - let range = { - let mut tasks = self.tasks.try_write().unwrap(); - if self.head.load(Ordering::Relaxed).is_none() { - tasks.clear(); + let mut tasks = self.tasks.try_write().unwrap(); + let new_start = tasks.len(); + + { + let mut pending_tasks = self.pending_tasks.lock(); + if pending_tasks.len() == 0 { + // This is a 'spurious' merge: that is, `merge` is set but the vector is empty. 
+ // + // We always populate the vector under the lock, release the lock, and then set `merge`. + // We do not synchronise the vector and the flag. So it can fall out of alignment, e.g., + // if we have 2 threads: + // 1. say we begin with `merge` set and tasks in the `pending_tasks` vector. + // 2. thread A begins `merge_pending_frame`, and clears `pending_tasks_merge`. + // 3. thread B goes to add a pending task and takes the vector lock. + // 4. thread A waits on the vector lock. + // 5. thread B inserts some tasks, releases the lock, and sets `pending_tasks_merge`. + // 6. thread A can now take the lock, and drain the vector. + // We now have an empty vector: A has drained both the original contents, + // and the task added by thread B - but `pending_tasks_merge` is true, + // because it was set by B. + // + // So, if `merge_pending_frame` is called before another pending task is added, + // we will get this spurious merge. + return false; } - let start = tasks.len() as u32; tasks.extend(pending_tasks.drain(..)); - start..tasks.len() as u32 - }; - - for i in range { - // 1-based index, so we have to add 1 - let task_idx = Rav1dTaskIndex(NonZeroU32::new(i + 1)); - self.insert_tasks(c, task_idx, task_idx, 0); } - } - merge - } -} - -impl Rav1dFrameContextTaskThread { - fn insert_task(&self, c: &Rav1dContext, task: Rav1dTask, cond_signal: c_int) -> Rav1dTaskIndex { - let idx = self.tasks.push(task); - self.tasks.insert_tasks(c, idx, idx, cond_signal); - idx - } -} - -/// 1-based index into the task queue vector. 0 is reserved for None. 
-#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Debug, Atom)] -#[repr(transparent)] -pub struct Rav1dTaskIndex(Option); - -impl Rav1dTaskIndex { - pub const NONE: Self = Self(None); - - // Return the zero-based index into the task queue vector or `None` - pub fn raw_index(self) -> Option { - self.0.map(|i| i.get() - 1) - } - pub fn is_none(self) -> bool { - self.0.is_none() - } + // Unlike in the C code, we only do a single `insert_tasks`. There are two reasons: + // + // - in the C code, the `insert_tasks` is what creates the sorted linked list. + // In the Rust code, the sorting is done via sorting the `Vec` explicitly. + // + // - The key remaining function of `insert_tasks` is updating our position in the frame contexts, + // by updating the `reset_task_cur` and `cur` variables. Here, as in C, + // every task we're merging is for the same frame context, so the arguments are always the same, + // and nothing is changed by trying to repeatedly do the same update. + let frame_idx = tasks[new_start].frame_idx; + self.insert_tasks(c, frame_idx, 0); + tasks.sort(); + } - pub fn is_some(self) -> bool { - self.0.is_some() + merge } -} -impl Add for Rav1dTaskIndex { - type Output = Self; - - fn add(self, rhs: u32) -> Self::Output { - Self(self.0.and_then(|i| NonZeroU32::new(i.get() + rhs))) + /// Returns `true` if there are no non-pending tasks in the task list. 
+ pub fn is_empty(&self) -> bool { + self.tasks.try_read().unwrap().is_empty() } } -impl AddAssign for Rav1dTaskIndex { - fn add_assign(&mut self, rhs: u32) { - *self = *self + rhs; +impl Rav1dFrameContextTaskThread { + fn insert_task(&self, c: &Rav1dContext, task: Rav1dTask, cond_signal: c_int) { + self.tasks.push(c, task, cond_signal); } } @@ -448,7 +229,6 @@ pub(crate) fn rav1d_task_create_tile_sbrow( }, frame_idx: fc.index as c_uint, tile_idx: tile_idx as c_uint, - next: Default::default(), }; pending_tasks.push(t); } @@ -512,7 +292,7 @@ fn ensure_progress<'l, 'ttd: 'l>( type_0, recon_progress: 0, deblock_progress: t.sby, - ..t.without_next() + ..t.clone() }; f.task_thread.tasks.add_pending(t); *task_thread_lock = Some(ttd.lock.lock()); @@ -770,7 +550,6 @@ pub fn rav1d_worker_task(task_thread: Arc) { let ttd = &*ttd_clone; fn park<'ttd>( - c: &Rav1dContext, tc: &mut Rav1dTaskContext, ttd: &TaskThreadData, task_thread_lock: &mut MutexGuard<'ttd, ()>, @@ -781,13 +560,13 @@ pub fn rav1d_worker_task(task_thread: Arc) { ttd.cond_signaled.store(0, Ordering::SeqCst); ttd.cond.wait(task_thread_lock); tc.task_thread.flushed.set(false); - reset_task_cur(c, ttd, u32::MAX); + ttd.reset_cur_index(); } let mut task_thread_lock = Some(ttd.lock.lock()); 'outer: while !tc.task_thread.die.get() { if c.flush.load(Ordering::SeqCst) { - park(c, &mut tc, ttd, task_thread_lock.as_mut().unwrap()); + park(&mut tc, ttd, task_thread_lock.as_mut().unwrap()); continue 'outer; } @@ -798,7 +577,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { continue 'outer; } - let (fc, t_idx, prev_t) = 'found: { + let (fc, t_idx, new_t) = 'found: { if c.fc.len() > 1 { // run init tasks second 'init_tasks: for fc in @@ -808,13 +587,14 @@ pub fn rav1d_worker_task(task_thread: Arc) { if fc.task_thread.init_done.load(Ordering::SeqCst) != 0 { continue 'init_tasks; } - let t_idx = tasks.head.load(Ordering::SeqCst); - if t_idx.is_none() { + if tasks.is_empty() { continue 'init_tasks; } + let t_idx = 0; + let t 
= tasks.index(t_idx); if t.type_0 == TaskType::Init { - break 'found (fc, t_idx, Rav1dTaskIndex::NONE); + break 'found (fc, t_idx, None); } if t.type_0 == TaskType::InitCdf { // XXX This can be a simple else, if adding tasks of both @@ -834,7 +614,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { fc.task_thread .error .fetch_or((p1 == TILE_ERROR) as c_int, Ordering::SeqCst); - break 'found (fc, t_idx, Rav1dTaskIndex::NONE); + break 'found (fc, t_idx, None); } } } @@ -845,126 +625,112 @@ pub fn rav1d_worker_task(task_thread: Arc) { let fc = &c.fc[(first + ttd.cur.get()) as usize % c.fc.len()]; let tasks = &fc.task_thread.tasks; tasks.merge_pending_frame(c); - let mut prev_t = tasks.cur_prev.get(); - let mut t_idx = if prev_t.is_some() { - tasks.index(prev_t).next() - } else { - tasks.head.load(Ordering::SeqCst) - }; - while t_idx.is_some() { - let t = tasks.index(t_idx); - 'next: { - if t.type_0 == TaskType::InitCdf { - break 'next; - } - if matches!( - t.type_0, - TaskType::TileEntropy | TaskType::TileReconstruction - ) { - // We need to block here because we are seeing rare - // contention. The fields we access out of - // `Rav1dFrameData` are probably ok to read - // concurrently with other tasks writing, but we - // haven't separated out these fields. - let f = fc.data.read(); + 'tasks: for (t_idx, t) in tasks.tasks.try_read().unwrap().iter().enumerate() { + if t.type_0 == TaskType::InitCdf { + continue 'tasks; + } + if matches!( + t.type_0, + TaskType::TileEntropy | TaskType::TileReconstruction + ) { + // We need to block here because we are seeing rare + // contention. The fields we access out of + // `Rav1dFrameData` are probably ok to read + // concurrently with other tasks writing, but we + // haven't separated out these fields. 
+ let f = fc.data.read(); - // if not bottom sbrow of tile, this task will be re-added - // after it's finished - if check_tile(&f, &fc.task_thread, &t, (c.fc.len() > 1) as c_int) == 0 { - break 'found (fc, t_idx, prev_t); - } - } else if t.recon_progress != 0 { - // We need to block here because we are seeing rare - // contention. - let f = fc.data.read(); - let p = t.type_0 == TaskType::EntropyProgress; - let error = fc.task_thread.error.load(Ordering::SeqCst); - let done = fc.task_thread.done[p as usize].load(Ordering::SeqCst); - assert!(done == 0 || error != 0, "done: {done}, error: {error}"); - let frame_hdr = fc.frame_hdr(); - let tile_row_base = frame_hdr.tiling.cols as c_int - * f.frame_thread.next_tile_row[p as usize].get(); - if p { - let p1_0 = fc.frame_thread_progress.entropy.load(Ordering::SeqCst); - if p1_0 < t.sby { - break 'next; - } - fc.task_thread - .error - .fetch_or((p1_0 == TILE_ERROR) as c_int, Ordering::SeqCst); - } - for tc_0 in 0..frame_hdr.tiling.cols { - let ts = &f.ts[(tile_row_base + tc_0 as c_int) as usize]; - let p2 = ts.progress[p as usize].load(Ordering::SeqCst); - if p2 < t.recon_progress { - break 'next; - } - fc.task_thread - .error - .fetch_or((p2 == TILE_ERROR) as c_int, Ordering::SeqCst); - } - if (t.sby + 1) < f.sbh { - // add sby+1 to list to replace this one - let next_t = Rav1dTask { - sby: t.sby + 1, - recon_progress: t.sby + 2, - ..t.without_next() - }; - let ntr = f.frame_thread.next_tile_row[p as usize].get() + 1; - let start = frame_hdr.tiling.row_start_sb[ntr as usize] as c_int; - if next_t.sby == start { - f.frame_thread.next_tile_row[p as usize].set(ntr); - } - drop(t); - fc.task_thread.insert_task(c, next_t, 0); - } - break 'found (fc, t_idx, prev_t); - } else if t.type_0 == TaskType::Cdef { - let p1_1 = fc.frame_thread_progress.copy_lpf.try_read().unwrap() - [(t.sby - 1 >> 5) as usize] - .load(Ordering::SeqCst); - if p1_1 as c_uint & (1 as c_uint) << (t.sby - 1 & 31) != 0 { - break 'found (fc, t_idx, prev_t); + 
// if not bottom sbrow of tile, this task will be re-added + // after it's finished + if check_tile(&f, &fc.task_thread, &t, (c.fc.len() > 1) as c_int) == 0 { + break 'found (fc, t_idx, None); + } + } else if t.recon_progress != 0 { + // We need to block here because we are seeing rare + // contention. + let f = fc.data.read(); + let p = t.type_0 == TaskType::EntropyProgress; + let error = fc.task_thread.error.load(Ordering::SeqCst); + let done = fc.task_thread.done[p as usize].load(Ordering::SeqCst); + assert!(done == 0 || error != 0, "done: {done}, error: {error}"); + let frame_hdr = fc.frame_hdr(); + let tile_row_base = frame_hdr.tiling.cols as c_int + * f.frame_thread.next_tile_row[p as usize].get(); + if p { + let p1_0 = fc.frame_thread_progress.entropy.load(Ordering::SeqCst); + if p1_0 < t.sby { + continue 'tasks; } - } else { - if t.deblock_progress == 0 { - unreachable!(); + fc.task_thread + .error + .fetch_or((p1_0 == TILE_ERROR) as c_int, Ordering::SeqCst); + } + for tc_0 in 0..frame_hdr.tiling.cols { + let ts = &f.ts[(tile_row_base + tc_0 as c_int) as usize]; + let p2 = ts.progress[p as usize].load(Ordering::SeqCst); + if p2 < t.recon_progress { + continue 'tasks; } - let p1_2 = fc.frame_thread_progress.deblock.load(Ordering::SeqCst); - if p1_2 >= t.deblock_progress { - fc.task_thread - .error - .fetch_or((p1_2 == TILE_ERROR) as c_int, Ordering::SeqCst); - break 'found (fc, t_idx, prev_t); + fc.task_thread + .error + .fetch_or((p2 == TILE_ERROR) as c_int, Ordering::SeqCst); + } + if (t.sby + 1) < f.sbh { + // add sby+1 to list to replace this one + let next_t = Rav1dTask { + sby: t.sby + 1, + recon_progress: t.sby + 2, + ..t.clone() + }; + let ntr = f.frame_thread.next_tile_row[p as usize].get() + 1; + let start = frame_hdr.tiling.row_start_sb[ntr as usize] as c_int; + if next_t.sby == start { + f.frame_thread.next_tile_row[p as usize].set(ntr); } + break 'found (fc, t_idx, Some(next_t)); + } + break 'found (fc, t_idx, None); + } else if t.type_0 == 
TaskType::Cdef { + let p1_1 = fc.frame_thread_progress.copy_lpf.try_read().unwrap() + [(t.sby - 1 >> 5) as usize] + .load(Ordering::SeqCst); + if p1_1 as c_uint & (1 as c_uint) << (t.sby - 1 & 31) != 0 { + break 'found (fc, t_idx, None); + } + } else { + if t.deblock_progress == 0 { + unreachable!(); + } + let p1_2 = fc.frame_thread_progress.deblock.load(Ordering::SeqCst); + if p1_2 >= t.deblock_progress { + fc.task_thread + .error + .fetch_or((p1_2 == TILE_ERROR) as c_int, Ordering::SeqCst); + break 'found (fc, t_idx, None); } } - // next: - prev_t = t_idx; - t_idx = t.next(); - tasks.cur_prev.set(prev_t); } ttd.cur.update(|cur| cur + 1); } - if reset_task_cur(c, ttd, u32::MAX) != 0 { + if ttd.reset_cur_index() { continue 'outer; } if merge_pending(c) != 0 { continue 'outer; } - park(c, &mut tc, ttd, task_thread_lock.as_mut().unwrap()); + park(&mut tc, ttd, task_thread_lock.as_mut().unwrap()); continue 'outer; }; // found: // remove t from list - let Some(mut t) = fc.task_thread.tasks.remove(t_idx, prev_t) else { - // Another thread already consumed the task - eprintln!("Task {t_idx:?} already consumed"); - continue 'outer; - }; - if t.type_0 > TaskType::InitCdf - && fc.task_thread.tasks.head.load(Ordering::SeqCst).is_none() - { + let mut t = fc.task_thread.tasks.remove(t_idx); + + // insert new task into list if one was given + if let Some(new_t) = new_t { + fc.task_thread.insert_task(c, new_t, 0); + } + + if t.type_0 > TaskType::InitCdf && fc.task_thread.tasks.is_empty() { ttd.cur.update(|cur| cur + 1); } // we don't need to check cond_signaled here, since we found a task @@ -997,7 +763,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { assert!(task_thread_lock.is_none(), "thread lock should not be held"); task_thread_lock = Some(ttd.lock.lock()); abort_frame(c, fc, res.and_then(|_| Err(Rav1dError::InvalidArgument))); - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); } else { t.type_0 = TaskType::InitCdf; if p1_3 != 
0 { @@ -1089,7 +855,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { assert!(task_thread_lock.is_none(), "thread lock should not be held"); task_thread_lock = Some(ttd.lock.lock()); abort_frame(c, fc, res_0); - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); fc.task_thread.init_done.store(1, Ordering::SeqCst); } continue 'outer; @@ -1122,7 +888,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { t.deps_skip = 0.into(); if check_tile(&f, &fc.task_thread, &t, uses_2pass) == 0 { ts.progress[p_1 as usize].store(progress, Ordering::SeqCst); - reset_task_cur_async(ttd, t.frame_idx, c.fc.len() as u32); + ttd.mark_new_tasks_for_index(t.frame_idx, c.fc.len() as u32); if ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } @@ -1137,7 +903,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { assert!(task_thread_lock.is_none(), "thread lock should not be held"); task_thread_lock = Some(ttd.lock.lock()); ts.progress[p_1 as usize].store(progress, Ordering::SeqCst); - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); error_0 = fc.task_thread.error.load(Ordering::SeqCst); let frame_hdr = &***f.frame_hdr.as_ref().unwrap(); if frame_hdr.refresh_context != 0 @@ -1229,7 +995,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { if error_0 != 0 { TILE_ERROR } else { sby + 1 }, Ordering::SeqCst, ); - reset_task_cur_async(ttd, t.frame_idx, c.fc.len() as u32); + ttd.mark_new_tasks_for_index(t.frame_idx, c.fc.len() as u32); if ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } @@ -1268,7 +1034,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { (f.bd_fn().filter_sbrow_cdef)(c, &f, &mut tc, sby); } drop(f); - reset_task_cur_async(ttd, t.frame_idx, c.fc.len() as u32); + ttd.mark_new_tasks_for_index(t.frame_idx, c.fc.len() as u32); if ttd.cond_signaled.fetch_or(1, Ordering::SeqCst) == 0 { ttd.cond.notify_one(); } @@ -1338,7 +1104,7 @@ pub fn 
rav1d_worker_task(task_thread: Arc) { task_thread_lock = Some(ttd.lock.lock()); let num_tasks = fc.task_thread.task_counter.fetch_sub(1, Ordering::SeqCst) - 1; if (sby + 1) < sbh && num_tasks != 0 { - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); continue 'outer; } if num_tasks == 0 @@ -1359,7 +1125,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { ); fc.task_thread.cond.notify_one(); } - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); continue 'outer; } // t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS @@ -1393,7 +1159,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { task_thread_lock = Some(ttd.lock.lock()); let num_tasks_0 = fc.task_thread.task_counter.fetch_sub(1, Ordering::SeqCst) - 1; if (sby + 1) < sbh && num_tasks_0 != 0 { - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); continue 'outer; } if num_tasks_0 == 0 @@ -1414,7 +1180,7 @@ pub fn rav1d_worker_task(task_thread: Arc) { ); fc.task_thread.cond.notify_one(); } - reset_task_cur(c, ttd, t.frame_idx); + ttd.mark_and_reset_cur(t.frame_idx, c.fc.len() as u32); break 'found_unlocked; }