As described in http://www.sqlite.org/cvstrac/wiki?p=CorruptionFollowingBusyError, some versions of SQLite contain a bug that shows up when transactions other than EXCLUSIVE are used. If an SQL command that starts a transaction, or any command executed inside a transaction, ever returns SQLITE_BUSY, a ROLLBACK has to be done, as otherwise a corrupt database might be the result. Signed-off-by: Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx> Index: ulogd-netfilter/output/sqlite3/ulogd_output_SQLITE3.c =================================================================== --- ulogd-netfilter.orig/output/sqlite3/ulogd_output_SQLITE3.c +++ ulogd-netfilter/output/sqlite3/ulogd_output_SQLITE3.c @@ -1,4 +1,3 @@ -#if 0 /* * ulogd output plugin for logging to a SQLITE database * @@ -26,389 +25,736 @@ * * 2005-02-09 Harald Welte <laforge@xxxxxxxxxxxx>: * - port to ulogd-1.20 + * + * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx> Astaro AG, 2007 + * - port to ulogd-2.00 */ -#include <stdlib.h> -#include <string.h> -#include <arpa/inet.h> #include <ulogd/ulogd.h> #include <ulogd/conffile.h> +#include <ulogd/common.h> +#include <arpa/inet.h> #include <sqlite3.h> +#include <sys/queue.h> + +#define PFX "SQLITE3: " + +/* config defaults */ +#define CFG_BUFFER_DEFAULT 100 +#define CFG_TIMER_DEFAULT 1 SEC +#define CFG_MAX_BACKLOG_DEFAULT 0 /* unlimited */ + -#ifdef DEBUG_SQLITE3 +#define SQLITE3_BUSY_TIMEOUT 300 + +/* number of colums we have (really should be configurable) */ +#define DB_NUM_COLS 10 + +#if 0 #define DEBUGP(x, args...) fprintf(stderr, x, ## args) #else #define DEBUGP(x, args...) 
#endif -struct _field { +/* map DB column to ulogd key */ +struct col { char name[ULOGD_MAX_KEYLEN]; - unsigned int id; - struct _field *next; + struct ulogd_key *key; }; -/* the database handle we are using */ -static sqlite3 *dbh; +struct row { + TAILQ_ENTRY(row) link; + uint32_t ip_saddr; + uint32_t ip_daddr; + unsigned char ip_proto; + unsigned l4_dport; + unsigned raw_in_pktlen; + unsigned raw_in_pktcount; + unsigned raw_out_pktlen; + unsigned raw_out_pktcount; + unsigned flow_start_sec; + unsigned flow_duration; +}; -/* a linked list of the fields the table has */ -static struct _field *fields; +TAILQ_HEAD(row_lh, row); -/* buffer for our insert statement */ -static char *stmt; +#define TAILQ_FOR_EACH(pos, head, link) \ + for (pos = (head).tqh_first; pos != NULL; pos = pos->link.tqe_next) -/* pointer to the final prepared statement */ -static sqlite3_stmt *p_stmt; +#define RKEY(key) ((key)->u.source) -/* number of statements to buffer before we commit */ -static int buffer_size; -/* number of statements currently in the buffer */ -static int buffer_ctr; +struct sqlite3_priv { + sqlite3 *dbh; /* database handle we are using */ + char *stmt; + sqlite3_stmt *p_stmt; + int buffer_size; -/* our configuration directives */ -static config_entry_t db_ce = { - .key = "db", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, -}; + struct ulogd_timer timer; + + struct col cols[DB_NUM_COLS]; -static config_entry_t table_ce = { - .next = &db_ce, - .key = "table", - .type = CONFIG_TYPE_STRING, - .options = CONFIG_OPT_MANDATORY, + /* our backlog buffer */ + struct row_lh rows; + int num_rows; + int max_rows; /* number of rows actually seen */ + int max_rows_allowed; + + unsigned overlimit_msg : 1; }; -static config_entry_t buffer_ce = { - .next = &table_ce, - .key = "buffer", - .type = CONFIG_TYPE_INT, - .options = CONFIG_OPT_MANDATORY, + +static int do_reinit; +static struct config_keyset sqlite3_kset = { + .num_ces = 5, + .ces = { + { + .key = "db", + .type = 
CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "table", + .type = CONFIG_TYPE_STRING, + .options = CONFIG_OPT_MANDATORY, + }, + { + .key = "buffer", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = CFG_BUFFER_DEFAULT, + }, + { + .key = "timer", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = CFG_TIMER_DEFAULT, + }, + { + .key = "max-backlog", + .type = CONFIG_TYPE_INT, + .options = CONFIG_OPT_NONE, + .u.value = CFG_MAX_BACKLOG_DEFAULT, + }, + }, }; -/* our main output function, called by ulogd */ -static int _sqlite3_output(ulog_iret_t *result) +#define db_ce(pi) (pi)->config_kset->ces[0].u.string +#define table_ce(pi) (pi)->config_kset->ces[1].u.string +#define buffer_ce(pi) (pi)->config_kset->ces[2].u.value +#define timer_ce(pi) (pi)->config_kset->ces[3].u.value +#define max_backlog_ce(pi) (pi)->config_kset->ces[4].u.value + + +#define SQL_CREATE_STR \ + "create table daily(ip_saddr integer, ip_daddr integer, " \ + "ip_protocol integer, l4_dport integer, raw_in_pktlen integer, " \ + "raw_in_pktcount integer, raw_out_pktlen integer, " \ + "raw_out_pktcount integer, flow_start_sec integer, " \ + "flow_duration integer)" + + +static struct row * +row_new(void) { - struct _field *f; - ulog_iret_t *res; - int col_counter; -#ifdef IP_AS_STRING - char *ipaddr; - struct in_addr addr; -#endif + struct row *row; + + if ((row = calloc(1, sizeof(struct row))) == NULL) + ulogd_error("%s: out of memory\n", __func__); + + return row; +} - col_counter = 1; - for (f = fields; f; f = f->next) { - res = keyh_getres(f->id); - - if (!res) { - ulogd_log(ULOGD_NOTICE, - "no result for %s ?!?\n", f->name); - } - - if (!res || !IS_VALID((*res))) { - /* no result, pass a null */ - sqlite3_bind_null(p_stmt, col_counter); - col_counter++; - continue; - } - - switch (res->type) { - case ULOGD_RET_INT8: - sqlite3_bind_int(p_stmt,col_counter,res->value.i8); - break; - case ULOGD_RET_INT16: - 
sqlite3_bind_int(p_stmt,col_counter,res->value.i16); - break; - case ULOGD_RET_INT32: - sqlite3_bind_int(p_stmt,col_counter,res->value.i32); - break; - case ULOGD_RET_INT64: - sqlite3_bind_int64(p_stmt,col_counter,res->value.i64); - break; - case ULOGD_RET_UINT8: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui8); - break; - case ULOGD_RET_UINT16: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui16); - break; - case ULOGD_RET_IPADDR: -#ifdef IP_AS_STRING - memset(&addr, 0, sizeof(addr)); - addr.s_addr = ntohl(res->value.ui32); - ipaddr = inet_ntoa(addr); - sqlite3_bind_text(p_stmt,col_counter,ipaddr,strlen(ipaddr),SQLITE_STATIC); - break; -#endif /* IP_AS_STRING */ - /* EVIL: fallthrough when logging IP as u_int32_t */ - case ULOGD_RET_UINT32: - sqlite3_bind_int(p_stmt,col_counter,res->value.ui32); - break; - case ULOGD_RET_UINT64: - sqlite3_bind_int64(p_stmt,col_counter,res->value.ui64); - break; - case ULOGD_RET_BOOL: - sqlite3_bind_int(p_stmt,col_counter,res->value.b); - break; - case ULOGD_RET_STRING: - sqlite3_bind_text(p_stmt,col_counter,res->value.ptr,strlen(res->value.ptr),SQLITE_STATIC); - break; - default: - ulogd_log(ULOGD_NOTICE, - "unknown type %d for %s\n", - res->type, res->key); - break; - } - - col_counter++; - } - - /* now we have created our statement, insert it */ - - if (sqlite3_step(p_stmt) == SQLITE_DONE) { - sqlite3_reset(p_stmt); - buffer_ctr++; - } else { - ulogd_log(ULOGD_ERROR, "sql error during insert: %s\n", - sqlite3_errmsg(dbh)); - return 1; - } - /* commit all of the inserts to the database, ie flush buffer */ - if (buffer_ctr >= buffer_size) { - if (sqlite3_exec(dbh,"commit",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to commit records to db."); +static void +row_del(struct sqlite3_priv *priv, struct row *row) +{ + TAILQ_REMOVE(&priv->rows, row, link); + + free(row); + + priv->num_rows--; +} - if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to begin a new 
transaction."); - buffer_ctr = 0; - DEBUGP("committing.\n"); +static void +row_add(struct sqlite3_priv *priv, struct row *row) +{ + if (priv->max_rows_allowed && priv->num_rows > priv->max_rows_allowed) { + if (!priv->overlimit_msg) { + ulogd_error(PFX "over max-backlog limit, dropping row\n"); + + priv->overlimit_msg = 1; + } + + return; } - return 0; + TAILQ_INSERT_TAIL(&priv->rows, row, link); + + priv->num_rows++; } + #define _SQLITE3_INSERTTEMPL "insert into X (Y) values (Z)" -/* create the static part of our insert statement */ -static int _sqlite3_createstmt(void) +/* create static part of our insert statement */ +static int +db_createstmt(struct ulogd_pluginstance *pi) { - struct _field *f; - unsigned int size; + struct sqlite3_priv *priv = (void *)pi->private; char buf[ULOGD_MAX_KEYLEN]; char *underscore; char *stmt_pos; - int col_count; int i; - if (stmt) { - ulogd_log(ULOGD_NOTICE, "createstmt called, but stmt" - " already existing\n"); - return 1; - } - - /* caclulate the size for the insert statement */ - size = strlen(_SQLITE3_INSERTTEMPL) + strlen(table_ce.u.string); - - DEBUGP("initial size: %u\n", size); + if (priv->stmt != NULL) + free(priv->stmt); - col_count = 0; - for (f = fields; f; f = f->next) { - /* we need space for the key and a comma, and a ? 
*/ - size += strlen(f->name) + 3; - DEBUGP("size is now %u since adding %s\n",size,f->name); - col_count++; + if ((priv->stmt = calloc(1, 1024)) == NULL) { + ulogd_error(PFX "out of memory\n"); + return -1; } - DEBUGP("there were %d columns\n",col_count); - DEBUGP("after calc name length: %u\n",size); + sprintf(priv->stmt, "insert into %s (", table_ce(pi)); + stmt_pos = priv->stmt + strlen(priv->stmt); - ulogd_log(ULOGD_DEBUG, "allocating %u bytes for statement\n", size); + for (i = 0; i < DB_NUM_COLS; i++) { + struct col *col = &priv->cols[i]; - stmt = (char *) malloc(size); + /* convert name */ + strncpy(buf, col->name, ULOGD_MAX_KEYLEN); - if (!stmt) { - ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; - } - - sprintf(stmt, "insert into %s (", table_ce.u.string); - stmt_pos = stmt + strlen(stmt); - - for (f = fields; f; f = f->next) { - strncpy(buf, f->name, ULOGD_MAX_KEYLEN); while ((underscore = strchr(buf, '.'))) *underscore = '_'; + sprintf(stmt_pos, "%s,", buf); - stmt_pos = stmt + strlen(stmt); + stmt_pos = priv->stmt + strlen(priv->stmt); } *(stmt_pos - 1) = ')'; sprintf(stmt_pos, " values ("); - stmt_pos = stmt + strlen(stmt); + stmt_pos = priv->stmt + strlen(priv->stmt); - for (i = 0; i < col_count - 1; i++) { + for (i = 0; i < DB_NUM_COLS - 1; i++) { sprintf(stmt_pos,"?,"); stmt_pos += 2; } sprintf(stmt_pos, "?)"); - ulogd_log(ULOGD_DEBUG, "stmt='%s'\n", stmt); + ulogd_log(ULOGD_DEBUG, "%s: stmt='%s'\n", pi->id, priv->stmt); DEBUGP("about to prepare statement.\n"); - sqlite3_prepare(dbh,stmt,-1,&p_stmt,0); + sqlite3_prepare(priv->dbh, priv->stmt, -1, &priv->p_stmt, 0); + if (priv->p_stmt == NULL) { + ulogd_error(PFX "prepare: %s\n", sqlite3_errmsg(priv->dbh)); + return 1; + } DEBUGP("statement prepared.\n"); - if (!p_stmt) { - ulogd_log(ULOGD_ERROR,"unable to prepare statement"); - return 1; + return 0; +} + + +static struct ulogd_key * +ulogd_find_key(struct ulogd_pluginstance *pi, const char *name) +{ + int i; + + for (i = 0; i < pi->input.num_keys; 
i++) { + if (strcmp(pi->input.keys[i].name, name) == 0) + return &pi->input.keys[i]; } - return 0; + return NULL; +} + +#define SELECT_ALL_STR "select * from " +#define SELECT_ALL_LEN sizeof(SELECT_ALL_STR) + +static int +db_count_cols(struct ulogd_pluginstance *pi, sqlite3_stmt **stmt) +{ + struct sqlite3_priv *priv = (void *)pi->private; + char query[SELECT_ALL_LEN + CONFIG_VAL_STRING_LEN] = SELECT_ALL_STR; + + strncat(query, table_ce(pi), LINE_LEN); + + if (sqlite3_prepare(priv->dbh, query, -1, stmt, 0) != SQLITE_OK) { + return -1; + } + + return sqlite3_column_count(*stmt); } -/* length of "select * from \0" */ -#define SQLITE_SELECT_LEN 15 +static int +db_create_tbl(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + char *errmsg; + int ret; + + sqlite3_exec(priv->dbh, "drop table daily", NULL, NULL, NULL); + + ret = sqlite3_exec(priv->dbh, SQL_CREATE_STR, NULL, NULL, &errmsg); + if (ret != SQLITE_OK) { + ulogd_error(PFX "create table: %s\n", errmsg); + sqlite3_free(errmsg); + + return -1; + } + + return 0; +} + -/* find out which columns the table has */ -static int _sqlite3_get_columns(const char *table) +/* initialize DB, possibly creating it */ +static int +db_init(struct ulogd_pluginstance *pi) { + struct sqlite3_priv *priv = (void *)pi->private; char buf[ULOGD_MAX_KEYLEN]; - char query[SQLITE_SELECT_LEN + CONFIG_VAL_STRING_LEN] = "select * from \0"; char *underscore; - struct _field *f; sqlite3_stmt *schema_stmt; - int column; - int result; - int id; + int num_cols, i; - if (!dbh) - return 1; + if (priv->dbh == NULL) + return -1; - strncat(query,table,LINE_LEN); - - result = sqlite3_prepare(dbh,query,-1,&schema_stmt,0); - - if (result != SQLITE_OK) - return 1; + num_cols = db_count_cols(pi, &schema_stmt); + if (num_cols != DB_NUM_COLS) { + ulogd_log(ULOGD_INFO, PFX "(re)creating database\n"); + + if (db_create_tbl(pi) < 0) + return -1; + + num_cols = db_count_cols(pi, &schema_stmt); + } + + assert(num_cols == 
DB_NUM_COLS); + + for (i = 0; i < DB_NUM_COLS; i++) { + struct col *col = &priv->cols[i]; + + strncpy(buf, sqlite3_column_name(schema_stmt, i), ULOGD_MAX_KEYLEN); - for (column = 0; column < sqlite3_column_count(schema_stmt); column++) { /* replace all underscores with dots */ - strncpy(buf, sqlite3_column_name(schema_stmt,column), ULOGD_MAX_KEYLEN); - while ((underscore = strchr(buf, '_'))) + while ((underscore = strchr(buf, '_')) != NULL) *underscore = '.'; - DEBUGP("field '%s' found: ", buf); + DEBUGP("column '%s' found\n", buf); - if (!(id = keyh_getid(buf))) { - DEBUGP(" no keyid!\n"); - continue; + strncpy(col->name, buf, ULOGD_MAX_KEYLEN); + + if ((col->key = ulogd_find_key(pi, buf)) == NULL) { + printf(PFX "%s: key not found\n", buf); + return -1; } + } - DEBUGP("keyid %u\n", id); + ulogd_log(ULOGD_INFO, PFX "database successfully opened\n"); - /* prepend it to the linked list */ - f = (struct _field *) malloc(sizeof *f); - if (!f) { - ulogd_log(ULOGD_ERROR, "OOM!\n"); - return 1; - } - strncpy(f->name, buf, ULOGD_MAX_KEYLEN); - f->id = id; - f->next = fields; - fields = f; + if (sqlite3_finalize(schema_stmt) != SQLITE_OK) { + ulogd_error(PFX "sqlite_finalize: %s\n", + sqlite3_errmsg(priv->dbh)); + return -1; } - sqlite3_finalize(schema_stmt); return 0; } -/** - * make connection and select database - * returns 0 if database failed to open. 
- */ -static int _sqlite3_open_db(char *db_file) + +static void +db_reset(struct ulogd_pluginstance *pi) { - DEBUGP("opening database.\n"); - return sqlite3_open(db_file,&dbh); + struct sqlite3_priv *priv = (void *)pi->private; + + sqlite3_finalize(priv->p_stmt); + + sqlite3_close(priv->dbh); + priv->dbh = NULL; + + free(priv->stmt); + priv->stmt = NULL; } -/* give us an opportunity to close the database down properly */ -static void _sqlite3_fini(void) + +static int +db_start(struct ulogd_pluginstance *pi) { - DEBUGP("cleaning up db connection\n"); + struct sqlite3_priv *priv = (void *)pi->private; - /* free up our prepared statements so we can close the db */ - if (p_stmt) { - sqlite3_finalize(p_stmt); - DEBUGP("prepared statement finalized\n"); + ulogd_log(ULOGD_DEBUG, PFX "opening database connection\n"); + + if (sqlite3_open(db_ce(pi), &priv->dbh) != SQLITE_OK) { + ulogd_error(PFX "%s\n", sqlite3_errmsg(priv->dbh)); + return -1; + } + + /* set the timeout so that we don't automatically fail + if the table is busy */ + sqlite3_busy_timeout(priv->dbh, SQLITE3_BUSY_TIMEOUT); + + /* read the fieldnames to know which values to insert */ + if (db_init(pi) < 0) + return -1; + + /* initialize our buffer size and counter */ + priv->buffer_size = buffer_ce(pi); + + priv->max_rows_allowed = max_backlog_ce(pi); + + /* create and prepare the actual insert statement */ + db_createstmt(pi); + + return 0; +} + + +static int +db_add_row(struct ulogd_pluginstance *pi, const struct row *row) +{ + struct sqlite3_priv *priv = (void *)pi->private; + int db_col = 1, ret = 0, db_ret; + + db_ret = sqlite3_bind_int64(priv->p_stmt, db_col++, row->ip_saddr); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int64(priv->p_stmt, db_col++, row->ip_daddr); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->ip_proto); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int(priv->p_stmt, db_col++, 
row->l4_dport); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->raw_in_pktlen); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int64(priv->p_stmt, db_col++, row->raw_in_pktcount); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->raw_out_pktlen); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int64(priv->p_stmt, db_col++, row->raw_out_pktcount); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->flow_start_sec); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_bind_int(priv->p_stmt, db_col++, row->flow_duration); + if (db_ret != SQLITE_OK) + goto err_bind; + + db_ret = sqlite3_step(priv->p_stmt); + + if (db_ret == SQLITE_DONE) { + /* the SQLITE book doesn't say that expclicitely _but_ between + two sqlite_bind_*() calls to the same variable you need to + call sqlite3_reset(). */ + sqlite3_reset(priv->p_stmt); + + return 0; + } + + /* Ok, this is a bit confusing: some errors are reported as return + values, most others are reported through sqlite3_errcode() instead. + I think the only authorative source of information is the sqlite + source code. */ + switch (sqlite3_errcode(priv->dbh)) { + case SQLITE_LOCKED: + case SQLITE_BUSY: + break; + + case SQLITE_SCHEMA: + if (priv->stmt) { + sqlite3_finalize(priv->p_stmt); + + db_createstmt(pi); + } + return -1; + + case SQLITE_ERROR: /* e.g. constraint violation */ + case SQLITE_MISUSE: + ulogd_error(PFX "step: %s\n", sqlite3_errmsg(priv->dbh)); + ret = -1; + break; + + default: + break; } - if (dbh) { - int result; - /* flush the remaining insert statements to the database. 
*/ - result = sqlite3_exec(dbh,"commit",NULL,NULL,NULL); + sqlite3_reset(priv->p_stmt); + + return ret; - if (result != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"unable to commit remaining records to db."); + err_bind: + ulogd_error(PFX "bind: %s\n", sqlite3_errmsg(priv->dbh)); - sqlite3_close(dbh); - DEBUGP("database file closed\n"); + sqlite3_reset(priv->p_stmt); + + return -1; +} + + +static int +delete_all_rows(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + while (priv->rows.tqh_first != NULL) { + struct row *row = priv->rows.tqh_first; + + row_del(priv, row); } + + return 0; } -#define _SQLITE3_BUSY_TIMEOUT 300 -static int _sqlite3_init(void) +static int +db_commit_rows(struct ulogd_pluginstance *pi) { - /* have the opts parsed */ - config_parse_file("SQLITE3", &buffer_ce); + struct sqlite3_priv *priv = (void *)pi->private; + struct row *row; + int ret, rows = 0; + + ret = sqlite3_exec(priv->dbh, "begin immediate transaction", NULL, + NULL, NULL); + if (ret != SQLITE_OK) { + if (ret == SQLITE_BUSY) + goto err_rollback; - if (_sqlite3_open_db(db_ce.u.string)) { - ulogd_log(ULOGD_ERROR, "can't open the database file\n"); - return 1; + if (sqlite3_errcode(priv->dbh) == SQLITE_LOCKED) + return 0; /* perform commit later */ + + ulogd_error(PFX "begin transaction: %s\n", sqlite3_errmsg(priv->dbh)); + + return -1; } - /* set the timeout so that we don't automatically fail - * if the table is busy. 
*/ - sqlite3_busy_timeout(dbh, _SQLITE3_BUSY_TIMEOUT); + TAILQ_FOR_EACH(row, priv->rows, link) { + if (db_add_row(pi, row) < 0) + goto err_rollback; - /* read the fieldnames to know which values to insert */ - if (_sqlite3_get_columns(table_ce.u.string)) { - ulogd_log(ULOGD_ERROR, "unable to get sqlite columns\n"); - return 1; + rows++; } - /* initialize our buffer size and counter */ - buffer_size = buffer_ce.u.value; - buffer_ctr = 0; + ret = sqlite3_exec(priv->dbh, "commit", NULL, NULL, NULL); + if (ret == SQLITE_OK) { + sqlite3_reset(priv->p_stmt); - DEBUGP("Have a buffer size of : %d\n", buffer_size); + if (priv->num_rows > priv->buffer_size) + ulogd_log(ULOGD_INFO, PFX "commited backlog buffer (%d rows)\n", + priv->num_rows); - if (sqlite3_exec(dbh,"begin deferred",NULL,NULL,NULL) != SQLITE_OK) - ulogd_log(ULOGD_ERROR,"can't create a new transaction\n"); + delete_all_rows(pi); - /* create and prepare the actual insert statement */ - _sqlite3_createstmt(); + if (priv->overlimit_msg) + priv->overlimit_msg = 0; + + return 0; + } + + err_rollback: + if (sqlite3_errcode(priv->dbh) == SQLITE_LOCKED) + return 0; + + sqlite3_exec(priv->dbh, "rollback", NULL, NULL, NULL); + + return -1; +} + + +/* our main output function, called by ulogd */ +static int +sqlite3_interp(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + struct col *cols = priv->cols; + struct row *row; + + if (do_reinit) { + db_reset(pi); + + if (db_start(pi) < 0) + return ULOGD_IRET_STOP; + + do_reinit = 0; + } + + if ((row = row_new()) == NULL) + return ULOGD_IRET_ERR; + + row->ip_saddr = RKEY(cols[0].key)->u.value.ui32; + row->ip_daddr = RKEY(cols[1].key)->u.value.ui32; + row->ip_proto = RKEY(cols[2].key)->u.value.ui8; + row->l4_dport = RKEY(cols[3].key)->u.value.ui16; + row->raw_in_pktlen = RKEY(cols[4].key)->u.value.ui32; + row->raw_in_pktcount = RKEY(cols[5].key)->u.value.ui32; + row->raw_out_pktlen = RKEY(cols[6].key)->u.value.ui32; + row->raw_out_pktcount = 
RKEY(cols[7].key)->u.value.ui32; + row->flow_start_sec = RKEY(cols[9].key)->u.value.ui32; + row->flow_duration = RKEY(cols[10].key)->u.value.ui32; + + row_add(priv, row); + + if (priv->num_rows >= priv->buffer_size) + db_commit_rows(pi); + + return ULOGD_IRET_OK; +} + + +static void +timer_cb(struct ulogd_timer *t) +{ + struct ulogd_pluginstance *pi = t->data; + struct sqlite3_priv *priv = (void *)pi->private; + + priv->max_rows = max(priv->max_rows, priv->num_rows); + + if (priv->num_rows > 0) + db_commit_rows(pi); +} + + +static int +sqlite3_configure(struct ulogd_pluginstance *pi, + struct ulogd_pluginstance_stack *stack) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + config_parse_file(pi->id, pi->config_kset); + + if (ulogd_wildcard_inputkeys(pi) < 0) + return -1; + + if (db_ce(pi) == NULL) { + ulogd_error(PFX "configure: no database specified\n"); + return -1; + } + + if (table_ce(pi) == NULL) { + ulogd_error(PFX "configure: no table specified\n"); + return -1; + } + + if (timer_ce(pi) <= 0) { + ulogd_error(PFX "configure: invalid timer value\n"); + return -1; + } + + if (max_backlog_ce(pi)) { + if (max_backlog_ce(pi) <= buffer_ce(pi)) { + ulogd_error(PFX "configure: invalid max-backlog value\n"); + return -1; + } + } + + priv->max_rows_allowed = max_backlog_ce(pi); + + DEBUGP("%s: db='%s' table='%s' timer=%d max-backlog=%d\n", pi->id, + db_ce(pi), table_ce(pi), timer_ce(pi), max_backlog_ce(pi)); + + /* init timer */ + priv->timer.cb = timer_cb; + priv->timer.ival = timer_ce(pi); + priv->timer.flags = TIMER_F_PERIODIC; + priv->timer.data = pi; + + ulogd_register_timer(&priv->timer); return 0; } -static ulog_output_t _sqlite3_plugin = { - .name = "sqlite3", - .output = &_sqlite3_output, - .init = &_sqlite3_init, - .fini = &_sqlite3_fini, -}; -void _init(void) +static int +sqlite3_start(struct ulogd_pluginstance *pi) { - register_output(&_sqlite3_plugin); + struct sqlite3_priv *priv = (void *)pi->private; + + priv->num_rows = priv->max_rows = 0; + 
TAILQ_INIT(&priv->rows); + + return db_start(pi); } -#endif + +/* give us an opportunity to close the database down properly */ +static int +sqlite3_stop(struct ulogd_pluginstance *pi) +{ + struct sqlite3_priv *priv = (void *)pi->private; + + /* free up our prepared statements so we can close the db */ + if (priv->p_stmt) { + sqlite3_finalize(priv->p_stmt); + DEBUGP("prepared statement finalized\n"); + } + + if (priv->dbh == NULL) + return -1; + + sqlite3_close(priv->dbh); + + return 0; +} + + +static void +sqlite3_signal(struct ulogd_pluginstance *pi, int sig) +{ + switch (sig) { + case SIGUSR1: + do_reinit++; + break; + + default: + break; + } +} + + +static struct ulogd_plugin sqlite3_plugin = { + .name = "SQLITE3", + .input = { + .type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW, + }, + .output = { + .type = ULOGD_DTYPE_SINK, + }, + .config_kset = &sqlite3_kset, + .priv_size = sizeof(struct sqlite3_priv), + .configure = sqlite3_configure, + .start = sqlite3_start, + .stop = sqlite3_stop, + .signal = sqlite3_signal, + .interp = sqlite3_interp, + .version = ULOGD_VERSION, +}; + +static void init(void) __attribute__((constructor)); + +static void +init(void) +{ + ulogd_register_plugin(&sqlite3_plugin); + + ulogd_log(ULOGD_INFO, "using Sqlite version %s\n", sqlite3_libversion()); +} -- - To unsubscribe from this list: send the line "unsubscribe netfilter-devel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html