diff --git a/examples/vector.rs b/examples/vector.rs index 7fb1874..a50eb7a 100644 --- a/examples/vector.rs +++ b/examples/vector.rs @@ -1,12 +1,11 @@ -use std::{error::Error, panic, thread, time::Duration}; +use std::{panic, thread, time::Duration}; -use treacle::{fold, Debouncer}; +use treacle::{fold, debouncer}; -fn main() -> Result<(), Box> { +fn main() { // Create a debouncer which combines `u32` events which occur within the same 500ms window by // pushing them to a `Vec`. - let (debouncer, rx) = - Debouncer::new(Duration::from_millis(500), fold::fold_vec_push::)?; + let (tx, rx) = debouncer(Duration::from_millis(500), fold::fold_vec_push::); // Spawn a thread which receives the debounced events and prints them. let t = thread::spawn(move || { @@ -17,17 +16,15 @@ fn main() -> Result<(), Box> { // Create 100 raw events for the debouncer to debounce. for i in 0..100 { - debouncer.debounce(i); + tx.send(i).unwrap(); thread::sleep(Duration::from_millis(50)); } - // Dropping the debouncer will gracefully close the mpsc channel, stopping the loop in the - // other thread. - drop(debouncer); + // Drop the tx, so there are no more `DebouncerTx`s left. This will gracefully shut down the + // debouncer, stopping the loop in the other thread once it has received all the events. + drop(tx); if let Err(err) = t.join() { panic::resume_unwind(err); } - - Ok(()) } diff --git a/src/fold.rs b/src/fold.rs index 2bfc315..8160059 100644 --- a/src/fold.rs +++ b/src/fold.rs @@ -1,30 +1,24 @@ -//! Pre-written fold functions which can be passed to [`Debouncer::new`](crate::Debouncer::new) to +//! Pre-written fold functions which can be passed to [`debouncer`](crate::debouncer) to //! specify how raw events should be combined into a single debounced event. /// A fold function which combines events of type `T` into a `Vec` by pushing them. /// /// ``` -/// # fn main() -> Result<(), Box> { -/// # use treacle::{Debouncer, fold::fold_vec_push}; +/// # use treacle::{debouncer, fold::fold_vec_push}; /// use std::{time::Duration, thread}; /// -/// let (debouncer, rx) = Debouncer::new( -/// Duration::from_millis(100), -/// fold_vec_push:: -/// )?; +/// let (tx, rx) = debouncer(Duration::from_millis(100), fold_vec_push::); /// -/// debouncer.debounce(4); -/// debouncer.debounce(5); +/// tx.send(4).unwrap(); +/// tx.send(5).unwrap(); /// /// thread::sleep(Duration::from_millis(150)); -/// debouncer.debounce(3); -/// debouncer.debounce(2); -/// debouncer.debounce(1); +/// tx.send(3).unwrap(); +/// tx.send(2).unwrap(); +/// tx.send(1).unwrap(); /// /// assert_eq!(rx.recv().unwrap(), vec![4, 5]); /// assert_eq!(rx.recv().unwrap(), vec![3, 2, 1]); -/// # Ok(()) -/// # } /// ``` pub fn fold_vec_push(acc: Option>, e: T) -> Vec { let mut acc = acc.unwrap_or_default(); @@ -36,26 +30,20 @@ pub fn fold_vec_push(acc: Option>, e: T) -> Vec { /// existence of events and not any data associated with them. /// /// ``` -/// # fn main() -> Result<(), Box> { -/// # use treacle::{Debouncer, fold::fold_unit}; +/// # use treacle::{debouncer, fold::fold_unit}; /// use std::{time::Duration, thread}; /// -/// let (debouncer, rx) = Debouncer::new( -/// Duration::from_millis(100), -/// fold_unit:: -/// )?; +/// let (tx, rx) = debouncer(Duration::from_millis(100), fold_unit::); /// -/// debouncer.debounce(4); -/// debouncer.debounce(5); +/// tx.send(4).unwrap(); +/// tx.send(5).unwrap(); /// /// thread::sleep(Duration::from_millis(150)); -/// debouncer.debounce(3); -/// debouncer.debounce(2); -/// debouncer.debounce(1); +/// tx.send(3).unwrap(); +/// tx.send(2).unwrap(); +/// tx.send(1).unwrap(); /// /// assert_eq!(rx.recv().unwrap(), ()); /// assert_eq!(rx.recv().unwrap(), ()); -/// # Ok(()) -/// # } /// ``` pub fn fold_unit(_acc: Option<()>, _e: T) {} diff --git a/src/lib.rs b/src/lib.rs index a43b8f7..30fe10e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,368 +1,707 @@ pub mod fold; -pub mod next; use std::{ - sync::{Arc, Mutex, Condvar, mpsc}, - time::Duration, - thread::{self, JoinHandle}, - io, - panic, marker::PhantomData, + time::{Instant, Duration}, + collections::VecDeque, + marker::PhantomData, + sync::{Mutex, Condvar, Arc, MutexGuard}, + fmt, + error, }; -/// A debouncer for deduplicating groups of events which occur at a similar time. Upon receiving an -/// event, the debouncer waits for a specified duration, during which time any additional events -/// will be considered part of the same group. Once it has finished waiting, it will emit a single -/// event of type `T` via an [mpsc](std::sync::mpsc) channel. +/// Creates a new debouncer for deduplicating groups of "raw" events which occur at a similar time. +/// The debouncer is comprised of two halves; a [`DebouncerTx`](DebouncerTx) for sending raw events +/// to the debouncer, and a [`DebouncerRx`](DebouncerRx) for receiving grouped (debounced) events +/// from the debouncer. +/// +/// When a raw event is sent through the `DebouncerTx`, future `debounce_events` which are sent +/// before the given `debounce_time` has elapsed are considered part of the same group. For +/// example, if the `debounce_time` is 1 second, all raw events which are sent less than one second +/// after the first are considered part of the same group. +/// +/// The `DebouncerRx` receives the grouped events from the debouncer once the `debounce_time` has +/// elapsed for the event. Events are grouped using the given `fold` function, which takes a +/// previous grouped event (or "accumulator") and combines it with a new raw event. Some +/// pre-written fold functions are available in the [`fold`](fold) module. /// /// ``` -/// # fn main() -> Result<(), Box> { -/// # use treacle::Debouncer; -/// use std::{time::Duration, thread}; +/// # use std::{thread, time::Duration}; +/// // Create a new debouncer which takes raw events of type `u32` and combines +/// // them by pushing them to a vector. +/// let (tx, rx) = treacle::debouncer::, _>( +/// Duration::from_millis(500), +/// |acc, raw_event| { +/// let mut events_vector = acc.unwrap_or_default(); +/// events_vector.push(raw_event); +/// events_vector +/// }); +/// +/// thread::spawn(move || { +/// // Send two raw events in quick succession. +/// tx.send(10).unwrap(); +/// tx.send(20).unwrap(); /// -/// let (debouncer, rx) = Debouncer::::new( -/// Duration::from_millis(100), -/// // Combine raw events into one debounced event by adding them -/// |acc, e| acc.unwrap_or_default() + e -/// )?; -/// -/// // Send two events to the debouncer in quick succession -/// debouncer.debounce(4); -/// debouncer.debounce(5); -/// -/// // Wait, then send two more events to the debouncer -/// thread::sleep(Duration::from_millis(150)); -/// debouncer.debounce(3); -/// debouncer.debounce(2); -/// debouncer.debounce(1); -/// -/// assert_eq!(rx.recv().unwrap(), 9); // First debounced event (4 + 5) -/// assert_eq!(rx.recv().unwrap(), 6); // Second debounced event (3 + 2 + 1) -/// # Ok(()) -/// # } +/// // Wait, then send another raw event. +/// thread::sleep(Duration::from_millis(500)); +/// tx.send(30).unwrap(); +/// }); +/// +/// assert_eq!(rx.recv().unwrap(), &[10, 20]); +/// assert_eq!(rx.recv().unwrap(), &[30]); /// ``` /// -/// When dropped, the debouncer will gracefully shut down, closing the associated mpsc channel once -/// it has finished sending any remaining events. -pub struct Debouncer { - thread: Option>, - // This is reference counted because the debouncer thread needs access to the controller, and - // the debouncer itself hasn't been created yet when the debouncer thread is created so we - // can't give it an `Arc>`. Besides, the thread probably shouldn't have access to - // its own join handle! - controller: Arc>, - fold: FoldFn, - _raw_maker: PhantomData, +/// Both the `DebouncerTx` and `DebouncerRx` may be cloned, allowing multiple event senders and +/// multiple event receivers for a single debouncer. +pub fn debouncer(debounce_time: Duration, fold: F) + -> (DebouncerTx, DebouncerRx) +where + F: Fn(Option, R) -> D, +{ + let shared_state = Arc::new(Debouncer::new(debounce_time, 1, 1)); + + let tx = DebouncerTx { + debouncer: shared_state.clone(), + fold, + _raw_event_marker: PhantomData, + }; + + let rx = DebouncerRx { + debouncer: shared_state, + }; + + (tx, rx) +} + +/// The "send" half of a debouncer, through which raw events are sent to be debounced. To create a +/// new debouncer, see the [`debouncer`](debouncer) function. To send raw events, see +/// [`DebouncerTx::send`](DebouncerTx::send). +pub struct DebouncerTx { + debouncer: Arc>, + fold: F, + _raw_event_marker: PhantomData, } -impl Debouncer +impl DebouncerTx where - DebouncedEvent: Send + 'static, - FoldFn: Fn(Option, RawEvent) -> DebouncedEvent, + F: Fn(Option, R) -> D, { - /// Create a new [`Debouncer`](Debouncer) which deduplicates events it receives within the - /// timeframe given by `debounce_time`. The raw, un-debounced events can be sent to the - /// debouncer with [`Debouncer::debounce`](crate::Debouncer::debounce), and the resulting - /// debounced events can be received using the [`mpsc::Receiver`](std::sync::mpsc::Receiver) - /// returned by this function. + /// Send a raw event to the debouncer, which will be grouped with other raw events sent at a + /// similar time. Grouping is performed using the "fold" function this `DebouncerTx` was + /// created with. /// - /// Events are deduplicated using the given `fold` function, which combines the current - /// deduplicated event (an `Option`) with a new `RawEvent` to produce a new - /// `DebouncedEvent`. For example, the fold function could collect raw events into a vector: + /// An error is returned if there are no [`DebouncerRx`](DebouncerRx)s associated with the + /// debouncer left to receive events. /// /// ``` - /// # fn main() -> Result<(), Box> { - /// # use treacle::Debouncer; - /// # use std::{thread, time::Duration}; - /// let (debouncer, rx) = Debouncer::, _>::new( - /// Duration::from_millis(100), - /// // Combine raw events by pushing them to a vector + /// # use std::{panic, thread, time::Duration}; + /// // Create a new debouncer which takes raw events of type `u32` and + /// // combines them by converting them to `u64`s and adding them together. + /// let (tx, rx) = treacle::debouncer::( + /// Duration::from_millis(500), /// |acc, raw_event| { - /// let mut vec = acc.unwrap_or_default(); - /// vec.push(raw_event); - /// vec - /// } - /// )?; + /// acc.unwrap_or_default() + u64::from(raw_event) + /// }); /// - /// // Send two events to the debouncer in quick succession - /// debouncer.debounce(1); - /// debouncer.debounce(2); - /// // Wait, then send another event - /// thread::sleep(Duration::from_millis(150)); - /// debouncer.debounce(3); + /// // Spawn a thread to receive the events we are about to send. + /// let t = thread::spawn(move || { + /// assert_eq!(rx.recv().unwrap(), 3); // 1 + 2 + /// assert_eq!(rx.recv().unwrap(), 12); // 3 + 4 + 5 + /// }); /// - /// assert_eq!(rx.recv().unwrap(), vec![1, 2]); - /// assert_eq!(rx.recv().unwrap(), vec![3]); - /// # Ok(()) - /// # } - /// ``` + /// // Send two raw events in quick succession. + /// tx.send(1).unwrap(); + /// tx.send(2).unwrap(); /// - /// When the returned [`Debouncer`](Debouncer) is dropped it will gracefully shut down, closing - /// the associated mpsc channel once it has finished sending any remaining events. + /// // Wait, then send three more raw events. + /// thread::sleep(Duration::from_millis(500)); + /// tx.send(3).unwrap(); + /// tx.send(4).unwrap(); + /// tx.send(5).unwrap(); /// - /// An [`io::Error`](std::io::Error) will be returned by this function if spawning the - /// debouncer thread fails. - pub fn new(debounce_time: Duration, fold: FoldFn) - -> Result<(Self, mpsc::Receiver), io::Error> - { - let controller = Arc::new(DebouncerController { - state: Mutex::new(DebouncerState::new()), - main_cvar: Condvar::new(), - sleep_cvar: Condvar::new(), - }); - - let (tx, rx) = mpsc::channel::(); - - let thread = thread::Builder::new().spawn({ - let debounce_state = controller.clone(); - move || debounce_thread(debounce_state, tx, debounce_time) - })?; - - Ok((Self { - thread: Some(thread), - controller, - fold, - _raw_maker: PhantomData, - }, rx)) + /// // Wait for the receive thread to finish. + /// if let Err(err) = t.join() { + /// panic::resume_unwind(err); + /// } + /// + /// // We moved `rx` into the receive thread, so when the receive thread + /// // finishes, `rx` is dropped. `rx` was the only `DebouncerRx`, so there + /// // is nothing left to receive events we send, so an error is returned. + /// assert!(tx.send(6).is_err()); + /// ``` + pub fn send(&self, event: R) -> Result<(), SendError> { + self.debouncer.push(event, &self.fold) } } -impl Debouncer +impl Clone for DebouncerTx where - FoldFn: Fn(Option, RawEvent) -> DebouncedEvent, + F: Clone, { - /// Send the debouncer a raw event that should be debounced. The - /// [`mpsc::Receiver`](mpsc::Receiver) associated with this debouncer will receive a message - /// after at least the amount of time specified when the debouncer was created. + fn clone(&self) -> Self { + self.debouncer + .lock_state() + .add_tx() + .expect("debouncer tx count should not overflow"); + + Self { + debouncer: self.debouncer.clone(), + fold: self.fold.clone(), + _raw_event_marker: PhantomData, + } + } +} + +impl Drop for DebouncerTx { + fn drop(&mut self) { + let remaining_tx = { + let mut state_guard = self.debouncer.lock_state(); + state_guard.remove_tx() + }; + + if remaining_tx == 0 { + // There may be some rx threads waiting on the condvars, so notify them to stop them + // from waiting. Upon waking up, the rx threads will see that the tx count is 0 and + // switch to their "shutdown" behaviour. + self.debouncer.event_ready_wait_cvar.notify_all(); + self.debouncer.queue_wait_cvar.notify_all(); + } + } +} + +/// The "receive" half of a debouncer, which receives events which have been debounced by grouping +/// raw events which occured close together in time. To create a new debouncer, see the +/// [`debouncer`](debouncer) function. To receive debounced events, see +/// [`DebouncerRx::recv`](DebouncerRx::recv) or [`DebouncerRx::try_recv`](DebouncerRx::try_recv). +pub struct DebouncerRx { + debouncer: Arc> +} + +impl DebouncerRx { + /// Receive a debounced event from the debouncer. If there is not a debounced event available + /// to be received, the function will wait for one. /// - /// The debouncer collects event data into an element of type `T`; the provided function `fold` - /// specifies how to combine the new event data with the existing event data `Option` to - /// produce a new `T`. For example, if `T = Vec`, then `fold` could be a function which - /// pushes to the vec: + /// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events + /// and there are no more events left to receive. /// /// ``` - /// # fn main() -> Result<(), Box> { - /// # use treacle::Debouncer; - /// # use std::{thread, time::Duration}; - /// let (debouncer, rx) = Debouncer::, _>::new( - /// Duration::from_millis(100), - /// // Combine raw events by pushing them to a vector + /// # use std::{panic, thread, time::Duration}; + /// // Create a new debouncer which takes raw events of type `u8` and + /// // combines them by ORing them together. + /// let (tx, rx) = treacle::debouncer::( + /// Duration::from_millis(500), /// |acc, raw_event| { - /// let mut vec = acc.unwrap_or_default(); - /// vec.push(raw_event); - /// vec - /// } - /// )?; + /// acc.unwrap_or_default() | raw_event + /// }); + /// + /// // Create a thread to send raw events to the debouncer. + /// let t = thread::spawn(move || { + /// // Send three raw events in quick succession. + /// tx.send(0b00001000).unwrap(); + /// tx.send(0b00000100).unwrap(); + /// tx.send(0b00000010).unwrap(); + /// + /// // Wait, then send more raw events. + /// thread::sleep(Duration::from_millis(500)); + /// tx.send(0b10000000).unwrap(); + /// tx.send(0b00100000).unwrap(); + /// + /// // Wait, then send a final batch of raw events. + /// thread::sleep(Duration::from_millis(500)); + /// tx.send(0b01000000).unwrap(); + /// tx.send(0b00000001).unwrap(); + /// }); + /// + /// // Receive the first two debounced events from the debouncer, created + /// // by ORing the raw events together that occurred within the same 500ms + /// // window. + /// assert_eq!(rx.recv().unwrap(), 0b00001110); + /// assert_eq!(rx.recv().unwrap(), 0b10100000); + /// + /// // Wait for the send thread to finish. The send thread owns `tx`, so it + /// // will be dropped once the thread finishes. + /// if let Err(err) = t.join() { + /// panic::resume_unwind(err); + /// } /// - /// // Send two events to the debouncer in quick succession - /// debouncer.debounce(1); - /// debouncer.debounce(2); - /// // Wait, then send another event - /// thread::sleep(Duration::from_millis(150)); - /// debouncer.debounce(3); + /// // `tx` has been dropped, but there is still one last event left for us + /// // to receive. + /// assert_eq!(rx.recv().unwrap(), 0b01000001); /// - /// assert_eq!(rx.recv().unwrap(), vec![1, 2]); - /// assert_eq!(rx.recv().unwrap(), vec![3]); - /// # Ok(()) - /// # } + /// // There are no more events to receive and no more `DebouncerTx`s left + /// // to send events, so attempting to receive results in an error. + /// assert!(rx.recv().is_err()); /// ``` + pub fn recv(&self) -> Result { + self.debouncer.pop() + } + + /// An alternative to [`DebouncerRx::recv`](DebouncerRx::recv) which returns `None` if there is + /// not a debounced event immediately available, instead of waiting for it to become available. + /// + /// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events + /// and there are no more events left to receive. /// - /// `fold` is an argument to this function rather than being stored by the `Debouncer` to avoid - /// ending up with a `Debouncer` with a type that's unnameable (e.g. if a closure is used) or - /// having to resort to dynamic dispatch. - #[inline] - pub fn debounce(&self, event_data: RawEvent) { - self.controller.notify_event(event_data, &self.fold) + /// ``` + /// # use std::{thread, time::Duration}; + /// // Create a new debouncer which takes raw events of type `u8` and + /// // combines them by ORing them together. + /// let (tx, rx) = treacle::debouncer::( + /// Duration::from_millis(500), + /// |acc, raw_event| { + /// acc.unwrap_or_default() | raw_event + /// }); + /// + /// tx.send(0b00001000).unwrap(); + /// tx.send(0b00000100).unwrap(); + /// tx.send(0b00000010).unwrap(); + /// + /// assert_eq!(rx.try_recv().unwrap(), None); + /// + /// thread::sleep(Duration::from_millis(500)); + /// assert_eq!(rx.try_recv().unwrap(), Some(0b00001110)); + /// ``` + pub fn try_recv(&self) -> Result, ReceiveError> { + self.debouncer.try_pop() } } -impl Drop for Debouncer { - fn drop(&mut self) { - self.controller.notify_shutdown(); +impl Clone for DebouncerRx { + fn clone(&self) -> Self { + self.debouncer + .lock_state() + .add_rx() + .expect("debouncer rx count should not overflow"); - let thread = self.thread - .take() - .expect("debouncer thread has already been shut down"); - - match thread.join() { - Ok(()) => (), - Err(err) => panic::resume_unwind(err), + Self { + debouncer: self.debouncer.clone(), } } } -struct DebouncerController { +impl Drop for DebouncerRx { + fn drop(&mut self) { + // Decrement the rx count. We don't need to notify any threads waiting on condvars if the + // count reaches 0, because only rx threads wait on the condvars and we know there are no + // more rx threads (because the count just reached 0!) + self.debouncer.lock_state().remove_rx(); + } +} + +/// An error indicating that there are no more [`DebouncerRx`](DebouncerRx)s left to receive +/// events. The raw event that could not be sent is included in the error so it is not lost. +pub struct SendError(pub T); + +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SendError").finish_non_exhaustive() + } +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt("sending through a closed debouncer", f) + } +} + +impl error::Error for SendError {} + +/// An error indicating that the debouncer has no more events left to receive, and there are no +/// more [`DebouncerTx`](DebouncerTx)s left to send events. +#[derive(Debug)] +pub struct ReceiveError; + +impl fmt::Display for ReceiveError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt("receiving from a closed debouncer", f) + } +} + +impl error::Error for ReceiveError {} + +struct Debouncer { state: Mutex>, - /// Condvar for notifying the debouncer that it has become dirty or that it should shutdown. - main_cvar: Condvar, - /// Condvar for notifying the debouncer that it should cancel sleeping and shutdown. - sleep_cvar: Condvar, + debounce_time: Duration, + queue_wait_cvar: Condvar, + event_ready_wait_cvar: Condvar, } -impl DebouncerController { - /// Notify the debouncer that an event has occurred that should be debounced. - /// - /// The event data is represented by an element of `E`, and the function `f` specifies how to - /// fold this event into an accumulator of type `T`. - /// - /// For example, to collect events into a list, the accumulator could be a `Vec` and the - /// fold function could be: - /// ```ignore - /// |acc, e| { - /// let mut acc = acc.unwrap_or_default(); - /// acc.push(e); - /// acc - /// } - /// ``` - #[inline] - fn notify_event(&self, event_data: E, fold: F) +impl Debouncer { + fn new(debounce_time: Duration, tx_count: usize, rx_count: usize) -> Self { + Self { + state: Mutex::new(DebouncerState::new(tx_count, rx_count)), + debounce_time, + queue_wait_cvar: Condvar::new(), + event_ready_wait_cvar: Condvar::new(), + } + } + + fn lock_state(&self) -> MutexGuard> { + match self.state.lock() { + Ok(guard) => guard, + Err(err) => err.into_inner(), + } + } + + fn push(&self, raw_event: R, fold: F) -> Result<(), SendError> where - F: FnOnce(Option, E) -> T, + F: Fn(Option, R) -> T, { - let mut guard = self.state.lock().unwrap(); - guard.fold(event_data, fold); - drop(guard); - self.main_cvar.notify_one(); + let now = Instant::now(); + + let push_outcome = { + let mut state_guard = self.lock_state(); + + // Return an error if there are no rxs left to send to. Include the raw event in the + // error so that it isn't lost. + if state_guard.has_no_rxs() { + return Err(SendError(raw_event)); + } + + state_guard.acc_queue.push_latest(raw_event, fold, now, self.debounce_time) + }; + + if matches!(push_outcome, PushOutcome::NewAcc) { + // If we pushed a new event accumulator, wake up one rx thread which was waiting for + // the queue to be non-empty. Don't wake up all the rx threads because only one of + // them can consume the acc from the queue; therefore, we should wait until another + // acc is added to the queue before waking up another thread. + self.queue_wait_cvar.notify_one(); + } + + Ok(()) } - #[inline] - fn notify_shutdown(&self) { - let mut guard = self.state.lock().unwrap(); - guard.set_shutdown(); - drop(guard); - self.main_cvar.notify_one(); - self.sleep_cvar.notify_one(); + fn pop(&self) -> Result { + enum PopOutcome { + Event(T), + Shutdown, + } + + let mut state_guard = self.lock_state(); + + let result = 'result: { + // Check that there are any txs left so we don't get stuck waiting forever if the queue + // is empty (as a tx is the only thing that can wake us up). + if state_guard.has_no_txs() { + break 'result PopOutcome::Shutdown; + } + + // Loop because we may have a situation where: + // 1. We wait for the queue to be non-empty. + // 2. The accumulator in the queue, `x`, has a `ready_time` in the future, so we must + // wait for it to be ready. + // 3. Before we can wake up from waiting, some other thread pops the now-ready `x`, and + // the queue is now empty. + // 4. We wake up and must wait for the queue to become non-empty again. + // + // Additionally, the loop also handles spurious returns from the condvar wait. + 'pop_event_outer_loop: loop { + // If there are no accumulators in the queue, wait for one to be pushed. + if state_guard.acc_queue.is_empty() { + // Park the thread. We will be woken up again either by a new accumulator being + // pushed to the queue, or by the debouncer being shut down. + // + // This may return spuriously so the queue may still be empty when we wake up, + // but if this happens we will retry the wait when we attempt and fail to peek + // the queue later. + state_guard = self.queue_wait_cvar.wait(state_guard).unwrap(); + + // We may have been unparked because someone wants to shut down the debouncer, + // so check the shutdown flag. + if state_guard.has_no_txs() { + break PopOutcome::Shutdown; + } + } + + // Loop because we may have a situation where: + // 1. The first accumulator in the queue, `x`, has a `ready_time` in the future. + // 2. We begin waiting for `x` to become ready. + // 3. Before we can wake up from waiting, some other thread pops the now-ready `x`. + // 4. The new first accumulator in the queue, `y`, has a `ready_time` in the queue. + // 5. We wake up and must start waiting again, as `y` is not ready yet. + break loop { + let required_wait_time = { + // Get the oldest accumulator in the queue, so we can see if it is ready to + // be popped and, if not, how long we need to wait for it to be ready to be + // popped. + let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else { + // If there is no accumulator for us to pop from the queue, go back to + // waiting for the queue to be non-empty. This could happen because the + // previous condvar wait returned spuriously, or because another thread + // popped the accumulator before us. + continue 'pop_event_outer_loop; + }; + + let now = Instant::now(); + + // Calculate the time we need to wait for the accumulator to become ready, + // if at all. + peeked_acc.ready_time.checked_duration_since(now) + }; + + match required_wait_time { + // Case where we must wait for the accumulator to become ready. + Some(wait_time) if !wait_time.is_zero() => { + // Wait the amount of time between now and the `ready_time` of the + // accumulator. This is done using a condvar so the sleep can be + // interrupted if someone wants to shut down the debouncer. + // + // This may return spuriously so we may wake up before the time has + // elapsed and without being notified by anyone. However, since we go + // back so the start of the loop after this, we will see that there is + // still time remaining before the accumulator becomes ready, and + // therefore we will resume waiting. + (state_guard, _) = self.event_ready_wait_cvar + .wait_timeout(state_guard, wait_time) + .unwrap(); + + // Again, we may have been unparked because someone wants to shut down the + // debouncer, so check the shutdown flag and return if it is set. + if state_guard.has_no_txs() { + break PopOutcome::Shutdown; + } + + continue; + }, + + // If we don't have to wait, we can pop the oldest accumulator from the + // queue and return it. We can unwrap the `pop_oldest` because in this + // case, we successfully peeked the queue and haven't modified the queue or + // released the lock since. + _ => { + match state_guard.acc_queue.pop_oldest().unwrap().into_acc() { + Some(popped_acc) => break PopOutcome::Event(popped_acc), + + // If the accumulator we popped has no data (which can happen if + // the fold function panicked while pushing), retry from the + // beginning. + None => continue 'pop_event_outer_loop, + } + }, + } + }; + } + }; + + match result { + PopOutcome::Event(event) => Ok(event), + + // If the debouncer has been shut down, return any remaining accumulators then start + // returning errors once the accumulators have been depleted. + PopOutcome::Shutdown => { + state_guard + .acc_queue + .pop_oldest_acc_discard_none() + .ok_or(ReceiveError) + }, + } + } + + fn try_pop(&self) -> Result, ReceiveError> { + let mut state_guard = self.lock_state(); + + // If the debouncer has been shut down, return any remaining accumulators then start + // returning errors once the accumulators have been depleted. + if state_guard.has_no_txs() { + return state_guard + .acc_queue + .pop_oldest_acc_discard_none() + .map(Some) + .ok_or(ReceiveError); + } + + let now = Instant::now(); + + // Loop in case we pop an accumulator from the queue which contains no data, which can + // happen if the fold function panicked when pushing. + loop { + let first_acc_ready = { + let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else { + break Ok(None); + }; + + peeked_acc.ready_time <= now + }; + + if first_acc_ready { + match state_guard.acc_queue.pop_oldest().unwrap().into_acc() { + Some(popped_acc) => break Ok(Some(popped_acc)), + None => continue, + } + } else { + break Ok(None); + } + } } } struct DebouncerState { - /// The debounced data we have received so far. - acc: Option, - /// Whether or not we should shutdown the debouncer thread. - shutdown: bool, + acc_queue: EventAccQueue, + // These could alternatively be `AtomicUsize`s which live outside the mutex, but we opt to have + // them inside the mutex instead to avoid a confusing mixture of mutexes / condvars and + // atomics. + tx_count: usize, + rx_count: usize, } impl DebouncerState { - fn new() -> Self { - Self { acc: None, shutdown: false } + fn new(tx_count: usize, rx_count: usize) -> Self { + Self { + acc_queue: EventAccQueue::new(), + tx_count, + rx_count, + } } - fn get_event(&self) -> Option { - if self.shutdown { - Some(DebouncerEvent::Shutdown) - } else if self.acc.is_some() { - Some(DebouncerEvent::Dirty) - } else { - None - } + fn has_no_txs(&self) -> bool { + self.tx_count == 0 } - fn should_shutdown(&self) -> bool { - self.shutdown + fn add_tx(&mut self) -> Result<(), CountOverflowError> { + self.tx_count = self.tx_count.checked_add(1) + .ok_or(CountOverflowError)?; + + Ok(()) } - #[inline] - fn fold(&mut self, event_data: E, fold_fn: F) - where - F: FnOnce(Option, E) -> T, - { - let acc = self.acc.take(); - self.acc = Some(fold_fn(acc, event_data)); + fn remove_tx(&mut self) -> usize { + self.tx_count = self.tx_count.saturating_sub(1); + self.tx_count } - fn swap_acc(&mut self) -> Option { - self.acc.take() + fn has_no_rxs(&self) -> bool { + self.rx_count == 0 } - fn set_shutdown(&mut self) { - self.shutdown = true; + fn add_rx(&mut self) -> Result<(), CountOverflowError> { + self.rx_count = self.rx_count.checked_add(1) + .ok_or(CountOverflowError)?; + + Ok(()) + } + + fn remove_rx(&mut self) -> usize { + self.rx_count = self.rx_count.saturating_sub(1); + self.rx_count } } -enum DebouncerEvent { - /// The debouncer has been told to shut down. - Shutdown, - /// The accumulator has been changed. - Dirty, +struct EventAccQueue { + inner: VecDeque>, } -fn debounce_thread( - debouncer: Arc>, - tx: mpsc::Sender, - debounce_time: Duration -) { - let mut guard = debouncer.state.lock().unwrap(); - - 'debounce: loop { - let event = 'event: loop { - // Check if either `acc` or `shutdown` is set. - if let Some(event) = guard.get_event() { - break 'event event; +impl EventAccQueue { + fn new() -> Self { + Self { inner: VecDeque::new() } + } + + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + fn peek_oldest(&self) -> Option<&EventAcc> { + self.inner.front() + } + + fn pop_oldest(&mut self) -> Option> { + self.inner.pop_front() + } + + fn pop_oldest_acc_discard_none(&mut self) -> Option { + while let Some(event) = self.pop_oldest() { + if let Some(acc) = event.into_acc() { + return Some(acc); } + } + return None; + } - // If neither `acc` nor `shutdown` have been set, wait for the condvar to be notified - // then check again. We need to check in a loop because `Condvar` allows spurious - // wakeups. - guard = debouncer.main_cvar.wait(guard).unwrap(); - }; - - match event { - DebouncerEvent::Shutdown => { - // Before shutting down, check if there is anything in the accumulator and send it - // through the channel if it is still open. - if let Some(acc) = guard.swap_acc() { - tx.send(acc).ok(); - } + fn push_latest(&mut self, raw_event: R, f: F, now: Instant, debounce_time: Duration) + -> PushOutcome + where + F: FnOnce(Option, R) -> T, + { + // Get the last accumulator in the queue if and only if the queue is non-empty and the last + // accumulator in the queue is ready (based on the given `now` time). + match self.inner + .back_mut() + .and_then(|acc| (acc.ready_time > now).then_some(acc)) + { + // If the last accumulator in the queue is not ready yet, we can fold the new event + // into the accumulator. + Some(event) => { + event.fold(raw_event, f); + PushOutcome::NoNewAcc + }, - break 'debounce; + // If the last accumulator in the queue is ready, we don't want to keep adding to it, + // so make a new accumulator and push it to the end of the queue. + None => { + let ready_time = now + debounce_time; + let event = EventAcc::new_from_fold(raw_event, f, ready_time); + self.inner.push_back(event); + PushOutcome::NewAcc }, + } + } +} - DebouncerEvent::Dirty => { - // Wait for the debounce time, or until the `sleep_cvar` is notified and the - // `shutdown` boolean is set. Note that checking `shutdown` here is necessary - // because `Condvar` allows spurious wakeups. - // - // By waiting for a condvar, we are temporarily releasing the mutex. This allows - // the other threads to set `acc` while we are waiting. - (guard, _) = debouncer.sleep_cvar - .wait_timeout_while(guard, debounce_time, |state| { - !state.should_shutdown() - }).unwrap(); - - // Once we have finished waiting, send the contents of the accumulator through the - // channel. - if let Some(acc) = guard.swap_acc() { - if tx.send(acc).is_err() { - // If the other side of the channel has been closed, stop the debouncer. - break 'debounce; - } - } +enum PushOutcome { + NewAcc, + NoNewAcc, +} - // Check if the shutdown flag was set while we were waiting, and stop the deboucner - // if so. - if guard.should_shutdown() { - break 'debounce; - } - }, +/// "Event accumulator", created by repeatedly joining raw events using a fold function. +struct EventAcc { + // The accumulator is stored as an `Option` because it is temporarily set to `None` during + // `EventAcc::fold`, so that the old accumulator can be moved into the user-provided fold + // function rather than borrowing it. `MaybeUninit` cannot be used for this because the + // user-provided fold function may panic, which would leave the accumulator in an uninitialised + // state, potentially causing UB later. + acc: Option, + ready_time: Instant, +} + +impl EventAcc { + fn new_from_fold(raw_event: R, f: F, ready_time: Instant) -> Self + where + F: FnOnce(Option, R) -> T, + { + Self { + acc: Some(f(None, raw_event)), + ready_time, } } + + fn fold(&mut self, raw_event: R, f: F) + where + F: FnOnce(Option, R) -> T, + { + let acc = self.acc.take(); + self.acc = Some(f(acc, raw_event)); + } + + fn into_acc(self) -> Option { + self.acc + } } +#[derive(Debug)] +struct CountOverflowError; + #[cfg(test)] mod tests { use std::{time::Duration, thread}; - use super::{Debouncer, fold}; + use super::{debouncer, fold}; #[test] fn test_debounce() { - let (debouncer, rx) = Debouncer::new( - Duration::from_millis(50), - fold::fold_vec_push:: - ).unwrap(); + let (tx, rx) = debouncer(Duration::from_millis(50), fold::fold_vec_push::); for i in 0..3 { for j in 0..10 { - debouncer.debounce(i * 10 + j); + tx.send(i * 10 + j).unwrap(); thread::sleep(Duration::from_millis(4)); } @@ -376,17 +715,14 @@ mod tests { #[test] fn test_debouncer_shutdown() { - let (debouncer, rx) = Debouncer::new( - Duration::from_millis(100), - fold::fold_vec_push:: - ).unwrap(); + let (tx, rx) = debouncer(Duration::from_millis(100), fold::fold_vec_push::); - debouncer.debounce(1); - debouncer.debounce(2); - debouncer.debounce(3); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + tx.send(3).unwrap(); - // Drop the debouncer, shutting it down. - drop(debouncer); + // Drop the tx, shutting down the debouncer. + drop(tx); // Test that the events emitted just before the shutdown are not lost. assert_eq!(rx.recv().unwrap(), &[1, 2, 3]); diff --git a/src/next.rs b/src/next.rs deleted file mode 100644 index 070dd18..0000000 --- a/src/next.rs +++ /dev/null @@ -1,691 +0,0 @@ -use std::{ - time::{Instant, Duration}, - collections::VecDeque, - marker::PhantomData, - sync::{Mutex, Condvar, Arc, MutexGuard}, - fmt, - error, -}; - -/// Creates a new debouncer for deduplicating groups of "raw" events which occur at a similar time. -/// The debouncer is comprised of two halves; a [`DebouncerTx`](DebouncerTx) for sending raw events -/// to the debouncer, and a [`DebouncerRx`](DebouncerRx) for receiving grouped (debounced) events -/// from the debouncer. -/// -/// When a raw event is sent through the `DebouncerTx`, future `debounce_events` which are sent -/// before the given `debounce_time` has elapsed are considered part of the same group. For -/// example, if the `debounce_time` is 1 second, all raw events which are sent less than one second -/// after the first are considered part of the same group. -/// -/// The `DebouncerRx` receives the grouped events from the debouncer once the `debounce_time` has -/// elapsed for the event. Events are grouped using the given `fold` function, which takes a -/// previous grouped event (or "accumulator") and combines it with a new raw event. -/// -/// ``` -/// # use std::{thread, time::Duration}; -/// # use treacle::next::debouncer; -/// // Create a new debouncer which takes raw events of type `u32` and combines -/// // them by pushing them to a vector. -/// let (tx, rx) = debouncer::, _>( -/// Duration::from_millis(500), -/// |acc, raw_event| { -/// let mut events_vector = acc.unwrap_or_default(); -/// events_vector.push(raw_event); -/// events_vector -/// }); -/// -/// thread::spawn(move || { -/// // Send two raw events in quick succession. -/// tx.send(10).unwrap(); -/// tx.send(20).unwrap(); -/// -/// // Wait, then send another raw event. -/// thread::sleep(Duration::from_millis(500)); -/// tx.send(30).unwrap(); -/// }); -/// -/// assert_eq!(rx.recv().unwrap(), &[10, 20]); -/// assert_eq!(rx.recv().unwrap(), &[30]); -/// ``` -/// -/// Both the `DebouncerTx` and `DebouncerRx` may be cloned, allowing multiple event senders and -/// multiple event receivers for a single debouncer. -pub fn debouncer(debounce_time: Duration, fold: F) - -> (DebouncerTx, DebouncerRx) -where - F: Fn(Option, R) -> D, -{ - let shared_state = Arc::new(Debouncer::new(debounce_time, 1, 1)); - - let tx = DebouncerTx { - debouncer: shared_state.clone(), - fold, - _raw_event_marker: PhantomData, - }; - - let rx = DebouncerRx { - debouncer: shared_state, - }; - - (tx, rx) -} - -/// The "send" half of a debouncer, through which raw events are sent to be debounced. To create a -/// new debouncer, see the [`debouncer`](debouncer) function. To send raw events, see -/// [`DebouncerTx::send`](DebouncerTx::send). -pub struct DebouncerTx { - debouncer: Arc>, - fold: F, - _raw_event_marker: PhantomData, -} - -impl DebouncerTx -where - F: Fn(Option, R) -> D, -{ - /// Send a raw event to the debouncer, which will be grouped with other raw events sent at a - /// similar time. Grouping is performed using the "fold" function this `DebouncerTx` was - /// created with. - /// - /// An error is returned if there are no [`DebouncerRx`](DebouncerRx)s associated with the - /// debouncer left to receive events. - /// - /// ``` - /// # use std::{panic, thread, time::Duration}; - /// # use treacle::next::debouncer; - /// // Create a new debouncer which takes raw events of type `u32` and - /// // combines them by converting them to `u64`s and adding them together. - /// let (tx, rx) = debouncer::( - /// Duration::from_millis(500), - /// |acc, raw_event| { - /// acc.unwrap_or_default() + u64::from(raw_event) - /// }); - /// - /// // Spawn a thread to receive the events we are about to send. - /// let t = thread::spawn(move || { - /// assert_eq!(rx.recv().unwrap(), 3); // 1 + 2 - /// assert_eq!(rx.recv().unwrap(), 12); // 3 + 4 + 5 - /// }); - /// - /// // Send two raw events in quick succession. - /// tx.send(1).unwrap(); - /// tx.send(2).unwrap(); - /// - /// // Wait, then send three more raw events. - /// thread::sleep(Duration::from_millis(500)); - /// tx.send(3).unwrap(); - /// tx.send(4).unwrap(); - /// tx.send(5).unwrap(); - /// - /// // Wait for the receive thread to finish. - /// if let Err(err) = t.join() { - /// panic::resume_unwind(err); - /// } - /// - /// // We moved `rx` into the receive thread, so when the receive thread - /// // finishes, `rx` is dropped. `rx` was the only `DebouncerRx`, so there - /// // is nothing left to receive events we send, so an error is returned. - /// assert!(tx.send(6).is_err()); - /// ``` - pub fn send(&self, event: R) -> Result<(), SendError> { - self.debouncer.push(event, &self.fold) - } -} - -impl Clone for DebouncerTx -where - F: Clone, -{ - fn clone(&self) -> Self { - self.debouncer - .lock_state() - .add_tx() - .expect("debouncer tx count should not overflow"); - - Self { - debouncer: self.debouncer.clone(), - fold: self.fold.clone(), - _raw_event_marker: PhantomData, - } - } -} - -impl Drop for DebouncerTx { - fn drop(&mut self) { - let remaining_tx = { - let mut state_guard = self.debouncer.lock_state(); - state_guard.remove_tx() - }; - - if remaining_tx == 0 { - // There may be some rx threads waiting on the condvars, so notify them to stop them - // from waiting. Upon waking up, the rx threads will see that the tx count is 0 and - // switch to their "shutdown" behaviour. - self.debouncer.event_ready_wait_cvar.notify_all(); - self.debouncer.queue_wait_cvar.notify_all(); - } - } -} - -/// The "receive" half of a debouncer, which receives events which have been debounced by grouping -/// raw events which occured close together in time. To create a new debouncer, see the -/// [`debouncer`](debouncer) function. To receive debounced events, see -/// [`DebouncerRx::recv`](DebouncerRx::recv) or [`DebouncerRx::try_recv`](DebouncerRx::try_recv). -pub struct DebouncerRx { - debouncer: Arc> -} - -impl DebouncerRx { - /// Receive a debounced event from the debouncer. If there is not a debounced event available - /// to be received, the function will wait for one. - /// - /// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events - /// and there are no more events left to receive. - /// - /// ``` - /// # use std::{panic, thread, time::Duration}; - /// # use treacle::next::debouncer; - /// // Create a new debouncer which takes raw events of type `u8` and - /// // combines them by ORing them together. - /// let (tx, rx) = debouncer::( - /// Duration::from_millis(500), - /// |acc, raw_event| { - /// acc.unwrap_or_default() | raw_event - /// }); - /// - /// // Create a thread to send raw events to the debouncer. - /// let t = thread::spawn(move || { - /// // Send three raw events in quick succession. - /// tx.send(0b00001000).unwrap(); - /// tx.send(0b00000100).unwrap(); - /// tx.send(0b00000010).unwrap(); - /// - /// // Wait, then send more raw events. - /// thread::sleep(Duration::from_millis(500)); - /// tx.send(0b10000000).unwrap(); - /// tx.send(0b00100000).unwrap(); - /// - /// // Wait, then send a final batch of raw events. - /// thread::sleep(Duration::from_millis(500)); - /// tx.send(0b01000000).unwrap(); - /// tx.send(0b00000001).unwrap(); - /// }); - /// - /// // Receive the first two debounced events from the debouncer, created - /// // by ORing the raw events together that occurred within the same 500ms - /// // window. - /// assert_eq!(rx.recv().unwrap(), 0b00001110); - /// assert_eq!(rx.recv().unwrap(), 0b10100000); - /// - /// // Wait for the send thread to finish. The send thread owns `tx`, so it - /// // will be dropped once the thread finishes. - /// if let Err(err) = t.join() { - /// panic::resume_unwind(err); - /// } - /// - /// // `tx` has been dropped, but there is still one last event left for us - /// // to receive. - /// assert_eq!(rx.recv().unwrap(), 0b01000001); - /// - /// // There are no more events to receive and no more `DebouncerTx`s left - /// // to send events, so attempting to receive results in an error. - /// assert!(rx.recv().is_err()); - /// ``` - pub fn recv(&self) -> Result { - self.debouncer.pop() - } - - /// An alternative to [`DebouncerRx::recv`](DebouncerRx::recv) which returns `None` if there is - /// not a debounced event immediately available, instead of waiting for it to become available. - /// - /// An error is returned if there are no more [`DebouncerTx`](DebouncerTx)s left to send events - /// and there are no more events left to receive. - /// - /// ``` - /// # use std::{thread, time::Duration}; - /// # use treacle::next::debouncer; - /// // Create a new debouncer which takes raw events of type `u8` and - /// // combines them by ORing them together. - /// let (tx, rx) = debouncer::( - /// Duration::from_millis(500), - /// |acc, raw_event| { - /// acc.unwrap_or_default() | raw_event - /// }); - /// - /// tx.send(0b00001000).unwrap(); - /// tx.send(0b00000100).unwrap(); - /// tx.send(0b00000010).unwrap(); - /// - /// assert_eq!(rx.try_recv().unwrap(), None); - /// - /// thread::sleep(Duration::from_millis(500)); - /// assert_eq!(rx.try_recv().unwrap(), Some(0b00001110)); - /// ``` - pub fn try_recv(&self) -> Result, ReceiveError> { - self.debouncer.try_pop() - } -} - -impl Clone for DebouncerRx { - fn clone(&self) -> Self { - self.debouncer - .lock_state() - .add_rx() - .expect("debouncer rx count should not overflow"); - - Self { - debouncer: self.debouncer.clone(), - } - } -} - -impl Drop for DebouncerRx { - fn drop(&mut self) { - // Decrement the rx count. We don't need to notify any threads waiting on condvars if the - // count reaches 0, because only rx threads wait on the condvars and we know there are no - // more rx threads (because the count just reached 0!) - self.debouncer.lock_state().remove_rx(); - } -} - -/// An error indicating that there are no more [`DebouncerRx`](DebouncerRx)s left to receive -/// events. The raw event that could not be sent is included in the error so it is not lost. -pub struct SendError(pub T); - -impl fmt::Debug for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SendError").finish_non_exhaustive() - } -} - -impl fmt::Display for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt("sending through a closed debouncer", f) - } -} - -impl error::Error for SendError {} - -/// An error indicating that the debouncer has no more events left to receive, and there are no -/// more [`DebouncerTx`](DebouncerTx)s left to send events. -#[derive(Debug)] -pub struct ReceiveError; - -impl fmt::Display for ReceiveError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt("receiving from a closed debouncer", f) - } -} - -impl error::Error for ReceiveError {} - -struct Debouncer { - state: Mutex>, - debounce_time: Duration, - queue_wait_cvar: Condvar, - event_ready_wait_cvar: Condvar, -} - -impl Debouncer { - fn new(debounce_time: Duration, tx_count: usize, rx_count: usize) -> Self { - Self { - state: Mutex::new(DebouncerState::new(tx_count, rx_count)), - debounce_time, - queue_wait_cvar: Condvar::new(), - event_ready_wait_cvar: Condvar::new(), - } - } - - fn lock_state(&self) -> MutexGuard> { - match self.state.lock() { - Ok(guard) => guard, - Err(err) => err.into_inner(), - } - } - - fn push(&self, raw_event: R, fold: F) -> Result<(), SendError> - where - F: Fn(Option, R) -> T, - { - let now = Instant::now(); - - let push_outcome = { - let mut state_guard = self.lock_state(); - - // Return an error if there are no rxs left to send to. Include the raw event in the - // error so that it isn't lost. - if state_guard.has_no_rxs() { - return Err(SendError(raw_event)); - } - - state_guard.acc_queue.push_latest(raw_event, fold, now, self.debounce_time) - }; - - if matches!(push_outcome, PushOutcome::NewAcc) { - // If we pushed a new event accumulator, wake up one rx thread which was waiting for - // the queue to be non-empty. Don't wake up all the rx threads because only one of - // them can consume the acc from the queue; therefore, we should wait until another - // acc is added to the queue before waking up another thread. - self.queue_wait_cvar.notify_one(); - } - - Ok(()) - } - - fn pop(&self) -> Result { - enum PopOutcome { - Event(T), - Shutdown, - } - - let mut state_guard = self.lock_state(); - - let result = 'result: { - // Check that there are any txs left so we don't get stuck waiting forever if the queue - // is empty (as a tx is the only thing that can wake us up). - if state_guard.has_no_txs() { - break 'result PopOutcome::Shutdown; - } - - // Loop because we may have a situation where: - // 1. We wait for the queue to be non-empty. - // 2. The accumulator in the queue, `x`, has a `ready_time` in the future, so we must - // wait for it to be ready. - // 3. Before we can wake up from waiting, some other thread pops the now-ready `x`, and - // the queue is now empty. - // 4. We wake up and must wait for the queue to become non-empty again. - // - // Additionally, the loop also handles spurious returns from the condvar wait. - 'pop_event_outer_loop: loop { - // If there are no accumulators in the queue, wait for one to be pushed. - if state_guard.acc_queue.is_empty() { - // Park the thread. We will be woken up again either by a new accumulator being - // pushed to the queue, or by the debouncer being shut down. - // - // This may return spuriously so the queue may still be empty when we wake up, - // but if this happens we will retry the wait when we attempt and fail to peek - // the queue later. - state_guard = self.queue_wait_cvar.wait(state_guard).unwrap(); - - // We may have been unparked because someone wants to shut down the debouncer, - // so check the shutdown flag. - if state_guard.has_no_txs() { - break PopOutcome::Shutdown; - } - } - - // Loop because we may have a situation where: - // 1. The first accumulator in the queue, `x`, has a `ready_time` in the future. - // 2. We begin waiting for `x` to become ready. - // 3. Before we can wake up from waiting, some other thread pops the now-ready `x`. - // 4. The new first accumulator in the queue, `y`, has a `ready_time` in the queue. - // 5. We wake up and must start waiting again, as `y` is not ready yet. - break loop { - let required_wait_time = { - // Get the oldest accumulator in the queue, so we can see if it is ready to - // be popped and, if not, how long we need to wait for it to be ready to be - // popped. - let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else { - // If there is no accumulator for us to pop from the queue, go back to - // waiting for the queue to be non-empty. This could happen because the - // previous condvar wait returned spuriously, or because another thread - // popped the accumulator before us. - continue 'pop_event_outer_loop; - }; - - let now = Instant::now(); - - // Calculate the time we need to wait for the accumulator to become ready, - // if at all. - peeked_acc.ready_time.checked_duration_since(now) - }; - - match required_wait_time { - // Case where we must wait for the accumulator to become ready. - Some(wait_time) if !wait_time.is_zero() => { - // Wait the amount of time between now and the `ready_time` of the - // accumulator. This is done using a condvar so the sleep can be - // interrupted if someone wants to shut down the debouncer. - // - // This may return spuriously so we may wake up before the time has - // elapsed and without being notified by anyone. However, since we go - // back so the start of the loop after this, we will see that there is - // still time remaining before the accumulator becomes ready, and - // therefore we will resume waiting. - (state_guard, _) = self.event_ready_wait_cvar - .wait_timeout(state_guard, wait_time) - .unwrap(); - - // Again, we may have been unparked because someone wants to shut down the - // debouncer, so check the shutdown flag and return if it is set. - if state_guard.has_no_txs() { - break PopOutcome::Shutdown; - } - - continue; - }, - - // If we don't have to wait, we can pop the oldest accumulator from the - // queue and return it. We can unwrap the `pop_oldest` because in this - // case, we successfully peeked the queue and haven't modified the queue or - // released the lock since. - _ => { - match state_guard.acc_queue.pop_oldest().unwrap().into_acc() { - Some(popped_acc) => break PopOutcome::Event(popped_acc), - - // If the accumulator we popped has no data (which can happen if - // the fold function panicked while pushing), retry from the - // beginning. - None => continue 'pop_event_outer_loop, - } - }, - } - }; - } - }; - - match result { - PopOutcome::Event(event) => Ok(event), - - // If the debouncer has been shut down, return any remaining accumulators then start - // returning errors once the accumulators have been depleted. - PopOutcome::Shutdown => { - state_guard - .acc_queue - .pop_oldest_acc_discard_none() - .ok_or(ReceiveError) - }, - } - } - - fn try_pop(&self) -> Result, ReceiveError> { - let mut state_guard = self.lock_state(); - - // If the debouncer has been shut down, return any remaining accumulators then start - // returning errors once the accumulators have been depleted. - if state_guard.has_no_txs() { - return state_guard - .acc_queue - .pop_oldest_acc_discard_none() - .map(Some) - .ok_or(ReceiveError); - } - - let now = Instant::now(); - - // Loop in case we pop an accumulator from the queue which contains no data, which can - // happen if the fold function panicked when pushing. - loop { - let first_acc_ready = { - let Some(peeked_acc) = state_guard.acc_queue.peek_oldest() else { - break Ok(None); - }; - - peeked_acc.ready_time <= now - }; - - if first_acc_ready { - match state_guard.acc_queue.pop_oldest().unwrap().into_acc() { - Some(popped_acc) => break Ok(Some(popped_acc)), - None => continue, - } - } else { - break Ok(None); - } - } - } -} - -struct DebouncerState { - acc_queue: EventAccQueue, - // These could alternatively be `AtomicUsize`s which live outside the mutex, but we opt to have - // them inside the mutex instead to avoid a confusing mixture of mutexes / condvars and - // atomics. - tx_count: usize, - rx_count: usize, -} - -impl DebouncerState { - fn new(tx_count: usize, rx_count: usize) -> Self { - Self { - acc_queue: EventAccQueue::new(), - tx_count, - rx_count, - } - } - - fn has_no_txs(&self) -> bool { - self.tx_count == 0 - } - - fn add_tx(&mut self) -> Result<(), CountOverflowError> { - self.tx_count = self.tx_count.checked_add(1) - .ok_or(CountOverflowError)?; - - Ok(()) - } - - fn remove_tx(&mut self) -> usize { - self.tx_count = self.tx_count.saturating_sub(1); - self.tx_count - } - - fn has_no_rxs(&self) -> bool { - self.rx_count == 0 - } - - fn add_rx(&mut self) -> Result<(), CountOverflowError> { - self.rx_count = self.rx_count.checked_add(1) - .ok_or(CountOverflowError)?; - - Ok(()) - } - - fn remove_rx(&mut self) -> usize { - self.rx_count = self.rx_count.saturating_sub(1); - self.rx_count - } -} - -struct EventAccQueue { - inner: VecDeque>, -} - -impl EventAccQueue { - fn new() -> Self { - Self { inner: VecDeque::new() } - } - - fn is_empty(&self) -> bool { - self.inner.is_empty() - } - - fn peek_oldest(&self) -> Option<&EventAcc> { - self.inner.front() - } - - fn pop_oldest(&mut self) -> Option> { - self.inner.pop_front() - } - - fn pop_oldest_acc_discard_none(&mut self) -> Option { - while let Some(event) = self.pop_oldest() { - if let Some(acc) = event.into_acc() { - return Some(acc); - } - } - return None; - } - - fn push_latest(&mut self, raw_event: R, f: F, now: Instant, debounce_time: Duration) - -> PushOutcome - where - F: FnOnce(Option, R) -> T, - { - // Get the last accumulator in the queue if and only if the queue is non-empty and the last - // accumulator in the queue is ready (based on the given `now` time). - match self.inner - .back_mut() - .and_then(|acc| (acc.ready_time > now).then_some(acc)) - { - // If the last accumulator in the queue is not ready yet, we can fold the new event - // into the accumulator. - Some(event) => { - event.fold(raw_event, f); - PushOutcome::NoNewAcc - }, - - // If the last accumulator in the queue is ready, we don't want to keep adding to it, - // so make a new accumulator and push it to the end of the queue. - None => { - let ready_time = now + debounce_time; - let event = EventAcc::new_from_fold(raw_event, f, ready_time); - self.inner.push_back(event); - PushOutcome::NewAcc - }, - } - } -} - -enum PushOutcome { - NewAcc, - NoNewAcc, -} - -/// "Event accumulator", created by repeatedly joining raw events using a fold function. -struct EventAcc { - // The accumulator is stored as an `Option` because it is temporarily set to `None` during - // `EventAcc::fold`, so that the old accumulator can be moved into the user-provided fold - // function rather than borrowing it. `MaybeUninit` cannot be used for this because the - // user-provided fold function may panic, which would leave the accumulator in an uninitialised - // state, potentially causing UB later. - acc: Option, - ready_time: Instant, -} - -impl EventAcc { - fn new_from_fold(raw_event: R, f: F, ready_time: Instant) -> Self - where - F: FnOnce(Option, R) -> T, - { - Self { - acc: Some(f(None, raw_event)), - ready_time, - } - } - - fn fold(&mut self, raw_event: R, f: F) - where - F: FnOnce(Option, R) -> T, - { - let acc = self.acc.take(); - self.acc = Some(f(acc, raw_event)); - } - - fn into_acc(self) -> Option { - self.acc - } -} - -#[derive(Debug)] -struct CountOverflowError;