diff --git a/.gitignore b/.gitignore index 259148f..65286e9 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,10 @@ *.exe *.out *.app + +# Build directories +build/ +CMakeFiles/ +CMakeCache.txt +cmake_install.cmake +Makefile diff --git a/README.md b/README.md index 42f897e..dd2fdcb 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,26 @@ To enable the checksum (only on XRootD 5.2+): ofs.ckslib * libXrdMultiuser.so ``` +### Write Buffering + +To reduce IOPS from small sequential writes, you can enable write buffering: + +``` +multiuser.writebuffersize +``` + +Where `` is the buffer size in bytes. Default is 0 (disabled). When enabled: +- Sequential writes smaller than the buffer size are accumulated in memory +- The buffer is flushed when full, when a non-sequential write occurs, or when the file is closed +- Buffering is automatically disabled for a file if non-sequential writes are detected + +Example: Buffer up to 1MB of writes: +``` +multiuser.writebuffersize 1048576 +``` + +**Note:** Buffering is only suitable for sequential write workloads. Non-sequential writes will cause the buffer to be flushed and buffering disabled for that file. + Startup ------- diff --git a/configs/60-osg-multiuser.cfg b/configs/60-osg-multiuser.cfg index 050c646..98aa0f8 100644 --- a/configs/60-osg-multiuser.cfg +++ b/configs/60-osg-multiuser.cfg @@ -17,4 +17,9 @@ if defined ?~XC_ENABLE_MULTIUSER # checksum while it is writing a file. To turn this on, uncomment the # following line: # multiuser.checksumonwrite on + + # Write buffering can reduce IOPS for workloads with many small sequential + # writes. Specify the buffer size in bytes (default: 0 = disabled). + # Example: Buffer up to 1MB of sequential writes + # multiuser.writebuffersize 1048576 fi diff --git a/src/MultiuserFile.hh b/src/MultiuserFile.hh index 74e1d81..d2f95a5 100644 --- a/src/MultiuserFile.hh +++ b/src/MultiuserFile.hh @@ -15,10 +15,12 @@ #include "XrdChecksum.hh" #include +#include +#include class MultiuserFile : public XrdOssDF { public: - MultiuserFile(const char *user, std::unique_ptr ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss); + MultiuserFile(const char *user, std::unique_ptr ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss, size_t write_buffer_size); virtual ~MultiuserFile() { if (m_state) {delete m_state;} @@ -127,6 +129,7 @@ public: int Close(long long *retsz=0); private: + int FlushWriteBuffer(); std::unique_ptr m_wrapped; XrdSysError &m_log; const XrdSecEntity* m_client; @@ -138,6 +141,13 @@ private: bool m_checksum_on_write; unsigned m_digests; + // Write buffering + size_t m_write_buffer_size; + std::vector m_write_buffer; + off_t m_buffer_offset; + bool m_buffering_enabled; + std::mutex m_buffer_mutex; + }; #endif \ No newline at end of file diff --git a/src/MultiuserFileSystem.cc b/src/MultiuserFileSystem.cc index 714a99f..c2521c5 100644 --- a/src/MultiuserFileSystem.cc +++ b/src/MultiuserFileSystem.cc @@ -41,7 +41,8 @@ MultiuserFileSystem::MultiuserFileSystem(XrdOss *oss, XrdSysLogger *lp, const ch m_env(envP), m_log(lp, "multiuser_"), m_checksum_on_write(false), - m_digests(0) + m_digests(0), + m_write_buffer_size(0) { if (!oss) { throw std::runtime_error("The multi-user plugin must be chained with another filesystem."); @@ -114,6 +115,28 @@ MultiuserFileSystem::Config(XrdSysLogger *lp, const char *configfn) return false; } } + + // Write buffer size + if (!strcmp("multiuser.writebuffersize", val)) { + val = Config.GetWord(); + if (!val || !val[0]) { + m_log.Emsg("Config", "multiuser.writebuffersize must specify a value"); + Config.Close(); + return false; + } + char *endptr = NULL; + errno = 0; + long int buffer_size = strtol(val, &endptr, 0); + if (errno == ERANGE || buffer_size < 0 || endptr == val || *endptr != '\0') { + m_log.Emsg("Config", "multiuser.writebuffersize must specify a valid non-negative integer"); + Config.Close(); + return false; + } + m_write_buffer_size = static_cast(buffer_size); + std::stringstream ss; + ss << "Setting write buffer size to " << m_write_buffer_size << " bytes"; + m_log.Emsg("Config", ss.str().c_str()); + } if (!strcmp("xrootd.chksum", val)) { m_digests = 0; val = Config.GetWord(); @@ -175,7 +198,7 @@ XrdOssDF *MultiuserFileSystem::newFile(const char *user) { // Call the underlying OSS newFile std::unique_ptr wrapped(m_oss->newFile(user)); - return (MultiuserFile *)new MultiuserFile(user, std::move(wrapped), m_log, m_umask_mode, m_checksum_on_write, m_digests, this); + return (MultiuserFile *)new MultiuserFile(user, std::move(wrapped), m_log, m_umask_mode, m_checksum_on_write, m_digests, this, m_write_buffer_size); } int MultiuserFileSystem::Chmod(const char * path, mode_t mode, XrdOucEnv *env) diff --git a/src/MultiuserFileSystem.hh b/src/MultiuserFileSystem.hh index a7289a8..e320d9d 100644 --- a/src/MultiuserFileSystem.hh +++ b/src/MultiuserFileSystem.hh @@ -24,6 +24,8 @@ public: bool Config(XrdSysLogger *lp, const char *configfn); + size_t GetWriteBufferSize() const { return m_write_buffer_size; } + XrdOssDF *newDir(const char *user=0); XrdOssDF *newFile(const char *user=0); int Chmod(const char * path, mode_t mode, XrdOucEnv *env=0); @@ -71,6 +73,7 @@ private: std::shared_ptr m_authz; bool m_checksum_on_write; unsigned m_digests; + size_t m_write_buffer_size; }; diff --git a/src/multiuser.cpp b/src/multiuser.cpp index 522cff3..ebb1d3e 100644 --- a/src/multiuser.cpp +++ b/src/multiuser.cpp @@ -133,7 +133,7 @@ class ErrorSentry }; -MultiuserFile::MultiuserFile(const char *user, std::unique_ptr ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss) : +MultiuserFile::MultiuserFile(const char *user, std::unique_ptr ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss, size_t write_buffer_size) : XrdOssDF(user), m_wrapped(std::move(ossDF)), m_log(log), @@ -142,7 +142,10 @@ MultiuserFile::MultiuserFile(const char *user, std::unique_ptr ossDF, m_nextoff(0), m_oss(oss), m_checksum_on_write(checksum_on_write), - m_digests(digests) + m_digests(digests), + m_write_buffer_size(write_buffer_size), + m_buffer_offset(-1), + m_buffering_enabled(write_buffer_size > 0) {} int MultiuserFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) @@ -173,28 +176,182 @@ int MultiuserFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv ssize_t MultiuserFile::Write(const void *buffer, off_t offset, size_t size) { + // Only take the lock if buffering is configured (m_write_buffer_size > 0) + std::unique_lock lock(m_buffer_mutex, std::defer_lock); + if (m_write_buffer_size > 0) { + lock.lock(); + } - if ((offset != m_nextoff) && m_state) + // Check for out-of-order writes if checksumming or buffering + if ((offset != m_nextoff) && (m_state || m_buffering_enabled)) { - std::stringstream ss; - ss << "Out-of-order writes not supported while running checksum. " << m_fname; - m_log.Emsg("Write", ss.str().c_str()); - return -ENOTSUP; + // Flush any buffered data first + if (m_buffering_enabled) { + int flush_result = FlushWriteBuffer(); + if (flush_result < 0) { + return flush_result; + } + // Disable buffering for the rest of this file + m_buffering_enabled = false; + } + + if (m_state) { + std::stringstream ss; + ss << "Non-sequential write detected; disabling checksum calculation for " << m_fname; + m_log.Emsg("Write", ss.str().c_str()); + delete m_state; + m_state = nullptr; + } + } + + // If buffering is enabled and configured + if (m_buffering_enabled) { + // If this is the first write or buffer is empty, initialize buffer offset + if (m_write_buffer.empty()) { + m_buffer_offset = offset; + } + + // Check if this write is sequential to the buffer + off_t expected_offset = m_buffer_offset + static_cast(m_write_buffer.size()); + if (offset != expected_offset) { + // Not sequential - flush buffer and disable buffering + int flush_result = FlushWriteBuffer(); + if (flush_result < 0) { + return flush_result; + } + m_buffering_enabled = false; + // Fall through to direct write + } else { + // Sequential write - check if we should buffer it + size_t total_size = m_write_buffer.size() + size; + + if (total_size <= m_write_buffer_size) { + // Buffer this write - reserve capacity to avoid reallocations + if (m_write_buffer.capacity() < total_size) { + m_write_buffer.reserve(total_size); + } + m_write_buffer.insert(m_write_buffer.end(), + static_cast(buffer), + static_cast(buffer) + size); + m_nextoff = offset + size; + return size; + } else { + // Buffer would exceed limit - fill buffer to maximum, flush, then continue + size_t space_in_buffer = m_write_buffer_size - m_write_buffer.size(); + if (space_in_buffer > 0) { + // Fill the buffer completely + if (m_write_buffer.capacity() < m_write_buffer_size) { + m_write_buffer.reserve(m_write_buffer_size); + } + m_write_buffer.insert(m_write_buffer.end(), + static_cast(buffer), + static_cast(buffer) + space_in_buffer); + } + + // Flush the full buffer + int flush_result = FlushWriteBuffer(); + if (flush_result < 0) { + return flush_result; + } + + // Now handle the remaining data + const unsigned char* remaining_data = static_cast(buffer) + space_in_buffer; + size_t remaining_size = size - space_in_buffer; + + // If remaining data fits in buffer, buffer it; otherwise write directly + if (remaining_size <= m_write_buffer_size) { + m_buffer_offset = offset + space_in_buffer; + if (m_write_buffer.capacity() < m_write_buffer_size) { + m_write_buffer.reserve(m_write_buffer_size); + } + m_write_buffer.insert(m_write_buffer.end(), + remaining_data, + remaining_data + remaining_size); + m_nextoff = offset + size; + return size; + } else { + // Remaining data is too large for buffer - fall through to direct write + buffer = remaining_data; + offset = offset + space_in_buffer; + size = remaining_size; + } + } + } } + // Direct write (no buffering or buffer disabled) auto result = m_wrapped->Write(buffer, offset, size); - if (result >= 0) {m_nextoff += result;} - if (m_state) - { - m_state->Update(static_cast(buffer), size); + if (result >= 0) { + m_nextoff = offset + result; + if (m_state && result > 0) { + // Only update checksum for the data that was actually written + m_state->Update(static_cast(buffer), result); + } } return result; } +// FlushWriteBuffer: Writes all buffered data to the underlying file system. +// Preconditions: Must be called with m_buffer_mutex held. +// Returns: 0 on success, negative error code on failure. +// Side effects: +// - Writes buffer contents via m_wrapped->Write() +// - Updates checksum state if enabled (m_state) +// - Clears m_write_buffer and resets m_buffer_offset on success +// - Handles partial writes with retry loop +// - On failure, buffer is NOT cleared to allow retry +int MultiuserFile::FlushWriteBuffer() +{ + if (m_write_buffer.empty()) { + return 0; + } + + size_t total_written = 0; + while (total_written < m_write_buffer.size()) { + auto result = m_wrapped->Write(m_write_buffer.data() + total_written, + m_buffer_offset + total_written, + m_write_buffer.size() - total_written); + if (result < 0) { + // Write failed - don't clear buffer, return error + return result; + } + if (result == 0) { + // No progress - treat as error + return -EIO; + } + total_written += result; + } + + if (m_state) { + m_state->Update(m_write_buffer.data(), m_write_buffer.size()); + } + + m_write_buffer.clear(); + m_buffer_offset = -1; + + return 0; +} + + int MultiuserFile::Close(long long *retsz) { + // Only take the lock if buffering is configured (m_write_buffer_size > 0) + std::unique_lock lock(m_buffer_mutex, std::defer_lock); + if (m_write_buffer_size > 0) { + lock.lock(); + } + + // Flush any remaining buffered data + if (!m_write_buffer.empty()) { + int flush_result = FlushWriteBuffer(); + if (flush_result < 0) { + m_log.Emsg("Close", "Failed to flush write buffer"); + // Continue with close anyway + } + } + auto close_result = m_wrapped->Close(retsz); if (m_state) { @@ -210,7 +367,7 @@ int MultiuserFile::Close(long long *retsz) } delete m_state; - m_state = NULL; + m_state = nullptr; } return close_result;