Skip to content
Extraits de code Groupes Projets
Vérifiée Valider 63911dc8 rédigé par Kubat's avatar Kubat
Parcourir les fichiers

Use own implementation of queues

parent 776e0335
Aucune branche associée trouvée
Aucune étiquette associée trouvée
1 requête de fusion!95Process queue
#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
#define LEKTORD_QUEUE_NAME "/LEKTORD_QUEUE"
#include <pthread.h>
enum lkt_event_type {
lkt_event_null = 0, /* NULL */
......@@ -17,8 +13,16 @@ typedef struct {
void *attr;
} lkt_event;
mqd_t lkt_queue_open(void);
void lkt_queue_close(mqd_t queue);
struct queue {
pthread_mutex_t lock;
volatile lkt_event *contents;
volatile size_t size;
volatile size_t last;
};
int lkt_queue_new(struct queue *);
void lkt_queue_free(struct queue *);
void lkt_queue_send(mqd_t, enum lkt_event_type, void *attr);
lkt_event lkt_queue_handle(mqd_t queue);
void lkt_queue_send(struct queue *, enum lkt_event_type, void *attr);
lkt_event lkt_queue_handle(struct queue *);
......@@ -74,7 +74,7 @@ struct lkt_state {
size_t fds_max;
char host[HOST_NAME_MAX];
char port[INI_MAX_LINE_LEN];
mqd_t queue;
struct queue queue;
volatile sqlite3 *db;
const char *kara_prefix;
......
......@@ -31,7 +31,7 @@ command_restart(struct lkt_state *srv, size_t c)
}
close(srv->fds[0].fd);
database_queue_state(srv->db, &sta);
lkt_queue_close(srv->queue);
lkt_queue_free(&srv->queue);
env_set(LKT_ENV_RESTART, "1");
int len = long_length(sta.current);
if (len > 0) {
......@@ -90,7 +90,7 @@ command_kill(struct lkt_state *srv, size_t c)
RETURN_UNLESS(lkt_client_auth(srv, c, false), "Failed to authentificate user", false);
LOG_INFO_SCT("GENERAL", "%s", "Stopping lektord");
close(srv->fds[0].fd);
lkt_queue_close(srv->queue);
lkt_queue_free(&srv->queue);
database_close_all();
exit(EXIT_SUCCESS);
}
......
......@@ -82,9 +82,12 @@ normal_launch:
char *db_path = safe_malloc(PATH_MAX * sizeof(char));
char *kara_dir = safe_malloc(PATH_MAX * sizeof(char));
struct lkt_state srv = {
.queue = lkt_queue_open(),
.kara_prefix = kara_dir,
};
if (lkt_queue_new(&srv.queue)) {
LOG_ERROR_SCT("INIT", "%s", "Faield to create server queue");
exit(EXIT_FAILURE);
}
/* Initialize the system. */
if (!database_new(&srv.db)) {
......@@ -128,9 +131,9 @@ normal_launch:
/* Get ENV */
/* Not working -> race condition with player module */
char *env_current = env_get(LKT_ENV_CURRENT);
if (env_current && !STR_MATCH(env_current, "NULL")) {
if (env_current && env_current[0] && !STR_MATCH(env_current, "NULL")) {
LOG_INFO_SCT("INIT", "Restart playback from %s", env_current);
lkt_queue_send(srv.queue, lkt_event_play_pos, (void *) (size_t) strtol(env_current, NULL, 0));
lkt_queue_send(&srv.queue, lkt_event_play_pos, (void *) (size_t) strtol(env_current, NULL, 0));
}
lkt_listen(&srv);
......
......@@ -720,7 +720,7 @@ handle_queue_events(struct lkt_state *srv)
lkt_event evt;
char string[BUFFER_MAX]; /* TODO: A less dirty bomb */
redo:
evt = lkt_queue_handle(srv->queue);
evt = lkt_queue_handle(&srv->queue);
switch (evt.type) {
case lkt_event_play_pos:
......
......@@ -7,66 +7,67 @@
#include <stdio.h>
#include <common/common.h>
#include <common/queue.h>
#include <pthread.h>
mqd_t
lkt_queue_open(void)
int
lkt_queue_new(struct queue *ret)
{
struct mq_attr attr = {
.mq_flags = O_NONBLOCK,
.mq_maxmsg = 10,
.mq_msgsize = sizeof(void *),
.mq_curmsgs = 0,
};
mq_unlink(LEKTORD_QUEUE_NAME);
errno = 0;
mqd_t ret = mq_open(LEKTORD_QUEUE_NAME, O_RDWR | O_CREAT | O_NONBLOCK,
0666, &attr);
switch (errno) {
case 0:
return ret;
if (!ret)
return 1;
case EACCES:
LOG_ERROR_SCT("QUEUE", "Queue %s already exists", LEKTORD_QUEUE_NAME);
return ret;
struct queue _ret = {
.contents = malloc(LKT_DEFAULT_LIST_SIZE * sizeof(lkt_event)),
.size = LKT_DEFAULT_LIST_SIZE,
.last = 0,
};
case ENOMEM:
LOG_ERROR_SCT("QUEUE", "%s", "Out of memory");
exit(EXIT_FAILURE);
if (_ret.contents == NULL)
return 1;
default:
LOG_ERROR_SCT("QUEUE", "Unhandled error '%s' (%d)",
strerror(errno), errno);
exit(EXIT_FAILURE);
}
*ret = _ret;
return 0;
}
void
lkt_queue_close(mqd_t queue)
lkt_queue_free(struct queue *queue)
{
mq_unlink(LEKTORD_QUEUE_NAME);
mq_close(queue);
if (queue && queue->contents)
free((void *) queue->contents);
}
void
lkt_queue_send(mqd_t queue, enum lkt_event_type type, void *attr)
lkt_queue_send(struct queue *queue, enum lkt_event_type _type, void *_attr)
{
errno = 0;
if (mq_send(queue, attr, sizeof(void *), type))
LOG_WARN_SCT("QUEUE", "Failed to send msg of type %d: %s",
type, strerror(errno));
if (!queue)
return;
volatile lkt_event *new;
if (queue->size == queue->last) {
new = realloc((void *) queue->contents, queue->size * 2 * sizeof(lkt_event));
if (NULL == new)
return;
queue->contents = new;
queue->size *= 2;
}
lkt_event evt = {
.type = _type,
.attr = _attr,
};
queue->contents[(queue->last)++] = evt;
}
lkt_event
lkt_queue_handle(mqd_t queue)
lkt_queue_handle(struct queue *queue)
{
lkt_event event;
errno = 0;
ssize_t len = mq_receive(queue, event.attr, sizeof(void *), &event.type);
lkt_event ret = {0};
if (!queue || !queue->last)
return ret;
if (len <= 0 || errno == EAGAIN) {
memset(&event, 0, sizeof(lkt_event));
return event;
} else
return event;
ret = queue->contents[0];
memmove((void *) queue->contents, (void *) (queue->contents + 1), --(queue->last));
return ret;
}
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter