Skip to content
Extraits de code Groupes Projets
Vérifiée Valider f005c7e8 rédigé par Kubat's avatar Kubat
Parcourir les fichiers

MODULE: Can now create the repo module

The error was that passing to threads the pointer to a variable that
will be copied then destroyed before the end of the thread is a really
bad idea
parent f0d7aefa
Aucune branche associée trouvée
Aucune étiquette associée trouvée
1 requête de fusion!105Refactor and more
...@@ -424,16 +424,6 @@ module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatil ...@@ -424,16 +424,6 @@ module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatil
.base_url = safe_zero_malloc(LKT_LINE_MAX * sizeof(char)), .base_url = safe_zero_malloc(LKT_LINE_MAX * sizeof(char)),
}; };
int workers_count;
if (!database_config_get_int(db, "repo", "workers_count", &workers_count))
workers_count = 5;
if (worker_pool_new(&repo.workers,
10 /* Initial number of elements in the call queue */,
workers_count /* Number of worker threads */)) {
LOG_ERROR("REPO", "Out of memory");
return false;
}
/* Copies */ /* Copies */
if (!database_config_get_text(db, "database", "kara_dir", repo.kara_dir, PATH_MAX) || if (!database_config_get_text(db, "database", "kara_dir", repo.kara_dir, PATH_MAX) ||
!database_config_get_text(db, "repo", "name", repo.name, LKT_LINE_MAX) || !database_config_get_text(db, "repo", "name", repo.name, LKT_LINE_MAX) ||
...@@ -443,6 +433,18 @@ module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatil ...@@ -443,6 +433,18 @@ module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatil
!database_config_get_text(db, "repo", "json", repo.get_all_json, LKT_LINE_MAX) ) !database_config_get_text(db, "repo", "json", repo.get_all_json, LKT_LINE_MAX) )
return false; return false;
memcpy(repo_, &repo, sizeof(struct module_repo_internal)); memcpy(repo_, &repo, sizeof(struct module_repo_internal));
/* Init the worker only now ! */
int workers_count;
if (!database_config_get_int(db, "repo", "workers_count", &workers_count))
workers_count = 5;
if (worker_pool_new(&repo_->workers,
10 /* Initial number of elements in the call queue */,
workers_count /* Number of worker threads */)) {
LOG_ERROR("REPO", "Out of memory");
return false;
}
return true; return true;
} }
......
...@@ -14,40 +14,34 @@ ...@@ -14,40 +14,34 @@
static void * static void *
__worker_thread(void *__pool) __worker_thread(void *__pool)
{ {
struct worker_pool *pool = (struct worker_pool *) __pool; volatile struct worker_pool *pool = (volatile struct worker_pool *) __pool;
volatile void *volatile arg; volatile void *volatile arg;
worker_function func; worker_function func;
for (;;) { for (;;) {
errno = 0; assert(!pthread_mutex_lock((pthread_mutex_t *) &pool->lock));
if (pthread_mutex_lock(&pool->lock)) {
LOG_ERROR("WORKER", "Failed to lock mutex: %s", strerror(errno));
abort();
}
if (pool->len) { if (pool->len) {
--(pool->len); --(pool->len);
++(pool->thread_working); ++(pool->thread_working);
func = pool->functions[pool->len]; func = pool->functions[pool->len];
arg = pool->args[pool->len]; arg = pool->args[pool->len];
} else if (pool->exit) { } else if (pool->exit) {
pthread_mutex_unlock(&pool->lock); assert(!pthread_mutex_unlock((pthread_mutex_t *) &pool->lock));
LOG_INFO("WORKER", "Exiting"); LOG_INFO("WORKER", "Exiting");
break; break;
} else { } else {
func = NULL; assert(!pthread_mutex_unlock((pthread_mutex_t *) &pool->lock));
pthread_mutex_unlock(&pool->lock);
sched_yield(); sched_yield();
continue; continue;
} }
pthread_mutex_unlock(&pool->lock); assert(!pthread_mutex_unlock((pthread_mutex_t *) &pool->lock));
func((void *) arg); func((void *) arg);
while (pthread_mutex_trylock(&pool->lock)) assert(!pthread_mutex_lock((pthread_mutex_t *) &pool->lock));
sched_yield();
--(pool->thread_working); --(pool->thread_working);
pthread_mutex_unlock(&pool->lock); assert(!pthread_mutex_unlock((pthread_mutex_t *) &pool->lock));
} }
return NULL; pthread_exit(NULL);
} }
int int
...@@ -71,10 +65,7 @@ worker_pool_new(struct worker_pool *ret, size_t init_size, size_t thread_count) ...@@ -71,10 +65,7 @@ worker_pool_new(struct worker_pool *ret, size_t init_size, size_t thread_count)
return 1; return 1;
} }
*ret = __ret; *ret = __ret;
if (pthread_mutex_init(&ret->lock, NULL)) { assert(!pthread_mutex_init(&ret->lock, NULL));
LOG_ERROR("WORKER", "Failed to init mutex: %s", strerror(errno));
abort();
}
size_t i; size_t i;
for (i = 0; i < ret->thread_size; ++i) for (i = 0; i < ret->thread_size; ++i)
assert(!pthread_create(&ret->threads[i], NULL, __worker_thread, ret)); assert(!pthread_create(&ret->threads[i], NULL, __worker_thread, ret));
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter