diff --git a/inc/common/queue.h b/inc/common/queue.h index 95899825ac38f84f20591b1ce036636ff49e4106..f1ea5eb194b5889521201cb5c2baf5005137d4ba 100644 --- a/inc/common/queue.h +++ b/inc/common/queue.h @@ -1,10 +1,6 @@ #pragma once -#include <fcntl.h> -#include <sys/stat.h> -#include <mqueue.h> - -#define LEKTORD_QUEUE_NAME "/LEKTORD_QUEUE" +#include <pthread.h> enum lkt_event_type { lkt_event_null = 0, /* NULL */ @@ -17,8 +13,16 @@ typedef struct { void *attr; } lkt_event; -mqd_t lkt_queue_open(void); -void lkt_queue_close(mqd_t queue); +struct queue { + pthread_mutex_t lock; + + volatile lkt_event *contents; + volatile size_t size; + volatile size_t last; +}; + +int lkt_queue_new(struct queue *); +void lkt_queue_free(struct queue *); -void lkt_queue_send(mqd_t, enum lkt_event_type, void *attr); -lkt_event lkt_queue_handle(mqd_t queue); +void lkt_queue_send(struct queue *, enum lkt_event_type, void *attr); +lkt_event lkt_queue_handle(struct queue *); diff --git a/inc/lektor/net.h b/inc/lektor/net.h index 4b83feda24a62c8b2ab57fa5c45804ee142c34ac..981753dc1f8d6aa4ce823c268fd76fe25946e257 100644 --- a/inc/lektor/net.h +++ b/inc/lektor/net.h @@ -74,7 +74,7 @@ struct lkt_state { size_t fds_max; char host[HOST_NAME_MAX]; char port[INI_MAX_LINE_LEN]; - mqd_t queue; + struct queue queue; volatile sqlite3 *db; const char *kara_prefix; diff --git a/src/commands.c b/src/commands.c index 9a1b7c4013ec33f77216a0949a33427ea820a82d..47155b92a623f776baa90e50e584d4ef494577c5 100644 --- a/src/commands.c +++ b/src/commands.c @@ -31,7 +31,7 @@ command_restart(struct lkt_state *srv, size_t c) } close(srv->fds[0].fd); database_queue_state(srv->db, &sta); - lkt_queue_close(srv->queue); + lkt_queue_free(&srv->queue); env_set(LKT_ENV_RESTART, "1"); int len = long_length(sta.current); if (len > 0) { @@ -90,7 +90,7 @@ command_kill(struct lkt_state *srv, size_t c) RETURN_UNLESS(lkt_client_auth(srv, c, false), "Failed to authentificate user", false); LOG_INFO_SCT("GENERAL", "%s", "Stopping lektord"); close(srv->fds[0].fd); - lkt_queue_close(srv->queue); + lkt_queue_free(&srv->queue); database_close_all(); exit(EXIT_SUCCESS); } diff --git a/src/main/server.c b/src/main/server.c index 71da9f5bada7c5610efb549915da1917137cdb03..29ff8e20e029f50753716296f4585e2fe13fa5c6 100644 --- a/src/main/server.c +++ b/src/main/server.c @@ -82,9 +82,12 @@ normal_launch: char *db_path = safe_malloc(PATH_MAX * sizeof(char)); char *kara_dir = safe_malloc(PATH_MAX * sizeof(char)); struct lkt_state srv = { - .queue = lkt_queue_open(), .kara_prefix = kara_dir, }; + if (lkt_queue_new(&srv.queue)) { + LOG_ERROR_SCT("INIT", "%s", "Faield to create server queue"); + exit(EXIT_FAILURE); + } /* Initialize the system. */ if (!database_new(&srv.db)) { @@ -128,9 +131,9 @@ normal_launch: /* Get ENV */ /* Not working -> race condition with player module */ char *env_current = env_get(LKT_ENV_CURRENT); - if (env_current && !STR_MATCH(env_current, "NULL")) { + if (env_current && env_current[0] && !STR_MATCH(env_current, "NULL")) { LOG_INFO_SCT("INIT", "Restart playback from %s", env_current); - lkt_queue_send(srv.queue, lkt_event_play_pos, (void *) (size_t) strtol(env_current, NULL, 0)); + lkt_queue_send(&srv.queue, lkt_event_play_pos, (void *) (size_t) strtol(env_current, NULL, 0)); } lkt_listen(&srv); diff --git a/src/net/listen.c b/src/net/listen.c index e87bcebf708bd943622669115a08a6e94c212263..6fe36067c32de8868976a456802720e6fa5ddb2e 100644 --- a/src/net/listen.c +++ b/src/net/listen.c @@ -720,7 +720,7 @@ handle_queue_events(struct lkt_state *srv) lkt_event evt; char string[BUFFER_MAX]; /* TODO: A less dirty bomb */ redo: - evt = lkt_queue_handle(srv->queue); + evt = lkt_queue_handle(&srv->queue); switch (evt.type) { case lkt_event_play_pos: diff --git a/src/queue.c b/src/queue.c index 50849813a7ef4ba31ae619b7b3fb7d84c08b1dde..dff8fb43c5eafde08d60d9ac0a4b55463c09bbea 100644 --- a/src/queue.c +++ b/src/queue.c @@ -7,66 +7,67 @@ #include <stdio.h> #include <common/common.h> #include <common/queue.h> +#include <pthread.h> -mqd_t -lkt_queue_open(void) +int +lkt_queue_new(struct queue *ret) { - struct mq_attr attr = { - .mq_flags = O_NONBLOCK, - .mq_maxmsg = 10, - .mq_msgsize = sizeof(void *), - .mq_curmsgs = 0, - }; - mq_unlink(LEKTORD_QUEUE_NAME); - errno = 0; - mqd_t ret = mq_open(LEKTORD_QUEUE_NAME, O_RDWR | O_CREAT | O_NONBLOCK, - 0666, &attr); - - switch (errno) { - case 0: - return ret; + if (!ret) + return 1; - case EACCES: - LOG_ERROR_SCT("QUEUE", "Queue %s already exists", LEKTORD_QUEUE_NAME); - return ret; + struct queue _ret = { + .contents = malloc(LKT_DEFAULT_LIST_SIZE * sizeof(lkt_event)), + .size = LKT_DEFAULT_LIST_SIZE, + .last = 0, + }; - case ENOMEM: - LOG_ERROR_SCT("QUEUE", "%s", "Out of memory"); - exit(EXIT_FAILURE); + if (_ret.contents == NULL) + return 1; - default: - LOG_ERROR_SCT("QUEUE", "Unhandled error '%s' (%d)", - strerror(errno), errno); - exit(EXIT_FAILURE); - } + *ret = _ret; + return 0; } void -lkt_queue_close(mqd_t queue) +lkt_queue_free(struct queue *queue) { - mq_unlink(LEKTORD_QUEUE_NAME); - mq_close(queue); + if (queue && queue->contents) + free((void *) queue->contents); } void -lkt_queue_send(mqd_t queue, enum lkt_event_type type, void *attr) +lkt_queue_send(struct queue *queue, enum lkt_event_type _type, void *_attr) { - errno = 0; - if (mq_send(queue, attr, sizeof(void *), type)) - LOG_WARN_SCT("QUEUE", "Failed to send msg of type %d: %s", - type, strerror(errno)); + if (!queue) + return; + + volatile lkt_event *new; + if (queue->size == queue->last) { + new = realloc((void *) queue->contents, queue->size * 2 * sizeof(lkt_event)); + + if (NULL == new) + return; + + queue->contents = new; + queue->size *= 2; + } + + lkt_event evt = { + .type = _type, + .attr = _attr, + }; + queue->contents[(queue->last)++] = evt; } lkt_event -lkt_queue_handle(mqd_t queue) +lkt_queue_handle(struct queue *queue) { - lkt_event event; - errno = 0; - ssize_t len = mq_receive(queue, event.attr, sizeof(void *), &event.type); + lkt_event ret = {0}; + + if (!queue || !queue->last) + return ret; - if (len <= 0 || errno == EAGAIN) { - memset(&event, 0, sizeof(lkt_event)); - return event; - } else - return event; + ret = queue->contents[0]; + memmove((void *) queue->contents, (void *) (queue->contents + 1), --(queue->last)); + return ret; }