From 5162133728a722830f50021e25d63fd704f4cd88 Mon Sep 17 00:00:00 2001 From: Kubat <mael.martin31@gmail.com> Date: Wed, 15 Jul 2020 15:30:52 +0200 Subject: [PATCH] WIP: Add the worker threads, for module_repo --- inc/lektor/config.def | 1 + src/module/module_repo.c | 18 ++++- src/module/repo.c | 1 + src/module/worker.c | 139 +++++++++++++++++++++++++++++++++++++++ src/module/worker.h | 26 ++++++++ 5 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 src/module/worker.c create mode 100644 src/module/worker.h diff --git a/inc/lektor/config.def b/inc/lektor/config.def index 001f44d3..8c63644b 100644 --- a/inc/lektor/config.def +++ b/inc/lektor/config.def @@ -13,6 +13,7 @@ value("db_path", "/home/kara/kara.db") section("repo") value("module", "repo") value("name", "Kurisu") +value("workers_count", "5") value("url", "https://kurisu.iiens.net") value("json", "https://kurisu.iiens.net/api") value("id_json", "https://kurisu.iiens.net/api?id=%ld") diff --git a/src/module/module_repo.c b/src/module/module_repo.c index e207651d..265e9afa 100644 --- a/src/module/module_repo.c +++ b/src/module/module_repo.c @@ -3,11 +3,13 @@ #include <lektor/lktconfig.h> #include <common/common.h> #include <common/queue.h> -#include <lektor/module/mthread.h> #include <lektor/database.h> #include <lektor/net.h> #include <lektor/reg.h> +#include "worker.h" + +#include <pthread.h> #include <errno.h> #include <stdio.h> #include <unistd.h> @@ -42,6 +44,10 @@ struct module_repo_internal { char *get_id_file; const uint64_t version; + /* Worker threads */ + worker_pool workers; + int updating; + /* The database and the queue */ struct queue *queue; volatile sqlite3 *db; @@ -195,6 +201,7 @@ module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatil .version = 1, .queue = queue, .db = db, + .updating = 0, .name = safe_zero_malloc(INI_MAX_LINE_LEN * sizeof(char)), .kara_dir = safe_zero_malloc(PATH_MAX * sizeof(char)), .get_id_json = safe_zero_malloc(URL_MAX_LEN * sizeof(char)), @@ -203,6 +210,15 @@ module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatil .base_url = safe_zero_malloc(URL_MAX_LEN * sizeof(char)), }; + if (!database_config_get_int(db, "repo", "workers_count", &repo.workers_count)) + repo.workers_count = 5; + if (worker_pool_new(&repo.workers, + 10 /* Initial number of elements in the call queue */, + repo.workers_count)) { + LOG_ERROR("REPO", "Out of memory"); + return 1; + } + /* Copies */ if (!database_config_get_text(db, "database", "kara_dir", repo.kara_dir, PATH_MAX - 1) || !database_config_get_text(db, "repo", "name", repo.name, INI_MAX_LINE_LEN) || diff --git a/src/module/repo.c b/src/module/repo.c index 7b1382d9..ba77cd34 100644 --- a/src/module/repo.c +++ b/src/module/repo.c @@ -119,6 +119,7 @@ repo_free(struct lkt_repo *const repo) free(repo->get_id_file); free(repo->base_url); free(repo->get_all_json); + worker_pool_free(&repo->workers); } int diff --git a/src/module/worker.c b/src/module/worker.c new file mode 100644 index 00000000..995a24b2 --- /dev/null +++ b/src/module/worker.c @@ -0,0 +1,139 @@ +#define _POSIX_C_SOURCE 200809L + +#include "worker.h" +#include "common.h" + +#include <pthread.h> +#include <assert.h> +#include <sys/sysinfo.h> +#include <sched.h> + +static inline void * +__worker_thread(void *__pool) +{ + struct worker_pool *pool = (struct worker_pool *) __pool; + void *arg; + worker_function func; + for (;;) + { + assert(!pthread_mutex_lock(&pool->lock)); + if (pool->len) + { + --(pool->len); + ++(pool->thread_working); + func = pool->functions[pool->len]; + arg = pool->args[pool->len]; + } + else if (pool->exit) + { + pthread_mutex_unlock(&pool->lock); + LOG_INFO("WORKER", "Exiting"); + break; + } + else + { + func = NULL; + pthread_mutex_unlock(&pool->lock); + sched_yield(); + continue; + } + + pthread_mutex_unlock(&pool->lock); + func(arg); + + while (pthread_mutex_trylock(&pool->lock)) + sched_yield(); + --(pool->thread_working); + pthread_mutex_unlock(&pool->lock); + } + return NULL; +} + +int +worker_pool_new(struct worker_pool *ret, size_t init_size, size_t thread_count) +{ + pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; + if (!thread_count) + thread_count = get_nprocs_conf(); + struct worker_pool __ret = { + .functions = malloc(sizeof(worker_function) * init_size), + .args = malloc(sizeof(void *) * init_size), + .threads = malloc(sizeof(pthread_t) * thread_count), + .size = init_size, + .thread_size = thread_count, + .len = 0u, + .lock = mtx, + .exit = 0, + }; + if (!__ret.functions || !__ret.args || !__ret.threads) { + LOG_ERROR("WORKER", "Out of memory"); + __ret.thread_size = 0u; + worker_pool_free(&__ret); + return 1; + } + size_t i; + *ret = __ret; + for (i = 0; i < ret->thread_size; ++i) + assert(!pthread_create(&ret->threads[i], NULL, __worker_thread, ret)); + return 0; +} + +void worker_pool_free(struct worker_pool *pool) +{ + size_t i; + pool->exit = 1; + for (i = 0; i < pool->thread_size; ++i) + pthread_join(pool->threads[i], NULL); + assert(!pthread_mutex_lock(&pool->lock)); + if (pool->threads) + free((void *) pool->threads); + pool->thread_size = 0u; + if (pool->functions) + free((void *) pool->functions); + if (pool->args) + free((void *) pool->args); + pool->size = 0u; + pool->len = 0u; + assert(!pthread_mutex_unlock(&pool->lock)); +} + +int +worker_pool_push(struct worker_pool *pool, worker_function func, void *arg) +{ + int ret = 1; + assert(!pthread_mutex_lock(&pool->lock)); + if (pool->len == pool->size) { + void *new_functions = realloc((void *) pool->functions, + 2 * pool->size * sizeof(worker_function)); + if (NULL == new_functions) { + LOG_ERROR("WORKER", "Out of memory"); + goto error; + } + void *new_args = realloc((void *) pool->args, + 2 * pool->size * sizeof(void *)); + if (NULL == new_args) { + free(new_functions); + LOG_ERROR("WORKER", "Out of memory"); + goto error; + } + pool->size *= 2; + pool->functions = new_functions; + pool->args = new_args; + } + + pool->functions[pool->len] = func; + pool->args[pool->len] = arg; + ++(pool->len); + ret = 0; +error: + assert(!pthread_mutex_unlock(&pool->lock)); + return ret; +} + +void +worker_pool_waitall(struct worker_pool *pool) +{ + /* No lock, nothing, just test and yield */ + while (pool->len) + sched_yield(); +} diff --git a/src/module/worker.h b/src/module/worker.h new file mode 100644 index 00000000..cc6417f1 --- /dev/null +++ b/src/module/worker.h @@ -0,0 +1,26 @@ +#pragma once + +#include <pthread.h> +#include <stdlib.h> + +typedef void *(*worker_function)(void *); + +struct worker_pool { + volatile size_t len; + volatile size_t size; + volatile size_t thread_size; + volatile size_t thread_working; + volatile worker_function *functions; + volatile void *volatile *volatile args; + pthread_t *threads; + pthread_mutex_t lock; + volatile int exit; +}; + +/* After a free, the pool is no longer usable, you must re-init with a new */ +int worker_pool_new(struct worker_pool *, size_t init_len, size_t thread_count); +void worker_pool_free(struct worker_pool *); +void worker_pool_waitall(struct worker_pool *); + +/* It is up to the user to free or not the `arg` depending on what he whats */ +int worker_pool_push(struct worker_pool *, worker_function, void *arg); -- GitLab