It's a fairly tricky problem. I have a number of sensors producing
energy data about every 5 minutes, but at random times between 1 and
15 minutes. I can't change that as that's the way the hardware of the
sensors works. These feed into another unit, which accumulates them
and forwards them in batches over the Internet to my PostgreSQL
database server every few minutes (again at random times outside my
control and with random batch sizes). To make things worse, if the
Internet connection between the unit and the database server fails, it
will send the latest data first to provide a quick update to the
current values and then send the backlog of stored values. Thus, data
do not always arrive in correct time order.
I'm stuck home with flu, so I'm happy to help ;)
I'll build an example setup to make it clearer...
-- A list of all sensors
create table sensors( sensor_id integer primary key );
insert into sensors select generate_series(1,100);
-- A table to contain raw sensor data
create table log(
    sensor_id integer not null references sensors(sensor_id),
    time      integer not null,
    value     float   not null
);
-- Fill it up with test data
insert into log
select sensor_id, time, time from (
    select distinct sensor_id,
        (n+random()*10)::INTEGER as time
    from generate_series(0,50000,5) n
    cross join sensors
) d;
-- index it
alter table log add primary key( time, sensor_id );
create index log_sensor_time on log( sensor_id, time );
select * from log where sensor_id=1 order by time;
 sensor_id | time  | value
-----------+-------+-------
         1 |    12 |    12
         1 |    14 |    14
         1 |    21 |    21
         1 |    29 |    29
         1 |    30 |    30
(....)
         1 | 49996 | 49996
         1 | 50001 | 50001
-- create a table which will contain the time ticks
-- which will be used as x-axis for interpolation
-- (in this example, one tick every 10 time units)
create table ticks(
    time integer primary key,
    check( time%10 = 0 )
);
insert into ticks select
    generate_series( 0, (select max(time) from log), 10 );
-- create interpolated values table
create table interp(
    sensor_id integer not null references sensors( sensor_id ),
    time      integer not null references ticks( time ),
    value     float,
    distance  integer not null
);
-- fill interpolated values table
-- (pretty slow)
insert into interp
select
    sensor_id,
    t.time,
    start_value +
        (end_value-start_value)*(t.time-start_time)/(end_time-start_time),
    greatest( t.time - start_time, end_time-t.time )
from
    (select
        sensor_id,
        lag(time) over (partition by sensor_id order by time) as start_time,
        time as end_time,
        lag(value) over (partition by sensor_id order by time) as start_value,
        value as end_value
    from log
    ) as l
    join ticks t on (t.time >= start_time and t.time < end_time);
-- alternate query if you don't like the ticks table (same result;
-- run either this one or the previous one, not both):
insert into interp
select
    sensor_id,
    time,
    start_value +
        (end_value-start_value)*(time-start_time)/(end_time-start_time),
    greatest( time - start_time, end_time-time )
from
    (select
        *,
        generate_series( ((start_time+9)/10)*10, ((end_time-1)/10)*10, 10 )
            AS time
    from
        (select
            sensor_id,
            lag(time) over (partition by sensor_id order by time) as start_time,
            time as end_time,
            lag(value) over (partition by sensor_id order by time) as start_value,
            value as end_value
        from log
        ) as l
    ) l;
alter table interp add primary key( time,sensor_id );
create index interp_sensor_time on interp( sensor_id, time );
For each interval in the log table that contains a time tick, this query
generates the interpolated data at that tick.
Note that the "distance" field represents the distance (in time) between
the interpolated value and the farthest real data point that was used to
calculate it. Therefore, it can be used as a measure of the quality of the
interpolated point: if the distance is greater than some threshold, the
value might not be that precise.
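To make the arithmetic concrete, here is a small sketch of what the query computes for one interval. It is in Python only so that it can be run without a database; the function names are made up for this illustration, and it assumes non-negative integer times and a tick every 10 units, as in the example.

```python
def ticks_in_interval(start_time, end_time, step=10):
    """Ticks t (multiples of step) with start_time <= t < end_time.

    Mirrors generate_series(((start+9)/10)*10, ((end-1)/10)*10, 10),
    assuming non-negative integer times.
    """
    first = ((start_time + step - 1) // step) * step  # round start up to a tick
    last = ((end_time - 1) // step) * step            # last tick strictly before end
    return list(range(first, last + 1, step))


def interpolate(start_time, start_value, end_time, end_value, tick):
    """Linear interpolation at tick, plus the "distance" quality measure."""
    value = start_value + (end_value - start_value) * (tick - start_time) / (end_time - start_time)
    distance = max(tick - start_time, end_time - tick)  # farthest real data point
    return value, distance


# Two consecutive log rows of the sample output: (14, 14) and (21, 21)
print(ticks_in_interval(14, 21))            # [20]
print(interpolate(14, 14.0, 21, 21.0, 20))  # (20.0, 6)
print(ticks_in_interval(21, 29))            # [] (no tick in this interval)
```

An interval without any tick simply produces no row in interp, which is why the query output can skip over some log intervals.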
Now, suppose we receive a bunch of data. The data isn't ordered according
to time.
There are two possibilities:
- the new data starts right where we left off (i.e., just after the last
time for each sensor in table log)
- the new data starts later in time, and we want to process the results
right away, expecting to receive, at some later point, older data to fill
the holes
The second one is hairier, so let's do that.
Anyway, let's create a packet:
-- A temporary table to hold one incoming packet of raw sensor data
create temporary table packet(
    sensor_id integer not null,
    time      integer not null,
    value     float   not null
);
-- Fill it up with test data
insert into packet
select sensor_id, time, time from (
    select distinct sensor_id,
        (n+random()*10)::INTEGER as time
    from generate_series(50200,50400) n
    cross join sensors
) d;
Note that I deliberately inserted a hole: the log table contains times
0-50000 and the packet contains times 50200-50400.
We'll need to decide if we want the hole to appear in the "interp" table
or not. Let's say we don't want it to appear, we'll just interpolate over
the hole. The "distance" column will be there so we don't forget this data
is some sort of guess. If we receive data to fill that hole later, we can
always use it.
For each sensor in the packet, we need to grab some entries from table
"log": at least the most recent one before the packet, to be able to do
some interpolation with the first (oldest) value in the packet. To be more
general, in case we receive old data that will plug a hole, we'll also grab
the oldest log entry that is more recent than the most recent one in the
packet for this sensor (hum... I have to re-read that...)
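If that sentence needs re-reading, this sketch may help. It picks the two boundary log entries for one sensor, again in Python just for illustration: the function name and the sample times are made up, and log_times is assumed to be a sorted list of the times already in the log for that sensor.

```python
from bisect import bisect_left, bisect_right


def log_boundaries(log_times, packet_times):
    """Return (start, end): the newest log time before the packet and the
    oldest log time after it, falling back to the packet's own bounds when
    the log has nothing on that side (like the COALESCE in SQL)."""
    packet_start, packet_end = min(packet_times), max(packet_times)
    i = bisect_left(log_times, packet_start)   # count of log times < packet_start
    start = log_times[i - 1] if i > 0 else packet_start
    j = bisect_right(log_times, packet_end)    # index of first log time > packet_end
    end = log_times[j] if j < len(log_times) else packet_end
    return start, end


# Packet arriving after the end of the log: only a left neighbour exists
print(log_boundaries([49996, 50001], [50202, 50400]))                # (50001, 50400)
# Packet plugging a hole: neighbours on both sides
print(log_boundaries([49996, 50001, 50203, 50207], [50055, 50150]))  # (50001, 50203)
```

Every log row between these two bounds is then merged with the packet before interpolating.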
Anyway, first let's create the missing ticks:
INSERT INTO ticks
SELECT generate_series(
(SELECT max(time) FROM ticks)+10,
(SELECT max(time) FROM packet),
10);
And ...
CREATE TEMPORARY TABLE new_interp(
sensor_id INTEGER NOT NULL,
time INTEGER NOT NULL,
value FLOAT NOT NULL,
distance INTEGER NOT NULL
);
-- time range in the packet for each sensor
WITH ranges AS (
    SELECT sensor_id,
        min(time) AS packet_start_time,
        max(time) AS packet_end_time
    FROM packet
    GROUP BY sensor_id
),
-- time ranges for records already in table log that will be needed to
-- interpolate packet records
log_boundaries AS (
    SELECT
        sensor_id,
        COALESCE(
            (SELECT max(l.time) FROM log l
              WHERE l.sensor_id=r.sensor_id AND l.time < r.packet_start_time),
            r.packet_start_time
        ) AS packet_start_time,
        COALESCE(
            (SELECT min(l.time) FROM log l
              WHERE l.sensor_id=r.sensor_id AND l.time > r.packet_end_time),
            r.packet_end_time
        ) AS packet_end_time
    FROM ranges r
),
-- merge existing and new data
extended_packet AS (
    SELECT log.* FROM log JOIN log_boundaries USING (sensor_id)
    WHERE log.time BETWEEN packet_start_time AND packet_end_time
    UNION ALL
    SELECT * FROM packet
),
-- zip current and next records
pre_interp AS (
    SELECT
        sensor_id,
        lag(time) OVER (PARTITION BY sensor_id ORDER BY time) AS start_time,
        time AS end_time,
        lag(value) OVER (PARTITION BY sensor_id ORDER BY time) AS start_value,
        value AS end_value
    FROM extended_packet
),
-- add tick info
pre_interp2 AS (
    SELECT *, generate_series( ((start_time+9)/10)*10,
                               ((end_time-1)/10)*10,
                               10 ) AS time
    FROM pre_interp
)
-- interpolate
INSERT INTO new_interp SELECT
    sensor_id,
    time,
    start_value +
        (end_value-start_value)*(time-start_time)/(end_time-start_time) AS value,
    greatest( time - start_time, end_time-time ) AS distance
FROM pre_interp2;
Although this query is huge, it's very fast, since it doesn't hit the big
tables with any seq scans (hence the max() and min() tricks to use the
indexes instead).
I love how postgres can blast that huge pile of SQL in, like, 50 ms...
If there is some overlap between packet data and data already in the log,
you might get some division by zero errors. In that case you'll need to
apply a DISTINCT somewhere (or simply replace the UNION ALL with a UNION,
which might be wiser anyway...). Rows that overlap would also collide with
the primary key on log when the packet is inserted into it further down.
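To show what the overlap does, here is a tiny sketch with made-up rows for one sensor, in Python for illustration only. A row present in both the log and the packet yields an interval with start_time = end_time, so end_time - start_time (the denominator of the interpolation formula) is zero. Removing exact duplicates, which is what UNION does, makes that interval disappear.

```python
def zero_length_intervals(rows):
    """Sort (time, value) rows and pair each with its predecessor, as
    lag() OVER (ORDER BY time) does; return the pairs whose times are equal."""
    pts = sorted(rows)
    return [(a, b) for a, b in zip(pts, pts[1:]) if a[0] == b[0]]


# Hypothetical rows: time 50203 is in both the log and the packet
log_rows = [(50001, 50001.0), (50203, 50203.0)]
packet_rows = [(50100, 50100.0), (50203, 50203.0)]

union_all = log_rows + packet_rows                # like UNION ALL: keeps the duplicate
union = sorted(set(log_rows) | set(packet_rows))  # like UNION: drops exact duplicates

print(zero_length_intervals(union_all))  # [((50203, 50203.0), (50203, 50203.0))]
print(zero_length_intervals(union))      # []
```

Note that this only removes rows that are identical in every column. Two rows with the same time but different values would both survive, so you would still have to decide which one wins.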
Anyway, that doesn't solve the "upsert" problem, so here we go:
-- Update the existing rows
UPDATE interp
SET value = new_interp.value, distance = new_interp.distance
FROM new_interp
WHERE interp.sensor_id = new_interp.sensor_id
AND interp.time = new_interp.time
AND interp.distance > new_interp.distance;
-- insert new rows
INSERT INTO interp
SELECT new_interp.* FROM new_interp
LEFT JOIN interp USING (sensor_id,time)
WHERE interp.sensor_id IS NULL;
-- also insert data into log (don't forget this!)
INSERT INTO log SELECT * FROM packet;
Tada.
select * from interp where sensor_id=1 and time > 49950 order by time;
 sensor_id | time  | value | distance
-----------+-------+-------+----------
         1 | 49960 | 49960 |        7
         1 | 49970 | 49970 |        4
         1 | 49980 | 49980 |        3
         1 | 49990 | 49990 |        5
         1 | 50000 | 50000 |        2
         1 | 50010 | 50010 |      190
         1 | 50020 | 50020 |      180
         1 | 50030 | 50030 |      170
(...)
         1 | 50180 | 50180 |      178
         1 | 50190 | 50190 |      188
         1 | 50200 | 50200 |        2
         1 | 50210 | 50210 |        1
         1 | 50220 | 50220 |        1
         1 | 50230 | 50230 |        1
         1 | 50240 | 50240 |        2
Note that the hole was interpolated over, but the "distance" column shows
this data is a guess, not real.
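One way to use that column is to split the rows on a threshold before showing them. A minimal sketch in Python, using a few rows copied from the output above; the threshold of 30 time units and the function name are arbitrary choices for this example, not something the setup prescribes.

```python
# (time, value, distance) for sensor 1, taken from the output above
rows = [
    (50000, 50000.0, 2),
    (50010, 50010.0, 190),
    (50190, 50190.0, 188),
    (50200, 50200.0, 2),
]

MAX_DISTANCE = 30  # hypothetical threshold, pick whatever suits the sensors


def split_by_quality(rows, max_distance):
    """Separate rows backed by nearby real data from rows that are guesses."""
    trusted = [r for r in rows if r[2] <= max_distance]
    guesses = [r for r in rows if r[2] > max_distance]
    return trusted, guesses


trusted, guesses = split_by_quality(rows, MAX_DISTANCE)
print([r[0] for r in trusted])  # [50000, 50200]
print([r[0] for r in guesses])  # [50010, 50190]
```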
What happens if we receive some data later to plug the hole?
-- plug the previously left hole
truncate packet;
truncate new_interp;
insert into packet
select sensor_id, time, time from (
    select distinct sensor_id,
        (n+random()*10)::INTEGER as time
    from generate_series(50050,50150) n
    cross join sensors
) d;
(re-run huge query and upsert)
select * from interp where sensor_id=1 and time > 49950 order by time;
 sensor_id | time  | value | distance
-----------+-------+-------+----------
         1 | 49960 | 49960 |        7
         1 | 49970 | 49970 |        4
         1 | 49980 | 49980 |        3
         1 | 49990 | 49990 |        5
         1 | 50000 | 50000 |        2
         1 | 50010 | 50010 |       45
         1 | 50020 | 50020 |       35
         1 | 50030 | 50030 |       28
         1 | 50040 | 50040 |       38
         1 | 50050 | 50050 |       48
         1 | 50060 | 50060 |        1
         1 | 50070 | 50070 |        1
         1 | 50080 | 50080 |        2
(...)
         1 | 50130 | 50130 |        1
         1 | 50140 | 50140 |        3
         1 | 50150 | 50150 |        1
         1 | 50160 | 50160 |       40
         1 | 50170 | 50170 |       30
         1 | 50180 | 50180 |       26
         1 | 50190 | 50190 |       36
         1 | 50200 | 50200 |        2
         1 | 50210 | 50210 |        1
         1 | 50220 | 50220 |        1
         1 | 50230 | 50230 |        1
         1 | 50240 | 50240 |        2
It has used the new data to rewrite new values over the entire hole, and
those values should have better precision.
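The rewrite happens because of the distance test in the UPDATE: an existing row is only replaced when the new row has a smaller distance. Here is a sketch of that rule in Python, for illustration only. The dict layout and the function name are made up, and the row with distance 50 is a hypothetical "worse" candidate added to show that it loses.

```python
def upsert(interp, new_interp):
    """interp, new_interp: dicts mapping (sensor_id, time) -> (value, distance).

    Same effect as the UPDATE followed by the INSERT ... LEFT JOIN."""
    for key, (value, distance) in new_interp.items():
        if key not in interp:
            interp[key] = (value, distance)  # INSERT branch: tick not seen yet
        elif interp[key][1] > distance:
            interp[key] = (value, distance)  # UPDATE branch: closer data wins
    return interp


interp = {(1, 50010): (50010.0, 190), (1, 50200): (50200.0, 2)}
new_interp = {
    (1, 50010): (50010.0, 45),  # better than 190: replaces the old guess
    (1, 50200): (50200.0, 50),  # worse than 2: ignored
    (1, 50060): (50060.0, 1),   # new tick: inserted
}
upsert(interp, new_interp)
print(interp[(1, 50010)])  # (50010.0, 45)
print(interp[(1, 50200)])  # (50200.0, 2)
print(interp[(1, 50060)])  # (50060.0, 1)
```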
Enjoy!
--
Sent via pgsql-performance mailing list (pgsql-performance@xxxxxxxxxxxxxx)