Re: Working around spurious unique constraint errors due to SERIALIZABLE bug

On Thu, 2009-07-16 at 14:13 +0000, Florian Weimer wrote:

> The drawback is that some of the side effects of the INSERT occur
> before the constraint check fails, so it seems to me that I still need
> to perform the select.

I was about to foolishly suggest:
Instead of:

SELECT 1 FROM x WHERE a = 4;
IF NOT FOUND THEN
  INSERT INTO x (a,b) VALUES (4,10);
END IF;

trying:

INSERT INTO x (a, b)
SELECT 4, 10 WHERE NOT EXISTS(SELECT 1 FROM x WHERE a = 4);

... but then realised I couldn't come up with any justification for how
it'd help (after all, the WHERE clause still has to be evaluated before
the INSERT can proceed, there's still no predicate locking, and the
statements can be evaluated concurrently) so I thought I'd test it.

The test program, attached, demonstrates what I should've known in the
first place. In SERIALIZABLE isolation, the above is *guaranteed* to
fail every time there's a conflict, because concurrent transactions cannot
see changes committed by the others. The same goes for a SELECT test
followed by a separate INSERT, by the way.
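For the record, here's the interleaving that produces the guaranteed
failure (a sketch only; it assumes x has a unique constraint or primary
key on a, as in the examples above):

```sql
-- Session 1:
BEGIN ISOLATION LEVEL SERIALIZABLE;
INSERT INTO x (a, b)
  SELECT 4, 10 WHERE NOT EXISTS (SELECT 1 FROM x WHERE a = 4);
-- subquery finds nothing, row inserted (not yet committed)

-- Session 2, before session 1 commits:
BEGIN ISOLATION LEVEL SERIALIZABLE;
INSERT INTO x (a, b)
  SELECT 4, 10 WHERE NOT EXISTS (SELECT 1 FROM x WHERE a = 4);
-- the subquery cannot see session 1's uncommitted row, so the INSERT
-- proceeds and blocks on the unique index. When session 1 commits,
-- session 2 fails with something like:
--   ERROR: duplicate key value violates unique constraint "x_pkey"
```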

In READ COMMITTED you get away with it a lot of the time because the
statement can see other transactions' committed changes, so the
subquery often matches - but it's a race, and eventually you'll hit a
situation where the subquery for two concurrent transactions is
evaluated before either's INSERT is issued, or at least before either
is committed.

In my test program I've managed as many as 1283 steps before two racing
READ COMMITTED transactions collide. That's in a program designed to
synchronize each transaction before each insert for maximum collision
potential. With six racing transactions I've rarely seen more than three
steps without a collision.

( I've attached the test program in case it's of any interest. It's a
Python test controller that spawns slave processes which it synchronises
using Pg's advisory locking. It ensures that the slaves all start each
INSERT attempt together, and all finish before starting the next
attempt. Signals are used for failure notification, cleanup, etc. )

Anyway, the point is that you're actually worse off in this particular
situation thanks to your use of SERIALIZABLE isolation. READ COMMITTED
merely turns the guaranteed failure into a race you're likely to win
most of the time, so it's not really a solution either.

Given that, it seems to me you'll have to rely on Pg's internal
lower-level synchronization around unique indexes. Try the INSERT and
see if it fails, then ROLLBACK TO SAVEPOINT (or use a PL/PgSQL
exception block). As you noted, this does mean that certain side
effects may occur, including:

   - advancement of sequences due to nextval(...) calls

   - triggers that've done work that can't be rolled back, eg
     dblink calls, external file writes, inter-process communication etc
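To illustrate the insert-and-recover pattern just described (a sketch
only, using the same table and values as the examples above):

```sql
-- Plain SQL flavour, with a savepoint:
SAVEPOINT pre_insert;
INSERT INTO x (a, b) VALUES (4, 10);
-- ... and if that raises unique_violation, issue:
-- ROLLBACK TO SAVEPOINT pre_insert;

-- PL/PgSQL flavour, with an exception block:
BEGIN
  INSERT INTO x (a, b) VALUES (4, 10);
EXCEPTION WHEN unique_violation THEN
  NULL;  -- someone else inserted a = 4 first; nothing to do
END;
```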

If you really can't afford the INSERT side effects and can't redesign
your code to be tolerant of them, you can always lock the table before
an INSERT.

If you can't afford to lock the table due to its impact on performance,
you can potentially use Pg's advisory locking mechanism to protect your
inserts. Eg (PL/PgSQL):

PERFORM pg_advisory_lock(4);
SELECT 1 FROM x WHERE a = 4;
IF NOT FOUND THEN
  INSERT INTO x (a,b) VALUES (4,10);
END IF;
PERFORM pg_advisory_unlock(4);

(You might want to use the two-argument form of the advisory locking
calls if your IDs are INTEGER size not INT8, and use the table oid for
the first argument.)
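Eg something like this (just a sketch; the oid-to-int cast is my
assumption about how you'd derive the first key):

```sql
PERFORM pg_advisory_lock('x'::regclass::oid::int, 4);
-- ... test-and-insert as above ...
PERFORM pg_advisory_unlock('x'::regclass::oid::int, 4);
```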

If every possible INSERTer ensures it holds the lock on the id of
interest before inserting, you'll be fine. Yes, it's ugly, but it
preserves concurrent insert performance while eliminating failed
INSERTs. One way to ensure that every INSERTer does the right thing is
to revoke the INSERT privilege on the table and route all inserts
through a SECURITY DEFINER function that checks the caller's rights
and does the INSERT.
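Something along these lines (names purely illustrative):

```sql
CREATE FUNCTION insert_x(p_a integer, p_b integer) RETURNS void AS $$
BEGIN
  -- serialize inserters on the id of interest
  PERFORM pg_advisory_lock(p_a);
  IF NOT EXISTS (SELECT 1 FROM x WHERE a = p_a) THEN
    INSERT INTO x (a, b) VALUES (p_a, p_b);
  END IF;
  PERFORM pg_advisory_unlock(p_a);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

REVOKE INSERT ON x FROM PUBLIC;
-- then GRANT EXECUTE on insert_x() to the roles that should insert
```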

Also: Is this really a phantom read? Your issue is not that you read a
record that then vanishes or no longer matches your filter criteria;
rather, it's that a record is created that matches your criteria after
you tested for it.

Certainly that wouldn't be possible if the concurrent transactions were
actually executed serially, but does the standard actually require that
this be the case? If it does, then compliant implementations would have
to do predicate locking. Ouch. Does anybody do that? It seems MS-SQL
implements very limited predicate locking (enough to handle your issue)
but not enough to tackle aggregates or anything complex.

-- 
Craig Ringer
#!/usr/bin/env python
import psycopg2
import sys
import time
import os
from psycopg2.extensions import (
    ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE, STATUS_BEGIN, STATUS_READY)
import signal
import subprocess
import random

######## CONFIGURATION ##########
debug = False
serializable = False
#################################

conn = psycopg2.connect("")
if serializable:
	conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
else:
	conn.set_isolation_level(ISOLATION_LEVEL_READ_COMMITTED)
children = []

def main_slave(slave_number, master_pid):
	print "SLAVE[%02s] Spawned" % slave_number
	n = 0
	# Acquire the first finish lock 
	curs = conn.cursor()
	if debug: print "SLAVE[%02s] LS(1,0)" % slave_number
	curs.execute("SELECT pg_advisory_lock_shared(1,0);")
	# Register our successful start to the controller
	curs.execute("INSERT INTO control (slave) VALUES (%s);" % slave_number)
	conn.commit();
	print "SLAVE[%02s] Started" % slave_number
	# We enter this loop holding the finish lock for n, and no other locks.
	while True:
		# Acquire the finish lock for the next record pre-emptively
		if debug: print "SLAVE[%02s] LS(1,%s)" % (slave_number, (n+1))
		curs.execute("SELECT pg_advisory_lock_shared(1,%s);" % (n+1));
		# Then wait on the start lock for our current record.
		if debug: print "SLAVE[%02s] LS(0,%s)" % (slave_number, n)
		curs.execute("SELECT pg_advisory_lock_shared(0,%s);" % n)
		# The controller let us off the leash. Race!
		try:
			curs.execute("INSERT INTO x (a, slave) SELECT %s, %s WHERE NOT EXISTS (SELECT 1 FROM x WHERE a = %s);" % (n,slave_number,n))
			conn.commit();
		except psycopg2.IntegrityError, e:
			print "SLAVE[%02s]: Integrity error inserting %s!" % (slave_number, n)
			print e
			# SIGUSR1 the master
			os.kill( int(master_pid), signal.SIGUSR1)
			# and terminate.
			exit(2)
		# Release the start lock now we're done with it.
		if debug: print "SLAVE[%02s] US(0,%s)" % (slave_number, n)
		curs.execute("SELECT pg_advisory_unlock_shared(0,%s);" % n);
		# Done with n. Tell the controller about our success so it can go ahead
		# and unlock the next record.
		if debug: print "SLAVE[%02s] US(1,%s)" % (slave_number, n)
		curs.execute("SELECT pg_advisory_unlock_shared(1,%s);" % n)
		n += 1

def master_start_slaves(nslaves):
	for n in range(0, nslaves):
		child = subprocess.Popen([sys.argv[0], "slave", str(n), str(os.getpid())], shell=False, stdin=None, stdout=None, stderr=None, close_fds=True)
		children.append(child)

def master_wait_for_slaves(curs, nslaves):
	while True:
		curs.execute("SELECT count(slave) from control;")
		ret = curs.fetchone()
		if ret is not None and ret[0] == nslaves:
			break;
		else:
			conn.rollback()
			time.sleep(0.01)

def master_num_alive_slaves():
	alive = 0
	for child in children:
		child.poll()
		if child.returncode is None:
			alive += 1
	return alive

def master_sigusr1_handler(signum, frame):
	"""We get SIGUSR1 from a slave when the test fails. We should terminate our
	children and then ourselves."""
	print "MASTER: Test failed, terminating children"
	master_killall_slaves()
	print "MASTER: Test failed, terminating"
	sys.exit(2)

def master_killall_slaves():
	for child in children:
		child.poll()
		if child.returncode is None:
			try:
				os.kill(child.pid, signal.SIGTERM)
			except:
				pass
	time.sleep(0.1)
	if master_num_alive_slaves():
		time.sleep(1)
		for child in children:
			child.poll()
			if child.returncode is None:
				try:
					os.kill(child.pid, signal.SIGKILL)
				except:
					pass

def main_master():
	"""Test controller. This keeps the inserters in lockstep by
	forcing them to wait on an advisory lock for the id they're
	about to try to insert."""
	nslaves = 2
	# We use 0,n advisory locks for start locks. These locks
	# stop the slaves starting to insert "n" until the lock is
	# released by the controller.
	#
	# 1,n locks are finish locks. They're acquired by the slaves
	# as share locks before waiting on the start lock, and only
	# released once the INSERT is done. The controller must acquire
	# each finish lock exclusively before proceeding to unlock the
	# next start lock.
	signal.signal(signal.SIGUSR1, master_sigusr1_handler)
	curs = conn.cursor()
	curs.execute("TRUNCATE TABLE x;")
	curs.execute("TRUNCATE TABLE control;")
	conn.commit()
	n = 0;
	# Take the first start lock so the slaves don't race off as soon
	# as they're started.
	if debug: print "MASTER: L(0,%s)" % n
	curs.execute("SELECT pg_advisory_lock(0,%s);" % n)
	# Start slaves
	print "MASTER: Spawning slaves..."
	master_start_slaves(nslaves);
	print "MASTER: slaves spawned, waiting for them to report in."
	# Wait until they've all reported in
	master_wait_for_slaves(curs, nslaves);
	print "MASTER: Slaves have reported in. Starting test."
	# and start them going. State on loop entry is start lock
	# for "n" is held, no other locks held.
	while True:
		# Lock the record they're going to try to insert
		# once we let them loose on this one. This keeps
		# them from running ahead.
		if debug: print "MASTER: L(0,%s)" % (n+1)
		curs.execute("SELECT pg_advisory_lock(0,%s);" % (n+1))
		# Unlock the current record, letting them try to
		# insert.
		if debug: print "MASTER: U(0,%s): Slaves inserting %s" % (n,n)
		curs.execute("SELECT pg_advisory_unlock(0,%s);" % n)
		# Attempt to acquire the finish lock on the record we just
		# let the slaves insert. We'll only be able to obtain it once
		# all slaves have released their finish locks.
		if debug: print "MASTER: L(1,%s)" % n
		curs.execute("SELECT pg_advisory_lock(1,%s)" % n);
		# Release it again, since we know they're all done.
		if debug: print "MASTER: U(1,%s): Slaves finished inserting %s" % (n,n)
		curs.execute("SELECT pg_advisory_unlock(1,%s)" % n);
		# and get ready for the next run.
		n += 1
		sys.stdout.write(" %s" % n);
		sys.stdout.flush()
		if not master_num_alive_slaves() == nslaves:
			print "One or more slaves appear to have died. Terminating the test"
			master_killall_slaves()
			sys.exit(3)
		


if len(sys.argv) > 1:
	if sys.argv[1] == "slave":
		main_slave(sys.argv[2], sys.argv[3])
	elif sys.argv[1] == "master":
		main_master()
	else:
		print "Usage: %s master|slave" % sys.argv[0]
		sys.exit(1);
else:
	main_master()

-- 
Sent via pgsql-general mailing list (pgsql-general@xxxxxxxxxxxxxx)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-general
