store debouncer fold function in the debouncer itself

no_thread
pantonshire 3 years ago
parent 20d954c10b
commit a25eb76de0

@ -3,93 +3,188 @@ use std::{
time::Duration, time::Duration,
thread::{self, JoinHandle}, thread::{self, JoinHandle},
io, io,
panic, panic, marker::PhantomData,
}; };
/// A debouncer for deduplicating groups of events which occur at a similar time. Upon receiving an /// A debouncer for deduplicating groups of events which occur at a similar time. Upon receiving an
/// event (via `DebouncerController::notify_dirty`), the debouncer waits for a specified duration, /// event, the debouncer waits for a specified duration, during which time any additional events
/// during which time any additional events will be considered part of the same group. Once it has /// will be considered part of the same group. Once it has finished waiting, it will emit a single
/// finished waiting, it will emit a single event via a `mpsc` channel. /// event of type `T` via an [mpsc](std::sync::mpsc) channel.
pub struct Debouncer<T> { ///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use treacle::Debouncer;
/// use std::{time::Duration, thread};
///
/// let (debouncer, rx) = Debouncer::<u32, u32, _>::new(
/// Duration::from_millis(100),
/// // Combine raw events into one debounced event by adding them
/// |acc, e| acc.unwrap_or_default() + e
/// )?;
///
/// // Send two events to the debouncer in quick succession
/// debouncer.debounce(4);
/// debouncer.debounce(5);
///
/// // Wait, then send two more events to the debouncer
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
/// debouncer.debounce(2);
/// debouncer.debounce(1);
///
/// assert_eq!(rx.recv().unwrap(), 9); // First debounced event (4 + 5)
/// assert_eq!(rx.recv().unwrap(), 6); // Second debounced event (3 + 2 + 1)
/// # Ok(())
/// # }
/// ```
pub struct Debouncer<RawEvent, DebouncedEvent, FoldFn> {
thread: Option<JoinHandle<()>>, thread: Option<JoinHandle<()>>,
// This is reference counted because the debouncer thread needs access to the controller, and // This is reference counted because the debouncer thread needs access to the controller, and
// the debouncer itself hasn't been created yet when the debouncer thread is created so we // the debouncer itself hasn't been created yet when the debouncer thread is created so we
// can't give it an `Arc<Debouncer<T>>`. Besides, the thread probably shouldn't have access to // can't give it an `Arc<Debouncer<T>>`. Besides, the thread probably shouldn't have access to
// its own join handle! // its own join handle!
controller: Arc<DebouncerController<T>>, controller: Arc<DebouncerController<DebouncedEvent>>,
fold: FoldFn,
_raw_maker: PhantomData<RawEvent>,
} }
impl<T> Debouncer<T> impl<RawEvent, DebouncedEvent, FoldFn> Debouncer<RawEvent, DebouncedEvent, FoldFn>
where where
T: Send + 'static, DebouncedEvent: Send + 'static,
FoldFn: Fn(Option<DebouncedEvent>, RawEvent) -> DebouncedEvent,
{
/// Create a new debouncer which deduplicates events it receives within the timeframe given by
/// `debounce_time`. The raw, un-debounced events can be sent to the debouncer with
/// [`Debouncer::debounce`](crate::Debouncer::debounce), and the resulting debounced events can
/// be received using the [`mpsc::Receiver`](std::sync::mpsc::Receiver) returned by this
/// function.
///
/// Events are deduplicated using the given `fold` function, which combines the current
/// deduplicated event (an `Option<DebouncedEvent>`) with a new `RawEvent` to produce a new
/// `DebouncedEvent`. For example, the fold function could collect raw events into a vector:
///
/// ```
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # use treacle::Debouncer;
/// # use std::{thread, time::Duration};
/// let (debouncer, rx) = Debouncer::<u32, Vec<u32>, _>::new(
/// Duration::from_millis(100),
/// // Combine raw events by pushing them to a vector
/// |acc, raw_event| {
/// let mut vec = acc.unwrap_or_default();
/// vec.push(raw_event);
/// vec
/// }
/// )?;
///
/// // Send two events to the debouncer in quick succession
/// debouncer.debounce(1);
/// debouncer.debounce(2);
/// // Wait, then send another event
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
///
/// assert_eq!(rx.recv().unwrap(), vec![1, 2]);
/// assert_eq!(rx.recv().unwrap(), vec![3]);
/// # Ok(())
/// # }
/// ```
///
/// An [`io::Error`](std::io::Error) will be returned by this function if spawning the
/// debouncer thread fails.
pub fn new(debounce_time: Duration, fold: FoldFn)
-> Result<(Self, mpsc::Receiver<DebouncedEvent>), io::Error>
{ {
pub fn new(debounce_time: Duration) -> Result<(Self, mpsc::Receiver<T>), io::Error> {
let controller = Arc::new(DebouncerController { let controller = Arc::new(DebouncerController {
state: Mutex::new(DebouncerState::new()), state: Mutex::new(DebouncerState::new()),
main_cvar: Condvar::new(), main_cvar: Condvar::new(),
sleep_cvar: Condvar::new(), sleep_cvar: Condvar::new(),
}); });
let (tx, rx) = mpsc::channel::<T>(); let (tx, rx) = mpsc::channel::<DebouncedEvent>();
let thread = thread::Builder::new().spawn({ let thread = thread::Builder::new().spawn({
let debounce_state = controller.clone(); let debounce_state = controller.clone();
move || debounce_thread(debounce_state, tx, debounce_time) move || debounce_thread(debounce_state, tx, debounce_time)
})?; })?;
Ok((Self { thread: Some(thread), controller }, rx)) Ok((Self {
} thread: Some(thread),
controller,
pub fn new_arc(debounce_time: Duration) fold,
-> Result<(Arc<Self>, mpsc::Receiver<T>), io::Error> _raw_maker: PhantomData,
{ }, rx))
Self::new(debounce_time)
.map(|(this, rx)| (Arc::new(this), rx))
} }
} }
impl<T> Debouncer<T> { impl<RawEvent, DebouncedEvent, FoldFn> Debouncer<RawEvent, DebouncedEvent, FoldFn>
where
FoldFn: Fn(Option<DebouncedEvent>, RawEvent) -> DebouncedEvent,
{
/// Send the debouncer a raw event that should be debounced. The /// Send the debouncer a raw event that should be debounced. The
/// [`mpsc::Receiver`](mpsc::Receiver) associated with this debouncer will receive a message /// [`mpsc::Receiver`](mpsc::Receiver) associated with this debouncer will receive a message
/// after at least the amount of time specified when the debouncer was created. /// after at least the amount of time specified when the debouncer was created.
/// ///
/// The debouncer collects event data into an element of type `T`; the provided function `f` /// The debouncer collects event data into an element of type `T`; the provided function `fold`
/// specifies how to combine the new event data with the existing event data `Option<T>` to /// specifies how to combine the new event data with the existing event data `Option<T>` to
/// produce a new `T`. For example, if `T = Vec<E>`, then `f` could be a function which pushes /// produce a new `T`. For example, if `T = Vec<E>`, then `fold` could be a function which
/// to the vec: /// pushes to the vec:
/// ///
/// ```no_run
/// debouncer.debounce(e, |acc, e| {
/// let mut acc = acc.unwrap_or_default();
/// acc.push(e);
/// acc
/// })
/// ``` /// ```
pub fn debounce<E, F>(&self, event_data: E, f: F) /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
where /// # use treacle::Debouncer;
F: FnOnce(Option<T>, E) -> T, /// # use std::{thread, time::Duration};
{ /// let (debouncer, rx) = Debouncer::<u32, Vec<u32>, _>::new(
self.controller.notify_event(event_data, f) /// Duration::from_millis(100),
/// // Combine raw events by pushing them to a vector
/// |acc, raw_event| {
/// let mut vec = acc.unwrap_or_default();
/// vec.push(raw_event);
/// vec
/// }
/// )?;
///
/// // Send two events to the debouncer in quick succession
/// debouncer.debounce(1);
/// debouncer.debounce(2);
/// // Wait, then send another event
/// thread::sleep(Duration::from_millis(150));
/// debouncer.debounce(3);
///
/// assert_eq!(rx.recv().unwrap(), vec![1, 2]);
/// assert_eq!(rx.recv().unwrap(), vec![3]);
/// # Ok(())
/// # }
/// ```
///
/// `fold` is an argument to this function rather than being stored by the `Debouncer` to avoid
/// ending up with a `Debouncer` with a type that's unnameable (e.g. if a closure is used) or
/// having to resort to dynamic dispatch.
#[inline]
pub fn debounce(&self, event_data: RawEvent) {
self.controller.notify_event(event_data, &self.fold)
} }
} }
impl<T> Debouncer<Vec<T>> { // impl<T> Debouncer<Vec<T>> {
pub fn debounce_push(&self, event_data: T) { // #[inline]
self.debounce(event_data, |acc, event_data| { // pub fn debounce_push(&self, event_data: T) {
let mut acc = acc.unwrap_or_default(); // self.debounce(event_data, |acc, event_data| {
acc.push(event_data); // let mut acc = acc.unwrap_or_default();
acc // acc.push(event_data);
}); // acc
} // });
} // }
// }
impl Debouncer<()> { // impl Debouncer<()> {
pub fn debounce_unit(&self) { // #[inline]
self.debounce((), |_acc, _event_data| ()); // pub fn debounce_unit(&self) {
} // self.debounce((), |_acc, _event_data| ());
} // }
// }
impl<T> Drop for Debouncer<T> { impl<RawEvent, DebouncedEvent, FoldFn> Drop for Debouncer<RawEvent, DebouncedEvent, FoldFn> {
fn drop(&mut self) { fn drop(&mut self) {
self.controller.notify_shutdown(); self.controller.notify_shutdown();
@ -120,23 +215,25 @@ impl<T> DebouncerController<T> {
/// ///
/// For example, to collect events into a list, the accumulator could be a `Vec<E>` and the /// For example, to collect events into a list, the accumulator could be a `Vec<E>` and the
/// fold function could be: /// fold function could be:
/// ```no_run /// ```ignore
/// |acc, e| { /// |acc, e| {
/// let mut acc = acc.unwrap_or_default(); /// let mut acc = acc.unwrap_or_default();
/// acc.push(e); /// acc.push(e);
/// acc /// acc
/// } /// }
/// ``` /// ```
fn notify_event<E, F>(&self, event_data: E, f: F) #[inline]
fn notify_event<E, F>(&self, event_data: E, fold: F)
where where
F: FnOnce(Option<T>, E) -> T, F: FnOnce(Option<T>, E) -> T,
{ {
let mut guard = self.state.lock().unwrap(); let mut guard = self.state.lock().unwrap();
guard.fold(event_data, f); guard.fold(event_data, fold);
drop(guard); drop(guard);
self.main_cvar.notify_one(); self.main_cvar.notify_one();
} }
#[inline]
fn notify_shutdown(&self) { fn notify_shutdown(&self) {
let mut guard = self.state.lock().unwrap(); let mut guard = self.state.lock().unwrap();
guard.set_shutdown(); guard.set_shutdown();
@ -172,12 +269,13 @@ impl<T> DebouncerState<T> {
self.shutdown self.shutdown
} }
fn fold<E, F>(&mut self, event_data: E, f: F) #[inline]
fn fold<E, F>(&mut self, event_data: E, fold_fn: F)
where where
F: FnOnce(Option<T>, E) -> T, F: FnOnce(Option<T>, E) -> T,
{ {
let acc = self.acc.take(); let acc = self.acc.take();
self.acc = Some(f(acc, event_data)); self.acc = Some(fold_fn(acc, event_data));
} }
fn swap_acc(&mut self) -> Option<T> { fn swap_acc(&mut self) -> Option<T> {

Loading…
Cancel
Save