My requirements was:
+ Made a table charge to be partitioned by carrier and month
+ summarize by charges
+ summarize by users,
+ each summarization must be by month and several others columns.
Here is the database
CREATE TABLE charges.charge
(
id serial NOT NULL,
transaction_id uuid NOT NULL,
carrier_id integer NOT NULL,
msisdn character varying(60) NOT NULL,
partner_id integer NOT NULL,
product_id integer NOT NULL,
parent_id integer,
retry_count integer NOT NULL,
created_at timestamp with time zone NOT NULL,
CONSTRAINT charge_pkey PRIMARY KEY (id)
);
CREATE TABLE charges.charge_summarized
(
id serial NOT NULL,
created_at timestamp with time zone NOT NULL,
carrier_id integer NOT NULL,
partner_id integer NOT NULL,
product_id integer NOT NULL,
retry_count integer NOT NULL,
amount integer NOT NULL,
CONSTRAINT charge_summarized_pkey PRIMARY KEY (id),
CONSTRAINT client_charge_client_id_key UNIQUE (carrier_id, partner_id, product_id, retry_count)
);
CREATE TABLE charges.client
(
id serial NOT NULL,
carrier_id integer NOT NULL,
msisdn character varying(60) NOT NULL,
collectibility numeric(5,2) NOT NULL,
CONSTRAINT client_pkey PRIMARY KEY (id),
);
CREATE TABLE charges.client_charge
(
id serial NOT NULL,
client_id integer NOT NULL,
date date NOT NULL,
amount integer NOT NULL,
CONSTRAINT client_charge_pkey PRIMARY KEY (id),
CONSTRAINT client_charge_client_id_fkey FOREIGN KEY (client_id)
REFERENCES charges.client (id) MATCH SIMPLE
ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,
CONSTRAINT client_charge_client_id_key UNIQUE (client_id, date),
);
(
id serial NOT NULL,
transaction_id uuid NOT NULL,
carrier_id integer NOT NULL,
msisdn character varying(60) NOT NULL,
partner_id integer NOT NULL,
product_id integer NOT NULL,
parent_id integer,
retry_count integer NOT NULL,
created_at timestamp with time zone NOT NULL,
CONSTRAINT charge_pkey PRIMARY KEY (id)
);
CREATE TABLE charges.charge_summarized
(
id serial NOT NULL,
created_at timestamp with time zone NOT NULL,
carrier_id integer NOT NULL,
partner_id integer NOT NULL,
product_id integer NOT NULL,
retry_count integer NOT NULL,
amount integer NOT NULL,
CONSTRAINT charge_summarized_pkey PRIMARY KEY (id),
CONSTRAINT client_charge_client_id_key UNIQUE (carrier_id, partner_id, product_id, retry_count)
);
CREATE TABLE charges.client
(
id serial NOT NULL,
carrier_id integer NOT NULL,
msisdn character varying(60) NOT NULL,
collectibility numeric(5,2) NOT NULL,
CONSTRAINT client_pkey PRIMARY KEY (id),
);
CREATE TABLE charges.client_charge
(
id serial NOT NULL,
client_id integer NOT NULL,
date date NOT NULL,
amount integer NOT NULL,
CONSTRAINT client_charge_pkey PRIMARY KEY (id),
CONSTRAINT client_charge_client_id_fkey FOREIGN KEY (client_id)
REFERENCES charges.client (id) MATCH SIMPLE
ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,
CONSTRAINT client_charge_client_id_key UNIQUE (client_id, date),
);
OK, now the functions I made to complete all this, Inserts are made only in charges.charge
create or replace function charges.insert_into_charges() returns trigger as $body1$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
from datetime import datetime, timedelta
if event == 'INSERT':
carrier = args['carrier_id']
created_at = datetime.strptime(args['created_at'].split(" ")[0], "%Y-%m-%d")
month, year = created_at.month, created_at.year
next_month, next_year = (month + 1, year) if month < 12 else (1, year+1)
target_table_name = "charge_%s_%04d%02d" % (carrier, year, month)
while True:
exist_table = len(plpy.execute("select relname from pg_stat_user_tables where relname = '%s';" % target_table_name))
if not exist_table:
sql = """create table charges.%(target_table_name)s
(CONSTRAINT charge_%(carrier_id)s_carrier_id_check
CHECK (
carrier_id = '%(carrier_id)s' AND
created_at >= '%(from_year)s-%(from_month)s-01' AND
created_at < '%(to_year)s-%(to_month)s-01'
)
)
INHERITS (charges.charge)
WITH (
OIDS=FALSE
);
create trigger summarize_%(target_table_name)s
AFTER insert
on charges.%(target_table_name)s
for each row
execute procedure charges.summarize();
create trigger client_charge_sum_%(target_table_name)s
AFTER insert
on charges.%(target_table_name)s
for each row
execute procedure charges.client_charge_sum();
""" % {"carrier_id": carrier, "from_year": year, "from_month": month,
"to_year": next_year, "to_month": next_month, "target_table_name": target_table_name}
try:
# multithreading could have a race condition here. Better to ask for forgiveness than permission.
plpy.execute(sql)
except:
continue
break
keys, values = zip(*tuple([(x,y) for x,y in args.items() if y is not None]))
sql = "insert into charges.%(target_table_name)s " \
"(%(keys)s) VALUES (%(values)s);" % \
{"carrier_id": carrier,
"keys" : ",".join(keys),
"values" : ",".join(["'%s'" % x for x in values]),
"target_table_name": target_table_name
}
plpy.execute(sql)
return "SKIP"
$body1$
language plpythonu;
create or replace function charges.client_charge_sum() returns trigger as
$body3$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
if event != 'INSERT':
return
while True:
# to populate clients if is needed
sql = "select id from charges.client where msisdn='%s';" % args["msisdn"]
clients = plpy.execute(sql)
if len(clients):
client_id = clients[0]['id']
else:
sql= """INSERT INTO charges.client ( carrier_id, msisdn,collectibility )
VALUES ( %s, '%s',0) RETURNING
CURRVAL('charges.client_id_seq') as id;""" % (args['carrier_id'], args['msisdn'])
try:
client_id = plpy.execute(sql)[0]['id']
except:
continue
break
group_by_data ={
"date" : args['created_at'].split(" ")[0],
"client_id" : str(client_id),
}
filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()])
while True:
sql = "select 1 from charges.client_charge where %s;" % (filter_string,)
if len(plpy.execute(sql)):
sql = "update charges.client_charge set amount=amount + 1 where %s" % filter_string
plpy.info("update")
else:
field_names, field_datas = zip(*tuple(group_by_data.items()))
field_data_string = ", ".join(["'%s'" % x for x in field_datas])
sql = "insert into charges.client_charge (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string)
plpy.info("insert")
try:
plpy.execute(sql)
except:
continue
break
$body3$
language plpythonu;
create or replace function charges.summarize() returns trigger as
$body2$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
if event != 'INSERT':
return
group_by_data ={
"carrier_id" : args['carrier_id'],
"charged_at" : args['created_at'].split(" ")[0],
"partner_id" : args['partner_id'],
"product_id" : args['product_id'],
"retry_count" : args['retry_count'],
}
filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()])
while True:
sql = "select 1 from charges.charge_summarized where %s;" % (filter_string,)
if len(plpy.execute(sql)):
sql = "update charges.charge_summarized set amount=amount + 1 where %s" % filter_string
plpy.info("update")
else:
field_names, field_datas = zip(*tuple(group_by_data.items()))
field_data_string = ", ".join(["'%s'" % x for x in field_datas])
sql = "insert into charges.charge_summarized (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string)
plpy.info("insert")
try:
plpy.execute(sql)
except:
continue
break
$body2$
language plpythonu;
args = TD['new']
event = TD['event']
table_name = TD['table_name']
from datetime import datetime, timedelta
if event == 'INSERT':
carrier = args['carrier_id']
created_at = datetime.strptime(args['created_at'].split(" ")[0], "%Y-%m-%d")
month, year = created_at.month, created_at.year
next_month, next_year = (month + 1, year) if month < 12 else (1, year+1)
target_table_name = "charge_%s_%04d%02d" % (carrier, year, month)
while True:
exist_table = len(plpy.execute("select relname from pg_stat_user_tables where relname = '%s';" % target_table_name))
if not exist_table:
sql = """create table charges.%(target_table_name)s
(CONSTRAINT charge_%(carrier_id)s_carrier_id_check
CHECK (
carrier_id = '%(carrier_id)s' AND
created_at >= '%(from_year)s-%(from_month)s-01' AND
created_at < '%(to_year)s-%(to_month)s-01'
)
)
INHERITS (charges.charge)
WITH (
OIDS=FALSE
);
create trigger summarize_%(target_table_name)s
AFTER insert
on charges.%(target_table_name)s
for each row
execute procedure charges.summarize();
create trigger client_charge_sum_%(target_table_name)s
AFTER insert
on charges.%(target_table_name)s
for each row
execute procedure charges.client_charge_sum();
""" % {"carrier_id": carrier, "from_year": year, "from_month": month,
"to_year": next_year, "to_month": next_month, "target_table_name": target_table_name}
try:
# multithreading could have a race condition here. Better to ask for forgiveness than permission.
plpy.execute(sql)
except:
continue
break
keys, values = zip(*tuple([(x,y) for x,y in args.items() if y is not None]))
sql = "insert into charges.%(target_table_name)s " \
"(%(keys)s) VALUES (%(values)s);" % \
{"carrier_id": carrier,
"keys" : ",".join(keys),
"values" : ",".join(["'%s'" % x for x in values]),
"target_table_name": target_table_name
}
plpy.execute(sql)
return "SKIP"
$body1$
language plpythonu;
create or replace function charges.client_charge_sum() returns trigger as
$body3$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
if event != 'INSERT':
return
while True:
# to populate clients if is needed
sql = "select id from charges.client where msisdn='%s';" % args["msisdn"]
clients = plpy.execute(sql)
if len(clients):
client_id = clients[0]['id']
else:
sql= """INSERT INTO charges.client ( carrier_id, msisdn,collectibility )
VALUES ( %s, '%s',0) RETURNING
CURRVAL('charges.client_id_seq') as id;""" % (args['carrier_id'], args['msisdn'])
try:
client_id = plpy.execute(sql)[0]['id']
except:
continue
break
group_by_data ={
"date" : args['created_at'].split(" ")[0],
"client_id" : str(client_id),
}
filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()])
while True:
sql = "select 1 from charges.client_charge where %s;" % (filter_string,)
if len(plpy.execute(sql)):
sql = "update charges.client_charge set amount=amount + 1 where %s" % filter_string
plpy.info("update")
else:
field_names, field_datas = zip(*tuple(group_by_data.items()))
field_data_string = ", ".join(["'%s'" % x for x in field_datas])
sql = "insert into charges.client_charge (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string)
plpy.info("insert")
try:
plpy.execute(sql)
except:
continue
break
$body3$
language plpythonu;
create or replace function charges.summarize() returns trigger as
$body2$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
if event != 'INSERT':
return
group_by_data ={
"carrier_id" : args['carrier_id'],
"charged_at" : args['created_at'].split(" ")[0],
"partner_id" : args['partner_id'],
"product_id" : args['product_id'],
"retry_count" : args['retry_count'],
}
filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()])
while True:
sql = "select 1 from charges.charge_summarized where %s;" % (filter_string,)
if len(plpy.execute(sql)):
sql = "update charges.charge_summarized set amount=amount + 1 where %s" % filter_string
plpy.info("update")
else:
field_names, field_datas = zip(*tuple(group_by_data.items()))
field_data_string = ", ".join(["'%s'" % x for x in field_datas])
sql = "insert into charges.charge_summarized (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string)
plpy.info("insert")
try:
plpy.execute(sql)
except:
continue
break
$body2$
language plpythonu;
And finally the trigger:
CREATE TRIGGER inserta_tg
BEFORE INSERT
ON charges.charge
FOR EACH ROW
EXECUTE PROCEDURE charges.insert_into_charges();
Doesn't sound like too much? As I say, im new and I didn't found any better. But an insert takes around 135ms in the worst case (create tables and insert rows) and about 85 ms in best case (only updates). There are something better?BEFORE INSERT
ON charges.charge
FOR EACH ROW
EXECUTE PROCEDURE charges.insert_into_charges();
Thanks in advance, Sabrina