// Ordinary Rust code, not Verus
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
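
// Shared state of a single-producer, single-consumer (SPSC) ring buffer.
// `head` and `tail` are slot indices into `buffer`; `tail == head` means
// "empty", so one slot always stays unused and a buffer of `len` slots
// holds at most `len - 1` elements at a time.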
struct Queue<T> {
    buffer: Vec<UnsafeCell<MaybeUninit<T>>>,
    head: AtomicU64,
    tail: AtomicU64,
}
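
// `UnsafeCell` is not `Sync`, so the compiler will not let a `Queue<T>` be
// shared between threads on its own. Assuming the two endpoints are meant to
// run on different threads (the point of the design), we assert `Sync` here;
// safety rests on the single-producer/single-consumer discipline below, which
// is exactly the kind of protocol a verified version would have to prove.
unsafe impl<T: Send> Sync for Queue<T> {}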

pub struct Producer<T> {
    queue: Arc<Queue<T>>,
    tail: usize, // producer-local copy of the tail index
}

pub struct Consumer<T> {
    queue: Arc<Queue<T>>,
    head: usize, // consumer-local copy of the head index
}

// Create a queue whose ring buffer has `len` slots. Since one slot always
// stays unused to distinguish "empty" from "full", the queue holds at most
// `len - 1` elements at a time.
pub fn new_queue<T>(len: usize) -> (Producer<T>, Consumer<T>) {
    // A buffer with fewer than two slots could never accept an element.
    assert!(len >= 2);
    // Create a vector of UnsafeCells to serve as the ring buffer.
    let mut backing_cells_vec = Vec::<UnsafeCell<MaybeUninit<T>>>::new();
    while backing_cells_vec.len() < len {
        let cell = UnsafeCell::new(MaybeUninit::uninit());
        backing_cells_vec.push(cell);
    }
    // Initialize head and tail to 0 (empty).
    let head_atomic = AtomicU64::new(0);
    let tail_atomic = AtomicU64::new(0);
    // Package it all into a queue object, and make a reference-counted pointer
    // to it so it can be shared by the Producer and the Consumer.
    let queue = Queue::<T> { head: head_atomic, tail: tail_atomic, buffer: backing_cells_vec };
    let queue_arc = Arc::new(queue);
    let prod = Producer::<T> { queue: queue_arc.clone(), tail: 0 };
    let cons = Consumer::<T> { queue: queue_arc, head: 0 };
    (prod, cons)
}

impl<T> Producer<T> {
    pub fn enqueue(&mut self, t: T) {
        // Loop: if the queue is full, then spin until it is not.
        loop {
            let len = self.queue.buffer.len();
            // Calculate the index of the slot right after `tail`, wrapping
            // around if necessary. If the enqueue is successful, then we will
            // be updating the `tail` to this value.
            let next_tail = if self.tail + 1 == len { 0 } else { self.tail + 1 };
            // Get the current `head` value from the shared atomic.
            let head = self.queue.head.load(Ordering::SeqCst);
            // Check to make sure there is room. (We can't advance the `tail`
            // pointer if it would become equal to the head, since
            // `tail == head` denotes an empty state.)
            if head != next_tail as u64 {
                // Here's the unsafe part: writing the given `t` value into the `UnsafeCell`.
                unsafe {
                    (*self.queue.buffer[self.tail].get()).write(t);
                }
                // Update the `tail` (both the shared atomic and our local copy).
                self.queue.tail.store(next_tail as u64, Ordering::SeqCst);
                self.tail = next_tail;
                // Done.
                return;
            }
            // No room; hint to the CPU that we're busy-waiting, then retry.
            std::hint::spin_loop();
        }
    }
}

impl<T> Consumer<T> {
    pub fn dequeue(&mut self) -> T {
        // Loop: if the queue is empty, then spin until it is not.
        loop {
            let len = self.queue.buffer.len();
            // Calculate the index of the slot right after `head`, wrapping
            // around if necessary. If the dequeue is successful, then we will
            // be updating the `head` to this value.
            let next_head = if self.head + 1 == len { 0 } else { self.head + 1 };
            // Get the current `tail` value from the shared atomic.
            let tail = self.queue.tail.load(Ordering::SeqCst);
            // Check to see if the queue is nonempty.
            if self.head as u64 != tail {
                // Move the stored message out of the UnsafeCell
                // (replacing it with "uninitialized" memory).
                let t = unsafe {
                    let mut tmp = MaybeUninit::uninit();
                    std::mem::swap(&mut *self.queue.buffer[self.head].get(), &mut tmp);
                    tmp.assume_init()
                };
                // Update the `head` (both the shared atomic and our local copy).
                self.queue.head.store(next_head as u64, Ordering::SeqCst);
                self.head = next_head;
                // Done. Return the value we loaded out of the buffer.
                return t;
            }
            // Empty; hint to the CPU that we're busy-waiting, then retry.
            std::hint::spin_loop();
        }
    }
}
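
// Note on atomic orderings: `SeqCst` is used throughout for simplicity. For
// this two-index handoff, a `Release` store when publishing one's own index
// and an `Acquire` load when observing the other side's index would also be
// sufficient.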

fn main() {
    let (mut producer, mut consumer) = new_queue(20);
    producer.enqueue(5);
    let _x = consumer.dequeue();
    // The same queue can be driven from two threads; see `threaded_demo` below.
}
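
// A sketch of the intended cross-thread usage, assuming the `Sync` assertion
// added above. `threaded_demo` is illustrative (not part of the original
// example); call it from `main` to try it.
#[allow(dead_code)]
fn threaded_demo() {
    let (mut producer, mut consumer) = new_queue(20);
    // The producer endpoint moves onto its own thread...
    let sender = std::thread::spawn(move || {
        for i in 0..100u64 {
            producer.enqueue(i);
        }
    });
    // ...while the consumer endpoint drains the queue on this one. With a
    // single producer and a single consumer, FIFO order is preserved.
    for expected in 0..100u64 {
        assert_eq!(consumer.dequeue(), expected);
    }
    sender.join().unwrap();
}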