continue work on eventloop

main
pantonshire 1 month ago
parent 643cc75f67
commit bcf3f8fe18

@ -4,13 +4,16 @@
#include <stdbool.h> #include <stdbool.h>
#include <time.h> #include <time.h>
#include "platform.h"
#include "list.h" #include "list.h"
#include "scoped.h" #include "scoped.h"
#define OWD_EVENT_READABLE (1 << 0) #define OWD_EVENT_READABLE (1 << 0)
#define OWD_EVENT_WRITABLE (1 << 1) #define OWD_EVENT_WRITABLE (1 << 1)
#if defined(OWD_EVENTLOOP_KQUEUE)
#define OWD_EVENTLOOP_KQUEUE_FILTERS_MAX 2
#endif
enum owd_event_source { enum owd_event_source {
OWD_EVENT_SOURCE_FD, OWD_EVENT_SOURCE_FD,
OWD_EVENT_SOURCE_TIMER, OWD_EVENT_SOURCE_TIMER,
@ -25,12 +28,14 @@ struct owd_event {
owd_event_id_t id; owd_event_id_t id;
struct intrusive_list list; struct intrusive_list list;
#if defined(EVENTLOOP_KQUEUE) #if defined(OWD_EVENTLOOP_KQUEUE)
uintptr_t kqueue_ident; uintptr_t kqueue_ident;
short kqueue_filter; uint16_t kqueue_filters[OWD_EVENTLOOP_KQUEUE_FILTERS_MAX];
uint8_t kqueue_filters_len;
#elif defined(EVENTLOOP_EPOLL) #elif defined(OWD_EVENTLOOP_EPOLL)
int epoll_registered_fd; int epoll_registered_fd;
bool epoll_fd_owned;
#endif #endif
}; };
@ -38,11 +43,11 @@ struct owd_event {
struct owd_eventloop { struct owd_eventloop {
struct intrusive_list event_list; struct intrusive_list event_list;
#if defined(EVENTLOOP_KQUEUE) #if defined(OWD_EVENTLOOP_KQUEUE)
uintptr_t kqueue_timer_next; uintptr_t kqueue_timer_next;
int kqueue_fd; int kqueue_fd;
#elif defined(EVENTLOOP_EPOLL) #elif defined(OWD_EVENTLOOP_EPOLL)
int epoll_fd; int epoll_fd;
#endif #endif
@ -55,7 +60,7 @@ static inline bool owd_event_id_eq(owd_event_id_t eid1, owd_event_id_t eid2) {
int owd_eventloop_init(struct owd_eventloop *el); int owd_eventloop_init(struct owd_eventloop *el);
void owd_eventloop_cleanup(struct owd_eventloop *el); void owd_eventloop_cleanup(struct owd_eventloop *el);
int owd_eventloop_wait(struct owd_eventloop *el, owd_event_id_t *id_out, const struct timespec *timeout); int owd_eventloop_wait(struct owd_eventloop *el, owd_event_id_t *id_out, uint32_t *flags_out, const struct timespec *timeout);
int owd_eventloop_clear(struct owd_eventloop *el, struct owd_event *e); int owd_eventloop_clear(struct owd_eventloop *el, struct owd_event *e);
void owd_eventloop_remove(struct owd_eventloop *el, struct owd_event *e); void owd_eventloop_remove(struct owd_eventloop *el, struct owd_event *e);
int owd_eventloop_add_timer(struct owd_eventloop *el, struct owd_event *e, uint64_t millis, bool oneshot); int owd_eventloop_add_timer(struct owd_eventloop *el, struct owd_event *e, uint64_t millis, bool oneshot);

@ -7,6 +7,6 @@ void owd_platform_el_cleanup(struct owd_eventloop *el);
void owd_platform_el_event_remove(struct owd_eventloop *el, struct owd_event *e); void owd_platform_el_event_remove(struct owd_eventloop *el, struct owd_event *e);
void owd_platform_el_event_cleanup(struct owd_eventloop *el, struct owd_event *e); void owd_platform_el_event_cleanup(struct owd_eventloop *el, struct owd_event *e);
int owd_platform_el_event_clear(struct owd_eventloop *el, struct owd_event *e); int owd_platform_el_event_clear(struct owd_eventloop *el, struct owd_event *e);
int owd_platform_el_wait(struct owd_eventloop *el, owd_event_id_t *id_out, const struct timespec *timeout); int owd_platform_el_wait(struct owd_eventloop *el, owd_event_id_t *id_out, uint32_t *flags_out, const struct timespec *timeout);
int owd_platform_el_add_timer(struct owd_eventloop *el, struct owd_event *e, uint64_t millis, bool oneshot); int owd_platform_el_add_timer(struct owd_eventloop *el, struct owd_event *e, uint64_t millis, bool oneshot);
int owd_platform_el_add_fd(struct owd_eventloop *el, struct owd_event *e, int fd, uint32_t flags); int owd_platform_el_add_fd(struct owd_eventloop *el, struct owd_event *e, int fd, uint32_t flags);

@ -1,34 +0,0 @@
#pragma once
#if defined(__linux__)
#define PLATFORM_LINUX
#define EVENTLOOP_EPOLL
#elif defined(__APPLE__) && defined(__MACH__)
#define PLATFORM_OSX
#define EVENTLOOP_KQUEUE
#elif defined(__FreeBSD__)
#define PLATFORM_BSD
#define PLATFORM_FREEBSD
#define EVENTLOOP_KQUEUE
#elif defined(__NetBSD__)
#define PLATFORM_BSD
#define PLATFORM_NETBSD
#define EVENTLOOP_KQUEUE
#elif defined(__OpenBSD__)
#define PLATFORM_BSD
#define PLATFORM_OPENBSD
#define EVENTLOOP_KQUEUE
#elif defined(__DragonFly__)
#define PLATFORM_BSD
#define PLATFORM_DRAGONFLY
#define EVENTLOOP_KQUEUE
#else
#error "Failed to detect supported platform"
#endif

@ -32,9 +32,10 @@ void owd_eventloop_cleanup(struct owd_eventloop *el) {
int owd_eventloop_wait( int owd_eventloop_wait(
struct owd_eventloop *el, struct owd_eventloop *el,
owd_event_id_t *id_out, owd_event_id_t *id_out,
uint32_t *flags_out,
const struct timespec *timeout) const struct timespec *timeout)
{ {
return owd_platform_el_wait(el, id_out, timeout); return owd_platform_el_wait(el, id_out, flags_out, timeout);
} }
int owd_eventloop_clear(struct owd_eventloop *el, struct owd_event *e) { int owd_eventloop_clear(struct owd_eventloop *el, struct owd_event *e) {

@ -0,0 +1,79 @@
#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include "eventloop.h"
#include "eventloop_platform.h"
int owd_platform_el_init(struct owd_eventloop *el) {
if ((el->epoll_fd = epoll_create1(EPOLL_CLOEXEC)) < 0)
return -1;
return 0;
}
void owd_platform_el_cleanup(struct owd_eventloop *el) {
close(el->epoll_fd);
}
void owd_platform_el_event_remove(struct owd_eventloop *el, struct owd_event *e) {
epoll_ctl(el->epoll_fd, EPOLL_CTL_DEL, e->epoll_registered_fd, NULL);
}
void owd_platform_el_event_cleanup(struct owd_eventloop *el, struct owd_event *e) {
if (e->epoll_fd_owned && (e->epoll_registered_fd >= 0)) {
close(e->epoll_registered_fd);
e->epoll_registered_fd = -1;
}
}
static int clear_timerfd(int fd) {
uint64_t tfd_buf;
ssize_t res;
do {
res = read(fd, &tfd_buf, sizeof(tfd_buf));
} while ((res < 0) && (errno == EINTR));
if (res < 0)
return -1;
return 0;
}
int owd_platform_el_event_clear(struct owd_eventloop *el, struct owd_event *e) {
int res;
if ((e->id.source == OWD_EVENT_SOURCE_TIMER) && (e->epoll_registered_fd >= 0)) {
if ((res = clear_timerfd(e->epoll_registered_fd)))
return res;
}
return 0;
}
int owd_platform_el_wait(
struct owd_eventloop *el,
owd_event_id_t *id_out,
uint32_t *flags_out,
const struct timespec *timeout)
{
}
int owd_platform_el_add_timer(
struct owd_eventloop *el,
struct owd_event *e,
uint64_t millis,
bool oneshot)
{
}
int owd_platform_el_add_fd(
struct owd_eventloop *el,
struct owd_event *e,
int fd,
uint32_t flags)
{
}

@ -1,14 +1,56 @@
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
#include <fcntl.h>
#include <errno.h> #include <errno.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/event.h> #include <sys/event.h>
#include <sys/time.h> #include <sys/time.h>
#include "eventloop.h"
#include "eventloop_platform.h" #include "eventloop_platform.h"
#include "io.h"
struct el_kqueue_add_params {
uint16_t filter;
uint16_t flags;
int64_t data;
};
static int new_kqueue(void) {
#if defined(OWD_PLATFORM_FREEBSD)
return kqueuex(KQUEUE_CLOEXEC);
#elif defined(OWD_PLATFORM_OPENBSD) || defined(OWD_PLATFORM_NETBSD)
return kqueue1(O_CLOEXEC);
#elif defined(OWD_PLATFORM_DARWIN)
// Darwin / XNU sets cloexec by default:
// - https://github.com/apple/darwin-xnu/blob/2ff845c2e033bd0ff64b5b6aa6063a1f8f65aa32/bsd/kern/kern_event.c#L3029
// - https://github.com/apple-oss-distributions/xnu/blob/f6217f891ac0bb64f3d375211650a4c1ff8ca1ea/bsd/kern/kern_event.c#L3070
return kqueue();
#else
FD_SCOPED(fd);
if ((fd = kqueue()) < 0)
return -1;
// FIXME: this is obviously not thread-safe, so if we ever use threads then this needs to be
// revisited.
//
// One option on dragonfly bsd would be to fork, create the kqueue in the fork, then send it
// back to the parent in a unix socket control message (SCM_RIGHTS) with MSG_CMSG_CLOEXEC set.
// Hopefully a better option exists.
if (fcntl(fd, F_SETFD, FD_CLOEXEC))
return -1;
return FD_RELEASE(&fd);
#endif
}
int owd_platform_el_init(struct owd_eventloop *el) { int owd_platform_el_init(struct owd_eventloop *el) {
if ((el->kqueue_fd = kqueue()) < 0) if ((el->kqueue_fd = new_kqueue()) < 0)
return -1; return -1;
el->kqueue_timer_next = 0; el->kqueue_timer_next = 0;
return 0; return 0;
@ -19,13 +61,24 @@ void owd_platform_el_cleanup(struct owd_eventloop *el) {
} }
void owd_platform_el_event_remove(struct owd_eventloop *el, struct owd_event *e) { void owd_platform_el_event_remove(struct owd_eventloop *el, struct owd_event *e) {
struct kevent kev; struct kevent kevs[OWD_EVENTLOOP_KQUEUE_FILTERS_MAX];
unsigned i;
unsigned filters_len;
if ((filters_len = e->kqueue_filters_len) > OWD_EVENTLOOP_KQUEUE_FILTERS_MAX)
filters_len = OWD_EVENTLOOP_KQUEUE_FILTERS_MAX;
if (!filters_len)
return;
memset(kevs, 0, sizeof(kevs));
for (i = 0; i < filters_len; i++) {
kevs[i].ident = e->kqueue_ident;
kevs[i].filter = e->kqueue_filters[i];
kevs[i].flags = EV_DELETE;
}
memset(&kev, 0, sizeof(kev)); kevent(el->kqueue_fd, kevs, filters_len, NULL, 0, NULL);
kev.ident = e->kqueue_ident;
kev.filter = e->kqueue_filter;
kev.flags = EV_DELETE;
kevent(el->kqueue_fd, &kev, 1, NULL, 0, NULL);
} }
// TODO: close timerfd on linux // TODO: close timerfd on linux
@ -44,6 +97,7 @@ int owd_platform_el_event_clear(struct owd_eventloop *el, struct owd_event *e) {
int owd_platform_el_wait( int owd_platform_el_wait(
struct owd_eventloop *el, struct owd_eventloop *el,
owd_event_id_t *id_out, owd_event_id_t *id_out,
uint32_t *flags_out,
const struct timespec *timeout) const struct timespec *timeout)
{ {
struct kevent kev; struct kevent kev;
@ -60,37 +114,54 @@ int owd_platform_el_wait(
} }
id_out->obj = kev.ident; id_out->obj = kev.ident;
*flags_out = 0;
if (kev.filter & EVFILT_TIMER) if (kev.filter & EVFILT_TIMER) {
id_out->source = OWD_EVENT_SOURCE_TIMER; id_out->source = OWD_EVENT_SOURCE_TIMER;
else *flags_out |= OWD_EVENT_READABLE;
}
else {
id_out->source = OWD_EVENT_SOURCE_FD; id_out->source = OWD_EVENT_SOURCE_FD;
if (kev.filter & EVFILT_READ)
*flags_out |= OWD_EVENT_READABLE;
if (kev.filter & EVFILT_WRITE)
*flags_out |= OWD_EVENT_WRITABLE;
}
return 0; return 0;
} }
static int el_kqueue_add_one( static int el_kqueue_add(
struct owd_eventloop *el, struct owd_eventloop *el,
struct owd_event *e, struct owd_event *e,
uintptr_t ident, uintptr_t ident,
uint16_t filter, const struct el_kqueue_add_params *params,
uint16_t flags, uint8_t params_len)
int64_t data)
{ {
struct kevent kev; struct kevent kevs[OWD_EVENTLOOP_KQUEUE_FILTERS_MAX];
unsigned i;
memset(&kev, 0, sizeof(kev)); if (!params_len || (params_len > OWD_EVENTLOOP_KQUEUE_FILTERS_MAX)) {
kev.ident = ident; errno = EINVAL;
kev.filter = filter; return -1;
kev.flags = flags; }
kev.data = data;
memset(kevs, 0, sizeof(kevs));
for (i = 0; i < params_len; i++) {
kevs[i].ident = ident;
kevs[i].filter = params[i].filter;
kevs[i].flags = EV_ADD | params[i].flags;
kevs[i].data = params[i].data;
}
if (kevent(el->kqueue_fd, &kev, 1, NULL, 0, NULL)) if (kevent(el->kqueue_fd, kevs, params_len, NULL, 0, NULL))
return -1; return -1;
e->id.obj = ident; e->id.obj = ident;
e->kqueue_ident = ident; e->kqueue_ident = ident;
e->kqueue_filter = filter; e->kqueue_filters_len = params_len;
for (i = 0; i < params_len; i++)
e->kqueue_filters[i] = params[i].filter;
return 0; return 0;
} }
@ -101,14 +172,17 @@ int owd_platform_el_add_timer(
uint64_t millis, uint64_t millis,
bool oneshot) bool oneshot)
{ {
uint16_t kev_flags;
int res; int res;
struct el_kqueue_add_params params;
kev_flags = 0; params.filter = EVFILT_TIMER;
params.data = millis;
params.flags = 0;
if (oneshot) if (oneshot)
kev_flags |= EV_ONESHOT; params.flags |= EV_ONESHOT;
if ((res = el_kqueue_add_one(el, e, el->kqueue_timer_next, EVFILT_TIMER, EV_ADD | kev_flags, millis))) if ((res = el_kqueue_add(el, e, el->kqueue_timer_next, &params, 1)))
return res; return res;
el->kqueue_timer_next++; el->kqueue_timer_next++;
@ -122,6 +196,26 @@ int owd_platform_el_add_fd(
int fd, int fd,
uint32_t flags) uint32_t flags)
{ {
// TODO struct el_kqueue_add_params params[2];
return 0; uint8_t params_len;
params_len = 0;
if (flags & OWD_EVENT_READABLE) {
params[params_len++] = (struct el_kqueue_add_params) {
.filter = EVFILT_READ,
.flags = 0,
.data = 0,
};
}
if (flags & OWD_EVENT_WRITABLE) {
params[params_len++] = (struct el_kqueue_add_params) {
.filter = EVFILT_WRITE,
.flags = 0,
.data = 0,
};
}
return el_kqueue_add(el, e, (uintptr_t)fd, params, params_len);
} }

Loading…
Cancel
Save