A mutex and a condition variable are associated with the ring buffer. However, they are not used to synchronize access to the buffer, but only to wake the thread that processes the buffer when new statements are added to it. Thus there is nothing to prevent concurrent modifications of the buffer. To fix this, acquire the mutex before adding to the buffer, and, in the processing thread, copy the statement we're about to execute out of the buffer and release the mutex while executing it. Signed-off-by: Jeremy Sowden <jeremy@xxxxxxxxxx> --- include/ulogd/db.h | 2 ++ util/db.c | 63 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 12 deletions(-) diff --git a/include/ulogd/db.h b/include/ulogd/db.h index 6c2e3d07f463..bf4a19dea150 100644 --- a/include/ulogd/db.h +++ b/include/ulogd/db.h @@ -34,6 +34,8 @@ struct db_stmt_ring { struct db_stmt *elems; /* Buffer containing `size` statements of * `length` bytes */ + struct db_stmt *stmt; /* Currently executing statement */ + uint32_t size; /* No. of elements in ring buffer */ uint32_t used; /* No. of elements in ring buffer in use */ uint32_t length; /* Length of one element in ring buffer */ diff --git a/util/db.c b/util/db.c index 487eaed26153..6cfbcbc16791 100644 --- a/util/db.c +++ b/util/db.c @@ -372,10 +372,15 @@ _stop_db(struct ulogd_pluginstance *upi) } if (di->ring.size > 0) { pthread_cancel(di->ring.thread_id); + pthread_cond_destroy(&di->ring.cond); pthread_mutex_destroy(&di->ring.mutex); + free(di->ring.elems); + free(di->ring.stmt); + di->ring.elems = NULL; + di->ring.stmt = NULL; } } @@ -737,14 +742,22 @@ _start_ring(struct ulogd_pluginstance *upi) return 0; /* allocate */ + stmt_size = sizeof(*di->stmt) + di->stmt->size; stmt_align = __alignof__(*di->stmt); di->ring.length = stmt_size % stmt_align != 0 ?
(1 + stmt_size / stmt_align) * stmt_align : stmt_size; + + di->ring.stmt = malloc(di->ring.length); + if (di->ring.stmt == NULL) + return -1; + di->ring.elems = calloc(di->ring.size, di->ring.length); - if (di->ring.elems == NULL) + if (di->ring.elems == NULL) { + free(di->ring.stmt); return -1; + } di->ring.wr_idx = di->ring.rd_idx = di->ring.used = 0; ulogd_log(ULOGD_NOTICE, "Allocating %" PRIu32 " elements of size %" PRIu32 " for ring\n", @@ -775,6 +788,7 @@ cond_error: pthread_cond_destroy(&di->ring.cond); alloc_error: free(di->ring.elems); + free(di->ring.stmt); return -1; } @@ -784,12 +798,14 @@ _add_to_ring(struct ulogd_pluginstance *upi) { struct db_instance *di = (struct db_instance *) &upi->private; + pthread_mutex_lock(&di->ring.mutex); + if (di->ring.used == di->ring.size) { if (!di->ring.full) { ulogd_log(ULOGD_ERROR, "No place left in ring\n"); di->ring.full = 1; } - return ULOGD_IRET_OK; + goto unlock_mutex; } if (di->ring.full) { @@ -801,6 +817,9 @@ _add_to_ring(struct ulogd_pluginstance *upi) _incr_ring_used(&di->ring, 1); pthread_cond_signal(&di->ring.cond); +unlock_mutex: + pthread_mutex_unlock(&di->ring.mutex); + return ULOGD_IRET_OK; } @@ -809,27 +828,47 @@ _process_ring(void *arg) { struct ulogd_pluginstance *upi = arg; struct db_instance *di = (struct db_instance *) &upi->private; + struct db_stmt *stmt = di->ring.stmt; pthread_mutex_lock(&di->ring.mutex); + while(1) { - /* wait cond */ + pthread_cond_wait(&di->ring.cond, &di->ring.mutex); + while (di->ring.used > 0) { - struct db_stmt *stmt = _get_ring_elem(&di->ring, - di->ring.rd_idx); - - if (di->driver->execute(upi, stmt) < 0) { - di->driver->close_db(upi); - while (di->driver->open_db(upi) < 0) - sleep(1); - /* try to re-run statement */ + + memcpy(stmt, _get_ring_elem(&di->ring, di->ring.rd_idx), + di->ring.length); + + pthread_mutex_unlock(&di->ring.mutex); + +exec_stmt: + if (di->driver->execute(upi, stmt) == 0) { + + pthread_mutex_lock(&di->ring.mutex); + + 
_incr_ring_used(&di->ring, -1); + continue; + } - _incr_ring_used(&di->ring, -1); + /* If the exec fails, close the DB connexion and try to + * open it again. Once the connexion is made, retry the + * statement. + */ + di->driver->close_db(upi); + while (di->driver->open_db(upi) < 0) + sleep(1); + goto exec_stmt; + } + } + pthread_mutex_unlock(&di->ring.mutex); + return NULL; } -- 2.35.1