Search Postgresql Archives

Triggers made with plpythonu performance issue

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



Hi everybody,
My requirements was:
 + Made a table charge to be partitioned by carrier and month
 + summarize by charges
 + summarize by users,
 + each summarization must be by month and several others columns.

Here is the database

CREATE TABLE charges.charge
(
  id serial NOT NULL,
  transaction_id uuid NOT NULL,
  carrier_id integer NOT NULL,
  msisdn character varying(60) NOT NULL,
  partner_id integer NOT NULL,
  product_id integer NOT NULL,
  parent_id integer,
  retry_count integer NOT NULL,
  created_at timestamp with time zone NOT NULL,
  CONSTRAINT charge_pkey PRIMARY KEY (id)
);

CREATE TABLE charges.charge_summarized
(
  id serial NOT NULL,
  created_at timestamp with time zone NOT NULL,
  carrier_id integer NOT NULL,
  partner_id integer NOT NULL,
  product_id integer NOT NULL,
  retry_count integer NOT NULL,
  amount integer NOT NULL,
  CONSTRAINT charge_summarized_pkey PRIMARY KEY (id),
CONSTRAINT client_charge_client_id_key UNIQUE (carrier_id, partner_id, product_id, retry_count)
);

CREATE TABLE charges.client
(
  id serial NOT NULL,
  carrier_id integer NOT NULL,
  msisdn character varying(60) NOT NULL,
  collectibility numeric(5,2) NOT NULL,
  CONSTRAINT client_pkey PRIMARY KEY (id),
);


CREATE TABLE charges.client_charge
(
  id serial NOT NULL,
  client_id integer NOT NULL,
  date date NOT NULL,
  amount integer NOT NULL,
  CONSTRAINT client_charge_pkey PRIMARY KEY (id),
  CONSTRAINT client_charge_client_id_fkey FOREIGN KEY (client_id)
      REFERENCES charges.client (id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED,
  CONSTRAINT client_charge_client_id_key UNIQUE (client_id, date),
);

OK, now the functions I made to complete all this, Inserts are made only in charges.charge

create or replace function charges.insert_into_charges() returns trigger as $body1$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
from datetime import datetime, timedelta
if event == 'INSERT':
  carrier = args['carrier_id']
  created_at = datetime.strptime(args['created_at'].split(" ")[0], "%Y-%m-%d") 
  month, year = created_at.month, created_at.year
  next_month, next_year = (month + 1, year) if month < 12 else (1, year+1)
  target_table_name = "charge_%s_%04d%02d" % (carrier, year, month)
  while True:
    exist_table = len(plpy.execute("select relname from pg_stat_user_tables where relname = '%s';" % target_table_name))
    if not exist_table:
      sql = """create table charges.%(target_table_name)s
        (CONSTRAINT charge_%(carrier_id)s_carrier_id_check
        CHECK (
          carrier_id = '%(carrier_id)s' AND
          created_at >= '%(from_year)s-%(from_month)s-01' AND
          created_at < '%(to_year)s-%(to_month)s-01'
          )
        )
        INHERITS (charges.charge)
        WITH (
        OIDS=FALSE
        );
       
    create trigger summarize_%(target_table_name)s
    AFTER insert
    on charges.%(target_table_name)s
    for each row
    execute procedure charges.summarize();
   
    create trigger client_charge_sum_%(target_table_name)s
    AFTER insert
    on charges.%(target_table_name)s
    for each row
    execute procedure charges.client_charge_sum();
        """ % {"carrier_id": carrier, "from_year": year, "from_month": month,
        "to_year": next_year, "to_month": next_month, "target_table_name": target_table_name}
      try:
        # multithreading could have a race condition here. Better to ask for forgiveness than permission.
        plpy.execute(sql)
      except:
        continue
    break
  keys, values = zip(*tuple([(x,y) for x,y in args.items() if y is not None]))
  sql = "insert into charges.%(target_table_name)s " \
    "(%(keys)s) VALUES (%(values)s);" % \
    {"carrier_id": carrier,
    "keys"       : ",".join(keys),
    "values"     : ",".join(["'%s'" % x for x in values]),
    "target_table_name": target_table_name
    }
 
  plpy.execute(sql)
return "SKIP"
$body1$
language plpythonu;

create or replace function charges.client_charge_sum() returns trigger as
$body3$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
if event != 'INSERT':
  return
while True:
  # to populate clients if is needed
  sql = "select id from charges.client where msisdn='%s';" % args["msisdn"]
  clients = plpy.execute(sql)
  if len(clients):
    client_id = clients[0]['id']
  else:
    sql= """INSERT INTO charges.client ( carrier_id, msisdn,collectibility )
    VALUES ( %s, '%s',0) RETURNING
    CURRVAL('charges.client_id_seq') as id;""" % (args['carrier_id'], args['msisdn'])
  try:
    client_id = plpy.execute(sql)[0]['id']
  except:
    continue
  break
group_by_data ={
"date"    : args['created_at'].split(" ")[0],
"client_id"    : str(client_id),
}
filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()])
while True:
  sql = "select 1 from charges.client_charge where %s;" % (filter_string,)
  if len(plpy.execute(sql)):
    sql = "update charges.client_charge set amount=amount + 1 where %s" % filter_string
    plpy.info("update")
  else:
    field_names, field_datas = zip(*tuple(group_by_data.items()))
    field_data_string = ", ".join(["'%s'" % x for x in field_datas])
    sql = "insert into charges.client_charge (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string)
    plpy.info("insert")
  try:
    plpy.execute(sql)
  except:
    continue
  break
$body3$
language plpythonu;

create or replace function charges.summarize() returns trigger as
$body2$
args = TD['new']
event = TD['event']
table_name = TD['table_name']
if event != 'INSERT':
  return

group_by_data ={
"carrier_id" : args['carrier_id'],
"charged_at"    : args['created_at'].split(" ")[0],
"partner_id"    : args['partner_id'],
"product_id"    : args['product_id'],
"retry_count"   : args['retry_count'],
}
filter_string = " and ".join(["%s='%s'" % (key, value) for key, value in group_by_data.items()])
while True:
  sql = "select 1 from charges.charge_summarized where %s;" % (filter_string,)
  if len(plpy.execute(sql)):
    sql = "update charges.charge_summarized set amount=amount + 1 where %s" % filter_string
    plpy.info("update")
  else:
    field_names, field_datas = zip(*tuple(group_by_data.items()))
    field_data_string = ", ".join(["'%s'" % x for x in field_datas])
    sql = "insert into charges.charge_summarized (amount, %s) values (1, %s)" % (", ".join(field_names), field_data_string)
    plpy.info("insert")
  try:
    plpy.execute(sql)
  except:
    continue
  break
$body2$
language plpythonu;


And finally the trigger:


CREATE TRIGGER inserta_tg
  BEFORE INSERT
  ON charges.charge
  FOR EACH ROW
  EXECUTE PROCEDURE charges.insert_into_charges();

Doesn't sound like too much? As I say, im new and I didn't found any better. But an insert takes around 135ms in the worst case (create tables and insert rows) and about 85 ms in best case (only updates). There are something better?

Thanks in advance, Sabrina



[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Index of Archives]     [Postgresql Jobs]     [Postgresql Admin]     [Postgresql Performance]     [Linux Clusters]     [PHP Home]     [PHP on Windows]     [Kernel Newbies]     [PHP Classes]     [PHP Books]     [PHP Databases]     [Postgresql & PHP]     [Yosemite]
  Powered by Linux