160 lines
4.5 KiB
Rust
160 lines
4.5 KiB
Rust
//! Thread-local channel context.
|
|
|
|
use super::select::Selected;
|
|
use super::waker::current_thread_id;
|
|
use crate::cell::Cell;
|
|
use crate::ptr;
|
|
use alloc_crate::sync::Arc;
|
|
use crate::sync::atomic::{Atomic, AtomicPtr, AtomicUsize, Ordering};
|
|
use crate::thread::{self, Thread};
|
|
use crate::time::Instant;
|
|
|
|
/// Thread-local context.
|
|
#[derive(Debug, Clone)]
|
|
pub struct Context {
|
|
inner: Arc<Inner>,
|
|
}
|
|
|
|
/// Inner representation of `Context`.
|
|
#[derive(Debug)]
|
|
struct Inner {
|
|
/// Selected operation.
|
|
select: Atomic<usize>,
|
|
|
|
/// A slot into which another thread may store a pointer to its `Packet`.
|
|
packet: Atomic<*mut ()>,
|
|
|
|
/// Thread handle.
|
|
thread: Thread,
|
|
|
|
/// Thread id.
|
|
thread_id: usize,
|
|
}
|
|
|
|
impl Context {
|
|
/// Creates a new context for the duration of the closure.
|
|
#[inline]
|
|
pub fn with<F, R>(f: F) -> R
|
|
where
|
|
F: FnOnce(&Context) -> R,
|
|
{
|
|
thread_local! {
|
|
/// Cached thread-local context.
|
|
static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
|
|
}
|
|
|
|
let mut f = Some(f);
|
|
let mut f = |cx: &Context| -> R {
|
|
let f = f.take().unwrap();
|
|
f(cx)
|
|
};
|
|
|
|
CONTEXT
|
|
.try_with(|cell| match cell.take() {
|
|
None => f(&Context::new()),
|
|
Some(cx) => {
|
|
cx.reset();
|
|
let res = f(&cx);
|
|
cell.set(Some(cx));
|
|
res
|
|
}
|
|
})
|
|
.unwrap_or_else(|_| f(&Context::new()))
|
|
}
|
|
|
|
/// Creates a new `Context`.
|
|
#[cold]
|
|
fn new() -> Context {
|
|
Context {
|
|
inner: Arc::new(Inner {
|
|
select: AtomicUsize::new(Selected::Waiting.into()),
|
|
packet: AtomicPtr::new(ptr::null_mut()),
|
|
thread: thread::current_or_unnamed(),
|
|
thread_id: current_thread_id(),
|
|
}),
|
|
}
|
|
}
|
|
|
|
/// Resets `select` and `packet`.
|
|
#[inline]
|
|
fn reset(&self) {
|
|
self.inner.select.store(Selected::Waiting.into(), Ordering::Release);
|
|
self.inner.packet.store(ptr::null_mut(), Ordering::Release);
|
|
}
|
|
|
|
/// Attempts to select an operation.
|
|
///
|
|
/// On failure, the previously selected operation is returned.
|
|
#[inline]
|
|
pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
|
|
self.inner
|
|
.select
|
|
.compare_exchange(
|
|
Selected::Waiting.into(),
|
|
select.into(),
|
|
Ordering::AcqRel,
|
|
Ordering::Acquire,
|
|
)
|
|
.map(|_| ())
|
|
.map_err(|e| e.into())
|
|
}
|
|
|
|
/// Stores a packet.
|
|
///
|
|
/// This method must be called after `try_select` succeeds and there is a packet to provide.
|
|
#[inline]
|
|
pub fn store_packet(&self, packet: *mut ()) {
|
|
if !packet.is_null() {
|
|
self.inner.packet.store(packet, Ordering::Release);
|
|
}
|
|
}
|
|
|
|
/// Waits until an operation is selected and returns it.
|
|
///
|
|
/// If the deadline is reached, `Selected::Aborted` will be selected.
|
|
///
|
|
/// # Safety
|
|
/// This may only be called from the thread this `Context` belongs to.
|
|
#[inline]
|
|
pub unsafe fn wait_until(&self, deadline: Option<Instant>) -> Selected {
|
|
loop {
|
|
// Check whether an operation has been selected.
|
|
let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
|
|
if sel != Selected::Waiting {
|
|
return sel;
|
|
}
|
|
|
|
// If there's a deadline, park the current thread until the deadline is reached.
|
|
if let Some(end) = deadline {
|
|
let now = Instant::now();
|
|
|
|
if now < end {
|
|
// SAFETY: guaranteed by caller.
|
|
unsafe { self.inner.thread.park_timeout(end - now) };
|
|
} else {
|
|
// The deadline has been reached. Try aborting select.
|
|
return match self.try_select(Selected::Aborted) {
|
|
Ok(()) => Selected::Aborted,
|
|
Err(s) => s,
|
|
};
|
|
}
|
|
} else {
|
|
// SAFETY: guaranteed by caller.
|
|
unsafe { self.inner.thread.park() };
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Unparks the thread this context belongs to.
|
|
#[inline]
|
|
pub fn unpark(&self) {
|
|
self.inner.thread.unpark();
|
|
}
|
|
|
|
/// Returns the id of the thread this context belongs to.
|
|
#[inline]
|
|
pub fn thread_id(&self) -> usize {
|
|
self.inner.thread_id
|
|
}
|
|
}
|