From 00dc7bbbe4b25a7cf14eb594ee12e3db9a66de82 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 16:09:05 +0100 Subject: [PATCH 01/32] add fread connection support --- NEWS.md | 5 +- R/fread.R | 68 ++++++++++++++++++++++++++- inst/tests/tests.Rraw | 17 +++++-- src/data.table.h | 3 +- src/fread.c | 3 ++ src/freadR.c | 104 +++++++++++++++++++++++++++++++++++++++++- src/freadR.h | 5 +- src/init.c | 1 + 8 files changed, 197 insertions(+), 9 deletions(-) diff --git a/NEWS.md b/NEWS.md index 4a821991b9..9d979884b2 100644 --- a/NEWS.md +++ b/NEWS.md @@ -296,7 +296,10 @@ See [#2611](https://github.com/Rdatatable/data.table/issues/2611) for details. T # user system elapsed # 0.028 0.000 0.005 ``` - 20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation. + +20. `fread()` now supports the `comment.char` argument to skip trailing comments or comment-only lines, consistent with `read.table()`, [#856](https://github.com/Rdatatable/data.table/issues/856). The default remains `comment.char = ""` (no comment parsing) for backward compatibility and performance, in contrast to `read.table(comment.char = "#")`. Thanks to @arunsrinivasan and many others for the suggestion and @ben-schwen for the implementation. + +21. `fread()` can now read from connections directly by spilling to a temporary file first, [#561](https://github.com/Rdatatable/data.table/issues/561). For the best throughput, point `tmpdir=` (or the global temp directory) to fast storage like an SSD or RAM. Thanks to Chris Neff for the report and @ben-schwen for the implementation. 
### BUG FIXES diff --git a/R/fread.R b/R/fread.R index 16a72ed24d..56cf5c0801 100644 --- a/R/fread.R +++ b/R/fread.R @@ -55,7 +55,16 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") input = text } } - else if (is.null(cmd)) { + # Check if input is a connection and read it into memory + input_is_con = FALSE + if (!missing(input) && inherits(input, "connection")) { + input_is_con = TRUE + } else if (!is.null(file) && inherits(file, "connection")) { + input = file + input_is_con = TRUE + file = NULL + } + if (!input_is_con && is.null(cmd)) { if (!is.character(input) || length(input)!=1L) { stopf("input= must be a single character string containing a file name, a system command containing at least one space, a URL starting 'http[s]://', 'ftp[s]://' or 'file://', or, the input data itself containing at least one \\n or \\r") } @@ -81,6 +90,61 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") } file = tmpFile } + connection_spill_info = NULL + if (input_is_con) { + if (verbose) { + catf("[00] Spill connection to tempfile\n") + catf(" Connection class: %s\n", paste(class(input), collapse=", ")) + catf(" Reading connection into RAM buffer... 
") + flush.console() + } + spill_started.at = proc.time() + con_summary = summary(input) + con_desc = con_summary$description + con_class = class(input)[1L] + con_open = isOpen(input) + + needs_reopen = FALSE + if (con_open) { + binary_modes = c("rb", "r+b", "wb", "w+b", "ab", "a+b") + if (!con_summary$mode %chin% binary_modes) needs_reopen = TRUE + } + + close_con = NULL + + if (needs_reopen) { + close(input) + input = switch(con_class, + "file" = file(con_desc, "rb"), + "gzfile" = gzfile(con_desc, "rb"), + "bzfile" = bzfile(con_desc, "rb"), + "url" = url(con_desc, "rb"), + "pipe" = pipe(con_desc, "rb"), + stopf("Unsupported connection type: %s", con_class)) + close_con = input + } else if (!con_open) { + open(input, "rb") + close_con = input + } + tmpFile = tempfile(tmpdir=tmpdir) + on.exit(unlink(tmpFile), add=TRUE) + bytes_copied = .Call(CspillConnectionToFile, input, tmpFile, as.numeric(nrows)) + spill_elapsed = (proc.time() - spill_started.at)[["elapsed"]] + + if (bytes_copied == 0) { + warningf("Connection has size 0. 
Returning a NULL %s.", if (data.table) 'data.table' else 'data.frame') + return(if (data.table) data.table(NULL) else data.frame(NULL)) + } + + if (verbose) { + catf("done in %s (read %d bytes)\n", timetaken(spill_started.at), bytes_copied) + flush.console() + } + connection_spill_info = c(spill_elapsed, bytes_copied) + input = tmpFile + file = tmpFile + if (!is.null(close_con)) close(close_con) + } if (!is.null(file)) { if (!is.character(file) || length(file)!=1L) stopf("file= must be a single character string containing a filename, or URL starting 'http[s]://', 'ftp[s]://' or 'file://'") @@ -293,7 +357,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") tz="UTC" } ans = .Call(CfreadR,input,identical(input,file),sep,dec,quote,header,nrows,skip,na.strings,strip.white,blank.lines.skip,comment.char, - fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC") + fill,showProgress,nThread,verbose,warnings2errors,logical01,logicalYN,select,drop,colClasses,integer64,encoding,keepLeadingZeros,tz=="UTC",connection_spill_info) if (!length(ans)) return(null.data.table()) # test 1743.308 drops all columns nr = length(ans[[1L]]) require_bit64_if_needed(ans) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index e6ef9be1dd..3182fd77c1 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -2737,11 +2737,13 @@ if (test_bit64) { # getwd() has been set by test.data.table() to the location of this tests.Rraw file. Test files should be in the same directory. 
if (test_R.utils) { f = testDir("ch11b.dat.bz2") # http://www.stats.ox.ac.uk/pub/datasets/csb/ch11b.dat - test(900.1, fread(f, logical01=FALSE), as.data.table(read.table(f))) + test(900.1, DT<-fread(f, logical01=FALSE), as.data.table(read.table(f))) + test(900.15, fread(file(f), logical01=FALSE), DT) test(900.2, fread(f, logical01=TRUE), as.data.table(read.table(f))[,V5:=as.logical(V5)]) f = testDir("1206FUT.txt.bz2") # a CRLF line ending file (DOS) test(901.1, DT<-fread(f,strip.white=FALSE), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class))))) + test(901.15, fread(file(f), strip.white=FALSE), DT) test(901.2, DT<-fread(f), setDT(read.table(f,sep="\t",header=TRUE,colClasses=as.vector(sapply(DT,class)),strip.white=TRUE))) } @@ -6654,8 +6656,10 @@ if (test_bit64 && test_R.utils) { ZBJBLOAJAQI = c("LHCYS AYE ZLEMYA IFU HEI JG FEYE", "", ""), JKCRUUBAVQ = c("", ".\\YAPCNXJ\\004570_850034_757\\VWBZSS_848482_600874_487_PEKT-6-KQTVIL-7_30\\IRVQT\\HUZWLBSJYHZ\\XFWPXQ-WSPJHC-00-0770000855383.KKZ", "") ) - test(1449.1, fread(testDir("quoted_multiline.csv.bz2"))[c(1L, 43:44), c(1L, 22:24)], DT) - test(1449.2, fread(testDir("quoted_multiline.csv.bz2"), integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)]) + f = testDir("quoted_multiline.csv.bz2") + test(1449.1, fread(f)[c(1L, 43:44), c(1L, 22:24)], DT) + test(1449.15, fread(file(f))[c(1L, 43:44), c(1L, 22:24)], DT) + test(1449.2, fread(f, integer64='character', select = 'GPMLHTLN')[c(1L, 43:44)][[1L]], DT[ , as.character(GPMLHTLN)]) } # Fix for #927 @@ -21858,3 +21862,10 @@ test(2344.04, key(DT[, .(V4 = c("b", "a"), V2, V5 = c("y", "x"), V1)]), c("V1", # fread with quotes and single column #7366 test(2345, fread('"this_that"\n"2025-01-01 00:00:01"'), data.table(this_that = as.POSIXct("2025-01-01 00:00:01", tz="UTC"))) + +# fread supports connections #561 +f = testDir("russellCRLF.csv") +test(2346.1, fread(file(f, "r")), fread(f)) +test(2346.2, fread(file(f, 
"r"), nrows=0L), fread(f, nrows=0L)) +test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5)) +test(2346.3, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) diff --git a/src/data.table.h b/src/data.table.h index 663f0adb46..5d4773b77a 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -361,7 +361,8 @@ SEXP setcharvec(SEXP, SEXP, SEXP); SEXP chmatch_R(SEXP, SEXP, SEXP); SEXP chmatchdup_R(SEXP, SEXP, SEXP); SEXP chin_R(SEXP, SEXP); -SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP freadR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); +SEXP spillConnectionToFile(SEXP, SEXP, SEXP); SEXP fwriteR(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP); SEXP rbindlist(SEXP, SEXP, SEXP, SEXP, SEXP); SEXP setlistelt(SEXP, SEXP, SEXP); diff --git a/src/fread.c b/src/fread.c index 51b08af3fc..4158b2be5e 100644 --- a/src/fread.c +++ b/src/fread.c @@ -2971,7 +2971,10 @@ int freadMain(freadMainArgs _args) if (verbose) { DTPRINT("=============================\n"); // # notranslate + tTot = tTot + (args.connectionSpillActive ? 
args.connectionSpillSeconds : 0.0); if (tTot < 0.000001) tTot = 0.000001; // to avoid nan% output in some trivially small tests where tot==0.000s + if (args.connectionSpillActive) + DTPRINT(_("%8.3fs (%3.0f%%) Spill connection to tempfile (%.3fGiB)\n"), args.connectionSpillSeconds, 100.0 * args.connectionSpillSeconds / tTot, args.connectionSpillBytes / (1024.0 * 1024.0 * 1024.0)); DTPRINT(_("%8.3fs (%3.0f%%) Memory map %.3fGiB file\n"), tMap - t0, 100.0 * (tMap - t0) / tTot, 1.0 * fileSize / (1024 * 1024 * 1024)); DTPRINT(_("%8.3fs (%3.0f%%) sep="), tLayout - tMap, 100.0 * (tLayout - tMap) / tTot); DTPRINT(sep == '\t' ? "'\\t'" : (sep == '\n' ? "'\\n'" : "'%c'"), sep); // # notranslate diff --git a/src/freadR.c b/src/freadR.c index d586aaed0a..654b7b7888 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -1,6 +1,7 @@ #include "fread.h" #include "freadR.h" #include "data.table.h" +#include <R_ext/Connections.h> /***** TO DO ***** Restore test 1339 (balanced embedded quotes, see ?fread already updated). @@ -28,6 +29,7 @@ Secondary separator for list() columns, such as columns 11 and 12 in BED (no nee static int typeSxp[NUT] = { NILSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, LGLSXP, INTSXP, REALSXP, REALSXP, REALSXP, REALSXP, INTSXP, REALSXP, STRSXP, REALSXP, STRSXP }; static char typeRName[NUT][10] = { "NULL", "logical", "logical", "logical", "logical", "logical", "logical", "integer", "integer64", "double", "double", "double", "IDate", "POSIXct", "character", "numeric", "CLASS" }; static int typeEnum[NUT] = { CT_DROP, CT_EMPTY, CT_BOOL8_N, CT_BOOL8_U, CT_BOOL8_T, CT_BOOL8_L, CT_BOOL8_Y, CT_INT32, CT_INT64, CT_FLOAT64, CT_FLOAT64_HEX, CT_FLOAT64_EXT, CT_ISO8601_DATE, CT_ISO8601_TIME, CT_STRING, CT_FLOAT64, CT_STRING }; + static colType readInt64As = CT_INT64; static SEXP selectSxp; static SEXP dropSxp; @@ -77,7 +79,8 @@ SEXP freadR( SEXP integer64Arg, SEXP encodingArg, SEXP keepLeadingZerosArgs, - SEXP noTZasUTC + SEXP noTZasUTC, + SEXP connectionSpillArg ) { verbose = 
LOGICAL(verboseArg)[0]; @@ -170,6 +173,19 @@ SEXP freadR( args.warningsAreErrors = warningsAreErrors; args.keepLeadingZeros = LOGICAL(keepLeadingZerosArgs)[0]; args.noTZasUTC = LOGICAL(noTZasUTC)[0]; + args.connectionSpillActive = false; + args.connectionSpillSeconds = 0.0; + args.connectionSpillBytes = 0.0; + if (!isNull(connectionSpillArg)) { + if (!isReal(connectionSpillArg) || LENGTH(connectionSpillArg) != 2) + internal_error(__func__, "connectionSpillArg must be length-2 real vector"); // # nocov + const double *spill = REAL(connectionSpillArg); + args.connectionSpillSeconds = spill[0]; + args.connectionSpillBytes = spill[1]; + if (!R_FINITE(args.connectionSpillSeconds) || args.connectionSpillSeconds < 0) args.connectionSpillSeconds = 0.0; + if (!R_FINITE(args.connectionSpillBytes) || args.connectionSpillBytes < 0) args.connectionSpillBytes = 0.0; + args.connectionSpillActive = true; + } // === extras used for callbacks === if (!isString(integer64Arg) || LENGTH(integer64Arg) != 1) error(_("'integer64' must be a single character string")); @@ -724,6 +740,92 @@ void progress(int p, int eta) } // # nocov end +// Spill connection contents to a tempfile so R-level fread can treat it like a filename +SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { + if (!inherits(connection, "connection")) { + INTERNAL_STOP(_("spillConnectionToFile: argument must be a connection")); + } + + if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { + INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); + } + + if (!isReal(nrows_limit) || LENGTH(nrows_limit) != 1) { + INTERNAL_STOP(_("spillConnectionToFile: nrows_limit must be a single numeric value")); + } + + Rconnection con = R_GetConnection(connection); + if (con == NULL) { + INTERNAL_STOP(_("spillConnectionToFile: invalid connection")); + } + + if (!con->isopen) { + INTERNAL_STOP(_("spillConnectionToFile: connection is not open")); + } + + const char *filepath = 
CHAR(STRING_ELT(tempfile_path, 0)); + const double nrows_max = REAL_RO(nrows_limit)[0]; + const bool limit_rows = R_FINITE(nrows_max) && nrows_max >= 0.0; + size_t row_limit = 0; + if (limit_rows) { + row_limit = (size_t)nrows_max; + if (row_limit == 0) row_limit = 100; // read at least 100 rows if nrows==0 + row_limit++; // cater for potential header row + } + + FILE *outfile = fopen(filepath, "wb"); + if (outfile == NULL) { + STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing"), filepath); + } + + // Read and write in chunks // TODO tune chunk size + size_t chunk_size = 256 * 1024; + char *buffer = (char *)malloc(chunk_size); + if (buffer == NULL) { + fclose(outfile); + STOP(_("spillConnectionToFile: failed to allocate buffer")); + } + + size_t total_read = 0; + size_t nrows_seen = 0; + + while (true) { + size_t nread = con->read(buffer, 1, chunk_size, con); + if (nread == 0) { + break; // EOF + } + + size_t bytes_to_write = nread; + if (limit_rows && nrows_seen < row_limit) { + for (size_t i = 0; i < nread; i++) { + if (buffer[i] == '\n') { + nrows_seen++; + if (nrows_seen >= row_limit) { + bytes_to_write = i + 1; + break; + } + } + } + } + + size_t nwritten = fwrite(buffer, 1, bytes_to_write, outfile); + if (nwritten != bytes_to_write) { + free(buffer); + fclose(outfile); + STOP(_("spillConnectionToFile: write error (wrote %zu of %zu bytes)"), nwritten, bytes_to_write); + } + total_read += bytes_to_write; + + if (limit_rows && nrows_seen >= row_limit) { + break; + } + } + + free(buffer); + fclose(outfile); + return ScalarReal((double)total_read); +} + void halt__(bool warn, const char *format, ...) 
{ // Solves: http://stackoverflow.com/questions/18597123/fread-data-table-locks-files diff --git a/src/freadR.h b/src/freadR.h index a13d2a1df8..7e48cfe54f 100644 --- a/src/freadR.h +++ b/src/freadR.h @@ -8,7 +8,10 @@ #include "po.h" #define FREAD_MAIN_ARGS_EXTRA_FIELDS \ - bool oldNoDateTime; + bool oldNoDateTime; \ + bool connectionSpillActive; \ + double connectionSpillSeconds; \ + double connectionSpillBytes; #define FREAD_PUSH_BUFFERS_EXTRA_FIELDS \ int nStringCols; \ diff --git a/src/init.c b/src/init.c index ef81a7a0e0..a6befca527 100644 --- a/src/init.c +++ b/src/init.c @@ -67,6 +67,7 @@ R_CallMethodDef callMethods[] = { {"Cchmatchdup", (DL_FUNC) &chmatchdup_R, -1}, {"Cchin", (DL_FUNC) &chin_R, -1}, {"CfreadR", (DL_FUNC) &freadR, -1}, +{"CspillConnectionToFile", (DL_FUNC) &spillConnectionToFile, -1}, {"CfwriteR", (DL_FUNC) &fwriteR, -1}, {"Creorder", (DL_FUNC) &reorder, -1}, {"Crbindlist", (DL_FUNC) &rbindlist, -1}, From 78bce0ea68723db2d070d5409274ec128319f6fc Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 16:19:52 +0100 Subject: [PATCH 02/32] fix testnum --- R/fread.R | 2 +- inst/tests/tests.Rraw | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/R/fread.R b/R/fread.R index 56cf5c0801..e36feafa02 100644 --- a/R/fread.R +++ b/R/fread.R @@ -64,7 +64,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") input_is_con = TRUE file = NULL } - if (!input_is_con && is.null(cmd)) { + if (!input_is_con && is.null(cmd) && is.null(text)) { if (!is.character(input) || length(input)!=1L) { stopf("input= must be a single character string containing a file name, a system command containing at least one space, a URL starting 'http[s]://', 'ftp[s]://' or 'file://', or, the input data itself containing at least one \\n or \\r") } diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 3182fd77c1..20ea88c26d 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -21868,4 +21868,4 @@ f = testDir("russellCRLF.csv") 
test(2346.1, fread(file(f, "r")), fread(f)) test(2346.2, fread(file(f, "r"), nrows=0L), fread(f, nrows=0L)) test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5)) -test(2346.3, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) +test(2346.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) From 0afd46845012cdb7aa7afc9e9b5a0d8c32a5e59d Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 16:33:56 +0100 Subject: [PATCH 03/32] make linterse happy --- R/fread.R | 6 ++---- src/freadR.c | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/R/fread.R b/R/fread.R index e36feafa02..1d6f0b5c11 100644 --- a/R/fread.R +++ b/R/fread.R @@ -93,15 +93,13 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") connection_spill_info = NULL if (input_is_con) { if (verbose) { - catf("[00] Spill connection to tempfile\n") - catf(" Connection class: %s\n", paste(class(input), collapse=", ")) - catf(" Reading connection into RAM buffer... ") + catf("[00] Spill connection to tempfile\n Connection class: %s\n Reading connection into RAM buffer... 
", toString(class(input))) flush.console() } spill_started.at = proc.time() con_summary = summary(input) con_desc = con_summary$description - con_class = class(input)[1L] + con_class = class1(input) con_open = isOpen(input) needs_reopen = FALSE diff --git a/src/freadR.c b/src/freadR.c index 654b7b7888..938fa23fb8 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -780,7 +780,7 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit // Read and write in chunks // TODO tune chunk size size_t chunk_size = 256 * 1024; - char *buffer = (char *)malloc(chunk_size); + char *buffer = malloc(chunk_size); if (buffer == NULL) { fclose(outfile); STOP(_("spillConnectionToFile: failed to allocate buffer")); From fa79c8c6eb25c951284f9a32d76743816bf3b1f9 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 16:40:15 +0100 Subject: [PATCH 04/32] make linters even more happy --- src/freadR.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/freadR.c b/src/freadR.c index 938fa23fb8..11b817a4fd 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -781,7 +781,7 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit // Read and write in chunks // TODO tune chunk size size_t chunk_size = 256 * 1024; char *buffer = malloc(chunk_size); - if (buffer == NULL) { + if (!buffer) { fclose(outfile); STOP(_("spillConnectionToFile: failed to allocate buffer")); } From 9590e22be11c2bc124c02d1de48f51e0ff986af6 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 17:37:17 +0100 Subject: [PATCH 05/32] remove read bytes %d since this can overflow --- R/fread.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/fread.R b/R/fread.R index 1d6f0b5c11..98522e399b 100644 --- a/R/fread.R +++ b/R/fread.R @@ -135,7 +135,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") } if (verbose) { - catf("done in %s (read %d bytes)\n", timetaken(spill_started.at), bytes_copied) + catf("done in %s\n", 
timetaken(spill_started.at)) flush.console() } connection_spill_info = c(spill_elapsed, bytes_copied) From 58b3386381ec7797f71ce089f18294fcd3e7fd87 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 17:54:05 +0100 Subject: [PATCH 06/32] add coverage --- inst/tests/tests.Rraw | 5 ++++- src/freadR.c | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 20ea88c26d..c057c2d6a8 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -21865,7 +21865,10 @@ test(2345, fread('"this_that"\n"2025-01-01 00:00:01"'), data.table(this_that = a # fread supports connections #561 f = testDir("russellCRLF.csv") -test(2346.1, fread(file(f, "r")), fread(f)) +test(2346.1, fread(file(f, "r"), verbose=TRUE), fread(f), output="Spill connection to tempfile") test(2346.2, fread(file(f, "r"), nrows=0L), fread(f, nrows=0L)) test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5)) test(2346.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) +file.create(f <- tempfile()) +test(2346.5, fread(file(f)), data.table(), warning="Connection has size 0.") +unlink(f) diff --git a/src/freadR.c b/src/freadR.c index 11b817a4fd..bfe0a7d279 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -743,24 +743,24 @@ void progress(int p, int eta) // Spill connection contents to a tempfile so R-level fread can treat it like a filename SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { if (!inherits(connection, "connection")) { - INTERNAL_STOP(_("spillConnectionToFile: argument must be a connection")); + INTERNAL_STOP(_("spillConnectionToFile: argument must be a connection")); // # nocov } if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { - INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); + INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov } if 
(!isReal(nrows_limit) || LENGTH(nrows_limit) != 1) { - INTERNAL_STOP(_("spillConnectionToFile: nrows_limit must be a single numeric value")); + INTERNAL_STOP(_("spillConnectionToFile: nrows_limit must be a single numeric value")); // # nocov } Rconnection con = R_GetConnection(connection); if (con == NULL) { - INTERNAL_STOP(_("spillConnectionToFile: invalid connection")); + INTERNAL_STOP(_("spillConnectionToFile: invalid connection")); // # nocov } if (!con->isopen) { - INTERNAL_STOP(_("spillConnectionToFile: connection is not open")); + INTERNAL_STOP(_("spillConnectionToFile: connection is not open")); // # nocov } const char *filepath = CHAR(STRING_ELT(tempfile_path, 0)); @@ -775,15 +775,15 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit FILE *outfile = fopen(filepath, "wb"); if (outfile == NULL) { - STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing"), filepath); + STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing"), filepath); // # nocov } // Read and write in chunks // TODO tune chunk size size_t chunk_size = 256 * 1024; char *buffer = malloc(chunk_size); if (!buffer) { - fclose(outfile); - STOP(_("spillConnectionToFile: failed to allocate buffer")); + fclose(outfile); // # nocov + STOP(_("spillConnectionToFile: failed to allocate buffer")); // # nocov } size_t total_read = 0; @@ -810,9 +810,11 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit size_t nwritten = fwrite(buffer, 1, bytes_to_write, outfile); if (nwritten != bytes_to_write) { + // # nocov start free(buffer); fclose(outfile); STOP(_("spillConnectionToFile: write error (wrote %zu of %zu bytes)"), nwritten, bytes_to_write); + // # nocov end } total_read += bytes_to_write; From 995d2dc6fe1faf69aa049923e3ebba4cd8be6267 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 18:19:30 +0100 Subject: [PATCH 07/32] be fully experimental API compliant --- src/freadR.c | 14 
+------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/freadR.c b/src/freadR.c index bfe0a7d279..40059599d8 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -742,10 +742,6 @@ void progress(int p, int eta) // Spill connection contents to a tempfile so R-level fread can treat it like a filename SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { - if (!inherits(connection, "connection")) { - INTERNAL_STOP(_("spillConnectionToFile: argument must be a connection")); // # nocov - } - if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov } @@ -755,14 +751,6 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit } Rconnection con = R_GetConnection(connection); - if (con == NULL) { - INTERNAL_STOP(_("spillConnectionToFile: invalid connection")); // # nocov - } - - if (!con->isopen) { - INTERNAL_STOP(_("spillConnectionToFile: connection is not open")); // # nocov - } - const char *filepath = CHAR(STRING_ELT(tempfile_path, 0)); const double nrows_max = REAL_RO(nrows_limit)[0]; const bool limit_rows = R_FINITE(nrows_max) && nrows_max >= 0.0; @@ -790,7 +778,7 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit size_t nrows_seen = 0; while (true) { - size_t nread = con->read(buffer, 1, chunk_size, con); + size_t nread = R_ReadConnection(con, buffer, chunk_size); if (nread == 0) { break; // EOF } From 3866b6d3a9bbf50583e72765adb16bbf2d80fc05 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 18:21:06 +0100 Subject: [PATCH 08/32] more coverage --- inst/tests/tests.Rraw | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index c057c2d6a8..08f20017c4 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -21865,7 +21865,7 @@ test(2345, fread('"this_that"\n"2025-01-01 
00:00:01"'), data.table(this_that = a # fread supports connections #561 f = testDir("russellCRLF.csv") -test(2346.1, fread(file(f, "r"), verbose=TRUE), fread(f), output="Spill connection to tempfile") +test(2346.1, fread(file=file(f, "r"), verbose=TRUE), fread(f), output="Spill connection to tempfile") test(2346.2, fread(file(f, "r"), nrows=0L), fread(f, nrows=0L)) test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5)) test(2346.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) From 8294c6fac2cd35714e677527fbcd16042dabf4f9 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Sat, 8 Nov 2025 22:09:13 +0100 Subject: [PATCH 09/32] update error message for nrow and mmap --- src/fread.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/fread.c b/src/fread.c index 4158b2be5e..e4be0b6e54 100644 --- a/src/fread.c +++ b/src/fread.c @@ -1575,9 +1575,16 @@ int freadMain(freadMainArgs _args) CloseHandle(hFile); // see https://msdn.microsoft.com/en-us/library/windows/desktop/aa366537(v=vs.85).aspx if (mmp == NULL) { #endif - int nbit = 8 * sizeof(char*); // #nocov - STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit, // # nocov - nbit <= 32 ? _("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); // # nocov + // # nocov start + int nbit = 8 * sizeof(char*); + if (nrowLimit < INT64_MAX) { + STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. Since you specified nrows=%"PRId64", try wrapping the file in a connection: fread(file('filename'), nrows=%"PRId64")."), + filesize_to_str(fileSize), nbit, nrowLimit, nrowLimit); + } else { + STOP(_("Opened %s file ok but could not memory map it. This is a %dbit process. %s."), filesize_to_str(fileSize), nbit, + nbit <= 32 ? 
_("Please upgrade to 64bit") : _("There is probably not enough contiguous virtual memory available")); + } + // # nocov end } sof = (const char*) mmp; if (verbose) DTPRINT(_(" Memory mapped ok\n")); From 1b7cec7204b6f9f59b4177262e60d4fc84e244e2 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger <52290390+ben-schwen@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:13:25 +0100 Subject: [PATCH 10/32] add wording changes Co-authored-by: aitap --- R/fread.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/fread.R b/R/fread.R index 98522e399b..15d40ae57d 100644 --- a/R/fread.R +++ b/R/fread.R @@ -93,7 +93,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") connection_spill_info = NULL if (input_is_con) { if (verbose) { - catf("[00] Spill connection to tempfile\n Connection class: %s\n Reading connection into RAM buffer... ", toString(class(input))) + catf("[00] Spill connection to tempfile\n Connection class: %s\n Reading connection into a temporary file... ", toString(class(input))) flush.console() } spill_started.at = proc.time() @@ -118,7 +118,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") "bzfile" = bzfile(con_desc, "rb"), "url" = url(con_desc, "rb"), "pipe" = pipe(con_desc, "rb"), - stopf("Unsupported connection type: %s", con_class)) + stopf("Don't know how to reopen connection type '%s'. 
Need a connection opened in binary mode to continue.", con_class)) close_con = input } else if (!con_open) { open(input, "rb") From 9b3c3873bced02a433068bd8de55a9a0f615fae8 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger <52290390+ben-schwen@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:13:53 +0100 Subject: [PATCH 11/32] add connections guard Co-authored-by: aitap --- src/freadR.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/freadR.c b/src/freadR.c index 40059599d8..adaa68f480 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -742,6 +742,9 @@ void progress(int p, int eta) // Spill connection contents to a tempfile so R-level fread can treat it like a filename SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { +#if R_CONNECTIONS_VERSION != 1 +INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", R_CONNECTIONS_VERSION)); // # nocov +#else if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov } @@ -814,6 +817,7 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit free(buffer); fclose(outfile); return ScalarReal((double)total_read); +#endif // was R_CONNECTIONS_VERSION not != 1? } void halt__(bool warn, const char *format, ...) 
From 3da8943aa2ef32b883bd0acb29e6f7407e915e77 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger <52290390+ben-schwen@users.noreply.github.com> Date: Mon, 10 Nov 2025 13:14:23 +0100 Subject: [PATCH 12/32] add strerrors Co-authored-by: aitap --- src/freadR.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/freadR.c b/src/freadR.c index adaa68f480..77efc1ec78 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -766,7 +766,7 @@ INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", FILE *outfile = fopen(filepath, "wb"); if (outfile == NULL) { - STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing"), filepath); // # nocov + STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing: %s"), filepath, strerror(errno)); // # nocov } // Read and write in chunks // TODO tune chunk size @@ -804,7 +804,7 @@ INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", // # nocov start free(buffer); fclose(outfile); - STOP(_("spillConnectionToFile: write error (wrote %zu of %zu bytes)"), nwritten, bytes_to_write); + STOP(_("spillConnectionToFile: write error %s (wrote %zu of %zu bytes)"), strerror(errno), nwritten, bytes_to_write); // # nocov end } total_read += bytes_to_write; From f6f9ed3c05cc95efb8bba3ebe809ec54585a2960 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 13:21:48 +0100 Subject: [PATCH 13/32] add errno lib --- src/freadR.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/freadR.c b/src/freadR.c index 77efc1ec78..59b8e6bd65 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -2,6 +2,7 @@ #include "freadR.h" #include "data.table.h" #include <R_ext/Connections.h> +#include <errno.h> /***** TO DO ***** Restore test 1339 (balanced embedded quotes, see ?fread already updated). 
From 5a98e6261584e7256116992116a5cb753082e3b0 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 13:51:26 +0100 Subject: [PATCH 14/32] add reopen_connection generic --- NAMESPACE | 1 + R/fread.R | 38 +++++++++++++++++++++++++++++++------- man/reopen_connection.Rd | 25 +++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) create mode 100644 man/reopen_connection.Rd diff --git a/NAMESPACE b/NAMESPACE index 361b706d38..aec274be7a 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -20,6 +20,7 @@ export(rbindlist) export(fifelse) export(fcase) export(fread) +export(reopen_connection) export(fwrite) export(foverlaps) export(shift) diff --git a/R/fread.R b/R/fread.R index 15d40ae57d..c74adf3e52 100644 --- a/R/fread.R +++ b/R/fread.R @@ -1,3 +1,33 @@ +# S3 generic for reopening connections in binary mode +reopen_connection = function(con, ...) { + UseMethod("reopen_connection") +} + +reopen_connection.default = function(con, ...) { + con_class = class(con)[1L] + stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class) +} + +reopen_connection.file = function(con, ...) { + file(summary(con)$description, "rb") +} + +reopen_connection.gzfile = function(con, ...) { + gzfile(summary(con)$description, "rb") +} + +reopen_connection.bzfile = function(con, ...) { + bzfile(summary(con)$description, "rb") +} + +reopen_connection.url = function(con, ...) { + url(summary(con)$description, "rb") +} + +reopen_connection.pipe = function(con, ...) 
{ + pipe(summary(con)$description, "rb") +} + fread = function( input="", file=NULL, text=NULL, cmd=NULL, sep="auto", sep2="auto", dec="auto", quote="\"", nrows=Inf, header="auto", na.strings=getOption("datatable.na.strings","NA"), stringsAsFactors=FALSE, verbose=getOption("datatable.verbose",FALSE), @@ -112,13 +142,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") if (needs_reopen) { close(input) - input = switch(con_class, - "file" = file(con_desc, "rb"), - "gzfile" = gzfile(con_desc, "rb"), - "bzfile" = bzfile(con_desc, "rb"), - "url" = url(con_desc, "rb"), - "pipe" = pipe(con_desc, "rb"), - stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class)) + input = reopen_connection(input) close_con = input } else if (!con_open) { open(input, "rb") diff --git a/man/reopen_connection.Rd b/man/reopen_connection.Rd new file mode 100644 index 0000000000..68a0100bba --- /dev/null +++ b/man/reopen_connection.Rd @@ -0,0 +1,25 @@ +\name{reopen_connection} +\alias{reopen_connection} +\title{ Reopen a connection in binary mode } +\description{ + S3 generic to reopen a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. +} +\usage{ +reopen_connection(con, ...) +} +\arguments{ + \item{con}{ A connection object. } + \item{...}{ Additional arguments for methods. } +} +\details{ + Reopens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, and \code{pipe} connections. + + To support custom connection types with \code{fread}, define a method for your connection class that returns a new connection opened in binary mode. +} +\value{ + A connection object opened in binary read mode. 
+} +\seealso{ + \code{\link{fread}} +} +\keyword{ data } From d76c3a5480585fce0c1edaf8fd6f721cdf8469ba Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 14:01:30 +0100 Subject: [PATCH 15/32] close con on exit --- R/fread.R | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/R/fread.R b/R/fread.R index c74adf3e52..de73486964 100644 --- a/R/fread.R +++ b/R/fread.R @@ -24,6 +24,10 @@ reopen_connection.url = function(con, ...) { url(summary(con)$description, "rb") } +reopen_connection.unz = function(con, ...) { + unz(summary(con)$description, "rb") +} + reopen_connection.pipe = function(con, ...) { pipe(summary(con)$description, "rb") } @@ -148,6 +152,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") open(input, "rb") close_con = input } + if (!is.null(close_con)) on.exit(close(close_con), add=TRUE) tmpFile = tempfile(tmpdir=tmpdir) on.exit(unlink(tmpFile), add=TRUE) bytes_copied = .Call(CspillConnectionToFile, input, tmpFile, as.numeric(nrows)) @@ -165,7 +170,6 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") connection_spill_info = c(spill_elapsed, bytes_copied) input = tmpFile file = tmpFile - if (!is.null(close_con)) close(close_con) } if (!is.null(file)) { if (!is.character(file) || length(file)!=1L) From d520cd42db2d422d0b721232ca17fe05d31b88ab Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 14:02:27 +0100 Subject: [PATCH 16/32] adjust doc --- man/reopen_connection.Rd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/man/reopen_connection.Rd b/man/reopen_connection.Rd index 68a0100bba..aa9f88009c 100644 --- a/man/reopen_connection.Rd +++ b/man/reopen_connection.Rd @@ -12,7 +12,7 @@ reopen_connection(con, ...) \item{...}{ Additional arguments for methods. } } \details{ - Reopens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, and \code{pipe} connections. 
+ Reopens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, \code{unz} and \code{pipe} connections. To support custom connection types with \code{fread}, define a method for your connection class that returns a new connection opened in binary mode. } From c3f7cf6b23740282ec70d86c379b4cbc61dd85dd Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 16:01:08 +0100 Subject: [PATCH 17/32] update connection info --- R/fread.R | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/R/fread.R b/R/fread.R index de73486964..7c35597ada 100644 --- a/R/fread.R +++ b/R/fread.R @@ -4,7 +4,7 @@ reopen_connection = function(con, ...) { } reopen_connection.default = function(con, ...) { - con_class = class(con)[1L] + con_class = class1(con) stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class) } @@ -132,8 +132,6 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") } spill_started.at = proc.time() con_summary = summary(input) - con_desc = con_summary$description - con_class = class1(input) con_open = isOpen(input) needs_reopen = FALSE From 4235a5cd8458b46e00af962500f94c717f3e0b04 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 17:48:42 +0100 Subject: [PATCH 18/32] reopen connection --- R/fread.R | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/R/fread.R b/R/fread.R index 7c35597ada..52efc67e19 100644 --- a/R/fread.R +++ b/R/fread.R @@ -1,35 +1,35 @@ # S3 generic for reopening connections in binary mode -reopen_connection = function(con, ...) { +reopen_connection = function(con, description, ...) { UseMethod("reopen_connection") } -reopen_connection.default = function(con, ...) { +reopen_connection.default = function(con, description, ...) { con_class = class1(con) stopf("Don't know how to reopen connection type '%s'.
Need a connection opened in binary mode to continue.", con_class) } -reopen_connection.file = function(con, ...) { - file(summary(con)$description, "rb") +reopen_connection.file = function(con, description, ...) { + file(description, "rb") } -reopen_connection.gzfile = function(con, ...) { - gzfile(summary(con)$description, "rb") +reopen_connection.gzfile = function(con, description, ...) { + gzfile(description, "rb") } -reopen_connection.bzfile = function(con, ...) { - bzfile(summary(con)$description, "rb") +reopen_connection.bzfile = function(con, description, ...) { + bzfile(description, "rb") } -reopen_connection.url = function(con, ...) { - url(summary(con)$description, "rb") +reopen_connection.url = function(con, description, ...) { + url(description, "rb") } -reopen_connection.unz = function(con, ...) { - unz(summary(con)$description, "rb") +reopen_connection.unz = function(con, description, ...) { + unz(description, "rb") } -reopen_connection.pipe = function(con, ...) { - pipe(summary(con)$description, "rb") +reopen_connection.pipe = function(con, description, ...) 
{ + pipe(description, "rb") } fread = function( @@ -131,11 +131,11 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") flush.console() } spill_started.at = proc.time() - con_summary = summary(input) con_open = isOpen(input) needs_reopen = FALSE if (con_open) { + con_summary = summary(input) binary_modes = c("rb", "r+b", "wb", "w+b", "ab", "a+b") if (!con_summary$mode %chin% binary_modes) needs_reopen = TRUE } @@ -144,7 +144,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") if (needs_reopen) { close(input) - input = reopen_connection(input) + input = reopen_connection(input, con_summary$description) close_con = input } else if (!con_open) { open(input, "rb") From e37b0eeeacc98a1e17141ea3db13505af5173db0 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 18:06:27 +0100 Subject: [PATCH 19/32] change modes --- R/fread.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/fread.R b/R/fread.R index 52efc67e19..932849d14c 100644 --- a/R/fread.R +++ b/R/fread.R @@ -136,7 +136,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") needs_reopen = FALSE if (con_open) { con_summary = summary(input) - binary_modes = c("rb", "r+b", "wb", "w+b", "ab", "a+b") + binary_modes = c("rb", "r+b") if (!con_summary$mode %chin% binary_modes) needs_reopen = TRUE } From 2bcfc6c2364fbeaa96b209083b102b230e5ef4f7 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 18:10:36 +0100 Subject: [PATCH 20/32] update docs --- man/reopen_connection.Rd | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/man/reopen_connection.Rd b/man/reopen_connection.Rd index aa9f88009c..42da77c415 100644 --- a/man/reopen_connection.Rd +++ b/man/reopen_connection.Rd @@ -5,10 +5,11 @@ S3 generic to reopen a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. } \usage{ -reopen_connection(con, ...) +reopen_connection(con, description, ...) 
} \arguments{ \item{con}{ A connection object. } + \item{description}{ character string. A description of the connection. } \item{...}{ Additional arguments for methods. } } \details{ From 2e67cc2760e5a4f308233da456aac2f659890f07 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 18:30:25 +0100 Subject: [PATCH 21/32] add nocov --- R/fread.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/R/fread.R b/R/fread.R index 932849d14c..1ebb5f7c82 100644 --- a/R/fread.R +++ b/R/fread.R @@ -1,3 +1,4 @@ +# nocov start # S3 generic for reopening connections in binary mode reopen_connection = function(con, description, ...) { UseMethod("reopen_connection") @@ -31,6 +32,7 @@ reopen_connection.unz = function(con, description, ...) { reopen_connection.pipe = function(con, description, ...) { pipe(description, "rb") } +# nocov end fread = function( input="", file=NULL, text=NULL, cmd=NULL, sep="auto", sep2="auto", dec="auto", quote="\"", nrows=Inf, header="auto", From 4383ae24f066de3697de8e131da048e80f4f3ffa Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Mon, 10 Nov 2025 22:14:06 +0100 Subject: [PATCH 22/32] use R_ExecWithCleanup to clean up on errors --- src/freadR.c | 107 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 64 insertions(+), 43 deletions(-) diff --git a/src/freadR.c b/src/freadR.c index 59b8e6bd65..6ee7bd3d28 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -741,58 +741,54 @@ void progress(int p, int eta) } // # nocov end -// Spill connection contents to a tempfile so R-level fread can treat it like a filename -SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { -#if R_CONNECTIONS_VERSION != 1 -INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", R_CONNECTIONS_VERSION)); // # nocov -#else - if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { - INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov - } - - 
if (!isReal(nrows_limit) || LENGTH(nrows_limit) != 1) { - INTERNAL_STOP(_("spillConnectionToFile: nrows_limit must be a single numeric value")); // # nocov +typedef struct { + Rconnection con; + const char *filepath; + size_t row_limit; + FILE *outfile; + char *buffer; +} SpillState; + +static void spill_cleanup(void *data) +{ + SpillState *state = (SpillState *)data; + if (!state) return; + free(state->buffer); // free(NULL) is safe no-op + if (state->outfile) { + fclose(state->outfile); } +} - Rconnection con = R_GetConnection(connection); - const char *filepath = CHAR(STRING_ELT(tempfile_path, 0)); - const double nrows_max = REAL_RO(nrows_limit)[0]; - const bool limit_rows = R_FINITE(nrows_max) && nrows_max >= 0.0; - size_t row_limit = 0; - if (limit_rows) { - row_limit = (size_t)nrows_max; - if (row_limit == 0) row_limit = 100; // read at least 100 rows if nrows==0 - row_limit++; // cater for potential header row - } +static SEXP do_spill(void *data) +{ + SpillState *state = (SpillState *)data; + const size_t chunk_size = 256 * 1024; // TODO tune chunk size - FILE *outfile = fopen(filepath, "wb"); - if (outfile == NULL) { - STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing: %s"), filepath, strerror(errno)); // # nocov + state->outfile = fopen(state->filepath, "wb"); + if (state->outfile == NULL) { + STOP(_("spillConnectionToFile: failed to open temp file '%s' for writing: %s"), state->filepath, strerror(errno)); // # nocov } - // Read and write in chunks // TODO tune chunk size - size_t chunk_size = 256 * 1024; - char *buffer = malloc(chunk_size); - if (!buffer) { - fclose(outfile); // # nocov + state->buffer = malloc(chunk_size); + if (!state->buffer) { STOP(_("spillConnectionToFile: failed to allocate buffer")); // # nocov } - + const bool limit_rows = R_FINITE(state->row_limit) && (state->row_limit > 0); size_t total_read = 0; size_t nrows_seen = 0; while (true) { - size_t nread = R_ReadConnection(con, buffer, chunk_size); + size_t 
nread = R_ReadConnection(state->con, state->buffer, chunk_size); if (nread == 0) { break; // EOF } size_t bytes_to_write = nread; - if (limit_rows && nrows_seen < row_limit) { + if (limit_rows && nrows_seen < state->row_limit) { for (size_t i = 0; i < nread; i++) { - if (buffer[i] == '\n') { + if (state->buffer[i] == '\n') { nrows_seen++; - if (nrows_seen >= row_limit) { + if (nrows_seen >= state->row_limit) { bytes_to_write = i + 1; break; } @@ -800,24 +796,49 @@ INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", } } - size_t nwritten = fwrite(buffer, 1, bytes_to_write, outfile); + size_t nwritten = fwrite(state->buffer, 1, bytes_to_write, state->outfile); if (nwritten != bytes_to_write) { - // # nocov start - free(buffer); - fclose(outfile); - STOP(_("spillConnectionToFile: write error %s (wrote %zu of %zu bytes)"), strerror(errno), nwritten, bytes_to_write); - // # nocov end + STOP(_("spillConnectionToFile: write error %s (wrote %zu of %zu bytes)"), strerror(errno), nwritten, bytes_to_write); // # nocov } total_read += bytes_to_write; - if (limit_rows && nrows_seen >= row_limit) { + if (limit_rows && nrows_seen >= state->row_limit) { break; } } - free(buffer); - fclose(outfile); return ScalarReal((double)total_read); +} + +// Spill connection contents to a tempfile so R-level fread can treat it like a filename +SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { +#if R_CONNECTIONS_VERSION != 1 +INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", R_CONNECTIONS_VERSION)); // # nocov +#else + if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { + INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov + } + + if (!isReal(nrows_limit) || LENGTH(nrows_limit) != 1) { + INTERNAL_STOP(_("spillConnectionToFile: nrows_limit must be a single numeric value")); // # nocov + } + + SpillState state = { + .con = R_GetConnection(connection), + 
.filepath = CHAR(STRING_ELT(tempfile_path, 0)), + .row_limit = 0, + .outfile = NULL, + .buffer = NULL + }; + + const double nrows_max = REAL_RO(nrows_limit)[0]; + if (R_FINITE(nrows_max) && nrows_max >= 0.0) { + state.row_limit = (size_t)nrows_max; + if (state.row_limit == 0) state.row_limit = 100; // read at least 100 rows if nrows==0 + state.row_limit++; // cater for potential header row + } + + return R_ExecWithCleanup(do_spill, &state, spill_cleanup, &state); #endif // was R_CONNECTIONS_VERSION not != 1? } From 5e9178025964712c33e55c1354a0a5a03a4afb19 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 11 Nov 2025 22:22:48 +0100 Subject: [PATCH 23/32] add test for consuming before fread --- inst/tests/tests.Rraw | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 08f20017c4..85faeae721 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -21869,6 +21869,11 @@ test(2346.1, fread(file=file(f, "r"), verbose=TRUE), fread(f), output="Spill con test(2346.2, fread(file(f, "r"), nrows=0L), fread(f, nrows=0L)) test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5)) test(2346.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) +# test with open connection consuming part of the connection before fread +con = file(f, "rb") +readLines(con, n=3) +test(2346.5, fread(con), fread(f, skip=3L)) +close(con) file.create(f <- tempfile()) -test(2346.5, fread(file(f)), data.table(), warning="Connection has size 0.") +test(2346.6, fread(file(f)), data.table(), warning="Connection has size 0.") unlink(f) From 5182c0ceb12c7d29f3b454554556cc148fbb3a62 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 11 Nov 2025 22:53:24 +0100 Subject: [PATCH 24/32] use factory pattern --- R/fread.R | 34 +++++++++++++++++----------------- man/connection_opener.Rd | 33 +++++++++++++++++++++++++++++++++ man/reopen_connection.Rd | 26 -------------------------- 3 
files changed, 50 insertions(+), 43 deletions(-) create mode 100644 man/connection_opener.Rd delete mode 100644 man/reopen_connection.Rd diff --git a/R/fread.R b/R/fread.R index 1ebb5f7c82..4429d43d6c 100644 --- a/R/fread.R +++ b/R/fread.R @@ -1,36 +1,36 @@ # nocov start -# S3 generic for reopening connections in binary mode -reopen_connection = function(con, description, ...) { - UseMethod("reopen_connection") +# S3 generic that returns a function to open connections in binary mode +connection_opener = function(con, ...) { + UseMethod("connection_opener") } -reopen_connection.default = function(con, description, ...) { +connection_opener.default = function(con, ...) { con_class = class1(con) stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class) } -reopen_connection.file = function(con, description, ...) { - file(description, "rb") +connection_opener.file = function(con, ...) { + function(description) file(description, "rb", ...) } -reopen_connection.gzfile = function(con, description, ...) { - gzfile(description, "rb") +connection_opener.gzfile = function(con, ...) { + function(description) gzfile(description, "rb", ...) } -reopen_connection.bzfile = function(con, description, ...) { - bzfile(description, "rb") +connection_opener.bzfile = function(con, ...) { + function(description) bzfile(description, "rb", ...) } -reopen_connection.url = function(con, description, ...) { - url(description, "rb") +connection_opener.url = function(con, ...) { + function(description) url(description, "rb", ...) } -reopen_connection.unz = function(con, description, ...) { - unz(description, "rb") +connection_opener.unz = function(con, ...) { + function(description) unz(description, "rb", ...) } -reopen_connection.pipe = function(con, description, ...) { - pipe(description, "rb") +connection_opener.pipe = function(con, ...) { + function(description) pipe(description, "rb", ...) 
} # nocov end @@ -146,7 +146,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") if (needs_reopen) { close(input) - input = reopen_connection(input, con_summary$description) + input = connection_opener(input)(con_summary$description) close_con = input } else if (!con_open) { open(input, "rb") diff --git a/man/connection_opener.Rd b/man/connection_opener.Rd new file mode 100644 index 0000000000..622e347783 --- /dev/null +++ b/man/connection_opener.Rd @@ -0,0 +1,33 @@ +\name{connection_opener} +\alias{connection_opener} +\title{ Create a function to open connections in binary mode } +\description{ + S3 generic that returns a function to open a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. +} +\usage{ +connection_opener(con, ...) +} +\arguments{ + \item{con}{ A connection object. } + \item{...}{ Additional arguments passed to the connection constructor. } +} +\details{ + Returns a function that accepts a description argument and opens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, \code{unz} and \code{pipe} connections. + + To support custom connection types with \code{fread}, define a method for your connection class that returns an opener function. +} +\value{ + A function that accepts a description argument and returns a connection object opened in binary read mode. +} +\examples{ +\dontrun{ +# Define a method for a custom connection class +connection_opener.my_con = function(con, ...) { + function(description) my_con(description, mode = "rb", ...) 
+} +} +} +\seealso{ + \code{\link{fread}} +} +\keyword{ data } diff --git a/man/reopen_connection.Rd b/man/reopen_connection.Rd deleted file mode 100644 index 42da77c415..0000000000 --- a/man/reopen_connection.Rd +++ /dev/null @@ -1,26 +0,0 @@ -\name{reopen_connection} -\alias{reopen_connection} -\title{ Reopen a connection in binary mode } -\description{ - S3 generic to reopen a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. -} -\usage{ -reopen_connection(con, description, ...) -} -\arguments{ - \item{con}{ A connection object. } - \item{description}{ character string. A description of the connection. } - \item{...}{ Additional arguments for methods. } -} -\details{ - Reopens a connection in binary read mode (\code{"rb"}). Methods are provided for \code{file}, \code{gzfile}, \code{bzfile}, \code{url}, \code{unz} and \code{pipe} connections. - - To support custom connection types with \code{fread}, define a method for your connection class that returns a new connection opened in binary mode. -} -\value{ - A connection object opened in binary read mode. 
-} -\seealso{ - \code{\link{fread}} -} -\keyword{ data } From 441c557b988e8a06ff7f4155aeff3cb109c0d0dd Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 11 Nov 2025 22:56:49 +0100 Subject: [PATCH 25/32] add aliases for S3 methods --- man/connection_opener.Rd | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/man/connection_opener.Rd b/man/connection_opener.Rd index 622e347783..4f9f91f4d6 100644 --- a/man/connection_opener.Rd +++ b/man/connection_opener.Rd @@ -1,5 +1,12 @@ \name{connection_opener} \alias{connection_opener} +\alias{connection_opener.default} +\alias{connection_opener.file} +\alias{connection_opener.gzfile} +\alias{connection_opener.bzfile} +\alias{connection_opener.url} +\alias{connection_opener.unz} +\alias{connection_opener.pipe} \title{ Create a function to open connections in binary mode } \description{ S3 generic that returns a function to open a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. 
From a609fdab3ee97fe85042327658a967301a15259f Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 11 Nov 2025 23:40:58 +0100 Subject: [PATCH 26/32] capture print in test --- inst/tests/tests.Rraw | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 85faeae721..b5b7e5d618 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -21871,8 +21871,7 @@ test(2346.3, fread(file(f, "r"), nrows=5), fread(f, nrows=5)) test(2346.4, fread(file(f, "r"), nrows=5, header=FALSE), fread(f, nrows=5, header=FALSE)) # test with open connection consuming part of the connection before fread con = file(f, "rb") -readLines(con, n=3) -test(2346.5, fread(con), fread(f, skip=3L)) +test(2346.5, {readLines(con, n=3); fread(con)}, fread(f, skip=3L)) close(con) file.create(f <- tempfile()) test(2346.6, fread(file(f)), data.table(), warning="Connection has size 0.") From daabbb79a1a632f17be69ffcf12d7c55eb9bbf44 Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 11 Nov 2025 23:41:55 +0100 Subject: [PATCH 27/32] fix namespace --- NAMESPACE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NAMESPACE b/NAMESPACE index aec274be7a..8c960e65e6 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -20,7 +20,7 @@ export(rbindlist) export(fifelse) export(fcase) export(fread) -export(reopen_connection) +export(connection_opener) export(fwrite) export(foverlaps) export(shift) From 1a98f38f1b06264413fce6396cd652c671c077ec Mon Sep 17 00:00:00 2001 From: Benjamin Schwendinger Date: Tue, 18 Nov 2025 18:31:26 +0100 Subject: [PATCH 28/32] rename to binary_reopener --- NAMESPACE | 2 +- R/fread.R | 20 ++++++++++---------- man/connection_opener.Rd | 22 +++++++++++----------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index 8c960e65e6..c93a0a641c 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -20,7 +20,7 @@ export(rbindlist) export(fifelse) export(fcase) export(fread) 
-export(connection_opener) +export(binary_reopener) export(fwrite) export(foverlaps) export(shift) diff --git a/R/fread.R b/R/fread.R index 4429d43d6c..1724778941 100644 --- a/R/fread.R +++ b/R/fread.R @@ -1,35 +1,35 @@ # nocov start # S3 generic that returns a function to open connections in binary mode -connection_opener = function(con, ...) { - UseMethod("connection_opener") +binary_reopener = function(con, ...) { + UseMethod("binary_reopener") } -connection_opener.default = function(con, ...) { +binary_reopener.default = function(con, ...) { con_class = class1(con) stopf("Don't know how to reopen connection type '%s'. Need a connection opened in binary mode to continue.", con_class) } -connection_opener.file = function(con, ...) { +binary_reopener.file = function(con, ...) { function(description) file(description, "rb", ...) } -connection_opener.gzfile = function(con, ...) { +binary_reopener.gzfile = function(con, ...) { function(description) gzfile(description, "rb", ...) } -connection_opener.bzfile = function(con, ...) { +binary_reopener.bzfile = function(con, ...) { function(description) bzfile(description, "rb", ...) } -connection_opener.url = function(con, ...) { +binary_reopener.url = function(con, ...) { function(description) url(description, "rb", ...) } -connection_opener.unz = function(con, ...) { +binary_reopener.unz = function(con, ...) { function(description) unz(description, "rb", ...) } -connection_opener.pipe = function(con, ...) { +binary_reopener.pipe = function(con, ...) { function(description) pipe(description, "rb", ...) 
} # nocov end @@ -146,7 +146,7 @@ yaml=FALSE, tmpdir=tempdir(), tz="UTC") if (needs_reopen) { close(input) - input = connection_opener(input)(con_summary$description) + input = binary_reopener(input)(con_summary$description) close_con = input } else if (!con_open) { open(input, "rb") diff --git a/man/connection_opener.Rd b/man/connection_opener.Rd index 4f9f91f4d6..c22927c736 100644 --- a/man/connection_opener.Rd +++ b/man/connection_opener.Rd @@ -1,18 +1,18 @@ -\name{connection_opener} -\alias{connection_opener} -\alias{connection_opener.default} -\alias{connection_opener.file} -\alias{connection_opener.gzfile} -\alias{connection_opener.bzfile} -\alias{connection_opener.url} -\alias{connection_opener.unz} -\alias{connection_opener.pipe} +\name{binary_reopener} +\alias{binary_reopener} +\alias{binary_reopener.default} +\alias{binary_reopener.file} +\alias{binary_reopener.gzfile} +\alias{binary_reopener.bzfile} +\alias{binary_reopener.url} +\alias{binary_reopener.unz} +\alias{binary_reopener.pipe} \title{ Create a function to open connections in binary mode } \description{ S3 generic that returns a function to open a connection in binary read mode. Used internally by \code{fread}. Exported so packages with custom connection classes can define methods. } \usage{ -connection_opener(con, ...) +binary_reopener(con, ...) } \arguments{ \item{con}{ A connection object. } @@ -29,7 +29,7 @@ connection_opener(con, ...) \examples{ \dontrun{ # Define a method for a custom connection class -connection_opener.my_con = function(con, ...) { +binary_reopener.my_con = function(con, ...) { function(description) my_con(description, mode = "rb", ...) } } From 5eef830594c4ff48f10377fdd4db0193d9dbc4b0 Mon Sep 17 00:00:00 2001 From: Ivan K Date: Wed, 19 Nov 2025 17:47:46 +0300 Subject: [PATCH 29/32] More #ifdef wrapping for connections API Wrap the helper functions too. Avoid double negatives. 
--- src/freadR.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/freadR.c b/src/freadR.c index 6ee7bd3d28..ea15199e45 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -741,6 +741,7 @@ void progress(int p, int eta) } // # nocov end +#if R_CONNECTIONS_VERSION == 1 typedef struct { Rconnection con; const char *filepath; @@ -809,12 +810,11 @@ static SEXP do_spill(void *data) return ScalarReal((double)total_read); } +#endif // R_CONNECTIONS_VERSION == 1 // Spill connection contents to a tempfile so R-level fread can treat it like a filename SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit) { -#if R_CONNECTIONS_VERSION != 1 -INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", R_CONNECTIONS_VERSION)); // # nocov -#else +#if R_CONNECTIONS_VERSION == 1 if (!isString(tempfile_path) || LENGTH(tempfile_path) != 1) { INTERNAL_STOP(_("spillConnectionToFile: tempfile_path must be a single string")); // # nocov } @@ -839,7 +839,9 @@ INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", } return R_ExecWithCleanup(do_spill, &state, spill_cleanup, &state); -#endif // was R_CONNECTIONS_VERSION not != 1? +#else // R_CONNECTIONS_VERSION != 1 + INTERNAL_STOP(_("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d", R_CONNECTIONS_VERSION)); // # nocov +#endif } void halt__(bool warn, const char *format, ...) 
From 0c6eff568cd592130fe60b0a61f90d7d0ec62b41 Mon Sep 17 00:00:00 2001 From: Ivan K Date: Wed, 19 Nov 2025 23:48:26 +0300 Subject: [PATCH 30/32] R_FINITE will always be true for a size_t argument --- src/freadR.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/freadR.c b/src/freadR.c index ea15199e45..abee4b29bc 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -774,7 +774,7 @@ static SEXP do_spill(void *data) if (!state->buffer) { STOP(_("spillConnectionToFile: failed to allocate buffer")); // # nocov } - const bool limit_rows = R_FINITE(state->row_limit) && (state->row_limit > 0); + const bool limit_rows = state->row_limit > 0; size_t total_read = 0; size_t nrows_seen = 0; From 236bc5c3bb419ba45ce97e89768e4439e70e1fb5 Mon Sep 17 00:00:00 2001 From: Ivan K Date: Wed, 19 Nov 2025 23:56:14 +0300 Subject: [PATCH 31/32] Fail when nrow_limit exceeds SIZE_MAX Otherwise truncation occurs silently, possibly setting the limit to something like 100. --- src/freadR.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/freadR.c b/src/freadR.c index abee4b29bc..713d1a8b5c 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -833,6 +833,8 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit const double nrows_max = REAL_RO(nrows_limit)[0]; if (R_FINITE(nrows_max) && nrows_max >= 0.0) { + if (nrows_max > SIZE_MAX) + STOP(_("spillConnectionToFile: nrows_limit (%g) must fit into a native-size unsigned integer (<= %zu)"), nrows_max, (size_t)SIZE_MAX); // # nocov state.row_limit = (size_t)nrows_max; if (state.row_limit == 0) state.row_limit = 100; // read at least 100 rows if nrows==0 state.row_limit++; // cater for potential header row From 6f4d90f81ee2551268a2dddc7306cabdbb4f78db Mon Sep 17 00:00:00 2001 From: Ivan K Date: Thu, 20 Nov 2025 00:04:59 +0300 Subject: [PATCH 32/32] Use translateChar() for native encoding string CHAR() could in theory return Latin-1 or UTF-8 text. 
translateChar() checks the encoding bits and only converts if needed, releasing the memory upon return from the .Call(). --- src/freadR.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/freadR.c b/src/freadR.c index 713d1a8b5c..e7d7cb8b0e 100644 --- a/src/freadR.c +++ b/src/freadR.c @@ -825,7 +825,7 @@ SEXP spillConnectionToFile(SEXP connection, SEXP tempfile_path, SEXP nrows_limit SpillState state = { .con = R_GetConnection(connection), - .filepath = CHAR(STRING_ELT(tempfile_path, 0)), + .filepath = translateChar(STRING_ELT(tempfile_path, 0)), .row_limit = 0, .outfile = NULL, .buffer = NULL