Hi, Content-Disposition: inline; filename=ulogd-SQLITE3-plugin.diff NOTE As described in http://www.sqlite.org/cvstrac/wiki?p=CorruptionFollowingBusyError some versions of SQLITE contain an error when transactions other than EXCLUSIVE are used. If an SQL command that starts a transaction, or any command inside a transaction, ever returns SQLITE_BUSY, a ROLLBACK has to be done, as otherwise a corrupt database might be the result. Signed-off-by: Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx> Index: ulogd-netfilter-stuffed/output/sqlite3/ulogd_output_SQLITE3.c =================================================================== --- ulogd-netfilter-stuffed.orig/output/sqlite3/ulogd_output_SQLITE3.c +++ ulogd-netfilter-stuffed/output/sqlite3/ulogd_output_SQLITE3.c @@ -1,4 +1,3 @@ -#if 0 /* * ulogd output plugin for logging to a SQLITE database * @@ -26,389 +25,805 @@ * * 2005-02-09 Harald Welte <laforge@xxxxxxxxxxxx>: * - port to ulogd-1.20 + * + * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx> Astaro AG, 2007 + * - port to ulogd-2.00 */ -#include <stdlib.h> -#include <string.h> -#include <arpa/inet.h> #include <ulogd/ulogd.h> #include <ulogd/conffile.h> +#include <ulogd/common.h> +#include <ulogd/linuxlist.h> +#include <arpa/inet.h> #include <sqlite3.h> -#ifdef DEBUG_SQLITE3 -#define DEBUGP(x, args...) fprintf(stderr, x, ## args) -#else -#define DEBUGP(x, args...) 
-#endif +#define PFX "SQLITE3: " + +/* config defaults */ +#define CFG_BUFFER_DEFAULT 100 +#define CFG_TIMER_DEFAULT 1 SEC +#define CFG_MAX_BACKLOG_DEFAULT 0 /* unlimited */ + + +#define SQLITE3_BUSY_TIMEOUT 300 + +/* number of colums we have (really should be configurable) */ +#define DB_NUM_COLS 10 -struct _field { + +/* map DB column to ulogd key */ +struct col { char name[ULOGD_MAX_KEYLEN]; - unsigned int id; - struct _field *next; + struct ulogd_key *key; +}; + +struct row { + struct llist_head link; + uint32_t ip_saddr; + uint32_t ip_daddr; + unsigned char ip_proto; + unsigned l4_dport; + unsigned raw_in_pktlen; + unsigned raw_in_pktcount; + unsigned raw_out_pktlen; + unsigned raw_out_pktcount; + unsigned flow_start_sec; + unsigned flow_duration; }; -/* the database handle we are using */ -static sqlite3 *dbh; +#define RKEY(key) ((key)->u.source) -/* a linked list of the fields the table has */ -static struct _field *fields; -/* buffer for our insert statement */ -static char *stmt; +struct sqlite3_priv { + sqlite3 *dbh; /* database handle we are using */ + char *stmt; + sqlite3_stmt *p_stmt; + int buffer_size; -/* pointer to the final prepared statement */ -static sqlite3_stmt *p_stmt; + struct ulogd_timer timer; -/* number of statements to buffer before we commit */ -static int buffer_size; + struct col cols[DB_NUM_COLS]; -/* number of statements currently in the buffer */ -static int buffer_ctr; + /* our backlog buffer */ + struct llist_head rows; + int num_rows; + int max_backlog; -/* our configuration directives */ -static config_entry_t db_ce = { - .key = "db", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, -}; + time_t commit_time; -static config_entry_t table_ce = { - .next = &db_ce, - .key = "table", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, + unsigned disable : 1; + unsigned overlimit_msg : 1; }; -static config_entry_t buffer_ce = { - .next = &table_ce, - .key = "buffer", - .type = CONFIG_TYPE_INT, - .options 
= CONFIG_OPT_MANDATORY, + +static struct config_keyset sqlite3_kset = { + .num_ces = 6, + .ces = { + { + .key = "db", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "table", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "buffer", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = CFG_BUFFER_DEFAULT, + }, + { + .key = "timer", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = CFG_TIMER_DEFAULT, + }, + { + .key = "max-backlog", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = CFG_MAX_BACKLOG_DEFAULT, + }, + { + .key = "disable", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = 0, + }, + }, }; -/* our main output function, called by ulogd */ -static int _sqlite3_output(ulog_iret_t *result) +#define db_ce(pi) (pi)->config_kset->ces[0].u.string +#define table_ce(pi) (pi)->config_kset->ces[1].u.string +#define buffer_ce(pi) (pi)->config_kset->ces[2].u.value +#define timer_ce(pi) (pi)->config_kset->ces[3].u.value +#define max_backlog_ce(pi) (pi)->config_kset->ces[4].u.value +#define disable_ce(pi) (pi)->config_kset->ces[5].u.value + + +#define SQL_CREATE_STR \ + "create table daily(ip_saddr integer, ip_daddr integer, " \ + "ip_protocol integer, l4_dport integer, raw_in_pktlen integer, " \ + "raw_in_pktcount integer, raw_out_pktlen integer, " \ + "raw_out_pktcount integer, flow_start_sec integer, " \ + "flow_duration integer)" + + +static struct row * +row_new(void) { - struct _field *f; - ulog_iret_t *res; - int col_counter; -#ifdef IP_AS_STRING - char *ipaddr; - struct in_addr addr; -#endif - - col_counter = 1; - for (f = fields; f; f = f->next) { - res = keyh_getres(f->id); - - if (!res) { - ulogd_log(ULOGD_NOTICE, - "no result for %s ?!?\n", f->name); - } - - if (!res || !IS_VALID((*res))) { - /* no result, pass a null */ - sqlite3_bind_null(p_stmt, col_counter); - col_counter++; - continue; - } - - switch (res->type) 
{ - case ULOGD_RET_INT8: - sqlite3_bind_int(p_stmt,col_counter,res->value.i8); - break; - case ULOGD_RET_INT16: - sqlite3_bind_int(p_stmt,col_counter,res->value.i16); - break; - case ULOGD_RET_INT32: - sqlite3_bind_int(p_stmt,col_counter,res->value.i32); - break; - case ULOGD_RET_INT64: - sqlite3_bind_int64(p_stmt,col_counter,res->value.i64); - break; - case ULOGD_RET_UINT8: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui8); - break; - case ULOGD_RET_UINT16: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui16); - break; - case ULOGD_RET_IPADDR: -#ifdef IP_AS_STRING - memset(&addr, 0, sizeof(addr)); - addr.s_addr = ntohl(res->value.ui32); - ipaddr = inet_ntoa(addr); - sqlite3_bind_text(p_stmt,col_counter,ipaddr,strlen(ipaddr),SQLITE_STATIC); - break; -#endif /* IP_AS_STRING */ - /* EVIL: fallthrough when logging IP as u_int32_t */ - case ULOGD_RET_UINT32: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui32); - break; - case ULOGD_RET_UINT64: - sqlite3_bind_int64(p_stmt,col_counter,res->value.ui64); - break; - case ULOGD_RET_BOOL: - sqlite3_bind_int(p_stmt,col_counter,res->value.b); - break; - case ULOGD_RET_STRING: - sqlite3_bind_text(p_stmt,col_counter,res->value.ptr,strlen(res->value.ptr),SQLITE_STATIC); - break; - default: - ulogd_log(ULOGD_NOTICE, - "unknown type %d for %s\n", - res->type, res->key); - break; - } - - col_counter++; - } - - /* now we have created our statement, insert it */ - - if (sqlite3_step(p_stmt) == SQLITE_DONE) { - sqlite3_reset(p_stmt); - buffer_ctr++; - } else { - ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", - sqlite3_errmsg(dbh)); - return 1; - } + struct row *row; + + if ((row = calloc(1, sizeof(struct row))) == NULL) + ulogd_error("%s: out of memory\n", __func__); + + return row; +} + - /* commit all of the inserts to the database, ie flush buffer */ - if (buffer_ctr >= buffer_size) { - if (sqlite3_exec(dbh,"commit",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to commit records to db."); +static 
inline void +__row_del(struct sqlite3_priv *priv, struct row *row) +{ + assert(row != NULL); - if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to begin a new transaction."); + free(row); +} + + +static void +row_del(struct sqlite3_priv *priv, struct row *row) +{ + llist_del(&row->link); - buffer_ctr = 0; - DEBUGP("committing.\n"); + __row_del(priv, row); + + priv->num_rows--; +} + + +static int +row_add(struct sqlite3_priv *priv, struct row *row) +{ + if (priv->max_backlog && priv->num_rows >= priv->max_backlog) { + if (!priv->overlimit_msg) { + ulogd_error(PFX "over max-backlog limit, dropping rows\n"); + + priv->overlimit_msg = 1; + } + + __row_del(priv, row); + + return -1; } + llist_add_tail(&row->link, &priv->rows); + + priv->num_rows++; + return 0; } +/* set_commit_time() - set time for next try on locked database + * + * The database is effectively locked in between. + */ +static void +set_commit_time(const struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + priv->commit_time = t_now + 1; + + pr_debug("%s: commit time %d\n", __func__, priv->commit_time); +} + #define _SQLITE3_INSERTTEMPL "insert into X (Y) values (Z)" -/* create the static part of our insert statement */ -static int _sqlite3_createstmt(void) +/* create static part of our insert statement */ +static int +db_createstmt(struct ulogd_pluginstance *pi) { - struct _field *f; - unsigned int size; + struct sqlite3_priv *priv = (void *)pi->private; char buf[ULOGD_MAX_KEYLEN]; char *underscore; char *stmt_pos; - int col_count; int i; - if (stmt) { - ulogd_log(ULOGD_NOTICE, "createstmt called, but stmt" - " already existing\n"); - return 1; - } - - /* caclulate the size for the insert statement */ - size = strlen(_SQLITE3_INSERTTEMPL) + strlen(table_ce.u.string); + if (priv->stmt != NULL) + free(priv->stmt); - DEBUGP("initial size: %u\n", size); - - col_count = 0; - for (f = fields; f; f = f->next) { - /* we need 
space for the key and a comma, and a ? */ - size += strlen(f->name) + 3; - DEBUGP("size is now %u since adding %s\n",size,f->name); - col_count++; + if ((priv->stmt = calloc(1, 1024)) == NULL) { + ulogd_error(PFX "out of memory\n"); + return -1; } - DEBUGP("there were %d columns\n",col_count); - DEBUGP("after calc name length: %u\n",size); - - ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size); - - stmt = (char *) malloc(size); + sprintf(priv->stmt, "insert into %s (", table_ce(pi)); + stmt_pos = priv->stmt + strlen(priv->stmt); - if (!stmt) { - ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; - } + for (i = 0; i < DB_NUM_COLS; i++) { + struct col *col = &priv->cols[i]; - sprintf(stmt, "insert into %s (", table_ce.u.string); - stmt_pos = stmt + strlen(stmt); + /* convert name */ + strncpy(buf, col->name, ULOGD_MAX_KEYLEN); - for (f = fields; f; f = f->next) { - strncpy(buf, f->name, ULOGD_MAX_KEYLEN); while ((underscore = strchr(buf, '.'))) *underscore = '_'; + sprintf(stmt_pos, "%s,", buf); - stmt_pos = stmt + strlen(stmt); + stmt_pos = priv->stmt + strlen(priv->stmt); } *(stmt_pos - 1) = ')'; sprintf(stmt_pos, " values ("); - stmt_pos = stmt + strlen(stmt); + stmt_pos = priv->stmt + strlen(priv->stmt); - for (i = 0; i < col_count - 1; i++) { + for (i = 0; i < DB_NUM_COLS - 1; i++) { sprintf(stmt_pos,"?,"); stmt_pos += 2; } sprintf(stmt_pos, "?)"); - ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", stmt); + ulogd_log(ULOGD_DEBUG, "%s: stmt='%s'\n", pi->id, priv->stmt); - DEBUGP("about to prepare statement.\n"); + pr_debug("about to prepare statement.\n"); - sqlite3_prepare(dbh,stmt,-1,&p_stmt,0); - - DEBUGP("statement prepared.\n"); - - if (!p_stmt) { - ulogd_log(ULOGD_ERROR,"unable to prepare statement"); + sqlite3_prepare(priv->dbh, priv->stmt, -1, &priv->p_stmt, 0); + if (priv->p_stmt == NULL) { + ulogd_error(PFX "prepare: %s\n", sqlite3_errmsg(priv->dbh)); return 1; } + pr_debug("%s: statement prepared.\n", pi->id); + return 0; } -/* length of "select * 
from \0" */ -#define SQLITE_SELECT_LEN 15 +static struct ulogd_key * +ulogd_find_key(struct ulogd_pluginstance *pi, const char *name) +{ + int i; + + for (i = 0; i < pi->input.num_keys; i++) { + if (strcmp(pi->input.keys[i].name, name) == 0) + return &pi->input.keys[i]; + } + + return NULL; +} + +#define SELECT_ALL_STR "select * from " +#define SELECT_ALL_LEN sizeof(SELECT_ALL_STR) -/* find out which columns the table has */ -static int _sqlite3_get_columns(const char *table) +static int +db_count_cols(struct ulogd_pluginstance *pi, sqlite3_stmt **stmt) { + struct sqlite3_priv *priv = (void *)pi->private; + char query[SELECT_ALL_LEN + CONFIG_VAL_STRING_LEN] = SELECT_ALL_STR; + + strncat(query, table_ce(pi), LINE_LEN); + + if (sqlite3_prepare(priv->dbh, query, -1, stmt, 0) != SQLITE_OK) { + return -1; + } + + return sqlite3_column_count(*stmt); +} + + +static int +db_create_tbl(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + char *errmsg; + int ret; + + sqlite3_exec(priv->dbh, "drop table daily", NULL, NULL, NULL); + + ret = sqlite3_exec(priv->dbh, SQL_CREATE_STR, NULL, NULL, &errmsg); + if (ret != SQLITE_OK) { + ulogd_error(PFX "create table: %s\n", errmsg); + sqlite3_free(errmsg); + + return -1; + } + + return 0; +} + + +/* initialize DB, possibly creating it */ +static int +db_init(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; char buf[ULOGD_MAX_KEYLEN]; - char query[SQLITE_SELECT_LEN + CONFIG_VAL_STRING_LEN] = "select * from \0"; char *underscore; - struct _field *f; sqlite3_stmt *schema_stmt; - int column; - int result; - int id; + int num_cols, i; - if (!dbh) - return 1; + if (priv->dbh == NULL) + return -1; - strncat(query,table,LINE_LEN); - - result = sqlite3_prepare(dbh,query,-1,&schema_stmt,0); - - if (result != SQLITE_OK) - return 1; + num_cols = db_count_cols(pi, &schema_stmt); + if (num_cols != DB_NUM_COLS) { + ulogd_log(ULOGD_INFO, "%s: (re)creating database\n", pi->id); + + 
if (db_create_tbl(pi) < 0) + return -1; + + num_cols = db_count_cols(pi, &schema_stmt); + } + + assert(num_cols == DB_NUM_COLS); + + for (i = 0; i < DB_NUM_COLS; i++) { + struct col *col = &priv->cols[i]; + + strncpy(buf, sqlite3_column_name(schema_stmt, i), ULOGD_MAX_KEYLEN); - for (column = 0; column < sqlite3_column_count(schema_stmt); column++) { /* replace all underscores with dots */ - strncpy(buf, sqlite3_column_name(schema_stmt,column), ULOGD_MAX_KEYLEN); - while ((underscore = strchr(buf, '_'))) + while ((underscore = strchr(buf, '_')) != NULL) *underscore = '.'; - DEBUGP("field '%s' found: ", buf); + pr_debug("column '%s' found\n", buf); - if (!(id = keyh_getid(buf))) { - DEBUGP(" no keyid!\n"); - continue; + strncpy(col->name, buf, ULOGD_MAX_KEYLEN); + + if ((col->key = ulogd_find_key(pi, buf)) == NULL) { + printf(PFX "%s: key not found\n", buf); + return -1; } + } + + ulogd_log(ULOGD_INFO, "%s: database opened\n", pi->id); + + if (sqlite3_finalize(schema_stmt) != SQLITE_OK) { + ulogd_error(PFX "sqlite_finalize: %s\n", + sqlite3_errmsg(priv->dbh)); + return -1; + } + + return 0; +} + + +static void +db_reset(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + sqlite3_finalize(priv->p_stmt); + + sqlite3_close(priv->dbh); + priv->dbh = NULL; + + free(priv->stmt); + priv->stmt = NULL; +} + + +static int +db_start(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; - DEBUGP("keyid %u\n", id); + ulogd_log(ULOGD_DEBUG, "%s: opening database connection\n", pi->id); - /* prepend it to the linked list */ - f = (struct _field *) malloc(sizeof *f); - if (!f) { - ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; + if (sqlite3_open(db_ce(pi), &priv->dbh) != SQLITE_OK) { + ulogd_error(PFX "%s\n", sqlite3_errmsg(priv->dbh)); + return -1; + } + + /* set the timeout so that we don't automatically fail + if the table is busy */ + sqlite3_busy_timeout(priv->dbh, SQLITE3_BUSY_TIMEOUT); + + /* read the 
fieldnames to know which values to insert */ + if (db_init(pi) < 0) + return -1; + + /* initialize our buffer size and counter */ + priv->buffer_size = buffer_ce(pi); + + priv->max_backlog = max_backlog_ce(pi); + + /* create and prepare the actual insert statement */ + db_createstmt(pi); + + return 0; +} + +/* db_err() - handle database errors */ +static int +db_err(struct ulogd_pluginstance *pi, int ret) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + pr_debug("%s: ret=%d (errcode %d)\n", __func__, ret, + sqlite3_errcode(priv->dbh)); + + assert(ret != SQLITE_OK && ret != SQLITE_DONE); + + if (ret == SQLITE_BUSY || ret == SQLITE_LOCKED) + set_commit_time(pi); + else { + switch (sqlite3_errcode(priv->dbh)) { + case SQLITE_LOCKED: + case SQLITE_BUSY: + set_commit_time(pi); + break; + + case SQLITE_SCHEMA: + if (priv->stmt) { + sqlite3_finalize(priv->p_stmt); + + db_createstmt(pi); + + ulogd_log(ULOGD_INFO, "%s: database schema changed\n", + pi->id); + } + break; + + default: + ulogd_error("%s: transaction: %s\n", pi->id, + sqlite3_errmsg(priv->dbh)); + break; } - strncpy(f->name, buf, ULOGD_MAX_KEYLEN); - f->id = id; - f->next = fields; - fields = f; } - sqlite3_finalize(schema_stmt); + sqlite3_exec(priv->dbh, "rollback", NULL, NULL, NULL); + + /* no sqlit3_clear_bindings(), as an unbind will be done implicitely + on next bind. */ + if (priv->p_stmt != NULL) + sqlite3_reset(priv->p_stmt); + return 0; } -/** - * make connection and select database - * returns 0 if database failed to open. 
- */ -static int _sqlite3_open_db(char *db_file) +static int +db_add_row(struct ulogd_pluginstance *pi, const struct row *row) { - DEBUGP("opening database.\n"); - return sqlite3_open(db_file,&dbh); + struct sqlite3_priv *priv = (void *)pi->private; + int db_col = 1, ret; + + do { + ret = sqlite3_bind_int64(priv->p_stmt, db_col++, row->ip_saddr); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int64(priv->p_stmt, db_col++, row->ip_daddr); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->ip_proto); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->l4_dport); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->raw_in_pktlen); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int64(priv->p_stmt, db_col++, row->raw_in_pktcount); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->raw_out_pktlen); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int64(priv->p_stmt, db_col++, + row->raw_out_pktcount); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->flow_start_sec); + if (ret != SQLITE_OK) + break; + + ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->flow_duration); + if (ret != SQLITE_OK) + break; + + if ((ret = sqlite3_step(priv->p_stmt)) == SQLITE_DONE) { + /* no sqlite3_clear_bindings(), as an unbind will be + implicetely done before next bind. */ + sqlite3_reset(priv->p_stmt); + + return 0; + } + + /* according to the documentation sqlite3_step() always returns a + generic SQLITE_ERROR. In order to find out the cause of the + error you have to call sqlite3_reset() or sqlite3_finalize(). 
*/ + ret = sqlite3_reset(priv->p_stmt); + } while (0); + + return db_err(pi, ret); } -/* give us an opportunity to close the database down properly */ -static void _sqlite3_fini(void) +/* delete_rows() - delete rows from the tail of the list */ +static int +delete_rows(struct ulogd_pluginstance *pi, int rows) { - DEBUGP("cleaning up db connection\n"); + struct sqlite3_priv *priv = (void *)pi->private; + struct llist_head *curr, *tmp; + + llist_for_each_prev_safe(curr, tmp, &priv->rows) { + struct row *row = container_of(curr, struct row, link); - /* free up our prepared statements so we can close the db */ - if (p_stmt) { - sqlite3_finalize(p_stmt); - DEBUGP("prepared statement finalized\n"); + if (rows-- == 0) + break; + + row_del(priv, row); } - if (dbh) { - int result; - /* flush the remaining insert statements to the database. */ - result = sqlite3_exec(dbh,"commit",NULL,NULL,NULL); + return 0; +} + +/* + db_commit_rows() + + RETURN + >0 rows commited + 0 locked + -1 error +*/ +static int +db_commit_rows(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + struct row *row; + int ret, rows = 0, max_commit; + + ret = sqlite3_exec(priv->dbh, "begin immediate transaction", NULL, + NULL, NULL); + if (ret != SQLITE_OK) + return db_err(pi, ret); + + /* Limit number of rows to commit. Note that currently three times + buffer_size is a bit arbitrary and therefore might be adjusted in + the future. 
*/ + max_commit = max(3 * priv->buffer_size, 1024); - if (result != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to commit remaining records to db."); + llist_for_each_entry_reverse(row, &priv->rows, link) { + if (++rows > max_commit) + break; - sqlite3_close(dbh); - DEBUGP("database file closed\n"); + if (db_add_row(pi, row) < 0) + return db_err(pi, ret); } + + ret = sqlite3_exec(priv->dbh, "commit", NULL, NULL, NULL); + if (ret != SQLITE_OK) + return db_err(pi, ret); + + sqlite3_reset(priv->p_stmt); + + pr_debug("%s: commited %d/%d rows\n", pi->id, rows, priv->num_rows); + + delete_rows(pi, rows); + + if (priv->commit_time >= t_now) + priv->commit_time = 0; /* release commit lock */ + + if (priv->overlimit_msg) + priv->overlimit_msg = 0; + + return rows; } -#define _SQLITE3_BUSY_TIMEOUT 300 -static int _sqlite3_init(void) +/* our main output function, called by ulogd */ +static int +sqlite3_interp(struct ulogd_pluginstance *pi) { - /* have the opts parsed */ - config_parse_file("SQLITE3", &buffer_ce); + struct sqlite3_priv *priv = (void *)pi->private; + struct col *cols = priv->cols; + struct row *row; + + if ((row = row_new()) == NULL) + return ULOGD_IRET_ERR; + + row->ip_saddr = RKEY(cols[0].key)->u.value.ui32; + row->ip_daddr = RKEY(cols[1].key)->u.value.ui32; + row->ip_proto = RKEY(cols[2].key)->u.value.ui8; + row->l4_dport = RKEY(cols[3].key)->u.value.ui16; + row->raw_in_pktlen = RKEY(cols[4].key)->u.value.ui32; + row->raw_in_pktcount = RKEY(cols[5].key)->u.value.ui32; + row->raw_out_pktlen = RKEY(cols[6].key)->u.value.ui32; + row->raw_out_pktcount = RKEY(cols[7].key)->u.value.ui32; + row->flow_start_sec = RKEY(cols[9].key)->u.value.ui32; + row->flow_duration = RKEY(cols[10].key)->u.value.ui32; - if (_sqlite3_open_db(db_ce.u.string)) { - ulogd_log(ULOGD_ERROR, "can't open the database file\n"); - return 1; + if (row_add(priv, row) < 0) + return ULOGD_IRET_OK; + + if (priv->num_rows >= priv->buffer_size && priv->commit_time == 0) + db_commit_rows(pi); + + 
return ULOGD_IRET_OK; +} + + +static void +sqlite_timer_cb(struct ulogd_timer *t) +{ + struct ulogd_pluginstance *pi = t->data; + struct sqlite3_priv *priv = (void *)pi->private; + int rows; + + pr_debug("%s: timer=%p\n", __func__, t); + + if (priv->commit_time != 0 && priv->commit_time > t_now) + return; + + if (priv->num_rows == 0) + return; + + rows = db_commit_rows(pi); + + ulogd_log(ULOGD_DEBUG, "%s: rows=%d commited=%d\n", pi->id, + priv->num_rows, rows); +} + + +static int +sqlite3_configure(struct ulogd_pluginstance *pi, + struct ulogd_pluginstance_stack *stack) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + memset(priv, 0, sizeof(struct sqlite3_priv)); + + config_parse_file(pi->id, pi->config_kset); + + if (ulogd_wildcard_inputkeys(pi) < 0) + return -1; + + if (db_ce(pi) == NULL) { + ulogd_error("%s: configure: no database specified\n", pi->id); + return -1; } - /* set the timeout so that we don't automatically fail - * if the table is busy. */ - sqlite3_busy_timeout(dbh, _SQLITE3_BUSY_TIMEOUT); + if (table_ce(pi) == NULL) { + ulogd_error("%s: configure: no table specified\n", pi->id); + return -1; + } - /* read the fieldnames to know which values to insert */ - if (_sqlite3_get_columns(table_ce.u.string)) { - ulogd_log(ULOGD_ERROR, "unable to get sqlite columns\n"); - return 1; + if (timer_ce(pi) <= 0) { + ulogd_error("%s: configure: invalid timer value\n", pi->id); + return -1; } - /* initialize our buffer size and counter */ - buffer_size = buffer_ce.u.value; - buffer_ctr = 0; + if (max_backlog_ce(pi)) { + if (max_backlog_ce(pi) <= buffer_ce(pi)) { + ulogd_error("%s: configure: invalid max-backlog value\n", + pi->id); + return -1; + } + } - DEBUGP("Have a buffer size of : %d\n", buffer_size); + priv->max_backlog = max_backlog_ce(pi); + priv->disable = disable_ce(pi); - if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"can't create a new transaction\n"); + pr_debug("%s: db='%s' table='%s' timer=%d 
max-backlog=%d\n", pi->id, + db_ce(pi), table_ce(pi), timer_ce(pi), max_backlog_ce(pi)); - /* create and prepare the actual insert statement */ - _sqlite3_createstmt(); + return 0; +} + + +static int +sqlite3_start(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + pr_debug("%s: pi=%p\n", __func__, pi); + + if (priv->disable) { + ulogd_log(ULOGD_NOTICE, "%s: disabled\n", pi->id); + return 0; + } + + priv->num_rows = 0; + INIT_LLIST_HEAD(&priv->rows); + + if (db_start(pi) < 0) + return -1; + + /* init timer */ + priv->timer.cb = sqlite_timer_cb; + priv->timer.ival = timer_ce(pi); + priv->timer.flags = TIMER_F_PERIODIC; + priv->timer.data = pi; + + if (ulogd_register_timer(&priv->timer) < 0) + return -1; + + ulogd_log(ULOGD_INFO, "%s: started\n", pi->id); return 0; } -static ulog_output_t _sqlite3_plugin = { - .name = "sqlite3", - .output = &_sqlite3_output, - .init = &_sqlite3_init, - .fini = &_sqlite3_fini, -}; -void _init(void) +/* give us an opportunity to close the database down properly */ +static int +sqlite3_stop(struct ulogd_pluginstance *pi) { - register_output(&_sqlite3_plugin); + struct sqlite3_priv *priv = (void *)pi->private; + + pr_debug("%s: pi=%p\n", __func__, pi); + + if (priv->disable) + return 0; /* wasn't started */ + + if (priv->dbh == NULL) + return 0; /* already stopped */ + + db_reset(pi); + + return 0; } -#endif + +static void +sqlite3_signal(struct ulogd_pluginstance *pi, int sig) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + switch (sig) { + case SIGUSR1: + if (priv->dbh != NULL) { + db_reset(pi); + + if (db_start(pi) < 0) { + ulogd_log(ULOGD_FATAL, "%s: database reset failed\n", pi->id); + exit(EXIT_FAILURE); + } + } + break; + + default: + break; + } +} + + +static struct ulogd_plugin sqlite3_plugin = { + .name = "SQLITE3", + .flags = ULOGD_PF_RECONF, + .input = { + .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW, + }, + .output = { + .type = ULOGD_DTYPE_SINK, + }, + .config_kset = 
&sqlite3_kset, + .priv_size = sizeof(struct sqlite3_priv), + .configure = sqlite3_configure, + .start = sqlite3_start, + .stop = sqlite3_stop, + .signal = sqlite3_signal, + .interp = sqlite3_interp, + .version = ULOGD_VERSION, +}; + +static void init(void) __attribute__((constructor)); + +static void +init(void) +{ + ulogd_register_plugin(&sqlite3_plugin); + + ulogd_log(ULOGD_INFO, "using Sqlite version %s\n", sqlite3_libversion()); +} -- - To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html