Single-Producer, Single-Consumer queue, unverified Rust source

// Ordinary Rust code, not Verus use std::cell::UnsafeCell; use std::mem::MaybeUninit; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; struct Queue<T> { buffer: Vec<UnsafeCell<MaybeUninit<T>>>, head: AtomicU64, tail: AtomicU64, } pub struct Producer<T> { queue: Arc<Queue<T>>, tail: usize, } pub struct Consumer<T> { queue: Arc<Queue<T>>, head: usize, } pub fn new_queue<T>(len: usize) -> (Producer<T>, Consumer<T>) { // Create a vector of UnsafeCells to serve as the ring buffer let mut backing_cells_vec = Vec::<UnsafeCell<MaybeUninit<T>>>::new(); while backing_cells_vec.len() < len { let cell = UnsafeCell::new(MaybeUninit::uninit()); backing_cells_vec.push(cell); } // Initialize head and tail to 0 (empty) let head_atomic = AtomicU64::new(0); let tail_atomic = AtomicU64::new(0); // Package it all into a queue object, and make a reference-counted pointer to it // so it can be shared by the Producer and the Consumer. let queue = Queue::<T> { head: head_atomic, tail: tail_atomic, buffer: backing_cells_vec }; let queue_arc = Arc::new(queue); let prod = Producer::<T> { queue: queue_arc.clone(), tail: 0 }; let cons = Consumer::<T> { queue: queue_arc, head: 0 }; (prod, cons) } impl<T> Producer<T> { pub fn enqueue(&mut self, t: T) { // Loop: if the queue is full, then block until it is not. loop { let len = self.queue.buffer.len(); // Calculate the index of the slot right after `tail`, wrapping around // if necessary. If the enqueue is successful, then we will be updating // the `tail` to this value. let next_tail = if self.tail + 1 == len { 0 } else { self.tail + 1 }; // Get the current `head` value from the shared atomic. let head = self.queue.head.load(Ordering::SeqCst); // Check to make sure there is room. (We can't advance the `tail` pointer // if it would become equal to the head, since `tail == head` denotes // an empty state.) // If there's no room, we'll just loop and try again. if head != next_tail as u64 { // Here's the unsafe part: writing the given `t` value into the `UnsafeCell`. unsafe { (*self.queue.buffer[self.tail].get()).write(t); } // Update the `tail` (both the shared atomic and our local copy). self.queue.tail.store(next_tail as u64, Ordering::SeqCst); self.tail = next_tail; // Done. return; } } } } impl<T> Consumer<T> { pub fn dequeue(&mut self) -> T { // Loop: if the queue is empty, then block until it is not. loop { let len = self.queue.buffer.len(); // Calculate the index of the slot right after `head`, wrapping around // if necessary. If the enqueue is successful, then we will be updating // the `head` to this value. let next_head = if self.head + 1 == len { 0 } else { self.head + 1 }; // Get the current `tail` value from the shared atomic. let tail = self.queue.tail.load(Ordering::SeqCst); // Check to see if the queue is nonempty. // If it's empty, we'll just loop and try again. if self.head as u64 != tail { // Load the stored message from the UnsafeCell // (replacing it with "uninitialized" memory). let t = unsafe { let mut tmp = MaybeUninit::uninit(); std::mem::swap(&mut *self.queue.buffer[self.head].get(), &mut tmp); tmp.assume_init() }; // Update the `head` (both the shared atomic and our local copy). self.queue.head.store(next_head as u64, Ordering::SeqCst); self.head = next_head; // Done. Return the value we loaded out of the buffer. return t; } } } } fn main() { let (mut producer, mut consumer) = new_queue(20); producer.enqueue(5); let _x = consumer.dequeue(); }