Re: Duplicate deletion optimizations

It's a fairly tricky problem. I have a number of sensors producing
energy data roughly every 5 minutes, but at random intervals between 1
and 15 minutes. I can't change that; it's how the sensor hardware
works. These feed into another unit, which accumulates them and
forwards them in batches over the Internet to my PostgreSQL database
server every few minutes (again at random times outside my control,
and with random batch sizes). To make things worse, if the Internet
connection between the unit and the database server fails, the unit
sends the latest data first, to provide a quick update of the current
values, and only then sends the backlog of stored values. Thus, data
do not always arrive in correct time order.

I'm stuck at home with the flu, so I'm happy to help ;)

I'll build an example setup to make it clearer...

-- A list of all sensors
create table sensors( sensor_id integer primary key );
insert into sensors select generate_series(1,100);

-- A table to contain raw sensor data
create table log(
  sensor_id integer not null references sensors(sensor_id),
  time integer not null,
  value float not null
);

-- Fill it up with test data
insert into log
select sensor_id, time, time from (
  select distinct sensor_id,
    (n+random()*10)::INTEGER as time
  from generate_series(0,50000,5) n
   cross join sensors
) d;

-- index it
alter table log add primary key( time, sensor_id );
create index log_sensor_time on log( sensor_id, time );

select * from log where sensor_id=1 order by time;
 sensor_id | time  | value
-----------+-------+-------
         1 |    12 |    12
         1 |    14 |    14
         1 |    21 |    21
         1 |    29 |    29
         1 |    30 |    30
(....)
         1 | 49996 | 49996
         1 | 50001 | 50001

-- create a table which will contain the time ticks
-- which will be used as x-axis for interpolation
-- (in this example, one tick every 10 time units)

create table ticks( time integer primary key,
   check( time%10 = 0 ) );
insert into ticks select
  generate_series( 0, (select max(time) from log), 10 );

-- create interpolated values table
create table interp(
  sensor_id integer not null references sensors( sensor_id ),
  time      integer not null references ticks( time ),
  value     float,
  distance  integer not null
);

-- fill interpolated values table
-- (pretty slow)

insert into interp
select
    sensor_id,
    t.time,
    start_value + (end_value-start_value)*(t.time-start_time)/(end_time-start_time),
    greatest( t.time - start_time, end_time - t.time )
  from
    (select
      sensor_id,
      lag(time) over (partition by sensor_id order by time) as start_time,
      time as end_time,
      lag(value) over (partition by sensor_id order by time) as start_value,
      value as end_value
    from log
    ) as l
  join ticks t on (t.time >= start_time and t.time < end_time);
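The value expression is plain linear interpolation between the two surrounding samples, and "distance" is the time to the farthest of the two. As a sanity check, the arithmetic can be replayed outside the database (Python, with made-up sample values):

```python
def interpolate(start_time, start_value, end_time, end_value, tick):
    """Linear interpolation at `tick`, plus the distance metric stored
    in the `interp` table (time to the farthest real sample used)."""
    value = start_value + (end_value - start_value) * (tick - start_time) / (end_time - start_time)
    distance = max(tick - start_time, end_time - tick)
    return value, distance

# A sample at t=12 (value 12) and the next at t=21 (value 21):
# the tick at t=20 interpolates to 20.0; the farthest sample is 8 away.
print(interpolate(12, 12.0, 21, 21.0, 20))  # (20.0, 8)
```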

-- alternate query if you don't like the ticks table (same result):
insert into interp
select
    sensor_id,
    time,
    start_value + (end_value-start_value)*(time-start_time)/(end_time-start_time),
    greatest( time - start_time, end_time - time )
  from
    (select
      *,
      generate_series( ((start_time+9)/10)*10, ((end_time-1)/10)*10, 10 ) as time
    from
      (select
        sensor_id,
        lag(time) over (partition by sensor_id order by time) as start_time,
        time as end_time,
        lag(value) over (partition by sensor_id order by time) as start_value,
        value as end_value
      from log
      ) as l
    ) l;
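The generate_series bounds rely on integer division: ((start_time+9)/10)*10 rounds up to the next multiple of 10 (so a tick equal to start_time is kept, matching t.time >= start_time in the join version), while ((end_time-1)/10)*10 rounds down to the last multiple strictly below end_time. The same trick in Python (where // is integer division):

```python
def tick_bounds(start_time, end_time, step=10):
    """Smallest multiple of `step` >= start_time, and the largest one
    strictly below end_time -- mirrors the SQL generate_series bounds."""
    first = ((start_time + step - 1) // step) * step
    last = ((end_time - 1) // step) * step
    return first, last

print(tick_bounds(12, 21))  # (20, 20): only the tick at 20 falls in the interval
print(tick_bounds(20, 30))  # (20, 20): the tick at 20 is kept, the one at 30 is not
```

When first > last (e.g. an interval like 11..19 containing no tick), generate_series simply produces no rows.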

alter table interp add primary key( time,sensor_id );
create index interp_sensor_time on interp( sensor_id, time );

For each interval in the log table that contains a time tick, this query generates the interpolated data at that tick.

Note that the "distance" field is the time between the interpolated value and the farthest real data point used to compute it. It can therefore serve as a quality measure for the interpolated point; if the distance exceeds some threshold, the value might not be that precise.

Now, suppose we receive a bunch of data. The data isn't ordered according to time.
There are two possibilities:

- the new data starts right where we left off (i.e., just after the last time for each sensor in table log)
- the new data starts later in time, and we want to process the results right away, expecting to receive, at some later point, older data to fill the holes

The second one is hairier, so let's do that.

Anyway, let's create a packet:

-- A table to contain raw sensor data
create temporary table packet(
  sensor_id integer not null,
  time integer not null,
  value float not null
);

-- Fill it up with test data
insert into packet
select sensor_id, time, time from (
  select distinct sensor_id,
    (n+random()*10)::INTEGER as time
  from generate_series(50200,50400) n
   cross join sensors
) d;

Note that I deliberately inserted a hole: the log table contains times 0-50000 and the packet contains times 50200-50400.

We'll need to decide whether we want the hole to appear in the "interp" table. Let's say we don't: we'll just interpolate over the hole. The "distance" column will be there so we don't forget this data is some sort of guess. If we receive data to fill that hole later, we can always use it.

For each sensor in the packet, we need to grab some entries from table "log": at least the most recent one, so we can interpolate against the first (oldest) value in the packet. To stay general, in case we receive old data that plugs a hole, we'll also grab the oldest log entry that is more recent than the newest one in the packet for that sensor.

Anyway, first let's create the missing ticks:

INSERT INTO ticks
  SELECT generate_series(
    (SELECT max(time) FROM ticks)+10,
    (SELECT max(time) FROM packet),
    10);

And ...

CREATE TEMPORARY TABLE new_interp(
  sensor_id INTEGER NOT NULL,
  time INTEGER NOT NULL,
  value FLOAT NOT NULL,
  distance INTEGER NOT NULL
);

-- time range in the packet for each sensor
WITH ranges AS (
  SELECT sensor_id, min(time) AS packet_start_time, max(time) AS packet_end_time
    FROM packet
    GROUP BY sensor_id
),
-- time ranges for records already in table log that will be needed
-- to interpolate packet records
log_boundaries AS (
  SELECT
    sensor_id,
    COALESCE(
      (SELECT max(l.time) FROM log l
        WHERE l.sensor_id=r.sensor_id AND l.time < r.packet_start_time),
      r.packet_start_time
    ) AS packet_start_time,
    COALESCE(
      (SELECT min(l.time) FROM log l
        WHERE l.sensor_id=r.sensor_id AND l.time > r.packet_end_time),
      r.packet_end_time
    ) AS packet_end_time
  FROM ranges r
),
-- merge existing and new data
extended_packet AS (
  SELECT log.* FROM log JOIN log_boundaries USING (sensor_id)
    WHERE log.time BETWEEN packet_start_time AND packet_end_time
  UNION ALL
  SELECT * FROM packet
),
-- zip current and next records
pre_interp AS (
  SELECT
    sensor_id,
    lag(time) OVER (PARTITION BY sensor_id ORDER BY time) AS start_time,
    time AS end_time,
    lag(value) OVER (PARTITION BY sensor_id ORDER BY time) AS start_value,
    value AS end_value
  FROM extended_packet
),
-- add tick info
pre_interp2 AS (
  SELECT *, generate_series( ((start_time+9)/10)*10, ((end_time-1)/10)*10, 10 ) AS time
    FROM pre_interp
)
-- interpolate
INSERT INTO new_interp SELECT
    sensor_id,
    time,
    start_value + (end_value-start_value)*(time-start_time)/(end_time-start_time) AS value,
    greatest( time - start_time, end_time - time ) AS distance
  FROM pre_interp2;

Although this query is huge, it's very fast, since it doesn't hit the big tables with any seq scans (hence the max() and min() tricks to use the indexes instead).

I love how postgres can blast that huge pile of SQL in, like, 50 ms...

If there is some overlap between packet data and data already in the log, you might get division-by-zero errors (a duplicated sample makes start_time equal end_time). In that case you'll need to apply a DISTINCT somewhere, or simply replace the UNION ALL with a UNION, which might be wiser anyway...

Anyway, that doesn't solve the "upsert" problem, so here we go:

-- Update the existing rows
UPDATE interp
  SET value = new_interp.value, distance = new_interp.distance
  FROM new_interp
  WHERE interp.sensor_id = new_interp.sensor_id
    AND interp.time = new_interp.time
    AND interp.distance > new_interp.distance;

-- insert new rows
INSERT INTO interp
SELECT new_interp.* FROM new_interp
  LEFT JOIN interp USING (sensor_id,time)
  WHERE interp.sensor_id IS NULL;

-- also insert data into log (don't forget this !)
INSERT INTO log SELECT * FROM packet;
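As a side note, on PostgreSQL 9.5 or later the update-then-insert pair can be collapsed into a single statement using the primary key on interp created earlier (a sketch, not tested here):

```sql
-- Upsert: insert new ticks, or overwrite an existing tick
-- only when the new interpolation has a better (smaller) distance
INSERT INTO interp
SELECT sensor_id, time, value, distance FROM new_interp
ON CONFLICT (time, sensor_id) DO UPDATE
  SET value = EXCLUDED.value, distance = EXCLUDED.distance
  WHERE interp.distance > EXCLUDED.distance;
```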

Tada.

select * from interp where sensor_id=1 and time > 49950 order by time;
 sensor_id | time  | value | distance
-----------+-------+-------+----------
         1 | 49960 | 49960 |        7
         1 | 49970 | 49970 |        4
         1 | 49980 | 49980 |        3
         1 | 49990 | 49990 |        5
         1 | 50000 | 50000 |        2
         1 | 50010 | 50010 |      190
         1 | 50020 | 50020 |      180
         1 | 50030 | 50030 |      170
(...)
         1 | 50180 | 50180 |      178
         1 | 50190 | 50190 |      188
         1 | 50200 | 50200 |        2
         1 | 50210 | 50210 |        1
         1 | 50220 | 50220 |        1
         1 | 50230 | 50230 |        1
         1 | 50240 | 50240 |        2

Note that the hole was interpolated over, but the "distance" column shows this data is a guess, not real.

What happens if we receive some data later to plug the hole ?

-- plug the previously left hole
truncate packet;
truncate new_interp;
insert into packet
select sensor_id, time, time from (
  select distinct sensor_id,
    (n+random()*10)::INTEGER as time
  from generate_series(50050,50150) n
   cross join sensors
) d;

(re-run huge query and upsert)

select * from interp where sensor_id=1 and time > 49950 order by time;
 sensor_id | time  | value | distance
-----------+-------+-------+----------
         1 | 49960 | 49960 |        7
         1 | 49970 | 49970 |        4
         1 | 49980 | 49980 |        3
         1 | 49990 | 49990 |        5
         1 | 50000 | 50000 |        2
         1 | 50010 | 50010 |       45
         1 | 50020 | 50020 |       35
         1 | 50030 | 50030 |       28
         1 | 50040 | 50040 |       38
         1 | 50050 | 50050 |       48
         1 | 50060 | 50060 |        1
         1 | 50070 | 50070 |        1
         1 | 50080 | 50080 |        2
(...)
         1 | 50130 | 50130 |        1
         1 | 50140 | 50140 |        3
         1 | 50150 | 50150 |        1
         1 | 50160 | 50160 |       40
         1 | 50170 | 50170 |       30
         1 | 50180 | 50180 |       26
         1 | 50190 | 50190 |       36
         1 | 50200 | 50200 |        2
         1 | 50210 | 50210 |        1
         1 | 50220 | 50220 |        1
         1 | 50230 | 50230 |        1
         1 | 50240 | 50240 |        2

It has used the new data to rewrite new values over the entire hole, and those values should have better precision.

Enjoy!

--
Sent via pgsql-performance mailing list (pgsql-performance@xxxxxxxxxxxxxx)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-performance


