diff --git a/inc/lektor/repo.h b/inc/lektor/repo.h index 565ba01819af36a7873493bdaa3d2dc69d11d87a..ffd5bc8eac25b6bb3144d21b1043804de776b646 100644 --- a/inc/lektor/repo.h +++ b/inc/lektor/repo.h @@ -21,31 +21,21 @@ struct lkt_repo { const char *get_id_file; const uint64_t version; - /* Thread related */ - struct poller_thread self; - volatile int init; - volatile int stop; - volatile int all_json; - pthread_mutex_t mtx; + /* The database */ + volatile sqlite3 *db; }; -int repo_new(struct lkt_repo *const repo, const char *name, const char *url); +int repo_new(struct lkt_repo *const repo, const char *name, const char *url, volatile sqlite3 *db); void repo_free(struct lkt_repo *const repo); -/* Only one possible repo thread is authorized. */ -int repo_new_thread(struct lkt_repo *const repo); -int repo_join_thread(struct lkt_repo *const repo); - /* Get metadata of a kara. */ -int repo_get_id(struct lkt_repo *const repo, const uint64_t id, struct kara_metadata *mdt); +int repo_get_id (struct lkt_repo *const repo, const uint64_t id, struct kara_metadata *mdt); int repo_get_alljson_sync(struct lkt_repo *const repo, struct json_object **json); -int repo_get_allid_async(struct lkt_repo *const repo); +int repo_get_allid_async (struct lkt_repo *const repo); /* Download a kara. */ -int repo_download_id_sync(struct lkt_repo *const repo, sqlite3 *db, const uint64_t id, const char *kara_path, - struct kara_metadata *mdt_ret); +int repo_download_id_sync (struct lkt_repo *const repo, const uint64_t id, const char *kara_path, struct kara_metadata *mdt); int repo_download_id_async(struct lkt_repo *const repo, const size_t id); -int repo_get_kara_async(struct lkt_repo *const repo, struct kara **downloaded); /* Scan and update the DB. If only update from the kurisu repo, rescan must be null, if you want to diff --git a/src/main/lktadm.c b/src/main/lktadm.c index 57ee9a57b77b003ca6337ec753882557f6b71bab..78a7e25058815822ec6bf29a21611ccb6d7751c2 100644 --- a/src/main/lktadm.c +++ b/src/main/lktadm.c @@ -194,7 +194,7 @@ get__(struct lkt_cmd_args *args) int i = atoi(args->argv[0]); - if (repo_new(&repo, "kurisu", "https://kurisu.iiens.net")) + if (repo_new(&repo, "kurisu", "https://kurisu.iiens.net", NULL)) fail("Cound not create repo"); if (repo_get_id(&repo, i, &data)) @@ -259,10 +259,10 @@ download__(struct lkt_cmd_args *args) struct kara_metadata data; int i = atoi(args->argv[0]); - if (repo_new(&repo, "kurisu", "https://kurisu.iiens.net")) + if (repo_new(&repo, "kurisu", "https://kurisu.iiens.net", NULL)) fail("Could not create the repo"); - if (repo_download_id_sync(&repo, NULL, i, args->argv[1], &data)) + if (repo_download_id_sync(&repo, i, args->argv[1], &data)) fail("Cound not download json for kara %d", i); printf("Kara %d at %s\n" diff --git a/src/net/listen.c b/src/net/listen.c index 8b52ef84d6489cb105cac72a46a47579ba234d7b..f8b6d603e0acf210802d8faf4d2a4f96639eac85 100644 --- a/src/net/listen.c +++ b/src/net/listen.c @@ -687,53 +687,6 @@ lkt_client_auth(struct lkt_state *srv, size_t c, bool set) return srv->clients[c - 1].authentificated |= set; } -static inline void -handle_repo_hevents(struct lkt_state *srv) -{ - struct kara *kara; - - for (;;) { - if (repo_get_kara_async(&srv->repo, &kara)) - goto get_out; - - switch (kara->action) { - /* Add the downloaded kara to the database. */ - case kara_action_add: - if (!database_update_add((sqlite3 *) srv->db, kara->filename, &kara->mdt, kara->id, true)) { - LOG_ERROR("Failed to add downloaded kara with id %lu and path %s", kara->id, kara->filename); - goto get_out; - } - - LOG_INFO("Added kara %lu with path %s to database", kara->id, kara->filename); - - break; - - /* Add the mdt of the kara to the database. Mark it unavailable. */ - case kara_action_unavail: - if (!database_update_add((sqlite3 *) srv->db, kara->filename, &kara->mdt, kara->id, false)) { - LOG_ERROR("Failed to add kara with id %lu with flag unavailable", kara->id); - goto get_out; - } - - LOG_INFO("Added kara %lu to database and set it to unavailable", kara->id); - - break; - - case kara_action_none: - default: - break; - } - - free(kara); - kara = NULL; - } - - /* Just get out of the loop. */ -get_out: - if (kara) - free(kara); -} - int lkt_listen(void) { @@ -789,8 +742,7 @@ lkt_listen(void) RETURN_UNLESS(load_module_by_name((sqlite3 *) srv.db, player_mod, &srv.win), "Can't load module", 3); RETURN_UNLESS(srv.win.new(&srv.win), "Can't create window", 3); srv.win.attach(&srv.win, &srv); - RETURN_IF(repo_new(&srv.repo, "kurisu", "https://kurisu.iiens.net"), "Failed to create repo", 4); - RETURN_IF(repo_new_thread(&srv.repo), "Failed to launch repo thread", 4); + RETURN_IF(repo_new(&srv.repo, "kurisu", "https://kurisu.iiens.net", srv.db), "Failed to create repo", 4); for (;;) { if (handle_network_events(&srv) < 0) @@ -798,11 +750,8 @@ lkt_listen(void) if (handle_idle_events(&srv) < 0) break; srv.win.handle_events(&srv.win, (sqlite3 *) srv.db, (enum mpd_idle_flag *) &srv.mpd_idle_events); - handle_repo_hevents(&srv); } - repo_join_thread(&srv.repo); srv.win.free(&srv.win); - return -1; } diff --git a/src/repo/curl.c b/src/repo/curl.c index d9acdd14f5d7d04447c9ab833d4fbf75224d37ef..727070127ee09396e13cf8bd1abd834f14507aa1 100644 --- a/src/repo/curl.c +++ b/src/repo/curl.c @@ -1,5 +1,6 @@ #define _POSIX_C_SOURCE 200809L +#include <common/common.h> #include <lektor/repo.h> #include <lektor/macro.h> #include <lektor/database.h> @@ -52,7 +53,7 @@ write_disk__(char *data, size_t size, size_t nmem, void *user) } int -repo_new(struct lkt_repo *const repo_, const char *name_, const char *url_) +repo_new(struct lkt_repo *const repo_, const char *name_, const char *url_, volatile sqlite3 *db) { if (!curl_init) { curl_global_init(CURL_GLOBAL_ALL); @@ -60,17 +61,6 @@ repo_new(struct lkt_repo *const repo_, const char *name_, const char *url_) } else ++curl_init; - const size_t init_size = 30; - pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; - uint64_t *calloc1 = calloc(init_size, sizeof(uint64_t)); - RETURN_UNLESS(calloc1, "Out of memory", errno = ENOMEM); - uint64_t *calloc2 = calloc(init_size, sizeof(uint64_t)); - if (!calloc2) { - free(calloc1); - LOG_ERROR_SCT("MEMORY", "%s", "Out of memory"); - return ENOMEM; - } - struct lkt_repo repo = { /* Just the repo */ .name = name_, @@ -81,11 +71,8 @@ repo_new(struct lkt_repo *const repo_, const char *name_, const char *url_) .kara_dir = "/home/kara/", // TODO .version = 1, - /* Thread related */ - .mtx = mtx, - .init = 0, - .stop = 0, - .all_json = 0, + /* The db */ + .db = db, }; memcpy(repo_, &repo, sizeof(struct lkt_repo)); @@ -95,11 +82,8 @@ repo_new(struct lkt_repo *const repo_, const char *name_, const char *url_) void repo_free(struct lkt_repo *const repo) { + UNUSED(repo); --curl_init; - - free((void *) repo->base_url); - free((void *) repo->kara_dir); - if (!curl_init) curl_global_cleanup(); } @@ -246,8 +230,7 @@ err: } int -repo_download_id_sync(struct lkt_repo *const repo, sqlite3 *db, const uint64_t id, const char *kara_path, - struct kara_metadata *mdt_ret) +repo_download_id_sync(struct lkt_repo *const repo, const uint64_t id, const char *kara_path, struct kara_metadata *mdt_ret) { RETURN_UNLESS(kara_path, "Invalid argument", 1); struct kara_metadata mdt; @@ -277,8 +260,6 @@ repo_download_id_sync(struct lkt_repo *const repo, sqlite3 *db, const uint64_t i goto err_no_curl; } - /* Download the kara... (TODO) */ - struct file file = { .path = kara_path, .fd = fd, @@ -299,24 +280,18 @@ repo_download_id_sync(struct lkt_repo *const repo, sqlite3 *db, const uint64_t i goto err; } - if (CURLE_OK == ( ret = curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_TYPE, &ct))) + if (CURLE_OK == (ret = curl_easy_getinfo(curl_handle, CURLINFO_CONTENT_TYPE, &ct))) LOG_INFO_SCT("CURL", "Content-Type is '%s'", ct); else { LOG_ERROR_SCT("CURL", "Failed to get Content-Type: %s", curl_easy_strerror(ret)); goto err; } - if (!db) { - LOG_INFO("%s", "Skip database update here"); - goto no_db_update; - } - - if (! database_update_add(db, kara_path, mdt_ret ? mdt_ret : &mdt, id, true)) { + if (repo->db && !database_update_add((sqlite3 *) repo->db, kara_path, mdt_ret ? mdt_ret : &mdt, id, true)) { LOG_ERROR("%s", "Failed to add kara to database"); goto err; } -no_db_update: ret = 0; err: curl_easy_cleanup(curl_handle); diff --git a/src/repo/downloader.c b/src/repo/downloader.c index aed1c04220d9db12a8baae6dc04577e72bd825ba..0ae589a7b0da445c6f5763aa6435486ba69d505e 100644 --- a/src/repo/downloader.c +++ b/src/repo/downloader.c @@ -11,41 +11,63 @@ #include <lektor/repo.h> #include <lektor/thread.h> -int -repo_join_thread(struct lkt_repo *const repo) -{ - int ret = 1; - RETURN_IF(pthread_mutex_lock(&repo->mtx), "Failed to lock mutex", 3); - GOTO_UNLESS(repo->init, "Repo thread not launched, can't join", error); - repo->stop = 1; - GOTO_IF(pthread_join(repo->self.th, NULL), "Failed to join repo thread", error); - fprintf(stderr, " . repo_join_thread: repo thread joined"); - ret = 0; -error: - RETURN_IF(pthread_mutex_unlock(&repo->mtx), "Failed to unlock mutex", 3); - return ret; -} - -/* Find it in the repo/curl.c file. */ +/* Find it in the repo/curl.c file. FIXME */ extern int safe_json_get_string(struct json_object *jobj, const char *key, char *content, const size_t len); extern int safe_json_get_int32(struct json_object *json, const char *key, int32_t *ret); +/* Download a kara */ + +static inline void * +__repo_download_id_async(void *arg) +{ + struct lkt_repo *const repo = *(struct lkt_repo **) arg; + struct kara *kara = (struct kara *) (((struct lkt_repo **) arg) + 1); + + if (repo_download_id_sync(repo, kara->id, kara->filename, &kara->mdt)) + LOG_ERROR_SCT("REPO", "Failed to download kara '%ld' with path '%s'", kara->id, kara->filename); + + free(arg); + return NULL; +} + +int +repo_download_id_async(struct lkt_repo *const repo, const size_t id) +{ + RETURN_IF(id == 0, "Invalid argument", 1); + + /* Create the argument, I know it's ugly */ + void *arg = malloc(sizeof(struct kara) + sizeof(struct lkt_repo *)); + RETURN_UNLESS(arg, "Out of memory", 1); + *(struct lkt_repo **) arg = repo; + struct kara *kara = (struct kara *) (((struct lkt_repo **) arg) + 1); + + /* Init arg */ + snprintf(kara->filename, PATH_MAX - 1, "%s%lu.mkv", repo->kara_dir, id); + kara->filename[PATH_MAX - 1] = 0; + kara->id = id; + + /* Thread */ + return mthread_create(th, NULL, __repo_download_id_async, arg); +} + +/* Get all the kara, make them unavailable */ + static inline void -__handle_got_json(struct poller_thread *self, struct lkt_repo *repo, struct json_object *json) +__handle_got_json(volatile sqlite3 *db, struct lkt_repo *repo, struct json_object *json) { size_t i, len = json_object_array_length(json); struct json_object *kara_json; int32_t integer; struct kara *kara; int err; - RETURN_UNLESS(len > 0 && json_object_get_array(json), "Json invalid or array empty", NOTHING); for (i = 0; i < len; ++i) { kara_json = json_object_array_get_idx(json, i); kara = calloc(1, sizeof(struct kara)); + RETURN_UNLESS(kara, "Out of memory", NOTHING); err = 0; /* Get the id of the kara. */ @@ -63,165 +85,39 @@ __handle_got_json(struct poller_thread *self, struct lkt_repo *repo, struct json kara->filename[PATH_MAX - 1] = 0; LOG_INFO("Crafted filename is '%s'", kara->filename); - RETURN_UNLESS(kara, "Out of memory", NOTHING); - - /* Get the fields from the json. */ - err |= safe_json_get_string(kara_json, "song_name", kara->mdt.song_name, LEKTOR_TAG_MAX); + err |= safe_json_get_string(kara_json, "song_name", kara->mdt.song_name, LEKTOR_TAG_MAX); err |= safe_json_get_string(kara_json, "source_name", kara->mdt.source_name, LEKTOR_TAG_MAX); - err |= safe_json_get_string(kara_json, "category", kara->mdt.category, LEKTOR_TAG_MAX); - err |= safe_json_get_string(kara_json, "language", kara->mdt.language, LEKTOR_TAG_MAX); + err |= safe_json_get_string(kara_json, "category", kara->mdt.category, LEKTOR_TAG_MAX); + err |= safe_json_get_string(kara_json, "language", kara->mdt.language, LEKTOR_TAG_MAX); err |= safe_json_get_string(kara_json, "author_name", kara->mdt.author_name, LEKTOR_TAG_MAX); - err |= safe_json_get_string(kara_json, "song_type", kara->mdt.song_type, LEKTOR_TAG_MAX); - - if (err) - goto err; + err |= safe_json_get_string(kara_json, "song_type", kara->mdt.song_type, LEKTOR_TAG_MAX); + GOTO_IF(err, "Invalid json", err); - /* Get the song number. */ if (safe_json_get_int32(kara_json, "song_number", &kara->mdt.song_number)) goto err; - /* Append. */ - if (poller_append_output(self, kara)) { - LOG_ERROR("%s", "Could not append downloaded kara mdt"); - goto err; - } + if (database_update_add(db, kara->filename, &kara->mdt, kara->id, false)) + LOG_ERROR_SCT("REPO", "Could not add unavailable kara %ld to db", kara->id); + goto err; + } - continue; err: - free(kara); - } + free(kara); } -static void * -__repo_thread_function(struct poller_thread_arg *arg) +static inline void * +__repo_get_all_id_async(void *arg) { - size_t head; - struct lkt_repo *repo = arg->args; - struct poller_thread *self = arg->self; - struct kara *kara; - struct json_object *json = NULL; - char path[PATH_MAX]; - free(arg); - - LOG_INFO("%s", "Starting the repo thread"); - - for (;;) { - GOTO_IF(pthread_mutex_lock(&repo->mtx), "Failed to lock mutex", end_loop); - - if (repo->all_json) { - repo_get_alljson_sync(repo, &json); - __handle_got_json(self, repo, json); - json_object_put(json); - } - - if (repo->stop) { - if (pthread_mutex_unlock(&repo->mtx)) - LOG_ERROR("Failed to unlock mutex: %s", strerror(errno)); - break; - } - - head = 0; - - /* size_t has the size of a pointer (thus of a void *). */ - if (poller_pop_input(self, (void **) &head)) { - LOG_ERROR("%s", "Failed to get the head of the input list"); - goto end_loop; /* Just skip all the loop to the yield function. */ - } - - /* Did we pop something? */ - if (NULL == (void *) head) - goto end_loop; - - snprintf(path, PATH_MAX - 1, "%s%lu.mkv", repo->kara_dir, head); - path[PATH_MAX - 1] = 0; - kara = calloc(1, sizeof(struct kara)); - - if (NULL == kara) { - LOG_ERROR_SCT("MEMORY", "%s", "Out of memory"); - goto end_loop; - } - - if (repo_download_id_sync(repo, NULL, head, path, &kara->mdt)) { - LOG_ERROR("Failed to download kara with id %lu", head); - goto try_later; - } - - /* Copy data to the structure that we will pass to the main thread. */ - kara->action = kara_action_add; - kara->id = head; - memcpy(kara->filename, path, (strlen(path) + 1) * sizeof(char)); - - if (poller_append_output(self, (void *) kara)) { - LOG_ERROR("Failed to append to output, orphan kara %lu", head); - free(kara); - goto end_loop; - } - - LOG_INFO("Append kara %lu with path %s to out pool", kara->id, kara->filename); - - kara = NULL; - goto end_loop; - -try_later: - if (kara) - free(kara); - - /* Retry later. TODO: Implements a retry counter. */ - if (poller_append_input(self, (void *) head)) - LOG_ERROR("%s", "Failed to get the head of the input list"); - -end_loop: - sched_yield(); - sleep(1); - } - - LOG_INFO("%s", "Stopping the repo thread"); + struct json_object *json; + struct lkt_repo *const repo = arg; + repo_get_alljson_sync(repo, &json); + __handle_got_json(repo->db, repo, json); + json_object_put(json); return NULL; } -int -repo_new_thread(struct lkt_repo *const repo) -{ - RETURN_UNLESS(repo, "Invalid argument", 1); - RETURN_IF(repo->init, "Already running", 1); - struct poller_thread_arg *arg = calloc(1, sizeof(struct poller_thread_arg)); - RETURN_UNLESS(arg, "Out of memory", errno = ENOMEM); - arg->args = repo; - RETURN_IF(poller_new(&repo->self, LKT_DEFAULT_LIST_SIZE, __repo_thread_function, arg), "Thread error", 1); - repo->init = 1; - return 0; -} - -int -repo_download_id_async(struct lkt_repo *const repo, const size_t id) -{ - RETURN_IF(id == 0, "Invalid argument", 1); - RETURN_IF(poller_append_input(&repo->self, (void *) id), "Failed to push downloaded id", id); - LOG_INFO("Asked to download kara with id %lu", id); - return 0; -} - -int -repo_get_kara_async(struct lkt_repo *const repo, struct kara **downloaded) -{ - /* Is there a kara that has been downloaded? */ - if (poller_pop_output(&repo->self, (void **) downloaded)) - goto err; - - if (!*downloaded) - goto err; - - return 0; -err: - *downloaded = NULL; - return 1; -} - inline int repo_get_allid_async(struct lkt_repo *const repo) { - RETURN_IF(pthread_mutex_lock(&repo->mtx), "Failed to lock mutex", 3); - repo->all_json = 1; - RETURN_IF(pthread_mutex_unlock(&repo->mtx), "Failed to lock mutex", 3); - return 0; + return mthread_create(th, NULL, __repo_get_all_id_async, repo); }