/*
 * worker.c (4.07 KiB)
 * Repository web-viewer residue captured with this file:
 *   "Select a Git revision" / "Forked from ARISE / matrix-appservice-discord" /
 *   "The source project has limited visibility."
 */
#define _POSIX_C_SOURCE 200809L
#include "worker.h"
#include <lektor/common.h>
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <sys/sysinfo.h>
#include <sched.h>
#include <errno.h>
#include <string.h>
/*
 * Worker thread entry point.
 *
 * Loops forever, each iteration taking the pool lock and then:
 *   - if the job stack is non-empty, popping the most recently pushed
 *     (function, argument) pair and running it with the lock released;
 *   - else, if the pool's exit flag is set, leaving the loop;
 *   - else yielding the CPU and polling again.
 *
 * Pending jobs are checked before the exit flag, so a worker only exits once
 * it observes an empty queue.
 *
 * raw_pool: the `struct worker_pool *` this thread serves.
 * Returns NULL. Aborts the process if the pool mutex cannot be locked.
 */
static void *
__worker_thread(void *raw_pool)
{
    struct worker_pool *pool = (struct worker_pool *) raw_pool;
    volatile void *volatile arg;
    worker_function func;
    int rc;

    for (;;) {
        /* pthread functions return the error number; they do not set errno. */
        rc = pthread_mutex_lock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(rc));
            abort();
        }

        if (pool->len) {
            /* Pop from the top of the stack and account for the busy thread. */
            --(pool->len);
            ++(pool->thread_working);
            func = pool->functions[pool->len];
            arg  = pool->args[pool->len];
        } else if (pool->exit) {
            pthread_mutex_unlock(&pool->lock);
            LOG_INFO("WORKER", "Exiting");
            break;
        } else {
            /* Nothing to do yet: release the lock and poll again later. */
            pthread_mutex_unlock(&pool->lock);
            sched_yield();
            continue;
        }
        pthread_mutex_unlock(&pool->lock);

        /* Run the job without holding the lock so other threads can proceed. */
        func((void *) arg);

        /* A blocking lock replaces the former trylock spin, which would loop
         * forever on any error other than EBUSY. */
        rc = pthread_mutex_lock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(rc));
            abort();
        }
        --(pool->thread_working);
        pthread_mutex_unlock(&pool->lock);
    }

    return NULL;
}
int
worker_pool_new(struct worker_pool *ret, size_t init_size, size_t thread_count)
{
if (!thread_count)
thread_count = get_nprocs_conf();
struct worker_pool __ret = {
.functions = malloc(sizeof(worker_function) * init_size),
.args = malloc(sizeof(void *) * init_size),
.threads = malloc(sizeof(pthread_t) * thread_count),
.size = init_size,
.thread_size = thread_count,
.len = 0u,
.exit = 0,
};
if (!__ret.functions || !__ret.args || !__ret.threads) {
LOG_ERROR("WORKER", "Out of memory");
__ret.thread_size = 0u;
worker_pool_free(&__ret);
return 1;
}
*ret = __ret;
if (pthread_mutex_init(&ret->lock, NULL)) {
LOG_ERROR("WORKER", "Failed to init mutex: %s", strerror(errno));
abort();
}
size_t i;
for (i = 0; i < ret->thread_size; ++i)
assert(!pthread_create(&ret->threads[i], NULL, __worker_thread, ret));
return 0;
}
/*
 * Stop the worker threads and release the pool's arrays.
 *
 * Sets the exit flag, joins every worker thread, then frees the thread,
 * function and argument arrays and resets the counters. The pool object
 * itself is not freed.
 *
 * When thread_size is 0 the mutex is not touched at all, so the function is
 * safe on a pool whose mutex was never initialised. For the same reason the
 * mutex is not destroyed here.
 *
 * Aborts the process if the pool mutex cannot be locked or unlocked.
 */
void
worker_pool_free(struct worker_pool *pool)
{
    /* %p requires a void * argument. */
    LOG_DEBUG("WORKER", "Freeing worker pool %p", (void *) pool);

    const int has_workers = (pool->thread_size != 0);
    int rc;

    if (has_workers) {
        /* Workers read the exit flag while holding the lock, so publish it
         * under the same lock. The lock calls must not sit inside assert():
         * with NDEBUG they would be compiled out. */
        rc = pthread_mutex_lock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(rc));
            abort();
        }
        pool->exit = 1;
        rc = pthread_mutex_unlock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to unlock mutex: %s", strerror(rc));
            abort();
        }

        for (size_t i = 0; i < pool->thread_size; ++i) {
            rc = pthread_join(pool->threads[i], NULL);
            if (rc)
                LOG_ERROR("WORKER", "Failed to join thread: %s", strerror(rc));
        }

        rc = pthread_mutex_lock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(rc));
            abort();
        }
    }

    /* free(NULL) is a no-op; clear the pointers so a second call is harmless. */
    free((void *) pool->threads);
    pool->threads     = NULL;
    pool->thread_size = 0u;

    free((void *) pool->functions);
    pool->functions = NULL;

    free((void *) pool->args);
    pool->args = NULL;

    pool->size = 0u;
    pool->len  = 0u;

    if (has_workers) {
        rc = pthread_mutex_unlock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to unlock mutex: %s", strerror(rc));
            abort();
        }
    }
}
/*
 * Queue a job for the worker threads.
 *
 * pool: pool to push to.
 * func: function the worker will call.
 * arg:  pointer passed to func; stored as-is, never copied or freed here.
 *
 * The job arrays grow (capacity doubles) when full. Returns 0 on success and
 * 1 when the arrays could not be grown; in that case the job is not queued
 * and the pool keeps its previous contents. Aborts the process if the pool
 * mutex cannot be locked or unlocked.
 */
int
worker_pool_push(struct worker_pool *pool, worker_function func, void *arg)
{
    int ret = 1;

    /* The lock call must not sit inside assert(): with NDEBUG it would be
     * compiled out and the pool would be modified unprotected. */
    int rc = pthread_mutex_lock(&pool->lock);
    if (rc) {
        LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(rc));
        abort();
    }

    if (pool->len == pool->size) {
        /* Doubling a capacity of 0 stays at 0, and realloc(p, 0) may free p. */
        const size_t new_size = pool->size ? 2 * pool->size : 1u;

        void *new_functions = realloc((void *) pool->functions,
                                      new_size * sizeof(worker_function));
        if (NULL == new_functions) {
            LOG_ERROR("WORKER", "Out of memory");
            goto error;
        }
        /* Commit immediately: realloc may have moved the block, so the old
         * pointer is no longer valid even if the next allocation fails. The
         * array is merely larger than pool->size requires in that case. */
        pool->functions = new_functions;

        void *new_args = realloc((void *) pool->args, new_size * sizeof(void *));
        if (NULL == new_args) {
            LOG_ERROR("WORKER", "Out of memory");
            goto error;
        }
        pool->args = new_args;

        /* Only advertise the new capacity once both arrays have it. */
        pool->size = new_size;
    }

    pool->functions[pool->len] = func;
    pool->args[pool->len]      = arg;
    ++(pool->len);
    ret = 0;

error:
    rc = pthread_mutex_unlock(&pool->lock);
    if (rc) {
        LOG_ERROR("WORKER", "Failed to unlock mutex: %s", strerror(rc));
        abort();
    }
    return ret;
}
/*
 * Block (by polling and yielding) until the pool's job queue is empty.
 *
 * The queue length is sampled under the pool lock, because every writer of
 * that field holds the lock. Note that an empty queue only means every job
 * has been picked up: jobs already handed to a worker may still be running
 * when this function returns.
 *
 * Aborts the process if the pool mutex cannot be locked or unlocked.
 */
void
worker_pool_waitall(struct worker_pool *pool)
{
    for (;;) {
        int rc = pthread_mutex_lock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(rc));
            abort();
        }

        const int queue_empty = (pool->len == 0);

        rc = pthread_mutex_unlock(&pool->lock);
        if (rc) {
            LOG_ERROR("WORKER", "Failed to unlock mutex: %s", strerror(rc));
            abort();
        }

        if (queue_empty)
            break;

        /* Let the workers run before sampling again. */
        sched_yield();
    }
}
/*
 * Return the number of worker threads currently running a job.
 *
 * The counter is read under the pool lock, because the worker threads only
 * modify it while holding that lock. The value is a snapshot and may be
 * outdated by the time the caller uses it.
 *
 * Aborts the process if the pool mutex cannot be locked or unlocked.
 */
size_t
worker_pool_get_working_count(struct worker_pool *pool)
{
    int rc = pthread_mutex_lock(&pool->lock);
    if (rc) {
        LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(rc));
        abort();
    }

    const size_t busy = pool->thread_working;

    rc = pthread_mutex_unlock(&pool->lock);
    if (rc) {
        LOG_ERROR("WORKER", "Failed to unlock mutex: %s", strerror(rc));
        abort();
    }

    return busy;
}