use super::collectible::{Collectible, Link};
use super::exit_guard::ExitGuard;
use super::maybe_std::fence as maybe_std_fence;
use super::{Epoch, Tag};
use std::ptr::{self, addr_of_mut, NonNull};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
use std::sync::atomic::{AtomicPtr, AtomicU8};
/// Per-thread garbage collector for epoch-based reclamation.
///
/// Aligned to 128 bytes — presumably to keep each collector on its own
/// cache line(s) and avoid false sharing; confirm against target cache
/// geometry.
#[derive(Debug, Default)]
#[repr(align(128))]
pub(super) struct Collector {
    // Epoch announcement combined with the `INACTIVE` / `INVALID` flag bits;
    // read by other threads during chain scans.
    state: AtomicU8,
    // The epoch value this collector most recently announced in `state`.
    announcement: Epoch,
    // Countdown of guard drops until the next voluntary chain scan.
    next_epoch_update: u8,
    // `true` while any of the generation lists below is non-empty.
    has_garbage: bool,
    // Number of nested guards currently active on the owning thread.
    num_readers: u32,
    // Garbage queued one epoch ago (middle generation).
    previous_instance_link: Option<NonNull<dyn Collectible>>,
    // Garbage queued during the current epoch (youngest generation).
    current_instance_link: Option<NonNull<dyn Collectible>>,
    // Oldest generation; reclaimed on the next epoch update.
    next_instance_link: Option<NonNull<dyn Collectible>>,
    // Next collector in the global chain rooted at `GLOBAL_ROOT.chain_head`.
    next_link: AtomicPtr<Collector>,
    // Intrusive link used when this collector itself is queued as garbage.
    link: Link,
}
/// Process-wide state shared by every [`Collector`].
#[derive(Debug, Default)]
pub(super) struct CollectorRoot {
    // The global epoch counter.
    epoch: AtomicU8,
    // Head of the global collector chain; the pointer's tag bits encode the
    // chain lock (`Tag::First`) and a pending scan request (`Tag::Second`).
    chain_head: AtomicPtr<Collector>,
}
struct CollectorAnchor;
impl Collector {
    /// Countdown seed: number of guard drops between voluntary scans of the
    /// global collector chain.
    const CADENCE: u8 = u8::MAX;

    /// State flag: the collector currently has no active readers.
    const INACTIVE: u8 = 1_u8 << 2;

    /// State flag: the collector has been retired and may be unlinked from
    /// the global chain by a scanning thread.
    const INVALID: u8 = 1_u8 << 3;

    /// Returns the current thread's `Collector`, allocating one on first use.
    #[inline]
    pub(super) fn current() -> *mut Collector {
        LOCAL_COLLECTOR.with(|local_collector| {
            let mut collector_ptr = local_collector.load(Relaxed);
            if collector_ptr.is_null() {
                // First call on this thread: allocate through the anchor so
                // that the anchor's destructor cleans up on thread exit.
                collector_ptr = COLLECTOR_ANCHOR.with(CollectorAnchor::alloc);
                local_collector.store(collector_ptr, Relaxed);
            }
            collector_ptr
        })
    }

    /// Marks the start of a reader (guard) on this collector, announcing the
    /// current global epoch when this is the outermost guard.
    ///
    /// # Safety
    ///
    /// `collector_ptr` must point to a valid `Collector` owned by the
    /// current thread.
    #[inline]
    pub(super) unsafe fn new_guard(collector_ptr: *mut Collector, collect_garbage: bool) {
        if (*collector_ptr).num_readers == 0 {
            debug_assert_eq!(
                (*collector_ptr).state.load(Relaxed) & Self::INACTIVE,
                Self::INACTIVE
            );
            (*collector_ptr).num_readers = 1;
            let new_epoch = Epoch::from_u8(GLOBAL_ROOT.epoch.load(Relaxed));
            if cfg!(feature = "loom") || cfg!(not(any(target_arch = "x86", target_arch = "x86_64")))
            {
                // Announce the epoch, then erect a full barrier so that the
                // announcement is visible before any subsequent reads of
                // protected data.
                (*collector_ptr).state.store(new_epoch.into(), Relaxed);
                maybe_std_fence(SeqCst);
            } else {
                // On x86/x86_64 a sequentially-consistent swap serves as both
                // the store and the full barrier.
                (*collector_ptr).state.swap(new_epoch.into(), SeqCst);
            }
            if (*collector_ptr).announcement != new_epoch {
                (*collector_ptr).announcement = new_epoch;
                if collect_garbage {
                    // `epoch_updated` runs arbitrary destructors; the exit
                    // guard rolls the reader count back if one of them
                    // panics, leaving the collector in a consistent state.
                    let mut exit_guard =
                        ExitGuard::new((collector_ptr, false), |(collector_ptr, result)| {
                            if !result {
                                Self::end_guard(collector_ptr);
                            }
                        });
                    Collector::epoch_updated(exit_guard.0);
                    exit_guard.1 = true;
                }
            }
        } else {
            // Nested guard: only bump the reader count.
            debug_assert_eq!((*collector_ptr).state.load(Relaxed) & Self::INACTIVE, 0);
            assert_ne!(
                (*collector_ptr).num_readers,
                u32::MAX,
                "Too many EBR guards"
            );
            (*collector_ptr).num_readers += 1;
        }
    }

    /// Marks the end of a reader (guard); when the last reader leaves, it
    /// may scan the chain and then re-publishes the announcement with the
    /// `INACTIVE` flag set.
    ///
    /// # Safety
    ///
    /// `collector_ptr` must point to a valid `Collector` owned by the
    /// current thread with at least one active reader.
    #[inline]
    pub(super) unsafe fn end_guard(collector_ptr: *mut Collector) {
        debug_assert_eq!((*collector_ptr).state.load(Relaxed) & Self::INACTIVE, 0);
        debug_assert_eq!(
            (*collector_ptr).state.load(Relaxed),
            u8::from((*collector_ptr).announcement)
        );
        if (*collector_ptr).num_readers == 1 {
            (*collector_ptr).num_readers = 0;
            if (*collector_ptr).next_epoch_update == 0 {
                // The countdown expired: scan if this collector holds
                // garbage, or if another thread requested a scan via the
                // `Second` tag bit on the chain head.
                if (*collector_ptr).has_garbage
                    || Tag::into_tag(GLOBAL_ROOT.chain_head.load(Relaxed)) == Tag::Second
                {
                    Collector::scan(collector_ptr);
                }
                // Re-arm the countdown; scan more frequently while garbage
                // remains.
                (*collector_ptr).next_epoch_update = if (*collector_ptr).has_garbage {
                    Self::CADENCE / 4
                } else {
                    Self::CADENCE
                };
            } else {
                (*collector_ptr).next_epoch_update =
                    (*collector_ptr).next_epoch_update.saturating_sub(1);
            }
            // Publish inactivity; `Release` orders all preceding accesses
            // before the flag becomes visible to scanning threads.
            (*collector_ptr).state.store(
                u8::from((*collector_ptr).announcement) | Self::INACTIVE,
                Release,
            );
        } else {
            // Nested guard: only drop the reader count.
            (*collector_ptr).num_readers -= 1;
        }
    }

    /// Returns the current global epoch.
    #[inline]
    pub(super) fn current_epoch() -> Epoch {
        Epoch::from_u8(GLOBAL_ROOT.epoch.load(Relaxed))
    }

    /// Requests eager reclamation: asks other threads to scan (via the
    /// chain-head tag) and forces this collector to scan when its current
    /// guard ends.
    #[inline]
    pub(super) fn accelerate(&mut self) {
        mark_scan_enforced();
        self.next_epoch_update = 0;
    }

    /// Queues `instance_ptr` in the current-generation garbage list.
    ///
    /// # Safety
    ///
    /// `collector_ptr` must be valid and owned by the current thread;
    /// `instance_ptr` must be null or a uniquely-owned pointer obtained from
    /// `Box::into_raw` (it is later reclaimed with `Box::from_raw`).
    #[inline]
    pub(super) unsafe fn collect(
        collector_ptr: *mut Collector,
        instance_ptr: *mut dyn Collectible,
    ) {
        if instance_ptr.is_null() {
            return;
        }
        // Push onto the intrusive singly-linked list of the current
        // generation.
        (*instance_ptr).set_next_ptr((*collector_ptr).current_instance_link.take());
        (*collector_ptr).current_instance_link = NonNull::new(instance_ptr);
        // Shorten the scan countdown so the new garbage is processed sooner.
        (*collector_ptr).next_epoch_update = (*collector_ptr)
            .next_epoch_update
            .saturating_sub(1)
            .min(Self::CADENCE / 4);
        (*collector_ptr).has_garbage = true;
    }

    /// Tries to hand the current thread's garbage off to other threads.
    ///
    /// Returns `false` if the collector still has active readers; otherwise
    /// the collector is invalidated (when it holds garbage) so that a future
    /// chain scan reclaims it, and `true` is returned.
    #[inline]
    pub(super) fn pass_garbage() -> bool {
        LOCAL_COLLECTOR.with(|local_collector| {
            let collector_ptr = local_collector.load(Relaxed);
            if collector_ptr.is_null() {
                // No collector was ever allocated on this thread.
                return true;
            }
            let collector = unsafe { &*collector_ptr };
            if collector.num_readers != 0 {
                return false;
            }
            if collector.has_garbage {
                // Retire the collector and request a scan; a fresh collector
                // will be allocated on the next `current()` call.
                collector.state.fetch_or(Collector::INVALID, Release);
                local_collector.store(ptr::null_mut(), Relaxed);
                mark_scan_enforced();
            }
            true
        })
    }

    /// Allocates a new `Collector` and pushes it onto the global chain.
    fn alloc() -> *mut Collector {
        let boxed = Box::new(Collector::default());
        boxed.state.store(Self::INACTIVE, Relaxed);
        let ptr = Box::into_raw(boxed);
        let mut current = GLOBAL_ROOT.chain_head.load(Relaxed);
        loop {
            unsafe {
                (*ptr)
                    .next_link
                    .store(Tag::unset_tag(current).cast_mut(), Relaxed);
            }
            // Preserve the head's tag bits (lock / scan-request flags) while
            // swinging the pointer to the new collector.
            let tag = Tag::into_tag(current);
            let new = Tag::update_tag(ptr, tag).cast_mut();
            if let Err(actual) = GLOBAL_ROOT
                .chain_head
                .compare_exchange_weak(current, new, Release, Relaxed)
            {
                current = actual;
            } else {
                break;
            }
        }
        ptr
    }

    /// Rotates the three garbage generations after this collector announced
    /// a new epoch, and reclaims the oldest generation.
    ///
    /// # Safety
    ///
    /// `collector_ptr` must be valid, owned by the current thread, and have
    /// an active reader whose announcement matches `state`.
    unsafe fn epoch_updated(collector_ptr: *mut Collector) {
        debug_assert_eq!((*collector_ptr).state.load(Relaxed) & Self::INACTIVE, 0);
        debug_assert_eq!(
            (*collector_ptr).state.load(Relaxed),
            u8::from((*collector_ptr).announcement)
        );
        // Rotate: the old `next` generation becomes reclaimable, `previous`
        // ages into `next`, and `current` ages into `previous`.
        let mut garbage_link = (*collector_ptr).next_instance_link.take();
        (*collector_ptr).next_instance_link = (*collector_ptr).previous_instance_link.take();
        (*collector_ptr).previous_instance_link = (*collector_ptr).current_instance_link.take();
        (*collector_ptr).has_garbage = (*collector_ptr).next_instance_link.is_some()
            || (*collector_ptr).previous_instance_link.is_some();
        while let Some(instance_ptr) = garbage_link.take() {
            garbage_link = (*instance_ptr.as_ptr()).next_ptr();
            // If a destructor panics, the exit guard re-queues the remainder
            // of the list instead of leaking it.
            let mut guard = ExitGuard::new(garbage_link, |mut garbage_link| {
                while let Some(instance_ptr) = garbage_link.take() {
                    garbage_link = (*instance_ptr.as_ptr()).next_ptr();
                    std::sync::atomic::compiler_fence(Acquire);
                    Collector::collect(collector_ptr, instance_ptr.as_ptr());
                }
            });
            // Compiler fence keeps the link read from being reordered past
            // the destructor invocation below.
            std::sync::atomic::compiler_fence(Acquire);
            drop(Box::from_raw(instance_ptr.as_ptr()));
            garbage_link = guard.take();
        }
    }

    /// Unconditionally reclaims every queued instance; used when the
    /// collector itself is being destroyed.
    ///
    /// # Safety
    ///
    /// The caller must have exclusive access to `*collector_ptr`, and no
    /// other thread may still be reading any of the queued instances.
    unsafe fn clear_for_drop(collector_ptr: *mut Collector) {
        for mut link in [
            (*collector_ptr).previous_instance_link.take(),
            (*collector_ptr).current_instance_link.take(),
            (*collector_ptr).next_instance_link.take(),
        ] {
            while let Some(instance_ptr) = link.take() {
                link = (*instance_ptr.as_ptr()).next_ptr();
                drop(Box::from_raw(instance_ptr.as_ptr()));
            }
        }
        // Destructors above may have queued new garbage back into this
        // collector (presumably via `Collector::collect`); drain until the
        // current-generation list stays empty.
        while let Some(link) = (*collector_ptr).current_instance_link.take() {
            let mut current = Some(link);
            while let Some(instance_ptr) = current.take() {
                current = (*instance_ptr.as_ptr()).next_ptr();
                drop(Box::from_raw(instance_ptr.as_ptr()));
            }
        }
    }

    /// Scans the global collector chain, unlinking invalidated collectors,
    /// and advances the global epoch if every active collector has announced
    /// the epoch this collector observed.
    ///
    /// Returns `true` if the global epoch was advanced.
    ///
    /// # Safety
    ///
    /// `collector_ptr` must be valid, owned by the current thread, and not
    /// invalidated.
    unsafe fn scan(collector_ptr: *mut Collector) -> bool {
        debug_assert_eq!((*collector_ptr).state.load(Relaxed) & Self::INVALID, 0);
        let lock_result = Self::lock_chain();
        if let Ok(mut current_collector_ptr) = lock_result {
            // Unlock the chain even if something below panics.
            let _guard = ExitGuard::new((), |()| Self::unlock_chain());
            let known_epoch = (*collector_ptr).state.load(Relaxed);
            let mut update_global_epoch = true;
            let mut prev_collector_ptr: *mut Collector = ptr::null_mut();
            while !current_collector_ptr.is_null() {
                if ptr::eq(collector_ptr, current_collector_ptr) {
                    // Skip this thread's own collector.
                    prev_collector_ptr = current_collector_ptr;
                    current_collector_ptr = (*collector_ptr).next_link.load(Relaxed);
                    continue;
                }
                let collector_state = (*current_collector_ptr).state.load(Acquire);
                let next_collector_ptr = (*current_collector_ptr).next_link.load(Relaxed);
                if (collector_state & Self::INVALID) != 0 {
                    // Retired collector: unlink it and defer its memory.
                    let result = if prev_collector_ptr.is_null() {
                        // It is the chain head: swing the head pointer while
                        // keeping the tag bits intact.
                        GLOBAL_ROOT
                            .chain_head
                            .fetch_update(Release, Relaxed, |p| {
                                let tag = Tag::into_tag(p);
                                debug_assert!(tag == Tag::First || tag == Tag::Both);
                                if ptr::eq(Tag::unset_tag(p), current_collector_ptr) {
                                    Some(Tag::update_tag(next_collector_ptr, tag).cast_mut())
                                } else {
                                    // The head changed concurrently (a new
                                    // collector was pushed); give up on this
                                    // one for now.
                                    None
                                }
                            })
                            .is_ok()
                    } else {
                        (*prev_collector_ptr)
                            .next_link
                            .store(next_collector_ptr, Relaxed);
                        true
                    };
                    if result {
                        // Defer the unlinked collector's memory through the
                        // usual garbage path.
                        Self::collect(collector_ptr, current_collector_ptr);
                        current_collector_ptr = next_collector_ptr;
                        continue;
                    }
                } else if (collector_state & Self::INACTIVE) == 0 && collector_state != known_epoch
                {
                    // An active collector still announces an older epoch;
                    // the global epoch cannot be advanced yet.
                    update_global_epoch = false;
                    break;
                }
                prev_collector_ptr = current_collector_ptr;
                current_collector_ptr = next_collector_ptr;
            }
            if update_global_epoch {
                // Full barrier before publishing the new epoch.
                maybe_std_fence(SeqCst);
                GLOBAL_ROOT
                    .epoch
                    .store(Epoch::from_u8(known_epoch).next().into(), Relaxed);
                return true;
            }
        }
        false
    }

    /// Attempts to free the entire collector chain; succeeds only when every
    /// collector in it has been invalidated.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that no other thread dereferences the freed
    /// collectors afterwards.
    unsafe fn clear_chain() -> bool {
        let lock_result = Self::lock_chain();
        if let Ok(collector_head) = lock_result {
            let _guard = ExitGuard::new((), |()| Self::unlock_chain());
            let mut current_collector_ptr = collector_head;
            while !current_collector_ptr.is_null() {
                if ((*current_collector_ptr).state.load(Acquire) & Self::INVALID) == 0 {
                    // A live collector remains: the chain cannot be freed.
                    return false;
                }
                current_collector_ptr = (*current_collector_ptr).next_link.load(Relaxed);
            }
            // Detach the whole chain, keeping the head's tag bits.
            let result = GLOBAL_ROOT.chain_head.fetch_update(Release, Relaxed, |p| {
                if Tag::unset_tag(p) == collector_head {
                    let tag = Tag::into_tag(p);
                    debug_assert!(tag == Tag::First || tag == Tag::Both);
                    Some(Tag::update_tag(ptr::null::<Collector>(), tag).cast_mut())
                } else {
                    // A new collector was pushed concurrently; abort.
                    None
                }
            });
            if result.is_ok() {
                // The detached chain is now private to this thread: free it.
                let mut current_collector_ptr = collector_head;
                while !current_collector_ptr.is_null() {
                    let next_collector_ptr = (*current_collector_ptr).next_link.load(Relaxed);
                    drop(Box::from_raw(current_collector_ptr));
                    current_collector_ptr = next_collector_ptr;
                }
                return true;
            }
        }
        false
    }

    /// Acquires the chain lock by setting the `First` tag bit on the chain
    /// head.
    ///
    /// On success, returns the untagged head pointer; on failure (the lock
    /// is already held), returns the observed head pointer, tags included.
    fn lock_chain() -> Result<*mut Collector, *mut Collector> {
        GLOBAL_ROOT
            .chain_head
            .fetch_update(Acquire, Acquire, |p| {
                let tag = Tag::into_tag(p);
                if tag == Tag::First || tag == Tag::Both {
                    // Already locked by another thread.
                    None
                } else {
                    Some(Tag::update_tag(p, Tag::First).cast_mut())
                }
            })
            .map(|p| Tag::unset_tag(p).cast_mut())
    }

    /// Releases the chain lock by clearing the `First` tag bit while keeping
    /// a concurrently-set scan request (`Second`) intact.
    fn unlock_chain() {
        loop {
            let result = GLOBAL_ROOT.chain_head.fetch_update(Release, Relaxed, |p| {
                let tag = Tag::into_tag(p);
                debug_assert!(tag == Tag::First || tag == Tag::Both);
                let new_tag = if tag == Tag::First {
                    Tag::None
                } else {
                    Tag::Second
                };
                Some(Tag::update_tag(p, new_tag).cast_mut())
            });
            if result.is_ok() {
                break;
            }
        }
    }
}
impl Drop for Collector {
#[inline]
fn drop(&mut self) {
let collector_ptr = addr_of_mut!(*self);
unsafe {
Self::clear_for_drop(collector_ptr);
}
}
}
impl Collectible for Collector {
#[inline]
fn next_ptr(&self) -> Option<NonNull<dyn Collectible>> {
self.link.next_ptr()
}
#[inline]
fn set_next_ptr(&self, next_ptr: Option<NonNull<dyn Collectible>>) {
self.link.set_next_ptr(next_ptr);
}
}
impl CollectorAnchor {
    /// Allocates a new `Collector`; the receiver only ties the allocation to
    /// the thread-local anchor so its destructor runs on thread exit.
    fn alloc(&self) -> *mut Collector {
        let _anchor: &CollectorAnchor = self;
        Collector::alloc()
    }
}
impl Drop for CollectorAnchor {
    #[inline]
    fn drop(&mut self) {
        // The anchor is dropped on thread exit; retire and clean up the
        // thread's collector.
        // SAFETY: runs on the owning thread during thread-local destruction.
        unsafe { clear_local_collector() };
    }
}
/// Sets the `Second` tag bit on the chain head, asking whichever thread next
/// finishes a guard to scan the collector chain; a no-op if the bit is
/// already set.
fn mark_scan_enforced() {
    let _result = GLOBAL_ROOT.chain_head.fetch_update(Release, Relaxed, |p| {
        match Tag::into_tag(p) {
            // Unlocked, no request yet: raise the request bit.
            Tag::None => Some(Tag::update_tag(p, Tag::Second).cast_mut()),
            // Locked: keep the lock bit and add the request bit.
            Tag::First => Some(Tag::update_tag(p, Tag::Both).cast_mut()),
            // The request bit is already set; nothing to do.
            Tag::Second | Tag::Both => None,
        }
    });
}
/// Retires the current thread's collector and attempts to free the global
/// collector chain.
///
/// # Safety
///
/// Must only be called on a thread whose collector is no longer in use,
/// i.e. from the thread-local anchor's destructor.
unsafe fn clear_local_collector() {
    LOCAL_COLLECTOR.with(|local_collector| {
        let collector_ptr = local_collector.load(Relaxed);
        if !collector_ptr.is_null() {
            // Retire the thread's collector so that a chain scan can
            // eventually unlink and reclaim it.
            (*collector_ptr).state.fetch_or(Collector::INVALID, Release);
        }
        // Temporarily publish a stack-allocated collector — presumably so
        // destructors running during `clear_chain` can still defer garbage
        // through the thread-local slot; TODO confirm against callers.
        let mut temp_collector = Collector::default();
        temp_collector.state.store(Collector::INACTIVE, Relaxed);
        local_collector.store(addr_of_mut!(temp_collector), Release);
        if !Collector::clear_chain() {
            // Other live collectors remain; ask their threads to scan and
            // reclaim the retired ones.
            mark_scan_enforced();
        }
        // Drain anything deferred to the temporary collector, then clear
        // the published pointer before `temp_collector` leaves scope.
        Collector::clear_for_drop(addr_of_mut!(temp_collector));
        local_collector.store(ptr::null_mut(), Release);
    });
}
thread_local! {
    // Zero-sized anchor whose destructor clears the local collector on
    // thread exit.
    static COLLECTOR_ANCHOR: CollectorAnchor = const { CollectorAnchor };
    // Pointer to this thread's collector; null until first use.
    static LOCAL_COLLECTOR: AtomicPtr<Collector> = AtomicPtr::default();
}
/// The process-wide singleton: the global epoch counter and the head of the
/// collector chain.
static GLOBAL_ROOT: CollectorRoot = CollectorRoot {
    epoch: AtomicU8::new(0),
    chain_head: AtomicPtr::new(ptr::null_mut()),
};