Re: assembling PGresults from multiple simultaneous queries (libpq, singlerowmode)

Hi,

On postgres mailing lists please don't write your reply at the top of a
fully quoted email. We like the reply to be inline and trimmed to the
necessary parts.

On 2019-04-07 13:28:46 -0700, Konstantin Izmailov wrote:
> Yes, Andres, I meant "pipelining", just couldn't choose correct word. Thank
> you for the answer(s)!
> 
> I also made changes in my own copy of libpq, and they work fine. I think
> the pipelining support is needed in libpq. Btw, how can I get the patch
> code? I want to compare your approach with mine. I couldn't figure out how
> to get the patch from the link.

Hm, odd. There's a link on the page "Latest attachment" - but for
unknown reasons that's broken. I've attached it for now, but will also
inquire with the webadmin team about what's up.

Greetings,

Andres Freund
From ba93ae02eca024997f2ce6a9c2c2987aea4a77b8 Mon Sep 17 00:00:00 2001
From: Prabakaran <Vaishnavi.Prabakaran@xxxxxxxxxxxxxx>
Date: Fri, 12 Jan 2018 10:09:09 +1100
Subject: [PATCH] Pipelining-batch-support-for-libpq-code-v16

---
 doc/src/sgml/libpq.sgml                            | 502 +++++++++++++++++
 doc/src/sgml/lobj.sgml                             |   4 +
 .../libpqwalreceiver/libpqwalreceiver.c            |   3 +
 src/interfaces/libpq/exports.txt                   |   5 +
 src/interfaces/libpq/fe-connect.c                  |  28 +
 src/interfaces/libpq/fe-exec.c                     | 595 +++++++++++++++++++--
 src/interfaces/libpq/fe-protocol2.c                |   6 +
 src/interfaces/libpq/fe-protocol3.c                |  15 +-
 src/interfaces/libpq/libpq-fe.h                    |  24 +-
 src/interfaces/libpq/libpq-int.h                   |  47 +-
 10 files changed, 1186 insertions(+), 43 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 4e46451..6aae637 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -4752,6 +4752,500 @@ int PQflush(PGconn *conn);
 
  </sect1>
 
+ <sect1 id="libpq-batch-mode">
+  <title>Batch mode and query pipelining</title>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>batch mode</secondary>
+  </indexterm>
+
+  <indexterm zone="libpq-batch-mode">
+   <primary>libpq</primary>
+   <secondary>pipelining</secondary>
+  </indexterm>
+
+  <para>
+   <application>libpq</application> supports queueing up queries into
+   a pipeline to be executed as a batch on the server. Batching queries allows
+   applications to avoid a client/server round-trip after each query to get
+   the results before issuing the next query.
+  </para>
+
+  <sect2>
+   <title>When to use batching</title>
+
+   <para>
+    Much like asynchronous query mode, there is no performance disadvantage to
+    using batching and pipelining. It increases client application complexity
+    and extra caution is required to prevent client/server deadlocks but
+    can sometimes offer considerable performance improvements.
+   </para>
+
+   <para>
+    Batching is most useful when the server is distant, i.e. network latency
+    (<quote>ping time</quote>) is high, and when many small operations are being performed in
+    rapid sequence. There is usually less benefit in using batches when each
+    query takes many multiples of the client/server round-trip time to execute.
+    A 100-statement operation run on a server 300ms round-trip-time away would take
+    30 seconds in network latency alone without batching; with batching it may spend
+    as little as 0.3s waiting for results from the server.
+   </para>
+
+   <para>
+    Use batches when your application does lots of small
+    <literal>INSERT</literal>, <literal>UPDATE</literal> and
+    <literal>DELETE</literal> operations that can't easily be transformed into
+    operations on sets or into a
+    <link linkend="libpq-copy"><literal>COPY</literal></link> operation.
+   </para>
+
+   <para>
+    Batching is not useful when information from one operation is required by the
+    client before it knows enough to send the next operation. The client must
+    introduce a synchronisation point and wait for a full client/server
+    round-trip to get the results it needs. However, it's often possible to
+    adjust the client design to exchange the required information server-side.
+    Read-modify-write cycles are especially good candidates; for example:
+    <programlisting>
+     BEGIN;
+     SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+     -- result: x=2
+     -- client adds 1 to x:
+     UPDATE mytable SET x = 3 WHERE id = 42;
+     COMMIT;
+    </programlisting>
+    could be much more efficiently done with:
+    <programlisting>
+     UPDATE mytable SET x = x + 1 WHERE id = 42;
+    </programlisting>
+   </para>
+
+   <note>
+    <para>
+     The batch API was introduced in PostgreSQL 10.0, but clients using the PostgreSQL 10.0 version of libpq can
+     use batches on server versions 7.4 and newer. Batching works on any server
+     that supports the v3 extended query protocol.
+    </para>
+   </note>
+
+  </sect2>
+
+  <sect2 id="libpq-batch-using">
+   <title>Using batch mode</title>
+
+   <para>
+    To issue batches the application must switch
+    a connection into batch mode. Enter batch mode with <link
+    linkend="libpq-PQenterBatchMode"><function>PQenterBatchMode(conn)</function></link> or test
+    whether batch mode is active with <link
+    linkend="libpq-PQbatchStatus"><function>PQbatchStatus(conn)</function></link>. In batch mode only <link
+    linkend="libpq-async">asynchronous operations</link> are permitted, and
+    <literal>COPY</literal> is not recommended as it will most likely trigger a failure in batch processing.
+    Using any synchronous command execution function, such as <function>PQfn</function>,
+    <function>PQexec</function> or one of its sibling functions, is an error condition.
+    Functions allowed in batch mode are described in <xref linkend="libpq-batch-sending"/>. 
+   </para>
+
+   <para>
+    The client uses libpq's asynchronous query functions to dispatch work,
+    marking the end of each batch with <function>PQbatchSendQueue</function>.
+    To get results, it uses <function>PQgetResult</function> and
+    <function>PQbatchProcessQueue</function>. It may eventually exit
+    batch mode with <function>PQexitBatchMode</function> once all results are
+    processed.
+   </para>
+
+   <note>
+    <para>
+     It is best to use batch mode with <application>libpq</application> in
+     <link linkend="libpq-pqsetnonblocking">non-blocking mode</link>. If used in
+     blocking mode it is possible for a client/server deadlock to occur. The
+     client will block trying to send queries to the server, but the server will
+     block trying to send results from queries it has already processed to the
+     client. This only occurs when the client sends enough queries to fill its
+     output buffer and the server's receive buffer before switching to
+     processing input from the server, but it's hard to predict exactly when
+     that'll happen so it's best to always use non-blocking mode.
+     Even in non-blocking mode, batch mode consumes more memory if sends and receives are not performed as required.
+    </para>
+   </note>
+
+   <sect3 id="libpq-batch-sending">
+    <title>Issuing queries</title>
+
+    <para>
+     After entering batch mode the application dispatches requests
+     using normal asynchronous <application>libpq</application> functions such as 
+     <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>,
+     <function>PQsendQueryPrepared</function>, <function>PQsendDescribePortal</function> and
+     <function>PQsendDescribePrepared</function>.
+     The asynchronous requests are followed by a <link
+     linkend="libpq-PQbatchSendQueue"><function>PQbatchSendQueue(conn)</function></link> call to mark
+     the end of the batch. The client <emphasis>does not</emphasis> need to call
+     <function>PQgetResult</function> immediately after dispatching each
+     operation. <link linkend="libpq-batch-results">Result processing</link>
+     is handled separately.
+    </para>
+    
+    <para>
+     Batched operations will be executed by the server in the order the client
+     sends them. The server will send the results in the order the statements
+     executed. The server may begin executing the batch before all commands
+     in the batch are queued and the end of batch command is sent. If any
+     statement encounters an error the server aborts the current transaction and
+     skips processing the rest of the batch. Query processing resumes after the
+     end of the failed batch.
+    </para>
+
+    <para>
+     It's fine for one operation to depend on the results of a
+     prior one. One query may define a table that the next query in the same
+     batch uses; similarly, an application may create a named prepared statement
+     then execute it with later statements in the same batch.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-results">
+    <title>Processing results</title>
+
+    <para>
+     The client <link linkend="libpq-batch-interleave">interleaves result
+     processing</link> with sending batch queries, or for small batches may
+     process all results after sending the whole batch.
+    </para>
+
+    <para>
+     To get the result of the first batch entry the client must call <link
+     linkend="libpq-PQbatchProcessQueue"><function>PQbatchProcessQueue</function></link>. It must then call
+     <function>PQgetResult</function> and handle the results until
+     <function>PQgetResult</function> returns null. The result from the next batch entry 
+     may then be retrieved using <function>PQbatchProcessQueue</function> and the cycle repeated.  The
+     application handles individual statement results as normal.
+    </para>
+
+    <para>
+     To enter single-row mode, call <function>PQsetSingleRowMode</function> immediately after a
+     successful call of <function>PQbatchProcessQueue</function>. This mode selection is effective 
+     only for the query currently being processed. For more information on the use of <function>PQsetSingleRowMode
+     </function>, refer to <xref linkend="libpq-single-row-mode"/>.
+     
+    </para>
+
+    <para>
+     <function>PQgetResult</function> behaves the same as for normal asynchronous
+     processing except that it may return the new <type>PGresult</type> types
+     <literal>PGRES_BATCH_END</literal> and <literal>PGRES_BATCH_ABORTED</literal>.
+     <literal>PGRES_BATCH_END</literal> is reported exactly once for each
+     <function>PQbatchSendQueue</function> call at the corresponding point in
+     the result stream and at no other time. <literal>PGRES_BATCH_ABORTED</literal>
+     is emitted during error handling; see <link linkend="libpq-batch-errors">
+     error handling</link>.
+    </para>
+
+    <para>
+     <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc.
+     operate as normal when processing batch results.
+    </para>
+
+    <para>
+     <application>libpq</application> does not provide any information to the
+     application about the query currently being processed. The application
+     must keep track of the order in which it sent queries and the expected
+     results. Applications will typically use a state machine or a FIFO queue
+     for this.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-errors">
+    <title>Error handling</title>
+
+    <para>
+     When a query in a batch causes an <literal>ERROR</literal> the server
+     skips processing all subsequent messages until the end-of-batch message.
+     The open transaction is aborted.
+    </para>
+
+    <para>
+     From the client perspective, after the client gets a
+     <literal>PGRES_FATAL_ERROR</literal> return from
+     <function>PQresultStatus</function> the batch is flagged as aborted.
+     <application>libpq</application> will report a
+     <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued
+     operation in an aborted batch. The result for
+     <function>PQbatchSendQueue</function> is reported as
+     <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch
+     and resumption of normal result processing.
+    </para>
+
+    <para>
+     The client <emphasis>must</emphasis> process results with
+     <function>PQbatchProcessQueue(...)</function> and
+     <function>PQgetResult</function> during error recovery.
+    </para>
+
+    <para>
+     If the batch used an implicit transaction then operations that have
+     already executed are rolled back and operations that were queued for after
+     the failed operation are skipped entirely. The same behaviour holds if the
+     batch starts and commits a single explicit transaction (i.e. the first
+     statement is <literal>BEGIN</literal> and the last is
+     <literal>COMMIT</literal>) except that the session remains in an aborted
+     transaction state at the end of the batch. If a batch contains <emphasis>
+     multiple explicit transactions</emphasis>, all transactions that committed
+     prior to the error remain committed, the currently in-progress transaction
+     is aborted and all subsequent operations in the current and all later
+     transactions in the same batch are skipped completely.
+    </para>
+
+    <note>
+     <para>
+      The client must not assume that work is committed when it
+      <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the
+      corresponding result is received to confirm the commit is complete.
+      Because errors arrive asynchronously the application needs to be able to
+      restart from the last <emphasis>received</emphasis> committed change and
+      resend work done after that point if something goes wrong.
+     </para>
+    </note>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-interleave">
+    <title>Interleaving result processing and query dispatch</title>
+
+    <para>
+     To avoid deadlocks on large batches the client should be structured around
+     a nonblocking I/O loop using a function like <function>select</function>,
+     <function>poll</function>, <function>epoll</function>,
+     <function>WaitForMultipleObjectsEx</function>, etc.
+    </para>
+
+    <para>
+     The client application should generally maintain a queue of work still to
+     be dispatched and a queue of work that has been dispatched but not yet had
+     its results processed. When the socket is writable it should dispatch more
+     work. When the socket is readable it should read results and process them,
+     matching them up to the next entry in its expected results queue. 
+     Depending on available memory, results should be read from the socket frequently;
+     there's no need to wait until the end of the batch to read them.  Batches
+     should be scoped to logical units of work, usually (but not always) one
+     transaction per batch. There's no need to exit batch mode and re-enter it
+     between batches or to wait for one batch to finish before sending the next.
+    </para>
+
+    <para>
+     An example using <function>select()</function> and a simple state machine
+     to track sent and received work is in
+     <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename> in the PostgreSQL
+     source distribution.
+    </para>
+
+   </sect3>
+
+   <sect3 id="libpq-batch-end">
+    <title>Ending batch mode</title>
+
+    <para>
+     Once all dispatched commands have had their results processed and the end-of-batch
+     result has been consumed the application may return to non-batched mode with
+     <link linkend="libpq-PQexitBatchMode"><function>PQexitBatchMode(conn)</function></link>.
+    </para>
+   </sect3>
+
+  </sect2>
+
+  <sect2 id="libpq-funcs-batch">
+   <title>Functions associated with batch mode</title>
+
+   <variablelist>
+
+    <varlistentry id="libpq-PQbatchStatus">
+     <term>
+      <function>PQbatchStatus</function>
+      <indexterm>
+       <primary>PQbatchStatus</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Returns the current batch mode status of the <application>libpq</application> connection.
+<synopsis>
+int PQbatchStatus(PGconn *conn);
+</synopsis>
+      </para>			
+      <variablelist>
+         <varlistentry id="libpq-PQbatchStatus-1">
+           <term>
+             <literal>PQBATCH_MODE_ON</literal>
+           </term>
+ 
+          <listitem>
+           <para>
+             Returns <literal>PQBATCH_MODE_ON</literal> if the <application>libpq</application> connection is in <link
+             linkend="libpq-batch-mode">batch mode</link>.
+           </para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry id="libpq-PQbatchStatus-2">
+          <term>
+            <literal>PQBATCH_MODE_OFF</literal>
+          </term>
+  
+          <listitem>
+          <para>
+            Returns <literal>PQBATCH_MODE_OFF</literal> if the <application>libpq</application> connection is not in <link
+            linkend="libpq-batch-mode">batch mode</link>.
+          </para>
+          </listitem>
+        </varlistentry>
+
+        <varlistentry id="libpq-PQbatchStatus-3">
+          <term>
+            <literal>PQBATCH_MODE_ABORTED</literal>
+          </term>
+          <listitem>
+            <para>
+                Returns <literal>PQBATCH_MODE_ABORTED</literal> if the <application>libpq</application> connection is in
+                aborted status. The aborted flag is cleared as soon as the result of the 
+                <function>PQbatchSendQueue</function> at the end of the aborted batch is 
+                processed. Clients don't usually need this function to verify aborted status 
+                as they can tell that the batch is aborted from <literal>PGRES_BATCH_ABORTED</literal> 
+                result codes.
+            </para>
+          </listitem>
+        </varlistentry>
+  
+       </variablelist>
+
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQenterBatchMode">
+     <term>
+      <function>PQenterBatchMode</function>
+      <indexterm>
+       <primary>PQenterBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to enter batch mode if it is currently idle or
+      already in batch mode.
+
+<synopsis>
+int PQenterBatchMode(PGconn *conn);
+</synopsis>
+
+        </para>
+        <para>
+          Returns 1 for success. Returns 0 and has no 
+          effect if the connection is not currently idle, i.e. it has a result 
+          ready, is waiting for more input from the server, etc. This function 
+          does not actually send anything to the server, it just changes the 
+          <application>libpq</application> connection state.
+
+        </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQexitBatchMode">
+     <term>
+      <function>PQexitBatchMode</function>
+      <indexterm>
+       <primary>PQexitBatchMode</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes a connection to exit batch mode if it is currently in batch mode
+      with an empty queue and no pending results.
+<synopsis>
+int PQexitBatchMode(PGconn *conn);
+</synopsis>
+        </para>
+        <para>Returns 1 for success.
+      Returns 1 and takes no action if not in batch mode. If the connection has
+      pending batch items in the queue for reading with
+      <function>PQbatchProcessQueue</function>, the current statement isn't finished
+      processing or there are results pending for collection with
+      <function>PQgetResult</function>, returns 0 and does nothing.
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchSendQueue">
+     <term>
+      <function>PQbatchSendQueue</function>
+      <indexterm>
+       <primary>PQbatchSendQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Delimits the end of a set of batched commands by sending a <link
+      linkend="protocol-flow-ext-query">sync message</link> and flushing
+      the send buffer. The end of a batch serves as 
+      the delimiter of an implicit transaction and
+      an error recovery point; see <link linkend="libpq-batch-errors">
+      error handling</link>.
+
+<synopsis>
+int PQbatchSendQueue(PGconn *conn);
+</synopsis>
+        </para>
+        <para>Returns 1 for success. Returns 0 if the connection is not in batch mode
+              or sending a <link linkend="protocol-flow-ext-query">sync message</link> failed.
+
+        </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry id="libpq-PQbatchProcessQueue">
+     <term>
+      <function>PQbatchProcessQueue</function>
+      <indexterm>
+       <primary>PQbatchProcessQueue</primary>
+      </indexterm>
+     </term>
+
+     <listitem>
+      <para>
+      Causes the connection to start processing the next queued query's
+      results. 
+     </para>
+
+<synopsis>
+int PQbatchProcessQueue(PGconn *conn);
+</synopsis>
+
+     <para>
+      Returns 1 if a new query was popped from the result queue
+      for processing. Returns 0 and has no effect if there are no query results
+      pending, batch mode is not enabled, or if the query currently processed
+      is incomplete or still has pending results. The reason for these failures can
+      be verified with <function>PQbatchStatus
+      </function> and <function>PQgetResult</function>.
+      See <link linkend="libpq-batch-results">processing results</link>.
+
+      </para>
+     </listitem>
+    </varlistentry>
+
+   </variablelist>
+
+  </sect2>
+
+ </sect1>
+
  <sect1 id="libpq-single-row-mode">
   <title>Retrieving Query Results Row-By-Row</title>
 
@@ -4792,6 +5286,14 @@ int PQflush(PGconn *conn);
    Each object should be freed with <function>PQclear</function> as usual.
   </para>
 
+  <note>
+    <para>
+     When using batch mode, call <function>PQsetSingleRowMode</function>
+     immediately after a successful call of <function>PQbatchProcessQueue</function>.
+     See <xref linkend="libpq-batch-mode"/> for more information.
+    </para>
+   </note>
+
   <para>
    <variablelist>
     <varlistentry id="libpq-pqsetsinglerowmode">
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 6b5aaeb..cf22f3f 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
     <application>libpq</application> library.
    </para>
 
+   <para>
+    Client applications cannot use these functions while a libpq connection is in batch mode.
+   </para>
+
    <sect2 id="lo-create">
     <title>Creating a Large Object</title>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f9aec05..32151ff 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -944,6 +944,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
 			walres->status = WALRCV_ERROR;
 			walres->err = pchomp(PQerrorMessage(conn->streamConn));
 			break;
+		default:
+			/* This is just to keep the compiler quiet */
+			break;
 	}
 
 	PQclear(pgres);
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index d6a38d0..625e74e 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -172,3 +172,8 @@ PQsslAttribute            169
 PQsetErrorContextVisibility 170
 PQresultVerboseErrorMessage 171
 PQencryptPasswordConn     172
+PQenterBatchMode	  173
+PQexitBatchMode           174
+PQbatchSendQueue	  175
+PQbatchProcessQueue	  176
+PQbatchStatus		  177
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 8d54333..c2df669 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -3574,6 +3574,25 @@ sendTerminateConn(PGconn *conn)
 }
 
 /*
+ * PQfreeCommandQueue
+ * Free all the entries of PGcommandQueueEntry queue passed.
+ */
+static void
+PQfreeCommandQueue(PGcommandQueueEntry *queue)
+{
+
+	while (queue != NULL)
+	{
+		PGcommandQueueEntry *prev = queue;
+
+		queue = queue->next;
+		if (prev->query)
+			free(prev->query);
+		free(prev);
+	}
+}
+
+/*
  * closePGconn
  *	 - properly close a connection to the backend
  *
@@ -3585,6 +3604,7 @@ static void
 closePGconn(PGconn *conn)
 {
 	PGnotify   *notify;
+	PGcommandQueueEntry *queue;
 	pgParameterStatus *pstatus;
 
 	sendTerminateConn(conn);
@@ -3616,6 +3636,14 @@ closePGconn(PGconn *conn)
 		free(prev);
 	}
 	conn->notifyHead = conn->notifyTail = NULL;
+	queue = conn->cmd_queue_head;
+	PQfreeCommandQueue(queue);
+	conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+	queue = conn->cmd_queue_recycle;
+	PQfreeCommandQueue(queue);
+
+	conn->cmd_queue_recycle = NULL;
 	pstatus = conn->pstatus;
 	while (pstatus != NULL)
 	{
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 4c0114c..2049035 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -40,7 +40,9 @@ char	   *const pgresStatus[] = {
 	"PGRES_NONFATAL_ERROR",
 	"PGRES_FATAL_ERROR",
 	"PGRES_COPY_BOTH",
-	"PGRES_SINGLE_TUPLE"
+	"PGRES_SINGLE_TUPLE",
+	"PGRES_BATCH_END",
+	"PGRES_BATCH_ABORTED"
 };
 
 /*
@@ -71,7 +73,10 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
-
+static PGcommandQueueEntry *PQmakePipelinedCommand(PGconn *conn);
+static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry);
+static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry);
+static int pqBatchFlush(PGconn *conn);
 
 /* ----------------
  * Space management for PGresult.
@@ -1159,7 +1164,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
 		conn->next_result = conn->result;
 		conn->result = res;
 		/* And mark the result ready to return */
-		conn->asyncStatus = PGASYNC_READY;
+		conn->asyncStatus = PGASYNC_READY_MORE;
 	}
 
 	return 1;
@@ -1182,6 +1187,13 @@ fail:
 int
 PQsendQuery(PGconn *conn, const char *query)
 {
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+		return 0;
+	}
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1280,6 +1292,10 @@ PQsendPrepare(PGconn *conn,
 			  const char *stmtName, const char *query,
 			  int nParams, const Oid *paramTypes)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 	if (!PQsendQueryStart(conn))
 		return 0;
 
@@ -1338,31 +1354,51 @@ PQsendPrepare(PGconn *conn,
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+	else
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;                       /* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
 
 	/* remember we are doing just a Parse */
-	conn->queryclass = PGQUERY_PREPARE;
+	*queryclass = PGQUERY_PREPARE;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
-	conn->last_query = strdup(query);
+	if (*last_query)
+		free(*last_query);
+	*last_query = strdup(query);
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1410,7 +1446,80 @@ PQsendQueryPrepared(PGconn *conn,
 }
 
 /*
- * Common startup code for PQsendQuery and sibling routines
+ * PQmakePipelinedCommand
+ *	Get a new command queue entry, allocating it if required. Doesn't add it to
+ *	the tail of the queue yet, use PQappendPipelinedCommand once the command has
+ *	been written for that. If a command fails once it's called this, it should
+ *	use PQrecyclePipelinedCommand to put it on the freelist or release it.
+ *
+ * If allocation fails sets the error message and returns null.
+ */
+static PGcommandQueueEntry *
+PQmakePipelinedCommand(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (conn->cmd_queue_recycle == NULL)
+	{
+		entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry));
+		if (entry == NULL)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory\n"));
+			return NULL;
+		}
+	}
+	else
+	{
+		entry = conn->cmd_queue_recycle;
+		conn->cmd_queue_recycle = entry->next;
+	}
+	entry->next = NULL;
+	entry->query = NULL;
+
+	return entry;
+}
+
+/*
+ * PQappendPipelinedCommand
+ *	Append a precreated command queue entry to the queue after it's been
+ *	sent successfully.
+ */
+static void
+PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry)
+{
+	if (conn->cmd_queue_head == NULL)
+		conn->cmd_queue_head = entry;
+	else
+		conn->cmd_queue_tail->next = entry;
+	conn->cmd_queue_tail = entry;
+}
+
+/*
+ * PQrecyclePipelinedCommand
+ *	Push a command queue entry onto the freelist. It must be a dangling entry
+ *	with null next pointer and not referenced by any other entry's next pointer.
+ */
+static void
+PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry)
+{
+	if (entry == NULL)
+		return;
+	if (entry->next != NULL)
+	{
+		fprintf(stderr, libpq_gettext("tried to recycle non-dangling command queue entry"));
+		abort();
+	}
+	if (entry->query)
+		free(entry->query);
+
+	entry->next = conn->cmd_queue_recycle;
+	conn->cmd_queue_recycle = entry;
+}
+
+/*
+ * PQsendQueryStart
+ *	Common startup code for PQsendQuery and sibling routines
  */
 static bool
 PQsendQueryStart(PGconn *conn)
@@ -1428,20 +1537,60 @@ PQsendQueryStart(PGconn *conn)
 						  libpq_gettext("no connection to the server\n"));
 		return false;
 	}
-	/* Can't send while already busy, either. */
-	if (conn->asyncStatus != PGASYNC_IDLE)
+
+	/* Can't send while already busy, either, unless enqueuing for later */
+	if (conn->asyncStatus != PGASYNC_IDLE && conn->batch_status == PQBATCH_MODE_OFF)
 	{
 		printfPQExpBuffer(&conn->errorMessage,
 						  libpq_gettext("another command is already in progress\n"));
 		return false;
 	}
 
-	/* initialize async result-accumulation state */
-	pqClearAsyncResult(conn);
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		/*
+		 * When enqueuing a message we don't change much of the connection
+		 * state since it's already in use for the current command. The
+		 * connection state will get updated when PQbatchProcessQueue(...)
+		 * advances to start processing the queued message.
+		 *
+		 * Just make sure we can safely enqueue given the current connection
+		 * state. We can enqueue behind another queue item, or behind a
+		 * non-queue command (one that sends its own sync), but we can't
+		 * enqueue if the connection is in a copy state.
+		 */
+		switch (conn->asyncStatus)
+		{
+			case PGASYNC_QUEUED:
+			case PGASYNC_READY:
+			case PGASYNC_READY_MORE:
+			case PGASYNC_BUSY:
+				/* ok to queue */
+				break;
+			case PGASYNC_COPY_IN:
+			case PGASYNC_COPY_OUT:
+			case PGASYNC_COPY_BOTH:
+				printfPQExpBuffer(&conn->errorMessage,
+						libpq_gettext("cannot queue commands during COPY\n"));
+				return false;
+				break;
+			case PGASYNC_IDLE:
+				printfPQExpBuffer(&conn->errorMessage,
+						libpq_gettext_noop("internal error, idle state in batch mode\n"));
+				return false;
+		}
+	}
+	else
+	{
+		/* This command's results will come in immediately.
+		 * Initialize async result-accumulation state
+		 */
+		pqClearAsyncResult(conn);
 
-	/* reset single-row processing mode */
-	conn->singleRowMode = false;
+		/* reset single-row processing mode */
+		conn->singleRowMode = false;
 
+	}
 	/* ready to send command message */
 	return true;
 }
@@ -1465,6 +1614,10 @@ PQsendQueryGuts(PGconn *conn,
 				int resultFormat)
 {
 	int			i;
+	PGcommandQueueEntry *pipeCmd = NULL;
+	char	  **last_query;
+	PGQueryClass *queryclass;
+
 
 	/* This isn't gonna work on a 2.0 server */
 	if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1474,6 +1627,23 @@ PQsendQueryGuts(PGconn *conn,
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		last_query = &pipeCmd->query;
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		last_query = &conn->last_query;
+		queryclass = &conn->queryclass;
+	}
+
+
 	/*
 	 * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
 	 * using specified statement name and the unnamed portal.
@@ -1586,35 +1756,42 @@ PQsendQueryGuts(PGconn *conn,
 		pqPutMsgEnd(conn) < 0)
 		goto sendFailed;
 
-	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		/* construct the Sync message */
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are using extended query protocol */
-	conn->queryclass = PGQUERY_EXTENDED;
+	*queryclass = PGQUERY_EXTENDED;
 
 	/* and remember the query text too, if possible */
 	/* if insufficient memory, last_query just winds up NULL */
-	if (conn->last_query)
-		free(conn->last_query);
+	if (*last_query)
+		free(*last_query);
 	if (command)
-		conn->last_query = strdup(command);
+		*last_query = strdup(command);
 	else
-		conn->last_query = NULL;
+		*last_query = NULL;
 
 	/*
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -1741,6 +1918,280 @@ PQisBusy(PGconn *conn)
 	return conn->asyncStatus == PGASYNC_BUSY;
 }
 
+/*
+ * PQbatchStatus
+ * 	Returns current batch mode status
+ */
+int
+PQbatchStatus(PGconn *conn)
+{
+	if (!conn)
+		return PQBATCH_MODE_OFF;
+
+	return conn->batch_status;
+}
+
+/*
+ * PQenterBatchMode
+ * 	Put an idle connection in batch mode. Commands submitted after this
+ * 	can be pipelined on the connection; there's no requirement to wait for
+ * 	one to finish before the next is dispatched.
+ *
+ * 	Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * 	A set of commands is terminated by PQbatchSendQueue(). Multiple sets of
+ * 	batched commands may be sent while in batch mode. Batch mode can be exited
+ * 	by calling PQexitBatchMode() once all results are processed.
+ *
+ * 	This doesn't actually send anything on the wire; it just puts libpq
+ * 	into a state where it can pipeline work.
+ */
+int
+PQenterBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		return 1;
+
+	if (conn->asyncStatus != PGASYNC_IDLE)
+		return 0;
+
+	conn->batch_status = PQBATCH_MODE_ON;
+	conn->asyncStatus = PGASYNC_QUEUED;
+
+	return 1;
+}
+
+/*
+ * PQexitBatchMode
+ * 	End batch mode and return to normal command mode.
+ *
+ * 	Has no effect unless the client has processed all results
+ * 	from all outstanding batches and the connection is idle.
+ *
+ * 	Returns 1 if batch mode ended.
+ */
+int
+PQexitBatchMode(PGconn *conn)
+{
+	if (!conn)
+		return 0;				/* exitFailed dereferences conn */
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 1;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* can't end batch while busy */
+			goto exitFailed;
+		default:
+			break;
+	}
+
+	/* still work to process */
+	if (conn->cmd_queue_head != NULL)
+		goto exitFailed;
+
+	conn->batch_status = PQBATCH_MODE_OFF;
+	conn->asyncStatus = PGASYNC_IDLE;
+
+	/* Flush any pending data in out buffer */
+	if (pqFlush(conn) < 0)
+		goto sendFailed;
+	return 1;
+
+sendFailed:
+	pqHandleSendFailure(conn);
+
+exitFailed:
+	printfPQExpBuffer(&conn->errorMessage,
+					  libpq_gettext_noop("internal error, failed to exit batch mode\n"));
+	return 0;
+}
+
+/*
+ * PQbatchSendQueue
+ * 	End a batch submission by sending a protocol sync. The connection will
+ * 	remain in batch mode and unavailable for new synchronous command execution
+ * 	functions until all results from the batch are processed by the client.
+ *
+ * 	It's legal to start submitting another batch immediately, without waiting
+ * 	for the results of the current batch. There's no need to end batch mode
+ * 	and start it again.
+ *
+ * 	If a command in a batch fails, the result of every subsequent command up
+ * 	to the sync is set to PGRES_BATCH_ABORTED. The sync itself produces a
+ * 	PGresult with PGRES_BATCH_END, whether or not the batch was aborted, and
+ * 	returns the batch status to PQBATCH_MODE_ON.
+ */
+int
+PQbatchSendQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *entry;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 0;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_IDLE:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, IDLE in batch mode\n"));
+			return 0;
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, COPY in batch mode\n"));
+			return 0;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+		case PGASYNC_QUEUED:
+			/* can send sync to end this batch of cmds */
+			break;
+	}
+
+	entry = PQmakePipelinedCommand(conn);
+	if (entry == NULL)
+		return 0;			/* error msg already set */
+
+	entry->queryclass = PGQUERY_SYNC;
+	entry->query = NULL;
+
+	/* construct the Sync message */
+	if (pqPutMsgStart('S', false, conn) < 0 ||
+		pqPutMsgEnd(conn) < 0)
+		goto sendFailed;
+
+	/*
+	 * Give the data a push.  In nonblock mode, don't complain if we're unable
+	 * to send it all; PQgetResult() will do any additional flushing needed.
+	 */
+	if (PQflush(conn) < 0)
+		goto sendFailed;
+
+	/* Append only after the flush so sendFailed never recycles a queued entry */
+	PQappendPipelinedCommand(conn, entry);
+	return 1;
+
+sendFailed:
+	PQrecyclePipelinedCommand(conn, entry);
+	pqHandleSendFailure(conn);
+	return 0;
+}
+
+/*
+ * PQbatchProcessQueue
+ *	 In batch mode, start processing the next query in the queue.
+ *
+ * Returns 1 if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns 0 if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+int
+PQbatchProcessQueue(PGconn *conn)
+{
+	PGcommandQueueEntry *next_query;
+
+	if (!conn)
+		return 0;
+
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+		return 0;
+
+	switch (conn->asyncStatus)
+	{
+		case PGASYNC_COPY_IN:
+		case PGASYNC_COPY_OUT:
+		case PGASYNC_COPY_BOTH:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, COPY in batch mode\n"));
+			return 0;
+		case PGASYNC_READY:
+		case PGASYNC_READY_MORE:
+		case PGASYNC_BUSY:
+			/* client still has to process current query or results */
+			return 0;
+			break;
+		case PGASYNC_IDLE:
+			printfPQExpBuffer(&conn->errorMessage,
+				   libpq_gettext_noop("internal error, IDLE in batch mode\n"));
+			return 0;
+		case PGASYNC_QUEUED:
+			/* next query please */
+			break;
+	}
+
+	if (conn->cmd_queue_head == NULL)
+	{
+		/*
+		 * In batch mode but nothing left on the queue; caller can submit more
+		 * work or call PQexitBatchMode() now.
+		 */
+		return 0;
+	}
+
+	/*
+	 * Pop the next query from the queue and set up the connection state as if
+	 * it'd just been dispatched from a non-batched call
+	 */
+	next_query = conn->cmd_queue_head;
+	conn->cmd_queue_head = next_query->next;
+	next_query->next = NULL;
+
+	/* Initialize async result-accumulation state */
+	pqClearAsyncResult(conn);
+
+	/* reset single-row processing mode */
+	conn->singleRowMode = false;
+
+	free(conn->last_query);		/* drop the previous command's text */
+	conn->last_query = next_query->query;
+	next_query->query = NULL;
+	conn->queryclass = next_query->queryclass;
+
+	PQrecyclePipelinedCommand(conn, next_query);
+
+	if (conn->batch_status == PQBATCH_MODE_ABORTED && conn->queryclass != PGQUERY_SYNC)
+	{
+		/*
+		 * In an aborted batch we don't get anything from the server for each
+		 * result; we're just discarding input until we get to the next sync
+		 * from the server. The client needs to know its queries got aborted
+		 * so we create a fake PGresult to return immediately from
+		 * PQgetResult.
+		 */
+		conn->result = PQmakeEmptyPGresult(conn,
+										   PGRES_BATCH_ABORTED);
+		if (!conn->result)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("out of memory"));
+			pqSaveErrorResult(conn);
+			return 0;
+		}
+		conn->asyncStatus = PGASYNC_READY;
+	}
+	else
+	{
+		/* allow parsing to continue */
+		conn->asyncStatus = PGASYNC_BUSY;
+	}
+
+	return 1;
+}
+
 
 /*
  * PQgetResult
@@ -1800,10 +2251,31 @@ PQgetResult(PGconn *conn)
 	switch (conn->asyncStatus)
 	{
 		case PGASYNC_IDLE:
+		case PGASYNC_QUEUED:
 			res = NULL;			/* query is complete */
 			break;
 		case PGASYNC_READY:
 			res = pqPrepareAsyncResult(conn);
+			if (conn->batch_status != PQBATCH_MODE_OFF)
+			{
+				/*
+				 * In batch mode, query execution state cannot be IDLE as there
+				 * can be other queries or results waiting in the queue.
+				 *
+				 * The connection isn't idle since we can't submit new
+				 * nonbatched commands. It also isn't busy since the current
+				 * command is done and we need to process a new one.
+				 */
+				conn->asyncStatus = PGASYNC_QUEUED;
+			}
+			else
+			{
+				/* Set the state back to BUSY, allowing parsing to proceed. */
+				conn->asyncStatus = PGASYNC_BUSY;
+			}
+			break;
+		case PGASYNC_READY_MORE:
+			res = pqPrepareAsyncResult(conn);
 			/* Set the state back to BUSY, allowing parsing to proceed. */
 			conn->asyncStatus = PGASYNC_BUSY;
 			break;
@@ -1983,6 +2455,13 @@ PQexecStart(PGconn *conn)
 	if (!conn)
 		return false;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						  libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n"));
+		return false;
+	}
+
 	/*
 	 * Silently discard any prior query result that application didn't eat.
 	 * This is probably poor design, but it's here for backward compatibility.
@@ -2177,6 +2656,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
 static int
 PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 {
+	PGcommandQueueEntry *pipeCmd = NULL;
+	PGQueryClass *queryclass;
+
 	/* Treat null desc_target as empty string */
 	if (!desc_target)
 		desc_target = "";
@@ -2192,6 +2674,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		return 0;
 	}
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		pipeCmd = PQmakePipelinedCommand(conn);
+
+		if (pipeCmd == NULL)
+			return 0;			/* error msg already set */
+
+		queryclass = &pipeCmd->queryclass;
+	}
+	else
+	{
+		queryclass = &conn->queryclass;
+	}
+
 	/* construct the Describe message */
 	if (pqPutMsgStart('D', false, conn) < 0 ||
 		pqPutc(desc_type, conn) < 0 ||
@@ -2200,15 +2696,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 		goto sendFailed;
 
 	/* construct the Sync message */
-	if (pqPutMsgStart('S', false, conn) < 0 ||
-		pqPutMsgEnd(conn) < 0)
-		goto sendFailed;
+	if (conn->batch_status == PQBATCH_MODE_OFF)
+	{
+		if (pqPutMsgStart('S', false, conn) < 0 ||
+			pqPutMsgEnd(conn) < 0)
+			goto sendFailed;
+	}
 
 	/* remember we are doing a Describe */
-	conn->queryclass = PGQUERY_DESCRIBE;
+	*queryclass = PGQUERY_DESCRIBE;
 
 	/* reset last-query string (not relevant now) */
-	if (conn->last_query)
+	if (conn->last_query && conn->batch_status == PQBATCH_MODE_OFF)
 	{
 		free(conn->last_query);
 		conn->last_query = NULL;
@@ -2218,14 +2717,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
 	 * Give the data a push.  In nonblock mode, don't complain if we're unable
 	 * to send it all; PQgetResult() will do any additional flushing needed.
 	 */
-	if (pqFlush(conn) < 0)
+	if (pqBatchFlush(conn) < 0)
 		goto sendFailed;
 
 	/* OK, it's launched! */
-	conn->asyncStatus = PGASYNC_BUSY;
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+		PQappendPipelinedCommand(conn, pipeCmd);
+	else
+		conn->asyncStatus = PGASYNC_BUSY;
 	return 1;
 
 sendFailed:
+	PQrecyclePipelinedCommand(conn, pipeCmd);
 	pqHandleSendFailure(conn);
 	return 0;
 }
@@ -2620,6 +3123,13 @@ PQfn(PGconn *conn,
 	/* clear the error string */
 	resetPQExpBuffer(&conn->errorMessage);
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		printfPQExpBuffer(&conn->errorMessage,
+						libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n"));
+		return NULL;
+	}
+
 	if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
 		conn->result != NULL)
 	{
@@ -3813,3 +4323,14 @@ PQunescapeBytea(const unsigned char *strtext, size_t *retbuflen)
 	*retbuflen = buflen;
 	return tmpbuf;
 }
+/*
+ * pqBatchFlush: in batch mode, flush only once the out buffer reaches
+ * OUTBUFFER_THRESHOLD; in non-batch mode, flush on every call.
+ */
+static int
+pqBatchFlush(PGconn *conn)
+{
+	if (conn->batch_status == PQBATCH_MODE_OFF || conn->outCount >= OUTBUFFER_THRESHOLD)
+		return pqFlush(conn);
+	return 0;				/* below threshold: defer the flush */
+}
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 7dcef80..26c9a9e 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -411,6 +411,12 @@ pqParseInput2(PGconn *conn)
 {
 	char		id;
 
+	if (conn->batch_status != PQBATCH_MODE_OFF)
+	{
+		fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode\n");
+		abort();
+	}
+
 	/*
 	 * Loop to parse successive complete messages available in the buffer.
 	 */
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index d3ca5d2..f699607 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -219,10 +219,18 @@ pqParseInput3(PGconn *conn)
 						return;
 					conn->asyncStatus = PGASYNC_READY;
 					break;
-				case 'Z':		/* backend is ready for new query */
+				case 'Z':		/* sync response, backend is ready for new query */
 					if (getReadyForQuery(conn))
 						return;
-					conn->asyncStatus = PGASYNC_IDLE;
+					if (conn->batch_status != PQBATCH_MODE_OFF)
+					{
+						conn->batch_status = PQBATCH_MODE_ON;
+						conn->result = PQmakeEmptyPGresult(conn,
+								PGRES_BATCH_END);
+						conn->asyncStatus = PGASYNC_READY;
+					}
+					else
+						conn->asyncStatus = PGASYNC_IDLE;
 					break;
 				case 'I':		/* empty query */
 					if (conn->result == NULL)
@@ -879,6 +887,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
 	PQExpBufferData workBuf;
 	char		id;
 
+	if (isError && conn->batch_status != PQBATCH_MODE_OFF)
+		conn->batch_status = PQBATCH_MODE_ABORTED;
+
 	/*
 	 * Since the fields might be pretty long, we create a temporary
 	 * PQExpBuffer rather than using conn->workBuffer.  workBuffer is intended
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index ed9c806..7136980 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -95,7 +95,10 @@ typedef enum
 	PGRES_NONFATAL_ERROR,		/* notice or warning message */
 	PGRES_FATAL_ERROR,			/* query failed */
 	PGRES_COPY_BOTH,			/* Copy In/Out data transfer in progress */
-	PGRES_SINGLE_TUPLE			/* single tuple from larger resultset */
+	PGRES_SINGLE_TUPLE,			/* single tuple from larger resultset */
+	PGRES_BATCH_END,			/* end of a batch of commands */
+	PGRES_BATCH_ABORTED,		/* Command didn't run because of an abort
+								 * earlier in a batch */
 } ExecStatusType;
 
 typedef enum
@@ -134,6 +137,17 @@ typedef enum
 	PQPING_NO_ATTEMPT			/* connection not attempted (bad params) */
 } PGPing;
 
+/*
+ * PQBatchStatus - Current status of batch mode
+ */
+
+typedef enum
+{
+	PQBATCH_MODE_OFF,
+	PQBATCH_MODE_ON,
+	PQBATCH_MODE_ABORTED
+}	PQBatchStatus;
+
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -425,6 +439,14 @@ extern PGresult *PQgetResult(PGconn *conn);
 extern int	PQisBusy(PGconn *conn);
 extern int	PQconsumeInput(PGconn *conn);
 
+/* Routines for batch mode management */
+extern int	PQbatchStatus(PGconn *conn);
+extern int	PQbatchQueueCount(PGconn *conn);
+extern int	PQenterBatchMode(PGconn *conn);
+extern int	PQexitBatchMode(PGconn *conn);
+extern int	PQbatchSendQueue(PGconn *conn);
+extern int	PQbatchProcessQueue(PGconn *conn);
+
 /* LISTEN/NOTIFY support */
 extern PGnotify *PQnotifies(PGconn *conn);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 4e35409..fc5376f 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -215,10 +215,15 @@ typedef enum
 {
 	PGASYNC_IDLE,				/* nothing's happening, dude */
 	PGASYNC_BUSY,				/* query in progress */
-	PGASYNC_READY,				/* result ready for PQgetResult */
+	PGASYNC_READY,				/* query done, waiting for client to fetch
+								 * result */
+	PGASYNC_READY_MORE,			/* query done, waiting for client to fetch
+								 * result, More results expected from this
+								 * query */
 	PGASYNC_COPY_IN,			/* Copy In data transfer in progress */
 	PGASYNC_COPY_OUT,			/* Copy Out data transfer in progress */
-	PGASYNC_COPY_BOTH			/* Copy In/Out data transfer in progress */
+	PGASYNC_COPY_BOTH,			/* Copy In/Out data transfer in progress */
+	PGASYNC_QUEUED				/* Current query done, more in queue */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */
@@ -227,7 +232,8 @@ typedef enum
 	PGQUERY_SIMPLE,				/* simple Query protocol (PQexec) */
 	PGQUERY_EXTENDED,			/* full Extended protocol (PQexecParams) */
 	PGQUERY_PREPARE,			/* Parse only (PQprepare) */
-	PGQUERY_DESCRIBE			/* Describe Statement or Portal */
+	PGQUERY_DESCRIBE,			/* Describe Statement or Portal */
+	PGQUERY_SYNC				/* A protocol sync to end a batch */
 } PGQueryClass;
 
 /* PGSetenvStatusType defines the state of the PQSetenv state machine */
@@ -297,6 +303,22 @@ typedef enum pg_conn_host_type
 	CHT_UNIX_SOCKET
 } pg_conn_host_type;
 
+/* An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of future commands we've dispatched.
+ *
+ * Note that entries in this list are reused by being zeroed and appended to
+ * the tail when popped off the head. The entry with null next pointer is not
+ * the end of the list of expected commands, that's the tail pointer in
+ * pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+	PGQueryClass queryclass;	/* Query type; PGQUERY_SYNC for sync msg */
+	char	   *query;			/* SQL command, or NULL if unknown */
+	struct pgCommandQueueEntry *next;
+}	PGcommandQueueEntry;
+
+
 /*
  * pg_conn_host stores all information about one of possibly several hosts
  * mentioned in the connection string.  Derived by splitting the pghost
@@ -386,6 +408,7 @@ struct pg_conn
 	bool		options_valid;	/* true if OK to attempt connection */
 	bool		nonblocking;	/* whether this connection is using nonblock
 								 * sending semantics */
+	PQBatchStatus batch_status; /* batch (pipelining) mode status of connection */
 	bool		singleRowMode;	/* return current query result row-by-row? */
 	char		copy_is_binary; /* 1 = copy binary, 0 = copy text */
 	int			copy_already_done;	/* # bytes already returned in COPY OUT */
@@ -397,6 +420,16 @@ struct pg_conn
 	int			whichhost;		/* host we're currently considering */
 	pg_conn_host *connhost;		/* details about each possible host */
 
+	/*
+	 * The command queue
+	 *
+	 * head is the next pending cmd, tail is where we append new commands.
+	 * Freed entries for recycling go on the recycle linked list.
+	 */
+	PGcommandQueueEntry *cmd_queue_head;
+	PGcommandQueueEntry *cmd_queue_tail;
+	PGcommandQueueEntry *cmd_queue_recycle;
+
 	/* Connection data */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
 								 * unconnected */
@@ -680,6 +713,12 @@ extern char *pgtls_get_peer_certificate_hash(PGconn *conn, size_t *len);
  */
 #define pqIsnonblocking(conn)	((conn)->nonblocking)
 
+/*
+ * Connection's outbuffer threshold is set to 64k as it is safe
+ * in Windows as per comments in pqSendSome() API.
+ */
+#define OUTBUFFER_THRESHOLD	65536
+
 #ifdef ENABLE_NLS
 extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
 extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
@@ -688,6 +727,8 @@ extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigne
 #define libpq_ngettext(s, p, n) ((n) == 1 ? (s) : (p))
 #endif
 
+#define libpq_gettext_noop(x) (x)
+
 /*
  * These macros are needed to let error-handling code be portable between
  * Unix and Windows.  (ugh)
-- 
2.7.4.windows.1
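
For readers comparing approaches, the command queue and its freelist can be tried out in isolation. The following is only a rough standalone model, not code from the patch: `Entry`, `Queue`, `make_entry`, `append_entry`, `pop_entry` and `recycle_entry` are hypothetical stand-ins for PGcommandQueueEntry, the three cmd_queue_* pointers in pg_conn, and the PQmakePipelinedCommand / PQappendPipelinedCommand / PQrecyclePipelinedCommand helpers. It assumes the make step takes an entry from the freelist before falling back to malloc.

```c
#include <stdlib.h>

/* Hypothetical stand-in for PGcommandQueueEntry (query text omitted). */
typedef struct Entry
{
	int			queryclass;
	struct Entry *next;
} Entry;

/* Hypothetical stand-in for the cmd_queue_* pointers in pg_conn. */
typedef struct Queue
{
	Entry	   *head;			/* next pending command */
	Entry	   *tail;			/* where new commands are appended */
	Entry	   *recycle;		/* freelist of reusable entries */
} Queue;

/* Reuse a freelist entry if there is one, else allocate; NULL on OOM. */
static Entry *
make_entry(Queue *q)
{
	Entry	   *e = q->recycle;

	if (e != NULL)
		q->recycle = e->next;
	else
		e = malloc(sizeof(Entry));
	if (e != NULL)
	{
		e->queryclass = 0;
		e->next = NULL;
	}
	return e;
}

/* Append a dangling entry at the tail of the pending queue. */
static void
append_entry(Queue *q, Entry *e)
{
	if (q->head == NULL)
		q->head = e;
	else
		q->tail->next = e;
	q->tail = e;
}

/* Detach and return the head entry, or NULL if the queue is empty. */
static Entry *
pop_entry(Queue *q)
{
	Entry	   *e = q->head;

	if (e != NULL)
	{
		q->head = e->next;
		e->next = NULL;
	}
	return e;
}

/* Push a dangling entry onto the freelist for later reuse. */
static void
recycle_entry(Queue *q, Entry *e)
{
	e->next = q->recycle;
	q->recycle = e;
}
```

In this model an entry popped from the head and recycled is the first one handed back by the next `make_entry` call, which is the reuse pattern the comment on pgCommandQueueEntry describes.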

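
The error handling in the patch (pqGetErrorNotice3 setting PQBATCH_MODE_ABORTED, PQbatchProcessQueue faking PGRES_BATCH_ABORTED results, and the 'Z' case in pqParseInput3 producing PGRES_BATCH_END) can be summarized as a small state machine. The sketch below is a simplified model under that reading of the diff, with no libpq or network involved. `SimCommand`, `SimResult` and `simulate_batch` are made-up names, and each command is assumed to yield exactly one result.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical inputs: a command that succeeds, one that fails, or a sync. */
typedef enum { CMD_OK, CMD_FAIL, CMD_SYNC } SimCommand;

/* Hypothetical result kinds, loosely mirroring ExecStatusType values. */
typedef enum { RES_OK, RES_ERROR, RES_ABORTED, RES_BATCH_END } SimResult;

/*
 * Fill out[i] with the result the client would see for cmds[i].
 * After a failure, commands are reported as aborted until the next sync;
 * a sync always reports batch end and clears the aborted state.
 */
static void
simulate_batch(const SimCommand *cmds, size_t n, SimResult *out)
{
	bool		aborted = false;

	for (size_t i = 0; i < n; i++)
	{
		if (cmds[i] == CMD_SYNC)
		{
			out[i] = RES_BATCH_END;
			aborted = false;
		}
		else if (aborted)
			out[i] = RES_ABORTED;
		else if (cmds[i] == CMD_FAIL)
		{
			out[i] = RES_ERROR;
			aborted = true;
		}
		else
			out[i] = RES_OK;
	}
}
```

Feeding it two batches back to back, the first with a failing command in the middle, shows that the failure affects only the rest of its own batch: commands queued after the sync are reported normally again.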
