commit 8a2863d3a3301187f51142d1c429e13e4bac000a Author: pantonshire Date: Sat Apr 29 14:32:20 2023 +0100 🎉 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..68c526d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "treacle" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..7eef0ec --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,196 @@ +use std::{ + sync::{Arc, Mutex, Condvar, mpsc}, + time::Duration, + thread::{self, JoinHandle}, io, +}; + +/// A debouncer for deduplicating groups of events which occur at a similar time. Upon receiving an +/// event (via `DebouncerController::notify_dirty`), the debouncer waits for a specified duration, +/// during which time any additional events will be considered part of the same group. Once it has +/// finished waiting, it will emit a single event via a `mpsc` channel. +pub struct Debouncer { + thread: JoinHandle<()>, + controller: Arc>, +} + +impl Debouncer +where + T: Send + 'static, +{ + pub fn start_new(debounce_time: Duration) -> Result<(Self, mpsc::Receiver), io::Error> { + let controller = Arc::new(DebouncerController { + state: Mutex::new(DebouncerState::new()), + main_cvar: Condvar::new(), + sleep_cvar: Condvar::new(), + }); + + let (tx, rx) = mpsc::channel::(); + + let thread = thread::Builder::new().spawn({ + let debounce_state = controller.clone(); + move || debounce_thread(debounce_state, tx, debounce_time) + })?; + + Ok((Self { thread, controller }, rx)) + } + + pub fn controller(&self) -> &Arc> { + &self.controller + } + + pub fn close(self) -> thread::Result<()> { + self.controller().notify_shutdown(); + self.thread.join() + } +} + +pub struct DebouncerController { + state: Mutex>, + /// Condvar for notifying the debouncer that it has become dirty or that it should shutdown. + main_cvar: Condvar, + /// Condvar for notifying the debouncer that it should cancel sleeping and shutdown. + sleep_cvar: Condvar, +} + +impl DebouncerController { + /// Notify the debouncer that an event has occurred that should be debounced. + /// + /// The event data is represented by an element of `E`, and the function `f` specifies how to + /// fold this event into an accumulator of type `T`. + /// + /// For example, to collect events into a list, the accumulator could be a `Vec` and the + /// fold function could be: + /// ```no_run + /// |acc, e| { + /// let mut acc = acc.unwrap_or_default(); + /// acc.push(e); + /// acc + /// } + /// ``` + pub fn notify_event(&self, event_data: E, f: F) + where + F: FnOnce(Option, E) -> T, + { + let mut guard = self.state.lock().unwrap(); + guard.fold(event_data, f); + drop(guard); + self.main_cvar.notify_one(); + } + + fn notify_shutdown(&self) { + let mut guard = self.state.lock().unwrap(); + guard.set_shutdown(); + drop(guard); + self.main_cvar.notify_one(); + self.sleep_cvar.notify_one(); + } +} + +impl DebouncerController> { + pub fn notify_event_push(&self, event_data: T) { + self.notify_event(event_data, |acc, event_data| { + let mut acc = acc.unwrap_or_default(); + acc.push(event_data); + acc + }); + } +} + +struct DebouncerState { + /// The debounced data we have received so far. + acc: Option, + /// Whether or not we should shutdown the debouncer thread. + shutdown: bool, +} + +impl DebouncerState { + fn new() -> Self { + Self { acc: None, shutdown: false } + } + + fn get_event(&self) -> Option { + if self.shutdown { + Some(DebouncerEvent::Shutdown) + } else if self.acc.is_some() { + Some(DebouncerEvent::Dirty) + } else { + None + } + } + + fn should_shutdown(&self) -> bool { + self.shutdown + } + + fn fold(&mut self, event_data: E, f: F) + where + F: FnOnce(Option, E) -> T, + { + let acc = self.acc.take(); + self.acc = Some(f(acc, event_data)); + } + + fn swap_acc(&mut self) -> Option { + self.acc.take() + } + + fn set_shutdown(&mut self) { + self.shutdown = true; + } +} + +enum DebouncerEvent { + /// The debouncer has been told to shut down. + Shutdown, + /// The accumulator has been changed. + Dirty, +} + +fn debounce_thread( + debouncer: Arc>, + tx: mpsc::Sender, + debounce_time: Duration +) { + let mut guard = debouncer.state.lock().unwrap(); + + 'debounce: loop { + let event = 'event: loop { + // Check if either `acc` or `shutdown` is set. + if let Some(event) = guard.get_event() { + break 'event event; + } + + // If neither `acc` nor `shutdown` have been set, wait for the condvar to be notified + // then check again. We need to check in a loop because `Condvar` allows spurious + // wakeups. + guard = debouncer.main_cvar.wait(guard).unwrap(); + }; + + match event { + DebouncerEvent::Shutdown => break 'debounce, + + DebouncerEvent::Dirty => { + // Wait for the debounce time, or until the `sleep_cvar` is notified and the + // `shutdown` boolean is set. Note that checking `shutdown` here is necessary + // because `Condvar` allows spurious wakeups. + // + // By waiting for a condvar, we are temporarily releasing the mutex. This allows + // the other threads to set `acc` while we are waiting. + (guard, _) = debouncer.sleep_cvar + .wait_timeout_while(guard, debounce_time, |state| { + !state.should_shutdown() + }).unwrap(); + + if guard.should_shutdown() { + break 'debounce; + } + + if let Some(acc) = guard.swap_acc() { + if tx.send(acc).is_err() { + break 'debounce + } + } + }, + } + } +}