Sélectionner une révision Git
downloader.c 7,30 Kio
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <lektor/macro.h>
#include <lektor/repo.h>
#include <lektor/thread.h>
#include <limits.h>
static struct lkt_thread repo_thread;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static volatile int init = 0;
static volatile int stop = 0;
static volatile int all_json = 0;
int
repo_join_thread(void)
{
int ret = 1;
RETURN_IF(pthread_mutex_lock(&mtx), "Failed to lock mutex", 3);
GOTO_UNLESS(init, "Repo thread not launched, can't join\n", error);
stop = 1;
GOTO_IF(pthread_join(repo_thread.th, NULL), "Failed to join repo thread", error);
fprintf(stderr, " . repo_join_thread: repo thread joined\n");
ret = 0;
error:
RETURN_IF(pthread_mutex_unlock(&mtx), "Failed to unlock mutex", 3);
return ret;
}
/* Find it in the repo/curl.c file. */
extern int
safe_json_get_string(struct json_object *jobj, const char *key, char *content, const size_t len);
extern int
safe_json_get_int32(struct json_object *json, const char *key, int32_t *ret);
static inline void
__handle_got_json(struct lkt_thread *self, struct lkt_repo *repo, struct json_object *json)
{
size_t i, len = json_object_array_length(json);
struct json_object *kara_json;
int32_t integer;
struct kara *kara;
int err;
RETURN_UNLESS(len > 0 && json_object_get_array(json), "Json invalid or array empty", NOTHING);
for (i = 0; i < len; ++i) {
kara_json = json_object_array_get_idx(json, i);
kara = calloc(1, sizeof(struct kara));
err = 0;
/* Get the id of the kara. */
if (safe_json_get_int32(kara_json, "id", &integer))
goto err;
/* Craft a fake filepath here, it will be used later. */
size_t kara_dir_len = strlen(repo->kara_dir);
memcpy(kara->filename, repo->kara_dir, sizeof(char) * (kara_dir_len + 1));
if (kara->filename[kara_dir_len - 1] != '/') {
strncat(kara->filename, "/", PATH_MAX - 1);
kara->filename[++kara_dir_len] = 0;
}
integer = snprintf(kara->filename + kara_dir_len, PATH_MAX - kara_dir_len, "%d", integer);
kara->filename[PATH_MAX - 1] = 0;
fprintf(stderr, " . __handle_got_json: Crafted filename is '%s'\n", kara->filename);
RETURN_UNLESS(kara, "Out of memory", NOTHING);
/* Get the fields from the json. */
err |= safe_json_get_string(kara_json, "song_name", kara->mdt.song_name, LEKTOR_TAG_MAX);
err |= safe_json_get_string(kara_json, "source_name", kara->mdt.source_name, LEKTOR_TAG_MAX);
err |= safe_json_get_string(kara_json, "category", kara->mdt.category, LEKTOR_TAG_MAX);
err |= safe_json_get_string(kara_json, "language", kara->mdt.language, LEKTOR_TAG_MAX);
err |= safe_json_get_string(kara_json, "author_name", kara->mdt.author_name, LEKTOR_TAG_MAX);
err |= safe_json_get_string(kara_json, "song_type", kara->mdt.song_type, LEKTOR_TAG_MAX);
if (err)
goto err;
/* Get the song number. */
if (safe_json_get_int32(kara_json, "song_number", &kara->mdt.song_number))
goto err;
/* Append. */
if (lkt_th_append_output(self, kara)) {
fprintf(stderr, " . __handle_got_json: Could not append downloaded kara mdt\n");
goto err;
}
continue;
err:
free(kara);
}
}
static void *
__repo_thread_function(struct lkt_thread_arg *arg)
{
size_t head;
struct lkt_repo *repo = arg->args;
struct lkt_thread *self = arg->self;
struct kara *kara;
struct json_object *json = NULL;
char path[PATH_MAX];
free(arg);
fprintf(stderr, " . __repo_thread_function: Starting the repo thread\n");
for (;;) {
if (pthread_mutex_lock(&mtx)) {
fprintf(stderr, " ! __repo_thread_function: Failed to lock mutex\n");
goto end_loop;
}
if (all_json) {
repo_get_alljson_sync(repo, &json);
__handle_got_json(self, repo, json);
json_object_put(json);
}
if (stop) {
if (pthread_mutex_unlock(&mtx))
fprintf(stderr, " ! __repo_thread_function: Failed to unlock mutex\n");
break;
}
head = 0;
/* size_t has the size of a pointer (thus of a void *). */
if (lkt_th_pop_input(self, (void **) &head)) {
fprintf(stderr, " * __repo_thread_function: Failed to get the head of the input list\n");
goto end_loop; /* Just skip all the loop to the yield function. */
}
/* Did we pop something? */
if (NULL == (void *) head)
goto end_loop;
snprintf(path, PATH_MAX - 1, "%s%lu.mkv", repo->kara_dir, head);
path[PATH_MAX - 1] = 0;
kara = calloc(1, sizeof(struct kara));
if (NULL == kara) {
fprintf(stderr, " ! __repo_thread_function: Out of memory\n");
goto end_loop;
}
if (repo_download_id_sync(repo, NULL, head, path, &kara->mdt)) {
fprintf(stderr, " ! __repo_thread_function: Failed to download kara with id %lu\n", head);
goto try_later;
}
/* Copy data to the structure that we will pass to the main thread. */
kara->action = kara_action_add;
kara->id = head;
memcpy(kara->filename, path, (strlen(path) + 1) * sizeof(char));
if (lkt_th_append_output(self, (void *) kara)) {
fprintf(stderr, " ! __repo_thread_function: Failed to append to output, orphan kara %lu\n", head);
free(kara);
goto end_loop;
}
fprintf(stderr, " . __repo_thread_function: Append kara %lu with path %s to out pool\n",
kara->id, kara->filename);
kara = NULL;
goto end_loop;
try_later:
if (kara)
free(kara);
/* Retry later. TODO: Implements a retry counter. */
if (lkt_th_append_input(self, (void *) head))
fprintf(stderr, " * __repo_thread_function: Failed to get the head of the input list\n");
end_loop:
sleep(1);
}
fprintf(stderr, " . __repo_thread_function: Stopping the repo thread\n");
return NULL;
}
int
repo_new_thread(struct lkt_repo *const repo)
{
RETURN_IF(init, "Already running", 1);
struct lkt_thread_arg *arg = calloc(1, sizeof(struct lkt_thread_arg));
RETURN_UNLESS(arg, "Out of memory", errno = ENOMEM);
arg->args = repo;
RETURN_IF(lkt_th_new(&repo_thread, LKT_DEFAULT_LIST_SIZE, __repo_thread_function, arg), "Thread error", 1);
init = 1;
return 0;
}
int
repo_download_id_async(const size_t id)
{
RETURN_IF(id == 0, "Invalid argument", 1);
RETURN_IF(lkt_th_append_input(&repo_thread, (void *) id), "Failed to push downloaded id", id);
fprintf(stderr, " * Asked to download kara with id %lu\n", id);
return 0;
}
int
repo_get_kara_async(struct kara **downloaded)
{
/* Is there a kara that has been downloaded? */
if (lkt_th_pop_output(&repo_thread, (void **) downloaded))
goto err;
if (!*downloaded)
goto err;
return 0;
err:
*downloaded = NULL;
return 1;
}
inline int
repo_get_allid_async(void)
{
RETURN_IF(pthread_mutex_lock(&mtx), "Failed to lock mutex", 3);
all_json = 1;
RETURN_IF(pthread_mutex_unlock(&mtx), "Failed to lock mutex", 3);
return 0;
}