#define _POSIX_C_SOURCE 200809L #include "worker.h" #include <lektor/common.h> #include <stdio.h> #include <pthread.h> #include <assert.h> #include <sys/sysinfo.h> #include <sched.h> #include <errno.h> #include <string.h> static void * __worker_thread(void *__pool) { struct worker_pool *pool = (struct worker_pool *) __pool; volatile void *volatile arg; worker_function func; for (;;) { errno = 0; if (pthread_mutex_lock(&pool->lock)) { LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(errno)); abort(); } if (pool->len) { --(pool->len); ++(pool->thread_working); func = pool->functions[pool->len]; arg = pool->args[pool->len]; } else if (pool->exit) { pthread_mutex_unlock(&pool->lock); LOG_INFO("WORKER", "Exiting"); break; } else { func = NULL; pthread_mutex_unlock(&pool->lock); sched_yield(); continue; } pthread_mutex_unlock(&pool->lock); func((void *) arg); while (pthread_mutex_trylock(&pool->lock)) sched_yield(); --(pool->thread_working); pthread_mutex_unlock(&pool->lock); } return NULL; } int worker_pool_new(struct worker_pool *ret, size_t init_size, size_t thread_count) { if (!thread_count) thread_count = get_nprocs_conf(); struct worker_pool __ret = { .functions = malloc(sizeof(worker_function) * init_size), .args = malloc(sizeof(void *) * init_size), .threads = malloc(sizeof(pthread_t) * thread_count), .size = init_size, .thread_size = thread_count, .len = 0u, .exit = 0, }; if (!__ret.functions || !__ret.args || !__ret.threads) { LOG_ERROR("WORKER", "Out of memory"); __ret.thread_size = 0u; worker_pool_free(&__ret); return 1; } *ret = __ret; if (pthread_mutex_init(&ret->lock, NULL)) { LOG_ERROR("WORKER", "Failed to init mutex: %s", strerror(errno)); abort(); } size_t i; for (i = 0; i < ret->thread_size; ++i) assert(!pthread_create(&ret->threads[i], NULL, __worker_thread, ret)); return 0; } void worker_pool_free(struct worker_pool *pool) { LOG_DEBUG("WORKER", "Freeing worker pool %p", pool); size_t i; pool->exit = 1; for (i = 0; i < pool->thread_size; ++i) pthread_join(pool->threads[i], NULL); assert(!pthread_mutex_lock(&pool->lock)); if (pool->threads) free((void *) pool->threads); pool->thread_size = 0u; if (pool->functions) free((void *) pool->functions); if (pool->args) free((void *) pool->args); pool->size = 0u; pool->len = 0u; assert(!pthread_mutex_unlock(&pool->lock)); } int worker_pool_push(struct worker_pool *pool, worker_function func, void *arg) { int ret = 1; assert(!pthread_mutex_lock(&pool->lock)); if (pool->len == pool->size) { void *new_functions = realloc((void *) pool->functions, 2 * pool->size * sizeof(worker_function)); if (NULL == new_functions) { LOG_ERROR("WORKER", "Out of memory"); goto error; } void *new_args = realloc((void *) pool->args, 2 * pool->size * sizeof(void *)); if (NULL == new_args) { free(new_functions); LOG_ERROR("WORKER", "Out of memory"); goto error; } pool->size *= 2; pool->functions = new_functions; pool->args = new_args; } pool->functions[pool->len] = func; pool->args[pool->len] = arg; ++(pool->len); ret = 0; error: assert(!pthread_mutex_unlock(&pool->lock)); return ret; } void worker_pool_waitall(struct worker_pool *pool) { /* No lock, nothing, just test and yield */ while (pool->len) sched_yield(); } size_t worker_pool_get_working_count(struct worker_pool *pool) { return pool->thread_working; }