replace threaded debouncer with threadless debouncer

no_thread
pantonshire 2 years ago
parent d1ed8b3f0e
commit 7780dabed7
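For context, a minimal sketch of the new threadless API, based on the example and library diffs below; the 500ms window and the `u32`/`Vec<u32>` event types are chosen only for illustration.

```
use std::time::Duration;

fn main() {
    // The old `Debouncer::new` spawned a background thread and handed back an
    // `mpsc::Receiver`; the new `treacle::debouncer` function spawns nothing and
    // instead returns a `DebouncerTx` / `DebouncerRx` pair.
    let (tx, rx) = treacle::debouncer::<u32, Vec<u32>, _>(
        Duration::from_millis(500),
        |acc, raw_event| {
            let mut events = acc.unwrap_or_default();
            events.push(raw_event);
            events
        },
    );

    // Raw events sent within the same 500ms window are folded into one debounced event.
    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // Dropping the last `DebouncerTx` shuts the debouncer down gracefully: the
    // pending debounced event can still be received, after which `recv` errors.
    drop(tx);
    assert_eq!(rx.recv().unwrap(), vec![1, 2]);
    assert!(rx.recv().is_err());
}
```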

@@ -1,12 +1,11 @@
use std::{error::Error, panic, thread, time::Duration};
use std::{panic, thread, time::Duration};
use treacle::{fold, Debouncer};
use treacle::{fold, debouncer};
fn main() -> Result<(), Box<dyn Error>> {
fn main() {
// Create a debouncer which combines `u32` events which occur within the same 500ms window by
// pushing them to a `Vec`.
let (debouncer, rx) =
Debouncer::new(Duration::from_millis(500), fold::fold_vec_push::<u32>)?;
let (tx, rx) = debouncer(Duration::from_millis(500), fold::fold_vec_push::<u32>);
// Spawn a thread which receives the debounced events and prints them.
let t = thread::spawn(move || {
@@ -17,17 +16,15 @@ fn main() -> Result<(), Box<dyn Error>> {
// Create 100 raw events for the debouncer to debounce.
for i in 0..100 {
debouncer.debounce(i);
tx.send(i).unwrap();
thread::sleep(Duration::from_millis(50));
}
// Dropping the debouncer will gracefully close the mpsc channel, stopping the loop in the
// other thread.
drop(debouncer);
// Drop the tx, so there are no more `DebouncerTx`s left. This will gracefully shut down the
// debouncer, stopping the loop in the other thread once it has received all the events.
drop(tx);
if let Err(err) = t.join() {
panic::resume_unwind(err);
}
Ok(())
}

@@ -1,30 +1,24 @@
//! Pre-written fold functions which can be passed to [`Debouncer::new`](crate::Debouncer::new) to
//! Pre-written fold functions which can be passed to [`debouncer`](crate::debouncer) to
//! specify how raw events should be combined into a single debounced event.
/// A fold function which combines events of type `T` into a `Vec<T>` by pushing them.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use treacle::{Debouncer, fold::fold_vec_push};
/// # use treacle::{debouncer, fold::fold_vec_push};
/// use std::{time::Duration, thread};
///
/// let (debouncer, rx) = Debouncer::new(
/// Duration::from_millis(100),
/// fold_vec_push::<u32>
/// )?;
/// let (tx, rx) = debouncer(Duration::from_millis(100), fold_vec_push::<u32>);
///
/// debouncer.debounce(4);
/// debouncer.debounce(5);
/// tx.send(4).unwrap();
/// tx.send(5).unwrap();
///
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
/// debouncer.debounce(2);
/// debouncer.debounce(1);
/// tx.send(3).unwrap();
/// tx.send(2).unwrap();
/// tx.send(1).unwrap();
///
/// assert_eq!(rx.recv().unwrap(), vec![4, 5]);
/// assert_eq!(rx.recv().unwrap(), vec![3, 2, 1]);
/// # Ok(())
/// # }
/// ```
pub fn fold_vec_push<T>(acc: Option<Vec<T>>, e: T) -> Vec<T> {
let mut acc = acc.unwrap_or_default();
@@ -36,26 +30,20 @@ pub fn fold_vec_push<T>(acc: Option<Vec<T>>, e: T) -> Vec<T> {
/// existence of events and not any data associated with them.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use treacle::{Debouncer, fold::fold_unit};
/// # use treacle::{debouncer, fold::fold_unit};
/// use std::{time::Duration, thread};
///
/// let (debouncer, rx) = Debouncer::new(
/// Duration::from_millis(100),
/// fold_unit::<u32>
/// )?;
/// let (tx, rx) = debouncer(Duration::from_millis(100), fold_unit::<u32>);
///
/// debouncer.debounce(4);
/// debouncer.debounce(5);
/// tx.send(4).unwrap();
/// tx.send(5).unwrap();
///
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
/// debouncer.debounce(2);
/// debouncer.debounce(1);
/// tx.send(3).unwrap();
/// tx.send(2).unwrap();
/// tx.send(1).unwrap();
///
/// assert_eq!(rx.recv().unwrap(), ());
/// assert_eq!(rx.recv().unwrap(), ());
/// # Ok(())
/// # }
/// ```
pub fn fold_unit<T>(_acc: Option<()>, _e: T) {}

@@ -1,368 +1,707 @@
pub mod fold;
pub mod next;
use std::{
sync::{Arc, Mutex, Condvar, mpsc},
time::Duration,
thread::{self, JoinHandle},
io,
panic, marker::PhantomData,
time::{Instant, Duration},
collections::VecDeque,
marker::PhantomData,
sync::{Mutex, Condvar, Arc, MutexGuard},
fmt,
error,
};
/// A debouncer for deduplicating groups of events which occur at a similar time. Upon receiving an
/// event, the debouncer waits for a specified duration, during which time any additional events
/// will be considered part of the same group. Once it has finished waiting, it will emit a single
/// event of type `T` via an [mpsc](std::sync::mpsc) channel.
/// Creates a new debouncer for deduplicating groups of "raw" events which occur at a similar time.
/// The debouncer consists of two halves: a [`DebouncerTx`](DebouncerTx) for sending raw events
/// to the debouncer, and a [`DebouncerRx`](DebouncerRx) for receiving grouped (debounced) events
/// from the debouncer.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use treacle::Debouncer;
/// use std::{time::Duration, thread};
/// When a raw event is sent through the `DebouncerTx`, future raw events which are sent
/// before the given `debounce_time` has elapsed are considered part of the same group. For
/// example, if the `debounce_time` is 1 second, all raw events which are sent less than one second
/// after the first are considered part of the same group.
///
/// The `DebouncerRx` receives the grouped events from the debouncer once the `debounce_time` has
/// elapsed for the event. Events are grouped using the given `fold` function, which takes a
/// previous grouped event (or "accumulator") and combines it with a new raw event. Some
/// pre-written fold functions are available in the [`fold`](fold) module.
///
/// let (debouncer, rx) = Debouncer::<u32, u32, _>::new(
/// Duration::from_millis(100),
/// // Combine raw events into one debounced event by adding them
/// |acc, e| acc.unwrap_or_default() + e
/// )?;
/// ```
/// # use std::{thread, time::Duration};
/// // Create a new debouncer which takes raw events of type `u32` and combines
/// // them by pushing them to a vector.
/// let (tx, rx) = treacle::debouncer::<u32, Vec<u32>, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// let mut events_vector = acc.unwrap_or_default();
/// events_vector.push(raw_event);
/// events_vector
/// });
///
/// // Send two events to the debouncer in quick succession
/// debouncer.debounce(4);
/// debouncer.debounce(5);
/// thread::spawn(move || {
/// // Send two raw events in quick succession.
/// tx.send(10).unwrap();
/// tx.send(20).unwrap();
///
/// // Wait, then send two more events to the debouncer
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
/// debouncer.debounce(2);
/// debouncer.debounce(1);
/// // Wait, then send another raw event.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(30).unwrap();
/// });
///
/// assert_eq!(rx.recv().unwrap(), 9); // First debounced event (4 + 5)
/// assert_eq!(rx.recv().unwrap(), 6); // Second debounced event (3 + 2 + 1)
/// # Ok(())
/// # }
/// assert_eq!(rx.recv().unwrap(), &[10, 20]);
/// assert_eq!(rx.recv().unwrap(), &[30]);
/// ```
///
/// When dropped, the debouncer will gracefully shut down, closing the associated mpsc channel once
/// it has finished sending any remaining events.
pub struct Debouncer<RawEvent, DebouncedEvent, FoldFn> {
thread: Option<JoinHandle<()>>,
// This is reference counted because the debouncer thread needs access to the controller, and
// the debouncer itself hasn't been created yet when the debouncer thread is created so we
// can't give it an `Arc<Debouncer<T>>`. Besides, the thread probably shouldn't have access to
// its own join handle!
controller: Arc<DebouncerController<DebouncedEvent>>,
fold: FoldFn,
_raw_maker: PhantomData<RawEvent>,
}
impl<RawEvent, DebouncedEvent, FoldFn> Debouncer<RawEvent, DebouncedEvent, FoldFn>
/// Both the `DebouncerTx` and `DebouncerRx` may be cloned, allowing multiple event senders and
/// multiple event receivers for a single debouncer.
pub fn debouncer<R, D, F>(debounce_time: Duration, fold: F)
-> (DebouncerTx<R, D, F>, DebouncerRx<D>)
where
DebouncedEvent: Send + 'static,
FoldFn: Fn(Option<DebouncedEvent>, RawEvent) -> DebouncedEvent,
F: Fn(Option<D>, R) -> D,
{
/// Create a new [`Debouncer`](Debouncer) which deduplicates events it receives within the
/// timeframe given by `debounce_time`. The raw, un-debounced events can be sent to the
/// debouncer with [`Debouncer::debounce`](crate::Debouncer::debounce), and the resulting
/// debounced events can be received using the [`mpsc::Receiver`](std::sync::mpsc::Receiver)
/// returned by this function.
let shared_state = Arc::new(Debouncer::new(debounce_time, 1, 1));
let tx = DebouncerTx {
debouncer: shared_state.clone(),
fold,
_raw_event_marker: PhantomData,
};
let rx = DebouncerRx {
debouncer: shared_state,
};
(tx, rx)
}
/// The "send" half of a debouncer, through which raw events are sent to be debounced. To create a
/// new debouncer, see the [`debouncer`](debouncer) function. To send raw events, see
/// [`DebouncerTx::send`](DebouncerTx::send).
pub struct DebouncerTx<R, D, F> {
debouncer: Arc<Debouncer<D>>,
fold: F,
_raw_event_marker: PhantomData<R>,
}
impl<R, D, F> DebouncerTx<R, D, F>
where
F: Fn(Option<D>, R) -> D,
{
/// Send a raw event to the debouncer, which will be grouped with other raw events sent at a
/// similar time. Grouping is performed using the "fold" function this `DebouncerTx` was
/// created with.
///
/// Events are deduplicated using the given `fold` function, which combines the current
/// deduplicated event (an `Option<DebouncedEvent>`) with a new `RawEvent` to produce a new
/// `DebouncedEvent`. For example, the fold function could collect raw events into a vector:
/// An error is returned if there are no [`DebouncerRx`](DebouncerRx)s associated with the
/// debouncer left to receive events.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use treacle::Debouncer;
/// # use std::{thread, time::Duration};
/// let (debouncer, rx) = Debouncer::<u32, Vec<u32>, _>::new(
/// Duration::from_millis(100),
/// // Combine raw events by pushing them to a vector
/// # use std::{panic, thread, time::Duration};
/// // Create a new debouncer which takes raw events of type `u32` and
/// // combines them by converting them to `u64`s and adding them together.
/// let (tx, rx) = treacle::debouncer::<u32, u64, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// let mut vec = acc.unwrap_or_default();
/// vec.push(raw_event);
/// vec
/// }
/// )?;
/// acc.unwrap_or_default() + u64::from(raw_event)
/// });
///
/// // Send two events to the debouncer in quick succession
/// debouncer.debounce(1);
/// debouncer.debounce(2);
/// // Wait, then send another event
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
/// // Spawn a thread to receive the events we are about to send.
/// let t = thread::spawn(move || {
/// assert_eq!(rx.recv().unwrap(), 3); // 1 + 2
/// assert_eq!(rx.recv().unwrap(), 12); // 3 + 4 + 5
/// });
///
/// assert_eq!(rx.recv().unwrap(), vec![1, 2]);
/// assert_eq!(rx.recv().unwrap(), vec![3]);
/// # Ok(())
/// # }
/// ```
/// // Send two raw events in quick succession.
/// tx.send(1).unwrap();
/// tx.send(2).unwrap();
///
/// When the returned [`Debouncer`](Debouncer) is dropped it will gracefully shut down, closing
/// the associated mpsc channel once it has finished sending any remaining events.
/// // Wait, then send three more raw events.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(3).unwrap();
/// tx.send(4).unwrap();
/// tx.send(5).unwrap();
///
/// An [`io::Error`](std::io::Error) will be returned by this function if spawning the
/// debouncer thread fails.
pub fn new(debounce_time: Duration, fold: FoldFn)
-> Result<(Self, mpsc::Receiver<DebouncedEvent>), io::Error>
{
let controller = Arc::new(DebouncerController {
state: Mutex::new(DebouncerState::new()),
main_cvar: Condvar::new(),
sleep_cvar: Condvar::new(),
});
let (tx, rx) = mpsc::channel::<DebouncedEvent>();
let thread = thread::Builder::new().spawn({
let debounce_state = controller.clone();
move || debounce_thread(debounce_state, tx, debounce_time)
})?;
Ok((Self {
thread: Some(thread),
controller,
fold,
_raw_maker: PhantomData,
}, rx))
/// // Wait for the receive thread to finish.
/// if let Err(err) = t.join() {
/// panic::resume_unwind(err);
/// }
///
/// // We moved `rx` into the receive thread, so when the receive thread
/// // finishes, `rx` is dropped. `rx` was the only `DebouncerRx`, so there
/// // is nothing left to receive events we send, so an error is returned.
/// assert!(tx.send(6).is_err());
/// ```
pub fn send(&self, event: R) -> Result<(), SendError<R>> {
self.debouncer.push(event, &self.fold)
}
}
impl<RawEvent, DebouncedEvent, FoldFn> Debouncer<RawEvent, DebouncedEvent, FoldFn>
impl<R, D, F> Clone for DebouncerTx<R, D, F>
where
FoldFn: Fn(Option<DebouncedEvent>, RawEvent) -> DebouncedEvent,
F: Clone,
{
/// Send the debouncer a raw event that should be debounced. The
/// [`mpsc::Receiver`](mpsc::Receiver) associated with this debouncer will receive a message
/// after at least the amount of time specified when the debouncer was created.
fn clone(&self) -> Self {
self.debouncer
.lock_state()
.add_tx()
.expect("debouncer tx count should not overflow");
Self {
debouncer: self.debouncer.clone(),
fold: self.fold.clone(),
_raw_event_marker: PhantomData,
}
}
}
impl<R, D, F> Drop for DebouncerTx<R, D, F> {
fn drop(&mut self) {
let remaining_tx = {
let mut state_guard = self.debouncer.lock_state();
state_guard.remove_tx()
};
if remaining_tx == 0 {
// There may be some rx threads waiting on the condvars, so notify them to stop them
// from waiting. Upon waking up, the rx threads will see that the tx count is 0 and
// switch to their "shutdown" behaviour.
self.debouncer.event_ready_wait_cvar.notify_all();
self.debouncer.queue_wait_cvar.notify_all();
}
}
}
/// The "receive" half of a debouncer, which receives events which have been debounced by grouping
/// raw events which occurred close together in time. To create a new debouncer, see the
/// [`debouncer`](debouncer) function. To receive debounced events, see
/// [`DebouncerRx::recv`](DebouncerRx::recv) or [`DebouncerRx::try_recv`](DebouncerRx::try_recv).
pub struct DebouncerRx<D> {
debouncer: Arc<Debouncer<D>>
}
impl<D> DebouncerRx<D> {
/// Receive a debounced event from the debouncer. If there is not a debounced event available
/// to be received, the function will wait for one.
///
/// The debouncer collects event data into an element of type `T`; the provided function `fold`
/// specifies how to combine the new event data with the existing event data `Option<T>` to
/// produce a new `T`. For example, if `T = Vec<E>`, then `fold` could be a function which
/// pushes to the vec:
/// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events
/// and there are no more events left to receive.
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use treacle::Debouncer;
/// # use std::{thread, time::Duration};
/// let (debouncer, rx) = Debouncer::<u32, Vec<u32>, _>::new(
/// Duration::from_millis(100),
/// // Combine raw events by pushing them to a vector
/// # use std::{panic, thread, time::Duration};
/// // Create a new debouncer which takes raw events of type `u8` and
/// // combines them by ORing them together.
/// let (tx, rx) = treacle::debouncer::<u8, u8, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// let mut vec = acc.unwrap_or_default();
/// vec.push(raw_event);
/// vec
/// acc.unwrap_or_default() | raw_event
/// });
///
/// // Create a thread to send raw events to the debouncer.
/// let t = thread::spawn(move || {
/// // Send three raw events in quick succession.
/// tx.send(0b00001000).unwrap();
/// tx.send(0b00000100).unwrap();
/// tx.send(0b00000010).unwrap();
///
/// // Wait, then send more raw events.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(0b10000000).unwrap();
/// tx.send(0b00100000).unwrap();
///
/// // Wait, then send a final batch of raw events.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(0b01000000).unwrap();
/// tx.send(0b00000001).unwrap();
/// });
///
/// // Receive the first two debounced events from the debouncer, created
/// // by ORing the raw events together that occurred within the same 500ms
/// // window.
/// assert_eq!(rx.recv().unwrap(), 0b00001110);
/// assert_eq!(rx.recv().unwrap(), 0b10100000);
///
/// // Wait for the send thread to finish. The send thread owns `tx`, so it
/// // will be dropped once the thread finishes.
/// if let Err(err) = t.join() {
/// panic::resume_unwind(err);
/// }
/// )?;
///
/// // Send two events to the debouncer in quick succession
/// debouncer.debounce(1);
/// debouncer.debounce(2);
/// // Wait, then send another event
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
/// // `tx` has been dropped, but there is still one last event left for us
/// // to receive.
/// assert_eq!(rx.recv().unwrap(), 0b01000001);
///
/// assert_eq!(rx.recv().unwrap(), vec![1, 2]);
/// assert_eq!(rx.recv().unwrap(), vec![3]);
/// # Ok(())
/// # }
/// // There are no more events to receive and no more `DebouncerTx`s left
/// // to send events, so attempting to receive results in an error.
/// assert!(rx.recv().is_err());
/// ```
pub fn recv(&self) -> Result<D, ReceiveError> {
self.debouncer.pop()
}
/// An alternative to [`DebouncerRx::recv`](DebouncerRx::recv) which returns `None` if there is
/// not a debounced event immediately available, instead of waiting for it to become available.
///
/// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events
/// and there are no more events left to receive.
///
/// ```
/// # use std::{thread, time::Duration};
/// // Create a new debouncer which takes raw events of type `u8` and
/// // combines them by ORing them together.
/// let (tx, rx) = treacle::debouncer::<u8, u8, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// acc.unwrap_or_default() | raw_event
/// });
///
/// tx.send(0b00001000).unwrap();
/// tx.send(0b00000100).unwrap();
/// tx.send(0b00000010).unwrap();
///
/// `fold` is an argument to this function rather than being stored by the `Debouncer` to avoid
/// ending up with a `Debouncer` with a type that's unnameable (e.g. if a closure is used) or
/// having to resort to dynamic dispatch.
#[inline]
pub fn debounce(&self, event_data: RawEvent) {
self.controller.notify_event(event_data, &self.fold)
/// assert_eq!(rx.try_recv().unwrap(), None);
///
/// thread::sleep(Duration::from_millis(500));
/// assert_eq!(rx.try_recv().unwrap(), Some(0b00001110));
/// ```
pub fn try_recv(&self) -> Result<Option<D>, ReceiveError> {
self.debouncer.try_pop()
}
}
impl<D> Clone for DebouncerRx<D> {
fn clone(&self) -> Self {
self.debouncer
.lock_state()
.add_rx()
.expect("debouncer rx count should not overflow");
Self {
debouncer: self.debouncer.clone(),
}
}
}
impl<RawEvent, DebouncedEvent, FoldFn> Drop for Debouncer<RawEvent, DebouncedEvent, FoldFn> {
impl<D> Drop for DebouncerRx<D> {
fn drop(&mut self) {
self.controller.notify_shutdown();
// Decrement the rx count. We don't need to notify any threads waiting on condvars if the
// count reaches 0, because only rx threads wait on the condvars and we know there are no
// more rx threads (because the count just reached 0!)
self.debouncer.lock_state().remove_rx();
}
}
/// An error indicating that there are no more [`DebouncerRx`](DebouncerRx)s left to receive
/// events. The raw event that could not be sent is included in the error so it is not lost.
pub struct SendError<T>(pub T);
let thread = self.thread
.take()
.expect("debouncer thread has already been shut down");
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendError").finish_non_exhaustive()
}
}
match thread.join() {
Ok(()) => (),
Err(err) => panic::resume_unwind(err),
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt("sending through a closed debouncer", f)
}
}
impl<T> error::Error for SendError<T> {}
/// An error indicating that the debouncer has no more events left to receive, and there are no
/// more [`DebouncerTx`](DebouncerTx)s left to send events.
#[derive(Debug)]
pub struct ReceiveError;
impl fmt::Display for ReceiveError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt("receiving from a closed debouncer", f)
}
}
struct DebouncerController<T> {
impl error::Error for ReceiveError {}
struct Debouncer<T> {
state: Mutex<DebouncerState<T>>,
/// Condvar for notifying the debouncer that it has become dirty or that it should shut down.
main_cvar: Condvar,
/// Condvar for notifying the debouncer that it should cancel sleeping and shut down.
sleep_cvar: Condvar,
debounce_time: Duration,
queue_wait_cvar: Condvar,
event_ready_wait_cvar: Condvar,
}
impl<T> DebouncerController<T> {
/// Notify the debouncer that an event has occurred that should be debounced.
///
/// The event data is represented by an element of `E`, and the function `f` specifies how to
/// fold this event into an accumulator of type `T`.
///
/// For example, to collect events into a list, the accumulator could be a `Vec<E>` and the
/// fold function could be:
/// ```ignore
/// |acc, e| {
/// let mut acc = acc.unwrap_or_default();
/// acc.push(e);
/// acc
/// }
/// ```
#[inline]
fn notify_event<E, F>(&self, event_data: E, fold: F)
impl<T> Debouncer<T> {
fn new(debounce_time: Duration, tx_count: usize, rx_count: usize) -> Self {
Self {
state: Mutex::new(DebouncerState::new(tx_count, rx_count)),
debounce_time,
queue_wait_cvar: Condvar::new(),
event_ready_wait_cvar: Condvar::new(),
}
}
fn lock_state(&self) -> MutexGuard<DebouncerState<T>> {
match self.state.lock() {
Ok(guard) => guard,
Err(err) => err.into_inner(),
}
}
fn push<R, F>(&self, raw_event: R, fold: F) -> Result<(), SendError<R>>
where
F: FnOnce(Option<T>, E) -> T,
F: Fn(Option<T>, R) -> T,
{
let mut guard = self.state.lock().unwrap();
guard.fold(event_data, fold);
drop(guard);
self.main_cvar.notify_one();
let now = Instant::now();
let push_outcome = {
let mut state_guard = self.lock_state();
// Return an error if there are no rxs left to send to. Include the raw event in the
// error so that it isn't lost.
if state_guard.has_no_rxs() {
return Err(SendError(raw_event));
}
state_guard.acc_queue.push_latest(raw_event, fold, now, self.debounce_time)
};
if matches!(push_outcome, PushOutcome::NewAcc) {
// If we pushed a new event accumulator, wake up one rx thread which was waiting for
// the queue to be non-empty. Don't wake up all the rx threads because only one of
// them can consume the acc from the queue; therefore, we should wait until another
// acc is added to the queue before waking up another thread.
self.queue_wait_cvar.notify_one();
}
Ok(())
}
fn pop(&self) -> Result<T, ReceiveError> {
enum PopOutcome<T> {
Event(T),
Shutdown,
}
let mut state_guard = self.lock_state();
let result = 'result: {
// Check that there are any txs left so we don't get stuck waiting forever if the queue
// is empty (as a tx is the only thing that can wake us up).
if state_guard.has_no_txs() {
break 'result PopOutcome::Shutdown;
}
// Loop because we may have a situation where:
// 1. We wait for the queue to be non-empty.
// 2. The accumulator in the queue, `x`, has a `ready_time` in the future, so we must
// wait for it to be ready.
// 3. Before we can wake up from waiting, some other thread pops the now-ready `x`, and
// the queue is now empty.
// 4. We wake up and must wait for the queue to become non-empty again.
//
// Additionally, the loop handles spurious returns from the condvar wait.
'pop_event_outer_loop: loop {
// If there are no accumulators in the queue, wait for one to be pushed.
if state_guard.acc_queue.is_empty() {
// Park the thread. We will be woken up again either by a new accumulator being
// pushed to the queue, or by the debouncer being shut down.
//
// This may return spuriously so the queue may still be empty when we wake up,
// but if this happens we will retry the wait when we attempt and fail to peek
// the queue later.
state_guard = self.queue_wait_cvar.wait(state_guard).unwrap();
// We may have been unparked because someone wants to shut down the debouncer,
// so check whether there are any txs left.
if state_guard.has_no_txs() {
break PopOutcome::Shutdown;
}
}
// Loop because we may have a situation where:
// 1. The first accumulator in the queue, `x`, has a `ready_time` in the future.
// 2. We begin waiting for `x` to become ready.
// 3. Before we can wake up from waiting, some other thread pops the now-ready `x`.
// 4. The new first accumulator in the queue, `y`, has a `ready_time` in the future.
// 5. We wake up and must start waiting again, as `y` is not ready yet.
break loop {
let required_wait_time = {
// Get the oldest accumulator in the queue, so we can see if it is ready to
// be popped and, if not, how long we need to wait for it to be ready to be
// popped.
let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else {
// If there is no accumulator for us to pop from the queue, go back to
// waiting for the queue to be non-empty. This could happen because the
// previous condvar wait returned spuriously, or because another thread
// popped the accumulator before us.
continue 'pop_event_outer_loop;
};
let now = Instant::now();
// Calculate the time we need to wait for the accumulator to become ready,
// if at all.
peeked_acc.ready_time.checked_duration_since(now)
};
match required_wait_time {
// Case where we must wait for the accumulator to become ready.
Some(wait_time) if !wait_time.is_zero() => {
// Wait the amount of time between now and the `ready_time` of the
// accumulator. This is done using a condvar so the sleep can be
// interrupted if someone wants to shut down the debouncer.
//
// This may return spuriously so we may wake up before the time has
// elapsed and without being notified by anyone. However, since we go
// back to the start of the loop after this, we will see that there is
// still time remaining before the accumulator becomes ready, and
// therefore we will resume waiting.
(state_guard, _) = self.event_ready_wait_cvar
.wait_timeout(state_guard, wait_time)
.unwrap();
// Again, we may have been unparked because someone wants to shut down the
// debouncer, so check whether there are any txs left and return if not.
if state_guard.has_no_txs() {
break PopOutcome::Shutdown;
}
continue;
},
// If we don't have to wait, we can pop the oldest accumulator from the
// queue and return it. We can unwrap the `pop_oldest` because in this
// case, we successfully peeked the queue and haven't modified the queue or
// released the lock since.
_ => {
match state_guard.acc_queue.pop_oldest().unwrap().into_acc() {
Some(popped_acc) => break PopOutcome::Event(popped_acc),
// If the accumulator we popped has no data (which can happen if
// the fold function panicked while pushing), retry from the
// beginning.
None => continue 'pop_event_outer_loop,
}
},
}
};
}
};
match result {
PopOutcome::Event(event) => Ok(event),
#[inline]
fn notify_shutdown(&self) {
let mut guard = self.state.lock().unwrap();
guard.set_shutdown();
drop(guard);
self.main_cvar.notify_one();
self.sleep_cvar.notify_one();
// If the debouncer has been shut down, return any remaining accumulators then start
// returning errors once the accumulators have been depleted.
PopOutcome::Shutdown => {
state_guard
.acc_queue
.pop_oldest_acc_discard_none()
.ok_or(ReceiveError)
},
}
}
fn try_pop(&self) -> Result<Option<T>, ReceiveError> {
let mut state_guard = self.lock_state();
// If the debouncer has been shut down, return any remaining accumulators then start
// returning errors once the accumulators have been depleted.
if state_guard.has_no_txs() {
return state_guard
.acc_queue
.pop_oldest_acc_discard_none()
.map(Some)
.ok_or(ReceiveError);
}
let now = Instant::now();
// Loop in case we pop an accumulator from the queue which contains no data, which can
// happen if the fold function panicked when pushing.
loop {
let first_acc_ready = {
let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else {
break Ok(None);
};
peeked_acc.ready_time <= now
};
if first_acc_ready {
match state_guard.acc_queue.pop_oldest().unwrap().into_acc() {
Some(popped_acc) => break Ok(Some(popped_acc)),
None => continue,
}
} else {
break Ok(None);
}
}
}
}
struct DebouncerState<T> {
/// The debounced data we have received so far.
acc: Option<T>,
/// Whether or not we should shut down the debouncer thread.
shutdown: bool,
acc_queue: EventAccQueue<T>,
// These could alternatively be `AtomicUsize`s which live outside the mutex, but we opt to have
// them inside the mutex instead to avoid a confusing mixture of mutexes / condvars and
// atomics.
tx_count: usize,
rx_count: usize,
}
impl<T> DebouncerState<T> {
fn new() -> Self {
Self { acc: None, shutdown: false }
fn new(tx_count: usize, rx_count: usize) -> Self {
Self {
acc_queue: EventAccQueue::new(),
tx_count,
rx_count,
}
}
fn get_event(&self) -> Option<DebouncerEvent> {
if self.shutdown {
Some(DebouncerEvent::Shutdown)
} else if self.acc.is_some() {
Some(DebouncerEvent::Dirty)
} else {
None
fn has_no_txs(&self) -> bool {
self.tx_count == 0
}
fn add_tx(&mut self) -> Result<(), CountOverflowError> {
self.tx_count = self.tx_count.checked_add(1)
.ok_or(CountOverflowError)?;
Ok(())
}
fn should_shutdown(&self) -> bool {
self.shutdown
fn remove_tx(&mut self) -> usize {
self.tx_count = self.tx_count.saturating_sub(1);
self.tx_count
}
#[inline]
fn fold<E, F>(&mut self, event_data: E, fold_fn: F)
where
F: FnOnce(Option<T>, E) -> T,
{
let acc = self.acc.take();
self.acc = Some(fold_fn(acc, event_data));
fn has_no_rxs(&self) -> bool {
self.rx_count == 0
}
fn swap_acc(&mut self) -> Option<T> {
self.acc.take()
fn add_rx(&mut self) -> Result<(), CountOverflowError> {
self.rx_count = self.rx_count.checked_add(1)
.ok_or(CountOverflowError)?;
Ok(())
}
fn set_shutdown(&mut self) {
self.shutdown = true;
fn remove_rx(&mut self) -> usize {
self.rx_count = self.rx_count.saturating_sub(1);
self.rx_count
}
}
enum DebouncerEvent {
/// The debouncer has been told to shut down.
Shutdown,
/// The accumulator has been changed.
Dirty,
struct EventAccQueue<T> {
inner: VecDeque<EventAcc<T>>,
}
impl<T> EventAccQueue<T> {
fn new() -> Self {
Self { inner: VecDeque::new() }
}
fn debounce_thread<T>(
debouncer: Arc<DebouncerController<T>>,
tx: mpsc::Sender<T>,
debounce_time: Duration
) {
let mut guard = debouncer.state.lock().unwrap();
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
'debounce: loop {
let event = 'event: loop {
// Check if either `acc` or `shutdown` is set.
if let Some(event) = guard.get_event() {
break 'event event;
fn peek_oldest(&self) -> Option<&EventAcc<T>> {
self.inner.front()
}
// If neither `acc` nor `shutdown` have been set, wait for the condvar to be notified
// then check again. We need to check in a loop because `Condvar` allows spurious
// wakeups.
guard = debouncer.main_cvar.wait(guard).unwrap();
};
fn pop_oldest(&mut self) -> Option<EventAcc<T>> {
self.inner.pop_front()
}
match event {
DebouncerEvent::Shutdown => {
// Before shutting down, check if there is anything in the accumulator and send it
// through the channel if it is still open.
if let Some(acc) = guard.swap_acc() {
tx.send(acc).ok();
fn pop_oldest_acc_discard_none(&mut self) -> Option<T> {
while let Some(event) = self.pop_oldest() {
if let Some(acc) = event.into_acc() {
return Some(acc);
}
}
return None;
}
break 'debounce;
fn push_latest<R, F>(&mut self, raw_event: R, f: F, now: Instant, debounce_time: Duration)
-> PushOutcome
where
F: FnOnce(Option<T>, R) -> T,
{
// Get the last accumulator in the queue if and only if the queue is non-empty and the last
// accumulator in the queue is not yet ready (based on the given `now` time).
match self.inner
.back_mut()
.and_then(|acc| (acc.ready_time > now).then_some(acc))
{
// If the last accumulator in the queue is not ready yet, we can fold the new event
// into the accumulator.
Some(event) => {
event.fold(raw_event, f);
PushOutcome::NoNewAcc
},
DebouncerEvent::Dirty => {
// Wait for the debounce time, or until the `sleep_cvar` is notified and the
// `shutdown` boolean is set. Note that checking `shutdown` here is necessary
// because `Condvar` allows spurious wakeups.
//
// By waiting for a condvar, we are temporarily releasing the mutex. This allows
// the other threads to set `acc` while we are waiting.
(guard, _) = debouncer.sleep_cvar
.wait_timeout_while(guard, debounce_time, |state| {
!state.should_shutdown()
}).unwrap();
// If the last accumulator in the queue is ready, we don't want to keep adding to it,
// so make a new accumulator and push it to the end of the queue.
None => {
let ready_time = now + debounce_time;
let event = EventAcc::new_from_fold(raw_event, f, ready_time);
self.inner.push_back(event);
PushOutcome::NewAcc
},
}
}
}
// Once we have finished waiting, send the contents of the accumulator through the
// channel.
if let Some(acc) = guard.swap_acc() {
if tx.send(acc).is_err() {
// If the other side of the channel has been closed, stop the debouncer.
break 'debounce;
enum PushOutcome {
NewAcc,
NoNewAcc,
}
/// "Event accumulator", created by repeatedly joining raw events using a fold function.
struct EventAcc<T> {
// The accumulator is stored as an `Option` because it is temporarily set to `None` during
// `EventAcc::fold`, so that the old accumulator can be moved into the user-provided fold
// function rather than borrowing it. `MaybeUninit` cannot be used for this because the
// user-provided fold function may panic, which would leave the accumulator in an uninitialised
// state, potentially causing UB later.
acc: Option<T>,
ready_time: Instant,
}
// Check if the shutdown flag was set while we were waiting, and stop the debouncer
// if so.
if guard.should_shutdown() {
break 'debounce;
impl<T> EventAcc<T> {
fn new_from_fold<R, F>(raw_event: R, f: F, ready_time: Instant) -> Self
where
F: FnOnce(Option<T>, R) -> T,
{
Self {
acc: Some(f(None, raw_event)),
ready_time,
}
},
}
fn fold<R, F>(&mut self, raw_event: R, f: F)
where
F: FnOnce(Option<T>, R) -> T,
{
let acc = self.acc.take();
self.acc = Some(f(acc, raw_event));
}
fn into_acc(self) -> Option<T> {
self.acc
}
}
#[derive(Debug)]
struct CountOverflowError;
#[cfg(test)]
mod tests {
use std::{time::Duration, thread};
use super::{Debouncer, fold};
use super::{debouncer, fold};
#[test]
fn test_debounce() {
let (debouncer, rx) = Debouncer::new(
Duration::from_millis(50),
fold::fold_vec_push::<u8>
).unwrap();
let (tx, rx) = debouncer(Duration::from_millis(50), fold::fold_vec_push::<u8>);
for i in 0..3 {
for j in 0..10 {
debouncer.debounce(i * 10 + j);
tx.send(i * 10 + j).unwrap();
thread::sleep(Duration::from_millis(4));
}
@@ -376,17 +715,14 @@ mod tests {
#[test]
fn test_debouncer_shutdown() {
let (debouncer, rx) = Debouncer::new(
Duration::from_millis(100),
fold::fold_vec_push::<u8>
).unwrap();
let (tx, rx) = debouncer(Duration::from_millis(100), fold::fold_vec_push::<u8>);
debouncer.debounce(1);
debouncer.debounce(2);
debouncer.debounce(3);
tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();
// Drop the debouncer, shutting it down.
drop(debouncer);
// Drop the tx, shutting down the debouncer.
drop(tx);
// Test that the events emitted just before the shutdown are not lost.
assert_eq!(rx.recv().unwrap(), &[1, 2, 3]);
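That shutdown test exercises a single sender; since both `DebouncerTx` and `DebouncerRx` implement `Clone` (see the impls above), several threads can feed one debouncer. A minimal sketch under that assumption, with the thread count and event values chosen only for illustration:

```
use std::{thread, time::Duration};

fn main() {
    let (tx, rx) = treacle::debouncer::<u32, Vec<u32>, _>(
        Duration::from_millis(100),
        treacle::fold::fold_vec_push::<u32>,
    );

    // Clone the tx so that two threads can send raw events into the same debouncer.
    let handles: Vec<_> = (0..2u32)
        .map(|i| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(i).unwrap())
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }

    // The clones were dropped by their threads; dropping the original tx shuts the
    // debouncer down, so drain whatever was grouped before the shutdown.
    drop(tx);
    let mut events = Vec::new();
    while let Ok(batch) = rx.recv() {
        events.extend(batch);
    }
    events.sort();
    assert_eq!(events, vec![0, 1]);
}
```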

@@ -1,691 +0,0 @@
use std::{
time::{Instant, Duration},
collections::VecDeque,
marker::PhantomData,
sync::{Mutex, Condvar, Arc, MutexGuard},
fmt,
error,
};
/// Creates a new debouncer for deduplicating groups of "raw" events which occur at a similar time.
/// The debouncer consists of two halves: a [`DebouncerTx`](DebouncerTx) for sending raw events
/// to the debouncer, and a [`DebouncerRx`](DebouncerRx) for receiving grouped (debounced) events
/// from the debouncer.
///
/// When a raw event is sent through the `DebouncerTx`, future raw events which are sent
/// before the given `debounce_time` has elapsed are considered part of the same group. For
/// example, if the `debounce_time` is 1 second, all raw events which are sent less than one second
/// after the first are considered part of the same group.
///
/// The `DebouncerRx` receives the grouped events from the debouncer once the `debounce_time` has
/// elapsed for the event. Events are grouped using the given `fold` function, which takes a
/// previous grouped event (or "accumulator") and combines it with a new raw event.
///
/// ```
/// # use std::{thread, time::Duration};
/// # use treacle::next::debouncer;
/// // Create a new debouncer which takes raw events of type `u32` and combines
/// // them by pushing them to a vector.
/// let (tx, rx) = debouncer::<u32, Vec<u32>, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// let mut events_vector = acc.unwrap_or_default();
/// events_vector.push(raw_event);
/// events_vector
/// });
///
/// thread::spawn(move || {
/// // Send two raw events in quick succession.
/// tx.send(10).unwrap();
/// tx.send(20).unwrap();
///
/// // Wait, then send another raw event.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(30).unwrap();
/// });
///
/// assert_eq!(rx.recv().unwrap(), &[10, 20]);
/// assert_eq!(rx.recv().unwrap(), &[30]);
/// ```
///
/// Both the `DebouncerTx` and `DebouncerRx` may be cloned, allowing multiple event senders and
/// multiple event receivers for a single debouncer.
pub fn debouncer<R, D, F>(debounce_time: Duration, fold: F)
-> (DebouncerTx<R, D, F>, DebouncerRx<D>)
where
F: Fn(Option<D>, R) -> D,
{
let shared_state = Arc::new(Debouncer::new(debounce_time, 1, 1));
let tx = DebouncerTx {
debouncer: shared_state.clone(),
fold,
_raw_event_marker: PhantomData,
};
let rx = DebouncerRx {
debouncer: shared_state,
};
(tx, rx)
}
/// The "send" half of a debouncer, through which raw events are sent to be debounced. To create a
/// new debouncer, see the [`debouncer`](debouncer) function. To send raw events, see
/// [`DebouncerTx::send`](DebouncerTx::send).
pub struct DebouncerTx<R, D, F> {
debouncer: Arc<Debouncer<D>>,
fold: F,
_raw_event_marker: PhantomData<R>,
}
impl<R, D, F> DebouncerTx<R, D, F>
where
F: Fn(Option<D>, R) -> D,
{
/// Send a raw event to the debouncer, which will be grouped with other raw events sent at a
/// similar time. Grouping is performed using the "fold" function this `DebouncerTx` was
/// created with.
///
/// An error is returned if there are no [`DebouncerRx`](DebouncerRx)s associated with the
/// debouncer left to receive events.
///
/// ```
/// # use std::{panic, thread, time::Duration};
/// # use treacle::next::debouncer;
/// // Create a new debouncer which takes raw events of type `u32` and
/// // combines them by converting them to `u64`s and adding them together.
/// let (tx, rx) = debouncer::<u32, u64, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// acc.unwrap_or_default() + u64::from(raw_event)
/// });
///
/// // Spawn a thread to receive the events we are about to send.
/// let t = thread::spawn(move || {
/// assert_eq!(rx.recv().unwrap(), 3); // 1 + 2
/// assert_eq!(rx.recv().unwrap(), 12); // 3 + 4 + 5
/// });
///
/// // Send two raw events in quick succession.
/// tx.send(1).unwrap();
/// tx.send(2).unwrap();
///
/// // Wait, then send three more raw events.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(3).unwrap();
/// tx.send(4).unwrap();
/// tx.send(5).unwrap();
///
/// // Wait for the receive thread to finish.
/// if let Err(err) = t.join() {
/// panic::resume_unwind(err);
/// }
///
/// // We moved `rx` into the receive thread, so when the receive thread
/// // finishes, `rx` is dropped. `rx` was the only `DebouncerRx`, so there
/// // is nothing left to receive events we send, so an error is returned.
/// assert!(tx.send(6).is_err());
/// ```
pub fn send(&self, event: R) -> Result<(), SendError<R>> {
self.debouncer.push(event, &self.fold)
}
}
impl<R, D, F> Clone for DebouncerTx<R, D, F>
where
F: Clone,
{
fn clone(&self) -> Self {
self.debouncer
.lock_state()
.add_tx()
.expect("debouncer tx count should not overflow");
Self {
debouncer: self.debouncer.clone(),
fold: self.fold.clone(),
_raw_event_marker: PhantomData,
}
}
}
impl<R, D, F> Drop for DebouncerTx<R, D, F> {
fn drop(&mut self) {
let remaining_tx = {
let mut state_guard = self.debouncer.lock_state();
state_guard.remove_tx()
};
if remaining_tx == 0 {
// There may be some rx threads waiting on the condvars, so notify them to stop them
// from waiting. Upon waking up, the rx threads will see that the tx count is 0 and
// switch to their "shutdown" behaviour.
self.debouncer.event_ready_wait_cvar.notify_all();
self.debouncer.queue_wait_cvar.notify_all();
}
}
}
/// The "receive" half of a debouncer, which receives events which have been debounced by grouping
/// raw events which occurred close together in time. To create a new debouncer, see the
/// [`debouncer`](debouncer) function. To receive debounced events, see
/// [`DebouncerRx::recv`](DebouncerRx::recv) or [`DebouncerRx::try_recv`](DebouncerRx::try_recv).
pub struct DebouncerRx<D> {
debouncer: Arc<Debouncer<D>>
}
impl<D> DebouncerRx<D> {
/// Receive a debounced event from the debouncer. If there is not a debounced event available
/// to be received, the function will wait for one.
///
/// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events
/// and there are no more events left to receive.
///
/// ```
/// # use std::{panic, thread, time::Duration};
/// # use treacle::next::debouncer;
/// // Create a new debouncer which takes raw events of type `u8` and
/// // combines them by ORing them together.
/// let (tx, rx) = debouncer::<u8, u8, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// acc.unwrap_or_default() | raw_event
/// });
///
/// // Create a thread to send raw events to the debouncer.
/// let t = thread::spawn(move || {
/// // Send three raw events in quick succession.
/// tx.send(0b00001000).unwrap();
/// tx.send(0b00000100).unwrap();
/// tx.send(0b00000010).unwrap();
///
/// // Wait, then send more raw events.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(0b10000000).unwrap();
/// tx.send(0b00100000).unwrap();
///
/// // Wait, then send a final batch of raw events.
/// thread::sleep(Duration::from_millis(500));
/// tx.send(0b01000000).unwrap();
/// tx.send(0b00000001).unwrap();
/// });
///
/// // Receive the first two debounced events from the debouncer, created
/// // by ORing the raw events together that occurred within the same 500ms
/// // window.
/// assert_eq!(rx.recv().unwrap(), 0b00001110);
/// assert_eq!(rx.recv().unwrap(), 0b10100000);
///
/// // Wait for the send thread to finish. The send thread owns `tx`, so it
/// // will be dropped once the thread finishes.
/// if let Err(err) = t.join() {
/// panic::resume_unwind(err);
/// }
///
/// // `tx` has been dropped, but there is still one last event left for us
/// // to receive.
/// assert_eq!(rx.recv().unwrap(), 0b01000001);
///
/// // There are no more events to receive and no more `DebouncerTx`s left
/// // to send events, so attempting to receive results in an error.
/// assert!(rx.recv().is_err());
/// ```
pub fn recv(&self) -> Result<D, ReceiveError> {
self.debouncer.pop()
}
/// An alternative to [`DebouncerRx::recv`](DebouncerRx::recv) which returns `None` if there is
/// not a debounced event immediately available, instead of waiting for it to become available.
///
/// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events
/// and there are no more events left to receive.
///
/// ```
/// # use std::{thread, time::Duration};
/// # use treacle::next::debouncer;
/// // Create a new debouncer which takes raw events of type `u8` and
/// // combines them by ORing them together.
/// let (tx, rx) = debouncer::<u8, u8, _>(
/// Duration::from_millis(500),
/// |acc, raw_event| {
/// acc.unwrap_or_default() | raw_event
/// });
///
/// tx.send(0b00001000).unwrap();
/// tx.send(0b00000100).unwrap();
/// tx.send(0b00000010).unwrap();
///
/// assert_eq!(rx.try_recv().unwrap(), None);
///
/// thread::sleep(Duration::from_millis(500));
/// assert_eq!(rx.try_recv().unwrap(), Some(0b00001110));
/// ```
pub fn try_recv(&self) -> Result<Option<D>, ReceiveError> {
self.debouncer.try_pop()
}
}
impl<D> Clone for DebouncerRx<D> {
fn clone(&self) -> Self {
self.debouncer
.lock_state()
.add_rx()
.expect("debouncer rx count should not overflow");
Self {
debouncer: self.debouncer.clone(),
}
}
}
impl<D> Drop for DebouncerRx<D> {
fn drop(&mut self) {
// Decrement the rx count. We don't need to notify any threads waiting on condvars if the
// count reaches 0, because only rx threads wait on the condvars and we know there are no
// more rx threads (because the count just reached 0!)
self.debouncer.lock_state().remove_rx();
}
}
/// An error indicating that there are no more [`DebouncerRx`](DebouncerRx)s left to receive
/// events. The raw event that could not be sent is included in the error so it is not lost.
pub struct SendError<T>(pub T);
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SendError").finish_non_exhaustive()
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt("sending through a closed debouncer", f)
}
}
impl<T> error::Error for SendError<T> {}
/// An error indicating that the debouncer has no more events left to receive, and there are no
/// more [`DebouncerTx`](DebouncerTx)s left to send events.
#[derive(Debug)]
pub struct ReceiveError;
impl fmt::Display for ReceiveError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt("receiving from a closed debouncer", f)
}
}
impl error::Error for ReceiveError {}
struct Debouncer<T> {
state: Mutex<DebouncerState<T>>,
debounce_time: Duration,
queue_wait_cvar: Condvar,
event_ready_wait_cvar: Condvar,
}
impl<T> Debouncer<T> {
fn new(debounce_time: Duration, tx_count: usize, rx_count: usize) -> Self {
Self {
state: Mutex::new(DebouncerState::new(tx_count, rx_count)),
debounce_time,
queue_wait_cvar: Condvar::new(),
event_ready_wait_cvar: Condvar::new(),
}
}
fn lock_state(&self) -> MutexGuard<DebouncerState<T>> {
match self.state.lock() {
Ok(guard) => guard,
Err(err) => err.into_inner(),
}
}
fn push<R, F>(&self, raw_event: R, fold: F) -> Result<(), SendError<R>>
where
F: Fn(Option<T>, R) -> T,
{
let now = Instant::now();
let push_outcome = {
let mut state_guard = self.lock_state();
// Return an error if there are no rxs left to send to. Include the raw event in the
// error so that it isn't lost.
if state_guard.has_no_rxs() {
return Err(SendError(raw_event));
}
state_guard.acc_queue.push_latest(raw_event, fold, now, self.debounce_time)
};
if matches!(push_outcome, PushOutcome::NewAcc) {
// If we pushed a new event accumulator, wake up one rx thread which was waiting for
// the queue to be non-empty. Don't wake up all the rx threads because only one of
// them can consume the acc from the queue; therefore, we should wait until another
// acc is added to the queue before waking up another thread.
self.queue_wait_cvar.notify_one();
}
Ok(())
}
fn pop(&self) -> Result<T, ReceiveError> {
enum PopOutcome<T> {
Event(T),
Shutdown,
}
let mut state_guard = self.lock_state();
let result = 'result: {
// Check that there are any txs left so we don't get stuck waiting forever if the queue
// is empty (as a tx is the only thing that can wake us up).
if state_guard.has_no_txs() {
break 'result PopOutcome::Shutdown;
}
// Loop because we may have a situation where:
// 1. We wait for the queue to be non-empty.
// 2. The accumulator in the queue, `x`, has a `ready_time` in the future, so we must
// wait for it to be ready.
// 3. Before we can wake up from waiting, some other thread pops the now-ready `x`, and
// the queue is now empty.
// 4. We wake up and must wait for the queue to become non-empty again.
//
// Additionally, the loop handles spurious returns from the condvar wait.
'pop_event_outer_loop: loop {
// If there are no accumulators in the queue, wait for one to be pushed.
if state_guard.acc_queue.is_empty() {
// Park the thread. We will be woken up again either by a new accumulator being
// pushed to the queue, or by the debouncer being shut down.
//
// This may return spuriously so the queue may still be empty when we wake up,
// but if this happens we will retry the wait when we attempt and fail to peek
// the queue later.
state_guard = self.queue_wait_cvar.wait(state_guard).unwrap();
// We may have been unparked because someone wants to shut down the debouncer,
// so check whether there are any txs left.
if state_guard.has_no_txs() {
break PopOutcome::Shutdown;
}
}
// Loop because we may have a situation where:
// 1. The first accumulator in the queue, `x`, has a `ready_time` in the future.
// 2. We begin waiting for `x` to become ready.
// 3. Before we can wake up from waiting, some other thread pops the now-ready `x`.
// 4. The new first accumulator in the queue, `y`, has a `ready_time` in the future.
// 5. We wake up and must start waiting again, as `y` is not ready yet.
break loop {
let required_wait_time = {
// Get the oldest accumulator in the queue, so we can see if it is ready to
// be popped and, if not, how long we need to wait for it to be ready to be
// popped.
let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else {
// If there is no accumulator for us to pop from the queue, go back to
// waiting for the queue to be non-empty. This could happen because the
// previous condvar wait returned spuriously, or because another thread
// popped the accumulator before us.
continue 'pop_event_outer_loop;
};
let now = Instant::now();
// Calculate the time we need to wait for the accumulator to become ready,
// if at all.
peeked_acc.ready_time.checked_duration_since(now)
};
match required_wait_time {
// Case where we must wait for the accumulator to become ready.
Some(wait_time) if !wait_time.is_zero() => {
// Wait the amount of time between now and the `ready_time` of the
// accumulator. This is done using a condvar so the sleep can be
// interrupted if someone wants to shut down the debouncer.
//
// This may return spuriously so we may wake up before the time has
// elapsed and without being notified by anyone. However, since we go
// back to the start of the loop after this, we will see that there is
// still time remaining before the accumulator becomes ready, and
// therefore we will resume waiting.
(state_guard, _) = self.event_ready_wait_cvar
.wait_timeout(state_guard, wait_time)
.unwrap();
// Again, we may have been unparked because someone wants to shut down the
// debouncer, so check whether there are any txs left and return if not.
if state_guard.has_no_txs() {
break PopOutcome::Shutdown;
}
continue;
},
// If we don't have to wait, we can pop the oldest accumulator from the
// queue and return it. We can unwrap the `pop_oldest` because in this
// case, we successfully peeked the queue and haven't modified the queue or
// released the lock since.
_ => {
match state_guard.acc_queue.pop_oldest().unwrap().into_acc() {
Some(popped_acc) => break PopOutcome::Event(popped_acc),
// If the accumulator we popped has no data (which can happen if
// the fold function panicked while pushing), retry from the
// beginning.
None => continue 'pop_event_outer_loop,
}
},
}
};
}
};
match result {
PopOutcome::Event(event) => Ok(event),
// If the debouncer has been shut down, return any remaining accumulators then start
// returning errors once the accumulators have been depleted.
PopOutcome::Shutdown => {
state_guard
.acc_queue
.pop_oldest_acc_discard_none()
.ok_or(ReceiveError)
},
}
}
fn try_pop(&self) -> Result<Option<T>, ReceiveError> {
let mut state_guard = self.lock_state();
// If the debouncer has been shut down, return any remaining accumulators then start
// returning errors once the accumulators have been depleted.
if state_guard.has_no_txs() {
return state_guard
.acc_queue
.pop_oldest_acc_discard_none()
.map(Some)
.ok_or(ReceiveError);
}
let now = Instant::now();
// Loop in case we pop an accumulator from the queue which contains no data, which can
// happen if the fold function panicked when pushing.
loop {
let first_acc_ready = {
let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else {
break Ok(None);
};
peeked_acc.ready_time <= now
};
if first_acc_ready {
match state_guard.acc_queue.pop_oldest().unwrap().into_acc() {
Some(popped_acc) => break Ok(Some(popped_acc)),
None => continue,
}
} else {
break Ok(None);
}
}
}
}
struct DebouncerState<T> {
acc_queue: EventAccQueue<T>,
// These could alternatively be `AtomicUsize`s which live outside the mutex, but we opt to have
// them inside the mutex instead to avoid a confusing mixture of mutexes / condvars and
// atomics.
tx_count: usize,
rx_count: usize,
}
impl<T> DebouncerState<T> {
fn new(tx_count: usize, rx_count: usize) -> Self {
Self {
acc_queue: EventAccQueue::new(),
tx_count,
rx_count,
}
}
fn has_no_txs(&self) -> bool {
self.tx_count == 0
}
fn add_tx(&mut self) -> Result<(), CountOverflowError> {
self.tx_count = self.tx_count.checked_add(1)
.ok_or(CountOverflowError)?;
Ok(())
}
fn remove_tx(&mut self) -> usize {
self.tx_count = self.tx_count.saturating_sub(1);
self.tx_count
}
fn has_no_rxs(&self) -> bool {
self.rx_count == 0
}
fn add_rx(&mut self) -> Result<(), CountOverflowError> {
self.rx_count = self.rx_count.checked_add(1)
.ok_or(CountOverflowError)?;
Ok(())
}
fn remove_rx(&mut self) -> usize {
self.rx_count = self.rx_count.saturating_sub(1);
self.rx_count
}
}
struct EventAccQueue<T> {
inner: VecDeque<EventAcc<T>>,
}
impl<T> EventAccQueue<T> {
fn new() -> Self {
Self { inner: VecDeque::new() }
}
fn is_empty(&self) -> bool {
self.inner.is_empty()
}
fn peek_oldest(&self) -> Option<&EventAcc<T>> {
self.inner.front()
}
fn pop_oldest(&mut self) -> Option<EventAcc<T>> {
self.inner.pop_front()
}
fn pop_oldest_acc_discard_none(&mut self) -> Option<T> {
while let Some(event) = self.pop_oldest() {
if let Some(acc) = event.into_acc() {
return Some(acc);
}
}
return None;
}
fn push_latest<R, F>(&mut self, raw_event: R, f: F, now: Instant, debounce_time: Duration)
-> PushOutcome
where
F: FnOnce(Option<T>, R) -> T,
{
// Get the last accumulator in the queue if and only if the queue is non-empty and the last
// accumulator in the queue is not yet ready (based on the given `now` time).
match self.inner
.back_mut()
.and_then(|acc| (acc.ready_time > now).then_some(acc))
{
// If the last accumulator in the queue is not ready yet, we can fold the new event
// into the accumulator.
Some(event) => {
event.fold(raw_event, f);
PushOutcome::NoNewAcc
},
// If the last accumulator in the queue is ready, we don't want to keep adding to it,
// so make a new accumulator and push it to the end of the queue.
None => {
let ready_time = now + debounce_time;
let event = EventAcc::new_from_fold(raw_event, f, ready_time);
self.inner.push_back(event);
PushOutcome::NewAcc
},
}
}
}
enum PushOutcome {
NewAcc,
NoNewAcc,
}
/// "Event accumulator", created by repeatedly joining raw events using a fold function.
struct EventAcc<T> {
// The accumulator is stored as an `Option` because it is temporarily set to `None` during
// `EventAcc::fold`, so that the old accumulator can be moved into the user-provided fold
// function rather than borrowing it. `MaybeUninit` cannot be used for this because the
// user-provided fold function may panic, which would leave the accumulator in an uninitialised
// state, potentially causing UB later.
acc: Option<T>,
ready_time: Instant,
}
impl<T> EventAcc<T> {
fn new_from_fold<R, F>(raw_event: R, f: F, ready_time: Instant) -> Self
where
F: FnOnce(Option<T>, R) -> T,
{
Self {
acc: Some(f(None, raw_event)),
ready_time,
}
}
fn fold<R, F>(&mut self, raw_event: R, f: F)
where
F: FnOnce(Option<T>, R) -> T,
{
let acc = self.acc.take();
self.acc = Some(f(acc, raw_event));
}
fn into_acc(self) -> Option<T> {
self.acc
}
}
#[derive(Debug)]
struct CountOverflowError;
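Because the debouncer no longer owns a thread, a single thread can both send raw events and poll for debounced ones with `try_recv`. A minimal sketch assuming the new top-level `treacle::debouncer` API from this commit, with the window and poll interval chosen only for illustration:

```
use std::{thread, time::Duration};

fn main() {
    let (tx, rx) = treacle::debouncer::<u32, u32, _>(
        Duration::from_millis(100),
        // Sum the raw events that fall within the same 100ms window.
        |acc, raw_event| acc.unwrap_or_default() + raw_event,
    );

    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // The 100ms debounce window has not elapsed yet, so nothing is ready.
    assert_eq!(rx.try_recv().unwrap(), None);

    // Poll from the same thread until the debounced event becomes ready.
    let debounced = loop {
        if let Some(event) = rx.try_recv().unwrap() {
            break event;
        }
        thread::sleep(Duration::from_millis(10));
    };
    assert_eq!(debounced, 3); // 1 + 2
}
```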