diff --git a/CMakeLists.txt b/CMakeLists.txt index 8be66838b5e6d5f5127f120202a8233c8a239b78..b1c20ba1cbbf8c71f4a7542ae942e3d2b91210ff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -138,7 +138,6 @@ set(lektor_module_SOURCES src/module/module_repo.c src/module/mpv.c src/module/module_sdl2.c - src/module/worker.c ) set(lektor_mkv_SOURCES diff --git a/inc/lektor/internal/worker.h b/inc/lektor/internal/worker.h index 21d1332188cf2c208cea4ccb74d3c40617a1fa9e..e9bcaa8e9044f179bf24c5b37a69e39a1ccbb94f 100644 --- a/inc/lektor/internal/worker.h +++ b/inc/lektor/internal/worker.h @@ -1,9 +1,12 @@ #pragma once +#include <lektor/common.h> +#include <sys/sysinfo.h> +#include <sched.h> #include <pthread.h> #include <stdlib.h> -typedef void *(*worker_function)(void *, void *); +typedef void *(*worker_pool_function)(void *, void *); typedef enum { WORKER_STATUS_NONE = 0, @@ -15,7 +18,7 @@ struct worker_pool { volatile size_t size; volatile size_t thread_size; volatile size_t thread_working; - volatile worker_function *functions; + volatile worker_pool_function *functions; volatile void *volatile *volatile args; pthread_t *threads; pthread_mutex_t lock; @@ -23,20 +26,181 @@ struct worker_pool { }; /* After a free, the pool is no longer usable, you must re-init with a new */ -int worker_pool_new(struct worker_pool *, size_t init_len, size_t thread_count); -void worker_pool_free(struct worker_pool *); -void worker_pool_waitall(struct worker_pool *); +PRIVATE_FUNCTION int worker_pool_new(struct worker_pool *, size_t init_len, size_t thread_count); +PRIVATE_FUNCTION void worker_pool_free(struct worker_pool *); +PRIVATE_FUNCTION void worker_pool_waitall(struct worker_pool *); /* Request an interupt for all threads/tasks, you can wait after that. Note * that if a task is currently executed, it needs to supports a way of being * interupted (no automagic stuff here). */ -void worker_pool_interrupt(struct worker_pool *); +PRIVATE_FUNCTION void worker_pool_interrupt(struct worker_pool *); /* It is up to the user to free or not the `arg` depending on what he whats */ -int worker_pool_push(struct worker_pool *, worker_function, void *arg); +PRIVATE_FUNCTION int worker_pool_push(struct worker_pool *, worker_pool_function, void *arg); /* Get the working thread count */ -size_t worker_pool_get_working_count(struct worker_pool *); +PRIVATE_FUNCTION size_t worker_pool_get_working_count(struct worker_pool *); /* For clients, get the status of the worker */ -WORKER_STATUS worker_pool_get_status(void *); +PRIVATE_FUNCTION WORKER_STATUS worker_pool_get_status(void *); + +/* Implementation */ + +PRIVATE_FUNCTION void * +worker_pool_thread(void *___pool) +{ + lkt_thread_set_name("lektord/worker"); + + volatile struct worker_pool *pool = (volatile struct worker_pool *)___pool; + volatile void *volatile arg; + worker_pool_function func; + FOR_EVER + { + assert(!pthread_mutex_lock((pthread_mutex_t *)&pool->lock)); + if (pool->len) { + --(pool->len); + ++(pool->thread_working); + func = pool->functions[pool->len]; + arg = pool->args[pool->len]; + } else if (pool->exit) { + assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); + LOG_INFO("WORKER", "Exiting"); + break; + } else { + assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); + sleep(1); + continue; + } + + assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); + LOG_INFO("WORKER", "Picked up a function"); + func(___pool, (void *)arg); + LOG_INFO("WORKER", "Finished work for a function"); + + assert(!pthread_mutex_lock((pthread_mutex_t *)&pool->lock)); + --(pool->thread_working); + assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); + } + pthread_exit(NULL); +} + +PRIVATE_FUNCTION WORKER_STATUS +worker_pool_get_status(void *__pool) +{ + WORKER_STATUS ret = WORKER_STATUS_NONE; + struct worker_pool *pool = (struct worker_pool *)__pool; + + /* Case handling to get the correct status, simple for the moment, but may + * involve some logic in the future. */ + DO_WITH_LOCK(pool->lock, error, { + if (pool->exit && pool->len == 0) + ret = WORKER_STATUS_INTERRUPT; + }); + +error: + return ret; +} + +PRIVATE_FUNCTION void +worker_pool_interrupt(struct worker_pool *pool) +{ + DO_WITH_LOCK(pool->lock, error, { + pool->len = 0; + pool->exit = 1; + }); +error: + return; +} + +PRIVATE_FUNCTION int +worker_pool_new(struct worker_pool *ret, size_t init_size, size_t thread_count) +{ + if (!thread_count) { + int nprocs = get_nprocs_conf(); + assert(nprocs > 0); + thread_count = (size_t)nprocs; + } + struct worker_pool __ret = { + .functions = LKT_ALLOC_ARRAY(worker_pool_function, init_size), + .args = LKT_ALLOC_ARRAY(volatile void *, init_size), + .threads = LKT_ALLOC_ARRAY(pthread_t, thread_count), + .size = init_size, + .thread_size = thread_count, + .len = 0u, + .exit = 0, + }; + *ret = __ret; + assert(!pthread_mutex_init(&ret->lock, NULL)); + size_t i; + for (i = 0; i < ret->thread_size; ++i) + assert(!pthread_create(&ret->threads[i], NULL, worker_pool_thread, ret)); + return 0; +} + +PRIVATE_FUNCTION void +worker_pool_free(struct worker_pool *pool) +{ + LOG_DEBUG("WORKER", "Freeing worker pool %p", pool); + size_t i; + pool->exit = 1; + for (i = 0; i < pool->thread_size; ++i) + pthread_join(pool->threads[i], NULL); + + DO_WITH_LOCK(pool->lock, error, { + safe_free((void **)&pool->threads); + safe_free((void **)&pool->functions); + safe_free((void **)&pool->args); + pool->thread_size = 0u; + pool->size = 0u; + pool->len = 0u; + }); + +error: + return; +} + +PRIVATE_FUNCTION int +worker_pool_push(struct worker_pool *pool, worker_pool_function func, void *arg) +{ + int ret = 1; + assert(!pthread_mutex_lock(&pool->lock)); + if (pool->exit) { + LOG_ERROR("WORKER", "Can't push new jobs to worker if interupt was requested"); + goto error; + } + + if (pool->len == pool->size) { + if (pool->size < 1) + pool->size = 2; + size_t nsize = 2 * pool->size; + void *new_func = + safe_realloc((void *)pool->functions, nsize * sizeof(worker_pool_function)); + void *new_args = safe_realloc((void *)pool->args, nsize * sizeof(void *)); + + pool->size *= 2; + pool->functions = new_func; + pool->args = new_args; + } + + pool->functions[pool->len] = func; + pool->args[pool->len] = arg; + ++(pool->len); + ret = 0; +error: + assert(!pthread_mutex_unlock(&pool->lock)); + return ret; +} + +PRIVATE_FUNCTION void +worker_pool_waitall(struct worker_pool *pool) +{ + /* No lock, nothing, just test and yield */ + FOR_EVER_IF (pool->len) + sched_yield(); +} + +PRIVATE_FUNCTION size_t +worker_pool_get_working_count(struct worker_pool *pool) +{ + return pool->thread_working; +} diff --git a/src/module/module_repo.c b/src/module/module_repo.c index 443fc47ec09b47306314b666411a1f600dc56fc5..f97a1927271089baf4df98b539b444a4514a18bb 100644 --- a/src/module/module_repo.c +++ b/src/module/module_repo.c @@ -1142,9 +1142,9 @@ mod_update(va_list *va) * if the query is an id, we will use the *from_ids* version, * enabling the user to queue the different kara to dl by their ID */ - const worker_function update_version = (lkt_uri_get_type(args->filter_uri) == URI_ID) - ? ___worker_update_from_ids - : ___worker_update; + const worker_pool_function update_version = + (lkt_uri_get_type(args->filter_uri) == URI_ID) ? ___worker_update_from_ids + : ___worker_update; if (worker_pool_push(&(*repo)->workers, update_version, (void *)args)) { LOG_ERROR("REPO", "Out of memory"); diff --git a/src/module/worker.c b/src/module/worker.c deleted file mode 100644 index 35fd0db992b578f878eb5e90491d46da9a42be4c..0000000000000000000000000000000000000000 --- a/src/module/worker.c +++ /dev/null @@ -1,162 +0,0 @@ -#include <lektor/common.h> -#include <lektor/internal/worker.h> -#include <sys/sysinfo.h> -#include <sched.h> - -static void * -___worker_thread(void *___pool) -{ - lkt_thread_set_name("lektord/worker"); - - volatile struct worker_pool *pool = (volatile struct worker_pool *)___pool; - volatile void *volatile arg; - worker_function func; - FOR_EVER - { - assert(!pthread_mutex_lock((pthread_mutex_t *)&pool->lock)); - if (pool->len) { - --(pool->len); - ++(pool->thread_working); - func = pool->functions[pool->len]; - arg = pool->args[pool->len]; - } else if (pool->exit) { - assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); - LOG_INFO("WORKER", "Exiting"); - break; - } else { - assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); - sleep(1); - continue; - } - - assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); - LOG_INFO("WORKER", "Picked up a function"); - func(___pool, (void *)arg); - LOG_INFO("WORKER", "Finished work for a function"); - - assert(!pthread_mutex_lock((pthread_mutex_t *)&pool->lock)); - --(pool->thread_working); - assert(!pthread_mutex_unlock((pthread_mutex_t *)&pool->lock)); - } - pthread_exit(NULL); -} - -WORKER_STATUS -worker_pool_get_status(void *__pool) -{ - WORKER_STATUS ret = WORKER_STATUS_NONE; - struct worker_pool *pool = (struct worker_pool *)__pool; - - /* Case handling to get the correct status, simple for the moment, but may - * involve some logic in the future. */ - DO_WITH_LOCK(pool->lock, error, { - if (pool->exit && pool->len == 0) - ret = WORKER_STATUS_INTERRUPT; - }); - -error: - return ret; -} - -void -worker_pool_interrupt(struct worker_pool *pool) -{ - DO_WITH_LOCK(pool->lock, error, { - pool->len = 0; - pool->exit = 1; - }); -error: - return; -} - -int -worker_pool_new(struct worker_pool *ret, size_t init_size, size_t thread_count) -{ - if (!thread_count) { - int nprocs = get_nprocs_conf(); - assert(nprocs > 0); - thread_count = (size_t)nprocs; - } - struct worker_pool __ret = { - .functions = LKT_ALLOC_ARRAY(worker_function, init_size), - .args = LKT_ALLOC_ARRAY(volatile void *, init_size), - .threads = LKT_ALLOC_ARRAY(pthread_t, thread_count), - .size = init_size, - .thread_size = thread_count, - .len = 0u, - .exit = 0, - }; - *ret = __ret; - assert(!pthread_mutex_init(&ret->lock, NULL)); - size_t i; - for (i = 0; i < ret->thread_size; ++i) - assert(!pthread_create(&ret->threads[i], NULL, ___worker_thread, ret)); - return 0; -} - -void -worker_pool_free(struct worker_pool *pool) -{ - LOG_DEBUG("WORKER", "Freeing worker pool %p", pool); - size_t i; - pool->exit = 1; - for (i = 0; i < pool->thread_size; ++i) - pthread_join(pool->threads[i], NULL); - - DO_WITH_LOCK(pool->lock, error, { - safe_free((void **)&pool->threads); - safe_free((void **)&pool->functions); - safe_free((void **)&pool->args); - pool->thread_size = 0u; - pool->size = 0u; - pool->len = 0u; - }); - -error: - return; -} - -int -worker_pool_push(struct worker_pool *pool, worker_function func, void *arg) -{ - int ret = 1; - assert(!pthread_mutex_lock(&pool->lock)); - if (pool->exit) { - LOG_ERROR("WORKER", "Can't push new jobs to worker if interupt was requested"); - goto error; - } - - if (pool->len == pool->size) { - if (pool->size < 1) - pool->size = 2; - size_t nsize = 2 * pool->size; - void *new_func = safe_realloc((void *)pool->functions, nsize * sizeof(worker_function)); - void *new_args = safe_realloc((void *)pool->args, nsize * sizeof(void *)); - - pool->size *= 2; - pool->functions = new_func; - pool->args = new_args; - } - - pool->functions[pool->len] = func; - pool->args[pool->len] = arg; - ++(pool->len); - ret = 0; -error: - assert(!pthread_mutex_unlock(&pool->lock)); - return ret; -} - -void -worker_pool_waitall(struct worker_pool *pool) -{ - /* No lock, nothing, just test and yield */ - FOR_EVER_IF (pool->len) - sched_yield(); -} - -size_t -worker_pool_get_working_count(struct worker_pool *pool) -{ - return pool->thread_working; -}