2011/9/28, Merlin Moncure <mmoncure@xxxxxxxxx>:
>
> I disagree. unnest() and array_agg() (or, even better, array()
> constructor syntax) are an absolute joy to work with and thinking in a
> more functional way, which is usually the key to making things run
> quickly. Also both functions are trivial to emulate in userland for
> compatibility. Arrays of composites IIRC only go back to 8.3 so that
> would be a true stopper for any solution in that vein.

Ok, tastes are tastes: I hate having to build two or three more levels of
subqueries.

Regarding arrays of composites, that problem disappears entirely if we use no
composite at all! Instead of one column holding an array of a composite of
three intrinsic types, use three columns, each holding an array of an
intrinsic type. See your proposal:

>> create type audit_field_t as (field text, old_value text, new_value text);

Instead, in the audit table you may use:

  ..., field smallint[], before text[], after text[], ...

Note the smallint in "field": I really want to keep the reference to the
"field" table, for the two reasons I mentioned earlier (to reduce space: 2
bytes of type "smallint" against the variable size of type "text"; and to
keep track of the names being used, too). You can also set up something like
this if you like dimensions:

  ..., field smallint[], values text[][], ...

implying that the first dimension holds the "before" values and the second
one the "after" values. Either of these avoids composites entirely and keeps
the design both more general and simpler. Going further, I would like to keep
the logging "on demand":

  ..., field smallint[], is_pk boolean[], { before text[], after text[] | values text[][] }, ...

You know what the braces and the pipe are for... So, in the end, we have the
entire "audet" table folded into the "audit" table as a series of arrays (see
the sketch below). We get a really compact table with just enough data to
fully log the changes that fired the trigger. No less, no more.

At this point we know that querying this table will be much slower and that
rotation will have to be done more frequently. If we dump/restore the table
somewhere else we will still be able to split it back into the original two
tables, create indexes, cluster them, and query as desired. But this can get
so complicated that maybe I should implement a function that does all of it.
In any event, we lose some responsiveness because of this, but a couple of
extra minutes may not be a problem in most cases. I'm just trying to
summarize.

As a rule of thumb, you may need to run a cron job every night or so that
checks whether 'select count(*) from audit' is bigger than X and, if so,
rotates the table (or simply rotate every X days/weeks/etc.). The smaller X
is, the better the responsiveness in some cases: if we know the time interval
of interest, we only have to dump/restore the logs for that interval. In
other cases this does not help much: if you need to track a tuple back to the
very beginning of time, you will have a lot of dumping and restoring to do
(and so forth... remember to split the table, index it...). Still, rotation
seems to be a good practice, and you can include in the cron job the part
that dumps/restores into another server and then drops the old table. That
would save a lot of space in your production environment.
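Just to make the layout concrete, here is a rough sketch of what I mean. This
is illustrative only: the attached code further below still uses the separate
"audet" table, the column names simply follow the discussion above, and the
"values text[][]" variant is omitted for brevity.

  -- "audit" with the per-field detail folded in as parallel arrays
  create table audit (
    id bigint,
    type character(1),
    tstmp timestamp with time zone default now(),
    schema smallint,
    "table" smallint,
    "user" smallint,
    client_inet smallint,
    client_port integer default inet_client_port(),
    pid integer default pg_backend_pid(),
    field smallint[],   -- position i holds the field.id of the i-th logged column
    is_pk boolean[],
    before text[],
    after text[]
  );

  -- expanding back to one row per logged column (8.4 or later):
  select id, tstmp, field[i] as field_id, is_pk[i] as is_pk,
         before[i] as before, after[i] as after
  from (select a.*, generate_subscripts(a.field, 1) as i from audit a) as s;

On 8.4 or later, generate_subscripts() lets you expand the arrays back into
one row per logged column whenever you need to analyse the log or split the
table into the original two again.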
> As for the rest of it, I'd be looking to try and come up with an all
> sql implementation. Also you should give an honest comparison between
> what you've come up with vs. this:
> http://pgfoundry.org/projects/tablelog/.
>
> merlin

"All SQL implementation"? Didn't we agree that's not possible in pg <= 8.4?
Then what do you mean by that?

About the tablelog project: I didn't really try it, but I read its
documentation and it does not seem appropriate for my case at all. First of
all, its purpose seems to be to log everything in a table so that the table
can be restored later as of any point in the past; my purpose is to log in
order to run analysis. Also, it needs one extra table per logged table, with
the same structure as the logged table (without constraints) plus three, four
or five control columns (depending on usage; four or five recommended). I
have a lot of tables to log (hundreds!) with small changes to each of them,
so that means doubling the number of tables just to capture a few changes.
Speaking of compactness... It also logs everything, not only the changed
values. It is written in C, so I assume it runs much, much faster (especially
needed for highly transactional DBs), but it is not proven to be binary safe
(though I don't remember exactly what that means). Bugs: nothing known.

So, if you need to be able to restore your table as of any point in time, use
tablelog. If you need to run analysis on who did what, use my option.

Finally attaching the code!

Cheers.

-- 
Diego Augusto Molina
diegoaugustomolina@xxxxxxxxx

ES: Por favor, evite adjuntar documentos de Microsoft Office. Serán
desestimados.
EN: Please, avoid attaching Microsoft Office documents. They shall be
discarded.
LINK: http://www.gnu.org/philosophy/no-word-attachments.html
/* Created by Diego Augusto Molina in 2011 for Tucuman Government, Argentina. */

/*
Needs:
  Execute the following according to your needs:
    CREATE TRUSTED PROCEDURAL LANGUAGE 'plpgsql';
    CREATE TRUSTED PROCEDURAL LANGUAGE 'plperl';

Description:
  The "audit" trigger is the trigger which should be fired by any table we want to audit.
  If it receives arguments, they are interpreted as the primary key of the table. If any of
  the arguments is not a column of the table, or if no arguments are received at all, a
  probing process takes place which ends up determining the pk of the table. Thus, it is
  better to create the trigger calling this function with no arguments. See the TODO list.

Usage of the "audit" trigger:
  CREATE TRIGGER <tg_name> AFTER {INSERT | UPDATE | DELETE} [ OR ...] ON <table_name>
    FOR EACH ROW EXECUTE PROCEDURE audit.audit( { <column_list> | <nothing_at_all> } );

Known issues:
  1) You don't want to use this trigger on a table which has a column of some type that has
     no cast to string. That would cause a runtime error killing your transaction! See
     TODO #2. Rest easy: most 'common' types have a cast to string by default.

TODO:
  1) In phase 3 of the "audit" trigger ('P3'), instead of asking
     '( scalar keys %pk == 0 )' each time, put an 'else'.
  1') If the pk was not passed as an argument, execute an 'alter trigger' at the end of the
     probing so that next time there is no probing at all. This would be unfriendly towards
     changes in the table definition, which would then require updating the trigger
     (passing no arguments at all would imply probing again the next time).
  2) Manage the logging of columns of types which have no cast to string. Option 1 is to
     skip logging such columns altogether. Option 2 is more complex: find a way to save the
     binary contents of the columns instead of the formatted content. The table 'field'
     would get an extra column of type 'type' to help describe the audited field. That
     would solve the problem with strange fields (assuming that _any_ value can be
     converted to its binary/internal representation). This may carry some extra
     complexity, maybe needing extra tables holding information about types.
  3) Make this function receive only two parameters: two arrays of type name[], the first
     holding the set of columns which make up the primary key, the second the set of
     columns which are to be registered in addition to the pk ones. Note that pk columns
     will always be registered because they identify the modified tuple.
  4) Support for the TRUNCATE event.
*/
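-- Illustrative usage example (not part of the original script). Assuming a hypothetical
-- table public.customer, the trigger would typically be created with no arguments, letting
-- the function probe the primary key by itself:
--
--   CREATE TRIGGER tg_audit_customer
--     AFTER INSERT OR UPDATE OR DELETE ON public.customer
--     FOR EACH ROW EXECUTE PROCEDURE audit.audit();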
-- Roles and schema ---------------------------------------------------------------------------

CREATE ROLE auditor NOSUPERUSER NOINHERIT NOCREATEDB NOCREATEROLE NOLOGIN;
CREATE ROLE audit NOSUPERUSER NOINHERIT NOCREATEDB NOCREATEROLE LOGIN
  ENCRYPTED PASSWORD 'test.1234';
CREATE SCHEMA audit AUTHORIZATION audit;
ALTER ROLE auditor SET search_path = audit;
ALTER ROLE audit SET search_path = audit;
SET search_path TO audit;
SET SESSION AUTHORIZATION audit;

CREATE SEQUENCE seq_audit INCREMENT 1 START 0 CACHE 1 CYCLE
  MINVALUE -9223372036854775808 MAXVALUE 9223372036854775807;
CREATE SEQUENCE seq_elems INCREMENT 1 START 0 CACHE 1 CYCLE
  MINVALUE -32768 MAXVALUE 32767;

-- Lookup tables ------------------------------------------------------------------------------
-- "table" and "user" are reserved words, so they are quoted wherever they appear.

CREATE TABLE field (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL,
  CONSTRAINT field_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT field_uq_value UNIQUE (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE field TO auditor;

CREATE TABLE client_inet (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value inet NOT NULL DEFAULT inet_client_addr(),
  CONSTRAINT dir_inet_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT dir_inet_uq_value UNIQUE (value) WITH (FILLFACTOR=95)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE client_inet TO auditor;

CREATE TABLE schema (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL,
  CONSTRAINT schema_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT schema_uq_value UNIQUE (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE schema TO auditor;

CREATE TABLE "table" (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL,
  CONSTRAINT table_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT table_uq_value UNIQUE (value) WITH (FILLFACTOR=100)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE "table" TO auditor;

CREATE TABLE "user" (
  id smallint NOT NULL DEFAULT nextval('seq_elems'::regclass),
  value name NOT NULL DEFAULT "current_user"(),
  CONSTRAINT user_pk PRIMARY KEY (id) WITH (FILLFACTOR=100),
  CONSTRAINT user_uq_value UNIQUE (value) WITH (FILLFACTOR=95)
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE "user" TO auditor;

-- Main tables: "audit" holds one row per audited event, "audet" one row per audited field ----

CREATE TABLE audit (
  id bigint,
  type character(1),
  tstmp timestamp with time zone DEFAULT now(),
  schema smallint,
  "table" smallint,
  "user" smallint,
  client_inet smallint,
  client_port integer DEFAULT inet_client_port(),
  pid integer DEFAULT pg_backend_pid()
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE audit TO auditor;

CREATE TABLE audet (
  id bigint,
  field smallint,
  is_pk boolean,
  before text,
  after text
) WITH (OIDS=FALSE);
GRANT SELECT ON TABLE audet TO auditor;

-- Routing triggers: redirect inserts on the parent tables to the current child table ---------

CREATE OR REPLACE FUNCTION tgf_ins_audet() RETURNS trigger AS
$BODY$
begin
  execute E'insert into audet_' || tg_argv[0]
    || E' ( id, field, is_pk, before, after ) values ( '
    || coalesce(new.id::text,'NULL') || E', '
    || coalesce(new.field::text,'NULL') || E', '
    || coalesce(new.is_pk::text,'NULL') || E', '
    || coalesce(quote_literal(new.before),'NULL') || E', '
    || coalesce(quote_literal(new.after),'NULL') || E' )';
  return null;
end;
$BODY$ LANGUAGE plpgsql VOLATILE;
ALTER FUNCTION tgf_ins_audet() SET search_path = audit;

CREATE OR REPLACE FUNCTION tgf_ins_audit() RETURNS trigger AS
$BODY$
begin
  execute E'insert into audit_' || tg_argv[0]
    || E' ( id, type, tstmp, schema, "table", "user", client_inet, client_port, pid ) values ( '
    || coalesce(new.id::text,'NULL') || E', '
    || coalesce(quote_literal(new.type),'NULL') || E', '
    || coalesce(quote_literal(new.tstmp),'NULL') || E', '
    || coalesce(new.schema::text,'NULL') || E', '
    || coalesce(new."table"::text,'NULL') || E', '
    || coalesce(new."user"::text,'NULL') || E', '
    || coalesce(new.client_inet::text,'NULL') || E', '
    || coalesce(new.client_port::text,'NULL') || E', '
    || coalesce(new.pid::text,'NULL') || E' )';
  return null;
end;
$BODY$ LANGUAGE plpgsql VOLATILE;
ALTER FUNCTION tgf_ins_audit() SET search_path = audit;

-- Rotation: close the current child table of "audit" or "audet" and create a new one ---------

CREATE OR REPLACE FUNCTION rotate(character) RETURNS void AS
$BODY$
declare
  first_execution boolean := false;
  cur_start char(8) := null;
  cur_tstmp_min timestamp with time zone;
  cur_tstmp_max timestamp with time zone;
  cur_id_min bigint;
  cur_id_max bigint;
  new_start char(8);
begin
  /* Determine the creation tstmp of the tables ============================================= */
  select substring(max(c.relname::text) from $1 || E'_(........)') into cur_start
    from pg_namespace n
      inner join pg_class c on (n.oid = c.relnamespace)
    where n.nspname = 'audit'::name
      and c.relname::text like $1 || '_%';
  if cur_start is null then
    first_execution := true;
    cur_start := '';
  end if;
  new_start := cast(to_char(current_timestamp,'YYYYMMDD') as name);

  case $1
  when 'audit' then /* if I'm rotating the table audit ======================================= */
    /* current table */
    if not first_execution then
      execute 'select min(tstmp), max(tstmp) from audit_' || cur_start
        into cur_tstmp_min, cur_tstmp_max;
      execute $$
        alter index idx_audit_$$|| cur_start ||$$_id set (fillfactor = 100);
        alter index idx_audit_$$|| cur_start ||$$_tstmp set (fillfactor = 100);
        alter index idx_audit_$$|| cur_start ||$$_schema__table set (fillfactor = 100);
        alter index idx_audit_$$|| cur_start ||$$_user set (fillfactor = 100);
        cluster audit_$$|| cur_start ||$$;
        analyze audit_$$|| cur_start ||$$;
        alter table audit_$$|| cur_start ||$$ add constraint audit_$$|| cur_start ||$$_ck_exclusion
          check ( tstmp >= '$$|| cur_tstmp_min ||$$' and tstmp <= '$$|| cur_tstmp_max ||$$' )
      $$;
    end if;
    execute $$
      /* new table */
      create table audit_$$|| new_start ||$$ () inherits (audit);
      create index idx_audit_$$|| new_start ||$$_id on audit_$$|| new_start ||$$
        using btree (id) with (fillfactor = 99);
      create index idx_audit_$$|| new_start ||$$_tstmp on audit_$$|| new_start ||$$
        using btree (tstmp) with (fillfactor = 99);
      create index idx_audit_$$|| new_start ||$$_schema__table on audit_$$|| new_start ||$$
        using btree (schema, "table") with (fillfactor = 95);
      create index idx_audit_$$|| new_start ||$$_user on audit_$$|| new_start ||$$
        using btree ("user") with (fillfactor = 95);
      cluster audit_$$|| new_start ||$$ using idx_audit_$$|| new_start ||$$_tstmp;
      /* Parent table */
      drop trigger if exists tg_audit_$$|| cur_start ||$$ on audit;
      create trigger tg_audit_$$|| new_start ||$$ before insert on audit
        for each row execute procedure tgf_ins_audit('$$|| new_start ||$$');
    $$;

  when 'audet' then /* if I'm rotating the table audet ======================================= */
    /* current table */
    if not first_execution then
      execute 'select min(id), max(id) from audet_' || cur_start
        into cur_id_min, cur_id_max;
      execute $$
        alter index idx_audet_$$|| cur_start ||$$_id set (fillfactor = 100);
        alter index idx_audet_$$|| cur_start ||$$_fieldpk set (fillfactor = 100);
        cluster audet_$$|| cur_start ||$$;
        analyze audet_$$|| cur_start ||$$;
        alter table audet_$$|| cur_start ||$$ add constraint audet_$$|| cur_start ||$$_ck_exclusion
          check ( id >= '$$|| cur_id_min ||$$' and id <= '$$|| cur_id_max ||$$' );
        /* Parent table */
        drop trigger tg_audet_$$|| cur_start ||$$ on audet;
      $$;
    end if;
    execute $$
      /* new table */
      create table audet_$$|| new_start ||$$ () inherits (audet);
      create index idx_audet_$$|| new_start ||$$_id on audet_$$|| new_start ||$$
        using btree (id) with (fillfactor = 99);
      create index idx_audet_$$|| new_start ||$$_fieldpk on audet_$$|| new_start ||$$
        using btree (field) with (fillfactor = 99) where is_pk;
      cluster audet_$$|| new_start ||$$ using idx_audet_$$|| new_start ||$$_id;
      /* Parent table */
      create trigger tg_audet_$$|| new_start ||$$ before insert on audet
        for each row execute procedure tgf_ins_audet('$$|| new_start ||$$');
    $$;

  else /* if I got a wrong argument =========================================================== */
    raise notice E'Error: expected \'audit\' or \'audet\'. Got \'%\'.', $1;
    return;
  end case;
end;
$BODY$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER;
ALTER FUNCTION rotate(character) SET search_path = audit;

-- Main trigger function: attached to each audited table --------------------------------------

CREATE OR REPLACE FUNCTION audit() RETURNS trigger AS
$BODY$
#/*
# P0. Declarations and general definitions ####################################################
###############################################################################################
my $elog_pref = "(schm:$_TD->{table_schema};tab:$_TD->{table_name};trg:$_TD->{name};evt:$_TD->{event}):";
my $rv = "";        # Query execution
my $val = "";       # Iterating value
my %tables = (      # Values for the respective lookup tables referenced from "audit"
    "user" => 'pg_catalog."session_user"()',
    "table" => "'$_TD->{table_name}'",
    "schema" => "'$_TD->{table_schema}'",
    "client_inet" => "pg_catalog.inet_client_addr()"
);
my $id = "";        # Id of the tuple inserted in "audit"
my $field = "";     # Field id
my $is_pk = 0;      # Determines if a field is part of the PK
my $before = "";    # Value of a field in OLD
my $after = "";     # Value of a field in NEW
my %cols = ();      # Columns of the table
my %pk = ();        # Primary key

# Copy columns from whichever transition variable is available -------------------------------
if (exists $_TD->{new}){
    %cols = %{$_TD->{new}};
} else {
    %cols = %{$_TD->{old}};
}

# P1. Create necessary tuples in "user", "table", "schema" and "client_inet" ##################
###############################################################################################
foreach $val (keys %tables){
    $rv = spi_exec_query("select id from \"$val\" where value = $tables{$val}");
    if ( $rv->{status} ne SPI_OK_SELECT ){
        elog(ERROR, "$elog_pref Error querying table '$val'.");
    }
    if ( $rv->{processed} == 1 ){
        $tables{$val} = $rv->{rows}[0]->{id};
    } else {
        $rv = spi_exec_query("insert into \"$val\" (value) values ($tables{$val}) returning id");
        if ( $rv->{status} ne SPI_OK_INSERT_RETURNING ){
            elog(ERROR, "$elog_pref Error inserting in table '$val'.");
        }
        $tables{$val} = $rv->{rows}[0]->{id};
    }
}

# P2. Insert in audit #########################################################################
###############################################################################################
$rv = spi_exec_query("select nextval('seq_audit'::regclass) as id");
if ( $rv->{status} ne SPI_OK_SELECT ){
    elog(ERROR, "$elog_pref Error querying next value of sequence 'seq_audit'.");
}
$id = $rv->{rows}[0]->{id};
$rv = spi_exec_query("insert into audit (id, type, schema, \"table\", \"user\", client_inet) values (
    $id,
    substring('$_TD->{event}', 1, 1),
    $tables{'schema'},
    $tables{'table'},
    $tables{'user'},
    $tables{'client_inet'}
    )
");
if ($rv->{status} ne SPI_OK_INSERT){
    elog(ERROR,"$elog_pref Error inserting tuple in table 'audit'.");
}

# P3. Determine PK of the table ###############################################################
###############################################################################################
if ( scalar keys %pk == 0){
    # Criterion 1: if got params, each of them is a column of the table, and all of them make
    # up the pk of the table ------------------------------------------------------------------
    elog(DEBUG, "$elog_pref Searching pk in the trigger's params.");
    if ($_TD->{argc} > 0){
        ARGS: foreach $val ( @{$_TD->{args}} ){
            if (exists $cols{$val}){
                $pk{$val} = "-";
            } else {
                %pk = ();
                elog(DEBUG, "$elog_pref The column '$val' given as argument does not exist. Skipping to next criterion.");
                last ARGS;
            }
        }
    }
}
if ( scalar keys %pk == 0 ) {
    # Criterion 2: search the pk in the system catalogs ---------------------------------------
    elog(DEBUG, "$elog_pref Searching pk in system catalogs.");
    $rv = spi_exec_query("
        select a.attname
        from (
            select cl.oid, unnest(c.conkey) as att
            from pg_catalog.pg_constraint c
                inner join pg_catalog.pg_class cl on (c.conrelid = cl.oid)
            where c.contype = 'p'
                and cl.oid = $_TD->{relid}
        ) as c
            inner join pg_catalog.pg_attribute a on (c.att = a.attnum and c.oid = a.attrelid)
    ");
    if ( $rv->{status} eq SPI_OK_SELECT ){
        if ( $rv->{processed} > 0 ){
            foreach $val (@{$rv->{rows}}){
                $pk{$val->{attname}} = "-";
            }
        }
    } else {
        elog(DEBUG, "$elog_pref Error querying the system catalogs. Skipping to next criterion.");
    }
}
if ( scalar keys %pk == 0) {
    # Criterion 3: if the table has OIDs, use that as pk and emit a message -------------------
    elog(DEBUG, "$elog_pref Searching OIDs in the table.");
    $rv = spi_exec_query("select * from pg_catalog.pg_class where oid = $_TD->{relid} and relhasoids = true");
    if( $rv->{status} eq SPI_OK_SELECT ){
        if ( $rv->{processed} > 0 ){
            %pk = ("oid","-");
            elog(DEBUG, "$elog_pref Using OIDs as table pk for '$_TD->{table_name}' because no previous criterion could find one.");
        }
    } else {
        elog(DEBUG, "$elog_pref Error querying the system catalogs. Skipping to next criterion.");
    }
}
if ( scalar keys %pk == 0){
    # Default criterion: all columns -----------------------------------------------------------
    elog(DEBUG, "$elog_pref Could not find a suitable pk. Logging every column.");
    %pk = %cols;
}

# P4. Insert in audet #########################################################################
###############################################################################################
foreach $val (keys %cols){
    $is_pk = 0 + exists($pk{$val});
    if ( $_TD->{event} ne "UPDATE" || $is_pk || $_TD->{new}{$val} ne $_TD->{old}{$val} ){
        # Build quoted literals for the OLD/NEW values, escaping embedded single quotes.
        my $old_val = (exists $_TD->{old}) ? $_TD->{old}{$val} : undef;
        my $new_val = (exists $_TD->{new}) ? $_TD->{new}{$val} : undef;
        for ($old_val, $new_val){ s/'/''/g if defined; }
        $before = defined $old_val ? "'$old_val'" : "NULL";
        $after  = defined $new_val ? "'$new_val'" : "NULL";
        if ( $_TD->{event} eq "UPDATE" && $_TD->{new}{$val} eq $_TD->{old}{$val}){
            # We don't save the previous state of a pk column while updating
            # if it hasn't changed.
            $before = "NULL";
        }
        $rv = spi_exec_query("select id from field where value = '$val'");
        if ( $rv->{status} ne SPI_OK_SELECT ){
            elog(ERROR, "$elog_pref Error querying table 'field'.");
        }
        if ( $rv->{processed} > 0 ){
            $field = $rv->{rows}[0]->{id};
        } else {
            $rv = spi_exec_query("insert into field (value) values ('$val') returning id");
            if ( $rv->{status} ne SPI_OK_INSERT_RETURNING ){
                elog(ERROR, "$elog_pref Error executing insert returning in table 'field'.");
            }
            $field = $rv->{rows}[0]->{id};
        }
        $rv = spi_exec_query("insert into audet (id, field, is_pk, before, after)
            values ($id, $field, cast($is_pk as boolean), cast($before as text), cast($after as text))");
        if ( $rv->{status} ne SPI_OK_INSERT ){
            elog(ERROR, "$elog_pref Error inserting tuples in table 'audet'.");
        }
    }
}

# P5. Finishing ###############################################################################
###############################################################################################
return;
#*/
$BODY$ LANGUAGE plperl VOLATILE SECURITY DEFINER;
ALTER FUNCTION audit() SET search_path = audit;

-- Let's create the first tables
SELECT rotate('audit');
SELECT rotate('audet');