From 93bb715eca531e207ee3949125f60ec0d35cb3fc Mon Sep 17 00:00:00 2001 From: "Jeroen T. Vermeulen" Date: Fri, 3 Feb 2023 15:15:52 +0100 Subject: [PATCH 1/3] Sketch out an alternative to `PQgetCopyData`. Saves CPU time (and power usage) by skipping the allocation and deallocation of buffers for each row. The downside is that the data is only valid for one iteration, but that's probably all that most users of COPY OUT need anyway. The callback-based `PQhandleCopyData` reduces CPU usage by more than half in my toy benchmark. Wall-clock time stayed about the same as it was. The disadvantage is that this breaks with existing libpq API style. But when performance matters, that might be worth it. --- bench.c | 134 ++++++++++++++++++++++++++ src/interfaces/ecpg/ecpglib/execute.c | 19 ++-- src/interfaces/libpq/exports.txt | 1 + src/interfaces/libpq/fe-exec.c | 36 +++++++ src/interfaces/libpq/fe-protocol3.c | 57 +++++++++++ src/interfaces/libpq/libpq-fe.h | 3 + src/interfaces/libpq/libpq-int.h | 1 + 7 files changed, 244 insertions(+), 7 deletions(-) create mode 100644 bench.c diff --git a/bench.c b/bench.c new file mode 100644 index 0000000000000..c3206d29272c1 --- /dev/null +++ b/bench.c @@ -0,0 +1,134 @@ +/* + * Minimal benchmark for PQgetCopyData alternative. + * + * Define CALL to 0 (to use the classic PQgetCopyData) or 1 (to use the + * proposed new function), then run the binary through "time" to get time and + * CPU usage stats. + * + * DO NOT UPSTREAM THIS FILE. It's just a demonstration for the prototype + * patch. + */ +#include +#include + +#include + +/* Define CALL to... + * 0: Use classic PQgetCopyData() + * 1: Use experimental PQhandleCopyData() + */ + +/* Benchmark results (best result per category, out of 4 runs): + * + * PQgetCopyData: + * real - 0m32.972s + * user - 0m11.364s + * sys - 0m1.255s + * + * PQhandleCopyData: + * real - 0m32.839s + * user - 0m3.407s + * sys - 0m0.872s + */ + +#if CALL == 1 +/* + * Print line, add newline. 
+ */ +static int +print_row_and_newline(void *, char *buf, size_t len) +{ + /* Zero-terminate the buffer. */ + buf[len - 1] = '\0'; + printf("%s\n", buf); + return 0; +} +#endif + + +int +main() +{ +#if !defined(CALL) +#error "Set CALL: 0 = PQgetCopyDta, 1 = PQhandleCopyData." +#elif CALL == 0 + fprintf(stderr, "Testing classic PQgetCopyData().\n"); +#elif CALL == 1 + fprintf(stderr, "Testing experimental PQhandleCopyData.\n"); +#else +#error "Unknown CALL value." +#endif + + PGconn *cx = PQconnectdb(""); + + if (!cx) + { + fprintf(stderr, "Could not connect.\n"); + exit(1); + } + PGresult *tx = PQexec(cx, "BEGIN"); + + if (!tx) + { + fprintf(stderr, "No result from BEGIN!\n"); + exit(1); + } + int s = PQresultStatus(tx); + + if (s != PGRES_COMMAND_OK) + { + fprintf(stderr, "Failed to start transaction: status %d.\n", s); + exit(1); + } + + PGresult *r = PQexec( + cx, + "COPY (" + "SELECT generate_series, 'row #' || generate_series " + "FROM generate_series(1, 100000000)" + ") TO STDOUT" + ); + + if (!r) + { + fprintf(stderr, "No result!\n"); + exit(1); + } + int status = PQresultStatus(r); + + if (status != PGRES_COPY_OUT) + { + fprintf(stderr, "Failed to start COPY: status %d.\n", status); + exit(1); + } + + int bytes; +#if CALL == 0 + char *buffer = NULL; + + for ( + bytes = PQgetCopyData(cx, &buffer, 0); + bytes > 0; + bytes = PQgetCopyData(cx, &buffer, 0) + ) + { + if (buffer) + { + printf("%s", buffer); + PQfreemem(buffer); + } + } +#elif CALL == 1 + while ((bytes = PQhandleCopyData(cx, print_row_and_newline, NULL, 0)) > 0); +#else +#error "Unknown CALL value." +#endif + + if (bytes != -1) + { + fprintf(stderr, "Got unexpected result: %d.\n", bytes); + exit(1); + } + + /* (Don't bother cleaning up.) 
*/ +} diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c index 641851983d387..1e4eba58a24bd 100644 --- a/src/interfaces/ecpg/ecpglib/execute.c +++ b/src/interfaces/ecpg/ecpglib/execute.c @@ -32,6 +32,17 @@ #include "sqlda-compat.h" #include "sqlda-native.h" +/* + * Print non-zero-terminated line received from COPY. + */ +static int +print_row(void *, char *buf, size_t len) +{ + buf[len - 1] = '\0'; + printf("%s\n", buf); + return 0; +} + /* * This function returns a newly malloced string that has ' and \ * escaped. @@ -1876,16 +1887,10 @@ ecpg_process_output(struct statement *stmt, bool clear_result) break; case PGRES_COPY_OUT: { - char *buffer; int res; ecpg_log("ecpg_process_output on line %d: COPY OUT data transfer in progress\n", stmt->lineno); - while ((res = PQgetCopyData(stmt->connection->connection, - &buffer, 0)) > 0) - { - printf("%s", buffer); - PQfreemem(buffer); - } + while ((res = PQhandleCopyData(stmt->connection->connection, print_row, NULL, 0)) > 0); if (res == -1) { /* COPY done */ diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index e8bcc88370916..add1ff15910c8 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -186,3 +186,4 @@ PQpipelineStatus 183 PQsetTraceFlags 184 PQmblenBounded 185 PQsendFlushRequest 186 +PQhandleCopyData 187 diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index ec62550e38581..d0f0501e80c5d 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2709,6 +2709,42 @@ PQgetCopyData(PGconn *conn, char **buffer, int async) return pqGetCopyData3(conn, buffer, async); } +/* + * PQhandleCopyData - read a row of data from the backend during COPY OUT + * or COPY BOTH, and invoke a callback. + * + * Pass a "handler" callback which takes a buffer and its size. 
(Its return + * value is currently stil meaningless, but could become a flag like "this + * ride is making me sick and I'd like to get off.) + * + * Calls handler only after receiving a full row. The buffer does NOT have a + * terminating zero, so do not go beyond the given size. However, you may + * modify the buffer's contents, and the line ends in a newline. If you need + * a terminating zero, you are free to overwrite the newline. + * + * The context pointer can be anything; this function will pass it to handler. + * + * If successful, calls handler and returns row length (always > 0) as result. + * If no row is available yet (only possible if async is true), does not call + * handler, and returns 0 as result. + * If the copy has ended (consult PQgetResult), does not call handler, and + * returns -1. + * On failure, does not call handler, and returns -2 (consult PQerrorMessage). + */ +int +PQhandleCopyData(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async) +{ + if (!conn) + return -2; + if (conn->asyncStatus != PGASYNC_COPY_OUT && + conn->asyncStatus != PGASYNC_COPY_BOTH) + { + libpq_append_conn_error(conn, "no COPY in progress"); + return -2; + } + return pqHandleCopyData3(conn, handler, context, async); +} + /* * PQgetline - gets a newline-terminated string from the backend. * diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 8ab6a8841654e..1f4ecec9bdd82 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1783,6 +1783,63 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) } } +/* + * PQhandleCopyData - read a row of data from the backend during COPY OUT + * or COPY BOTH, and pass it to a caller-supplied buffer. + * + * Pass a "handler" callback which takes a buffer and its size. (Its return + * value is currently stil meaningless, but could become a flag like "this + * ride is making me sick and I'd like to get off.) 
+ * + * Calls handler only after receiving a full row. The buffer does NOT have a + * terminating zero, so do not go beyond the given size. However, you may + * modify the buffer's contents, and the line ends in a newline. If you need + * a terminating zero, you are free to overwrite the newline. + * + * The context pointer can be anything; this function will pass it to handler. + * + * If successful, calls handler and returns row length (always > 0) as result. + * If no row is available yet (only possible if async is true), does not call + * handler, and returns 0 as result. + * If the copy has ended (consult PQgetResult), does not call handler, and + * returns -1. + * On failure, does not call handler, and returns -2 (consult PQerrorMessage). + */ +int +pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async) +{ + int msgLength; + + for (;;) + { + msgLength = getCopyDataMessage(conn); + if (msgLength < 0) + return msgLength; /* end-of-copy or error */ + if (msgLength == 0) + { + /* Don't block if async read requested */ + if (async) + return 0; + /* Need to load more data */ + if (pqWait(true, false, conn) || + pqReadData(conn) < 0) + return -2; + continue; + } + + msgLength -= 4; + if (msgLength > 0) + { + /* We have a row. Call the handler. */ + handler(context, &conn->inBuffer[conn->inCursor], msgLength); + conn->inStart = conn->inCursor + msgLength; + return msgLength; + } + + conn->inStart = conn->inCursor; + } +} + /* * PQgetline - gets a newline-terminated string from the backend. 
* diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index f3d92204964ae..c07544b2554b6 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -482,6 +482,9 @@ extern int PQputCopyData(PGconn *conn, const char *buffer, int nbytes); extern int PQputCopyEnd(PGconn *conn, const char *errormsg); extern int PQgetCopyData(PGconn *conn, char **buffer, int async); +/* TODO: "House style" would be int, rather than size_t. */ +extern int PQhandleCopyData(PGconn *conn, int handler(void *, char *, size_t), void *context, int async); + /* Deprecated routines for copy in/out */ extern int PQgetline(PGconn *conn, char *buffer, int length); extern int PQputline(PGconn *conn, const char *string); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index d94b648ea5b95..8936ea0388778 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -687,6 +687,7 @@ extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res, PGVerbosity verbosity, PGContextVisibility show_context); extern int pqGetNegotiateProtocolVersion3(PGconn *conn); extern int pqGetCopyData3(PGconn *conn, char **buffer, int async); +extern int pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async); extern int pqGetline3(PGconn *conn, char *s, int maxlen); extern int pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize); extern int pqEndcopy3(PGconn *conn); From d480af9f31d1b57b190d748b784491f74e763834 Mon Sep 17 00:00:00 2001 From: Jeroen Vermeulen Date: Fri, 3 Mar 2023 20:25:06 +0100 Subject: [PATCH 2/3] Changes for Tom. Pass the buffer to the callback as `const`. Instead of copying `pqGetCopyData3` as `pqHandleCopyData3`, re-implement `PQgetCopyData` on top of the callback-based API, now renamed to `pqGetCopyData3`. Use `fwrite` instead of `printf` so we don't need zero-terminated strings. 
--- bench.c | 6 +- src/interfaces/ecpg/ecpglib/execute.c | 5 +- src/interfaces/libpq/fe-exec.c | 49 ++++++++++++----- src/interfaces/libpq/fe-protocol3.c | 79 ++++----------------------- src/interfaces/libpq/libpq-fe.h | 5 +- src/interfaces/libpq/libpq-int.h | 6 +- 6 files changed, 56 insertions(+), 94 deletions(-) diff --git a/bench.c b/bench.c index c3206d29272c1..35f8c7a36fa9f 100644 --- a/bench.c +++ b/bench.c @@ -36,11 +36,9 @@ * Print line, add newline. */ static int -print_row_and_newline(void *, char *buf, size_t len) +print_row_and_newline(void *, const char *buf, size_t len) { - /* Zero-terminate the buffer. */ - buf[len - 1] = '\0'; - printf("%s\n", buf); + fwrite(buf, 1, len, stdout); return 0; } #endif diff --git a/src/interfaces/ecpg/ecpglib/execute.c b/src/interfaces/ecpg/ecpglib/execute.c index 1e4eba58a24bd..aaf31ea216211 100644 --- a/src/interfaces/ecpg/ecpglib/execute.c +++ b/src/interfaces/ecpg/ecpglib/execute.c @@ -36,10 +36,9 @@ * Print non-zero-terminated line received from COPY. 
*/ static int -print_row(void *, char *buf, size_t len) +print_row(void *, const char *buf, size_t len) { - buf[len - 1] = '\0'; - printf("%s\n", buf); + fwrite(buf, 1, len, stdout); return 0; } diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index d0f0501e80c5d..1c9f0ee1753ff 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2684,6 +2684,27 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) return 1; } +struct GetCopyData_context +{ + PGconn *conn; + char *buffer; +}; + +static int alloc_copy_buffer(void *context, const char *inbuf, size_t len) +{ + struct GetCopyData_context *params = (struct GetCopyData_context *) context; + PGconn *conn = params->conn; + params->buffer = (char *) malloc(len + 1); + if (params->buffer == NULL) + { + libpq_append_conn_error(conn, "out of memory"); + return -2; + } + memcpy(params->buffer, &conn->inBuffer[conn->inCursor], len); + params->buffer[len] = '\0'; /* Add terminating null */ + return 0; +} + /* * PQgetCopyData - read a row of data from the backend during COPY OUT * or COPY BOTH @@ -2697,16 +2718,13 @@ PQputCopyEnd(PGconn *conn, const char *errormsg) int PQgetCopyData(PGconn *conn, char **buffer, int async) { - *buffer = NULL; /* for all failure cases */ - if (!conn) - return -2; - if (conn->asyncStatus != PGASYNC_COPY_OUT && - conn->asyncStatus != PGASYNC_COPY_BOTH) - { - libpq_append_conn_error(conn, "no COPY in progress"); - return -2; - } - return pqGetCopyData3(conn, buffer, async); + struct GetCopyData_context context; + int result; + context.conn = conn; + context.buffer = NULL; /* for all failure cases */ + result = pqGetCopyData3(conn, alloc_copy_buffer, &context, async); + *buffer = context.buffer; + return result; } /* @@ -2718,9 +2736,7 @@ PQgetCopyData(PGconn *conn, char **buffer, int async) * ride is making me sick and I'd like to get off.) * * Calls handler only after receiving a full row. 
The buffer does NOT have a - * terminating zero, so do not go beyond the given size. However, you may - * modify the buffer's contents, and the line ends in a newline. If you need - * a terminating zero, you are free to overwrite the newline. + * terminating zero, so do not go beyond the given size. * * The context pointer can be anything; this function will pass it to handler. * @@ -2732,7 +2748,10 @@ PQgetCopyData(PGconn *conn, char **buffer, int async) * On failure, does not call handler, and returns -2 (consult PQerrorMessage). */ int -PQhandleCopyData(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async) +PQhandleCopyData(PGconn *conn, + int (*handler) (void *, const char *, size_t), + void *context, + int async) { if (!conn) return -2; @@ -2742,7 +2761,7 @@ PQhandleCopyData(PGconn *conn, int (*handler) (void *, char *, size_t), void *co libpq_append_conn_error(conn, "no COPY in progress"); return -2; } - return pqHandleCopyData3(conn, handler, context, async); + return pqGetCopyData3(conn, handler, context, async); } /* diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 1f4ecec9bdd82..8c89462704249 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1719,70 +1719,6 @@ getCopyDataMessage(PGconn *conn) } } -/* - * PQgetCopyData - read a row of data from the backend during COPY OUT - * or COPY BOTH - * - * If successful, sets *buffer to point to a malloc'd row of data, and - * returns row length (always > 0) as result. - * Returns 0 if no row available yet (only possible if async is true), - * -1 if end of copy (consult PQgetResult), or -2 if error (consult - * PQerrorMessage). - */ -int -pqGetCopyData3(PGconn *conn, char **buffer, int async) -{ - int msgLength; - - for (;;) - { - /* - * Collect the next input message. 
To make life simpler for async - * callers, we keep returning 0 until the next message is fully - * available, even if it is not Copy Data. - */ - msgLength = getCopyDataMessage(conn); - if (msgLength < 0) - return msgLength; /* end-of-copy or error */ - if (msgLength == 0) - { - /* Don't block if async read requested */ - if (async) - return 0; - /* Need to load more data */ - if (pqWait(true, false, conn) || - pqReadData(conn) < 0) - return -2; - continue; - } - - /* - * Drop zero-length messages (shouldn't happen anyway). Otherwise - * pass the data back to the caller. - */ - msgLength -= 4; - if (msgLength > 0) - { - *buffer = (char *) malloc(msgLength + 1); - if (*buffer == NULL) - { - libpq_append_conn_error(conn, "out of memory"); - return -2; - } - memcpy(*buffer, &conn->inBuffer[conn->inCursor], msgLength); - (*buffer)[msgLength] = '\0'; /* Add terminating null */ - - /* Mark message consumed */ - conn->inStart = conn->inCursor + msgLength; - - return msgLength; - } - - /* Empty, so drop it and loop around for another */ - conn->inStart = conn->inCursor; - } -} - /* * PQhandleCopyData - read a row of data from the backend during COPY OUT * or COPY BOTH, and pass it to a caller-supplied buffer. @@ -1792,9 +1728,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) * ride is making me sick and I'd like to get off.) * * Calls handler only after receiving a full row. The buffer does NOT have a - * terminating zero, so do not go beyond the given size. However, you may - * modify the buffer's contents, and the line ends in a newline. If you need - * a terminating zero, you are free to overwrite the newline. + * terminating zero, so do not go beyond the given size. * * The context pointer can be anything; this function will pass it to handler. * @@ -1806,7 +1740,10 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async) * On failure, does not call handler, and returns -2 (consult PQerrorMessage). 
*/ int -pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async) +pqGetCopyData3(PGconn *conn, + int (*handler) (void *, const char *, size_t), + void *context, + int async) { int msgLength; @@ -1831,8 +1768,12 @@ pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *c if (msgLength > 0) { /* We have a row. Call the handler. */ - handler(context, &conn->inBuffer[conn->inCursor], msgLength); + int result = handler(context, + &conn->inBuffer[conn->inCursor], + msgLength); conn->inStart = conn->inCursor + msgLength; + if (result < 0) + return result; return msgLength; } diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index c07544b2554b6..4963aa1e5140a 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -483,7 +483,10 @@ extern int PQputCopyEnd(PGconn *conn, const char *errormsg); extern int PQgetCopyData(PGconn *conn, char **buffer, int async); /* TODO: "House style" would be int, rather than size_t. 
*/ -extern int PQhandleCopyData(PGconn *conn, int handler(void *, char *, size_t), void *context, int async); +extern int PQhandleCopyData(PGconn *conn, + int handler(void *, const char *, size_t), + void *context, + int async); /* Deprecated routines for copy in/out */ extern int PQgetline(PGconn *conn, char *buffer, int length); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 8936ea0388778..b5f8b609bc01b 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -686,8 +686,10 @@ extern int pqGetErrorNotice3(PGconn *conn, bool isError); extern void pqBuildErrorMessage3(PQExpBuffer msg, const PGresult *res, PGVerbosity verbosity, PGContextVisibility show_context); extern int pqGetNegotiateProtocolVersion3(PGconn *conn); -extern int pqGetCopyData3(PGconn *conn, char **buffer, int async); -extern int pqHandleCopyData3(PGconn *conn, int (*handler) (void *, char *, size_t), void *context, int async); +extern int pqGetCopyData3(PGconn *conn, + int (*handler) (void *, const char *, size_t), + void *context, + int async); extern int pqGetline3(PGconn *conn, char *s, int maxlen); extern int pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize); extern int pqEndcopy3(PGconn *conn); From 13aceb3579e3e23c637a4238c9da82818cb90c38 Mon Sep 17 00:00:00 2001 From: Jeroen Vermeulen Date: Fri, 3 Mar 2023 21:02:14 +0100 Subject: [PATCH 3/3] Document error returns. --- src/interfaces/libpq/fe-exec.c | 6 +++--- src/interfaces/libpq/fe-protocol3.c | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 1c9f0ee1753ff..e3ba98cf42438 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2731,9 +2731,9 @@ PQgetCopyData(PGconn *conn, char **buffer, int async) * PQhandleCopyData - read a row of data from the backend during COPY OUT * or COPY BOTH, and invoke a callback. 
* - * Pass a "handler" callback which takes a buffer and its size. (Its return - * value is currently stil meaningless, but could become a flag like "this - * ride is making me sick and I'd like to get off.) + * Pass a "handler" callback which takes a buffer and its size. If the handler + * returns a negative value, PQhandleCopyData will return that as an error + * return code. * * Calls handler only after receiving a full row. The buffer does NOT have a * terminating zero, so do not go beyond the given size. diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 8c89462704249..c7439ecd77ae0 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1723,9 +1723,9 @@ getCopyDataMessage(PGconn *conn) * PQhandleCopyData - read a row of data from the backend during COPY OUT * or COPY BOTH, and pass it to a caller-supplied buffer. * - * Pass a "handler" callback which takes a buffer and its size. (Its return - * value is currently stil meaningless, but could become a flag like "this - * ride is making me sick and I'd like to get off.) + * Pass a "handler" callback which takes a buffer and its size. If the handler + * returns a negative value, PQhandleCopyData will return that as an error + * return code. * * Calls handler only after receiving a full row. The buffer does NOT have a * terminating zero, so do not go beyond the given size.