Re: Daily Rotated Insertion

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, Mar 27, 2012 at 10:02 AM, Haifeng Liu <liuhaifeng@xxxxxxxx> wrote:
> Yes, that's what I am doing. There are many ways to implement a partitioned table; I just want to find an elegant one. Bulk-creating a fixed number of partitions in advance is not good, in my opinion. What I am trying now is to let the trigger that routes insertions catch exceptions and create new partitions on demand, while also updating the trigger itself to use the new routing rule.
>
> This approach can use static statements to route insertions; dynamic statements are only used to create partitions and update the trigger, which should be good for performance. And compared to the static partitioning example given in the documentation, this solution updates itself automatically, so there is no need for cron jobs to maintain partitions.

Okay, I see what you mean. I used auto-partitioning in one of
my projects, but it relied on catching errors on the application
server side. However, you can easily adapt the solution to your needs.
I have attached the script to this message.

BTW, I wrote it several years ago. A lot of nice features, like CREATE
TABLE IF NOT EXISTS, have been added in newer versions of PostgreSQL, so I
think you may be able to get rid of exception catching completely with
the help of these new features.

Please let me know if you decide to use this approach and modify it.

>
> I am not sure whether this idea works; I am still trying it now.
>
>>
>>>
>>> Any idea is appreciated. Thanks in advance.
>>>
>>>
>>> Best regards.
>>> liuhaifeng
>>> --
>>> Sent via pgsql-admin mailing list (pgsql-admin@xxxxxxxxxxxxxx)
>>> To make changes to your subscription:
>>> http://www.postgresql.org/mailpref/pgsql-admin
>>
>>
>>
>> --
>> Sergey Konoplev
>>
>> Blog: http://gray-hemp.blogspot.com
>> LinkedIn: http://ru.linkedin.com/in/grayhemp
>> JID/GTalk: gray.ru@xxxxxxxxx Skype: gray-hemp
>>
>> --
>> Sent via pgsql-admin mailing list (pgsql-admin@xxxxxxxxxxxxxx)
>> To make changes to your subscription:
>> http://www.postgresql.org/mailpref/pgsql-admin
>>
>
>
> --
> Sent via pgsql-admin mailing list (pgsql-admin@xxxxxxxxxxxxxx)
> To make changes to your subscription:
> http://www.postgresql.org/mailpref/pgsql-admin



-- 
Sergey Konoplev

Blog: http://gray-hemp.blogspot.com
LinkedIn: http://ru.linkedin.com/in/grayhemp
JID/GTalk: gray.ru@xxxxxxxxx Skype: gray-hemp
-- Abstract parent table whose columns other tables (e.g. activity.stream)
-- pick up via INHERITS. With PostgreSQL table inheritance the primary key
-- below only constrains rows stored directly in obj_base, not rows that
-- live in child tables.
CREATE TABLE obj_base (
    obj_id         bigint      NOT NULL CONSTRAINT pk_obj_base PRIMARY KEY,
    obj_created    timestamptz NOT NULL DEFAULT now(),
    obj_modified   timestamptz NOT NULL DEFAULT now(),
    obj_status_did smallint    NOT NULL DEFAULT 1
);


COMMENT ON TABLE obj_base IS 'Abstract base obj';


-- Row-level trigger function: stamps NEW.obj_modified with CURRENT_TIMESTAMP
-- and returns the modified row. It is attached as a BEFORE UPDATE trigger
-- (see t_activity_stream__modified_bu), so every update refreshes the
-- modification time.
CREATE OR REPLACE FUNCTION t_obj_modified()
 RETURNS trigger AS
$BODY$
-- Trigger function to set modification field
BEGIN
    -- Direct assignment is the idiomatic PL/pgSQL form; SELECT INTO would
    -- run a full query just to copy one value.
    NEW.obj_modified := CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$BODY$
 LANGUAGE plpgsql VOLATILE
 COST 1;


-- Kept on one line: the previous e-mail-wrapped literal stored a newline
-- inside the comment text.
COMMENT ON FUNCTION t_obj_modified() IS 'Trigger function to set modification field';


CREATE SCHEMA activity;


-- Activity stream entries. obj_created/obj_modified/obj_status_did come
-- from obj_base via INHERITS; obj_id is redeclared here so it gets a
-- bigserial default. Inserted rows are redirected into daily child
-- partitions by the BEFORE INSERT trigger defined later in this script.
-- NOTE(review): the hstore type must already exist (hstore extension
-- installed) before this statement runs.
CREATE TABLE activity.stream (
    obj_id              bigserial NOT NULL CONSTRAINT pk_activity_stream PRIMARY KEY,
    as_generator_obj_id bigint    NOT NULL,
    as_type_did         bigint    NOT NULL,
    as_obj_ids          bigint[],
    as_data             hstore
) INHERITS (obj_base);


COMMENT ON TABLE activity.stream IS 'Activity streams';


-- Refresh obj_modified on every UPDATE of a row stored in activity.stream.
-- NOTE(review): row-level triggers are not inherited, so rows that live in
-- child partitions need the same trigger created on each partition.
CREATE TRIGGER t_activity_stream__modified_bu
    BEFORE UPDATE ON activity.stream
    FOR EACH ROW EXECUTE PROCEDURE t_obj_modified();


-- Lookups by generator, ordered by creation time descending.
CREATE INDEX i_activity_stream__generator_created
    ON activity.stream USING btree (as_generator_obj_id, obj_created DESC);


CREATE OR REPLACE FUNCTION activity.t_stream__partition_redirect()
 RETURNS trigger AS
$BODY$
""" Trigger function to redirect inserts into appropriate partitions
for activity.stream table.

activity.stream is partitioned by the obj_created column, one child
table per UTC day. A partition is created on demand by the first
inserted row whose obj_created falls into a UTC day that has no
partition yet. All partition constraints are expressed in UTC, so every
row lands in the correct partition regardless of daylight saving time
changes. Partition name template is:

    activity.stream__pYYYYMMDD

For each partition it also creates an index and a BEFORE UPDATE trigger
(row-level triggers are not inherited from activity.stream):

    i_activity_stream__pYYYYMMDD__generator_created
    t_activity_stream__pYYYYMMDD__modified_bu

The helper functions and prepared query plans are cached in SD, the
per-function dictionary PL/Python keeps between calls in a session.

Errors are never swallowed: the Python traceback is reported with
plpy.warning and the exception is re-raised, so the INSERT fails
instead of the row being silently dropped.

The body avoids Python-2-only syntax so it can also run under plpython3u.

NOTE(review): two sessions creating the same new partition concurrently
can still collide on CREATE TABLE; the losing INSERT then fails.

TODO: Should think about generic partitioning solution.
"""

# Namespace prefix for the keys this function stores in SD.
sdNamespace = 'activity.t_stream__partition_redirect'

# On the first call in a session build the helpers and cache main() in SD.
if sdNamespace not in SD:

    import re
    from datetime import datetime, timedelta, tzinfo

    class FixedOffsetTimezone(tzinfo):
        """ Fixed offset in minutes east from UTC. """

        def __init__(self, offset, name):
            self.__offset = timedelta(minutes=offset)
            self.__name = name

        def utcoffset(self, dt):
            return self.__offset

        def tzname(self, dt):
            return self.__name

        def dst(self, dt):
            return timedelta(0)

    # ISO 8601 timestamp as printed with DateStyle = ISO:
    # YYYY-MM-DD[ T]HH:MM[:SS[.ss]][-+HH[:MM]]
    iso8601Regexp = re.compile(r"""
        \s*
        (?P<year> \d\d\d\d) [-] (?P<month> \d\d) [-] (?P<day> \d\d) [ T]
        (?P<hour> \d\d) [:] (?P<min> \d\d)
        (?: [:] (?P<sec> \d\d ) (?: [.,] (?P<ss> \d+))? )?
        (?: \s* (?P<tzsign> [-+]) (?P<tzhr> \d\d) (?: [:]?(?P<tzmin> \d\d))? )?
        \s* $
        """, re.X)

    def getCachedPlan(name, query, argTypes):
        """ Return the plan cached in SD under name, preparing it on first use. """
        key = '%s:%s' % (sdNamespace, name)
        if key not in SD:
            SD[key] = plpy.prepare(query, argTypes)
        return SD[key]

    def pgTimestampToPyDatetime(timestamp):
        """ Parse an ISO 8601 timestamp string into a datetime.

        Returns an aware datetime when the string carries a UTC offset and
        a naive one otherwise; calls plpy.error (which raises) when the
        string does not match. Based on solution of Marko Kreen.
        """
        matches = iso8601Regexp.match(timestamp)
        if not matches:
            plpy.error('Timestamp not in ISO8601 format: %s' % repr(timestamp))

        timezone = None
        if matches.group('tzsign'):
            tzOffsetMinutes = int(matches.group('tzhr')) * 60
            if matches.group('tzmin'):
                tzOffsetMinutes += int(matches.group('tzmin'))
            # Offsets west of UTC are negative.
            if matches.group('tzsign') == '-':
                tzOffsetMinutes = -tzOffsetMinutes

            tzName = '%s%s:%s' % (
                matches.group('tzsign'), matches.group('tzhr'),
                matches.group('tzmin') or '00')
            timezone = FixedOffsetTimezone(tzOffsetMinutes, tzName)

        return datetime(
            int(matches.group('year')),
            int(matches.group('month')),
            int(matches.group('day')),
            int(matches.group('hour')),
            int(matches.group('min')),
            int(matches.group('sec') or 0),
            # Fraction digits truncated/padded to exactly 6 (microseconds),
            # so extra digits can never overflow the microsecond field.
            int((matches.group('ss') or '')[:6].ljust(6, '0')),
            timezone)

    def requireCorrectExecutionContext():
        """ Raise plpy.Error unless running as a BEFORE INSERT ... FOR EACH ROW trigger. """
        if TD['event'] != 'INSERT' or TD['when'] != 'BEFORE' or TD['level'] != 'ROW':
            plpy.error('This procedure is designed to be called *only* from a BEFORE INSERT FOR EACH ROW trigger.')

    def createPartitionIfDoesNotExists(partitionMetadata):
        """ Create the partition (table, index, update trigger) unless it already exists. """
        partitionName, pgSameDay, pgNextDay = partitionMetadata

        # One plan serves every partition: the table name is a parameter.
        partitionExistsPlan = getCachedPlan(
            'partitionExistsPlan',
            "SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'activity' AND tablename = $1",
            ('text',))
        if len(plpy.execute(partitionExistsPlan, ['stream__p%s' % partitionName])):
            return

        # All three values are produced by getPartitionMetadata from integer
        # date fields, so interpolating them into the DDL below is safe.
        ddlArgs = {'partition': partitionName, 'same': pgSameDay, 'next': pgNextDay}

        # Create partition...
        plpy.execute("""
            CREATE TABLE activity.stream__p%(partition)s (
                CONSTRAINT pk_activity_stream__p%(partition)s PRIMARY KEY (obj_id),
                CHECK (obj_created >= '%(same)s' AND obj_created < '%(next)s')
            ) INHERITS (activity.stream);
            """ % ddlArgs)

        # ...its index...
        plpy.execute("""
            CREATE INDEX i_activity_stream__p%(partition)s__generator_created
            ON activity.stream__p%(partition)s
            USING btree (as_generator_obj_id, obj_created DESC);
            """ % ddlArgs)

        # ...and its own obj_modified trigger: row-level triggers on
        # activity.stream do not fire for rows stored in child tables.
        plpy.execute("""
            CREATE TRIGGER t_activity_stream__p%(partition)s__modified_bu
            BEFORE UPDATE ON activity.stream__p%(partition)s
            FOR EACH ROW EXECUTE PROCEDURE t_obj_modified();
            """ % ddlArgs)

    def insertIntoPartition(partitionMetadata, newRow):
        """ Insert newRow into the partition described by partitionMetadata. """
        partitionName = partitionMetadata[0]

        # as_obj_ids is bound as int8[]: PL/Python (9.0+) hands array columns
        # over as Python lists, and binding a list as text would yield an
        # invalid array literal such as "[1, 2]".
        insertIntoPartitionPlan = getCachedPlan(
            'insertIntoPartitionPlan:%s' % partitionName, """
                INSERT INTO activity.stream__p%s (
                    obj_id, obj_created, obj_modified, obj_status_did,
                    as_generator_obj_id, as_type_did, as_obj_ids, as_data
                ) VALUES (
                    $1, $2::timestamptz, $3::timestamptz, $4,
                    $5, $6, $7, $8::hstore);
                """ % partitionName, (
                'int8', 'text', 'text', 'int2',
                'int8', 'int8', 'int8[]', 'text'))

        plpy.execute(
            insertIntoPartitionPlan,
            [newRow[k] for k in ('obj_id', 'obj_created', 'obj_modified', 'obj_status_did',
                                 'as_generator_obj_id', 'as_type_did', 'as_obj_ids', 'as_data')])

    def getPartitionMetadata(row):
        """ Return (partitionName, pgSameDay, pgNextDay) for the UTC day of row's obj_created. """
        pyDatetime = pgTimestampToPyDatetime(row['obj_created'])

        utcSameYmdTuple = pyDatetime.utctimetuple()[:3]
        utcNextYmdTuple = (pyDatetime + timedelta(days=1)).utctimetuple()[:3]

        return (
            '%04d%02d%02d' % utcSameYmdTuple,
            '%04d-%02d-%02d UTC' % utcSameYmdTuple,
            '%04d-%02d-%02d UTC' % utcNextYmdTuple)

    def main(newRow):
        """ Route newRow into its daily partition, creating the partition if needed. """
        requireCorrectExecutionContext()

        partitionMetadata = getPartitionMetadata(newRow)
        createPartitionIfDoesNotExists(partitionMetadata)
        insertIntoPartition(partitionMetadata, newRow)

    # Cache body in SD
    SD[sdNamespace] = main

try:
    SD[sdNamespace](TD['new'])
except Exception:
    # Report the full Python traceback, then re-raise: the INSERT must fail
    # rather than have its row silently discarded by the 'SKIP' below.
    import traceback
    plpy.warning(traceback.format_exc())
    raise

# The row now lives in its partition; skip the insert into activity.stream.
return 'SKIP'

$BODY$
 LANGUAGE plpythonu VOLATILE
 COST 1;


-- Route each inserted row into its daily partition. The trigger function
-- returns 'SKIP', so nothing is stored in activity.stream itself.
CREATE TRIGGER t_activity_stream__partition_redirect__bi
    BEFORE INSERT ON activity.stream
    FOR EACH ROW EXECUTE PROCEDURE activity.t_stream__partition_redirect();


-- Demo: the first INSERT for a UTC day creates that day's partition.
-- IF EXISTS keeps the script runnable on a database where the demo
-- partitions have not been created yet.
DROP TABLE IF EXISTS activity.stream__p20091125;
DROP TABLE IF EXISTS activity.stream__p20091126;

-- 12:07+03 is 09:07 UTC on 2009-11-25 -> partition stream__p20091125.
INSERT INTO activity.stream (obj_created, as_generator_obj_id, as_type_did)
VALUES ('2009-11-25 12:07:25.123456+03', 1234567890, 9876543210);

-- 21:30-05 is 02:30 UTC on 2009-11-26 -> partition stream__p20091126
-- (checks the UTC day rollover and an array-valued column).
INSERT INTO activity.stream (obj_created, as_generator_obj_id, as_type_did, as_obj_ids)
VALUES ('2009-11-25 21:30:00-05', 1234567890, 9876543210, ARRAY[1, 2, 3]::bigint[]);

-- Show which partition each demo row was routed to.
SELECT tableoid::regclass AS partition_name,
       obj_id, obj_created, obj_modified, obj_status_did,
       as_generator_obj_id, as_type_did, as_obj_ids, as_data
FROM activity.stream
WHERE obj_created >= '2009-11-25 00:00:00+00'
  AND obj_created <  '2009-11-27 00:00:00+00'
ORDER BY obj_created;
-- 
Sent via pgsql-admin mailing list (pgsql-admin@xxxxxxxxxxxxxx)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-admin

[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux