diff --git a/CHANGELOG.md b/CHANGELOG.md index 29b81ae4e4..2fac7df2fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 longer lines return `{error, {parser, {line_too_long, Prefix}}}` with the first 128 bytes of the offending line. Callers whose upstream servers emit unusually large headers must account for this limit +- Improved performance of SMP scheduler. As a result, resources selected with `enif_select` and + stopped with the `ERL_NIF_SELECT_STOP_SCHEDULED` result are now released asynchronously by the + scheduler polling events, staying within the boundaries of the BEAM `enif_select` specification ### Removed - Removed `ahttp_client` support for obsolete line folding (RFC 9112 ยง5.2); folded header and diff --git a/src/libAtomVM/scheduler.c b/src/libAtomVM/scheduler.c index a856d60012..b51d0040e6 100644 --- a/src/libAtomVM/scheduler.c +++ b/src/libAtomVM/scheduler.c @@ -152,6 +152,51 @@ static void scheduler_process_native_signal_messages(Context *ctx) } } +static Context *scheduler_first_runnable_ready(GlobalContext *global) +{ + Context *result = NULL; + SMP_SPINLOCK_LOCK(&global->processes_spinlock); + // Pick first ready which is not running. + struct ListHead *next_ready = list_first(&global->ready_processes); + while (next_ready != &global->ready_processes) { + result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); + if (!(result->flags & Running)) { + list_remove(next_ready); + context_update_flags(result, ~Ready, Running); + if (result->native_handler) { + // Native handlers are marked as waiting + list_append(&global->waiting_processes, next_ready); + } else { + list_append(&global->running_processes, next_ready); + } + break; + } + next_ready = next_ready->next; + result = NULL; + } + SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); + return result; +} + +#ifndef AVM_NO_SMP +// Caller must hold processes_spinlock. Returns true iff ready_processes holds a +// process a scheduler can actually dispatch right now: a Ready entry that is not +// also flagged Running. A Ready|Running entry is a signaled context still +// executing on another scheduler; scheduler_first_runnable_ready skips it, so it +// is not dispatchable backlog. +static bool scheduler_has_runnable_ready(GlobalContext *global) +{ + struct ListHead *item; + LIST_FOR_EACH (item, &global->ready_processes) { + Context *c = GET_LIST_ENTRY(item, Context, processes_list_head); + if (!context_get_flags(c, Running)) { + return true; + } + } + return false; +} +#endif + static Context *scheduler_run0(GlobalContext *global) { // This function should return a new process to run. @@ -209,6 +254,13 @@ static Context *scheduler_run0(GlobalContext *global) return NULL; } if (!is_waiting) { + // If a process is ready, process it instead of waking up + // the poller scheduler + result = scheduler_first_runnable_ready(global); + if (result != NULL) { + break; + } + // Before entering the condition variable, signal the poll events // so the thread polling on events can check the ready queue. sys_signal(global); @@ -232,32 +284,23 @@ static Context *scheduler_run0(GlobalContext *global) int32_t wait_timeout = update_timer_list(global); SMP_SPINLOCK_UNLOCK(&global->timer_spinlock); - SMP_SPINLOCK_LOCK(&global->processes_spinlock); - // Pick first ready which is not running. - struct ListHead *next_ready = list_first(&global->ready_processes); - while (next_ready != &global->ready_processes) { - result = GET_LIST_ENTRY(next_ready, Context, processes_list_head); - if (!(result->flags & Running)) { - list_remove(next_ready); - context_update_flags(result, ~Ready, Running); - if (result->native_handler) { - // Native handlers are marked as waiting - list_append(&global->waiting_processes, next_ready); - } else { - list_append(&global->running_processes, next_ready); - } - break; - } - next_ready = next_ready->next; - result = NULL; + if (result == NULL) { + result = scheduler_first_runnable_ready(global); } - SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); - if (result == NULL && !global->scheduler_stop_all) { - sys_poll_events(global, wait_timeout); - } else { - sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + // Only the poller scheduler drives the event loop. +#ifndef AVM_NO_SMP + if (is_waiting) { +#endif + if (result == NULL && !global->scheduler_stop_all) { + // The poller may block waiting for events. + sys_poll_events(global, wait_timeout); + } else { + sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT); + } +#ifndef AVM_NO_SMP } +#endif #ifdef AVM_TASK_DRIVER_ENABLED globalcontext_process_task_driver_queues(global); #endif @@ -265,8 +308,11 @@ static Context *scheduler_run0(GlobalContext *global) } while (result == NULL); #ifndef AVM_NO_SMP - global->waiting_scheduler = false; - smp_condvar_signal(global->schedulers_cv); + // Only the polling scheduler relinquishes the poller role. + if (is_waiting) { + global->waiting_scheduler = false; + smp_condvar_signal(global->schedulers_cv); + } SMP_MUTEX_UNLOCK(global->schedulers_mutex); #endif @@ -357,17 +403,13 @@ static void scheduler_make_ready(Context *ctx) } list_remove(&ctx->processes_list_head); #ifndef AVM_NO_SMP - if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { - // Start a new scheduler if none are going to take this process. - if (!global->waiting_scheduler - && global->running_schedulers > 0 - && global->running_schedulers < global->online_schedulers - && !context_get_flags(ctx, Running)) { - global->running_schedulers++; - smp_scheduler_start(global); - } - SMP_MUTEX_UNLOCK(global->schedulers_mutex); - } + // The readying scheduler will pick up this process itself (directly before + // parking, or on its next reschedule), so only wake or start another + // scheduler when there is already a *runnable* backlog to parallelize. + // ctx is not in ready_processes yet (it was just removed), and entries + // flagged Running are signaled contexts no scheduler can dispatch, so + // neither inflates the count. + bool ready_backlog = scheduler_has_runnable_ready(global); #endif // Move to ready queue (from waiting or running) // The process may be running (it would be signaled), so mark it @@ -376,13 +418,23 @@ static void scheduler_make_ready(Context *ctx) list_append(&global->ready_processes, &ctx->processes_list_head); SMP_SPINLOCK_UNLOCK(&global->processes_spinlock); #ifndef AVM_NO_SMP - if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { - if (global->waiting_scheduler) { + if (ready_backlog) { + if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) { + // Start a new scheduler if none are going to take this process. + if (!global->waiting_scheduler + && global->running_schedulers > 0 + && global->running_schedulers < global->online_schedulers + && !context_get_flags(ctx, Running)) { + global->running_schedulers++; + smp_scheduler_start(global); + } + if (global->waiting_scheduler) { + sys_signal(global); + } + SMP_MUTEX_UNLOCK(global->schedulers_mutex); + } else { sys_signal(global); } - SMP_MUTEX_UNLOCK(global->schedulers_mutex); - } else { - sys_signal(global); } #elif defined(AVM_TASK_DRIVER_ENABLED) sys_signal(global); diff --git a/src/platforms/esp32/components/avm_sys/sys.c b/src/platforms/esp32/components/avm_sys/sys.c index 42786b6c8e..606b26b8fb 100644 --- a/src/platforms/esp32/components/avm_sys/sys.c +++ b/src/platforms/esp32/components/avm_sys/sys.c @@ -39,6 +39,7 @@ #include "esp_system.h" #include "esp_timer.h" #include "freertos/FreeRTOS.h" +#include "freertos/semphr.h" #include "freertos/task.h" #include #include @@ -112,12 +113,16 @@ static const char *const revision_atom = "\x8" "revision"; QueueHandle_t event_queue = NULL; QueueSetHandle_t event_set = NULL; +static SemaphoreHandle_t signal_semaphore = NULL; void esp32_sys_queue_init() { - event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4); + // + 1 accounts for the signal binary semaphore + event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4 + 1); event_queue = xQueueCreate(EVENT_QUEUE_LEN, sizeof(void *)); xQueueAddToSet(event_queue, event_set); + signal_semaphore = xSemaphoreCreateBinary(); + xQueueAddToSet(signal_semaphore, event_set); } static inline void sys_clock_gettime(struct timespec *t) @@ -132,6 +137,14 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) void *sender = NULL; QueueSetMemberHandle_t event_source; while ((event_source = xQueueSelectFromSet(event_set, wait_ticks))) { +#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) + if (event_source == signal_semaphore) { + // We've been signaled + xSemaphoreTake(signal_semaphore, 0); + return; + } +#endif + // Listener used shared event_queue. if (event_source == event_queue) { if (UNLIKELY(xQueueReceive(event_queue, &sender, 0) == pdFALSE)) { @@ -141,13 +154,6 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks) sender = event_source; } -#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) - if (sender == CAST_FUNC_TO_VOID_PTR(sys_signal)) { - // We've been signaled - return; - } -#endif - struct ListHead *listeners = synclist_wrlock(&glb->listeners); if (!process_listener_handler(glb, sender, listeners, NULL, NULL)) { TRACE("sys: handler not found for: %p\n", (void *) sender); @@ -170,8 +176,8 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { - void *queue_item = CAST_FUNC_TO_VOID_PTR(sys_signal); - xQueueSendToBack(event_queue, &queue_item, 0); + UNUSED(glb); + xSemaphoreGive(signal_semaphore); } void sys_time(struct timespec *t) diff --git a/src/platforms/esp32/test/main/test_erl_sources/test_file.erl b/src/platforms/esp32/test/main/test_erl_sources/test_file.erl index 3acb8c0647..fdacbc0a4e 100644 --- a/src/platforms/esp32/test/main/test_erl_sources/test_file.erl +++ b/src/platforms/esp32/test/main/test_erl_sources/test_file.erl @@ -50,7 +50,12 @@ test_basic_file() -> test_gc() -> Path = "/sdcard/atomvm-2.txt", GCSubPid = spawn(fun() -> gc_loop(Path, undefined) end), - MemorySize0 = erlang:memory(binary), + % Dead resource terms from earlier tests still hold their refc binaries + % until this process collects them, and resources are released + % asynchronously by the scheduler polling events: collect, then wait for + % the global binary count to settle before taking the baseline. + erlang:garbage_collect(), + MemorySize0 = wait_memory_stable(erlang:memory(binary), 100), call_gc_loop(GCSubPid, open), MemorySize1 = erlang:memory(binary), true = MemorySize1 > MemorySize0, @@ -75,6 +80,18 @@ test_gc() -> ok = atomvm:posix_unlink(Path), ok. +% Wait until two samples 20ms apart are equal. +wait_memory_stable(Last, 0) -> + Last; +wait_memory_stable(Last, Retries) -> + receive + after 20 -> ok + end, + case erlang:memory(binary) of + Last -> Last; + Other -> wait_memory_stable(Other, Retries - 1) + end. + call_gc_loop(Pid, Message) -> Pid ! {self(), Message}, receive diff --git a/src/platforms/generic_unix/lib/socket_driver.c b/src/platforms/generic_unix/lib/socket_driver.c index 7e9439622d..dd9228c0d2 100644 --- a/src/platforms/generic_unix/lib/socket_driver.c +++ b/src/platforms/generic_unix/lib/socket_driver.c @@ -47,6 +47,9 @@ #include #include +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener); +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener); + // #define ENABLE_TRACE #include "trace.h" @@ -83,6 +86,22 @@ typedef struct SocketDriverData PassiveRecvListener *passive_listener; } SocketDriverData; +static void register_active_listener(GlobalContext *glb, SocketDriverData *socket_data, ActiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->active_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + +static void register_passive_listener(GlobalContext *glb, SocketDriverData *socket_data, PassiveRecvListener *listener) +{ + synclist_wrlock(&glb->listeners); + socket_data->passive_listener = listener; + sys_register_listener_nolock(glb, &listener->base); + synclist_unlock(&glb->listeners); +} + // clang-format off // TODO define in defaultatoms const char *const send_a = "\x4" "send"; @@ -253,8 +272,7 @@ static term init_udp_socket(Context *ctx, SocketDriverData *socket_data, term pa listener->base.handler = active_recvfrom_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); - socket_data->active_listener = listener; + register_active_listener(glb, socket_data, listener); } } return ret; @@ -338,8 +356,7 @@ static term init_client_tcp_socket(Context *ctx, SocketDriverData *socket_data, listener->base.handler = active_recv_callback; listener->buf_size = socket_data->buffer; listener->process_id = ctx->process_id; - sys_register_listener(glb, &listener->base); - socket_data->active_listener = listener; + register_active_listener(glb, socket_data, listener); } } return ret; @@ -731,10 +748,10 @@ static EventListener *active_recv_callback(GlobalContext *glb, EventListener *ba port_send_message_nolock(glb, pid, msg); mailbox_send(ctx, globalcontext_make_atom(glb, close_internal)); // See socket_consume_mailbox close path below - if (socket_data->active_listener) { + if (socket_data->active_listener == listener) { socket_data->active_listener = NULL; - free(listener); } + free(listener); result = NULL; END_WITH_STACK_HEAP(heap, glb); } else { @@ -835,12 +852,12 @@ static EventListener *passive_recv_callback(GlobalContext *glb, EventListener *b port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list return NULL; @@ -975,12 +992,12 @@ static EventListener *passive_recvfrom_callback(GlobalContext *glb, EventListene port_send_message_nolock(glb, pid, reply); memory_destroy_heap(&heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); free(buf); // remove the EventListener from the global list and clean up return NULL; @@ -1015,8 +1032,7 @@ static void do_recv(Context *ctx, term pid, term ref, term length, term timeout, listener->length = term_to_int(length); listener->buffer = socket_data->buffer; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); - socket_data->passive_listener = listener; + register_passive_listener(glb, socket_data, listener); } void socket_driver_do_recvfrom(Context *ctx, term pid, term ref, term length, term timeout) @@ -1044,6 +1060,13 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li socklen_t clientlen = sizeof(clientaddr); int fd = accept(listener->base.fd, (struct sockaddr *) &clientaddr, &clientlen); Context *ctx = globalcontext_get_process_lock(glb, listener->process_id); + if (UNLIKELY(ctx == NULL)) { + if (fd != -1) { + close(fd); + } + free(listener); + return NULL; + } SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; EventListener *result = NULL; if (fd == -1) { @@ -1058,6 +1081,21 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li TRACE("socket_driver|accept_callback: accepted connection. fd: %i\n", fd); term pid = listener->pid; + if (UNLIKELY(fcntl(fd, F_SETFL, O_NONBLOCK) == -1)) { + int err = errno; + close(fd); + BEGIN_WITH_STACK_HEAP(12, heap); + term ref = term_from_ref_ticks(listener->ref_ticks, &heap); + term reply = port_heap_create_reply(&heap, ref, port_heap_create_sys_error_tuple(&heap, FCNTL_ATOM, err)); + port_send_message_nolock(glb, pid, reply); + END_WITH_STACK_HEAP(heap, glb); + if (socket_data->passive_listener == listener) { + socket_data->passive_listener = NULL; + } + globalcontext_get_process_unlock(glb, ctx); + free(listener); + return NULL; + } SocketDriverData *new_socket_data = socket_driver_create_data(); new_socket_data->sockfd = fd; new_socket_data->proto = socket_data->proto; @@ -1070,9 +1108,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li Context *new_ctx = create_accepting_socket(glb, new_socket_data); ctx = globalcontext_get_process_lock(glb, listener->process_id); if (UNLIKELY(ctx == NULL)) { + socket_driver_do_close(new_ctx); + scheduler_terminate(new_ctx); free(listener); return NULL; } + socket_data = (SocketDriverData *) ctx->platform_data; if (new_socket_data->active) { result = &create_accepting_socket_listener(new_ctx, new_socket_data)->base; } @@ -1086,12 +1127,12 @@ static EventListener *accept_callback(GlobalContext *glb, EventListener *base_li port_send_message_nolock(glb, pid, reply); END_WITH_STACK_HEAP(heap, glb); } - globalcontext_get_process_unlock(glb, ctx); // See socket_consume_mailbox close path below - if (socket_data->passive_listener) { + if (socket_data->passive_listener == listener) { socket_data->passive_listener = NULL; - free(listener); } + globalcontext_get_process_unlock(glb, ctx); + free(listener); // remove the EventListener from the global list and replace it if needed return result; } @@ -1117,8 +1158,7 @@ void socket_driver_do_accept(Context *ctx, term pid, term ref, term timeout) listener->length = 0; listener->buffer = 0; listener->ref_ticks = term_to_ref_ticks(ref); - sys_register_listener(glb, &listener->base); - socket_data->passive_listener = listener; + register_passive_listener(glb, socket_data, listener); } static NativeHandlerResult socket_consume_mailbox(Context *ctx) @@ -1194,31 +1234,26 @@ static NativeHandlerResult socket_consume_mailbox(Context *ctx) TRACE("close\n"); port_send_reply(ctx, pid, ref, OK_ATOM); SocketDriverData *socket_data = (SocketDriverData *) ctx->platform_data; - // Callbacks (active_recv_callback, passive_recv_callback) are called - // while glb->listeners lock is held. They may want to free the - // listener, causing a potential double free here. - // We acquire the lock on listeners here and set the listeners - // to NULL in the socket_data structure to prevent them from freeing - // the listeners. + // Callbacks (active_recv_callback, passive_recv_callback, accept_callback) + // are called while glb->listeners lock is held. They may free the + // listener and set the socket_data pointer to NULL. + // We must atomically detach the pointers AND unlink from the listeners + // list under the same lock hold, to prevent a race where the callback + // also unlinks the same listener node. synclist_wrlock(&glb->listeners); ActiveRecvListener *active_listener = socket_data->active_listener; PassiveRecvListener *passive_listener = socket_data->passive_listener; socket_data->active_listener = NULL; socket_data->passive_listener = NULL; - synclist_unlock(&glb->listeners); if (active_listener) { - // Then we unregister, which also acquires the lock. The callbacks - // may have returned NULL which means the listener would no longer - // be registered, but this will work. - sys_unregister_listener(glb, &active_listener->base); - // After the listener is unregistered, the callbacks can no longer - // be called, so we can eventually free the listener - free(active_listener); + sys_unregister_listener_nolock(glb, &active_listener->base); } if (passive_listener) { - sys_unregister_listener(glb, &passive_listener->base); - free(passive_listener); + sys_unregister_listener_nolock(glb, &passive_listener->base); } + synclist_unlock(&glb->listeners); + free(active_listener); + free(passive_listener); socket_driver_do_close(ctx); // We don't need to remove message. return NativeTerminate; diff --git a/src/platforms/generic_unix/lib/sys.c b/src/platforms/generic_unix/lib/sys.c index ce6a032fc9..5a35873ddf 100644 --- a/src/platforms/generic_unix/lib/sys.c +++ b/src/platforms/generic_unix/lib/sys.c @@ -535,7 +535,11 @@ void sys_init_platform(GlobalContext *global) platform->kqueue_fd = kqueue(); platform->listeners_poll_count = 0; platform->select_events_poll_count = 0; -#ifndef AVM_NO_SMP + // Register the user event that sys_signal triggers. sys_signal is compiled + // whenever SMP or the task driver is enabled, so the registration must + // match that condition -- otherwise a non-SMP task-driver build triggers an + // unregistered event and kevent fails. +#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED) struct timespec ts = { 0, 0 }; struct kevent kev; EV_SET(&kev, SIGNAL_IDENTIFIER, EVFILT_USER, EV_ADD | EV_CLEAR, 0, 0, NULL); @@ -660,9 +664,8 @@ void event_listener_add_to_polling_set(struct EventListener *listener, GlobalCon #endif } -void sys_register_listener(GlobalContext *global, struct EventListener *listener) +void sys_register_listener_nolock(GlobalContext *global, struct EventListener *listener) { - struct ListHead *listeners = synclist_wrlock(&global->listeners); event_listener_add_to_polling_set(listener, global); #ifndef AVM_NO_SMP #ifndef HAVE_KQUEUE @@ -670,7 +673,13 @@ void sys_register_listener(GlobalContext *global, struct EventListener *listener sys_signal(global); #endif #endif - list_append(listeners, &listener->listeners_list_head); + list_append(synclist_nolock(&global->listeners), &listener->listeners_list_head); +} + +void sys_register_listener(GlobalContext *global, struct EventListener *listener) +{ + synclist_wrlock(&global->listeners); + sys_register_listener_nolock(global, listener); synclist_unlock(&global->listeners); } @@ -692,10 +701,17 @@ static void listener_event_remove_from_polling_set(listener_event_t listener_fd, #endif } -void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) +void sys_unregister_listener_nolock(GlobalContext *global, struct EventListener *listener) { listener_event_remove_from_polling_set(listener->fd, global); - synclist_remove(&global->listeners, &listener->listeners_list_head); + list_remove(&listener->listeners_list_head); +} + +void sys_unregister_listener(GlobalContext *global, struct EventListener *listener) +{ + synclist_wrlock(&global->listeners); + sys_unregister_listener_nolock(global, listener); + synclist_unlock(&global->listeners); } void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write) @@ -729,6 +745,9 @@ void sys_unregister_select_event(GlobalContext *global, ErlNifEvent event, bool EV_SET(&kev, event, is_write ? EVFILT_WRITE : EVFILT_READ, EV_DELETE, 0, 0, NULL); (void) kevent(platform->kqueue_fd, &kev, 1, NULL, 0, &ts); platform->select_events_poll_count = -1; +#ifndef AVM_NO_SMP + sys_signal(global); +#endif #else UNUSED(event); UNUSED(is_write); diff --git a/src/platforms/rp2/src/lib/rp2_sys.h b/src/platforms/rp2/src/lib/rp2_sys.h index 77b2918c88..3d1393d066 100644 --- a/src/platforms/rp2/src/lib/rp2_sys.h +++ b/src/platforms/rp2/src/lib/rp2_sys.h @@ -27,7 +27,8 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpedantic" -#include +#include +#include #include #include @@ -77,8 +78,7 @@ void sys_unregister_listener_from_event(GlobalContext *global, listener_event_t struct RP2PlatformData { #ifndef AVM_NO_SMP - mutex_t event_poll_mutex; - cond_t event_poll_cond; + semaphore_t event_poll_sem; #endif queue_t event_queue; diff --git a/src/platforms/rp2/src/lib/sys.c b/src/platforms/rp2/src/lib/sys.c index 0894d0bf1d..97311832a8 100644 --- a/src/platforms/rp2/src/lib/sys.c +++ b/src/platforms/rp2/src/lib/sys.c @@ -82,8 +82,7 @@ void sys_init_platform(GlobalContext *glb) struct RP2PlatformData *platform = malloc(sizeof(struct RP2PlatformData)); glb->platform_data = platform; #ifndef AVM_NO_SMP - mutex_init(&platform->event_poll_mutex); - cond_init(&platform->event_poll_cond); + sem_init(&platform->event_poll_sem, 0, 1); #endif queue_init(&platform->event_queue, sizeof(queue_t *), EVENT_QUEUE_LEN); @@ -152,37 +151,10 @@ bool sys_try_post_listener_event_from_isr(GlobalContext *glb, listener_event_t l return false; } -#ifndef AVM_NO_SMP - uint32_t owner; - bool acquired_mutex = mutex_try_enter(&platform->event_poll_mutex, &owner); - // We're from an ISR, so we cannot wait for the interrupted code (running - // on the same core as we do) to release the mutex. - if (!acquired_mutex) { - // If this core is not the owner, wait for the other core to release - // the mutex. - // TODO: implement queue_try_remove_wait_timeout_ms in Pico SDK to - // simplify this logic - uint32_t caller = (uint32_t) lock_get_caller_owner_id(); // same cast exists in mutex_try_enter - if (caller != owner) { - mutex_enter_blocking(&platform->event_poll_mutex); - acquired_mutex = true; - } - } -#endif if (UNLIKELY(!queue_try_add(&platform->event_queue, &listener_queue))) { -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif fprintf(stderr, "Lost event from ISR as global event queue is full. System is overloaded or EVENT_QUEUE_LEN is too low\n"); return false; } -#ifndef AVM_NO_SMP - if (acquired_mutex) { - mutex_exit(&platform->event_poll_mutex); - } -#endif #ifndef AVM_NO_SMP sys_signal(glb); @@ -202,16 +174,12 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) sys_tinyusb_unlock(glb); #endif #ifndef AVM_NO_SMP - if (timeout_ms != 0) { - mutex_enter_blocking(&platform->event_poll_mutex); - if (queue_is_empty(&platform->event_queue)) { - if (timeout_ms > 0) { - cond_wait_timeout_ms(&platform->event_poll_cond, &platform->event_poll_mutex, timeout_ms); - } else { - cond_wait(&platform->event_poll_cond, &platform->event_poll_mutex); - } + if (timeout_ms != 0 && queue_is_empty(&platform->event_queue)) { + if (timeout_ms > 0) { + sem_acquire_timeout_ms(&platform->event_poll_sem, timeout_ms); + } else { + sem_acquire_blocking(&platform->event_poll_sem); } - mutex_exit(&platform->event_poll_mutex); } #else UNUSED(timeout_ms); @@ -230,7 +198,7 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms) void sys_signal(GlobalContext *glb) { struct RP2PlatformData *platform = glb->platform_data; - cond_signal(&platform->event_poll_cond); + sem_release(&platform->event_poll_sem); } #endif diff --git a/tests/libs/eavmlib/test_file.erl b/tests/libs/eavmlib/test_file.erl index c69a3a9f24..458c15d8a4 100644 --- a/tests/libs/eavmlib/test_file.erl +++ b/tests/libs/eavmlib/test_file.erl @@ -130,7 +130,13 @@ test_fifo_select(_HasSelect) -> test_gc(HasSelect) -> Path = "/tmp/atomvm.tmp." ++ integer_to_list(erlang:system_time(millisecond)), GCSubPid = spawn(fun() -> gc_loop(Path, undefined) end), - MemorySize0 = erlang:memory(binary), + % Dead resource terms from earlier tests still hold their refc binaries + % until this process collects them, and resources stopped with + % ERL_NIF_SELECT_STOP_SCHEDULED are released asynchronously by the + % scheduler polling events: collect, then wait for the global binary + % count to settle before taking the baseline. + erlang:garbage_collect(), + MemorySize0 = wait_memory_stable(erlang:memory(binary), 100), call_gc_loop(GCSubPid, open), MemorySize1 = erlang:memory(binary), ?ASSERT_TRUE(MemorySize1 > MemorySize0), @@ -166,7 +172,11 @@ test_gc(HasSelect) -> ?ASSERT_EQUALS(MemorySize8, MemorySize1), call_gc_loop(GCSubPid, close), call_gc_loop(GCSubPid, gc), - MemorySize9 = erlang:memory(binary), + % If select_stop raced the ready_output notification, the stop + % was scheduled (ERL_NIF_SELECT_STOP_SCHEDULED) and the resource + % is only released once the scheduler polling events retires the + % closed select event, so wait for memory to converge. + MemorySize9 = wait_memory_binary(MemorySize0, 100), ?ASSERT_EQUALS(MemorySize9, MemorySize0); true -> ok @@ -182,6 +192,31 @@ call_gc_loop(Pid, Message) -> {Pid, Message} -> ok end. +% Wait until two samples 20ms apart are equal. +wait_memory_stable(Last, 0) -> + Last; +wait_memory_stable(Last, Retries) -> + receive + after 20 -> ok + end, + case erlang:memory(binary) of + Last -> Last; + Other -> wait_memory_stable(Other, Retries - 1) + end. + +wait_memory_binary(Expected, Retries) -> + case erlang:memory(binary) of + Expected -> + Expected; + Other when Retries =:= 0 -> + Other; + _ -> + receive + after 10 -> ok + end, + wait_memory_binary(Expected, Retries - 1) + end. + gc_loop(Path, File) -> receive {select, _Resource, undefined, _Direction} ->