Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 73 additions & 46 deletions core/iwasm/common/wasm_shared_memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#include "bh_log.h"
#include "wasm_shared_memory.h"
#if WASM_ENABLE_THREAD_MGR != 0
#include "../libraries/thread-mgr/thread_manager.h"
#endif

static bh_list shared_memory_list_head;
static bh_list *const shared_memory_list = &shared_memory_list_head;
Expand All @@ -21,6 +24,8 @@ typedef struct AtomicWaitInfo {
korp_mutex wait_list_lock;
bh_list wait_list_head;
bh_list *wait_list;
/* WARNING: insert to the list allowed only in acquire_wait_info
otherwise there will be data race as described in PR #2016 */
} AtomicWaitInfo;

typedef struct AtomicWaitNode {
Expand Down Expand Up @@ -298,7 +303,7 @@ notify_wait_list(bh_list *wait_list, uint32 count)
}

static AtomicWaitInfo *
acquire_wait_info(void *address, bool create)
acquire_wait_info(void *address, AtomicWaitNode *wait_node)
{
AtomicWaitInfo *wait_info = NULL;
bh_list_status ret;
Expand All @@ -308,7 +313,7 @@ acquire_wait_info(void *address, bool create)
if (address)
wait_info = (AtomicWaitInfo *)bh_hash_map_find(wait_map, address);

if (!create) {
if (!wait_node) {
os_mutex_unlock(&wait_map_lock);
return wait_info;
}
Expand Down Expand Up @@ -336,6 +341,12 @@ acquire_wait_info(void *address, bool create)
}
}

os_mutex_lock(&wait_info->wait_list_lock);
ret = bh_list_insert(wait_info->wait_list, wait_node);
os_mutex_unlock(&wait_info->wait_list_lock);
bh_assert(ret == BH_LIST_SUCCESS);
(void)ret;

os_mutex_unlock(&wait_map_lock);

bh_assert(wait_info);
Expand Down Expand Up @@ -376,16 +387,22 @@ destroy_wait_info(void *wait_info)
}
}

static bool
map_remove_wait_info(HashMap *wait_map_, AtomicWaitInfo *wait_info,
void *address)
static void
map_try_release_wait_info(HashMap *wait_map_, AtomicWaitInfo *wait_info,
void *address)
{
os_mutex_lock(&wait_map_lock);
os_mutex_lock(&wait_info->wait_list_lock);
if (wait_info->wait_list->len > 0) {
return false;
os_mutex_unlock(&wait_info->wait_list_lock);
os_mutex_unlock(&wait_map_lock);
return;
}
os_mutex_unlock(&wait_info->wait_list_lock);

bh_hash_map_remove(wait_map_, address, NULL, NULL);
return true;
os_mutex_unlock(&wait_map_lock);
destroy_wait_info(wait_info);
}

uint32
Expand All @@ -396,7 +413,8 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
AtomicWaitInfo *wait_info;
AtomicWaitNode *wait_node;
WASMSharedMemNode *node;
bool check_ret, is_timeout, no_wait, removed_from_map;
WASMExecEnv *exec_env;
bool check_ret, is_timeout, no_wait;

bh_assert(module->module_type == Wasm_Module_Bytecode
|| module->module_type == Wasm_Module_AoT);
Expand All @@ -418,14 +436,6 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
return -1;
}

/* acquire the wait info, create new one if not exists */
wait_info = acquire_wait_info(address, true);

if (!wait_info) {
wasm_runtime_set_exception(module, "failed to acquire wait_info");
return -1;
}

node = search_module((WASMModuleCommon *)module_inst->module);
os_mutex_lock(&node->shared_mem_lock);
no_wait = (!wait64 && *(uint32 *)address != (uint32)expect)
Expand All @@ -435,40 +445,59 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
if (no_wait) {
return 1;
}
else {
bh_list_status ret;

if (!(wait_node = wasm_runtime_malloc(sizeof(AtomicWaitNode)))) {
wasm_runtime_set_exception(module, "failed to create wait node");
return -1;
}
memset(wait_node, 0, sizeof(AtomicWaitNode));
if (!(wait_node = wasm_runtime_malloc(sizeof(AtomicWaitNode)))) {
wasm_runtime_set_exception(module, "failed to create wait node");
return -1;
}
memset(wait_node, 0, sizeof(AtomicWaitNode));

if (0 != os_mutex_init(&wait_node->wait_lock)) {
wasm_runtime_free(wait_node);
return -1;
}
if (0 != os_mutex_init(&wait_node->wait_lock)) {
wasm_runtime_free(wait_node);
return -1;
}

if (0 != os_cond_init(&wait_node->wait_cond)) {
os_mutex_destroy(&wait_node->wait_lock);
wasm_runtime_free(wait_node);
return -1;
}
if (0 != os_cond_init(&wait_node->wait_cond)) {
os_mutex_destroy(&wait_node->wait_lock);
wasm_runtime_free(wait_node);
return -1;
}

wait_node->status = S_WAITING;
os_mutex_lock(&wait_info->wait_list_lock);
ret = bh_list_insert(wait_info->wait_list, wait_node);
os_mutex_unlock(&wait_info->wait_list_lock);
bh_assert(ret == BH_LIST_SUCCESS);
(void)ret;
wait_node->status = S_WAITING;

/* acquire the wait info, create new one if not exists */
wait_info = acquire_wait_info(address, wait_node);

if (!wait_info) {
os_mutex_destroy(&wait_node->wait_lock);
wasm_runtime_free(wait_node);
wasm_runtime_set_exception(module, "failed to acquire wait_info");
return -1;
}

#if WASM_ENABLE_THREAD_MGR != 0
exec_env =
wasm_clusters_search_exec_env((WASMModuleInstanceCommon *)module_inst);
bh_assert(exec_env);
#endif

os_mutex_lock(&node->shared_mem_lock);
no_wait = (!wait64 && *(uint32 *)address != (uint32)expect)
|| (wait64 && *(uint64 *)address != expect);
os_mutex_unlock(&node->shared_mem_lock);

/* condition wait start */
os_mutex_lock(&wait_node->wait_lock);

os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock,
timeout < 0 ? BHT_WAIT_FOREVER
: (uint64)timeout / 1000);
if (!no_wait
#if WASM_ENABLE_THREAD_MGR != 0
&& !wasm_cluster_is_thread_terminated(exec_env)
#endif
) {
os_cond_reltimedwait(&wait_node->wait_cond, &wait_node->wait_lock,
timeout < 0 ? BHT_WAIT_FOREVER
: (uint64)timeout / 1000);
}

is_timeout = wait_node->status == S_WAITING ? true : false;
os_mutex_unlock(&wait_node->wait_lock);
Expand All @@ -486,14 +515,12 @@ wasm_runtime_atomic_wait(WASMModuleInstanceCommon *module, void *address,
wasm_runtime_free(wait_node);

/* Release wait info if no wait nodes attached */
removed_from_map = map_remove_wait_info(wait_map, wait_info, address);
os_mutex_unlock(&wait_info->wait_list_lock);
if (removed_from_map)
destroy_wait_info(wait_info);
map_try_release_wait_info(wait_map, wait_info, address);
os_mutex_unlock(&node->shared_mem_lock);

(void)check_ret;
return is_timeout ? 2 : 0;
return no_wait ? 1 : is_timeout ? 2 : 0;
}

uint32
Expand Down Expand Up @@ -523,7 +550,7 @@ wasm_runtime_atomic_notify(WASMModuleInstanceCommon *module, void *address,
return -1;
}

wait_info = acquire_wait_info(address, false);
wait_info = acquire_wait_info(address, NULL);

/* Nobody wait on this address */
if (!wait_info) {
Expand Down
1 change: 1 addition & 0 deletions core/iwasm/common/wasm_shared_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define _WASM_SHARED_MEMORY_H

#include "bh_common.h"
#include "wasm_exec_env.h"
#if WASM_ENABLE_INTERP != 0
#include "wasm_runtime.h"
#endif
Expand Down
11 changes: 5 additions & 6 deletions core/iwasm/libraries/lib-wasi-threads/test/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <pthread.h>
#include <stdbool.h>
#include <unistd.h>
#include <limits.h>

#include "wasi_thread_start.h"

Expand All @@ -23,7 +24,6 @@ static bool termination_by_trap;
static bool termination_in_main_thread;
static blocking_task_type_t blocking_task_type;

#define TIMEOUT_SECONDS 10ll
#define NUM_THREADS 3
static pthread_barrier_t barrier;

Expand All @@ -36,15 +36,14 @@ void
run_long_task()
{
if (blocking_task_type == BLOCKING_TASK_BUSY_WAIT) {
for (int i = 0; i < TIMEOUT_SECONDS; i++)
sleep(1);
for (;;) {
}
}
else if (blocking_task_type == BLOCKING_TASK_ATOMIC_WAIT) {
__builtin_wasm_memory_atomic_wait32(
0, 0, TIMEOUT_SECONDS * 1000 * 1000 * 1000);
__builtin_wasm_memory_atomic_wait32(0, 0, -1);
}
else {
sleep(TIMEOUT_SECONDS);
sleep(UINT_MAX);
}
}

Expand Down