Hi,

On postgres mailing lists please don't write your reply at the top of a
fully quoted email. We like the reply to be inline and trimmed to the
necessary parts.

On 2019-04-07 13:28:46 -0700, Konstantin Izmailov wrote:
> Yes, Andres, I meant "pipelining", just couldn't choose correct word. Thank
> you for the answer(s)!
>
> I also made changes in my own copy of libpq, and they work fine. I think
> the pipelining support is needed in libpq. Btw, how can I get the patch
> code? I want to compare your approach with mine. I couldn't figure out how
> to get the patch from the link.

Hm, odd. There's a link on the page "Latest attachment" - but for
unknown reasons that's broken. I've attached it for now, but will also
inquire with the webadmin team about what's up.

Greetings,

Andres Freund
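For a rough sense of why pipelining matters: the documentation in the attached patch estimates that a 100-statement operation against a server 300 ms away spends 30 seconds on network latency without batching, and as little as 0.3 s with it. That arithmetic can be sketched as below. The function names are hypothetical (they are not part of libpq or of the patch), and the model counts only round-trip waits, ignoring server execution time.

```c
#include <assert.h>

/*
 * Hypothetical helpers, not part of libpq or the attached patch.
 * They model only the time spent waiting on the network.
 */

/* Without batching, every statement waits one full round trip. */
long
unbatched_wait_ms(long n_statements, long rtt_ms)
{
	assert(n_statements >= 0 && rtt_ms >= 0);
	return n_statements * rtt_ms;
}

/* Best case with batching: one round trip per batch, whatever its size. */
long
batched_wait_ms(long n_batches, long rtt_ms)
{
	assert(n_batches >= 0 && rtt_ms >= 0);
	return n_batches * rtt_ms;
}
```

With the patch's numbers, `unbatched_wait_ms(100, 300)` gives 30000 ms and `batched_wait_ms(1, 300)` gives 300 ms.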
>From ba93ae02eca024997f2ce6a9c2c2987aea4a77b8 Mon Sep 17 00:00:00 2001 From: Prabakaran <Vaishnavi.Prabakaran@xxxxxxxxxxxxxx> Date: Fri, 12 Jan 2018 10:09:09 +1100 Subject: [PATCH] Pipelining-batch-support-for-libpq-code-v16 --- doc/src/sgml/libpq.sgml | 502 +++++++++++++++++ doc/src/sgml/lobj.sgml | 4 + .../libpqwalreceiver/libpqwalreceiver.c | 3 + src/interfaces/libpq/exports.txt | 5 + src/interfaces/libpq/fe-connect.c | 28 + src/interfaces/libpq/fe-exec.c | 595 +++++++++++++++++++-- src/interfaces/libpq/fe-protocol2.c | 6 + src/interfaces/libpq/fe-protocol3.c | 15 +- src/interfaces/libpq/libpq-fe.h | 24 +- src/interfaces/libpq/libpq-int.h | 47 +- 10 files changed, 1186 insertions(+), 43 deletions(-) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 4e46451..6aae637 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -4752,6 +4752,500 @@ int PQflush(PGconn *conn); </sect1> + <sect1 id="libpq-batch-mode"> + <title>Batch mode and query pipelining</title> + + <indexterm zone="libpq-batch-mode"> + <primary>libpq</primary> + <secondary>batch mode</secondary> + </indexterm> + + <indexterm zone="libpq-batch-mode"> + <primary>libpq</primary> + <secondary>pipelining</secondary> + </indexterm> + + <para> + <application>libpq</application> supports queueing up queries into + a pipeline to be executed as a batch on the server. Batching queries allows + applications to avoid a client/server round-trip after each query to get + the results before issuing the next query. + </para> + + <sect2> + <title>When to use batching</title> + + <para> + Much like asynchronous query mode, there is no performance disadvantage to + using batching and pipelining. It increases client application complexity + and extra caution is required to prevent client/server deadlocks but + can sometimes offer considerable performance improvements. + </para> + + <para> + Batching is most useful when the server is distant, i.e. 
network latency + (<quote>ping time</quote>) is high, and when many small operations are being performed in + rapid sequence. There is usually less benefit in using batches when each + query takes many multiples of the client/server round-trip time to execute. + A 100-statement operation run on a server 300ms round-trip-time away would take + 30 seconds in network latency alone without batching; with batching it may spend + as little as 0.3s waiting for results from the server. + </para> + + <para> + Use batches when your application does lots of small + <literal>INSERT</literal>, <literal>UPDATE</literal> and + <literal>DELETE</literal> operations that can't easily be transformed into + operations on sets or into a + <link linkend="libpq-copy"><literal>COPY</literal></link> operation. + </para> + + <para> + Batching is not useful when information from one operation is required by the + client before it knows enough to send the next operation. The client must + introduce a synchronisation point and wait for a full client/server + round-trip to get the results it needs. However, it's often possible to + adjust the client design to exchange the required information server-side. + Read-modify-write cycles are especially good candidates; for example: + <programlisting> + BEGIN; + SELECT x FROM mytable WHERE id = 42 FOR UPDATE; + -- result: x=2 + -- client adds 1 to x: + UPDATE mytable SET x = 3 WHERE id = 42; + COMMIT; + </programlisting> + could be much more efficiently done with: + <programlisting> + UPDATE mytable SET x = x + 1 WHERE id = 42; + </programlisting> + </para> + + <note> + <para> + The batch API was introduced in PostgreSQL 10.0, but clients using the PostgreSQL 10.0 version of libpq can + use batches on server versions 7.4 and newer. Batching works on any server + that supports the v3 extended query protocol.
+ </para> + </note> + + </sect2> + + <sect2 id="libpq-batch-using"> + <title>Using batch mode</title> + + <para> + To issue batches the application must switch + a connection into batch mode. Enter batch mode with <link + linkend="libpq-PQenterBatchMode"><function>PQenterBatchMode(conn)</function></link> or test + whether batch mode is active with <link + linkend="libpq-PQbatchStatus"><function>PQbatchStatus(conn)</function></link>. In batch mode only <link + linkend="libpq-async">asynchronous operations</link> are permitted, and + <literal>COPY</literal> is not recommended as it most likely will trigger failure in batch processing. + Using any synchronous command execution functions such as <function>PQfn</function>, + <function>PQexec</function> or one of its sibling functions are error conditions. + Functions allowed in batch mode are described in <xref linkend="libpq-batch-sending"/>. + </para> + + <para> + The client uses libpq's asynchronous query functions to dispatch work, + marking the end of each batch with <function>PQbatchSendQueue</function>. + And to get results, it uses <function>PQgetResult</function> and + <function>PQbatchProcessQueue</function>. It may eventually exit + batch mode with <function>PQexitBatchMode</function> once all results are + processed. + </para> + + <note> + <para> + It is best to use batch mode with <application>libpq</application> in + <link linkend="libpq-pqsetnonblocking">non-blocking mode</link>. If used in + blocking mode it is possible for a client/server deadlock to occur. The + client will block trying to send queries to the server, but the server will + block trying to send results from queries it has already processed to the + client. This only occurs when the client sends enough queries to fill its + output buffer and the server's receive buffer before switching to + processing input from the server, but it's hard to predict exactly when + that'll happen so it's best to always use non-blocking mode. 
+ Batch mode consumes more memory when send/recv is not done as required even in non-blocking mode. + </para> + </note> + + <sect3 id="libpq-batch-sending"> + <title>Issuing queries</title> + + <para> + After entering batch mode the application dispatches requests + using normal asynchronous <application>libpq</application> functions such as + <function>PQsendQueryParams</function>, <function>PQsendPrepare</function>, + <function>PQsendQueryPrepared</function>, <function>PQsendDescribePortal</function>, + <function>PQsendDescribePrepared</function>. + The asynchronous requests are followed by a <link + linkend="libpq-PQbatchSendQueue"><function>PQbatchSendQueue(conn)</function></link> call to mark + the end of the batch. The client <emphasis>does not</emphasis> need to call + <function>PQgetResult</function> immediately after dispatching each + operation. <link linkend="libpq-batch-results">Result processing</link> + is handled separately. + </para> + + <para> + Batched operations will be executed by the server in the order the client + sends them. The server will send the results in the order the statements + executed. The server may begin executing the batch before all commands + in the batch are queued and the end of batch command is sent. If any + statement encounters an error the server aborts the current transaction and + skips processing the rest of the batch. Query processing resumes after the + end of the failed batch. + </para> + + <para> + It's fine for one operation to depend on the results of a + prior one. One query may define a table that the next query in the same + batch uses; similarly, an application may create a named prepared statement + then execute it with later statements in the same batch. 
+ </para> + + </sect3> + + <sect3 id="libpq-batch-results"> + <title>Processing results</title> + + <para> + The client <link linkend="libpq-batch-interleave">interleaves result + processing</link> with sending batch queries, or for small batches may + process all results after sending the whole batch. + </para> + + <para> + To get the result of the first batch entry the client must call <link + linkend="libpq-PQbatchProcessQueue"><function>PQbatchProcessQueue</function></link>. It must then call + <function>PQgetResult</function> and handle the results until + <function>PQgetResult</function> returns null. The result from the next batch entry + may then be retrieved using <function>PQbatchProcessQueue</function> and the cycle repeated. The + application handles individual statement results as normal. + </para> + + <para> + To enter single-row mode, call <function>PQsetSingleRowMode</function> immediately after a + successful call of <function>PQbatchProcessQueue</function>. This mode selection is effective + only for the query currently being processed. For more information on the use of <function>PQsetSingleRowMode + </function>, refer to <xref linkend="libpq-single-row-mode"/>. + + </para> + + <para> + <function>PQgetResult</function> behaves the same as for normal asynchronous + processing except that it may contain the new <type>PGresult</type> types + <literal>PGRES_BATCH_END</literal> and <literal>PGRES_BATCH_ABORTED</literal>. + <literal>PGRES_BATCH_END</literal> is reported exactly once for each + <function>PQbatchSendQueue</function> call at the corresponding point in + the result stream and at no other time. <literal>PGRES_BATCH_ABORTED</literal> + is emitted during error handling; see <link linkend="libpq-batch-errors"> + error handling</link>. + </para> + + <para> + <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc + operate as normal when processing batch results. 
+ </para> + + <para> + <application>libpq</application> does not provide any information to the + application about the query currently being processed. The application + must keep track of the order in which it sent queries and the expected + results. Applications will typically use a state machine or a FIFO queue + for this. + </para> + + </sect3> + + <sect3 id="libpq-batch-errors"> + <title>Error handling</title> + + <para> + When a query in a batch causes an <literal>ERROR</literal> the server + skips processing all subsequent messages until the end-of-batch message. + The open transaction is aborted. + </para> + + <para> + From the client perspective, after the client gets a + <literal>PGRES_FATAL_ERROR</literal> return from + <function>PQresultStatus</function> the batch is flagged as aborted. + <application>libpq</application> will report + <literal>PGRES_BATCH_ABORTED</literal> result for each remaining queued + operation in an aborted batch. The result for + <function>PQbatchSendQueue</function> is reported as + <literal>PGRES_BATCH_END</literal> to signal the end of the aborted batch + and resumption of normal result processing. + </para> + + <para> + The client <emphasis>must</emphasis> process results with + <function>PQbatchProcessQueue(...)</function> and + <function>PQgetResult</function> during error recovery. + </para> + + <para> + If the batch used an implicit transaction then operations that have + already executed are rolled back and operations that were queued for after + the failed operation are skipped entirely. The same behaviour holds if the + batch starts and commits a single explicit transaction (i.e. the first + statement is <literal>BEGIN</literal> and the last is + <literal>COMMIT</literal>) except that the session remains in an aborted + transaction state at the end of the batch. 
If a batch contains <emphasis> + multiple explicit transactions</emphasis>, all transactions that committed + prior to the error remain committed, the currently in-progress transaction + is aborted and all subsequent operations in the current and all later + transactions in the same batch are skipped completely. + </para> + + <note> + <para> + The client must not assume that work is committed when it + <emphasis>sends</emphasis> a <literal>COMMIT</literal>, only when the + corresponding result is received to confirm the commit is complete. + Because errors arrive asynchronously the application needs to be able to + restart from the last <emphasis>received</emphasis> committed change and + resend work done after that point if something goes wrong. + </para> + </note> + + </sect3> + + <sect3 id="libpq-batch-interleave"> + <title>Interleaving result processing and query dispatch</title> + + <para> + To avoid deadlocks on large batches the client should be structured around + a nonblocking I/O loop using a function like <function>select</function>, + <function>poll</function>, <function>epoll</function>, + <function>WaitForMultipleObjectsEx</function>, etc. + </para> + + <para> + The client application should generally maintain a queue of work still to + be dispatched and a queue of work that has been dispatched but not yet had + its results processed. When the socket is writable it should dispatch more + work. When the socket is readable it should read results and process them, + matching them up to the next entry in its expected results queue. + Based on available memory, results from the socket should be read frequently and + there's no need to wait until the end of the batch to read the results. Batches + should be scoped to logical units of work, usually (but not always) one + transaction per batch. There's no need to exit batch mode and re-enter it + between batches or to wait for one batch to finish before sending the next.
+ </para> + + <para> + An example using <function>select()</function> and a simple state machine + to track sent and received work is in + <filename>src/test/modules/test_libpq/testlibpqbatch.c</filename> in the PostgreSQL + source distribution. + </para> + + </sect3> + + <sect3 id="libpq-batch-end"> + <title>Ending batch mode</title> + + <para> + Once all dispatched commands have had their results processed and the end batch + result has been consumed the application may return to non-batched mode with + <link linkend="libpq-PQexitBatchMode"><function>PQexitBatchMode(conn)</function></link>. + </para> + </sect3> + + </sect2> + + <sect2 id="libpq-funcs-batch"> + <title>Functions associated with batch mode</title> + + <variablelist> + + <varlistentry id="libpq-PQbatchStatus"> + <term> + <function>PQbatchStatus</function> + <indexterm> + <primary>PQbatchStatus</primary> + </indexterm> + </term> + + <listitem> + <para> + Returns current batch mode status of the <application>libpq</application> connection. +<synopsis> +int PQbatchStatus(PGconn *conn); +</synopsis> + </para> + <variablelist> + <varlistentry id="libpq-PQbatchStatus-1"> + <term> + <literal>PQBATCH_MODE_ON</literal> + </term> + + <listitem> + <para> + Returns <literal>PQBATCH_MODE_ON</literal> if <application>libpq</application> connection is in <link + linkend="libpq-batch-mode">batch mode</link>. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQbatchStatus-2"> + <term> + <literal>PQBATCH_MODE_OFF</literal> + </term> + + <listitem> + <para> + Returns <literal>PQBATCH_MODE_OFF</literal> if <application>libpq</application> connection is not in <link + linkend="libpq-batch-mode">batch mode</link>. + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQbatchStatus-3"> + <term> + <literal>PQBATCH_MODE_ABORTED</literal> + </term> + <listitem> + <para> + Returns <literal>PQBATCH_MODE_ABORTED</literal> if <application>libpq</application> connection is in + aborted status. 
The aborted flag is cleared as soon as the result of the + <function>PQbatchSendQueue</function> at the end of the aborted batch is + processed. Clients don't usually need this function to verify aborted status + as they can tell that the batch is aborted from <literal>PGRES_BATCH_ABORTED</literal> + result codes. + </para> + </listitem> + </varlistentry> + + </variablelist> + + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQenterBatchMode"> + <term> + <function>PQenterBatchMode</function> + <indexterm> + <primary>PQenterBatchMode</primary> + </indexterm> + </term> + + <listitem> + <para> + Causes a connection to enter batch mode if it is currently idle or + already in batch mode. + +<synopsis> +int PQenterBatchMode(PGconn *conn); +</synopsis> + + </para> + <para> + Returns 1 for success. Returns 0 and has no + effect if the connection is not currently idle, i.e. it has a result + ready, is waiting for more input from the server, etc. This function + does not actually send anything to the server, it just changes the + <application>libpq</application> connection state. + + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQexitBatchMode"> + <term> + <function>PQexitBatchMode</function> + <indexterm> + <primary>PQexitBatchMode</primary> + </indexterm> + </term> + + <listitem> + <para> + Causes a connection to exit batch mode if it is currently in batch mode + with an empty queue and no pending results. +<synopsis> +int PQexitBatchMode(PGconn *conn); +</synopsis> + </para> + <para>Returns 1 for success. + Returns 1 and takes no action if not in batch mode. If the connection has + pending batch items in the queue for reading with + <function>PQbatchProcessQueue</function>, the current statement isn't finished + processing or there are results pending for collection with + <function>PQgetResult</function>, returns 0 and does nothing. 
+ + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQbatchSendQueue"> + <term> + <function>PQbatchSendQueue</function> + <indexterm> + <primary>PQbatchSendQueue</primary> + </indexterm> + </term> + + <listitem> + <para> + Delimits the end of a set of batched commands by sending a <link + linkend="protocol-flow-ext-query">sync message</link> and flushing + the send buffer. The end of a batch serves as + the delimiter of an implicit transaction and + an error recovery point; see <link linkend="libpq-batch-errors"> + error handling</link>. + +<synopsis> +int PQbatchSendQueue(PGconn *conn); +</synopsis> + </para> + <para>Returns 1 for success. Returns 0 if the connection is not in batch mode + or if sending a <link linkend="protocol-flow-ext-query">sync message</link> fails. + + </para> + </listitem> + </varlistentry> + + <varlistentry id="libpq-PQbatchProcessQueue"> + <term> + <function>PQbatchProcessQueue</function> + <indexterm> + <primary>PQbatchProcessQueue</primary> + </indexterm> + </term> + + <listitem> + <para> + Causes the connection to start processing the next queued query's + results. + </para> + +<synopsis> +int PQbatchProcessQueue(PGconn *conn); +</synopsis> + + <para> + Returns 1 if a new query was popped from the result queue + for processing. Returns 0 and has no effect if there are no query results + pending, batch mode is not enabled, or if the query currently processed + is incomplete or still has pending results. The reason for the failure can + be determined with <function>PQbatchStatus + </function> and <function>PQgetResult</function>. + See <link linkend="libpq-batch-results">processing results</link>. + + </para> + </listitem> + </varlistentry> + + </variablelist> + + </sect2> + + </sect1> + <sect1 id="libpq-single-row-mode"> <title>Retrieving Query Results Row-By-Row</title> @@ -4792,6 +5286,14 @@ int PQflush(PGconn *conn); Each object should be freed with <function>PQclear</function> as usual.
</para> + <note> + <para> + On using batch mode, call <function>PQsetSingleRowMode</function> + immediately after a successful call of <function>PQbatchProcessQueue</function> + See <xref linkend="libpq-batch-mode"/> for more information. + </para> + </note> + <para> <variablelist> <varlistentry id="libpq-pqsetsinglerowmode"> diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml index 6b5aaeb..cf22f3f 100644 --- a/doc/src/sgml/lobj.sgml +++ b/doc/src/sgml/lobj.sgml @@ -130,6 +130,10 @@ <application>libpq</application> library. </para> + <para> + Client applications cannot use these functions while libpq connection is in batch mode. + </para> + <sect2 id="lo-create"> <title>Creating a Large Object</title> diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f9aec05..32151ff 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -944,6 +944,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query, walres->status = WALRCV_ERROR; walres->err = pchomp(PQerrorMessage(conn->streamConn)); break; + default: + /* This is just to keep compiler quiet */ + break; } PQclear(pgres); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index d6a38d0..625e74e 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -172,3 +172,8 @@ PQsslAttribute 169 PQsetErrorContextVisibility 170 PQresultVerboseErrorMessage 171 PQencryptPasswordConn 172 +PQenterBatchMode 173 +PQexitBatchMode 174 +PQbatchSendQueue 175 +PQbatchProcessQueue 176 +PQbatchStatus 177 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 8d54333..c2df669 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -3574,6 +3574,25 @@ sendTerminateConn(PGconn *conn) } /* + * PQfreeCommandQueue + * Free all the entries 
of PGcommandQueueEntry queue passed. + */ +static void +PQfreeCommandQueue(PGcommandQueueEntry *queue) +{ + + while (queue != NULL) + { + PGcommandQueueEntry *prev = queue; + + queue = queue->next; + if (prev->query) + free(prev->query); + free(prev); + } +} + +/* * closePGconn * - properly close a connection to the backend * @@ -3585,6 +3604,7 @@ static void closePGconn(PGconn *conn) { PGnotify *notify; + PGcommandQueueEntry *queue; pgParameterStatus *pstatus; sendTerminateConn(conn); @@ -3616,6 +3636,14 @@ closePGconn(PGconn *conn) free(prev); } conn->notifyHead = conn->notifyTail = NULL; + queue = conn->cmd_queue_head; + PQfreeCommandQueue(queue); + conn->cmd_queue_head = conn->cmd_queue_tail = NULL; + + queue = conn->cmd_queue_recycle; + PQfreeCommandQueue(queue); + + conn->cmd_queue_recycle = NULL; pstatus = conn->pstatus; while (pstatus != NULL) { diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 4c0114c..2049035 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -40,7 +40,9 @@ char *const pgresStatus[] = { "PGRES_NONFATAL_ERROR", "PGRES_FATAL_ERROR", "PGRES_COPY_BOTH", - "PGRES_SINGLE_TUPLE" + "PGRES_SINGLE_TUPLE", + "PGRES_BATCH_END", + "PGRES_BATCH_ABORTED" }; /* @@ -71,7 +73,10 @@ static PGresult *PQexecFinish(PGconn *conn); static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); - +static PGcommandQueueEntry *PQmakePipelinedCommand(PGconn *conn); +static void PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry); +static void PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry); +static int pqBatchFlush(PGconn *conn); /* ---------------- * Space management for PGresult. 
@@ -1159,7 +1164,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp) conn->next_result = conn->result; conn->result = res; /* And mark the result ready to return */ - conn->asyncStatus = PGASYNC_READY; + conn->asyncStatus = PGASYNC_READY_MORE; } return 1; @@ -1182,6 +1187,13 @@ fail: int PQsendQuery(PGconn *conn, const char *query) { + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n")); + return false; + } + if (!PQsendQueryStart(conn)) return 0; @@ -1280,6 +1292,10 @@ PQsendPrepare(PGconn *conn, const char *stmtName, const char *query, int nParams, const Oid *paramTypes) { + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + if (!PQsendQueryStart(conn)) return 0; @@ -1338,31 +1354,51 @@ PQsendPrepare(PGconn *conn, goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + else + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } /* remember we are doing just a Parse */ - conn->queryclass = PGQUERY_PREPARE; + *queryclass = PGQUERY_PREPARE; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); - conn->last_query = strdup(query); + if (*last_query) + free(*last_query); + *last_query = strdup(query); /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. 
*/ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); pqHandleSendFailure(conn); return 0; } @@ -1410,7 +1446,80 @@ PQsendQueryPrepared(PGconn *conn, } /* - * Common startup code for PQsendQuery and sibling routines + * PQmakePipelinedCommand + * Get a new command queue entry, allocating it if required. Doesn't add it to + * the tail of the queue yet, use PQappendPipelinedCommand once the command has + * been written for that. If a command fails once it's called this, it should + * use PQrecyclePipelinedCommand to put it on the freelist or release it. + * + * If allocation fails sets the error message and returns null. + */ +static PGcommandQueueEntry * +PQmakePipelinedCommand(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (conn->cmd_queue_recycle == NULL) + { + entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry)); + if (entry == NULL) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory\n")); + return NULL; + } + } + else + { + entry = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry->next; + } + entry->next = NULL; + entry->query = NULL; + + return entry; +} + +/* + * PQappendPipelinedCommand + * Append a precreated command queue entry to the queue after it's been + * sent successfully. + */ +static void +PQappendPipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry) +{ + if (conn->cmd_queue_head == NULL) + conn->cmd_queue_head = entry; + else + conn->cmd_queue_tail->next = entry; + conn->cmd_queue_tail = entry; +} + +/* + * PQrecyclePipelinedCommand + * Push a command queue entry onto the freelist. It must be a dangling entry + * with null next pointer and not referenced by any other entry's next pointer. 
+ */ +static void +PQrecyclePipelinedCommand(PGconn *conn, PGcommandQueueEntry * entry) +{ + if (entry == NULL) + return; + if (entry->next != NULL) + { + fprintf(stderr, libpq_gettext("tried to recycle non-dangling command queue entry")); + abort(); + } + if (entry->query) + free(entry->query); + + entry->next = conn->cmd_queue_recycle; + conn->cmd_queue_recycle = entry; +} + +/* + * PQsendQueryStart + * Common startup code for PQsendQuery and sibling routines */ static bool PQsendQueryStart(PGconn *conn) @@ -1428,20 +1537,60 @@ PQsendQueryStart(PGconn *conn) libpq_gettext("no connection to the server\n")); return false; } - /* Can't send while already busy, either. */ - if (conn->asyncStatus != PGASYNC_IDLE) + + /* Can't send while already busy, either, unless enqueuing for later */ + if (conn->asyncStatus != PGASYNC_IDLE && conn->batch_status == PQBATCH_MODE_OFF) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("another command is already in progress\n")); return false; } - /* initialize async result-accumulation state */ - pqClearAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * When enqueuing a message we don't change much of the connection + * state since it's already in use for the current command. The + * connection state will get updated when PQbatchProcessQueue(...) + * advances to start processing the queued message. + * + * Just make sure we can safely enqueue given the current connection + * state. We can enqueue behind another queue item, or behind a + * non-queue command (one that sends its own sync), but we can't + * enqueue if the connection is in a copy state.
+ */ + switch (conn->asyncStatus) + { + case PGASYNC_QUEUED: + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* ok to queue */ + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot queue commands during COPY\n")); + return false; + break; + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, idle state in batch mode")); + break; + } + } + else + { + /* This command's results will come in immediately. + * Initialize async result-accumulation state + */ + pqClearAsyncResult(conn); - /* reset single-row processing mode */ - conn->singleRowMode = false; + /* reset single-row processing mode */ + conn->singleRowMode = false; + } /* ready to send command message */ return true; } @@ -1465,6 +1614,10 @@ PQsendQueryGuts(PGconn *conn, int resultFormat) { int i; + PGcommandQueueEntry *pipeCmd = NULL; + char **last_query; + PGQueryClass *queryclass; + /* This isn't gonna work on a 2.0 server */ if (PG_PROTOCOL_MAJOR(conn->pversion) < 3) @@ -1474,6 +1627,23 @@ PQsendQueryGuts(PGconn *conn, return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + last_query = &pipeCmd->query; + queryclass = &pipeCmd->queryclass; + } + else + { + last_query = &conn->last_query; + queryclass = &conn->queryclass; + } + + /* * We will send Parse (if needed), Bind, Describe Portal, Execute, Sync, * using specified statement name and the unnamed portal. 
@@ -1586,35 +1756,42 @@ PQsendQueryGuts(PGconn *conn, pqPutMsgEnd(conn) < 0) goto sendFailed; - /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are using extended query protocol */ - conn->queryclass = PGQUERY_EXTENDED; + *queryclass = PGQUERY_EXTENDED; /* and remember the query text too, if possible */ /* if insufficient memory, last_query just winds up NULL */ - if (conn->last_query) - free(conn->last_query); + if (*last_query) + free(*last_query); if (command) - conn->last_query = strdup(command); + *last_query = strdup(command); else - conn->last_query = NULL; + *last_query = NULL; /* * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! */ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); pqHandleSendFailure(conn); return 0; } @@ -1741,6 +1918,280 @@ PQisBusy(PGconn *conn) return conn->asyncStatus == PGASYNC_BUSY; } +/* + * PQbatchStatus + * Returns current batch mode status + */ +int +PQbatchStatus(PGconn *conn) +{ + if (!conn) + return false; + + return conn->batch_status; +} + +/* + * PQenterBatchMode + * Put an idle connection in batch mode. Commands submitted after this + * can be pipelined on the connection, there's no requirement to wait for + * one to finish before the next is dispatched. + * + * Queuing of a new query or syncing during COPY is not allowed. + * + * A set of commands is terminated by a PQbatchSendQueue.
Multiple sets of batched + * commands may be sent while in batch mode. Batch mode can be exited by + * calling PQexitBatchMode() once all results are processed. + * + * This doesn't actually send anything on the wire, it just puts libpq + * into a state where it can pipeline work. + */ +int +PQenterBatchMode(PGconn *conn) +{ + if (!conn) + return 0; + + if (conn->batch_status != PQBATCH_MODE_OFF) + return 1; + + if (conn->asyncStatus != PGASYNC_IDLE) + return 0; + + conn->batch_status = PQBATCH_MODE_ON; + conn->asyncStatus = PGASYNC_QUEUED; + + return 1; +} + +/* + * PQexitBatchMode + * End batch mode and return to normal command mode. + * + * Has no effect unless the client has processed all results + * from all outstanding batches and the connection is idle. + * + * Returns 1 if batch mode ended. + */ +int +PQexitBatchMode(PGconn *conn) +{ + if (!conn) + return 0; /* no errorMessage buffer to write to */ + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 1; + + switch (conn->asyncStatus) + { + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* can't end batch while busy */ + goto exitFailed; + default: + break; + } + + /* still work to process */ + if (conn->cmd_queue_head != NULL) + goto exitFailed; + + conn->batch_status = PQBATCH_MODE_OFF; + conn->asyncStatus = PGASYNC_IDLE; + + /* Flush any pending data in out buffer */ + if (pqFlush(conn) < 0) + goto sendFailed; + return 1; + +sendFailed: + pqHandleSendFailure(conn); + +exitFailed: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, Failed to exit batch mode")); + return 0; +} + +/* + * PQbatchSendQueue + * End a batch submission by sending a protocol sync. The connection will + * remain in batch mode and unavailable for new synchronous command execution + * functions until all results from the batch are processed by the client. + * + * It's legal to start submitting another batch immediately, without waiting + * for the results of the current batch.
There's no need to end batch mode + * and start it again. + * + * If a command in a batch fails, every subsequent command up to and including + * the PQbatchSendQueue command result gets set to PGRES_BATCH_ABORTED state. If the + * whole batch is processed without error, a PGresult with PGRES_BATCH_END is + * produced. + */ +int +PQbatchSendQueue(PGconn *conn) +{ + PGcommandQueueEntry *entry; + + if (!conn) + return 0; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 0; + + switch (conn->asyncStatus) + { + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, IDLE in batch mode")); + break; + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, COPY in batch mode")); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + case PGASYNC_QUEUED: + /* can send sync to end this batch of cmds */ + break; + } + + entry = PQmakePipelinedCommand(conn); + if (entry == NULL) + return 0; /* error msg already set */ + + entry->queryclass = PGQUERY_SYNC; + entry->query = NULL; + + /* construct the Sync message */ + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + + PQappendPipelinedCommand(conn, entry); + + /* + * Give the data a push. In nonblock mode, don't complain if we're unable + * to send it all; PQgetResult() will do any additional flushing needed. + */ + if (PQflush(conn) < 0) + goto sendFailed; + + return 1; + +sendFailed: + PQrecyclePipelinedCommand(conn, entry); + pqHandleSendFailure(conn); + return 0; +} + +/* + * PQbatchProcessQueue + * In batch mode, start processing the next query in the queue. + * + * Returns 1 if the next query was popped from the queue and can + * be processed by PQconsumeInput, PQgetResult, etc. + * + * Returns 0 if the current query isn't done yet, the connection + * is not in a batch, or there are no more queries to process.
+ */ +int +PQbatchProcessQueue(PGconn *conn) +{ + PGcommandQueueEntry *next_query; + + if (!conn) + return 0; + + if (conn->batch_status == PQBATCH_MODE_OFF) + return 0; + + switch (conn->asyncStatus) + { + case PGASYNC_COPY_IN: + case PGASYNC_COPY_OUT: + case PGASYNC_COPY_BOTH: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, COPY in batch mode")); + break; + case PGASYNC_READY: + case PGASYNC_READY_MORE: + case PGASYNC_BUSY: + /* client still has to process current query or results */ + return 0; + break; + case PGASYNC_IDLE: + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext_noop("internal error, IDLE in batch mode")); + break; + case PGASYNC_QUEUED: + /* next query please */ + break; + } + + if (conn->cmd_queue_head == NULL) + { + /* + * In batch mode but nothing left on the queue; caller can submit more + * work or call PQexitBatchMode() now. + */ + return 0; + } + + /* + * Pop the next query from the queue and set up the connection state as if + * it'd just been dispatched from a non-batched call + */ + next_query = conn->cmd_queue_head; + conn->cmd_queue_head = next_query->next; + next_query->next = NULL; + + /* Initialize async result-accumulation state */ + pqClearAsyncResult(conn); + + /* reset single-row processing mode */ + conn->singleRowMode = false; + + + conn->last_query = next_query->query; + next_query->query = NULL; + conn->queryclass = next_query->queryclass; + + PQrecyclePipelinedCommand(conn, next_query); + + if (conn->batch_status == PQBATCH_MODE_ABORTED && conn->queryclass != PGQUERY_SYNC) + { + /* + * In an aborted batch we don't get anything from the server for each + * result; we're just discarding input until we get to the next sync + * from the server. The client needs to know its queries got aborted + * so we create a fake PGresult to return immediately from + * PQgetResult.
+ */ + conn->result = PQmakeEmptyPGresult(conn, + PGRES_BATCH_ABORTED); + if (!conn->result) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("out of memory")); + pqSaveErrorResult(conn); + return 0; + } + conn->asyncStatus = PGASYNC_READY; + } + else + { + /* allow parsing to continue */ + conn->asyncStatus = PGASYNC_BUSY; + } + + return 1; +} + /* * PQgetResult @@ -1800,10 +2251,31 @@ PQgetResult(PGconn *conn) switch (conn->asyncStatus) { case PGASYNC_IDLE: + case PGASYNC_QUEUED: res = NULL; /* query is complete */ break; case PGASYNC_READY: res = pqPrepareAsyncResult(conn); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + /* + * In batch mode, query execution state cannot be IDLE as there + * can be other queries or results waiting in the queue + * + * The connection isn't idle since we can't submit new + * nonbatched commands. It isn't also busy since the current + * command is done and we need to process a new one. + */ + conn->asyncStatus = PGASYNC_QUEUED; + } + else + { + /* Set the state back to BUSY, allowing parsing to proceed. */ + conn->asyncStatus = PGASYNC_BUSY; + } + break; + case PGASYNC_READY_MORE: + res = pqPrepareAsyncResult(conn); /* Set the state back to BUSY, allowing parsing to proceed. */ conn->asyncStatus = PGASYNC_BUSY; break; @@ -1983,6 +2455,13 @@ PQexecStart(PGconn *conn) if (!conn) return false; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n")); + return false; + } + /* * Silently discard any prior query result that application didn't eat. * This is probably poor design, but it's here for backward compatibility. 
@@ -2177,6 +2656,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal) static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) { + PGcommandQueueEntry *pipeCmd = NULL; + PGQueryClass *queryclass; + /* Treat null desc_target as empty string */ if (!desc_target) desc_target = ""; @@ -2192,6 +2674,20 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) return 0; } + if (conn->batch_status != PQBATCH_MODE_OFF) + { + pipeCmd = PQmakePipelinedCommand(conn); + + if (pipeCmd == NULL) + return 0; /* error msg already set */ + + queryclass = &pipeCmd->queryclass; + } + else + { + queryclass = &conn->queryclass; + } + /* construct the Describe message */ if (pqPutMsgStart('D', false, conn) < 0 || pqPutc(desc_type, conn) < 0 || @@ -2200,15 +2696,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) goto sendFailed; /* construct the Sync message */ - if (pqPutMsgStart('S', false, conn) < 0 || - pqPutMsgEnd(conn) < 0) - goto sendFailed; + if (conn->batch_status == PQBATCH_MODE_OFF) + { + if (pqPutMsgStart('S', false, conn) < 0 || + pqPutMsgEnd(conn) < 0) + goto sendFailed; + } /* remember we are doing a Describe */ - conn->queryclass = PGQUERY_DESCRIBE; + *queryclass = PGQUERY_DESCRIBE; /* reset last-query string (not relevant now) */ - if (conn->last_query) + if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF) { free(conn->last_query); conn->last_query = NULL; @@ -2218,14 +2717,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) * Give the data a push. In nonblock mode, don't complain if we're unable * to send it all; PQgetResult() will do any additional flushing needed. */ - if (pqFlush(conn) < 0) + if (pqBatchFlush(conn) < 0) goto sendFailed; /* OK, it's launched! 
*/ - conn->asyncStatus = PGASYNC_BUSY; + if (conn->batch_status != PQBATCH_MODE_OFF) + PQappendPipelinedCommand(conn, pipeCmd); + else + conn->asyncStatus = PGASYNC_BUSY; return 1; sendFailed: + PQrecyclePipelinedCommand(conn, pipeCmd); pqHandleSendFailure(conn); return 0; } @@ -2620,6 +3123,13 @@ PQfn(PGconn *conn, /* clear the error string */ resetPQExpBuffer(&conn->errorMessage); + if (conn->batch_status != PQBATCH_MODE_OFF) + { + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("Synchronous command execution functions are not allowed in batch mode\n")); + return NULL; + } + if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE || conn->result != NULL) { @@ -3813,3 +4323,14 @@ PQunescapeBytea(const unsigned char *strtext, size_t *retbuflen) *retbuflen = buflen; return tmpbuf; } +/* pqBatchFlush + * In batch mode, data is flushed only once the out buffer reaches OUTBUFFER_THRESHOLD. + * In non-batch mode, data is flushed on every call. + */ +static int +pqBatchFlush(PGconn *conn) +{ + if ((conn->batch_status == PQBATCH_MODE_OFF)||(conn->outCount >= OUTBUFFER_THRESHOLD)) + return(pqFlush(conn)); + return 0; /* batch mode, below threshold: defer the flush */ +} diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 7dcef80..26c9a9e 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -411,6 +411,12 @@ pqParseInput2(PGconn *conn) { char id; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode"); + abort(); + } + /* * Loop to parse successive complete messages available in the buffer.
*/ diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index d3ca5d2..f699607 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -219,10 +219,18 @@ pqParseInput3(PGconn *conn) return; conn->asyncStatus = PGASYNC_READY; break; - case 'Z': /* backend is ready for new query */ + case 'Z': /* sync response, backend is ready for new query */ if (getReadyForQuery(conn)) return; - conn->asyncStatus = PGASYNC_IDLE; + if (conn->batch_status != PQBATCH_MODE_OFF) + { + conn->batch_status = PQBATCH_MODE_ON; + conn->result = PQmakeEmptyPGresult(conn, + PGRES_BATCH_END); + conn->asyncStatus = PGASYNC_READY; + } + else + conn->asyncStatus = PGASYNC_IDLE; break; case 'I': /* empty query */ if (conn->result == NULL) @@ -879,6 +887,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError) PQExpBufferData workBuf; char id; + if (isError && conn->batch_status != PQBATCH_MODE_OFF) + conn->batch_status = PQBATCH_MODE_ABORTED; + /* * Since the fields might be pretty long, we create a temporary * PQExpBuffer rather than using conn->workBuffer. 
workBuffer is intended diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index ed9c806..7136980 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -95,7 +95,10 @@ typedef enum PGRES_NONFATAL_ERROR, /* notice or warning message */ PGRES_FATAL_ERROR, /* query failed */ PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */ - PGRES_SINGLE_TUPLE /* single tuple from larger resultset */ + PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */ + PGRES_BATCH_END, /* end of a batch of commands */ + PGRES_BATCH_ABORTED, /* Command didn't run because of an abort + * earlier in a batch */ } ExecStatusType; typedef enum @@ -134,6 +137,17 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */ } PGPing; +/* + * PQBatchStatus - Current status of batch mode + */ + +typedef enum +{ + PQBATCH_MODE_OFF, + PQBATCH_MODE_ON, + PQBATCH_MODE_ABORTED +} PQBatchStatus; + /* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications. 
*/ @@ -425,6 +439,14 @@ extern PGresult *PQgetResult(PGconn *conn); extern int PQisBusy(PGconn *conn); extern int PQconsumeInput(PGconn *conn); +/* Routines for batch mode management */ +extern int PQbatchStatus(PGconn *conn); +extern int PQbatchQueueCount(PGconn *conn); +extern int PQenterBatchMode(PGconn *conn); +extern int PQexitBatchMode(PGconn *conn); +extern int PQbatchSendQueue(PGconn *conn); +extern int PQbatchProcessQueue(PGconn *conn); + /* LISTEN/NOTIFY support */ extern PGnotify *PQnotifies(PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 4e35409..fc5376f 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -215,10 +215,15 @@ typedef enum { PGASYNC_IDLE, /* nothing's happening, dude */ PGASYNC_BUSY, /* query in progress */ - PGASYNC_READY, /* result ready for PQgetResult */ + PGASYNC_READY, /* query done, waiting for client to fetch + * result */ + PGASYNC_READY_MORE, /* query done, waiting for client to fetch + * result, More results expected from this + * query */ PGASYNC_COPY_IN, /* Copy In data transfer in progress */ PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */ - PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */ + PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */ + PGASYNC_QUEUED /* Current query done, more in queue */ } PGAsyncStatusType; /* PGQueryClass tracks which query protocol we are now executing */ @@ -227,7 +232,8 @@ typedef enum PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */ PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */ PGQUERY_PREPARE, /* Parse only (PQprepare) */ - PGQUERY_DESCRIBE /* Describe Statement or Portal */ + PGQUERY_DESCRIBE, /* Describe Statement or Portal */ + PGQUERY_SYNC /* A protocol sync to end a batch */ } PGQueryClass; /* PGSetenvStatusType defines the state of the PQSetenv state machine */ @@ -297,6 +303,22 @@ typedef enum pg_conn_host_type CHT_UNIX_SOCKET } 
pg_conn_host_type; +/* An entry in the pending command queue. Used by batch mode to keep track + * of the expected results of future commands we've dispatched. + * + * Note that entries in this list are reused by being zeroed and appended to + * the tail when popped off the head. The entry with null next pointer is not + * the end of the list of expected commands, that's the tail pointer in + * pg_conn. + */ +typedef struct pgCommandQueueEntry +{ + PGQueryClass queryclass; /* Query type; PGQUERY_SYNC for sync msg */ + char *query; /* SQL command, or NULL if unknown */ + struct pgCommandQueueEntry *next; +} PGcommandQueueEntry; + + /* * pg_conn_host stores all information about one of possibly several hosts * mentioned in the connection string. Derived by splitting the pghost @@ -386,6 +408,7 @@ struct pg_conn bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock * sending semantics */ + PQBatchStatus batch_status; /* Batch(pipelining) mode status of connection */ bool singleRowMode; /* return current query result row-by-row? */ char copy_is_binary; /* 1 = copy binary, 0 = copy text */ int copy_already_done; /* # bytes already returned in COPY OUT */ @@ -397,6 +420,16 @@ struct pg_conn int whichhost; /* host we're currently considering */ pg_conn_host *connhost; /* details about each possible host */ + /* + * The command queue + * + * head is the next pending cmd, tail is where we append new commands. + * Freed entries for recycling go on the recycle linked list. 
+ */ + PGcommandQueueEntry *cmd_queue_head; + PGcommandQueueEntry *cmd_queue_tail; + PGcommandQueueEntry *cmd_queue_recycle; + /* Connection data */ pgsocket sock; /* FD for socket, PGINVALID_SOCKET if * unconnected */ @@ -680,6 +713,12 @@ extern char *pgtls_get_peer_certificate_hash(PGconn *conn, size_t *len); */ #define pqIsnonblocking(conn) ((conn)->nonblocking) +/* + * Connection's outbuffer threshold is set to 64k as it is safe + * in Windows as per comments in pqSendSome() API. + */ +#define OUTBUFFER_THRESHOLD 65536 + #ifdef ENABLE_NLS extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1); extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2); @@ -688,6 +727,8 @@ extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigne #define libpq_ngettext(s, p, n) ((n) == 1 ? (s) : (p)) #endif +#define libpq_gettext_noop(x) (x) + /* * These macros are needed to let error-handling code be portable between * Unix and Windows. (ugh) -- 2.7.4.windows.1
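
For readers comparing approaches: the patch adds cmd_queue_head, cmd_queue_tail and cmd_queue_recycle to pg_conn, but the helpers that maintain them (PQmakePipelinedCommand, PQappendPipelinedCommand, PQrecyclePipelinedCommand) are not part of the hunks shown above. The following is only a minimal standalone sketch of how such a head/tail/recycle queue could work. Every name in it (Entry, Queue, queue_make, queue_append, queue_pop, queue_recycle) is hypothetical and it is not the patch's implementation; in particular it keeps the recycle list fully separate from the pending list, which may differ from what the patch does.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for PGQueryClass and PGcommandQueueEntry. */
typedef enum { Q_EXTENDED, Q_SYNC } QueryClass;

typedef struct Entry
{
	QueryClass	queryclass;	/* Q_SYNC marks the end of a batch */
	char	   *query;		/* owned copy of the SQL text, or NULL */
	struct Entry *next;
} Entry;

typedef struct
{
	Entry	   *head;		/* next command whose result is expected */
	Entry	   *tail;		/* where new commands are appended */
	Entry	   *recycle;	/* spare entries kept for reuse */
} Queue;

/* Reuse an entry from the recycle list if one exists, else allocate. */
Entry *
queue_make(Queue *q)
{
	Entry	   *e = q->recycle;

	if (e != NULL)
		q->recycle = e->next;
	else
	{
		e = malloc(sizeof(Entry));
		if (e == NULL)
			return NULL;	/* caller reports out of memory */
	}
	memset(e, 0, sizeof(Entry));
	return e;
}

/* Append a dispatched command at the tail of the pending list. */
void
queue_append(Queue *q, Entry *e)
{
	e->next = NULL;
	if (q->tail != NULL)
		q->tail->next = e;
	else
		q->head = e;
	q->tail = e;
}

/* Detach and return the oldest pending command, or NULL if none. */
Entry *
queue_pop(Queue *q)
{
	Entry	   *e = q->head;

	if (e == NULL)
		return NULL;
	q->head = e->next;
	if (q->head == NULL)
		q->tail = NULL;
	e->next = NULL;
	return e;
}

/* Free the query text and park the entry for reuse; NULL is a no-op. */
void
queue_recycle(Queue *q, Entry *e)
{
	if (e == NULL)
		return;
	free(e->query);
	e->query = NULL;
	e->next = q->recycle;
	q->recycle = e;
}
```

In this sketch a send function would call queue_make before building the protocol message, queue_append once the message is buffered, and queue_recycle on the failure path. Making queue_recycle accept NULL matters for that last case, because the non-batch path in the patch reaches sendFailed with pipeCmd still NULL.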