Single-Producer, Single-Consumer queue, tutorial
Here, we’ll walk through an example of verifying a single-producer, single-consumer queue. Specifically, we’re interested in the following interface:
type Producer<T>
type Consumer<T>
impl<T> Producer<T> {
pub fn enqueue(&mut self, t: T)
}
impl<T> Consumer<T> {
pub fn dequeue(&mut self) -> T
}
pub fn new_queue<T>(len: usize) -> (Producer<T>, Consumer<T>)
Unverified implementation
First, let’s discuss the reference implementation, written in ordinary Rust. What we verify will be an equivalent of this code.
The implementation uses a ring buffer of fixed length len, with a head and a tail index: the producer adds new entries at the tail and the consumer pops old entries from the head. Thus the entries in the range [head, tail) are always full. If head == tail, the ring buffer is considered empty, and if head > tail, the interval wraps around the end of the buffer.
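The index arithmetic described above can be sketched as a few standalone helpers. This is a minimal illustration, not part of the queue: the names inc_wrap, occupied, is_empty, and is_full are our own (a helper named inc_wrap also appears in the verified state machine later). One consequence worth spelling out: because head == tail already means “empty”, the tail can never be advanced onto the head, so a buffer of length len holds at most len - 1 entries.

```rust
/// Advance a ring-buffer index by one, wrapping to 0 at `len`.
/// (Hypothetical helper; `enqueue` and `dequeue` inline this computation.)
fn inc_wrap(i: usize, len: usize) -> usize {
    if i + 1 == len { 0 } else { i + 1 }
}

/// Number of full entries in [head, tail), accounting for wrap-around.
fn occupied(head: usize, tail: usize, len: usize) -> usize {
    if head <= tail { tail - head } else { len - head + tail }
}

/// `head == tail` is reserved to mean "empty".
fn is_empty(head: usize, tail: usize) -> bool {
    head == tail
}

/// The producer may not advance `tail` onto `head` (that would look empty),
/// so a buffer of length `len` holds at most `len - 1` entries.
fn is_full(head: usize, tail: usize, len: usize) -> bool {
    inc_wrap(tail, len) == head
}
```

For example, with len = 4, head = 3 and tail = 1, the full slots are 3 and 0, and with head = 1 and tail = 0 the buffer is full at 3 entries.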
The crucial design choice is what data structure to use for the buffer itself. The key requirements of the buffer are:
- A reference to the buffer memory must be shared between threads (the producer and the consumer).
- Each entry might or might not store a valid T element at any given point in time.
In our unverified Rust implementation, we’ll let each entry be an UnsafeCell. The UnsafeCell gives us interior mutability, so that we can read and write the contents from multiple threads without any extra metadata associated with each entry. UnsafeCell is, of course, as the name suggests, unsafe, meaning that it’s up to the programmer to ensure that all these reads and writes are performed safely. For our purposes, safely mostly means data-race-free.
More specifically, we’ll use an UnsafeCell<MaybeUninit<T>> for each entry. The MaybeUninit allows for the possibility that an entry is uninitialized. As with UnsafeCell, there are no runtime safety checks, so it is entirely up to the programmer to make sure the code never reads from an entry while it’s uninitialized.
(Hang on, why not just use Option<T>? To be safer, we could use an Option<T> instead of MaybeUninit<T>, but we are already doing low-level data management anyway, and an Option<T> would be less efficient. In particular, if we used an Option<T>, then popping an entry out of the queue would mean having to write None back into the queue to signify its emptiness. With MaybeUninit<T>, we can just move the T out and leave the memory “uninitialized” without actually having to overwrite its bytes.)
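To make the efficiency point concrete, here is a small sketch contrasting the two ways of emptying a slot. The helper names take_option and take_uninit are hypothetical. Option::take writes None back into the slot, while MaybeUninit::assume_init_read just copies the value out and leaves the slot’s bytes untouched; the unsafe contract is that the slot is never read again until it is rewritten. For a type like u64 that has no spare bit pattern to encode None, Option<u64> is also larger than the value itself (16 bytes versus 8 on typical 64-bit targets). Types with a niche, such as Option<Box<T>>, avoid that size cost but still need the write-back.

```rust
use std::mem::MaybeUninit;

/// Option-based slot: taking the value must write `None` back into the slot.
fn take_option<T>(slot: &mut Option<T>) -> Option<T> {
    slot.take()
}

/// MaybeUninit-based slot: the value's bytes are copied out and left in place;
/// the slot is merely *treated* as uninitialized from now on.
///
/// # Safety
/// The slot must be initialized, and must not be read again until rewritten.
unsafe fn take_uninit<T>(slot: &mut MaybeUninit<T>) -> T {
    unsafe { slot.assume_init_read() }
}
```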
So each entry of the buffer will be an UnsafeCell<MaybeUninit<T>>, and the buffer itself a Vec of these. We’ll also use atomics to represent the head and tail.
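use std::{cell::UnsafeCell, mem::MaybeUninit, sync::{atomic::{AtomicU64, Ordering}, Arc}};
struct Queue<T> {
buffer: Vec<UnsafeCell<MaybeUninit<T>>>,
head: AtomicU64,
tail: AtomicU64,
}
// UnsafeCell is !Sync, so we assert that sharing a Queue between threads is sound;
// the head/tail protocol below is what keeps the accesses data-race-free.
unsafe impl<T: Send> Sync for Queue<T> {}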
The producer and consumer types will each have a reference to the queue. Also, the producer will have a redundant copy of the tail (which slightly reduces contended access to the shared atomic tail), and likewise, the consumer will have a redundant copy of the head. (This is possible because we have only a single producer and a single consumer: the producer is the only entity that ever updates the tail, and the consumer is the only entity that ever updates the head.)
pub struct Producer<T> {
queue: Arc<Queue<T>>,
tail: usize,
}
pub struct Consumer<T> {
queue: Arc<Queue<T>>,
head: usize,
}
Finally, we come to the actual implementation:
pub fn new_queue<T>(len: usize) -> (Producer<T>, Consumer<T>) {
// Create a vector of UnsafeCells to serve as the ring buffer
let mut backing_cells_vec = Vec::<UnsafeCell<MaybeUninit<T>>>::new();
while backing_cells_vec.len() < len {
let cell = UnsafeCell::new(MaybeUninit::uninit());
backing_cells_vec.push(cell);
}
// Initialize head and tail to 0 (empty)
let head_atomic = AtomicU64::new(0);
let tail_atomic = AtomicU64::new(0);
// Package it all into a queue object, and make a reference-counted pointer to it
// so it can be shared by the Producer and the Consumer.
let queue = Queue::<T> { head: head_atomic, tail: tail_atomic, buffer: backing_cells_vec };
let queue_arc = Arc::new(queue);
let prod = Producer::<T> { queue: queue_arc.clone(), tail: 0 };
let cons = Consumer::<T> { queue: queue_arc, head: 0 };
(prod, cons)
}
impl<T> Producer<T> {
pub fn enqueue(&mut self, t: T) {
// Loop: if the queue is full, then block until it is not.
loop {
let len = self.queue.buffer.len();
// Calculate the index of the slot right after `tail`, wrapping around
// if necessary. If the enqueue is successful, then we will be updating
// the `tail` to this value.
let next_tail = if self.tail + 1 == len { 0 } else { self.tail + 1 };
// Get the current `head` value from the shared atomic.
let head = self.queue.head.load(Ordering::SeqCst);
// Check to make sure there is room. (We can't advance the `tail` pointer
// if it would become equal to the head, since `tail == head` denotes
// an empty state.)
// If there's no room, we'll just loop and try again.
if head != next_tail as u64 {
// Here's the unsafe part: writing the given `t` value into the `UnsafeCell`.
unsafe {
(*self.queue.buffer[self.tail].get()).write(t);
}
// Update the `tail` (both the shared atomic and our local copy).
self.queue.tail.store(next_tail as u64, Ordering::SeqCst);
self.tail = next_tail;
// Done.
return;
}
}
}
}
impl<T> Consumer<T> {
pub fn dequeue(&mut self) -> T {
// Loop: if the queue is empty, then block until it is not.
loop {
let len = self.queue.buffer.len();
// Calculate the index of the slot right after `head`, wrapping around
// if necessary. If the dequeue is successful, then we will be updating
// the `head` to this value.
let next_head = if self.head + 1 == len { 0 } else { self.head + 1 };
// Get the current `tail` value from the shared atomic.
let tail = self.queue.tail.load(Ordering::SeqCst);
// Check to see if the queue is nonempty.
// If it's empty, we'll just loop and try again.
if self.head as u64 != tail {
// Load the stored message from the UnsafeCell
// (replacing it with "uninitialized" memory).
let t = unsafe {
let mut tmp = MaybeUninit::uninit();
std::mem::swap(&mut *self.queue.buffer[self.head].get(), &mut tmp);
tmp.assume_init()
};
// Update the `head` (both the shared atomic and our local copy).
self.queue.head.store(next_head as u64, Ordering::SeqCst);
self.head = next_head;
// Done. Return the value we loaded out of the buffer.
return t;
}
}
}
}
Verified implementation
With verification, we always need to start with the question of what, exactly, we are verifying. In this case, we aren’t actually going to add any new specifications to the enqueue or dequeue functions; our only aim is to implement the queue and show that it is memory-type-safe, e.g., that dequeue returns a well-formed T value without exhibiting any undefined behavior.
Showing this is not actually trivial! The unverified Rust code above used unsafe code, which means memory-type-safety is not a given. In our verified queue implementation, we will need a safe, verified alternative to the unsafe code. We’ll start by introducing our verified alternative to UnsafeCell<MaybeUninit<T>>.
Verified interior mutability with PCell
In Verus, PCell (standing for “permissioned cell”) is the verified equivalent. (By equivalent, we mean that, subject to inlining and the optimizations doing what we expect, it ought to generate the same machine code.) Unlike UnsafeCell, the optional-initializedness is built in, so PCell<T> can stand in for UnsafeCell<MaybeUninit<T>>.
In order to verify our use of PCell, Verus requires the user to present a special permission token on each access (read or write) to the PCell. Each PCell has a unique identifier (given by pcell.id()), and each permission token connects an identifier to the (possibly uninitialized) value stored at the cell. In the permission token, this value is represented as an Option type, though the option tag has no runtime representation.
fn main() {
// Construct a new pcell and obtain the permission for it.
let (pcell, Tracked(mut perm)) = PCell::<u64>::empty();
// Initially, the cell is uninitialized, and the `perm` token
// represents that as the value `None`.
// The meaning of the permission token is given by its _view_, here `perm@`.
//
// The expression `pcell_opt![ pcell.id() => Option::None ]` can be read as roughly,
// "the cell with id pcell.id() has value None".
assert(perm@ === pcell_opt![ pcell.id() => Option::None ]);
// The above could also be written by accessing the fields of the
// `PointsToData` struct:
assert(perm@.pcell === pcell.id());
assert(perm@.value === Option::None);
// We can write a value to the pcell (thus initializing it).
// This only requires an `&` reference to the PCell, but it does
// mutate the `perm` token.
pcell.put(Tracked(&mut perm), 5);
// Having written the value, this is reflected in the token:
assert(perm@ === pcell_opt![ pcell.id() => Option::Some(5) ]);
// We can take the value back out:
let x = pcell.take(Tracked(&mut perm));
// Which leaves it uninitialized again:
assert(x == 5);
assert(perm@ === pcell_opt![ pcell.id() => Option::None ]);
}
After erasure, the above code reduces to something like this:
fn main() {
let pcell = PCell::<u64>::empty();
pcell.put(5);
let x = pcell.take();
}
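One way to build intuition for the PCell discipline is a runtime-checked toy in ordinary Rust. The sketch below is hypothetical (ToyCell, ToyPerm, and their methods are our names, not the vstd API): the token is a real value carrying the cell’s id and an initialized flag, and put/take assert on them. In Verus, the corresponding checks are proof obligations on the ghost token, which is why they disappear after erasure, as in the code above.

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical toy, NOT Verus's PCell: the "permission" is a runtime value and
// its checks are runtime assertions, whereas in Verus the token is ghost
// (erased) and the same checks are discharged statically.
static NEXT_ID: AtomicU64 = AtomicU64::new(0);

pub struct ToyCell<T> {
    id: u64,
    data: UnsafeCell<MaybeUninit<T>>,
}

/// No Clone/Copy and private fields: code outside this module can only get a
/// token from `empty()`, so at most one token exists per cell.
pub struct ToyPerm {
    cell_id: u64,
    initialized: bool, // plays the role of `perm@.value` being Some/None
}

impl ToyPerm {
    pub fn cell_id(&self) -> u64 { self.cell_id }
    pub fn is_initialized(&self) -> bool { self.initialized }
}

impl<T> ToyCell<T> {
    /// Like `PCell::empty()`: a fresh, uninitialized cell plus its token.
    pub fn empty() -> (ToyCell<T>, ToyPerm) {
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let cell = ToyCell { id, data: UnsafeCell::new(MaybeUninit::uninit()) };
        (cell, ToyPerm { cell_id: id, initialized: false })
    }

    pub fn id(&self) -> u64 { self.id }

    /// Only needs `&self`, but mutates the token, as with `pcell.put`.
    pub fn put(&self, perm: &mut ToyPerm, v: T) {
        assert_eq!(perm.cell_id, self.id, "token is for a different cell");
        assert!(!perm.initialized, "cell already holds a value");
        // Holding the unique token is what gives us exclusive access here.
        unsafe { (*self.data.get()).write(v); }
        perm.initialized = true;
    }

    /// Moves the value out and leaves the cell uninitialized, as with `pcell.take`.
    pub fn take(&self, perm: &mut ToyPerm) -> T {
        assert_eq!(perm.cell_id, self.id, "token is for a different cell");
        assert!(perm.initialized, "cell is uninitialized");
        perm.initialized = false;
        unsafe { (*self.data.get()).assume_init_read() }
    }
}
```

In this sketch, dropping a ToyCell while it still holds a value simply leaks that value.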
Using PCell in a verified queue
Let’s look back at the Rust code from above (the code that used UnsafeCell) and mark six points of interest: four places where we manipulate an atomic and two where we manipulate a cell:
impl<T> Producer<T> {
pub fn enqueue(&mut self, t: T) {
loop {
let len = self.queue.buffer.len();
let next_tail = if self.tail + 1 == len
{ 0 } else { self.tail + 1 };
let head = self.queue.head.load(Ordering::SeqCst);  // <- produce_start (atomic load)
if head != next_tail as u64 {
unsafe {
(*self.queue.buffer[self.tail].get()).write(t);  // <- write to cell
}
self.queue.tail.store(next_tail as u64, Ordering::SeqCst);  // <- produce_end (atomic store)
self.tail = next_tail;
return;
}
}
}
}
impl<T> Consumer<T> {
pub fn dequeue(&mut self) -> T {
loop {
let len = self.queue.buffer.len();
let next_head = if self.head + 1 == len
{ 0 } else { self.head + 1 };
let tail = self.queue.tail.load(Ordering::SeqCst);  // <- consume_start (atomic load)
if self.head as u64 != tail {
let t = unsafe {
let mut tmp = MaybeUninit::uninit();
std::mem::swap(  // <- read from cell
&mut *self.queue.buffer[self.head].get(),
&mut tmp);
tmp.assume_init()
};
self.queue.head.store(next_head as u64, Ordering::SeqCst);  // <- consume_end (atomic store)
self.head = next_head;
return t;
}
}
}
}
Now, if we’re going to be using a PCell instead of an UnsafeCell, then at the two points where we manipulate the cell, we are somehow going to need to have the permission token at those points.
Furthermore, we have four points that manipulate atomics. Informally, these atomic operations let us synchronize access to the cell in a data-race-free way. Formally, in the verified setting, these atomics will let us transfer control of the permission tokens that we need to access the cells.
Specifically:
- enqueue needs to obtain the permission at produce_start, use it to perform a write, and relinquish it at produce_end.
- dequeue needs to obtain the permission at consume_start, use it to perform a read, and relinquish it at consume_end.
Woah, woah, woah. Why is this so complicated? We marked the 6 places of interest, so now let’s go build a tokenized_state_machine! with those 6 transitions already!
Good question. That approach might have worked if we were using an atomic to store the value T instead of a PCell (although this would, of course, require the T to be word-sized).
However, the PCell requires its own considerations. The crucial point is that reading or writing to a PCell is non-atomic. That sounds tautological, but I’m not just talking about the name of the type here. By atomic or non-atomic, I’m actually referring to the atomicity of the operation in the execution model. We can’t freely abstract PCell operations as atomic operations that allow arbitrary interleaving.
Okay, so what would go wrong in Verus if we tried?
Recall how it works for atomic memory locations. With an atomic memory location, we can access it any time if we just have an atomic invariant for it. We open the invariant (acquiring permission to access the value along with any ghost data), perform the operation, which is atomic, and then close the invariant. In this scenario, the invariant is restored after a single atomic operation, as is necessary.
But we can’t do the same for PCell. PCell operations are non-atomic, so we can’t perform a PCell read or write while an atomic invariant is open. Thus, the PCell’s permission token needs to be transferred at the points in the program where we are performing atomic operations, that is, at the four marked atomic operations.
Abstracting the program while handling permissions
Verus’s tokenized_state_machine! supports a notion called storage. An instance of a protocol is allowed to store tokens, which client threads can operate on by temporarily “checking them out”. This provides a convenient means for transferring ownership of tokens through the system.
Think of the instance as like a library. (Actually, think of it like a network of libraries with an inter-library loan system, where the librarians tirelessly follow a protocol to make sure any given library has a book whenever a patron is ready to check a book out.)
Anyway, the tokenized_state_machine! we’ll use here should look something like this:
- It should be able to store all the permission tokens for each PCell in the buffer.
- It should have 4 transitions:
  - produce_start should allow the producer to “check out” the permission for the cell that it is ready to write to.
  - produce_end should allow the producer to “check back in” the permission that it checked out.
  - consume_start should allow the consumer to “check out” the permission for the cell that it is ready to read from.
  - consume_end should allow the consumer to “check back in” the permission that it checked out.
Hang on, I read ahead and learned that Verus’s storage has a separate mechanism for handling reads. Why aren’t we using that for the consumer?
A few reasons.
- That mechanism is useful specifically for read-sharing, which we aren’t using here.
- We actually can’t use it, since the “read” isn’t really a read. Well, at the byte level, it should just be a read. But we actually do change the value that is stored there in the high-level program semantics: we move the value out and replace it with an “uninitialized” value. And we have to do it this way, unless T implemented Copy.
The producer and consumer each have a small “view into the world”: each one might have access to one of the permission tokens if they have it “checked out”, but that’s it.
To understand the state machine protocol we’re going to build, we have to look at the protocol dually to the producer and the consumer. If the producer and consumer might have 0 or 1 permissions checked out at a given time, then complementarily, the protocol state should be “almost all of the permissions, except possibly up to 2 permissions that are currently checked out”.
For example, here is the full sequence of operations for an enqueue step, both from the perspective of the producer and of the storage protocol:
| Operation | Producer’s perspective | Storage protocol’s perspective |
|---|---|---|
| produce_start | receives a permission for an uninitialized cell | lends out a permission for an uninitialized cell |
| write to the cell | writes to the cell with PCell::put | |
| produce_end | returns a permission for the now-initialized cell | receives back the permission, now initialized |
And here is the storage protocol’s perspective, graphically:
Building the tokenized_state_machine!
Now that we finally have a handle on the protocol, let’s implement it. It should have all the following state:
- The value of the shared head atomic
- The value of the shared tail atomic
- The producer’s state
  - Is the producer step in progress?
  - Local tail field saved by the producer
- The consumer’s state
  - Is the consumer step in progress?
  - Local head field saved by the consumer
- The IDs of cells in the buffer (so we know what permissions we’re meant to be storing)
- The permissions that are actually stored.
And now, in code:
#[is_variant]
pub enum ProducerState {
Idle(nat), // local copy of tail
Producing(nat),
}
#[is_variant]
pub enum ConsumerState {
Idle(nat), // local copy of head
Consuming(nat),
}
tokenized_state_machine!{FifoQueue<T> {
fields {
// IDs of the cells used in the ring buffer.
// These are fixed throughout the protocol.
#[sharding(constant)]
pub backing_cells: Seq<CellId>,
// All the stored permissions
#[sharding(storage_map)]
pub storage: Map<nat, cell::PointsTo<T>>,
// Represents the shared `head` field
#[sharding(variable)]
pub head: nat,
// Represents the shared `tail` field
#[sharding(variable)]
pub tail: nat,
// Represents the local state of the single-producer
#[sharding(variable)]
pub producer: ProducerState,
// Represents the local state of the single-consumer
#[sharding(variable)]
pub consumer: ConsumerState,
}
// ...
}}
As you can see, Verus allows us to utilize storage by declaring it in the sharding strategy. Here, the strategy we use is storage_map. In the storage_map strategy, the stored token items are given by the values in the map. The keys in the map can be an arbitrary type that we choose to use as an index to refer to stored items. Here, the index we’ll use will just be the index into the queue. Thus keys of the storage map will take on values in the range [0, len).
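A small runtime model may help picture storage_map sharding with queue indices as keys. This is a sketch under stated assumptions: Storage, Perm, and the u64 cell IDs are hypothetical stand-ins, and the proof obligations of withdraw and deposit (the key must be present, respectively absent) become runtime checks.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a stored permission (NOT Verus's `cell::PointsTo`).
#[derive(Debug, PartialEq)]
pub struct Perm {
    pub cell_id: u64,
    pub initialized: bool,
}

/// Toy model of `storage_map` sharding: stored tokens keyed by queue index.
pub struct Storage {
    items: BTreeMap<usize, Perm>,
}

impl Storage {
    /// Every index i in [0, len) starts out holding the (empty) permission for
    /// backing_cells[i]; the keys are queue indices, not cell IDs.
    pub fn new(backing_cells: &[u64]) -> Self {
        let items: BTreeMap<usize, Perm> = backing_cells
            .iter()
            .enumerate()
            .map(|(i, &id)| (i, Perm { cell_id: id, initialized: false }))
            .collect();
        Storage { items }
    }

    /// Like `withdraw storage -= [idx => let perm]`: the key must be present,
    /// and checking the token out just removes it from the map.
    pub fn withdraw(&mut self, idx: usize) -> Perm {
        self.items.remove(&idx).expect("no permission stored at this index")
    }

    /// Like `deposit storage += [idx => perm]`: the key must be absent.
    pub fn deposit(&mut self, idx: usize, perm: Perm) {
        assert!(!self.items.contains_key(&idx), "index already occupied");
        self.items.insert(idx, perm);
    }

    pub fn stored_count(&self) -> usize {
        self.items.len()
    }
}
```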
Hmm. For a second, I thought you were going to use the cell IDs as keys in the map. Could it work that way as well?
Yes, although it would be slightly more complicated. For one, we’d need to track an invariant that all the IDs are distinct, just to show that there aren’t overlapping keys. But that’s an extra invariant to prove that we don’t need if we do it this way. It is much easier to declare a correspondence between backing_cells[i] and storage[i] for each i.
Wait, does that mean we aren’t going to prove an invariant that all the IDs are distinct? That can’t possibly be right, right?
It does mean that!
Certainly, it is true that in any execution of the program, the IDs of the cells are going to be distinct, but this isn’t something we need to track ourselves as users of the PCell library.
You might be familiar with an approach where we have some kind of “heap model,” which maps addresses to values. When we update one entry, we have to show that nothing else of interest is changing. But like I just said, we aren’t using that sort of heap model in the Fifo protocol; we’re just indexing based on queue index. And we don’t rely on any sort of heap model like that in the implementations of enqueue or dequeue either; there, we use the separated permission model.
Now, let’s dive into the transitions, starting with the produce_start transition.
transition!{
produce_start() {
// In order to begin, we have to be in ProducerState::Idle.
require(pre.producer.is_Idle());
// We'll be comparing the producer's _local_ copy of the tail
// with the _shared_ version of the head.
let tail = pre.producer.get_Idle_0();
let head = pre.head;
assert(0 <= tail && tail < pre.backing_cells.len());
// Compute the incremented tail, wrapping around if necessary.
let next_tail = Self::inc_wrap(tail, pre.backing_cells.len());
// We have to check that the buffer isn't full to proceed.
require(next_tail != head);
// Update the producer's local state to be in the `Producing` state.
update producer = ProducerState::Producing(tail);
// Withdraw ("check out") the permission stored at index `tail`.
// This creates a proof obligation for the transition system to prove that
// there is actually a permission stored at this index.
withdraw storage -= [tail => let perm] by {
assert(pre.valid_storage_at_idx(tail));
};
// The transition needs to guarantee to the client that the
// permission they are checking out:
// (i) is for the cell at index `tail` (the IDs match)
// (ii) the permission indicates that the cell is empty
assert(
perm@.pcell === pre.backing_cells.index(tail as int)
&& perm@.value.is_None()
) by {
assert(!pre.in_active_range(tail));
assert(pre.valid_storage_at_idx(tail));
};
}
}
It’s a doozy, but we can break it down into three parts:
- The enabling conditions (require).
  - The client needs to exhibit that these conditions hold (e.g., by doing the appropriate comparison between the head and tail values) in order to perform the transition.
- The stuff that gets updated:
  - We update the producer’s local state.
  - We withdraw (“check out”) the permission, which is represented simply by removing the key from the map.
- Guarantees about the result of the transition.
  - These guarantees will follow from internal invariants about the FifoQueue system.
  - In this case, we care about guarantees on the permission token that is checked out.
Now, let’s see produce_end. The main difference here is that the client is checking the permission token back into the system, which means we have to provide the guarantees about the permission token in the enabling condition rather than in a post-guarantee.
transition!{
// This transition is parameterized by the value of the permission
// being inserted. Since the permission is being deposited
// (coming from "outside" the system) we can't compute it as a
// function of the current state, unlike how we did it for the
// `produce_start` transition.
produce_end(perm: cell::PointsTo<T>) {
// In order to complete the produce step,
// we have to be in ProducerState::Producing.
require(pre.producer.is_Producing());
let tail = pre.producer.get_Producing_0();
assert(0 <= tail && tail < pre.backing_cells.len());
// Compute the incremented tail, wrapping around if necessary.
let next_tail = Self::inc_wrap(tail, pre.backing_cells.len());
// This time, we don't need to compare the `head` and `tail` - we already
// checked that, and anyway, we don't have access to the `head` field
// for this transition. (This transition is supposed to occur while
// modifying the `tail` field, so we can't do both.)
// However, we _do_ need to check that the permission token being
// checked in satisfies its requirements. It has to be associated
// with the correct cell, and it has to be full.
require(perm@.pcell === pre.backing_cells.index(tail as int)
&& perm@.value.is_Some());
// Perform our updates. Update the tail to the computed value,
// both the shared version and the producer's local copy.
// Also, move back to the Idle state.
update producer = ProducerState::Idle(next_tail);
update tail = next_tail;
// Check the permission back into the storage map.
deposit storage += [tail => perm] by { assert(pre.valid_storage_at_idx(tail)); };
}
}
Check the full source for the consume_start and consume_end transitions, which are pretty similar, and for the invariants we use to prove that the transitions are all well-formed.
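To make the four transitions concrete, here is a plain-Rust runtime model of the protocol. It is a sketch under several assumptions: the names FifoModel, Perm, checked_out, and check_invariant are hypothetical; each require becomes an early Err, each withdraw or deposit becomes an Option::take or a store into a Vec, and a permission is reduced to its slot index plus an initialized flag. The consume_start and consume_end bodies are our guess at their shape, mirroring the producer transitions, and are not copied from the full source.

```rust
/// Hypothetical runtime model of the FifoQueue protocol; not Verus code.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ProducerState { Idle(usize), Producing(usize) }

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConsumerState { Idle(usize), Consuming(usize) }

/// Stand-in for a permission token: its slot, and whether that slot holds a value.
#[derive(Debug)]
pub struct Perm { pub idx: usize, pub initialized: bool }

pub struct FifoModel {
    len: usize,
    head: usize, // shared `head` atomic
    tail: usize, // shared `tail` atomic
    producer: ProducerState,
    consumer: ConsumerState,
    // None: permission i is checked out; Some(init): stored, `init` = slot i is full.
    storage: Vec<Option<bool>>,
}

fn inc_wrap(i: usize, len: usize) -> usize { if i + 1 == len { 0 } else { i + 1 } }

impl FifoModel {
    pub fn new(len: usize) -> Self {
        FifoModel { len, head: 0, tail: 0, producer: ProducerState::Idle(0),
                    consumer: ConsumerState::Idle(0), storage: vec![Some(false); len] }
    }

    pub fn produce_start(&mut self) -> Result<Perm, &'static str> {
        let ProducerState::Idle(tail) = self.producer else { return Err("producer busy") };
        if inc_wrap(tail, self.len) == self.head { return Err("queue full") } // require
        self.producer = ProducerState::Producing(tail);
        let init = self.storage[tail].take().expect("permission must be stored"); // withdraw
        assert!(!init, "guarantee: the checked-out cell is empty");
        Ok(Perm { idx: tail, initialized: init })
    }

    pub fn produce_end(&mut self, perm: Perm) -> Result<(), &'static str> {
        let ProducerState::Producing(tail) = self.producer else { return Err("not producing") };
        if perm.idx != tail || !perm.initialized { return Err("bad permission") } // require
        let next_tail = inc_wrap(tail, self.len);
        self.producer = ProducerState::Idle(next_tail);
        self.tail = next_tail;
        assert!(self.storage[tail].is_none(), "deposit target must be empty");
        self.storage[tail] = Some(perm.initialized); // deposit
        Ok(())
    }

    pub fn consume_start(&mut self) -> Result<Perm, &'static str> {
        let ConsumerState::Idle(head) = self.consumer else { return Err("consumer busy") };
        if head == self.tail { return Err("queue empty") }
        self.consumer = ConsumerState::Consuming(head);
        let init = self.storage[head].take().expect("permission must be stored");
        assert!(init, "guarantee: the checked-out cell is full");
        Ok(Perm { idx: head, initialized: init })
    }

    pub fn consume_end(&mut self, perm: Perm) -> Result<(), &'static str> {
        let ConsumerState::Consuming(head) = self.consumer else { return Err("not consuming") };
        if perm.idx != head || perm.initialized { return Err("bad permission") }
        let next_head = inc_wrap(head, self.len);
        self.consumer = ConsumerState::Idle(next_head);
        self.head = next_head;
        assert!(self.storage[head].is_none(), "deposit target must be empty");
        self.storage[head] = Some(perm.initialized);
        Ok(())
    }

    fn in_active_range(&self, i: usize) -> bool {
        if self.head <= self.tail { self.head <= i && i < self.tail }
        else { i >= self.head || i < self.tail }
    }

    /// Number of permissions currently checked out.
    pub fn checked_out(&self) -> usize { self.storage.iter().filter(|s| s.is_none()).count() }

    /// Stored permissions are full exactly on [head, tail); a missing one
    /// must be held by the producer or the consumer.
    pub fn check_invariant(&self) {
        for (i, slot) in self.storage.iter().enumerate() {
            match slot {
                Some(init) => assert_eq!(*init, self.in_active_range(i)),
                None => assert!(self.producer == ProducerState::Producing(i)
                    || self.consumer == ConsumerState::Consuming(i)),
            }
        }
    }
}
```

Because check_invariant requires every missing permission to be held by the producer or the consumer, at most two are ever checked out, matching the “almost all of the permissions” picture. The assertions inside produce_start and consume_start stand in for the guarantees that, in Verus, follow from the protocol’s invariants.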
Verified Implementation
For the implementation, let’s start with the definitions for the Producer, Consumer, and Queue structs, which are based on the ones from the unverified implementation, augmented with proof variables. The Producer, for example, gets a field producer holding the protocol’s producer token, whose value is a ProducerState. The well-formedness condition here requires us to be in the ProducerState::Idle state (in every call to enqueue, we must start and end in the Idle state).
pub struct Producer<T> {
queue: Arc<Queue<T>>,
tail: usize,
producer: Tracked<FifoQueue::producer<T>>,
}
impl<T> Producer<T> {
pub closed spec fn wf(&self) -> bool {
(*self.queue).wf()
&& self.producer@.instance_id() == (*self.queue).instance@.id()
&& self.producer@.value() == ProducerState::Idle(self.tail as nat)
&& (self.tail as int) < (*self.queue).buffer@.len()
}
}
For the Queue type itself, we add an atomic invariant for the head and tail fields:
struct_with_invariants!{
struct Queue<T> {
buffer: Vec<PCell<T>>,
head: AtomicU64<_, FifoQueue::head<T>, _>,
tail: AtomicU64<_, FifoQueue::tail<T>, _>,
instance: Tracked<FifoQueue::Instance<T>>,
}
pub closed spec fn wf(&self) -> bool {
predicate {
// The Cell IDs in the instance protocol match the cell IDs in the actual vector:
&&& self.instance@.backing_cells().len() == self.buffer@.len()
&&& forall|i: int| 0 <= i && i < self.buffer@.len() as int ==>
self.instance@.backing_cells().index(i) ===
self.buffer@.index(i).id()
}
invariant on head with (instance) is (v: u64, g: FifoQueue::head<T>) {
&&& g.instance_id() === instance@.id()
&&& g.value() == v as int
}
invariant on tail with (instance) is (v: u64, g: FifoQueue::tail<T>) {
&&& g.instance_id() === instance@.id()
&&& g.value() == v as int
}
}
}
Now we can implement and verify enqueue:
impl<T> Producer<T> {
fn enqueue(&mut self, t: T)
requires
old(self).wf(),
ensures
self.wf(),
{
// Loop: if the queue is full, then block until it is not.
loop
invariant
self.wf(),
{
let queue = &*self.queue;
let len = queue.buffer.len();
assert(0 <= self.tail && self.tail < len);
// Calculate the index of the slot right after `tail`, wrapping around
// if necessary. If the enqueue is successful, then we will be updating
// the `tail` to this value.
let next_tail = if self.tail + 1 == len {
0
} else {
self.tail + 1
};
let tracked cell_perm: Option<cell::PointsTo<T>>;
// Get the current `head` value from the shared atomic.
let head =
atomic_with_ghost!(&queue.head => load();
returning head;
ghost head_token => {
// If `head != next_tail`, then we proceed with the operation.
// We check here, ghostily, in the `open_atomic_invariant` block if that's the case.
// If so, we proceed with the `produce_start` transition
// and obtain the cell permission.
cell_perm = if head != next_tail as u64 {
let tracked cp = queue.instance.borrow().produce_start(&head_token, self.producer.borrow_mut());
Option::Some(cp)
} else {
Option::None
};
}
);
// Here's where we "actually" do the `head != next_tail` check:
if head != next_tail as u64 {
// Unwrap the cell_perm from the option.
let tracked mut cell_perm = match cell_perm {
Option::Some(cp) => cp,
Option::None => {
assert(false);
proof_from_false()
},
};
// Write the element t into the buffer, updating the cell
// from uninitialized to initialized (to the value t).
queue.buffer[self.tail].put(Tracked(&mut cell_perm), t);
// Store the updated tail to the shared `tail` atomic,
// while performing the `produce_end` transition.
atomic_with_ghost!(&queue.tail => store(next_tail as u64); ghost tail_token => {
queue.instance.borrow().produce_end(cell_perm,
cell_perm, &mut tail_token, self.producer.borrow_mut());
});
self.tail = next_tail;
return ;
}
}
}
}