From aa6345afab9d6c75bd75dd280202c1f98a939ec6 Mon Sep 17 00:00:00 2001 From: Peter M Date: Wed, 4 Mar 2026 13:10:22 +0100 Subject: [PATCH 1/6] Fix use-after-free race in socket driver close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://ampcode.com/threads/T-019cb8b8-9e4c-7316-9566-c7e3f5f2b6db Fix a use-after-free race condition in the generic_unix socket driver's close handler, detected by Valgrind during CI gen_tcp tests. The close handler in socket_consume_mailbox used a two-phase locking pattern: it acquired the glb->listeners lock to NULL-out the socket_data listener pointers, released it, then called sys_unregister_listener (which re-acquires the lock) to remove the listener from the linked list. Between the unlock and re-lock, the event loop thread could also unlink the same listener node via process_listener_handler after the callback returned NULL. The subsequent list_remove in sys_unregister_listener then operated on stale prev/next pointers, corrupting the list or writing to freed memory. The fix makes the pointer detach and list unlink atomic under a single lock hold by introducing sys_unregister_listener_nolock — a variant that assumes the caller already holds the glb->listeners write lock. The close handler now NULLs the pointers, unlinks the listeners, and releases the lock before freeing the memory. This pattern is specific to generic_unix; ESP32 and RP2 use a single global listener for the socket driver subsystem and are not affected. 
Signed-off-by: Peter M --- .../generic_unix/lib/socket_driver.c | 29 +++++++++---------- src/platforms/generic_unix/lib/sys.c | 6 ++++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 7e9439622d..7ff51b31e0 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -47,6 +47,8 @@ #include #include +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener); + // #define ENABLE_TRACE #include "trace.h" @@ -1194,31 +1196,26 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx) TRACE("close\n"); port_send_reply(ctx, pid, ref, OK_ATOM); SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; - // Callbacks (active_recv_callback, passive_recv_callback) are called - // while glb->listeners lock is held. They may want to free the - // listener, causing a potential double free here. - // We acquire the lock on listeners here and set the listeners - // to NULL in the socket_data structure to prevent them from freeing - // the listeners. + // Callbacks (active_recv_callback, passive_recv_callback, accept_callback) + // are called while glb->listeners lock is held. They may free the + // listener and set the socket_data pointer to NULL. + // We must atomically detach the pointers AND unlink from the listeners + // list under the same lock hold, to prevent a race where the callback + // also unlinks the same listener node. synclist_wrlock(&glb->listeners); ActiveRecvListener *active_listener = socket_data->active_listener; PassiveRecvListener *passive_listener = socket_data->passive_listener; socket_data->active_listener = NULL; socket_data->passive_listener = NULL; - synclist_unlock(&glb->listeners); if (active_listener) { - // Then we unregister, which also acquires the lock. 
The callbacks - // may have returned NULL which means the listener would no longer - // be registered, but this will work. - sys_unregister_listener(glb, &active_listener->base); - // After the listener is unregistered, the callbacks can no longer - // be called, so we can eventually free the listener - free(active_listener); + sys_unregister_listener_nolock(glb, &active_listener->base); } if (passive_listener) { - sys_unregister_listener(glb, &passive_listener->base); - free(passive_listener); + sys_unregister_listener_nolock(glb, &passive_listener->base); } + synclist_unlock(&glb->listeners); + free(active_listener); + free(passive_listener); socket_driver_do_close(ctx); // We don't need to remove message. return NativeTerminate; diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index ce6a032fc9..1f78174d6d 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -698,6 +698,12 @@ void sys_unregister_listener(GlobalContext *global, struct EventListener *listen synclist_remove(&global->listeners, &listener->listeners_list_head); } +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) +{ + listener_event_remove_from_polling_set(listener->fd, global); + list_remove(&listener->listeners_list_head); +} + void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) { struct GenericUnixPlatformData *platform = global->platform_data; From 3275e7f102024ba59d8bc98b0662318d0e83343e Mon Sep 17 00:00:00 2001 From: Peter M Date: Wed, 11 Mar 2026 15:45:32 +0100 Subject: [PATCH 2/6] Fix listener publication race in socket driver An earlier commit ("Fix use-after-free race in socket driver close") fixed the close-time double-unlink/use-after-free race in generic_unix listener teardown. This change addresses a separate race in listener registration, where a listener could become visible to the event loop before socket_data published the corresponding pointer. 
Both fixes are needed; this patch complements the earlier teardown fix rather than replacing it. Fix a race in the generic_unix socket driver where newly created listeners were registered in the global listener list before the corresponding socket_data->{active,passive}_listener pointer was published. If the event loop processed the listener in that window, the callback could consume, free, or replace the listener before the socket driver stored the pointer. The later assignment then left socket_data pointing at stale listener memory, which could surface as random hangs or corruption in gen_tcp tests, including timeouts waiting for the server helper process to start. Publish the listener pointer before calling sys_register_listener in all affected paths: active UDP receive listener setup; active TCP receive listener setup; passive recv/recvfrom listener setup; accept listener setup. This complements the earlier close-path fix by removing another generic_unix listener lifecycle race. Signed-off-by: Peter M --- src/platforms/generic_unix/lib/socket_driver.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 7ff51b31e0..ffa0b7eb7d 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -255,8 +255,8 @@ static term init_udp_socket(Context *ctx, SocketDriverData *socket_data, term pa listener->base.handler = active_recvfrom_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); socket_data->active_listener = listener; + sys_register_listener(glb, &listener->base); } } return ret; @@ -340,8 +340,8 @@ static term init_client_tcp_socket(Context *ctx, SocketDriverData *socket_data, listener->base.handler = active_recv_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - 
sys_register_listener(glb, &listener->base); socket_data->active_listener = listener; + sys_register_listener(glb, &listener->base); } } return ret; @@ -1017,8 +1017,8 @@ static void do_recv(Context *ctx, term pid, term ref, term length, term timeout, listener->length = term_to_int(length); listener->buffer = socket_data->buffer; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); socket_data->passive_listener = listener; + sys_register_listener(glb, &listener->base); } void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) @@ -1119,8 +1119,8 @@ void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout) listener->length = 0; listener->buffer = 0; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); socket_data->passive_listener = listener; + sys_register_listener(glb, &listener->base); } static NativeHandlerResult socket_consume_mailbox(Context *ctx) From c555274bc1e965a31d31c619bcbab1668de0121c Mon Sep 17 00:00:00 2001 From: Peter M Date: Sat, 14 Mar 2026 18:34:44 +0100 Subject: [PATCH 3/6] Set accepted sockets nonblocking Configure newly accepted generic_unix TCP sockets as nonblocking before publishing them to the socket driver machinery. If fcntl fails, close the accepted fd and reply with an error so callers never observe a partially initialized connection. Signed-off-by: Peter M --- src/platforms/generic_unix/lib/socket_driver.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index ffa0b7eb7d..b9daa38229 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -1060,6 +1060,21 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li TRACE("socket_driver|accept_callback: accepted connection. 
fd: %i\n", fd); term pid = listener->pid; + if (UNLIKELY(fcntl(fd, F_SETFL, O_NONBLOCK) == -1)) { + int err = errno; + close(fd); + BEGIN_WITH_STACK_HEAP(12, heap); + term ref = term_from_ref_ticks(listener->ref_ticks, &heap); + term reply = port_heap_create_reply(&heap, ref, port_heap_create_sys_error_tuple(&heap, FCNTL_ATOM, err)); + port_send_message_nolock(glb, pid, reply); + END_WITH_STACK_HEAP(heap, glb); + globalcontext_get_process_unlock(glb, ctx); + if (socket_data->passive_listener) { + socket_data->passive_listener = NULL; + free(listener); + } + return NULL; + } SocketDriverData *new_socket_data = socket_driver_create_data(); new_socket_data->sockfd = fd; new_socket_data->proto = socket_data->proto; From aa6d362961e7de1a1c799a62bbeb4673d07b9765 Mon Sep 17 00:00:00 2001 From: Peter M Date: Sat, 14 Mar 2026 20:36:00 +0100 Subject: [PATCH 4/6] Fix NULL dereference in accept_callback when process terminates If the owning process terminates between the accept() call and globalcontext_get_process_lock(), ctx will be NULL. The code immediately dereferences ctx->platform_data without checking, causing a segfault. Add a NULL check consistent with other callbacks (e.g. recv_callback), closing the accepted fd if needed and freeing the listener before returning. 
Signed-off-by: Peter M --- src/platforms/generic_unix/lib/socket_driver.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index b9daa38229..f7f35ff2e3 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -1046,6 +1046,13 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li socklen_t clientlen = sizeof(clientaddr); int fd = accept(listener->base.fd, (struct sockaddr *) &clientaddr, &clientlen); Context *ctx = globalcontext_get_process_lock(glb, listener->process_id); + if (UNLIKELY(ctx == NULL)) { + if (fd != -1) { + close(fd); + } + free(listener); + return NULL; + } SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; EventListener *result = NULL; if (fd == -1) { From a476c693d43fbca3bfac615cfca23d3e636d6be0 Mon Sep 17 00:00:00 2001 From: Peter M Date: Sat, 6 Jun 2026 08:13:21 +0200 Subject: [PATCH 5/6] Fix listener lifetime races in socket callbacks Serialize listener pointer publication and global listener registration under the listeners lock, so callbacks cannot observe a partially published listener or erase a replacement listener. Callbacks always free the listener being dispatched, while clearing socket_data only when it still points to that listener. Keep socket_data accesses under the process-table lock and clean up accepted socket contexts if the listening process terminates during callback processing. 
Signed-off-by: Peter M --- .../generic_unix/lib/socket_driver.c | 60 ++++++++++++------- src/platforms/generic_unix/lib/sys.c | 22 ++++--- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index f7f35ff2e3..dd9228c0d2 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -47,6 +47,7 @@ #include #include +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener); void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener); // #define ENABLE_TRACE @@ -85,6 +86,22 @@ typedef struct SocketDriverData PassiveRecvListener *passive_listener; } SocketDriverData; +static void register_active_listener(GlobalContext *glb, SocketDriverData *socket_data, ActiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->active_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + +static void register_passive_listener(GlobalContext *glb, SocketDriverData *socket_data, PassiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->passive_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + // clang-format off // TODO define in defaultatoms const char *const send_a = "\x4" "send"; @@ -255,8 +272,7 @@ static term init_udp_socket(Context *ctx, SocketDriverData *socket_data, term pa listener->base.handler = active_recvfrom_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - socket_data->active_listener = listener; - sys_register_listener(glb, &listener->base); + register_active_listener(glb, socket_data, listener); } } return ret; @@ -340,8 +356,7 @@ static term init_client_tcp_socket(Context *ctx, SocketDriverData *socket_data, listener->base.handler 
= active_recv_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - socket_data->active_listener = listener; - sys_register_listener(glb, &listener->base); + register_active_listener(glb, socket_data, listener); } } return ret; @@ -733,10 +748,10 @@ static EventListener *active_recv_callback(GlobalContext *glb, EventListener *ba port_send_message_nolock(glb, pid, msg); mailbox_send(ctx, globalcontext_make_atom(glb, close_internal)); // See socket_consume_mailbox close path below - if (socket_data->active_listener) { + if (socket_data->active_listener == listener) { socket_data->active_listener = NULL; - free(listener); } + free(listener); result = NULL; END_WITH_STACK_HEAP(heap, glb); } else { @@ -837,12 +852,12 @@ static EventListener *passive_recv_callback(GlobalContext *glb, EventListener *b port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list return NULL; @@ -977,12 +992,12 @@ static EventListener *passive_recvfrom_callback(GlobalContext *glb, EventListene port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list and clean up return NULL; @@ -1017,8 +1032,7 @@ static void do_recv(Context *ctx, term pid, term ref, term length, term timeout, listener->length = 
term_to_int(length); listener->buffer = socket_data->buffer; listener->ref_ticks = term_to_ref_ticks(ref); - socket_data->passive_listener = listener; - sys_register_listener(glb, &listener->base); + register_passive_listener(glb, socket_data, listener); } void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) @@ -1075,11 +1089,11 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li term reply = port_heap_create_reply(&heap, ref, port_heap_create_sys_error_tuple(&heap, FCNTL_ATOM, err)); port_send_message_nolock(glb, pid, reply); END_WITH_STACK_HEAP(heap, glb); - globalcontext_get_process_unlock(glb, ctx); - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); return NULL; } SocketDriverData *new_socket_data = socket_driver_create_data(); @@ -1094,9 +1108,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li Context *new_ctx = create_accepting_socket(glb, new_socket_data); ctx = globalcontext_get_process_lock(glb, listener->process_id); if (UNLIKELY(ctx == NULL)) { + socket_driver_do_close(new_ctx); + scheduler_terminate(new_ctx); free(listener); return NULL; } + socket_data = (SocketDriverData *) ctx->platform_data; if (new_socket_data->active) { result = &create_accepting_socket_listener(new_ctx, new_socket_data)->base; } @@ -1110,12 +1127,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li port_send_message_nolock(glb, pid, reply); END_WITH_STACK_HEAP(heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); // remove 
the EventListener from the global list and replace it if needed return result; } @@ -1141,8 +1158,7 @@ void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout) listener->length = 0; listener->buffer = 0; listener->ref_ticks = term_to_ref_ticks(ref); - socket_data->passive_listener = listener; - sys_register_listener(glb, &listener->base); + register_passive_listener(glb, socket_data, listener); } static NativeHandlerResult socket_consume_mailbox(Context *ctx) diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index 1f78174d6d..64ecf4f643 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -660,9 +660,8 @@ void event_listener_add_to_polling_set(struct EventListener *listener, GlobalCon #endif } -void sys_register_listener(GlobalContext *global, struct EventListener *listener) +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener) { - struct ListHead *listeners = synclist_wrlock(&global->listeners); event_listener_add_to_polling_set(listener, global); #ifndef AVM_NO_SMP #ifndef HAVE_KQUEUE @@ -670,7 +669,13 @@ void sys_register_listener(GlobalContext *global, struct EventListener *listener sys_signal(global); #endif #endif - list_append(listeners, &listener->listeners_list_head); + list_append(synclist_nolock(&global->listeners), &listener->listeners_list_head); +} + +void sys_register_listener(GlobalContext *global, struct EventListener *listener) +{ + synclist_wrlock(&global->listeners); + sys_register_listener_nolock(global, listener); synclist_unlock(&global->listeners); } @@ -692,16 +697,17 @@ static void listener_event_remove_from_polling_set(listener_event_t listener_fd, #endif } -void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) { listener_event_remove_from_polling_set(listener->fd, global); - 
synclist_remove(&global->listeners, &listener->listeners_list_head); + list_remove(&listener->listeners_list_head); } -void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) +void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) { - listener_event_remove_from_polling_set(listener->fd, global); - list_remove(&listener->listeners_list_head); + synclist_wrlock(&global->listeners); + sys_unregister_listener_nolock(global, listener); + synclist_unlock(&global->listeners); } void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) From 74872d38d0bb5cdea0f1dafca50b2e90a565953a Mon Sep 17 00:00:00 2001 From: Paul Guyot Date: Mon, 29 Jun 2026 18:46:53 +0200 Subject: [PATCH 6/6] scheduler: avoid redundant scheduler wake on process handoff Significantly improve performance of ping-pong patterns by avoiding cross-thread round trips. Signed-off-by: Paul Guyot --- CHANGELOG.md | 3 + src/libAtomVM/scheduler.c | 134 ++++++++++++------ src/platforms/esp32/components/avm_sys/sys.c | 26 ++-- .../test/main/test_erl_sources/test_file.erl | 19 ++- src/platforms/generic_unix/lib/sys.c | 9 +- src/platforms/rp2/src/lib/rp2_sys.h | 6 +- src/platforms/rp2/src/lib/sys.c | 46 +----- tests/libs/eavmlib/test_file.erl | 39 ++++- 8 files changed, 185 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 29b81ae4e4..2fac7df2fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 longer lines return `{error, {parser, {line_too_long, Prefix}}}` with the first 128 bytes of the offending line. Callers whose upstream servers emit unusually large headers must account for this limit +- Improved performance of SMP scheduler. 
As a result, resources selected with `enif_select` and + stopped with the `ERL_NIF_SELECT_STOP_SCHEDULED` result are now released asynchronously by the + scheduler polling events, staying within the boundaries of the BEAM `enif_select` specification ### Removed - Removed `ahttp_client` support for obsolete line folding (RFC 9112 §5.2); folded header and diff --git a/src/libAtomVM/scheduler.c b/src/libAtomVM/scheduler.c index a856d60012..b51d0040e6 100644 --- a/src/libAtomVM/scheduler.c +++ b/src/libAtomVM/scheduler.c @@ -152,6 +152,51 @@ static void scheduler_process_native_signal_messages(Context *ctx) } } +static Context *scheduler_first_runnable_ready(GlobalContext *global) +{ + Context *result = NULL; + SMP_SPINLOCK_LOCK(&global->processes_spinlock); + // Pick first ready which is not running. + struct ListHead *next_ready = list_first(&global->ready_processes); + while (next_ready != &global->ready_processes) { + result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); + if (!(result->flags & Running)) { + list_remove(next_ready); + context_update_flags(result, ~Ready, Running); + if (result->native_handler) { + // Native handlers are marked as waiting + list_append(&global->waiting_processes, next_ready); + } else { + list_append(&global->running_processes, next_ready); + } + break; + } + next_ready = next_ready->next; + result = NULL; + } + SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); + return result; +} + +#ifndef AVM_NO_SMP +// Caller must hold processes_spinlock. Returns true iff ready_processes holds a +// process a scheduler can actually dispatch right now: a Ready entry that is not +// also flagged Running. A Ready|Running entry is a signaled context still +// executing on another scheduler; scheduler_first_runnable_ready skips it, so it +// is not dispatchable backlog. 
+static bool scheduler_has_runnable_ready(GlobalContext *global) +{ + struct ListHead *item; + LIST_FOR_EACH (item, &global->ready_processes) { + Context *c = GET_LIST_ENTRY(item, Context, processes_list_head); + if (!context_get_flags(c, Running)) { + return true; + } + } + return false; +} +#endif + static Context *scheduler_run0(GlobalContext *global) { // This function should return a new process to run. @@ -209,6 +254,13 @@ static Context *scheduler_run0(GlobalContext *global) return NULL; } if (!is_waiting) { + // If a process is ready, process it instead of waking up + // the poller scheduler + result = scheduler_first_runnable_ready(global); + if (result != NULL) { + break; + } + // Before entering the condition variable, signal the poll events // so the thread polling on events can check the ready queue. sys_signal(global); @@ -232,32 +284,23 @@ static Context *scheduler_run0(GlobalContext *global) int32_t wait_timeout = update_timer_list(global); SMP_SPINLOCK_UNLOCK(&global->timer_spinlock); - SMP_SPINLOCK_LOCK(&global->processes_spinlock); - // Pick first ready which is not running. 
- struct ListHead *next_ready = list_first(&global->ready_processes); - while (next_ready != &global->ready_processes) { - result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); - if (!(result->flags & Running)) { - list_remove(next_ready); - context_update_flags(result, ~Ready, Running); - if (result->native_handler) { - // Native handlers are marked as waiting - list_append(&global->waiting_processes, next_ready); - } else { - list_append(&global->running_processes, next_ready); - } - break; - } - next_ready = next_ready->next; - result = NULL; + if (result == NULL) { + result = scheduler_first_runnable_ready(global); } - SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); - if (result == NULL && !global->scheduler_stop_all) { - sys_poll_events(global, wait_timeout); - } else { - sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + // Only the poller scheduler drives the event loop. +#ifndef AVM_NO_SMP + if (is_waiting) { +#endif + if (result == NULL && !global->scheduler_stop_all) { + // The poller may block waiting for events. + sys_poll_events(global, wait_timeout); + } else { + sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + } +#ifndef AVM_NO_SMP } +#endif #ifdef AVM_TASK_DRIVER_ENABLED globalcontext_process_task_driver_queues(global); #endif @@ -265,8 +308,11 @@ static Context *scheduler_run0(GlobalContext *global) } while (result == NULL); #ifndef AVM_NO_SMP - global->waiting_scheduler = false; - smp_condvar_signal(global->schedulers_cv); + // Only the polling scheduler relinquishes the poller role. + if (is_waiting) { + global->waiting_scheduler = false; + smp_condvar_signal(global->schedulers_cv); + } SMP_MUTEX_UNLOCK(global->schedulers_mutex); #endif @@ -357,17 +403,13 @@ static void scheduler_make_ready(Context *ctx) } list_remove(&ctx->processes_list_head); #ifndef AVM_NO_SMP - if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { - // Start a new scheduler if none are going to take this process. 
- if (!global->waiting_scheduler - && global->running_schedulers > 0 - && global->running_schedulers < global->online_schedulers - && !context_get_flags(ctx, Running)) { - global->running_schedulers++; - smp_scheduler_start(global); - } - SMP_MUTEX_UNLOCK(global->schedulers_mutex); - } + // The readying scheduler will pick up this process itself (directly before + // parking, or on its next reschedule), so only wake or start another + // scheduler when there is already a *runnable* backlog to parallelize. + // ctx is not in ready_processes yet (it was just removed), and entries + // flagged Running are signaled contexts no scheduler can dispatch, so + // neither inflates the count. + bool ready_backlog = scheduler_has_runnable_ready(global); #endif // Move to ready queue (from waiting or running) // The process may be running (it would be signaled), so mark it @@ -376,13 +418,23 @@ static void scheduler_make_ready(Context *ctx) list_append(&global->ready_processes, &ctx->processes_list_head); SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); #ifndef AVM_NO_SMP - if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { - if (global->waiting_scheduler) { + if (ready_backlog) { + if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { + // Start a new scheduler if none are going to take this process. 
+ if (!global->waiting_scheduler + && global->running_schedulers > 0 + && global->running_schedulers < global->online_schedulers + && !context_get_flags(ctx, Running)) { + global->running_schedulers++; + smp_scheduler_start(global); + } + if (global->waiting_scheduler) { + sys_signal(global); + } + SMP_MUTEX_UNLOCK(global->schedulers_mutex); + } else { sys_signal(global); } - SMP_MUTEX_UNLOCK(global->schedulers_mutex); - } else { - sys_signal(global); } #elif defined(AVM_TASK_DRIVER_ENABLED) sys_signal(global); diff --git a/src/platforms/esp32/components/avm_sys/sys.c b/src/platforms/esp32/components/avm_sys/sys.c index 42786b6c8e..606b26b8fb 100644 --- a/src/platforms/esp32/components/avm_sys/sys.c +++ b/src/platforms/esp32/components/avm_sys/sys.c @@ -39,6 +39,7 @@ #include "esp_system.h" #include "esp_timer.h" #include "freertos/FreeRTOS.h" +#include "freertos/semphr.h" #include "freertos/task.h" #include #include @@ -112,12 +113,16 @@ static const char *const revision_atom = "\x8" "revision"; QueueHandle_t event_queue = NULL; QueueSetHandle_t event_set = NULL; +static SemaphoreHandle_t signal_semaphore = NULL; void esp32_sys_queue_init() { - event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4); + // + 1 accounts for the signal binary semaphore + event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4 + 1); event_queue = xQueueCreate(EVENT_QUEUE_LEN, sizeof(void *)); xQueueAddToSet(event_queue, event_set); + signal_semaphore = xSemaphoreCreateBinary(); + xQueueAddToSet(signal_semaphore, event_set); } static inline void sys_clock_gettime(struct timespec *t) @@ -132,6 +137,14 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) void *sender = NULL; QueueSetMemberHandle_t event_source; while ((event_source = xQueueSelectFromSet(event_set, wait_ticks))) { +#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) + if (event_source == signal_semaphore) { + // We've been signaled + xSemaphoreTake(signal_semaphore, 0); + return; + } +#endif + // Listener 
used shared event_queue. if (event_source == event_queue) { if (UNLIKELY(xQueueReceive(event_queue, &sender, 0) == pdFALSE)) { @@ -141,13 +154,6 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) sender = event_source; } -#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) - if (sender == CAST_FUNC_TO_VOID_PTR(sys_signal)) { - // We've been signaled - return; - } -#endif - struct ListHead *listeners = synclist_wrlock(&glb->listeners); if (!process_listener_handler(glb, sender, listeners, NULL, NULL)) { TRACE("sys: handler not found for: %p\n", (void *) sender); @@ -170,8 +176,8 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { - void *queue_item = CAST_FUNC_TO_VOID_PTR(sys_signal); - xQueueSendToBack(event_queue, &queue_item, 0); + UNUSED(glb); + xSemaphoreGive(signal_semaphore); } void sys_time(struct timespec *t) diff --git a/src/platforms/esp32/test/main/test_erl_sources/test_file.erl b/src/platforms/esp32/test/main/test_erl_sources/test_file.erl index 3acb8c0647..fdacbc0a4e 100644 --- a/src/platforms/esp32/test/main/test_erl_sources/test_file.erl +++ b/src/platforms/esp32/test/main/test_erl_sources/test_file.erl @@ -50,7 +50,12 @@ test_basic_file() -> test_gc() -> Path = "/sdcard/atomvm-2.txt", GCSubPid = spawn(fun() -> gc_loop(Path, undefined) end), - MemorySize0 = erlang:memory(binary), + % Dead resource terms from earlier tests still hold their refc binaries + % until this process collects them, and resources are released + % asynchronously by the scheduler polling events: collect, then wait for + % the global binary count to settle before taking the baseline. + erlang:garbage_collect(), + MemorySize0 = wait_memory_stable(erlang:memory(binary), 100), call_gc_loop(GCSubPid, open), MemorySize1 = erlang:memory(binary), true = MemorySize1 > MemorySize0, @@ -75,6 +80,18 @@ test_gc() -> ok = atomvm:posix_unlink(Path), ok. +% Wait until two samples 20ms apart are equal. 
+wait_memory_stable(Last, 0) -> + Last; +wait_memory_stable(Last, Retries) -> + receive + after 20 -> ok + end, + case erlang:memory(binary) of + Last -> Last; + Other -> wait_memory_stable(Other, Retries - 1) + end. + call_gc_loop(Pid, Message) -> Pid ! {self(), Message}, receive diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index 64ecf4f643..5a35873ddf 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -535,7 +535,11 @@ void sys_init_platform(GlobalContext *global) platform->kqueue_fd = kqueue(); platform->listeners_poll_count = 0; platform->select_events_poll_count = 0; -#ifndef AVM_NO_SMP + // Register the user event that sys_signal triggers. sys_signal is compiled + // whenever SMP or the task driver is enabled, so the registration must + // match that condition -- otherwise a non-SMP task-driver build triggers an + // unregistered event and kevent fails. +#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) struct timespec ts = { 0, 0 }; struct kevent kev; EV_SET(&kev, SIGNAL_IDENTIFIER, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL); @@ -741,6 +745,9 @@ void sys_unregister_select_event(GlobalContext *global, ErlNifEvent event, bool EV_SET(&kev, event, is_write ? 
EVFILT_WRITE : EVFILT_READ, EV_DELETE, 0, 0, NULL); (void) kevent(platform->kqueue_fd, &kev, 1, NULL, 0, &ts); platform->select_events_poll_count = -1; +#ifndef AVM_NO_SMP + sys_signal(global); +#endif #else UNUSED(event); UNUSED(is_write); diff --git a/src/platforms/rp2/src/lib/rp2_sys.h b/src/platforms/rp2/src/lib/rp2_sys.h index 77b2918c88..3d1393d066 100644 --- a/src/platforms/rp2/src/lib/rp2_sys.h +++ b/src/platforms/rp2/src/lib/rp2_sys.h @@ -27,7 +27,8 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" -#include +#include +#include #include #include @@ -77,8 +78,7 @@ void sys_unregister_listener_from_event(GlobalContext *global, listener_event_t struct RP2PlatformData { #ifndef AVM_NO_SMP - mutex_t event_poll_mutex; - cond_t event_poll_cond; + semaphore_t event_poll_sem; #endif queue_t event_queue; diff --git a/src/platforms/rp2/src/lib/sys.c b/src/platforms/rp2/src/lib/sys.c index 0894d0bf1d..97311832a8 100644 --- a/src/platforms/rp2/src/lib/sys.c +++ b/src/platforms/rp2/src/lib/sys.c @@ -82,8 +82,7 @@ void sys_init_platform(GlobalContext *glb) struct RP2PlatformData *platform = malloc(sizeof(struct RP2PlatformData)); glb->platform_data = platform; #ifndef AVM_NO_SMP - mutex_init(&platform->event_poll_mutex); - cond_init(&platform->event_poll_cond); + sem_init(&platform->event_poll_sem, 0, 1); #endif queue_init(&platform->event_queue, sizeof(queue_t *), EVENT_QUEUE_LEN); @@ -152,37 +151,10 @@ bool sys_try_post_listener_event_from_isr(GlobalContext *glb, listener_event_t l return false; } -#ifndef AVM_NO_SMP - uint32_t owner; - bool acquired_mutex = mutex_try_enter(&platform->event_poll_mutex, &owner); - // We're from an ISR, so we cannot wait for the interrupted code (running - // on the same core as we do) to release the mutex. - if (!acquired_mutex) { - // If this core is not the owner, wait for the other core to release - // the mutex. 
- // TODO: implement queue_try_remove_wait_timeout_ms in Pico SDK to - // simplify this logic - uint32_t caller = (uint32_t) lock_get_caller_owner_id(); // same cast exists in mutex_try_enter - if (caller != owner) { - mutex_enter_blocking(&platform->event_poll_mutex); - acquired_mutex = true; - } - } -#endif if (UNLIKELY(!queue_try_add(&platform->event_queue, &listener_queue))) { -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif fprintf(stderr, "Lost event from ISR as global event queue is full. System is overloaded or EVENT_QUEUE_LEN is too low\n"); return false; } -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif #ifndef AVM_NO_SMP sys_signal(glb); @@ -202,16 +174,12 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) sys_tinyusb_unlock(glb); #endif #ifndef AVM_NO_SMP - if (timeout_ms != 0) { - mutex_enter_blocking(&platform->event_poll_mutex); - if (queue_is_empty(&platform->event_queue)) { - if (timeout_ms > 0) { - cond_wait_timeout_ms(&platform->event_poll_cond, &platform->event_poll_mutex, timeout_ms); - } else { - cond_wait(&platform->event_poll_cond, &platform->event_poll_mutex); - } + if (timeout_ms != 0 && queue_is_empty(&platform->event_queue)) { + if (timeout_ms > 0) { + sem_acquire_timeout_ms(&platform->event_poll_sem, timeout_ms); + } else { + sem_acquire_blocking(&platform->event_poll_sem); } - mutex_exit(&platform->event_poll_mutex); } #else UNUSED(timeout_ms); @@ -230,7 +198,7 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { struct RP2PlatformData *platform = glb->platform_data; - cond_signal(&platform->event_poll_cond); + sem_release(&platform->event_poll_sem); } #endif diff --git a/tests/libs/eavmlib/test_file.erl b/tests/libs/eavmlib/test_file.erl index c69a3a9f24..458c15d8a4 100644 --- a/tests/libs/eavmlib/test_file.erl +++ b/tests/libs/eavmlib/test_file.erl @@ -130,7 +130,13 @@ 
test_fifo_select(_HasSelect) -> test_gc(HasSelect) -> Path = "/tmp/atomvm.tmp." ++ integer_to_list(erlang:system_time(millisecond)), GCSubPid = spawn(fun() -> gc_loop(Path, undefined) end), - MemorySize0 = erlang:memory(binary), + % Dead resource terms from earlier tests still hold their refc binaries + % until this process collects them, and resources stopped with + % ERL_NIF_SELECT_STOP_SCHEDULED are released asynchronously by the + % scheduler polling events: collect, then wait for the global binary + % count to settle before taking the baseline. + erlang:garbage_collect(), + MemorySize0 = wait_memory_stable(erlang:memory(binary), 100), call_gc_loop(GCSubPid, open), MemorySize1 = erlang:memory(binary), ?ASSERT_TRUE(MemorySize1 > MemorySize0), @@ -166,7 +172,11 @@ test_gc(HasSelect) -> ?ASSERT_EQUALS(MemorySize8, MemorySize1), call_gc_loop(GCSubPid, close), call_gc_loop(GCSubPid, gc), - MemorySize9 = erlang:memory(binary), + % If select_stop raced the ready_output notification, the stop + % was scheduled (ERL_NIF_SELECT_STOP_SCHEDULED) and the resource + % is only released once the scheduler polling events retires the + % closed select event, so wait for memory to converge. + MemorySize9 = wait_memory_binary(MemorySize0, 100), ?ASSERT_EQUALS(MemorySize9, MemorySize0); true -> ok @@ -182,6 +192,31 @@ call_gc_loop(Pid, Message) -> {Pid, Message} -> ok end. +% Wait until two samples 20ms apart are equal. +wait_memory_stable(Last, 0) -> + Last; +wait_memory_stable(Last, Retries) -> + receive + after 20 -> ok + end, + case erlang:memory(binary) of + Last -> Last; + Other -> wait_memory_stable(Other, Retries - 1) + end. + +wait_memory_binary(Expected, Retries) -> + case erlang:memory(binary) of + Expected -> + Expected; + Other when Retries =:= 0 -> + Other; + _ -> + receive + after 10 -> ok + end, + wait_memory_binary(Expected, Retries - 1) + end. + gc_loop(Path, File) -> receive {select, _Resource, undefined, _Direction} ->