Skip to content

Commit 4fc1fb3

Browse files
authored
Merge pull request #663 from utopia-php/dat-610
Add createOrUpdateDocuments method and filter hooks for document upserts
2 parents e194da3 + 139ee27 commit 4fc1fb3

3 files changed

Lines changed: 163 additions & 0 deletions

File tree

src/Database/Mirror.php

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,75 @@ function ($doc) use ($onNext, &$modified) {
781781
return $modified;
782782
}
783783

784+
public function createOrUpdateDocuments(string $collection, array $documents, int $batchSize = Database::INSERT_BATCH_SIZE, callable|null $onNext = null): int
785+
{
786+
$modified = 0;
787+
$this->source->createOrUpdateDocuments(
788+
$collection,
789+
$documents,
790+
$batchSize,
791+
function ($doc) use ($onNext, &$modified) {
792+
$onNext && $onNext($doc);
793+
$modified++;
794+
}
795+
);
796+
797+
if (
798+
\in_array($collection, self::SOURCE_ONLY_COLLECTIONS)
799+
|| $this->destination === null
800+
) {
801+
return $modified;
802+
}
803+
804+
$upgrade = $this->silent(fn () => $this->getUpgradeStatus($collection));
805+
if ($upgrade === null || $upgrade->getAttribute('status', '') !== 'upgraded') {
806+
return $modified;
807+
}
808+
809+
try {
810+
$clones = [];
811+
812+
foreach ($documents as $document) {
813+
$clone = clone $document;
814+
815+
foreach ($this->writeFilters as $filter) {
816+
$clone = $filter->beforeCreateOrUpdateDocument(
817+
source: $this->source,
818+
destination: $this->destination,
819+
collectionId: $collection,
820+
document: $clone,
821+
);
822+
}
823+
824+
$clones[] = $clone;
825+
}
826+
827+
$modified = $this->destination->withPreserveDates(
828+
fn () =>
829+
$this->destination->createOrUpdateDocuments(
830+
$collection,
831+
$clones,
832+
$batchSize,
833+
null,
834+
)
835+
);
836+
837+
foreach ($clones as $clone) {
838+
foreach ($this->writeFilters as $filter) {
839+
$filter->afterCreateOrUpdateDocument(
840+
source: $this->source,
841+
destination: $this->destination,
842+
collectionId: $collection,
843+
document: $clone,
844+
);
845+
}
846+
}
847+
} catch (\Throwable $err) {
848+
$this->logError('createDocuments', $err);
849+
}
850+
return $modified;
851+
}
852+
784853
public function deleteDocument(string $collection, string $id): bool
785854
{
786855
$result = $this->source->deleteDocument($collection, $id);

src/Database/Mirroring/Filter.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,4 +359,40 @@ public function afterDeleteDocuments(
359359
array $queries
360360
): void {
361361
}
362+
363+
/**
364+
* Called before document is upserted in the destination database
365+
*
366+
* @param Database $source
367+
* @param Database $destination
368+
* @param string $collectionId
369+
* @param Document $document
370+
* @return Document
371+
*/
372+
public function beforeCreateOrUpdateDocument(
373+
Database $source,
374+
Database $destination,
375+
string $collectionId,
376+
Document $document,
377+
): Document {
378+
return $document;
379+
}
380+
381+
/**
382+
* Called after document is upserted in the destination database
383+
*
384+
* @param Database $source
385+
* @param Database $destination
386+
* @param string $collectionId
387+
* @param Document $document
388+
* @return Document
389+
*/
390+
public function afterCreateOrUpdateDocument(
391+
Database $source,
392+
Database $destination,
393+
string $collectionId,
394+
Document $document,
395+
): Document {
396+
return $document;
397+
}
362398
}

tests/e2e/Adapter/Scopes/DocumentTests.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5788,4 +5788,62 @@ public function testUpsertDateOperations(): void
57885788
$database->setPreserveDates(false);
57895789
$database->deleteCollection($collection);
57905790
}
5791+
5792+
public function testUpdateDocumentsCount(): void
5793+
{
5794+
/** @var Database $database */
5795+
$database = static::getDatabase();
5796+
5797+
if (!$database->getAdapter()->getSupportForUpserts()) {
5798+
return;
5799+
}
5800+
5801+
$collectionName = "update_count";
5802+
$database->createCollection($collectionName);
5803+
5804+
$database->createAttribute($collectionName, 'key', Database::VAR_STRING, 60, false);
5805+
$database->createAttribute($collectionName, 'value', Database::VAR_STRING, 60, false);
5806+
5807+
$permissions = [Permission::read(Role::any()), Permission::write(Role::any()),Permission::update(Role::any())];
5808+
5809+
$docs = [
5810+
new Document([
5811+
'$id' => 'bulk_upsert1',
5812+
'$permissions' => $permissions,
5813+
'key' => 'bulk_upsert1_initial',
5814+
]),
5815+
new Document([
5816+
'$id' => 'bulk_upsert2',
5817+
'$permissions' => $permissions,
5818+
'key' => 'bulk_upsert2_initial',
5819+
]),
5820+
new Document([
5821+
'$id' => 'bulk_upsert3',
5822+
'$permissions' => $permissions,
5823+
'key' => 'bulk_upsert3_initial',
5824+
]),
5825+
new Document([
5826+
'$id' => 'bulk_upsert4',
5827+
'$permissions' => $permissions,
5828+
'key' => 'bulk_upsert4_initial'
5829+
])
5830+
];
5831+
$upsertUpdateResults = [];
5832+
$count = $database->createOrUpdateDocuments($collectionName, $docs, onNext: function ($doc) use (&$upsertUpdateResults) {
5833+
$upsertUpdateResults[] = $doc;
5834+
});
5835+
$this->assertCount(4, $upsertUpdateResults);
5836+
$this->assertEquals(4, $count);
5837+
5838+
$updates = new Document(['value' => 'test']);
5839+
$newDocs = [];
5840+
$count = $database->updateDocuments($collectionName, $updates, onNext:function ($doc) use (&$newDocs) {
5841+
$newDocs[] = $doc;
5842+
});
5843+
5844+
$this->assertCount(4, $newDocs);
5845+
$this->assertEquals(4, $count);
5846+
5847+
$database->deleteCollection($collectionName);
5848+
}
57915849
}

0 commit comments

Comments
 (0)