Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions bench.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Minimal benchmark for PQgetCopyData alternative.
*
* Define CALL to 0 (to use the classic PQgetCopyData) or 1 (to use the
* proposed new function), then run the binary through "time" to get time and
* CPU usage stats.
*
* DO NOT UPSTREAM THIS FILE. It's just a demonstration for the prototype
* patch.
*/
#include <stdio.h>
#include <stdlib.h>

#include <libpq-fe.h>

/* Define CALL to...
* 0: Use classic PQgetCopyData()
* 1: Use experimental PQhandleCopyData()
*/

/* Benchmark results (best result per category, out of 4 runs):
*
* PQgetCopyData:
* real - 0m32.972s
* user - 0m11.364s
* sys - 0m1.255s
*
* PQhandleCopyData:
* real - 0m32.839s
* user - 0m3.407s
* sys - 0m0.872s
*/

#if CALL == 1
/*
* Print line, add newline.
*/
static int
print_row_and_newline(void *, const char *buf, size_t len)
{
fwrite(buf, 1, len, stdout);
return 0;
}
#endif


int
main()
{
#if !defined(CALL)
#error "Set CALL: 0 = PQgetCopyDta, 1 = PQhandleCopyData."
#elif CALL == 0
fprintf(stderr, "Testing classic PQgetCopyData().\n");
#elif CALL == 1
fprintf(stderr, "Testing experimental PQhandleCopyData.\n");
#else
#error "Unknown CALL value."
#endif

PGconn *cx = PQconnectdb("");

if (!cx)
{
fprintf(stderr, "Could not connect.\n");
exit(1);
}
PGresult *tx = PQexec(cx, "BEGIN");

if (!tx)
{
fprintf(stderr, "No result from BEGIN!\n");
exit(1);
}
int s = PQresultStatus(tx);

if (s != PGRES_COMMAND_OK)
{
fprintf(stderr, "Failed to start transaction: status %d.\n", s);
exit(1);
}

PGresult *r = PQexec(
cx,
"COPY ("
"SELECT generate_series, 'row #' || generate_series "
"FROM generate_series(1, 100000000)"
") TO STDOUT"
);

if (!r)
{
fprintf(stderr, "No result!\n");
exit(1);
}
int status = PQresultStatus(r);

if (status != PGRES_COPY_OUT)
{
fprintf(stderr, "Failed to start COPY: status %d.\n", status);
exit(1);
}

int bytes;
#if CALL == 0
char *buffer = NULL;

for (
bytes = PQgetCopyData(cx, &buffer, 0);
bytes > 0;
bytes = PQgetCopyData(cx, &buffer, 0)
)
{
if (buffer)
{
printf("%s", buffer);
PQfreemem(buffer);
}
}
#elif CALL == 1
while ((bytes = PQhandleCopyData(cx, print_row_and_newline, NULL, 0)) > 0);
#else
#error "Unknown CALL value."
#endif

if (bytes != -1)
{
fprintf(stderr, "Got unexpected result: %d.\n", bytes);
exit(1);
}

/* (Don't bother cleaning up.) */
}
18 changes: 11 additions & 7 deletions src/interfaces/ecpg/ecpglib/execute.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@
#include "sqlda-compat.h"
#include "sqlda-native.h"

/*
* Print non-zero-terminated line received from COPY.
*/
static int
print_row(void *, const char *buf, size_t len)
{
fwrite(buf, 1, len, stdout);
return 0;
}

/*
* This function returns a newly malloced string that has ' and \
* escaped.
Expand Down Expand Up @@ -1876,16 +1886,10 @@ ecpg_process_output(struct statement *stmt, bool clear_result)
break;
case PGRES_COPY_OUT:
{
char *buffer;
int res;

ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno);
while ((res = PQgetCopyData(stmt->connection->connection,
&buffer, 0)) > 0)
{
printf("%s", buffer);
PQfreemem(buffer);
}
while ((res = PQhandleCopyData(stmt->connection->connection, print_row, NULL, 0)) > 0);
if (res == -1)
{
/* COPY done */
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/libpq/exports.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,4 @@ PQpipelineStatus 183
PQsetTraceFlags 184
PQmblenBounded 185
PQsendFlushRequest 186
PQhandleCopyData 187
59 changes: 57 additions & 2 deletions src/interfaces/libpq/fe-exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,27 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
return 1;
}

struct GetCopyData_context
{
PGconn *conn;
char *buffer;
};

static int alloc_copy_buffer(void *context, const char *inbuf, size_t len)
{
struct GetCopyData_context *params = (struct GetCopyData_context *) context;
PGconn *conn = params->conn;
params->buffer = (char *) malloc(len + 1);
if (params->buffer == NULL)
{
libpq_append_conn_error(conn, "out of memory");
return -2;
}
memcpy(params->buffer, &conn->inBuffer[conn->inCursor], len);
params->buffer[len] = '\0'; /* Add terminating null */
return 0;
}

/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH
Expand All @@ -2697,7 +2718,41 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
int
PQgetCopyData(PGconn *conn, char **buffer, int async)
{
*buffer = NULL; /* for all failure cases */
struct GetCopyData_context context;
int result;
context.conn = conn;
context.buffer = NULL; /* for all failure cases */
result = pqGetCopyData3(conn, alloc_copy_buffer, &context, async);
*buffer = context.buffer;
return result;
}

/*
* PQhandleCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH, and invoke a callback.
*
* Pass a "handler" callback which takes a buffer and its size. If the handler
* returns a negative value, PQhandleCopyData will return that as an error
* return code.
*
* Calls handler only after receiving a full row. The buffer does NOT have a
* terminating zero, so do not go beyond the given size.
*
* The context pointer can be anything; this function will pass it to handler.
*
* If successful, calls handler and returns row length (always > 0) as result.
* If no row is available yet (only possible if async is true), does not call
* handler, and returns 0 as result.
* If the copy has ended (consult PQgetResult), does not call handler, and
* returns -1.
* On failure, does not call handler, and returns -2 (consult PQerrorMessage).
*/
int
PQhandleCopyData(PGconn *conn,
int (*handler) (void *, const char *, size_t),
void *context,
int async)
{
if (!conn)
return -2;
if (conn->asyncStatus != PGASYNC_COPY_OUT &&
Expand All @@ -2706,7 +2761,7 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
libpq_append_conn_error(conn, "no COPY in progress");
return -2;
}
return pqGetCopyData3(conn, buffer, async);
return pqGetCopyData3(conn, handler, context, async);
}

/*
Expand Down
56 changes: 27 additions & 29 deletions src/interfaces/libpq/fe-protocol3.c
Original file line number Diff line number Diff line change
Expand Up @@ -1720,27 +1720,35 @@ getCopyDataMessage(PGconn *conn)
}

/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH
* PQhandleCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH, and pass it to a caller-supplied buffer.
*
* If successful, sets *buffer to point to a malloc'd row of data, and
* returns row length (always > 0) as result.
* Returns 0 if no row available yet (only possible if async is true),
* -1 if end of copy (consult PQgetResult), or -2 if error (consult
* PQerrorMessage).
* Pass a "handler" callback which takes a buffer and its size. If the handler
* returns a negative value, PQhandleCopyData will return that as an error
* return code.
*
* Calls handler only after receiving a full row. The buffer does NOT have a
* terminating zero, so do not go beyond the given size.
*
* The context pointer can be anything; this function will pass it to handler.
*
* If successful, calls handler and returns row length (always > 0) as result.
* If no row is available yet (only possible if async is true), does not call
* handler, and returns 0 as result.
* If the copy has ended (consult PQgetResult), does not call handler, and
* returns -1.
* On failure, does not call handler, and returns -2 (consult PQerrorMessage).
*/
int
pqGetCopyData3(PGconn *conn, char **buffer, int async)
pqGetCopyData3(PGconn *conn,
int (*handler) (void *, const char *, size_t),
void *context,
int async)
{
int msgLength;

for (;;)
{
/*
* Collect the next input message. To make life simpler for async
* callers, we keep returning 0 until the next message is fully
* available, even if it is not Copy Data.
*/
msgLength = getCopyDataMessage(conn);
if (msgLength < 0)
return msgLength; /* end-of-copy or error */
Expand All @@ -1756,29 +1764,19 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
continue;
}

/*
* Drop zero-length messages (shouldn't happen anyway). Otherwise
* pass the data back to the caller.
*/
msgLength -= 4;
if (msgLength > 0)
{
*buffer = (char *) malloc(msgLength + 1);
if (*buffer == NULL)
{
libpq_append_conn_error(conn, "out of memory");
return -2;
}
memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength);
(*buffer)[msgLength] = '\0'; /* Add terminating null */

/* Mark message consumed */
/* We have a row. Call the handler. */
int result = handler(context,
&conn->inBuffer[conn->inCursor],
msgLength);
conn->inStart = conn->inCursor + msgLength;

if (result < 0)
return result;
return msgLength;
}

/* Empty, so drop it and loop around for another */
conn->inStart = conn->inCursor;
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/interfaces/libpq/libpq-fe.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,12 @@ extern int PQputCopyData(PGconn *conn, const char *buffer, int nbytes);
extern int PQputCopyEnd(PGconn *conn, const char *errormsg);
extern int PQgetCopyData(PGconn *conn, char **buffer, int async);

/* TODO: "House style" would be int, rather than size_t. */
extern int PQhandleCopyData(PGconn *conn,
int handler(void *, const char *, size_t),
void *context,
int async);

/* Deprecated routines for copy in/out */
extern int PQgetline(PGconn *conn, char *buffer, int length);
extern int PQputline(PGconn *conn, const char *string);
Expand Down
5 changes: 4 additions & 1 deletion src/interfaces/libpq/libpq-int.h
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,10 @@ extern int pqGetErrorNotice3(PGconn *conn, bool isError);
extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res,
PGVerbosity verbosity, PGContextVisibility show_context);
extern int pqGetNegotiateProtocolVersion3(PGconn *conn);
extern int pqGetCopyData3(PGconn *conn, char **buffer, int async);
extern int pqGetCopyData3(PGconn *conn,
int (*handler) (void *, const char *, size_t),
void *context,
int async);
extern int pqGetline3(PGconn *conn, char *s, int maxlen);
extern int pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize);
extern int pqEndcopy3(PGconn *conn);
Expand Down