Skip to content
Closed
90 changes: 40 additions & 50 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,34 +819,6 @@ func (db *DatabaseCollectionWithUser) getAvailableRev(ctx context.Context, doc *
return nil, "", nil, ErrMissing
}

// Returns the 1x-style body of the asked-for revision or the most recent available ancestor.
func (db *DatabaseCollectionWithUser) getAvailable1xRev(ctx context.Context, doc *Document, revid string) ([]byte, error) {
bodyBytes, ancestorRevID, attachments, err := db.getAvailableRev(ctx, doc, revid)
if err != nil {
return nil, err
}

kvPairs := []base.KVPair{
{Key: BodyId, Val: doc.ID},
{Key: BodyRev, Val: ancestorRevID},
}

if ancestorRev, ok := doc.History[ancestorRevID]; ok && ancestorRev != nil && ancestorRev.Deleted {
kvPairs = append(kvPairs, base.KVPair{Key: BodyDeleted, Val: true})
}

if len(attachments) > 0 {
kvPairs = append(kvPairs, base.KVPair{Key: BodyAttachments, Val: attachments})
}

bodyBytes, err = base.InjectJSONProperties(bodyBytes, kvPairs...)
if err != nil {
return nil, err
}

return bodyBytes, nil
}

// Returns the attachments of the asked-for revision or the most recent available ancestor.
// Returns nil if no attachments or ancestors are found.
func (db *DatabaseCollectionWithUser) getAvailableRevAttachments(ctx context.Context, doc *Document, revid string) (ancestorAttachments AttachmentsMeta, foundAncestor bool) {
Expand Down Expand Up @@ -1202,13 +1174,13 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
if conflictErr != nil {
if db.ForceAPIForbiddenErrors() {
// Make sure the user has permission to modify the document before confirming doc existence
mutableBody, metaMap, newRevID, err := db.prepareSyncFn(doc, newDoc)
mutableBody, metaMap, err := db.prepareDocForSyncFn(ctx, doc, newDoc.Body(ctx), newDoc.RevID, newDoc.Deleted)
if err != nil {
base.InfofCtx(ctx, base.KeyCRUD, "Failed to prepare to run sync function: %v", err)
return nil, nil, false, nil, ErrForbidden
}

_, _, _, _, _, err = db.runSyncFn(ctx, doc, mutableBody, metaMap, newRevID)
_, _, _, _, _, err = db.runSyncFn(ctx, doc, mutableBody, metaMap, newDoc.RevID)
if err != nil {
base.DebugfCtx(ctx, base.KeyCRUD, "Could not modify doc %q due to %s and sync func rejection: %v", base.UD(doc.ID), conflictErr, err)
return nil, nil, false, nil, ErrForbidden
Expand Down Expand Up @@ -1651,7 +1623,7 @@ func (db *DatabaseCollectionWithUser) PutExistingRevWithBody(ctx context.Context
// access map for users, roles, handler errors and sync fn exceptions.
// If syncFn is provided, it will be used instead of the one configured on the database.
func (db *DatabaseCollectionWithUser) SyncFnDryRun(ctx context.Context, newDoc, oldDoc *Document, userMeta, syncOptions map[string]any, syncFn string, errorLogFunc, infoLogFunc func(string)) (*channels.ChannelMapperOutput, error) {
mutableBody, metaMap, _, err := db.prepareSyncFn(oldDoc, newDoc)
mutableBody, metaMap, err := db.prepareDocForSyncFn(ctx, oldDoc, newDoc.Body(ctx), newDoc.RevID, newDoc.Deleted)
if err != nil {
base.InfofCtx(ctx, base.KeyDiagnostic, "Failed to prepare to run sync function: %v", err)
return nil, err
Expand Down Expand Up @@ -2178,8 +2150,14 @@ func (db *DatabaseCollectionWithUser) storeOldBodyInRevTreeAndUpdateCurrent(ctx
}
doc.setNonWinningRevisionBody(prevCurrentRev, oldBodyJson, db.AllowExternalRevBodyStorage(), oldDocHasAttachments)
}
// Store the new revision body into the doc:
// Store the new revision body into the doc. For tombstones, newDoc._body is an empty Body{}
// placeholder — copying it causes MarshalWithXattrs to serialize it as "{}", writing a spurious
// 2-byte body. Tombstones carry no body, so clear it instead.
//if !newDoc.Deleted {
doc.setRevisionBody(ctx, newRevID, newDoc, db.AllowExternalRevBodyStorage(), newDocHasAttachments)
//} else {
// doc.RemoveBody()
//}
doc.SetAttachments(newDoc.Attachments())
doc.MetadataOnlyUpdate = newDoc.MetadataOnlyUpdate

Expand All @@ -2197,28 +2175,27 @@ func (db *DatabaseCollectionWithUser) storeOldBodyInRevTreeAndUpdateCurrent(ctx
}
}

func (db *DatabaseCollectionWithUser) prepareSyncFn(doc *Document, newDoc *Document) (mutableBody Body, metaMap map[string]any, newRevID string, err error) {
// Marshal raw user xattrs for use in Sync Fn. If this fails we can bail out so we should do early as possible.
func (db *DatabaseCollectionWithUser) prepareDocForSyncFn(ctx context.Context, doc *Document, body Body, revID string, tombstone bool) (mutableBody Body, metaMap map[string]any, err error) {
metaMap, err = doc.GetMetaMap(db.UserXattrKey())
if err != nil {
return
}

mutableBody, err = newDoc.GetDeepMutableBody()
if err != nil {
mutableBody = body.DeepCopy(ctx)
if mutableBody == nil {
// Fail fast when DeepCopy cannot produce a writable body. Returning an error here avoids
// panicking on the metadata assignments below and preserves the original body semantics.
err = fmt.Errorf("failed to deep copy document body for sync function")
return
}

err = validateNewBody(mutableBody)
if err != nil {
return
}

newRevID = newDoc.RevID

mutableBody[BodyId] = doc.ID
mutableBody[BodyRev] = newRevID
if newDoc.Deleted {
mutableBody[BodyRev] = revID
if tombstone {
mutableBody[BodyDeleted] = true
}

Expand All @@ -2239,13 +2216,21 @@ func (db *DatabaseCollectionWithUser) runSyncFn(ctx context.Context, doc *Docume
func (db *DatabaseCollectionWithUser) recalculateSyncFnForActiveRev(ctx context.Context, doc *Document, metaMap map[string]any, newRevID string) (channelSet base.Set, access, roles channels.AccessMap, syncExpiry *uint32, oldBodyJSON string, err error) {
// In some cases an older revision might become the current one. If so, get its
// channels & access, for purposes of updating the doc:
curBodyBytes, err := db.getAvailable1xRev(ctx, doc, doc.GetRevTreeID())

ancestorBody, ancestorRevID, _, err := db.getAvailableRev(ctx, doc, doc.GetRevTreeID())
if err != nil {
return
}

var curBody Body
err = curBody.Unmarshal(curBodyBytes)
var mutableBody Body
err = mutableBody.Unmarshal(ancestorBody)
if err != nil {
return
}
var isTombstone bool
if ancestorRev, ok := doc.History[ancestorRevID]; ok && ancestorRev != nil && ancestorRev.Deleted {
isTombstone = true
}
curBody, _, err := db.prepareDocForSyncFn(ctx, doc, mutableBody, doc.GetRevTreeID(), isTombstone)
if err != nil {
return
}
Expand Down Expand Up @@ -2496,7 +2481,8 @@ func (col *DatabaseCollectionWithUser) documentUpdateFunc(
return
}

mutableBody, metaMap, newRevID, err := col.prepareSyncFn(doc, newDoc)
newRevID := newDoc.RevID
mutableBody, metaMap, err := col.prepareDocForSyncFn(ctx, doc, newDoc.Body(ctx), newDoc.RevID, newDoc.Deleted)
if err != nil {
return
}
Expand Down Expand Up @@ -2948,15 +2934,19 @@ func (db *DatabaseCollectionWithUser) postWriteUpdateHLV(ctx context.Context, do
// we don't need to store revision body backups without delta sync in 4.0, since all clients know how to use the sendReplacementRevs feature
backupRev := db.deltaSyncEnabled() && db.deltaSyncRevMaxAgeSeconds() != 0
if db.UseXattrs() && backupRev {
var newBodyWithAtts = doc._rawBody

newBodyWithAtts, err := doc.BodyBytes(ctx)
if err != nil {
base.WarnfCtx(ctx, "Unable to marshal new revision body during backupRevisionJSON: doc=%q rev=%q cv=%q err=%v ", base.UD(doc.ID), base.UD(doc.GetRevTreeID()), base.UD(doc.HLV.GetCurrentVersionString()), err)

}
Comment on lines +2938 to +2942
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary marshalling work when there are no attachments on a doc. Move inside the attachment if block.

This is on the replicator codepath, and thus sensitive to throughput/performance concerns.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should stay outside itself, because the new Body is used later by setOldRevisionJSON.

if len(doc.Attachments()) > 0 {
var err error
newBodyWithAtts, err = base.InjectJSONProperties(doc._rawBody, base.KVPair{
newBodyWithAtts, err = base.InjectJSONProperties(newBodyWithAtts, base.KVPair{
Key: BodyAttachments,
Val: doc.Attachments(),
})
if err != nil {
base.WarnfCtx(ctx, "Unable to marshal new revision body during backupRevisionJSON: doc=%q rev=%q cv=%q err=%v ", base.UD(doc.ID), doc.GetRevTreeID(), doc.HLV.GetCurrentVersionString(), err)
base.WarnfCtx(ctx, "Unable to marshal new revision body during backupRevisionJSON: doc=%q rev=%q cv=%q err=%v ", base.UD(doc.ID), base.UD(doc.GetRevTreeID()), base.UD(doc.HLV.GetCurrentVersionString()), err)
return doc
}
}
Expand Down
8 changes: 6 additions & 2 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,9 @@ func TestPutExistingCurrentVersion(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "test", cv.SourceID)
assert.Equal(t, incomingVersion, cv.Value)
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)
docBytes, err := doc.BodyBytes(ctx)
require.NoError(t, err)
assert.Equal(t, []byte(`{"key1":"value2"}`), docBytes)

// assert on the sync data from the above update to the doc
// CV should be equal to CV of update on client but the cvCAS should be updated with the new update and
Expand Down Expand Up @@ -1926,7 +1928,9 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
// assert on returned CV value
assert.Equal(t, "test", cv.SourceID)
assert.Equal(t, incomingVersion, cv.Value)
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)
docBytes, err := doc.BodyBytes(ctx)
require.NoError(t, err)
assert.Equal(t, []byte(`{"key1":"value2"}`), docBytes)

// assert on the sync data from the above update to the doc
// CV should be equal to CV of update on client but the cvCAS should be updated with the new update and
Expand Down
20 changes: 14 additions & 6 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1750,19 +1750,27 @@ func (db *DatabaseCollectionWithUser) getResyncedDocument(ctx context.Context, d
// Run the sync fn over each current/leaf revision, in case there are conflicts:
changed := 0
doc.History.forEachLeaf(func(rev *RevInfo) {
bodyBytes, _, err := db.get1xRevFromDoc(ctx, doc, rev.ID, false)

revBodyBytes, _, _, err := db.getRevision(ctx, doc, rev.ID)
if err != nil {
base.WarnfCtx(ctx, "Error getting rev from doc %s/%s %s", base.UD(docid), rev.ID, err)
base.WarnfCtx(ctx, "Unable to retrieve body for doc %s rev %s: %v", base.UD(doc.ID), base.UD(rev.ID), err)
return
}
var body Body
if err := body.Unmarshal(bodyBytes); err != nil {
base.WarnfCtx(ctx, "Error unmarshalling body %s/%s for sync function %s", base.UD(docid), rev.ID, err)
var mutableBody Body
if err := mutableBody.Unmarshal(revBodyBytes); err != nil {
base.WarnfCtx(ctx, "Unable to unmarshal body for doc %s rev %s: %v", base.UD(doc.ID), base.UD(rev.ID), err)
return
}
metaMap, err := doc.GetMetaMap(db.UserXattrKey())
var isTombstone bool
if ancestorRev, ok := doc.History[rev.ID]; ok && ancestorRev != nil && ancestorRev.Deleted {
isTombstone = true
}
body, metaMap, err := db.prepareDocForSyncFn(ctx, doc, mutableBody, rev.ID, isTombstone)
if err != nil {
base.WarnfCtx(ctx, "Unable to prepare doc %s for rev %s: %v", base.UD(doc.ID), base.UD(rev.ID), err)
return
}

channels, access, roles, syncExpiry, _, err := db.getChannelsAndAccess(ctx, doc, body, metaMap, rev.ID)
if err != nil {
// Probably the validator rejected the doc
Expand Down
1 change: 1 addition & 0 deletions db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (db *DatabaseCollectionWithUser) importDoc(ctx context.Context, docid strin
return nil, base.ErrEmptyDocument
}
body = Body{}
body[BodyDeleted] = true
}

newDoc := &Document{
Expand Down
4 changes: 2 additions & 2 deletions db/utilities_hlv_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ func (db *DatabaseCollectionWithUser) CreateDocNoHLV(t testing.TB, ctx context.C
if conflictErr != nil {
if db.ForceAPIForbiddenErrors() {
// Make sure the user has permission to modify the document before confirming doc existence
mutableBody, metaMap, newRevID, err := db.prepareSyncFn(doc, newDoc)
mutableBody, metaMap, err := db.prepareDocForSyncFn(ctx, doc, newDoc.Body(ctx), newDoc.RevID, newDoc.Deleted)
if err != nil {
base.InfofCtx(ctx, base.KeyCRUD, "Failed to prepare to run sync function: %v", err)
return nil, nil, false, nil, ErrForbidden
}

_, _, _, _, _, err = db.runSyncFn(ctx, doc, mutableBody, metaMap, newRevID)
_, _, _, _, _, err = db.runSyncFn(ctx, doc, mutableBody, metaMap, newDoc.RevID)
if err != nil {
base.DebugfCtx(ctx, base.KeyCRUD, "Could not modify doc %q due to %s and sync func rejection: %v", base.UD(doc.ID), conflictErr, err)
return nil, nil, false, nil, ErrForbidden
Expand Down
4 changes: 3 additions & 1 deletion db/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func validateAPIDocUpdate(body Body) error {

// validateImportBody validates incoming import bodies
func validateImportBody(body Body) error {
if isPurged, ok := body[BodyPurged].(bool); ok && isPurged {
// Treat any occurrence of _purged as a purge/import-cancel signal so malformed
// values can't bypass reserved-property validation and get silently stripped later.
if _, ok := body[BodyPurged]; ok {
return base.ErrImportCancelledPurged
}

Expand Down
Loading