diff --git a/src/lib.rs b/src/lib.rs index c5158ee..6ae2a71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod fold; +mod next; use std::{ sync::{Arc, Mutex, Condvar, mpsc}, diff --git a/src/next.rs b/src/next.rs new file mode 100644 index 0000000..930949d --- /dev/null +++ b/src/next.rs @@ -0,0 +1,234 @@ +use std::{time::{Instant, Duration}, collections::VecDeque, marker::PhantomData, sync::{Mutex, Condvar, MutexGuard, Arc}}; + +pub struct DebouncerTx { + debouncer: Arc>, + fold: FoldFn, + _raw_event_marker: PhantomData, +} + +impl DebouncerTx +where + FoldFn: Fn(Option, RawEvent) -> DebouncedEvent, +{ + +} + +pub struct DebouncerRx { + debouncer: Arc> +} + +impl DebouncerRx { + +} + +pub struct DebouncerClosedError; + +struct Debouncer { + state: Mutex>, + debounce_time: Duration, + queue_wait_cvar: Condvar, + event_ready_wait_cvar: Condvar, +} + +impl Debouncer { + // TODO: error on closed channel? + fn push(&self, raw_event: R, fold: F) + where + F: Fn(Option, R) -> T, + { + let now = Instant::now(); + + let push_outcome = { + let mut state_guard = self.state.lock().unwrap(); + state_guard.push_latest(raw_event, fold, now, self.debounce_time) + }; + + if matches!(push_outcome, PushOutcome::NewAcc) { + // If we pushed a new event accumulator, wake up one rx thread which was waiting for + // the queue to be non-empty. Don't wake up all the rx threads because only one of + // them can consume the acc from the queue; therefore, we should wait until another + // acc is added to the queue before waking up another thread. + self.queue_wait_cvar.notify_one(); + } + } + + fn pop(&self) -> Result { + enum PopOutcome { + Event(T), + Shutdown, + } + + let mut state_guard = self.state.lock().unwrap(); + + if state_guard.shutdown { + return Err(DebouncerClosedError); + } + + let event = loop { + // If the queue is empty, wait for a tx thread to notify us that a new acc has been + // added to the queue. + if state_guard.is_empty() { + state_guard = self.queue_wait_cvar.wait(state_guard).unwrap(); + + // We may have been unparked because someone wants to shut down the debouncer, so + // check the shutdown flag and return if it is set. + if state_guard.shutdown { + return Err(DebouncerClosedError); + } + } + + let event = loop { + let Some(event) = state_guard.peek_earliest() else { + break None; + }; + + let now = Instant::now(); + + match event.ready_time.checked_duration_since(now) { + Some(wait_time) => { + // Wait the amount of time between now and the `ready_time` of the event. + // This is done using a condvar so the sleep can be interrupted if someone + // wants to shut down the debouncer. + (state_guard, _) = self.event_ready_wait_cvar + .wait_timeout(state_guard, wait_time) + .unwrap(); + + // Again, we may have been unparked because someone wants to shut down the + // debouncer, so check the shutdown flag and return if it is set. + if state_guard.shutdown { + return Err(DebouncerClosedError); + } + + continue; + }, + + None => { + break state_guard.pop_oldest_acc(); + }, + } + }; + + let Some(event) = event else { + continue; + }; + + break event; + }; + + // FIXME: replace above with this (set outcome rather than early return) + let result = 'result: { + if state_guard.shutdown { + break 'result PopOutcome::Shutdown; + } + + // TODO + todo!() + }; + + match result { + PopOutcome::Event(event) => Ok(event), + PopOutcome::Shutdown => { + state_guard + .pop_oldest_acc_discard_none() + .ok_or(DebouncerClosedError) + }, + } + } + + // TODO: non-blocking `try_pop`? +} + +struct DebouncerState { + event_queue: VecDeque>, + shutdown: bool, +} + +impl DebouncerState { + fn is_empty(&self) -> bool { + self.event_queue.is_empty() + } + + fn peek_oldest(&self) -> Option<&EventAcc> { + self.event_queue.front() + } + + fn pop_oldest(&mut self) -> Option> { + self.event_queue.pop_front() + } + + fn pop_oldest_acc(&mut self) -> Option { + self.pop_oldest().and_then(EventAcc::into_acc) + } + + fn pop_oldest_acc_discard_none(&mut self) -> Option { + while let Some(event) = self.pop_oldest() { + if let Some(acc) = event.acc { + return Some(acc); + } + } + return None; + } + + fn push_latest(&mut self, raw_event: R, f: F, now: Instant, debounce_time: Duration) + -> PushOutcome + where + F: FnOnce(Option, R) -> T, + { + match self.event_queue + .back_mut() + .and_then(|event| (event.ready_time > now).then_some(event)) + { + Some(event) => { + event.fold(raw_event, f); + PushOutcome::NoNewAcc + }, + + None => { + let ready_time = now + debounce_time; + let event = EventAcc::new_from_fold(raw_event, f, ready_time); + self.event_queue.push_back(event); + PushOutcome::NewAcc + }, + } + } +} + +enum PushOutcome { + NewAcc, + NoNewAcc, +} + +struct EventAcc { + // The accumulator is stored as an `Option` because it is temporarily set to `None` during + // `EventAcc::fold`, so that the old accumulator can be moved into the user-provided fold + // function rather than borrowing it. `MaybeUninit` cannot be used for this because the + // user-provided fold function may panic, which would leave the accumulator in an uninitialised + // state, potentially causing UB later. + acc: Option, + ready_time: Instant, +} + +impl EventAcc { + fn new_from_fold(raw_event: R, f: F, ready_time: Instant) -> Self + where + F: FnOnce(Option, R) -> T, + { + Self { + acc: Some(f(None, raw_event)), + ready_time, + } + } + + fn fold(&mut self, raw_event: R, f: F) + where + F: FnOnce(Option, R) -> T, + { + let acc = self.acc.take(); + self.acc = Some(f(acc, raw_event)); + } + + fn into_acc(self) -> Option { + self.acc + } +} +