diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9f2e47b7..6f1fa84bd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -317,7 +317,7 @@ jobs: with: repository: 'oceanprotocol/ocean.js' path: 'ocean.js' - ref: feature/refactor_signatures + ref: main - name: Build ocean-js working-directory: ${{ github.workspace }}/ocean.js run: | diff --git a/src/components/c2d/compute_engine_base.ts b/src/components/c2d/compute_engine_base.ts index 671eee947..1d6533814 100644 --- a/src/components/c2d/compute_engine_base.ts +++ b/src/components/c2d/compute_engine_base.ts @@ -326,8 +326,14 @@ export abstract class C2DEngine { for (const request of activeResources) { let envResource = this.getResource(env.resources, request.id) if (!envResource) throw new Error(`No such resource ${request.id}`) - if (envResource.total - envResource.inUse < request.amount) + if (envResource.total - envResource.inUse < request.amount) { + console.log({ + totalResources: envResource.total, + inUseResources: envResource.inUse, + requested: request.amount + }) throw new Error(`Not enough available ${request.id}`) + } if (isFree) { if (!env.free) throw new Error(`No free resources`) envResource = this.getResource(env.free?.resources, request.id) diff --git a/src/components/database/DatabaseFactory.ts b/src/components/database/DatabaseFactory.ts index e83549f9d..4d5d35603 100644 --- a/src/components/database/DatabaseFactory.ts +++ b/src/components/database/DatabaseFactory.ts @@ -20,7 +20,7 @@ import { TypesenseIndexerDatabase, TypesenseLogDatabase, TypesenseOrderDatabase -} from './TypenseDatabase.js' +} from './TypesenseDatabase.js' import { elasticSchemas } from './ElasticSchemas.js' import { IDdoStateQuery } from '../../@types/DDO/IDdoStateQuery.js' import { TypesenseDdoStateQuery } from './TypesenseDdoStateQuery.js' diff --git a/src/components/database/ElasticSearchDatabase.ts b/src/components/database/ElasticSearchDatabase.ts index bdb08beab..bfd676531 100644 --- a/src/components/database/ElasticSearchDatabase.ts +++ b/src/components/database/ElasticSearchDatabase.ts @@ -12,8 +12,8 @@ import { ElasticsearchSchema } from './ElasticSchemas.js' import { DATABASE_LOGGER } from '../../utils/logging/common.js' import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js' -import { DDOManager } from '@oceanprotocol/ddo-js' import { validateDDO } from '../../utils/asset.js' +import { DDOManager } from '@oceanprotocol/ddo-js' export class ElasticsearchIndexerDatabase extends AbstractIndexerDatabase { private client: Client @@ -76,7 +76,8 @@ export class ElasticsearchIndexerDatabase extends AbstractIndexerDatabase { try { const result = await this.client.get({ index: this.index, - id: network.toString() + id: network.toString(), + refresh: true }) return result._source } catch (error) { diff --git a/src/components/database/TypenseDatabase.ts b/src/components/database/TypesenseDatabase.ts similarity index 99% rename from src/components/database/TypenseDatabase.ts rename to src/components/database/TypesenseDatabase.ts index 1dc5c3166..4129fdf02 100644 --- a/src/components/database/TypenseDatabase.ts +++ b/src/components/database/TypesenseDatabase.ts @@ -13,8 +13,8 @@ import { AbstractLogDatabase, AbstractOrderDatabase } from './BaseDatabase.js' -import { DDOManager } from '@oceanprotocol/ddo-js' import { validateDDO } from '../../utils/asset.js' +import { DDOManager } from '@oceanprotocol/ddo-js' export class TypesenseOrderDatabase extends AbstractOrderDatabase { private provider: Typesense @@ -372,6 +372,7 @@ export class TypesenseDdoDatabase extends AbstractDdoDatabase { getDDOSchema(ddo: Record): TypesenseSchema { // Find the schema based on the DDO version OR use the short DDO schema when state !== 0 let schemaName: string + const ddoInstance = DDOManager.getDDOClass(ddo) const ddoData = ddoInstance.getDDOData() if ('indexedMetadata' in ddoData && ddoData?.indexedMetadata?.nft.state !== 0) { diff --git a/src/index.ts b/src/index.ts index 67b9093f9..9aa8fcba8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -158,3 +158,10 @@ if (config.hasHttp) { // Call the function to schedule the cron job to delete old logs scheduleCronJobs(oceanNode) } + +process.on('unhandledRejection', (reason) => { + console.log({ reason }) +}) +process.on('uncaughtException', (reason) => { + console.log({ reason }) +}) diff --git a/src/test/integration/compute.test.ts b/src/test/integration/compute.test.ts index 1414d1770..5aef6431e 100644 --- a/src/test/integration/compute.test.ts +++ b/src/test/integration/compute.test.ts @@ -154,16 +154,13 @@ describe('Compute', () => { ) config = await getConfiguration(true) dbconn = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance( - config, - dbconn, - null, - null, - null, - null, - null, - true - ) + + const staleJobs = await dbconn.c2d.getRunningJobs() + for (const job of staleJobs) { + await dbconn.c2d.deleteJob(job.jobId) + } + + oceanNode = OceanNode.getInstance(config, dbconn, null, null, null, null, null, true) indexer = new OceanIndexer( dbconn, config.indexingNetworks, @@ -641,62 +638,43 @@ describe('Compute', () => { assert(!response.stream, 'We should not have a stream') }) - it('should start a compute job with maxed resources', async () => { - // first check escrow auth - - let balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) - let funds = await oceanNode.escrow.getUserAvailableFunds( + it('should fail to start a compute job without escrow funds', async () => { + // ensure clean escrow state: no funds, no auths, no locks + const funds = await oceanNode.escrow.getUserAvailableFunds( DEVELOPMENT_CHAIN_ID, await consumerAccount.getAddress(), paymentToken ) - // make sure we have 0 funds if (BigInt(funds.toString()) > BigInt(0)) { await escrowContract .connect(consumerAccount) .withdraw([initializeResponse.payment.token], [funds]) } - let auth = await oceanNode.escrow.getAuthorizations( + const auth = await oceanNode.escrow.getAuthorizations( DEVELOPMENT_CHAIN_ID, paymentToken, await consumerAccount.getAddress(), firstEnv.consumerAddress ) if (auth.length > 0) { - // remove any auths await escrowContract .connect(consumerAccount) .authorize(initializeResponse.payment.token, firstEnv.consumerAddress, 0, 0, 0) } - let locks = await oceanNode.escrow.getLocks( + const locks = await oceanNode.escrow.getLocks( DEVELOPMENT_CHAIN_ID, paymentToken, await consumerAccount.getAddress(), firstEnv.consumerAddress ) - - if (locks.length > 0) { - // cancel all locks - for (const lock of locks) { - try { - await escrowContract - .connect(consumerAccount) - .cancelExpiredLock( - lock.jobId, - lock.token, - lock.payer, - firstEnv.consumerAddress - ) - } catch (e) {} - } - locks = await oceanNode.escrow.getLocks( - DEVELOPMENT_CHAIN_ID, - paymentToken, - await consumerAccount.getAddress(), - firstEnv.consumerAddress - ) + for (const lock of locks) { + try { + await escrowContract + .connect(consumerAccount) + .cancelExpiredLock(lock.jobId, lock.token, lock.payer, firstEnv.consumerAddress) + } catch (e) {} } - const locksBefore = locks.length + const nonce = Date.now().toString() const messageHashBytes = createHashForSignature( await consumerAccount.getAddress(), @@ -738,15 +716,17 @@ describe('Compute', () => { additionalViewers: [await additionalViewerAccount.getAddress()], maxJobDuration: computeJobDuration, resources: re - // additionalDatasets?: ComputeAsset[] - // output?: ComputeOutput } - // it should fail, because we don't have funds & auths in escrow - let response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) + const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) assert(response.status.httpStatus === 400, 'Failed to get 400 response') assert(!response.stream, 'We should not have a stream') - // let's put funds in escrow & create an auth - balance = await paymentTokenContract.balanceOf(await consumerAccount.getAddress()) + }) + + it('should start a compute job with maxed resources', async () => { + // deposit funds and create auth in escrow + const balance = await paymentTokenContract.balanceOf( + await consumerAccount.getAddress() + ) await paymentTokenContract .connect(consumerAccount) .approve(initializeResponse.payment.escrowAddress, balance) @@ -762,20 +742,13 @@ describe('Compute', () => { initializeResponse.payment.minLockSeconds, 10 ) - auth = await oceanNode.escrow.getAuthorizations( + + const auth = await oceanNode.escrow.getAuthorizations( DEVELOPMENT_CHAIN_ID, paymentToken, await consumerAccount.getAddress(), firstEnv.consumerAddress ) - const authBefore = auth[0] - funds = await oceanNode.escrow.getUserAvailableFunds( - DEVELOPMENT_CHAIN_ID, - await consumerAccount.getAddress(), - paymentToken - ) - const fundsBefore = funds - assert(BigInt(funds.toString()) > BigInt(0), 'Should have funds in escrow') assert(auth.length > 0, 'Should have authorization') assert( BigInt(auth[0].maxLockedAmount.toString()) > BigInt(0), @@ -785,19 +758,68 @@ describe('Compute', () => { BigInt(auth[0].maxLockCounts.toString()) > BigInt(0), ' Should have maxLockCounts in auth' ) - const nonce2 = Date.now().toString() - const messageHashBytes2 = createHashForSignature( + const authBefore = auth[0] + + const fundsBefore = await oceanNode.escrow.getUserAvailableFunds( + DEVELOPMENT_CHAIN_ID, + await consumerAccount.getAddress(), + paymentToken + ) + assert(BigInt(fundsBefore.toString()) > BigInt(0), 'Should have funds in escrow') + + const locksBefore = ( + await oceanNode.escrow.getLocks( + DEVELOPMENT_CHAIN_ID, + paymentToken, + await consumerAccount.getAddress(), + firstEnv.consumerAddress + ) + ).length + + const nonce = Date.now().toString() + const messageHashBytes = createHashForSignature( await consumerAccount.getAddress(), - nonce2, + nonce, PROTOCOL_COMMANDS.COMPUTE_START ) - const signature2 = await safeSign(consumerAccount, messageHashBytes2) - response = await new PaidComputeStartHandler(oceanNode).handle({ - ...startComputeTask, - nonce: nonce2, - signature: signature2 - }) - console.log(response) + const signature = await safeSign(consumerAccount, messageHashBytes) + const re = [] + for (const res of firstEnv.resources) { + re.push({ id: res.id, amount: res.total }) + } + const startComputeTask: PaidComputeStartCommand = { + command: PROTOCOL_COMMANDS.COMPUTE_START, + consumerAddress: await consumerAccount.getAddress(), + signature, + nonce, + environment: firstEnv.id, + datasets: [ + { + documentId: publishedComputeDataset.ddo.id, + serviceId: publishedComputeDataset.ddo.services[0].id, + transferTxId: datasetOrderTxId + } + ], + algorithm: { + documentId: publishedAlgoDataset.ddo.id, + serviceId: publishedAlgoDataset.ddo.services[0].id, + transferTxId: algoOrderTxId, + meta: publishedAlgoDataset.ddo.metadata.algorithm + }, + output: {}, + payment: { + chainId: DEVELOPMENT_CHAIN_ID, + token: paymentToken + }, + metadata: { + key: 'value' + }, + additionalViewers: [await additionalViewerAccount.getAddress()], + maxJobDuration: computeJobDuration, + resources: re + } + const response = await new PaidComputeStartHandler(oceanNode).handle(startComputeTask) + console.log({ response }) assert(response, 'Failed to get response') assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') @@ -807,29 +829,35 @@ describe('Compute', () => { // eslint-disable-next-line prefer-destructuring jobId = jobs[0].jobId console.log('**** Started compute job with id: ', jobId) - // check escrow - funds = await oceanNode.escrow.getUserAvailableFunds( + + // check escrow state changed after job start + const fundsAfter = await oceanNode.escrow.getUserAvailableFunds( DEVELOPMENT_CHAIN_ID, await consumerAccount.getAddress(), paymentToken ) - assert(fundsBefore > funds, 'We should have less funds') - locks = await oceanNode.escrow.getLocks( + assert(fundsBefore > fundsAfter, 'We should have less funds') + + const locksAfter = await oceanNode.escrow.getLocks( DEVELOPMENT_CHAIN_ID, paymentToken, await consumerAccount.getAddress(), firstEnv.consumerAddress ) - assert(locks.length > locksBefore, 'We should have locks') - auth = await oceanNode.escrow.getAuthorizations( + assert(locksAfter.length > locksBefore, 'We should have locks') + + const authAfter = await oceanNode.escrow.getAuthorizations( DEVELOPMENT_CHAIN_ID, paymentToken, await consumerAccount.getAddress(), firstEnv.consumerAddress ) - assert(auth[0].currentLocks > authBefore.currentLocks, 'We should have running jobs') assert( - auth[0].currentLockedAmount > authBefore.currentLockedAmount, + authAfter[0].currentLocks > authBefore.currentLocks, + 'We should have running jobs' + ) + assert( + authAfter[0].currentLockedAmount > authBefore.currentLockedAmount, 'We should have higher currentLockedAmount' ) }) @@ -2124,7 +2152,7 @@ describe('Compute Access Restrictions', () => { ) config = await getConfiguration(true) dbconn = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance( + oceanNode = OceanNode.getInstance( config, dbconn, null, @@ -2315,7 +2343,7 @@ describe('Compute Access Restrictions', () => { ) config = await getConfiguration(true) dbconn = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance( + oceanNode = OceanNode.getInstance( config, dbconn, null, @@ -2445,7 +2473,7 @@ describe('Compute Access Restrictions', () => { ) config = await getConfiguration(true) dbconn = await Database.init(config.dbConfig) - oceanNode = await OceanNode.getInstance( + oceanNode = OceanNode.getInstance( config, dbconn, null, @@ -2575,7 +2603,7 @@ describe('Compute Access Restrictions', () => { const now = Math.floor(Date.now() / 1000) const expiry = 3500 - const providerAddress = await (await oceanNode.getKeyManager()).getEthAddress() + const providerAddress = oceanNode.getKeyManager().getEthAddress() // Clean up existing locks and authorizations first const locks = await oceanNode.escrow.getLocks( diff --git a/src/test/integration/indexer.test.ts b/src/test/integration/indexer.test.ts index 1c83d4b42..9a551b85e 100644 --- a/src/test/integration/indexer.test.ts +++ b/src/test/integration/indexer.test.ts @@ -100,7 +100,16 @@ describe('Indexer stores a new metadata events and orders.', () => { const config = await getConfiguration(true) database = await Database.init(config.dbConfig) - oceanNode = OceanNode.getInstance(config, database) + oceanNode = OceanNode.getInstance( + config, + database, + null, + null, + null, + null, + null, + true + ) indexer = new OceanIndexer( database, mockSupportedNetworks, @@ -270,6 +279,7 @@ describe('Indexer stores a new metadata events and orders.', () => { command: PROTOCOL_COMMANDS.QUERY } const response = await queryDdoStateHandler.handle(queryDdoState) + console.log({ response }) assert(response, 'Failed to get response') assert(response.status.httpStatus === 200, 'Failed to get 200 response') assert(response.stream, 'Failed to get stream') @@ -341,11 +351,12 @@ describe('Indexer stores a new metadata events and orders.', () => { }) it('should get the updated state', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 3) const result = await nftContract.getMetaData() const { ddo, wasTimeout } = await waitToIndex( assetDID, EVENTS.METADATA_UPDATED, - DEFAULT_TEST_TIMEOUT, + DEFAULT_TEST_TIMEOUT * 3, true ) const retrievedDDO: any = ddo @@ -455,11 +466,11 @@ describe('Indexer stores a new metadata events and orders.', () => { }) it('should get number of orders', async function () { - this.timeout(DEFAULT_TEST_TIMEOUT * 2) + this.timeout(DEFAULT_TEST_TIMEOUT * 4) const { ddo, wasTimeout } = await waitToIndex( assetDID, EVENTS.ORDER_STARTED, - DEFAULT_TEST_TIMEOUT * 2, + DEFAULT_TEST_TIMEOUT * 4, true ) if (ddo) { @@ -596,13 +607,14 @@ describe('Indexer stores a new metadata events and orders.', () => { }) it('Deprecated asset should have a short version of ddo', async function () { + this.timeout(DEFAULT_TEST_TIMEOUT * 3) const result = await nftContract.getMetaData() expect(parseInt(result[2].toString())).to.equal(2) const { ddo, wasTimeout } = await waitToIndex( assetDID, EVENTS.METADATA_STATE, - DEFAULT_TEST_TIMEOUT, + DEFAULT_TEST_TIMEOUT * 3, true ) const resolvedDDO: any = ddo diff --git a/src/test/unit/download.test.ts b/src/test/unit/download.test.ts index 4fc8779a3..f583c37a4 100644 --- a/src/test/unit/download.test.ts +++ b/src/test/unit/download.test.ts @@ -21,6 +21,7 @@ import { DEVELOPMENT_CHAIN_ID, KNOWN_CONFIDENTIAL_EVMS } from '../../utils/addre import { DDO } from '@oceanprotocol/ddo-js' import { Wallet } from 'ethers' import { createHashForSignature, safeSign } from '../utils/signature.js' +import { KeyManager } from '../../components/KeyManager/index.js' let envOverrides: OverrideEnvConfig[] let config: OceanNodeConfig @@ -37,8 +38,18 @@ describe('Should validate files structure for download', () => { envOverrides = await setupEnvironment(TEST_ENV_CONFIG_FILE, envOverrides) config = await getConfiguration(true) + const keyManager = new KeyManager(config) db = await Database.init(config.dbConfig) - oceanNode = OceanNode.getInstance(config, db) + oceanNode = OceanNode.getInstance( + config, + db, + null, + null, + null, + keyManager, + null, + true + ) consumerAccount = new Wallet(process.env.PRIVATE_KEY) })