Search Postgresql Archives

Re: Using the database to validate data

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



A little late to the party, but i'll share how I do my data imports / validation for anyone interested.

I have a bunch of data that comes in from various sources, and it isn't always guaranteed to be in the correct format, have the right foreign keys, or even the right data types.

I have a staging table that is in the format of the feed I have coming in, with all columns text and no constraints at all on the data columns. Example:
CREATE TABLE import_sale
(
  client_id uuid NOT NULL,
  row_id uuid NOT NULL DEFAULT gen_random_uuid(),
  row_date timestamp with time zone NOT NULL DEFAULT now(),
  file_id uuid NOT NULL,
  sale_number character varying,
  company_number character varying,
  invoice_number character varying,
  invoice_date character varying,
  order_date character varying,
  ship_date character varying,
  sale_date character varying,
  product_number character varying,
  quantity character varying,
  quantity_uom character varying,
  price character varying,
  reduction character varying,
  direct_indicator character varying,
  redistributor_company_number character varying,
  freight numeric,
  processed_ind boolean DEFAULT false,
  CONSTRAINT import_sales_client_id_fkey FOREIGN KEY (client_id)
      REFERENCES client (client_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT import_sales_file_id_fkey FOREIGN KEY (file_id)
      REFERENCES import_file (file_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT import_sales_row_id_unique UNIQUE (row_id)
);

I use a talend package, or COPY to get the data into this table. However you want to do that is up to you.

I have a final table that I want all this data to eventually get to once there are no issues with it.  Example:
CREATE TABLE sale
(
  sale_id uuid NOT NULL DEFAULT gen_random_uuid(),
  client_id uuid NOT NULL,
  source_row_id uuid NOT NULL,
  sale_number character varying NOT NULL,
  company_id uuid NOT NULL,
  invoice_number character varying NOT NULL,
  invoice_date date,
  order_date date,
  ship_date date,
  sale_date date NOT NULL,
  product_id uuid NOT NULL,
  quantity numeric NOT NULL,
  uom_type_id uuid NOT NULL,
  price numeric NOT NULL,
  reduction numeric NOT NULL,
  redistributor_company_id uuid,
  freight numeric,
  active_range tstzrange DEFAULT tstzrange(now(), NULL::timestamp with time zone),
  CONSTRAINT sale_pkey PRIMARY KEY (sale_id),
  CONSTRAINT sale_client_id_fkey FOREIGN KEY (client_id)
      REFERENCES client (client_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT sale_company_id_fkey FOREIGN KEY (company_id)
      REFERENCES company (company_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT sale_product_id_fkey FOREIGN KEY (product_id)
      REFERENCES product (product_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT sale_redistributor_company_id_fkey FOREIGN KEY (redistributor_company_id)
      REFERENCES company (company_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT sale_source_row_id_fkey FOREIGN KEY (source_row_id)
      REFERENCES import_sale (row_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT sale_uom_type_id_fkey FOREIGN KEY (uom_type_id)
      REFERENCES uom_type (uom_type_id) MATCH SIMPLE
      ON UPDATE NO ACTION ON DELETE NO ACTION,
  CONSTRAINT sale_sale_number_active_range_excl EXCLUDE 
  USING gist (sale_number WITH =, (client_id::character varying) WITH =, active_range WITH &&),
  CONSTRAINT sale_unique UNIQUE (sale_number, client_id, active_range)
);

I then have couple functions which run over the data and do the validations / insert / update where necessary.

This one validates that the data is able to map to all the foreign keys, the data types can be converted properly, and that not null constraints are enforced.

CREATE OR REPLACE FUNCTION import_validate_sale()
  RETURNS void AS
$BODY$
/*
Remove any prior exceptions
*/
DELETE FROM import_sale_error
WHERE EXISTS (
SELECT 1
FROM import_sale
WHERE import_sale_error.row_id = import_sale.row_id);
/*
Null checks for required fields
*/
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'sale_number is null, but required.'
FROM import_sale s
WHERE s.sale_number IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'distributor company_number is null, but required.'
FROM import_sale s
WHERE s.company_number IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'invoice_number is null, but required.'
FROM import_sale s
WHERE s.invoice_number IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'sale_date is null, but required.'
FROM import_sale s
WHERE s.sale_date IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'product_number is null, but required.'
FROM import_sale s
WHERE s.product_number IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'quantity is null, but required.'
FROM import_sale s
WHERE s.quantity IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'quantity_uom is null, but required.'
FROM import_sale s
WHERE s.quantity_uom IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'price is null, but required.'
FROM import_sale s
WHERE s.price IS NULL;

/*
Check type conversions
*/
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'invoice_date cannot be converted to date.'
FROM import_sale s
WHERE can_convert_date(s.invoice_date) = FALSE; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'order_date cannot be converted to date.'
FROM import_sale s
WHERE can_convert_date(s.order_date) = FALSE; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'ship_date cannot be converted to date.'
FROM import_sale s
WHERE can_convert_date(s.ship_date) = FALSE; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'sale_date cannot be converted to date.'
FROM import_sale s
WHERE can_convert_date(s.sale_date) = FALSE;
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'quantity cannot be converted to numeric.'
FROM import_sale s
WHERE can_convert_numeric(s.quantity) = FALSE; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'price cannot be converted to numeric.'
FROM import_sale s
WHERE can_convert_numeric(s.price) = FALSE; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'reduction cannot be converted to numeric.'
FROM import_sale s
WHERE can_convert_numeric(s.reduction) = FALSE; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'freight cannot be converted to numeric.'
FROM import_sale s
WHERE can_convert_numeric(s.freight) = FALSE;
 
/*
Check item resolutions
*/
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'Distributor company record could not be mapped.'
FROM import_sale s
LEFT JOIN company c
ON s.company_number = c.company_number
WHERE c.company_id IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'Re-Distributor company record could not be mapped.'
FROM import_sale s
LEFT JOIN company c
ON s.redistributor_company_number = c.company_number
WHERE c.company_id IS NULL
AND s.redistributor_company_number IS NOT NULL;
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'Product record could not be mapped.'
FROM import_sale s
LEFT JOIN product p
ON s.product_number = p.product_number
WHERE p.product_id IS NULL;

INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'Unit Of Measure record could not be mapped.'
FROM import_sale s
LEFT JOIN uom_type u
ON u.uom_type_cd = s.quantity_uom
WHERE u.uom_type_id IS NULL; 
 
INSERT INTO import_sale_error(row_id, error_message)
SELECT s.row_id, 'Product does not have a conversion rate for this Unit Of Measure.'
FROM import_sale s
INNER JOIN product p
ON s.product_number = p.product_number
INNER JOIN uom_type u
ON u.uom_type_cd = s.quantity_uom
LEFT JOIN product_uom_conversion puc
ON p.product_id = puc.product_id
AND u.uom_type_id = puc.uom_type_id
WHERE puc.rate IS NULL;

$BODY$
  LANGUAGE sql VOLATILE
  COST 100;

The other function is then in charge of the insert into the final table:
CREATE OR REPLACE FUNCTION import_sale()
  RETURNS void AS
$BODY$
/**
Insert into the production sale table where no errors exist.
**/

INSERT INTO sale(
  client_id
, source_row_id
, sale_number
, company_id
, invoice_number
, invoice_date
, order_date
, ship_date
, sale_date
, product_id
, quantity
, uom_type_id
, price
, reduction
, redistributor_company_id
, freight)
SELECT 
  s.client_id 
, s.row_id
, s.sale_number
, cmpd.company_id
, s.invoice_number
, s.invoice_date::date
, s.order_date::date
, s.ship_date::date
, s.sale_date::date
, p.product_id
, s.quantity::numeric
, uom.uom_type_id
, s.price::numeric
, s.reduction::numeric
, cmpr.company_id
, s.freight
FROM import_sale s
INNER JOIN company cmpd
ON cmpd.company_number = s.company_number
LEFT JOIN company cmpr
ON cmpr.company_number = s.redistributor_company_number
INNER JOIN product p
ON s.product_number = p.product_number
INNER JOIN uom_type uom
ON uom.uom_type_cd = s.quantity_uom
WHERE NOT EXISTS (
SELECT 1
FROM import_sale_error se
WHERE se.row_id = s.row_id)
----new sale_number
AND (NOT EXISTS (
SELECT 1
FROM sale s2
WHERE s.row_id = s2.source_row_id
AND s.client_id = s2.client_id)
----existing sale_number with changes
OR EXISTS (
SELECT 1
FROM sale s2
WHERE s.processed_ind = false
AND s.sale_number = s2.sale_number
AND s.client_id = s2.client_id
AND (cmpd.company_id != s2.company_id
OR s.invoice_number != s2.invoice_number
OR s.invoice_date::date != s2.invoice_date
OR s.order_date::date != s2.order_date
OR s.ship_date::date != s2.ship_date
OR s.sale_date::date != s2.sale_date
OR p.product_id != s2.product_id
OR s.quantity::numeric != s2.quantity
OR uom.uom_type_id != s2.uom_type_id
OR  s.price::numeric != s2.price
OR s.reduction::numeric != s2.reduction
OR cmpr.company_id != s2.redistributor_company_id)
);


-------------------------------------------------------------------------
--Update processed_ind
UPDATE import_sale AS s
SET processed_ind = true
WHERE NOT EXISTS (
SELECT 1
FROM import_sale_error se
WHERE se.row_id = s.row_id);

$BODY$
  LANGUAGE sql VOLATILE
  COST 1000; 

So all of that together is a pretty solid system for importing data, and showing a user what went wrong with the data they sent if it is a bad row.

The other good part of this, is it's all set based. This process will run through 20,000 lines on a single core server in around 5-10 seconds.

At one of my old jobs, we had something that did the same type of validations / inserts, but did it row by row in a cursor (not written by me), and that took a good 5 min (if I remember correctly) to process 20,000 lines.  This was also on a server running Sql Server on a 32 core machine.

Anyways, good luck!
-Adam

On Fri, Jul 24, 2015 at 9:55 AM, JPLapham <lapham@xxxxxxxxx> wrote:
Zdeněk Bělehrádek wrote
> What about creating a SAVEPOINT before each INSERT, and if the INSERT
> returns
> an error, then ROLLBACK TO SAVEPOINT? This way you will have all the
> insertable data in your table, and you can still ROLLBACK the whole
> transaction, or COMMIT it if there were no errors.
>
> It will probably be quite slow, but if you have only thousands of lines,
> it
> should be fast enough for your usecase IMHO.
>
> -- Zdeněk Bělehrádek

Hmmm, interesting. Thanks, if that works, it would be exactly what I'm
looking for!

You are right, speed is not an issue.

-Jon



--
View this message in context: http://postgresql.nabble.com/Using-the-database-to-validate-data-tp5859046p5859239.html
Sent from the PostgreSQL - general mailing list archive at Nabble.com.


--
Sent via pgsql-general mailing list (pgsql-general@xxxxxxxxxxxxxx)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-general


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Index of Archives]     [Postgresql Jobs]     [Postgresql Admin]     [Postgresql Performance]     [Linux Clusters]     [PHP Home]     [PHP on Windows]     [Kernel Newbies]     [PHP Classes]     [PHP Books]     [PHP Databases]     [Postgresql & PHP]     [Yosemite]
  Powered by Linux