* Rename some fields. * Use `uint32_t` consistently for all sizes and indices. * Move thread ID into the ring structure. * Replace status flag with a count of in-use elements. Signed-off-by: Jeremy Sowden <jeremy@xxxxxxxxxx> --- include/ulogd/db.h | 25 ++++++------ util/db.c | 100 +++++++++++++++++++++++++-------------------- 2 files changed, 68 insertions(+), 57 deletions(-) diff --git a/include/ulogd/db.h b/include/ulogd/db.h index 7c0649583f1d..ebf4f42917c3 100644 --- a/include/ulogd/db.h +++ b/include/ulogd/db.h @@ -27,22 +27,22 @@ struct db_driver { }; -enum { - RING_NO_QUERY, - RING_QUERY_READY, -}; - struct db_stmt_ring { - /* Ring buffer: 1 status byte + string */ - char *ring; /* pointer to the ring */ - uint32_t size; /* size of ring buffer in element */ - int length; /* length of one ring buffer element */ - uint32_t wr_item; /* write item in ring buffer */ - uint32_t rd_item; /* read item in ring buffer */ - char *wr_place; + + char *elems; /* Buffer containing `size` strings of `length` bytes */ + + uint32_t size; /* No. of elements in ring buffer */ + uint32_t used; /* No. of elements in ring buffer in use */ + uint32_t length; /* Length of one element in ring buffer */ + uint32_t wr_idx; /* Index of next element to write in ring buffer */ + uint32_t rd_idx; /* Index of next element to read in ring buffer */ + + pthread_t thread_id; pthread_cond_t cond; pthread_mutex_t mutex; + int full; + }; struct db_stmt { @@ -60,7 +60,6 @@ struct db_instance { struct db_driver *driver; /* DB ring buffer */ struct db_stmt_ring ring; - pthread_t db_thread_id; /* Backlog system */ unsigned int backlog_memcap; unsigned int backlog_memusage; diff --git a/util/db.c b/util/db.c index ee6dfb6b5a2a..8a870846332b 100644 --- a/util/db.c +++ b/util/db.c @@ -63,8 +63,10 @@ static int _process_backlog(struct ulogd_pluginstance *upi); static int _configure_ring(struct ulogd_pluginstance *upi); static int _start_ring(struct ulogd_pluginstance *upi); -static int _add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di); +static int _add_to_ring(struct ulogd_pluginstance *upi); static void *_process_ring(void *arg); +static char *_get_ring_elem(struct db_stmt_ring *r, uint32_t i); +static void _incr_ring_used(struct db_stmt_ring *r, int incr); int ulogd_db_configure(struct ulogd_pluginstance *upi, @@ -182,17 +184,16 @@ ulogd_db_signal(struct ulogd_pluginstance *upi, int signal) case SIGTERM: case SIGINT: if (di->ring.size) { - int s = pthread_cancel(di->db_thread_id); + int s = pthread_cancel(di->ring.thread_id); if (s != 0) { ulogd_log(ULOGD_ERROR, - "Can't cancel injection thread\n"); + "Can't cancel ring-processing thread\n"); break; } - s = pthread_join(di->db_thread_id, NULL); + s = pthread_join(di->ring.thread_id, NULL); if (s != 0) { ulogd_log(ULOGD_ERROR, - "Error waiting for injection thread" - "cancelation\n"); + "Error waiting for ring-processing thread cancellation\n"); } } break; @@ -293,7 +294,7 @@ _interp_db_main(struct ulogd_pluginstance *upi) struct db_instance *di = (struct db_instance *) &upi->private; if (di->ring.size) { - if (_add_to_ring(upi, di) < 0) + if (_add_to_ring(upi) < 0) return ULOGD_IRET_ERR; return ULOGD_IRET_OK; } @@ -372,11 +373,11 @@ _stop_db(struct ulogd_pluginstance *upi) di->stmt = NULL; } if (di->ring.size > 0) { - pthread_cancel(di->db_thread_id); - free(di->ring.ring); + pthread_cancel(di->ring.thread_id); pthread_cond_destroy(&di->ring.cond); pthread_mutex_destroy(&di->ring.mutex); - di->ring.ring = NULL; + free(di->ring.elems); + di->ring.elems = NULL; } } @@ -743,18 +744,17 @@ _start_ring(struct ulogd_pluginstance *upi) return 0; /* allocate */ - di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length); - if (di->ring.ring == NULL) + di->ring.elems = calloc(di->ring.size, di->ring.length); + if (di->ring.elems == NULL) return -1; - di->ring.wr_place = di->ring.ring; + di->ring.wr_idx = di->ring.rd_idx = di->ring.used = 0; ulogd_log(ULOGD_NOTICE, - "Allocating %d elements of size %d for ring\n", + "Allocating %" PRIu32 " elements of size %" PRIu32 " for ring\n", di->ring.size, di->ring.length); /* init start of query for each element */ for(i = 0; i < di->ring.size; i++) - strcpy(di->ring.ring + di->ring.length * i + 1, - di->stmt); + strcpy(_get_ring_elem(&di->ring, i), di->stmt); /* init cond & mutex */ ret = pthread_cond_init(&di->ring.cond, NULL); @@ -765,7 +765,7 @@ _start_ring(struct ulogd_pluginstance *upi) goto cond_error; /* create thread */ - ret = pthread_create(&di->db_thread_id, NULL, _process_ring, upi); + ret = pthread_create(&di->ring.thread_id, NULL, _process_ring, upi); if (ret != 0) goto mutex_error; @@ -776,66 +776,78 @@ mutex_error: cond_error: pthread_cond_destroy(&di->ring.cond); alloc_error: - free(di->ring.ring); + free(di->ring.elems); return -1; } static int -_add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di) +_add_to_ring(struct ulogd_pluginstance *upi) { - if (*di->ring.wr_place == RING_QUERY_READY) { - if (di->ring.full == 0) { + struct db_instance *di = (struct db_instance *) &upi->private; + + if (di->ring.used == di->ring.size) { + if (!di->ring.full) { ulogd_log(ULOGD_ERROR, "No place left in ring\n"); di->ring.full = 1; } return ULOGD_IRET_OK; - } else if (di->ring.full) { + } + + if (di->ring.full) { ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n"); di->ring.full = 0; } - _bind_sql_stmt(upi, di->ring.wr_place + 1); - *di->ring.wr_place = RING_QUERY_READY; + + _bind_sql_stmt(upi, _get_ring_elem(&di->ring, di->ring.wr_idx)); + _incr_ring_used(&di->ring, 1); + pthread_cond_signal(&di->ring.cond); - di->ring.wr_item ++; - di->ring.wr_place += di->ring.length; - if (di->ring.wr_item == di->ring.size) { - di->ring.wr_item = 0; - di->ring.wr_place = di->ring.ring; - } return ULOGD_IRET_OK; } static void * -_process_ring(void *gdi) +_process_ring(void *arg) { - struct ulogd_pluginstance *upi = gdi; + struct ulogd_pluginstance *upi = arg; struct db_instance *di = (struct db_instance *) &upi->private; - char *wr_place; - wr_place = di->ring.ring; pthread_mutex_lock(&di->ring.mutex); while(1) { /* wait cond */ pthread_cond_wait(&di->ring.cond, &di->ring.mutex); - while (*wr_place == RING_QUERY_READY) { - if (di->driver->execute(upi, wr_place + 1, - strlen(wr_place + 1)) < 0) { + while (di->ring.used > 0) { + char *stmt = _get_ring_elem(&di->ring, di->ring.rd_idx); + + if (di->driver->execute(upi, stmt, + strlen(stmt)) < 0) { + di->driver->close_db(upi); while (di->driver->open_db(upi) < 0) sleep(1); /* try to re run query */ continue; } - *wr_place = RING_NO_QUERY; - di->ring.rd_item++; - if (di->ring.rd_item == di->ring.size) { - di->ring.rd_item = 0; - wr_place = di->ring.ring; - } else - wr_place += di->ring.length; + + _incr_ring_used(&di->ring, -1); } } return NULL; } + +static char * +_get_ring_elem(struct db_stmt_ring *r, uint32_t i) +{ + return &r->elems[i * r->length]; +} + +static void +_incr_ring_used(struct db_stmt_ring *r, int incr) +{ + uint32_t *idx = incr > 0 ? &r->wr_idx : &r->rd_idx; + + *idx = (*idx + 1) % r->size; + + r->used += incr; +} -- 2.35.1