Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@
*.exe
*.out
*.app

# Build directories
build/
CMakeFiles/
CMakeCache.txt
cmake_install.cmake
Makefile
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ To enable the checksum (only on XRootD 5.2+):
ofs.ckslib * libXrdMultiuser.so
```

### Write Buffering

To reduce IOPS from small sequential writes, you can enable write buffering:

```
multiuser.writebuffersize <bytes>
```

Where `<bytes>` is the buffer size in bytes. Default is 0 (disabled). When enabled:
- Sequential writes smaller than the buffer size are accumulated in memory
- The buffer is flushed when full, when a non-sequential write occurs, or when the file is closed
- Buffering is automatically disabled for a file if non-sequential writes are detected

Example: Buffer up to 1MB of writes:
```
multiuser.writebuffersize 1048576
```

**Note:** Buffering is only suitable for sequential write workloads. Non-sequential writes will cause the buffer to be flushed and buffering disabled for that file.

Startup
-------

Expand Down
5 changes: 5 additions & 0 deletions configs/60-osg-multiuser.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,9 @@ if defined ?~XC_ENABLE_MULTIUSER
# checksum while it is writing a file. To turn this on, uncomment the
# following line:
# multiuser.checksumonwrite on

# Write buffering can reduce IOPS for workloads with many small sequential
# writes. Specify the buffer size in bytes (default: 0 = disabled).
# Example: Buffer up to 1MB of sequential writes
# multiuser.writebuffersize 1048576
fi
12 changes: 11 additions & 1 deletion src/MultiuserFile.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
#include "XrdChecksum.hh"

#include <memory>
#include <mutex>
#include <vector>

class MultiuserFile : public XrdOssDF {
public:
MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss);
MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss, size_t write_buffer_size);

virtual ~MultiuserFile() {
if (m_state) {delete m_state;}
Expand Down Expand Up @@ -127,6 +129,7 @@ public:
int Close(long long *retsz=0);

private:
int FlushWriteBuffer();
std::unique_ptr<XrdOssDF> m_wrapped;
XrdSysError &m_log;
const XrdSecEntity* m_client;
Expand All @@ -138,6 +141,13 @@ private:
bool m_checksum_on_write;
unsigned m_digests;

// Write buffering
size_t m_write_buffer_size;
std::vector<unsigned char> m_write_buffer;
off_t m_buffer_offset;
bool m_buffering_enabled;
std::mutex m_buffer_mutex;

};

#endif
27 changes: 25 additions & 2 deletions src/MultiuserFileSystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ MultiuserFileSystem::MultiuserFileSystem(XrdOss *oss, XrdSysLogger *lp, const ch
m_env(envP),
m_log(lp, "multiuser_"),
m_checksum_on_write(false),
m_digests(0)
m_digests(0),
m_write_buffer_size(0)
{
if (!oss) {
throw std::runtime_error("The multi-user plugin must be chained with another filesystem.");
Expand Down Expand Up @@ -114,6 +115,28 @@ MultiuserFileSystem::Config(XrdSysLogger *lp, const char *configfn)
return false;
}
}

// Write buffer size
if (!strcmp("multiuser.writebuffersize", val)) {
val = Config.GetWord();
if (!val || !val[0]) {
m_log.Emsg("Config", "multiuser.writebuffersize must specify a value");
Config.Close();
return false;
}
char *endptr = NULL;
errno = 0;
long int buffer_size = strtol(val, &endptr, 0);
if (errno == ERANGE || buffer_size < 0 || endptr == val || *endptr != '\0') {
m_log.Emsg("Config", "multiuser.writebuffersize must specify a valid non-negative integer");
Config.Close();
return false;
}
m_write_buffer_size = static_cast<size_t>(buffer_size);
std::stringstream ss;
ss << "Setting write buffer size to " << m_write_buffer_size << " bytes";
m_log.Emsg("Config", ss.str().c_str());
}
if (!strcmp("xrootd.chksum", val)) {
m_digests = 0;
val = Config.GetWord();
Expand Down Expand Up @@ -175,7 +198,7 @@ XrdOssDF *MultiuserFileSystem::newFile(const char *user)
{
// Call the underlying OSS newFile
std::unique_ptr<XrdOssDF> wrapped(m_oss->newFile(user));
return (MultiuserFile *)new MultiuserFile(user, std::move(wrapped), m_log, m_umask_mode, m_checksum_on_write, m_digests, this);
return (MultiuserFile *)new MultiuserFile(user, std::move(wrapped), m_log, m_umask_mode, m_checksum_on_write, m_digests, this, m_write_buffer_size);
}

int MultiuserFileSystem::Chmod(const char * path, mode_t mode, XrdOucEnv *env)
Expand Down
3 changes: 3 additions & 0 deletions src/MultiuserFileSystem.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public:
bool
Config(XrdSysLogger *lp, const char *configfn);

size_t GetWriteBufferSize() const { return m_write_buffer_size; }

XrdOssDF *newDir(const char *user=0);
XrdOssDF *newFile(const char *user=0);
int Chmod(const char * path, mode_t mode, XrdOucEnv *env=0);
Expand Down Expand Up @@ -71,6 +73,7 @@ private:
std::shared_ptr<XrdAccAuthorize> m_authz;
bool m_checksum_on_write;
unsigned m_digests;
size_t m_write_buffer_size;

};

Expand Down
181 changes: 169 additions & 12 deletions src/multiuser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class ErrorSentry
};


MultiuserFile::MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss) :
MultiuserFile::MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF, XrdSysError &log, mode_t umask_mode, bool checksum_on_write, unsigned digests, MultiuserFileSystem *oss, size_t write_buffer_size) :
XrdOssDF(user),
m_wrapped(std::move(ossDF)),
m_log(log),
Expand All @@ -142,7 +142,10 @@ MultiuserFile::MultiuserFile(const char *user, std::unique_ptr<XrdOssDF> ossDF,
m_nextoff(0),
m_oss(oss),
m_checksum_on_write(checksum_on_write),
m_digests(digests)
m_digests(digests),
m_write_buffer_size(write_buffer_size),
m_buffer_offset(-1),
m_buffering_enabled(write_buffer_size > 0)
{}

int MultiuserFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Expand Down Expand Up @@ -173,28 +176,182 @@ int MultiuserFile::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv

ssize_t MultiuserFile::Write(const void *buffer, off_t offset, size_t size)
{
// Only take the lock if buffering is configured (m_write_buffer_size > 0)
std::unique_lock<std::mutex> lock(m_buffer_mutex, std::defer_lock);
if (m_write_buffer_size > 0) {
lock.lock();
}

if ((offset != m_nextoff) && m_state)
// Check for out-of-order writes if checksumming or buffering
if ((offset != m_nextoff) && (m_state || m_buffering_enabled))
{
std::stringstream ss;
ss << "Out-of-order writes not supported while running checksum. " << m_fname;
m_log.Emsg("Write", ss.str().c_str());
return -ENOTSUP;
// Flush any buffered data first
if (m_buffering_enabled) {
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
return flush_result;
}
// Disable buffering for the rest of this file
m_buffering_enabled = false;
}

if (m_state) {
std::stringstream ss;
ss << "Non-sequential write detected; disabling checksum calculation for " << m_fname;
m_log.Emsg("Write", ss.str().c_str());
delete m_state;
m_state = nullptr;
}
}

// If buffering is enabled and configured
if (m_buffering_enabled) {
// If this is the first write or buffer is empty, initialize buffer offset
if (m_write_buffer.empty()) {
m_buffer_offset = offset;
}

// Check if this write is sequential to the buffer
off_t expected_offset = m_buffer_offset + static_cast<off_t>(m_write_buffer.size());
if (offset != expected_offset) {
// Not sequential - flush buffer and disable buffering
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
return flush_result;
}
m_buffering_enabled = false;
// Fall through to direct write
} else {
// Sequential write - check if we should buffer it
size_t total_size = m_write_buffer.size() + size;

if (total_size <= m_write_buffer_size) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are edge cases here I'd like approached differently.

If the total is over the buffer size, then copy as much data as you can to fill the buffer and then write. This way, writes are always the buffer size (except maybe the last before close) at the cost of memcpy.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to fill the buffer completely before flushing when total exceeds buffer size. After flush, remaining data is either buffered (if it fits) or written directly (if too large). All writes are now buffer-sized except possibly the last. Fixed in b830dd6.

// Buffer this write - reserve capacity to avoid reallocations
if (m_write_buffer.capacity() < total_size) {
m_write_buffer.reserve(total_size);
}
m_write_buffer.insert(m_write_buffer.end(),
static_cast<const unsigned char*>(buffer),
static_cast<const unsigned char*>(buffer) + size);
m_nextoff = offset + size;
return size;
} else {
// Buffer would exceed limit - fill buffer to maximum, flush, then continue
size_t space_in_buffer = m_write_buffer_size - m_write_buffer.size();
if (space_in_buffer > 0) {
// Fill the buffer completely
if (m_write_buffer.capacity() < m_write_buffer_size) {
m_write_buffer.reserve(m_write_buffer_size);
}
m_write_buffer.insert(m_write_buffer.end(),
static_cast<const unsigned char*>(buffer),
static_cast<const unsigned char*>(buffer) + space_in_buffer);
}

// Flush the full buffer
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
return flush_result;
}

// Now handle the remaining data
const unsigned char* remaining_data = static_cast<const unsigned char*>(buffer) + space_in_buffer;
size_t remaining_size = size - space_in_buffer;

// If remaining data fits in buffer, buffer it; otherwise write directly
if (remaining_size <= m_write_buffer_size) {
m_buffer_offset = offset + space_in_buffer;
if (m_write_buffer.capacity() < m_write_buffer_size) {
m_write_buffer.reserve(m_write_buffer_size);
}
m_write_buffer.insert(m_write_buffer.end(),
remaining_data,
remaining_data + remaining_size);
m_nextoff = offset + size;
return size;
} else {
// Remaining data is too large for buffer - fall through to direct write
buffer = remaining_data;
offset = offset + space_in_buffer;
size = remaining_size;
}
}
}
}

// Direct write (no buffering or buffer disabled)
auto result = m_wrapped->Write(buffer, offset, size);
if (result >= 0) {m_nextoff += result;}
if (m_state)
{
m_state->Update(static_cast<const unsigned char*>(buffer), size);
if (result >= 0) {
m_nextoff = offset + result;
if (m_state && result > 0) {
// Only update checksum for the data that was actually written
m_state->Update(static_cast<const unsigned char*>(buffer), result);
}
}
return result;
}



// FlushWriteBuffer: Writes all buffered data to the underlying file system.
// Preconditions: Must be called with m_buffer_mutex held.
// Returns: 0 on success, negative error code on failure.
// Side effects:
// - Writes buffer contents via m_wrapped->Write()
// - Updates checksum state if enabled (m_state)
// - Clears m_write_buffer and resets m_buffer_offset on success
// - Handles partial writes with retry loop
// - On failure, buffer is NOT cleared to allow retry
int MultiuserFile::FlushWriteBuffer()
{
if (m_write_buffer.empty()) {
return 0;
}

size_t total_written = 0;
while (total_written < m_write_buffer.size()) {
auto result = m_wrapped->Write(m_write_buffer.data() + total_written,
m_buffer_offset + total_written,
m_write_buffer.size() - total_written);
if (result < 0) {
// Write failed - don't clear buffer, return error
return result;
}
if (result == 0) {
// No progress - treat as error
return -EIO;
}
total_written += result;
}

if (m_state) {
m_state->Update(m_write_buffer.data(), m_write_buffer.size());
}

m_write_buffer.clear();
m_buffer_offset = -1;

return 0;
}


int MultiuserFile::Close(long long *retsz)
{
// Only take the lock if buffering is configured (m_write_buffer_size > 0)
std::unique_lock<std::mutex> lock(m_buffer_mutex, std::defer_lock);
if (m_write_buffer_size > 0) {
lock.lock();
}

// Flush any remaining buffered data
if (!m_write_buffer.empty()) {
int flush_result = FlushWriteBuffer();
if (flush_result < 0) {
m_log.Emsg("Close", "Failed to flush write buffer");
// Continue with close anyway
}
}

auto close_result = m_wrapped->Close(retsz);
if (m_state)
{
Expand All @@ -210,7 +367,7 @@ int MultiuserFile::Close(long long *retsz)

}
delete m_state;
m_state = NULL;
m_state = nullptr;
}

return close_result;
Expand Down