diff --git a/src/module/module_repo.c b/src/module/module_repo.c index 34711a23777c67d8989edd8720f6b3379489195f..b27dab289952181a1310f009efb0ca84b01215be 100644 --- a/src/module/module_repo.c +++ b/src/module/module_repo.c @@ -29,7 +29,7 @@ * Globals * ***********/ -static volatile unsigned int __curl_init = false; +static volatile unsigned int ___curl_init = false; /********************* * Private structure * @@ -51,6 +51,11 @@ struct kara { char filename[PATH_MAX]; }; +struct json_parse_arg { + void *worker; + void *real_arg; +}; + struct module_repo_internal { /* Just the repo */ char *name; @@ -79,18 +84,18 @@ struct module_repo_internal { struct worker_pool workers; }; -struct __uri { +struct ___uri { char *fav; struct module_repo_internal *repo; struct lkt_uri uri; }; -struct __memory { +struct ___memory { void *mem; size_t size; }; -struct __file { +struct ___file { const char *path; uint8_t magic[4]; size_t index; @@ -103,7 +108,7 @@ struct __file { /* Sleep a bit, to not overuse ressources */ static inline void -__sleep(void) +___sleep(void) { struct timespec time_sleep = { .tv_sec = 0, @@ -114,7 +119,7 @@ __sleep(void) /* Recursive mkdir, where the last word of the string is a file, not a folder. */ static inline void -__mkdir(const char *dir, unsigned int umask) +___mkdir(const char *dir, unsigned int umask) { /* TODO pour le Kubat du futur: include le umask dans la conf. */ @@ -141,20 +146,20 @@ __mkdir(const char *dir, unsigned int umask) } static inline void -__craft_filename_obfuscate(char str[PATH_MAX], struct kara *kara) +___craft_filename_obfuscate(char str[PATH_MAX], struct kara *kara) { /* Obfuscate filename */ safe_snprintf(str, PATH_MAX, "%s%ld.mkv", kara->database_filepath, kara->id); } static inline void -__craft_filename_non_obfuscate(char str[PATH_MAX], struct kara *kara) +___craft_filename_non_obfuscate(char str[PATH_MAX], struct kara *kara) { /* Not obfuscate filename, need to create directories, won't fail if not * possible. The program will fail later, when write will be attempted. */ size_t len = safe_snprintf(str, PATH_MAX, "%s%s/%s/%s/", kara->database_filepath, kara->mdt.category, kara->mdt.language, kara->mdt.author_name); - __mkdir(str, 0); + ___mkdir(str, 0); if (access(str, R_OK | W_OK)) LOG_ERROR("REPO", "No access in read / write for folder %s", str); safe_snprintf(str + len, PATH_MAX - len, "%s - %s%d - %s.mkv", kara->mdt.source_name, kara->mdt.song_type, @@ -162,7 +167,7 @@ __craft_filename_non_obfuscate(char str[PATH_MAX], struct kara *kara) } static inline void -__clean_file(struct __file *f) +___clean_file(struct ___file *f) { if (f->fd) { close(f->fd); @@ -171,7 +176,7 @@ __clean_file(struct __file *f) } static inline void -__clean_memory(struct __memory *m) +___clean_memory(struct ___memory *m) { if (m->mem) { free(m->mem); @@ -181,10 +186,10 @@ __clean_memory(struct __memory *m) } static size_t -__write_mem(char *data, size_t size, size_t nmem, void *user) +___write_mem(char *data, size_t size, size_t nmem, void *user) { - size_t realsize = size * nmem; - struct __memory *mem = (struct __memory *)user; + size_t realsize = size * nmem; + struct ___memory *mem = (struct ___memory *)user; void *ptr = realloc(mem->mem, mem->size + realsize); RETURN_UNLESS(ptr, "Out of memory", 0); @@ -196,10 +201,10 @@ __write_mem(char *data, size_t size, size_t nmem, void *user) } static size_t -__write_disk(char *data, size_t size, size_t nmem, void *user) +___write_disk(char *data, size_t size, size_t nmem, void *user) { - ssize_t realsize = size * nmem; - struct __file *file = (struct __file *)user; + ssize_t realsize = size * nmem; + struct ___file *file = (struct ___file *)user; RETURN_IF(write(file->fd, data, realsize) != realsize, "Failed to write", 0); if (file->index < 4) { memcpy(file->magic + file->index, data, 4 - file->index); @@ -213,14 +218,14 @@ __write_disk(char *data, size_t size, size_t nmem, void *user) } static int -__json_dl(const char *url, char **json) +___json_dl(const char *url, char **json) { RETURN_UNLESS(json, "Invalid argument", 1); CURL *curl_handle; CURLcode res; struct curl_slist *headers = NULL; int ret = 1; - struct __memory file = { .mem = NULL, .size = 0. }; + struct ___memory file = { .mem = NULL, .size = 0. }; /* Only accept json file */ headers = curl_slist_append(headers, "Accept: application/json"); @@ -229,7 +234,7 @@ __json_dl(const char *url, char **json) curl_handle = curl_easy_init(); curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, headers); curl_easy_setopt(curl_handle, CURLOPT_URL, url); - curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, __write_mem); + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, ___write_mem); curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&file); curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); @@ -253,14 +258,14 @@ __json_dl(const char *url, char **json) ret = 0; err: if (ret != 0) - __clean_memory(&file); + ___clean_memory(&file); curl_easy_cleanup(curl_handle); curl_slist_free_all(headers); return ret; } static inline int -__download_kara(const char *url, const char *path, int override) +___download_kara(const char *url, const char *path, int override) { struct curl_slist *headers = NULL; CURL *curl_handle; @@ -294,7 +299,7 @@ retest: /* TODO: Buffered writes * The buffer needs to be 1Mio to do something (each time, packets are 10Kio...) */ - struct __file file = { + struct ___file file = { .path = path, .index = 0, .fd = fd, @@ -308,7 +313,7 @@ retest: curl_handle = curl_easy_init(); curl_easy_setopt(curl_handle, CURLOPT_HTTPHEADER, headers); curl_easy_setopt(curl_handle, CURLOPT_URL, url); - curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, __write_disk); + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, ___write_disk); curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *)&file); curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); @@ -323,14 +328,14 @@ retest: ret = 0; err: - __clean_file(&file); + ___clean_file(&file); curl_easy_cleanup(curl_handle); curl_slist_free_all(headers); return ret; } static void -__handle_got_json_dl(struct kara *kara, int current_id) +___handle_got_json_dl(struct kara *kara, int current_id) { /* Download the kara */ database_queue_current_kara(kara->db, NULL, ¤t_id); @@ -347,7 +352,7 @@ __handle_got_json_dl(struct kara *kara, int current_id) safe_snprintf(kara->url, LKT_LINE_MAX, kara->repo->get_id_file, kara->id); LOG_INFO("REPO", "Start downloading kara %ld to: %s", kara->id, kara->filename); - if (__download_kara(kara->url, kara->filename, true)) { + if (___download_kara(kara->url, kara->filename, true)) { LOG_WARN("REPO", "Could not download kara %ld at path '%s'", kara->id, kara->filename); return; } @@ -367,39 +372,47 @@ __handle_got_json_dl(struct kara *kara, int current_id) LOG_INFO("REPO", "Added kara %ld from repo %s, filepath is %s", kara->id, kara->repo->name, kara->filename); } -static void -__handle_got_json_internal_callback(const char *key, const char *val, int comp, void *user) +static int +___handle_got_json_internal_callback(const char *key, const char *val, int comp, void *user) { - struct kara *kara = (struct kara *)user; + struct json_parse_arg *arg = (struct json_parse_arg *)user; + struct kara *kara = (struct kara *)arg->real_arg; + + /* Check if interrupt was asked */ + enum WORKER_STATUS sta = worker_pool_get_status(arg->worker); + if (sta & WORKER_STATUS_INTERRUPT) { + LOG_WARN("REPO", "Interrupt was asked, stop processing json"); + return 1; + } /* Get the fields */ if (!comp && key && val) { -#define __get_field_string(field) \ +#define ___get_field_string(field) \ { \ if (STR_MATCH(#field, key)) { \ safe_strncpy(kara->mdt.field, val, LEKTOR_TAG_MAX); \ } \ } -#define __get_field_long_ex(field, json) \ +#define ___get_field_long_ex(field, json) \ { \ if (STR_MATCH(#json, key)) { \ kara->field = strtol(val, NULL, 0); \ } \ } -#define __get_field_long_mdt(field) __get_field_long_ex(mdt.field, field) -#define __get_field_long_kara(field) __get_field_long_ex(field, field) +#define ___get_field_long_mdt(field) ___get_field_long_ex(mdt.field, field) +#define ___get_field_long_kara(field) ___get_field_long_ex(field, field) - __get_field_long_kara(id); - __get_field_long_kara(unix_timestamp); + ___get_field_long_kara(id); + ___get_field_long_kara(unix_timestamp); - __get_field_long_mdt(song_number); + ___get_field_long_mdt(song_number); - __get_field_string(song_name); - __get_field_string(source_name); - __get_field_string(category); - __get_field_string(language); - __get_field_string(author_name); - __get_field_string(song_type); + ___get_field_string(song_name); + ___get_field_string(source_name); + ___get_field_string(category); + ___get_field_string(language); + ___get_field_string(author_name); + ___get_field_string(song_type); #undef __get_field_long_mdt #undef __get_field_long_kara @@ -409,7 +422,7 @@ __handle_got_json_internal_callback(const char *key, const char *val, int comp, /* The `void *user` is complete */ else if (comp) { - __sleep(); + ___sleep(); long filestamp = 0; int current_id = 0; @@ -434,20 +447,22 @@ __handle_got_json_internal_callback(const char *key, const char *val, int comp, database_update_set_available(kara->db, kara->id); lkt_queue_send(kara->repo->queue, lkt_event_db_update_tick, NULL); LOG_DEBUG("REPO", "Ignore kara %ld", kara->id); - return; + return 0; } do_it: - __handle_got_json_dl(kara, current_id); + ___handle_got_json_dl(kara, current_id); lkt_queue_send(kara->repo->queue, lkt_event_db_update_tick, NULL); } else LOG_ERROR("REPO", "Invalid call to this function, 'comp', 'key' and 'val' are null..."); + + return 0; } -static inline void -__handle_got_json(volatile sqlite3 *db, struct module_repo_internal *repo, const char *json) +static inline int +___handle_got_json(volatile sqlite3 *db, struct module_repo_internal *repo, const char *json) { size_t len = json_parse_get_count(json, 2); struct kara kara = { @@ -457,7 +472,7 @@ __handle_got_json(volatile sqlite3 *db, struct module_repo_internal *repo, const .update_count = 0, }; - RETURN_UNLESS(len > 0, "Json invalid or array empty", NOTHING); + RETURN_UNLESS(len > 0, "Json invalid or array empty", 1); /* Craft a folder path here, it will be used later */ kara.kara_dir_len = strlen(repo->kara_dir); @@ -474,13 +489,17 @@ __handle_got_json(volatile sqlite3 *db, struct module_repo_internal *repo, const /* Handle the json */ LOG_INFO("REPO", "Starting to process json for repo %s, total of %ld karas", repo->name, len); lkt_queue_send(repo->queue, lkt_event_db_update_total, (void *)(size_t)len); - json_parse(json, 2, __handle_got_json_internal_callback, (void *)&kara); - LOG_INFO("REPO", "Updated %ld karas and ignored %ld karas, total is %ld", kara.update_count, kara.ignored_count, - len); + struct json_parse_arg arg = { + .worker = &repo->workers, + .real_arg = (void *)&kara, + }; + int ret = json_parse(json, 2, ___handle_got_json_internal_callback, &arg); + LOG_INFO("REPO", "Updated %ld, ignored %ld, total is %ld", kara.update_count, kara.ignored_count, len); + return ret; } static inline void -__handle_deleted_kara(volatile sqlite3 *db) +___handle_deleted_kara(volatile sqlite3 *db) { size_t len, i; int *kara_ids; @@ -495,9 +514,9 @@ __handle_deleted_kara(volatile sqlite3 *db) } static void * -__worker_update(void *__repo) +___worker_update(void UNUSED *worker, void *___repo) { - struct module_repo_internal *repo = __repo; + struct module_repo_internal *repo = ___repo; lkt_queue_send(repo->queue, lkt_event_db_updating, LKT_DB_UPDATING_PROGRESS); GOTO_IF(pthread_mutex_lock(&(repo->mtx)), "Failed to lock", end_no_lock); @@ -506,14 +525,23 @@ __worker_update(void *__repo) char *json; LOG_INFO("REPO", "Download kara list from %s (%s)", repo->name, repo->get_all_json); - if (__json_dl(repo->get_all_json, &json)) { + if (___json_dl(repo->get_all_json, &json)) { LOG_ERROR("REPO", "Failed to get json, possibly no internet connexion or repo is down"); pthread_exit(NULL); } - __handle_got_json(repo->db, repo, json); + + int ret = ___handle_got_json(repo->db, repo, json); + LOG_INFO("REPO", "Finished to download and insert kara list"); free(json); - __handle_deleted_kara(repo->db); + + if (ret != 2) { + LOG_INFO("REPO", "No interrupt requested, don't skip delete kara operation"); + ___handle_deleted_kara(repo->db); + } else { + LOG_WARN("REPO", "Handle json was interrupt, skip delete kara operation"); + } + LOG_INFO("REPO", "Finished to deal with deleted kara"); database_updated(repo->db); @@ -527,9 +555,9 @@ end_no_lock: } static void * -__worker_rescan(void *__repo) +___worker_rescan(void UNUSED *worker, void *___repo) { - struct module_repo_internal *repo = __repo; + struct module_repo_internal *repo = ___repo; char kara_prefix[LKT_LINE_MAX]; lkt_queue_send(repo->queue, lkt_event_db_updating, LKT_DB_UPDATING_PROGRESS); @@ -542,6 +570,7 @@ __worker_rescan(void *__repo) repo->updating &= REPO_UPDATE_KARA; GOTO_IF(pthread_mutex_unlock(&(repo->mtx)), "Failed to unlock", end_no_lock); +#warning "Pass the check stop condition to the database_update from the REPO module" database_update(repo->db, kara_prefix, 0); /* Don't check timestamp. TODO: Sometimes we want to check them */ @@ -554,25 +583,36 @@ end_no_lock: pthread_exit(NULL); } -static void -__handle_fav_list_internal(const char UNUSED *key, const char *val, int UNUSED comp, void *user) +static int +___handle_fav_list_internal(const char UNUSED *key, const char *val, int UNUSED comp, void *user) { if (val == NULL) - return; + return 0; + + struct json_parse_arg *arg = (struct json_parse_arg *) user; + struct ___uri *uri = (struct ___uri *)arg->real_arg; + uri->uri.id = strtol(val, NULL, 0); + + /* Check if interrupt asked */ + enum WORKER_STATUS sta = worker_pool_get_status(arg->worker); + if (sta & WORKER_STATUS_INTERRUPT) { + LOG_WARN("REPO", "Stop processing json because interrupt was asked"); + return 1; + } - struct __uri *uri = (struct __uri *)user; - uri->uri.id = strtol(val, NULL, 0); lkt_queue_send(uri->repo->queue, lkt_event_db_update_tick, NULL); if (!database_plt_add_uri(uri->repo->db, uri->fav, &uri->uri)) { LOG_ERROR("REPO", "Failed to add kara %ld to playlist %s", uri->uri.id, uri->fav); - return; + return 1; } + /* Dirty fix for db lock and hard drive usage */ - __sleep(); + ___sleep(); + return 0; } static inline void -__handle_fav_list(struct module_repo_internal *repo, char *fav, size_t fav_size) +___handle_fav_list(struct module_repo_internal *repo, char *fav, size_t fav_size) { char *json; char fav_url[LKT_LINE_MAX]; @@ -583,7 +623,7 @@ __handle_fav_list(struct module_repo_internal *repo, char *fav, size_t fav_size) size_t fav_url_len = strlen(fav_url); safe_strncpy(fav_url + fav_url_len, fav, LKT_LINE_MAX - 1 - fav_url_len); - RETURN_IF(__json_dl(fav_url, &json), "Failed to download fav list", NOTHING); + RETURN_IF(___json_dl(fav_url, &json), "Failed to download fav list", NOTHING); /* Prepend by `@`, to diferentiate with playlists */ size_t fav_len = strlen(fav); @@ -599,7 +639,7 @@ __handle_fav_list(struct module_repo_internal *repo, char *fav, size_t fav_size) } database_plt_touch(repo->db, fav); - struct __uri uri = { + struct ___uri uri = { .fav = fav, .repo = repo, .uri = { @@ -610,30 +650,46 @@ __handle_fav_list(struct module_repo_internal *repo, char *fav, size_t fav_size) size_t len = json_parse_get_count(json, 2); lkt_queue_send(repo->queue, lkt_event_db_update_total, (void *)(size_t)len); - json_parse(json, 2, __handle_fav_list_internal, (void *)&uri); + + struct json_parse_arg arg = { + .worker = &repo->workers, + .real_arg = (void *)&uri, + }; + json_parse(json, 2, ___handle_fav_list_internal, (void *)&arg); + LOG_INFO("REPO", "Finished importing fav list '%s' as '%s'", fav_origin, fav); free(json); } -static void -__worker_import_favorites_internal(const char UNUSED *k, const char *val, int UNUSED c, void *user) +static int +___worker_import_favorites_internal(const char UNUSED *k, const char *val, int UNUSED c, void *user) { if (val == NULL) - return; + return 0; + + struct json_parse_arg *arg = (struct json_parse_arg *)user; + struct module_repo_internal *repo = (struct module_repo_internal *)arg->real_arg; + + /* Check if interrupt was asked */ + enum WORKER_STATUS sta = worker_pool_get_status(arg->worker); + if (sta & WORKER_STATUS_INTERRUPT) { + LOG_WARN("REPO", "Stop processing json because interrupt was asked"); + return 1; + } - struct module_repo_internal *repo = (struct module_repo_internal *)user; char fav[LKT_LINE_MAX]; safe_strncpy(fav, val, LKT_LINE_MAX); /* TODO: Add a way to use the workers to do this for each fav list */ LOG_INFO("REPO", "Processing favorite list: %s", fav); - __handle_fav_list(repo, fav, LKT_LINE_MAX); + ___handle_fav_list(repo, fav, LKT_LINE_MAX); lkt_queue_send(repo->queue, lkt_event_db_update_tick, NULL); + return 0; } static void * -__worker_import_favorites(void *__repo) +___worker_import_favorites(void *worker, void *___repo) { - struct module_repo_internal *repo = __repo; + struct module_repo_internal *repo = ___repo; GOTO_IF(pthread_mutex_lock(&(repo->mtx)), "Failed to lock", end_no_lock); repo->updating &= REPO_UPDATE_FAV; @@ -641,15 +697,22 @@ __worker_import_favorites(void *__repo) char *json; LOG_INFO("REPO", "Download fav lists: %s", repo->get_fav_json); - if (__json_dl(repo->get_fav_json, &json)) { + if (___json_dl(repo->get_fav_json, &json)) { LOG_ERROR("REPO", "Failed to get json, possibly no internet connexion or repo is down"); pthread_exit(NULL); } + size_t len = json_parse_get_count(json, 2); LOG_INFO("REPO", "Finished to dl favorite lists, got %ld lists", len); lkt_queue_send(repo->queue, lkt_event_db_updating, LKT_DB_UPDATING_PROGRESS); lkt_queue_send(repo->queue, lkt_event_db_update_total, (void *)(size_t)len); - json_parse(json, 2, __worker_import_favorites_internal, (void *)repo); + + struct json_parse_arg arg = { + .worker = worker, + .real_arg = (void *)repo, + }; + json_parse(json, 2, ___worker_import_favorites_internal, (void *)&arg); + free(json); LOG_INFO("REPO", "Finished to deal with %ld favorite lists", len); @@ -678,11 +741,11 @@ static inline bool module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatile sqlite3 *db) { RETURN_UNLESS(repo_ && queue && db, "Invalid argument", 1); - if (!__curl_init) { + if (!___curl_init) { curl_global_init(CURL_GLOBAL_ALL); - __curl_init = 1; + ___curl_init = 1; } else - ++__curl_init; + ++___curl_init; int obfuscate; if (!database_config_get(db, "repo", "obfuscate", &obfuscate)) { @@ -708,7 +771,7 @@ module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatil .get_fav_json = safe_malloc(LKT_LINE_MAX * sizeof(char)), .get_all_json = safe_malloc(LKT_LINE_MAX * sizeof(char)), .base_url = safe_malloc(LKT_LINE_MAX * sizeof(char)), - .craft_filename = obfuscate ? __craft_filename_obfuscate : __craft_filename_non_obfuscate, + .craft_filename = obfuscate ? ___craft_filename_obfuscate : ___craft_filename_non_obfuscate, }; /* Copies */ @@ -766,15 +829,10 @@ mod_new(va_list *va) } static int -mod_close(va_list *va) +mod_close(va_list UNUSED *va) { - va_list copy; - struct module_repo_internal **repo; - va_copy(copy, *va); - repo = (struct module_repo_internal **)va_arg(copy, void **); - module_repo_close(*repo); - va_end(copy); - return 0; + LOG_ERROR("ERROR", "Operation 'close' not supported on module"); + return 1; } static int @@ -785,10 +843,11 @@ mod_free(va_list *va) va_copy(copy, *va); repo = (struct module_repo_internal **)va_arg(copy, void **); + worker_pool_interrupt(&(*repo)->workers); module_repo_close(*repo); worker_pool_free(&(*repo)->workers); - --__curl_init; - if (!__curl_init) + --___curl_init; + if (!___curl_init) curl_global_cleanup(); free((*repo)->kara_dir); free((*repo)->get_id_json); @@ -817,7 +876,7 @@ mod_update(va_list *va) goto end; } (*repo)->updating &= REPO_UPDATE_KARA; - if (worker_pool_push(&(*repo)->workers, __worker_update, (void *)*repo)) { + if (worker_pool_push(&(*repo)->workers, ___worker_update, (void *)*repo)) { LOG_ERROR("REPO", "Out of memory"); ret = 1; goto end; @@ -846,7 +905,7 @@ mod_rescan(va_list *va) goto end; } (*repo)->updating &= REPO_UPDATE_KARA; - if (worker_pool_push(&(*repo)->workers, __worker_rescan, (void *)*repo)) { + if (worker_pool_push(&(*repo)->workers, ___worker_rescan, (void *)*repo)) { LOG_ERROR("REPO", "Out of memory"); ret = 1; goto end; @@ -876,7 +935,7 @@ mod_import(va_list *va) } (*repo)->updating &= REPO_UPDATE_FAV; - if (worker_pool_push(&(*repo)->workers, __worker_import_favorites, (void *)*repo)) { + if (worker_pool_push(&(*repo)->workers, ___worker_import_favorites, (void *)*repo)) { LOG_ERROR("REPO", "Out of memory"); ret = 1; goto end;