Sélectionner une révision Git
module_repo.c 17,23 Kio
#define _POSIX_C_SOURCE 200809L
#define __LKT_MODULE_MAIN_SOURCE__
#include <lektor/lktmodule.h>
#include "worker.h"
#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <strings.h>
#include <limits.h>
#include <stdlib.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <time.h>
#include <stdarg.h>
/***********
* Globals *
***********/
static volatile unsigned int __curl_init = false;
/*********************
* Private structure *
*********************/
struct module_repo_internal {
/* Just the repo */
char *name;
char *base_url;
char *kara_dir;
char *get_all_json;
char *get_id_json;
char *get_id_file;
const uint64_t version;
/* Worker threads */
struct worker_pool workers;
volatile int updating;
/* The database and the queue */
struct queue *queue;
volatile sqlite3 *db;
};
struct __memory {
void *mem;
size_t size;
};
struct __file {
const char *path;
int fd;
};
/*********************
* Private functions *
*********************/
static inline void
__clean_file(struct __file *f)
{
if (f->fd) {
close(f->fd);
f->fd = -1;
}
}
static inline void
__clean_memory(struct __memory *m)
{
if (m->mem) {
free(m->mem);
m->mem = NULL;
}
}
static size_t
__write_mem(char *data, size_t size, size_t nmem, void *user)
{
size_t realsize = size * nmem;
struct __memory *mem = (struct __memory *) user;
void *ptr = realloc(mem->mem, mem->size + realsize);
RETURN_UNLESS(ptr, "Out of memory", 0);
mem->mem = ptr;
memcpy(((uint8_t *) mem->mem) + mem->size, data, realsize);
mem->size += realsize;
return realsize;
}
static size_t
__write_disk(char *data, size_t size, size_t nmem, void *user)
{
ssize_t realsize = size * nmem;
struct __file *file = (struct __file *) user;
RETURN_IF(write(file->fd, data, realsize) != realsize, "Failed to write", 0);
return realsize;
}
static int
__safe_json_get_string(struct json_object *jobj, const char *key,
char *content, const size_t len)
{
const char *got;
struct json_object *field;
RETURN_UNLESS(json_object_object_get_ex(jobj, key, &field),
"Key not found in json", 1);
got = json_object_get_string(field);
RETURN_UNLESS(got, "Got a NULL for the key, may be an error", 1);
strncpy(content, got, len - 1);
content[len - 1] = 0;
return 0;
}
static int
__safe_json_get_long(struct json_object *json, const char *key, long *ret)
{
const int len = long_length(LONG_MAX);
char content[len], *endptr, err;
if (__safe_json_get_string(json, key, content, len))
return 1;
STRTOL(*ret, content, endptr, err);
return err;
}
static int
__json_sync(struct module_repo_internal *repo, struct json_object **json)
{
RETURN_UNLESS(json, "Invalid argument", 1);
CURL *curl_handle;
CURLcode res;
int ret = 1;
struct __memory file = {
.mem = NULL,
.size = 0.
};
curl_handle = curl_easy_init();
curl_easy_setopt(curl_handle, CURLOPT_URL, repo->get_all_json);
curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, __write_mem);
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *) &file);
curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0");
res = curl_easy_perform(curl_handle);
if (res != CURLE_OK) {
LOG_ERROR("CURL", "curl_easy_perform failed: %s", curl_easy_strerror(res));
goto err;
}
*json = json_tokener_parse(file.mem);
ret = 0;
err:
__clean_memory(&file);
curl_easy_cleanup(curl_handle);
return ret;
}
static inline int
__download_kara(const char *url, const char *path, int override)
{
CURL *curl_handle;
char ret = 1;
errno = 0;
int fd = open(path, O_WRONLY | O_APPEND | O_CREAT | O_EXCL | O_NOFOLLOW, S_IRUSR | S_IWUSR);
retest:
if (fd < 0) {
if (errno == EEXIST && ! override) {
LOG_ERROR("REPO", "File '%s' already exists", path);
return 1;
}
else if (errno == EEXIST && override) {
if (unlink(path)) {
LOG_ERROR("REPO", "Failed to unlink file '%s'", path);
return 1;
}
override = false;
fd = open(path, O_WRONLY | O_CREAT | O_NOFOLLOW, S_IRUSR | S_IWUSR);
goto retest;
}
else {
LOG_ERROR("REPO", "Could not open file '%s'", path);
return 1;
}
}
struct __file file = {
.path = path,
.fd = fd,
};
curl_handle = curl_easy_init();
curl_easy_setopt(curl_handle, CURLOPT_URL, url);
curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, __write_disk);
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void *) &file);
curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0");
if (CURLE_OK != (ret = curl_easy_perform(curl_handle))) {
LOG_ERROR("CURL", "curl_easy_perform failed: %s", curl_easy_strerror(ret));
goto err;
}
ret = 0;
err:
__clean_file(&file);
curl_easy_cleanup(curl_handle);
return ret;
}
static inline void
__handle_got_json(volatile sqlite3 *db, struct module_repo_internal *repo,
struct json_object *json)
{
size_t i, ignored_count = 0, update_count = 0,
len = json_object_array_length(json);
struct json_object *kara_json;
struct kara kara;
long filestamp = 0, timestamp = 0, download_id;
char *mkvpropedit = safe_zero_malloc(sizeof(char) * PATH_MAX);
char *url = safe_zero_malloc(sizeof(char) * LKT_LINE_MAX);
int current_id, err;
struct timespec time_sleep = {
.tv_sec = 0,
.tv_nsec = 100000000L,
}; /* Sleep for 0.1s */
RETURN_UNLESS(len > 0 && json_object_get_array(json),
"Json invalid or array empty", NOTHING);
RETURN_UNLESS(database_config_get_text(db, "externals", "mkvpropedit",
mkvpropedit, PATH_MAX - 1),
"Can't get the mkvpropedit executable path", NOTHING);
LOG_INFO("REPO", "Starting to process json for repo %s", repo->name);
for (i = 0; i < len; ++i) {
nanosleep(&time_sleep, NULL); /* Sleep a bit, better for Hard drive */
kara_json = json_object_array_get_idx(json, i);
err = 0;
/* Get the id of the kara. */
if (__safe_json_get_long(kara_json, "id", &download_id))
continue;
kara.id = download_id;
/* Craft a fake filepath here, it will be used later. */
size_t kara_dir_len = strlen(repo->kara_dir);
memcpy(kara.filename, repo->kara_dir, sizeof(char) * (kara_dir_len + 1));
if (kara.filename[kara_dir_len - 1] != '/') {
strncat(kara.filename, "/", PATH_MAX - 1);
kara.filename[++kara_dir_len] = 0;
}
safe_snprintf(kara.filename + kara_dir_len, PATH_MAX - kara_dir_len, "%ld.mkv", download_id);
/* Timestamp and presence verification */
if (!database_get_kara_path(db, kara.id, NULL))
goto do_it;
if (__safe_json_get_long(kara_json, "unix_timestamp", ×tamp))
continue;
filestamp = get_mtime(kara.filename);
if (!(filestamp > timestamp))
goto do_it;
else {
++ignored_count;
database_update_touch(db, kara.id);
database_update_set_available(db, kara.id);
LOG_DEBUG("REPO", "Ignore kara '%ld' with path '%s'",
kara.id, kara.filename);
continue;
}
do_it:
/* Reads the json */
#define __get_string(field, json_field) \
err |= __safe_json_get_string(kara_json, #field, kara.mdt.json_field, LEKTOR_TAG_MAX)
__get_string(song_name, song_name);
__get_string(source_name, source_name);
__get_string(category, category);
__get_string(language, language);
__get_string(author_name, author_name);
__get_string(song_type, song_type);
#undef __get_string
if (err || __safe_json_get_long(kara_json, "song_number", &download_id)) {
LOG_WARN("REPO", "Json is invalid for kara '%ld', skip it", kara.id);
continue;
}
kara.mdt.song_number = download_id;
current_id = 0;
database_queue_current_kara(db, NULL, ¤t_id);
if (current_id == (int) kara.id) {
LOG_WARN("REPO", "Update currently playing kara %d, skip it",
current_id);
lkt_queue_send(repo->queue, lkt_event_skip_current, NULL);
}
if (!database_update_add(db, kara.filename, &kara.mdt, kara.id, false)) {
LOG_ERROR("REPO", "Could not add unavailable kara %ld to db",
kara.id);
continue;
}
safe_snprintf(url, LKT_LINE_MAX, repo->get_id_file, kara.id);
if (__download_kara(url, kara.filename, true)) {
LOG_WARN("REPO", "Could not download kara %ld at path '%s'",
kara.id, kara.filename);
continue;
}
if (kara_metadata_write(&kara.mdt, kara.filename, mkvpropedit)) {
LOG_WARN("REPO", "Could not write metadata to kara '%ld' with "
"path '%s'", kara.id, kara.filename);
continue;
}
if (!database_update_set_available(db, kara.id)) {
LOG_WARN("REPO", "Could not set kara %ld available", kara.id);
continue;
}
database_stamp(db);
++update_count;
LOG_INFO("REPO", "Added kara %ld from repo %s, filepath is %s",
kara.id, repo->name, kara.filename);
}
LOG_INFO("REPO", "Updated %ld karas and ignored %ld karas, total is %ld",
update_count, ignored_count, len);
free(mkvpropedit);
free(url);
}
static inline void
__handle_deleted_kara(volatile sqlite3 *db)
{
size_t len, i;
int *kara_ids;
char filepath[PATH_MAX];
database_deleted_kara(db, &kara_ids, &len);
for (i = 0; i < len; ++i) {
if (!database_get_kara_path(db, kara_ids[i], filepath))
continue;
database_update_del(db, kara_ids[i]);
}
free(kara_ids);
}
static void *
__worker_update(void *__repo)
{
struct module_repo_internal *repo = __repo;
lkt_queue_send(repo->queue, lkt_event_db_updating, LKT_DB_UPDATING_PROGRESS);
repo->updating = 1;
struct json_object *json;
LOG_INFO("REPO", "Download kara list from %s (%s), directory is %s",
repo->name, repo->get_all_json, repo->kara_dir);
if (__json_sync(repo, &json)) {
LOG_ERROR("REPO", "Failed to get json, possibly no internet connexion or repo is down");
pthread_exit(NULL);
}
__handle_got_json(repo->db, repo, json);
LOG_INFO("REPO", "Finished to download and insert kara list");
json_object_put(json);
__handle_deleted_kara(repo->db);
LOG_INFO("REPO", "Finished to deal with deleted kara");
database_updated(repo->db);
lkt_queue_send(repo->queue, lkt_event_db_updating, LKT_DB_UPDATING_FINISHED);
pthread_exit(NULL);
}
static void *
__worker_rescan(void *__repo)
{
struct module_repo_internal *repo = __repo;
char kara_prefix[LKT_LINE_MAX];
lkt_queue_send(repo->queue, lkt_event_db_updating, LKT_DB_UPDATING_PROGRESS);
if (!database_config_get_text(repo->db, "database", "kara_dir", kara_prefix, LKT_LINE_MAX)) {
LOG_ERROR("REPO", "Failed to get kara prefix from config");
pthread_exit(NULL);
}
repo->updating = 1;
database_update(repo->db, kara_prefix, 0); /* Don't check timestamp.
TODO: Sometimes we want to check them */
repo->updating = 0;
lkt_queue_send(repo->queue, lkt_event_db_updating, LKT_DB_UPDATING_FINISHED);
pthread_exit(NULL);
}
/***********************************************
* Functions that will be wrapped and exported *
***********************************************/
static inline void
module_repo_close(struct module_repo_internal *repo)
{
LOG_INFO("REPO", "Waiting for workers to finish");
worker_pool_waitall(&repo->workers);
LOG_INFO("REPO", "All workers have finished");
}
static inline bool
module_repo_new(struct module_repo_internal *repo_, struct queue *queue, volatile sqlite3 *db)
{
RETURN_UNLESS(repo_ && queue && db, "Invalid argument", 1);
if (!__curl_init) {
curl_global_init(CURL_GLOBAL_ALL);
__curl_init = 1;
} else
++__curl_init;
struct module_repo_internal repo = {
.version = 1,
.queue = queue,
.db = db,
.updating = 0,
.name = safe_zero_malloc(LKT_LINE_MAX * sizeof(char)),
.kara_dir = safe_zero_malloc(PATH_MAX * sizeof(char)),
.get_id_json = safe_zero_malloc(LKT_LINE_MAX * sizeof(char)),
.get_id_file = safe_zero_malloc(LKT_LINE_MAX * sizeof(char)),
.get_all_json = safe_zero_malloc(LKT_LINE_MAX * sizeof(char)),
.base_url = safe_zero_malloc(LKT_LINE_MAX * sizeof(char)),
};
/* Copies */
if (!database_config_get_text(db, "database", "kara_dir", repo.kara_dir, PATH_MAX) ||
!database_config_get_text(db, "repo", "name", repo.name, LKT_LINE_MAX) ||
!database_config_get_text(db, "repo", "url", repo.base_url, LKT_LINE_MAX) ||
!database_config_get_text(db, "repo", "id_json", repo.get_id_json, LKT_LINE_MAX) ||
!database_config_get_text(db, "repo", "id_kara", repo.get_id_file, LKT_LINE_MAX) ||
!database_config_get_text(db, "repo", "json", repo.get_all_json, LKT_LINE_MAX) )
return false;
memcpy(repo_, &repo, sizeof(struct module_repo_internal));
/* Init the worker only now ! */
int workers_count;
if (!database_config_get_int(db, "repo", "workers_count", &workers_count))
workers_count = 5;
if (worker_pool_new(&repo_->workers,
10 /* Initial number of elements in the call queue */,
workers_count /* Number of worker threads */)) {
LOG_ERROR("REPO", "Out of memory");
return false;
}
return true;
}
/********************
* Export functions *
********************/
static int
mod_new(va_list *va)
{
va_list copy;
struct module_repo_internal **repo;
va_copy(copy, *va);
repo = (struct module_repo_internal **) va_arg(copy, void **);
struct queue *queue = va_arg(copy, struct queue *);
volatile sqlite3 *db = va_arg(copy, volatile sqlite3 *);
if (NULL != *repo) {
LOG_ERROR("REPO", "Can't init two times the module");
return 1;
}
*repo = malloc(sizeof(struct module_repo_internal));
if (NULL == *repo) {
LOG_ERROR("REPO", "Out of memory");
return 1;
}
bool ret = module_repo_new(*repo, queue, db);
lkt_queue_make_available(queue, lkt_event_db_updating);
va_end(copy);
if (!ret)
LOG_ERROR("REPO", "Failed to create the module");
return ! ret;
}
static int
mod_close(va_list *va)
{
va_list copy;
struct module_repo_internal **repo;
va_copy(copy, *va);
repo = (struct module_repo_internal **) va_arg(copy, void **);
module_repo_close(*repo);
va_end(copy);
return 0;
}
static int
mod_free(va_list *va)
{
va_list copy;
struct module_repo_internal **repo;
va_copy(copy, *va);
repo = (struct module_repo_internal **) va_arg(copy, void **);
module_repo_close(*repo);
worker_pool_free(&(*repo)->workers);
--__curl_init;
if (!__curl_init)
curl_global_cleanup();
free((*repo)->kara_dir);
free((*repo)->get_id_json);
free((*repo)->get_id_file);
free((*repo)->base_url);
free((*repo)->get_all_json);
LOG_INFO("REPO", "Repo module terminated");
va_end(copy);
return 0;
}
static int
mod_update(va_list *va)
{
va_list copy;
struct module_repo_internal **repo;
va_copy(copy, *va);
repo = (struct module_repo_internal **) va_arg(copy, void **);
if ((*repo)->updating) {
LOG_WARN("REPO", "Already updating");
va_end(copy);
return 0;
}
(*repo)->updating = 1;
if (worker_pool_push(&(*repo)->workers, __worker_update, (void *) *repo)) {
LOG_ERROR("REPO", "Out of memory");
va_end(copy);
return 1;
}
va_end(copy);
LOG_INFO("REPO", "Update started (update)");
return 0;
}
static int
mod_rescan(va_list *va)
{
va_list copy;
struct module_repo_internal **repo;
va_copy(copy, *va);
repo = (struct module_repo_internal **) va_arg(copy, void **);
if ((*repo)->updating) {
LOG_WARN("REPO", "Already updating");
va_end(copy);
return 0;
}
(*repo)->updating = 1;
if (worker_pool_push(&(*repo)->workers, __worker_rescan, (void *) *repo)) {
LOG_ERROR("REPO", "Out of memory");
va_end(copy);
return 1;
}
va_end(copy);
LOG_INFO("REPO", "Update started (rescan)");
return 0;
}
REG_BEGIN(repo_reg)
REG_ADD_NAMED("new", mod_new)
REG_ADD_NAMED("free", mod_free)
REG_ADD_NAMED("close", mod_close)
REG_ADD_NAMED("update", mod_update)
REG_ADD_NAMED("rescan", mod_rescan)
REG_END()
#if ! defined (LKT_STATIC_MODULE)
REG_EXPORT(repo_reg)
#endif