Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
longer lines return `{error, {parser, {line_too_long, Prefix}}}` with the first 128 bytes of
the offending line. Callers whose upstream servers emit unusually large headers must account
for this limit
- Improved performance of SMP scheduler. As a result, resources selected with `enif_select` and
stopped with the `ERL_NIF_SELECT_STOP_SCHEDULED` result are now released asynchronously by the
scheduler polling events, staying within the boundaries of the BEAM `enif_select` specification

### Removed
- Removed `ahttp_client` support for obsolete line folding (RFC 9112 §5.2); folded header and
Expand Down
134 changes: 93 additions & 41 deletions src/libAtomVM/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,51 @@ static void scheduler_process_native_signal_messages(Context *ctx)
}
}

static Context *scheduler_first_runnable_ready(GlobalContext *global)
{
Context *result = NULL;
SMP_SPINLOCK_LOCK(&global->processes_spinlock);
// Pick first ready which is not running.
struct ListHead *next_ready = list_first(&global->ready_processes);
while (next_ready != &global->ready_processes) {
result = GET_LIST_ENTRY(next_ready, Context, processes_list_head);
if (!(result->flags & Running)) {
list_remove(next_ready);
context_update_flags(result, ~Ready, Running);
if (result->native_handler) {
// Native handlers are marked as waiting
list_append(&global->waiting_processes, next_ready);
} else {
list_append(&global->running_processes, next_ready);
}
break;
}
next_ready = next_ready->next;
result = NULL;
}
SMP_SPINLOCK_UNLOCK(&global->processes_spinlock);
return result;
}

#ifndef AVM_NO_SMP
// Caller must hold processes_spinlock. Returns true iff ready_processes holds a
// process a scheduler can actually dispatch right now: a Ready entry that is not
// also flagged Running. A Ready|Running entry is a signaled context still
// executing on another scheduler; scheduler_first_runnable_ready skips it, so it
// is not dispatchable backlog.
static bool scheduler_has_runnable_ready(GlobalContext *global)
{
struct ListHead *item;
LIST_FOR_EACH (item, &global->ready_processes) {
Context *c = GET_LIST_ENTRY(item, Context, processes_list_head);
if (!context_get_flags(c, Running)) {
return true;
}
}
return false;
}
#endif

static Context *scheduler_run0(GlobalContext *global)
{
// This function should return a new process to run.
Expand Down Expand Up @@ -209,6 +254,13 @@ static Context *scheduler_run0(GlobalContext *global)
return NULL;
}
if (!is_waiting) {
// If a process is ready, process it instead of waking up
// the poller scheduler
result = scheduler_first_runnable_ready(global);
if (result != NULL) {
break;
}

// Before entering the condition variable, signal the poll events
// so the thread polling on events can check the ready queue.
sys_signal(global);
Expand All @@ -232,41 +284,35 @@ static Context *scheduler_run0(GlobalContext *global)
int32_t wait_timeout = update_timer_list(global);
SMP_SPINLOCK_UNLOCK(&global->timer_spinlock);

SMP_SPINLOCK_LOCK(&global->processes_spinlock);
// Pick first ready which is not running.
struct ListHead *next_ready = list_first(&global->ready_processes);
while (next_ready != &global->ready_processes) {
result = GET_LIST_ENTRY(next_ready, Context, processes_list_head);
if (!(result->flags & Running)) {
list_remove(next_ready);
context_update_flags(result, ~Ready, Running);
if (result->native_handler) {
// Native handlers are marked as waiting
list_append(&global->waiting_processes, next_ready);
} else {
list_append(&global->running_processes, next_ready);
}
break;
}
next_ready = next_ready->next;
result = NULL;
if (result == NULL) {
result = scheduler_first_runnable_ready(global);
}
SMP_SPINLOCK_UNLOCK(&global->processes_spinlock);

if (result == NULL && !global->scheduler_stop_all) {
sys_poll_events(global, wait_timeout);
} else {
sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT);
// Only the poller scheduler drives the event loop.
#ifndef AVM_NO_SMP
if (is_waiting) {
#endif
if (result == NULL && !global->scheduler_stop_all) {
// The poller may block waiting for events.
sys_poll_events(global, wait_timeout);
} else {
sys_poll_events(global, SYS_POLL_EVENTS_DO_NOT_WAIT);
}
#ifndef AVM_NO_SMP
}
#endif
#ifdef AVM_TASK_DRIVER_ENABLED
globalcontext_process_task_driver_queues(global);
#endif
SMP_MUTEX_LOCK(global->schedulers_mutex);
} while (result == NULL);

#ifndef AVM_NO_SMP
global->waiting_scheduler = false;
smp_condvar_signal(global->schedulers_cv);
// Only the polling scheduler relinquishes the poller role.
if (is_waiting) {
global->waiting_scheduler = false;
smp_condvar_signal(global->schedulers_cv);
}
SMP_MUTEX_UNLOCK(global->schedulers_mutex);
#endif

Expand Down Expand Up @@ -357,17 +403,13 @@ static void scheduler_make_ready(Context *ctx)
}
list_remove(&ctx->processes_list_head);
#ifndef AVM_NO_SMP
if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) {
// Start a new scheduler if none are going to take this process.
if (!global->waiting_scheduler
&& global->running_schedulers > 0
&& global->running_schedulers < global->online_schedulers
&& !context_get_flags(ctx, Running)) {
global->running_schedulers++;
smp_scheduler_start(global);
}
SMP_MUTEX_UNLOCK(global->schedulers_mutex);
}
// The readying scheduler will pick up this process itself (directly before
// parking, or on its next reschedule), so only wake or start another
// scheduler when there is already a *runnable* backlog to parallelize.
// ctx is not in ready_processes yet (it was just removed), and entries
// flagged Running are signaled contexts no scheduler can dispatch, so
// neither inflates the count.
bool ready_backlog = scheduler_has_runnable_ready(global);
#endif
// Move to ready queue (from waiting or running)
// The process may be running (it would be signaled), so mark it
Expand All @@ -376,13 +418,23 @@ static void scheduler_make_ready(Context *ctx)
list_append(&global->ready_processes, &ctx->processes_list_head);
SMP_SPINLOCK_UNLOCK(&global->processes_spinlock);
#ifndef AVM_NO_SMP
if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) {
if (global->waiting_scheduler) {
if (ready_backlog) {
if (SMP_MUTEX_TRYLOCK(global->schedulers_mutex)) {
// Start a new scheduler if none are going to take this process.
if (!global->waiting_scheduler
&& global->running_schedulers > 0
&& global->running_schedulers < global->online_schedulers
&& !context_get_flags(ctx, Running)) {
global->running_schedulers++;
smp_scheduler_start(global);
}
if (global->waiting_scheduler) {
sys_signal(global);
}
SMP_MUTEX_UNLOCK(global->schedulers_mutex);
} else {
sys_signal(global);
}
SMP_MUTEX_UNLOCK(global->schedulers_mutex);
} else {
sys_signal(global);
}
#elif defined(AVM_TASK_DRIVER_ENABLED)
sys_signal(global);
Expand Down
26 changes: 16 additions & 10 deletions src/platforms/esp32/components/avm_sys/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "esp_system.h"
#include "esp_timer.h"
#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"
#include "freertos/task.h"
#include <esp_log.h>
#include <esp_partition.h>
Expand Down Expand Up @@ -112,12 +113,16 @@ static const char *const revision_atom = "\x8" "revision";

QueueHandle_t event_queue = NULL;
QueueSetHandle_t event_set = NULL;
static SemaphoreHandle_t signal_semaphore = NULL;

void esp32_sys_queue_init()
{
event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4);
// + 1 accounts for the signal binary semaphore
event_set = xQueueCreateSet(EVENT_QUEUE_LEN * 4 + 1);
event_queue = xQueueCreate(EVENT_QUEUE_LEN, sizeof(void *));
xQueueAddToSet(event_queue, event_set);
signal_semaphore = xSemaphoreCreateBinary();
xQueueAddToSet(signal_semaphore, event_set);
}

static inline void sys_clock_gettime(struct timespec *t)
Expand All @@ -132,6 +137,14 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks)
void *sender = NULL;
QueueSetMemberHandle_t event_source;
while ((event_source = xQueueSelectFromSet(event_set, wait_ticks))) {
#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED)
if (event_source == signal_semaphore) {
// We've been signaled
xSemaphoreTake(signal_semaphore, 0);
return;
}
#endif

// Listener used shared event_queue.
if (event_source == event_queue) {
if (UNLIKELY(xQueueReceive(event_queue, &sender, 0) == pdFALSE)) {
Expand All @@ -141,13 +154,6 @@ static void receive_events(GlobalContext *glb, TickType_t wait_ticks)
sender = event_source;
}

#if !defined(AVM_NO_SMP) || defined(AVM_TASK_DRIVER_ENABLED)
if (sender == CAST_FUNC_TO_VOID_PTR(sys_signal)) {
// We've been signaled
return;
}
#endif

struct ListHead *listeners = synclist_wrlock(&glb->listeners);
if (!process_listener_handler(glb, sender, listeners, NULL, NULL)) {
TRACE("sys: handler not found for: %p\n", (void *) sender);
Expand All @@ -170,8 +176,8 @@ void sys_poll_events(GlobalContext *glb, int timeout_ms)

void sys_signal(GlobalContext *glb)
{
void *queue_item = CAST_FUNC_TO_VOID_PTR(sys_signal);
xQueueSendToBack(event_queue, &queue_item, 0);
UNUSED(glb);
xSemaphoreGive(signal_semaphore);
}

void sys_time(struct timespec *t)
Expand Down
19 changes: 18 additions & 1 deletion src/platforms/esp32/test/main/test_erl_sources/test_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ test_basic_file() ->
test_gc() ->
Path = "/sdcard/atomvm-2.txt",
GCSubPid = spawn(fun() -> gc_loop(Path, undefined) end),
MemorySize0 = erlang:memory(binary),
% Dead resource terms from earlier tests still hold their refc binaries
% until this process collects them, and resources are released
% asynchronously by the scheduler polling events: collect, then wait for
% the global binary count to settle before taking the baseline.
erlang:garbage_collect(),
MemorySize0 = wait_memory_stable(erlang:memory(binary), 100),
call_gc_loop(GCSubPid, open),
MemorySize1 = erlang:memory(binary),
true = MemorySize1 > MemorySize0,
Expand All @@ -75,6 +80,18 @@ test_gc() ->
ok = atomvm:posix_unlink(Path),
ok.

% Wait until two samples 20ms apart are equal.
wait_memory_stable(Last, 0) ->
Last;
wait_memory_stable(Last, Retries) ->
receive
after 20 -> ok
end,
case erlang:memory(binary) of
Last -> Last;
Other -> wait_memory_stable(Other, Retries - 1)
end.

call_gc_loop(Pid, Message) ->
Pid ! {self(), Message},
receive
Expand Down
Loading