Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ jobs:
with:
repository: 'oceanprotocol/ocean.js'
path: 'ocean.js'
ref: feature/refactor_signatures
ref: main
- name: Build ocean-js
working-directory: ${{ github.workspace }}/ocean.js
run: |
Expand Down
60 changes: 28 additions & 32 deletions src/components/P2P/handleProtocolCommands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
checkGlobalConnectionsRateLimit,
checkRequestsRateLimit
} from '../../utils/validators.js'
import { lpStream } from '@libp2p/utils'
import type { Connection, Stream } from '@libp2p/interface'

export class ReadableString extends Readable {
Expand Down Expand Up @@ -40,21 +41,30 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true)
P2P_LOGGER.logMessage('Using ' + remoteAddr, true)

const sendErrorAndClose = async (httpStatus: number, error: string) => {
stream.resume()
const lp = lpStream(stream)
const readWriteSignal = () => AbortSignal.timeout(30_000)

const sendErrorAndClose = async (
httpStatus: number,
error: string,
errorDebug?: Record<string, unknown>
) => {
try {
// Check if stream is already closed
if (stream.status === 'closed' || stream.status === 'closing') {
P2P_LOGGER.warn('Stream already closed, cannot send error response')
return
}

// Resume stream in case it's paused - we need to write
stream.resume()
const status = { httpStatus, error }
stream.send(uint8ArrayFromString(JSON.stringify(status)))
const status = errorDebug
? { httpStatus, error, errorDebug }
: { httpStatus, error }
await lp.write(uint8ArrayFromString(JSON.stringify(status)), {
signal: readWriteSignal()
})
await stream.close()
} catch (e) {
P2P_LOGGER.error(`Error sending error response: ${e.message}`)
const msg = e instanceof Error ? e.message : e != null ? String(e) : 'Unknown error'
P2P_LOGGER.error(`Error sending error response: ${msg}`)
try {
stream.abort(e as Error)
} catch {}
Expand Down Expand Up @@ -90,25 +100,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
return
}

// Resume the stream. We can now write.
stream.resume()

// v3 streams are AsyncIterable
let task: Command
try {
for await (const chunk of stream) {
try {
const str = uint8ArrayToString(chunk.subarray())
task = JSON.parse(str) as Command
} catch (e) {
await sendErrorAndClose(400, 'Invalid command')
return
}
}
const cmdBytes = await lp.read({ signal: readWriteSignal() })
const str = uint8ArrayToString(cmdBytes.subarray())
task = JSON.parse(str) as Command
} catch (err) {
P2P_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Unable to process P2P command: ${err.message}`
`Unable to process P2P command: ${err?.message ?? err}`
)
await sendErrorAndClose(400, 'Invalid command')
return
Expand All @@ -133,20 +133,16 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect
task.caller = remotePeer.toString()
const response: P2PCommandResponse = await handler.handle(task)

// Send status first
stream.send(uint8ArrayFromString(JSON.stringify(response.status)))
// Send status first (length-prefixed)
await lp.write(uint8ArrayFromString(JSON.stringify(response.status)), {
signal: readWriteSignal()
})

// Stream data chunks without buffering, with backpressure support
// Stream data chunks as length-prefixed messages
if (response.stream) {
for await (const chunk of response.stream as Readable) {
const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)

// Handle backpressure - if send returns false, wait for drain
if (!stream.send(bytes)) {
await stream.onDrain({
signal: AbortSignal.timeout(30000) // 30 second timeout for drain
})
}
await lp.write(bytes, { signal: readWriteSignal() })
}
}

Expand Down
36 changes: 23 additions & 13 deletions src/components/P2P/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { handleProtocolCommands } from './handlers.js'

import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { lpStream } from '@libp2p/utils'
import type { Stream } from '@libp2p/interface'

import { bootstrap } from '@libp2p/bootstrap'
Expand Down Expand Up @@ -437,6 +438,9 @@ export class OceanP2P extends EventEmitter {
dialTimeout: config.p2pConfig.connectionsDialTimeout,
maxConnections: config.p2pConfig.maxConnections,
maxPeerAddrsToDial: config.p2pConfig.maxPeerAddrsToDial
},
connectionMonitor: {
abortConnectionOnPingFailure: false
}
}
if (config.p2pConfig.bootstrapNodes && config.p2pConfig.bootstrapNodes.length > 0) {
Expand Down Expand Up @@ -778,22 +782,28 @@ export class OceanP2P extends EventEmitter {
}

try {
// Send message and close write side
stream.send(uint8ArrayFromString(message))
await stream.close()
const lp = lpStream(stream)
const writeSignal = AbortSignal.timeout(10_000)
const readSignal = AbortSignal.timeout(10_000)

// Read and parse status from first chunk
const iterator = stream[Symbol.asyncIterator]()
const { done, value } = await iterator.next()
await lp.write(uint8ArrayFromString(message), { signal: writeSignal })

if (done || !value) {
return { status: { httpStatus: 500, error: 'No response from peer' } }
}

const status = JSON.parse(uint8ArrayToString(value.subarray()))
const statusBytes = await lp.read({ signal: readSignal })
const status = JSON.parse(uint8ArrayToString(statusBytes.subarray()))

// Return status and remaining stream
return { status, stream: { [Symbol.asyncIterator]: () => iterator } }
return {
status,
stream: {
[Symbol.asyncIterator]: async function* () {
try {
while (true) {
const chunk = await lp.read()
yield chunk.subarray ? chunk.subarray() : chunk
}
} catch {}
}
}
}
} catch (err) {
P2P_LOGGER.error(`P2P communication error: ${err.message}`)
try {
Expand Down
Loading