Merged
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
### pg\_incremental v1.5.0

* Add upgrade script `pg_incremental--1.4--1.5.sql` so existing installs receive the `pg_cron` guard in `_drop_extension_trigger` (the trigger function lives outside the extension, so fixing `pg_incremental--1.0.sql` alone does not update it). The migration drops and recreates the function and event trigger and removes them from extension membership again.
* Add `max_batches_per_run` to file list pipelines (`incremental.create_file_list_pipeline` and `incremental.file_list_pipelines`). Default `-1` means a single `execute_pipeline` run processes all unprocessed paths in that invocation; a positive value limits how many batch iterations run per call (one file per iteration when not batched, one array batch when batched).

### pg\_incremental v1.4.1 (December 12, 2025)

* Use `lake_file.list` as the default file list function, fallback to crunchy
38 changes: 30 additions & 8 deletions README.md
@@ -48,6 +48,26 @@ grant usage on schema cron to application;

You can only create pg\_incremental in the database that has pg\_cron.

## Running tests in Docker

You can run the full SQL regression suite (`installcheck`: `sequence`, `time_interval`, and `file_list`) without installing PostgreSQL or pg\_cron on your machine. The repo includes a Docker Compose setup with one image per major PostgreSQL version: services `postgres-17` and `postgres-18`, each with pg\_cron preloaded and the right `cron.database_name` for the test database.

**Requirements:** [Docker](https://docs.docker.com/get-docker/) with Compose v2 (`docker compose`). The first run needs network access to pull base images and build dependencies.

From the repository root:

```bash
./docker/run-tests.sh
```

By default this runs `installcheck` against **PostgreSQL 17 and 18** in sequence (each build uses its own data volume). Set `PG_VERSIONS` to restrict or reorder majors, for example `PG_VERSIONS=18 ./docker/run-tests.sh` or `PG_VERSIONS="17 18" ./docker/run-tests.sh`.

The test image builds [pg\_cron](https://github.com/citusdata/pg_cron) from source (see `PG_CRON_REF` in `docker/Dockerfile`); the pinned tag is new enough to compile against PostgreSQL 18 as well as 17. The official PostgreSQL **18** Docker image uses a different data volume layout than 17; `docker/docker-compose.yml` mounts accordingly.

For each selected version, the script builds the image if needed, starts one container, copies the source tree to `/tmp` inside the container (the repo bind-mount is read-only, so your working tree is not modified), builds and installs the extension, runs `make installcheck`, then tears down that container and its data volume. Success ends with per-version `installcheck: OK` and a final line listing all versions that passed.

To exercise the same flow manually, see comments in `docker/docker-compose.yml` (the default mount is `:ro`, so prefer `run-tests.sh` over running `make` directly on `/work` in the container).
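As a rough manual sketch of what the script automates (service name and paths taken from `docker/docker-compose.yml` and `docker/run-tests.sh`; readiness waiting and error handling are omitted, so prefer the script):

```shell
# Manual approximation of one run-tests.sh cycle (PostgreSQL 18 shown).
docker compose -f docker/docker-compose.yml up -d postgres-18
docker compose -f docker/docker-compose.yml exec -u root -T postgres-18 bash -ec '
  # /work is the read-only bind mount; build from a copy under /tmp.
  rm -rf /tmp/build && cp -a /work /tmp/build && cd /tmp/build
  make clean && make && make install
  chown -R postgres:postgres /tmp/build
  su postgres -s /bin/bash -c "cd /tmp/build && make -o install installcheck"
'
docker compose -f docker/docker-compose.yml down -v   # also removes the data volume
```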

## Creating incremental processing pipelines

There are 3 types of pipelines in pg\_incremental:
@@ -226,7 +246,9 @@ Arguments of the `incremental.create_time_range_pipeline` function:

### Creating a file list pipeline

You can define a file list pipeline with the `incremental.create_file_list_pipeline` function by specifying a generic pipeline name, a file pattern, and a command. The command will be executed in a context where `$1` is set to the path of a file (text). The pipeline periodically looks for new files returned by a list function and then executes the command for each new file.
Upgrading from extension version **1.4** to **1.5** runs `pg_incremental--1.4--1.5.sql`: it refreshes `_drop_extension_trigger` (including the `pg_cron` guard for `DROP EXTENSION`) and adds **`max_batches_per_run`** to `incremental.file_list_pipelines` and `incremental.create_file_list_pipeline`. Use `ALTER EXTENSION pg_incremental UPDATE TO '1.5';`.

You can define a file list pipeline with the `incremental.create_file_list_pipeline` function by specifying a generic pipeline name, a file pattern, and a command. When the pipeline is not batched, the command runs with `$1` set to the path of a file (`text`); when batched, `$1` is a `text[]` of paths. Each call to `incremental.execute_pipeline` (or each scheduled pg\_cron run) lists unprocessed paths from your list function and runs the command up to **`max_batches_per_run`** times in that invocation. The default of `-1` means no limit: the run processes every file (or every batch, when batched). A positive value caps the number of batch iterations, where each iteration handles one file when not batched or one array batch when batched. Remaining paths wait for the next run.

Example:
```sql
-- (example collapsed in the diff view)
```
@@ -253,12 +275,13 @@ Arguments of the `incremental.create_file_list_pipeline` function:
| --------------------- | ----------- | --------------------------------------------------- | ---------------------------------- |
| `pipeline_name` | text | User-defined name of the pipeline | Required |
| `file_pattern` | text | File pattern to pass to the list function | Required |
| `command` | text | Pipeline command with $1 and $2 parameters | Required |
| `command` | text | Pipeline command; `$1` is file path (`text`) or path array (`text[]`) when batched | Required |
| `list_function` | text | Name of the function used to list files | `crunchy_lake.list_files` |
| `batched` | bool | Whether to pass in a batch of files as an array | `false` |
| `max_batch_size` | int | If batched, maximum length of the array | 100 |
| `schedule` | text | pg\_cron schedule for periodic execution (or NULL) | `*/15 * * * *` (every 15 minutes) |
| `execute_immediately` | bool | Execute command immediately for existing data | `true` |
| `max_batches_per_run` | int | Max batch iterations per `execute_pipeline` call; `-1` (no limit) processes the full backlog in that run, while a positive value caps how many files (non-batched) or array batches (batched) are processed per call | `-1` |
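The per-run cap can be illustrated with a small Python simulation. This is a hypothetical sketch of the documented semantics only, not the extension's actual implementation (which lives in C); the function and variable names are invented:

```python
# Hypothetical model of one execute_pipeline invocation for a file list
# pipeline, mirroring the documented max_batches_per_run semantics.

def run_once(pending, batched=False, max_batch_size=100, max_batches_per_run=-1):
    """Return (invocations, remaining): what the command would be called
    with in this run, and the backlog left for the next run."""
    invocations = []
    iterations = 0
    while pending:
        # A non-negative cap limits batch iterations; -1 means no limit.
        if max_batches_per_run >= 0 and iterations >= max_batches_per_run:
            break
        if batched:
            # One iteration consumes up to max_batch_size paths as an array.
            invocations.append(pending[:max_batch_size])
            pending = pending[max_batch_size:]
        else:
            # One iteration consumes a single path.
            invocations.append(pending[0])
            pending = pending[1:]
        iterations += 1
    return invocations, pending

# Non-batched with max_batches_per_run=1: one file per run.
done, rest = run_once(["/cap/a.csv", "/cap/b.csv", "/cap/c.csv"],
                      max_batches_per_run=1)
# done == ["/cap/a.csv"], rest == ["/cap/b.csv", "/cap/c.csv"]
```

Running `run_once` repeatedly on `rest` reproduces the drain pattern shown in the `expected/file_list.out` tests below: one file (or one array batch) per call until the backlog is empty.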

Instead of using the argument, you can also change the default list function via the `incremental.default_file_list_function` setting:

@@ -306,11 +329,11 @@ select * from incremental.time_interval_pipelines;
See the processed files in a file list pipeline:
```sql
select * from incremental.file_list_pipelines ;
┌───────────────┬─────────────────────────────────────┬─────────┬─────────────────────────┐
│ pipeline_name │ file_pattern │ batched │ list_function │
├───────────────┼─────────────────────────────────────┼─────────┼─────────────────────────┤
│ event-import │ s3://marco-crunchy-data/inbox/*.csv │ f │ crunchy_lake.list_files │
└───────────────┴─────────────────────────────────────┴─────────┴─────────────────────────┘
┌───────────────┬─────────────────────────────────────┬─────────┬─────────────────────────┬────────────────┬─────────────────────┐
│ pipeline_name │ file_pattern                        │ batched │ list_function           │ max_batch_size │ max_batches_per_run │
├───────────────┼─────────────────────────────────────┼─────────┼─────────────────────────┼────────────────┼─────────────────────┤
│ event-import  │ s3://marco-crunchy-data/inbox/*.csv │ f       │ crunchy_lake.list_files │                │                  -1 │
└───────────────┴─────────────────────────────────────┴─────────┴─────────────────────────┴────────────────┴─────────────────────┘

select * from incremental.processed_files ;
┌───────────────┬────────────────────────────────────────────┐
@@ -387,4 +410,3 @@ Arguments of the `incremental.drop_pipeline` function:
| Argument name | Type | Description | Default |
| --------------------- | ----------- | ------------------------------------------------- | --------------------------- |
| `pipeline_name` | text | User-defined name of the pipeline | Required |

33 changes: 33 additions & 0 deletions docker/Dockerfile
@@ -0,0 +1,33 @@
# PostgreSQL + pg_cron for pg_incremental regression tests.
# Build from repository root: docker build -f docker/Dockerfile .
#
# Args:
# PG_MAJOR — server major (e.g. 17, 18); must match base image
# PG_IMAGE_SUFFIX — tag after major (e.g. bookworm, trixie) for postgres:${PG_MAJOR}-${PG_IMAGE_SUFFIX}

ARG PG_MAJOR=17
ARG PG_IMAGE_SUFFIX=bookworm
FROM postgres:${PG_MAJOR}-${PG_IMAGE_SUFFIX}

ARG PG_MAJOR
# v1.6.6+ required for PostgreSQL 18 (PortalRun signature); keep in sync across all PG majors in compose.
ARG PG_CRON_REF=v1.6.7

USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
build-essential \
git \
ca-certificates \
postgresql-server-dev-${PG_MAJOR} \
&& rm -rf /var/lib/apt/lists/*

RUN git clone --depth 1 --branch "${PG_CRON_REF}" https://github.com/citusdata/pg_cron.git /tmp/pg_cron \
&& make -C /tmp/pg_cron PG_CONFIG=/usr/lib/postgresql/${PG_MAJOR}/bin/pg_config \
&& make -C /tmp/pg_cron PG_CONFIG=/usr/lib/postgresql/${PG_MAJOR}/bin/pg_config install \
&& rm -rf /tmp/pg_cron

COPY docker/init-pg-cron.sh /docker-entrypoint-initdb.d/10-pg-cron.sh
RUN chmod +x /docker-entrypoint-initdb.d/10-pg-cron.sh

USER postgres
47 changes: 47 additions & 0 deletions docker/docker-compose.yml
@@ -0,0 +1,47 @@
# Run from repository root: ./docker/run-tests.sh
# Manual build against the mount is not supported with :ro; use run-tests.sh.
#
# Services: postgres-17, postgres-18 (see PG_VERSIONS in docker/run-tests.sh).
#
# Example: PG_VERSIONS="17 18" ./docker/run-tests.sh

x-postgres-common: &postgres-common
environment:
POSTGRES_HOST_AUTH_METHOD: trust
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 2s
timeout: 5s
retries: 30
start_period: 15s

services:
postgres-17:
<<: *postgres-common
build:
context: ..
dockerfile: docker/Dockerfile
args:
PG_MAJOR: "17"
PG_IMAGE_SUFFIX: bookworm
volumes:
# PG ≤17 images use PGDATA under this path.
- pg_incremental_test_data_17:/var/lib/postgresql/data
- ..:/work:ro

postgres-18:
<<: *postgres-common
build:
context: ..
dockerfile: docker/Dockerfile
args:
PG_MAJOR: "18"
PG_IMAGE_SUFFIX: bookworm
volumes:
# PG 18 images expect a single mount at /var/lib/postgresql (versioned subdir inside).
- pg_incremental_test_data_18:/var/lib/postgresql
- ..:/work:ro

volumes:
pg_incremental_test_data_17:
pg_incremental_test_data_18:
11 changes: 11 additions & 0 deletions docker/init-pg-cron.sh
@@ -0,0 +1,11 @@
#!/bin/bash
set -euo pipefail
# Runs during init after initdb; the long-running server reads this before first start.
# contrib_regression is created later by pg_regress; cron.database_name must match it
# so CREATE EXTENSION pg_cron can run there (--load-extension in Makefile).
cat >>"${PGDATA}/postgresql.conf" <<'EOF'

# pg_cron (docker / regression)
shared_preload_libraries = 'pg_cron'
cron.database_name = 'contrib_regression'
EOF
64 changes: 64 additions & 0 deletions docker/run-tests.sh
@@ -0,0 +1,64 @@
#!/usr/bin/env bash
# Build the test image(s), start PostgreSQL with pg_cron, then build and installcheck from a copy of
# the repo under /tmp so the bind-mounted working tree is never modified (compose mounts ..:/work:ro).
#
# Invoke from repo root: ./docker/run-tests.sh
#
# Environment:
# PG_VERSIONS — space-separated majors to test (default: "17 18").
#
# PostgreSQL in the container needs shared_preload_libraries and cron.database_name for
# contrib_regression (see docker/init-pg-cron.sh).
set -euo pipefail

ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT"

PG_VERSIONS="${PG_VERSIONS:-17 18}"
COMPOSE=(docker compose -f docker/docker-compose.yml)

run_one() {
local major="$1"
local svc="postgres-${major}"

"${COMPOSE[@]}" build "$svc"
"${COMPOSE[@]}" down -v --remove-orphans 2>/dev/null || true
"${COMPOSE[@]}" up -d "$svc"

if ! "${COMPOSE[@]}" exec -T "$svc" bash -lc \
'for i in $(seq 1 90); do pg_isready -U postgres && exit 0; sleep 1; done; exit 1'; then
echo "PostgreSQL $major did not become ready; logs:" >&2
"${COMPOSE[@]}" logs "$svc" >&2 || true
"${COMPOSE[@]}" down -v --remove-orphans 2>/dev/null || true
return 1
fi

"${COMPOSE[@]}" exec -u root -T "$svc" bash -lc '
set -euo pipefail
BUILD=/tmp/pg_incremental_build
rm -rf "$BUILD"
cp -a /work "$BUILD"
cd "$BUILD"
make clean && make && make install
chown -R postgres:postgres "$BUILD"
su postgres -s /bin/bash -c "cd $BUILD && make -o install installcheck"
rm -rf "$BUILD"
'

"${COMPOSE[@]}" down -v --remove-orphans
echo "installcheck: OK (PostgreSQL $major)"
}

failed=0
for major in $PG_VERSIONS; do
echo "========== PostgreSQL $major =========="
if ! run_one "$major"; then
failed=1
break
fi
done

if [[ "$failed" -ne 0 ]]; then
exit 1
fi
echo "All requested versions passed installcheck: $PG_VERSIONS"
118 changes: 118 additions & 0 deletions expected/file_list.out
@@ -129,6 +129,124 @@ select count(*) from incremental.processed_files where pipeline_name = 'skip-tes
3
(1 row)

-- max_batches_per_run: explicit -1 processes all pending files in one execute_pipeline
insert into file_registry values
('/unlim/x.csv'),
('/unlim/y.csv');
create table unlim_log (path text);
select incremental.create_file_list_pipeline(
'unlimited-per-run',
'/unlim/%.csv',
$$ insert into file_list.unlim_log values ($1) $$,
list_function := 'file_list.list_local_files',
batched := false,
schedule := NULL,
execute_immediately := false,
max_batches_per_run := -1);
create_file_list_pipeline
---------------------------

(1 row)

call incremental.execute_pipeline('unlimited-per-run');
select count(*) from unlim_log;
count
-------
2
(1 row)

-- max_batches_per_run: positive cap — one file per execute_pipeline when not batched
insert into file_registry values
('/cap/a.csv'),
('/cap/b.csv'),
('/cap/c.csv');
create table cap_log (path text);
select incremental.create_file_list_pipeline(
'cap-one-per-run',
'/cap/%.csv',
$$ insert into file_list.cap_log values ($1) $$,
list_function := 'file_list.list_local_files',
batched := false,
schedule := NULL,
execute_immediately := false,
max_batches_per_run := 1);
create_file_list_pipeline
---------------------------

(1 row)

call incremental.execute_pipeline('cap-one-per-run');
select count(*) from cap_log;
count
-------
1
(1 row)

call incremental.execute_pipeline('cap-one-per-run');
select count(*) from cap_log;
count
-------
2
(1 row)

call incremental.execute_pipeline('cap-one-per-run');
select count(*) from cap_log;
count
-------
3
(1 row)

call incremental.execute_pipeline('cap-one-per-run');
select count(*) from cap_log;
count
-------
3
(1 row)

-- max_batches_per_run: batched — one batch iteration per execute_pipeline (max_batch_size=2, five files)
insert into file_registry values
('/mcap/1.csv'),
('/mcap/2.csv'),
('/mcap/3.csv'),
('/mcap/4.csv'),
('/mcap/5.csv');
create table mcap_log (path text);
select incremental.create_file_list_pipeline(
'batched-cap-one-batch-per-run',
'/mcap/%.csv',
$$ insert into file_list.mcap_log select unnest($1) $$,
list_function := 'file_list.list_local_files',
batched := true,
max_batch_size := 2,
schedule := NULL,
execute_immediately := false,
max_batches_per_run := 1);
create_file_list_pipeline
---------------------------

(1 row)

call incremental.execute_pipeline('batched-cap-one-batch-per-run');
select count(*) from mcap_log;
count
-------
2
(1 row)

call incremental.execute_pipeline('batched-cap-one-batch-per-run');
select count(*) from mcap_log;
count
-------
4
(1 row)

call incremental.execute_pipeline('batched-cap-one-batch-per-run');
select count(*) from mcap_log;
count
-------
5
(1 row)

-- reset_pipeline: clears processed_files so all files are reprocessed
select incremental.reset_pipeline('ingest-files', execute_immediately := false);
reset_pipeline
3 changes: 2 additions & 1 deletion include/crunchy/incremental/file_list.h
@@ -5,7 +5,8 @@

extern char *DefaultFileListFunction;

void InitializeFileListPipelineState(char *pipelineName, char *prefix, bool batched, char *listFunction, int maxBatchSize);
void InitializeFileListPipelineState(char *pipelineName, char *prefix, bool batched, char *listFunction, int maxBatchSize,
int maxBatchesPerRun);
void RemoveProcessedFileList(char *pipelineName);
void ExecuteFileListPipeline(char *pipelineName, char *command);
bool ListFunctionExists(char *listFunction);