Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ func (c *CSAPI) InviteRoom(t ct.TestLike, roomID string, userID string) *http.Re
return c.Do(t, "POST", []string{"_matrix", "client", "v3", "rooms", roomID, "invite"}, WithJSONBody(t, body))
}

func (c *CSAPI) GetRoomCurrentState(t ct.TestLike, roomID string) *http.Response {
return c.Do(t, "GET", []string{"_matrix", "client", "v3", "rooms", roomID, "state"})
}

func (c *CSAPI) MustGetGlobalAccountData(t ct.TestLike, eventType string) *http.Response {
res := c.GetGlobalAccountData(t, eventType)
mustRespond2xx(t, res)
Expand Down
32 changes: 27 additions & 5 deletions federation/server_room.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ type ServerRoom struct {
TimelineMutex sync.RWMutex
ForwardExtremities []string
Depth int64
waiters map[string][]*helpers.Waiter // room ID -> []Waiter
waitersEvent map[string][]*helpers.Waiter // event ID -> []Waiter
waitersMembership map[string][]*helpers.Waiter // userID -> []Waiter
waitersMu *sync.Mutex
}

Expand All @@ -82,7 +83,8 @@ func NewServerRoom(roomVer gomatrixserverlib.RoomVersion, roomId string) *Server
Version: roomVer,
State: make(map[string]gomatrixserverlib.PDU),
ForwardExtremities: make([]string, 0),
waiters: make(map[string][]*helpers.Waiter),
waitersEvent: make(map[string][]*helpers.Waiter),
waitersMembership: make(map[string][]*helpers.Waiter),
waitersMu: &sync.Mutex{},
}
room.ServerRoomImpl = &ServerRoomImplDefault{}
Expand All @@ -107,10 +109,15 @@ func (r *ServerRoom) AddEvent(ev gomatrixserverlib.PDU) {
// inform waiters
r.waitersMu.Lock()
defer r.waitersMu.Unlock()
for _, w := range r.waiters[ev.EventID()] {
for _, w := range r.waitersEvent[ev.EventID()] {
w.Finish()
}
delete(r.waiters, ev.EventID()) // clear the waiters
if ev.Type() == "m.room.member" {
for _, w := range r.waitersMembership[*ev.StateKey()] {
w.Finish()
}
}
delete(r.waitersEvent, ev.EventID()) // clear the waiters
}

// WaiterForEvent creates a Waiter which waits until the given event ID is added to the room.
Expand All @@ -128,7 +135,7 @@ func (r *ServerRoom) WaiterForEvent(eventID string) *helpers.Waiter {
defer r.TimelineMutex.Unlock()
w := helpers.NewWaiter()
r.waitersMu.Lock()
r.waiters[eventID] = append(r.waiters[eventID], w)
r.waitersEvent[eventID] = append(r.waitersEvent[eventID], w)
r.waitersMu.Unlock()
// check if the event is already there and if so immediately end the wait
for _, ev := range r.Timeline {
Expand All @@ -140,6 +147,21 @@ func (r *ServerRoom) WaiterForEvent(eventID string) *helpers.Waiter {
return w
}

func (r *ServerRoom) WaiterForMembershipEvent(userID string) *helpers.Waiter {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make this a general WaiterForStateEvent thing 🤷

Might make it more appetizing to write engineered tests

// We need to lock the state so we can check for it at the end
r.StateMutex.Lock()
defer r.StateMutex.Unlock()
w := helpers.NewWaiter()
r.waitersMu.Lock()
r.waitersMembership[userID] = append(r.waitersMembership[userID], w)
r.waitersMu.Unlock()
tuple := fmt.Sprintf("m.room.member\x1f%s", userID)
if _, found := r.State[tuple]; found {
w.Finish()
}
return w
}

// AuthEvents returns the state event IDs of the auth events which authenticate this event
func (r *ServerRoom) AuthEvents(sn gomatrixserverlib.StateNeeded) (eventIDs []string) {
// Guard against returning a nil string slice
Expand Down
179 changes: 179 additions & 0 deletions tests/federation_state_resolution_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package tests

import (
"encoding/json"
"io"
"testing"
"time"

"github.com/matrix-org/complement"
"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/ct"
"github.com/matrix-org/complement/federation"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/tidwall/gjson"
)

// Tests state resolution (v2) by forcing a fork with conflicted state, per:
// https://github.com/matrix-org/matrix-spec/issues/1209
// After initial room create and power levels change to enable name setting, we
// have the following:
//
// alice sets name1
//
// |
// |---------------------------.
// | |
// v v
//
// alice sets name2 bob kicks alice (this arrives late)
//
// | |
// | .---------------------'
// v v
//
// what is the current state here
func TestFederationStateResolution(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func TestFederationStateResolution(t *testing.T) {
func TestFederationStateConflictResolution(t *testing.T) {

deployment := complement.Deploy(t, 1)
defer deployment.Destroy(t)
srv := federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandleMakeSendJoinRequests(),
federation.HandleTransactionRequests(nil, nil),
)
srv.UnexpectedRequestsAreErrors = false
cancel := srv.Listen()
defer cancel()

hsName := "hs1"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be tempted just to inline this.

The placement here makes it bit weird to figure out whether it pertains to the real homeserver from the deployment vs the engineered homeserver from Complement.


alice := deployment.Register(t, hsName, helpers.RegistrationOpts{})
// Charlie exists so we can view the room after we ban Alice
charlie := deployment.Register(t, hsName, helpers.RegistrationOpts{})

bob := srv.UserID("bob")
ver := alice.GetDefaultRoomVersion(t)
serverRoom := srv.MustMakeRoom(t, ver, federation.InitialRoomEvents(ver, bob))

getName := func() string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better as a func getRoomName(...) helper below without it being nested here

res := charlie.GetRoomCurrentState(t, serverRoom.RoomID)
body, err := io.ReadAll(res.Body)
if err != nil {
panic(err)
}

var evs []json.RawMessage
if err := json.Unmarshal(body, &evs); err != nil {
panic(err)
}

for _, ev := range evs {
if gjson.GetBytes(ev, "type").String() == "m.room.name" {
return gjson.GetBytes(ev, "content.name").String()
}
}
return ""
}

// Join Charlie - this will be a federated join as the server is not yet in the room
charlie.MustJoinRoom(t, serverRoom.RoomID, []spec.ServerName{srv.ServerName()})
charlie.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(charlie.UserID, serverRoom.RoomID))
serverRoom.WaiterForMembershipEvent(charlie.UserID).Waitf(t, 5*time.Second, "failed to see alice join complement server")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
serverRoom.WaiterForMembershipEvent(charlie.UserID).Waitf(t, 5*time.Second, "failed to see alice join complement server")
serverRoom.WaiterForMembershipEvent(charlie.UserID).Waitf(t, 5*time.Second, "failed to see charlie join complement server")

serverRoom.MustHaveMembershipForUser(t, charlie.UserID, "join")

// Join Alice - this will be a local send since the server is now in the room
alice.MustJoinRoom(t, serverRoom.RoomID, []spec.ServerName{srv.ServerName()})
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, serverRoom.RoomID))
serverRoom.WaiterForMembershipEvent(alice.UserID).Waitf(t, 5*time.Second, "failed to see alice join complement server")
serverRoom.MustHaveMembershipForUser(t, alice.UserID, "join")

// Send updated power levels so alice can set the room name.
pwlvlEvt := srv.MustCreateEvent(t, serverRoom, federation.Event{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pwlvlEvt := srv.MustCreateEvent(t, serverRoom, federation.Event{
powerlevelEvent := srv.MustCreateEvent(t, serverRoom, federation.Event{

Type: "m.room.power_levels",
StateKey: b.Ptr(""),
Sender: bob,
Content: map[string]any{
"m.room.name": 50,
"users": map[string]any{
alice.UserID: 50,
bob: 100, // as before
},
},
})
serverRoom.AddEvent(pwlvlEvt)
srv.MustSendTransaction(t, deployment, deployment.GetFullyQualifiedHomeserverName(t, hsName), []json.RawMessage{pwlvlEvt.JSON()}, nil)
charlie.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(serverRoom.RoomID, pwlvlEvt.EventID()))

// Bob set an initial name, this will not usually be in the resolved state
nameEvt := srv.MustCreateEvent(t, serverRoom, federation.Event{
Type: "m.room.name",
StateKey: b.Ptr(""),
Sender: bob,
Content: map[string]any{
"name": "Bob initial",
},
})
serverRoom.AddEvent(nameEvt)
srv.MustSendTransaction(t, deployment, deployment.GetFullyQualifiedHomeserverName(t, hsName), []json.RawMessage{nameEvt.JSON()}, nil)
charlie.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(serverRoom.RoomID, nameEvt.EventID()))
if getName() != "Bob initial" {
ct.Fatalf(t, "first initial name not set")
}

// Alice now sets the room name, which is allowed. Notably it is this event that will be lost
// due to the state reset.
e2 := alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better name especially since we refer to this name in the error messages

Suggested change
e2 := alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
roomNameChangeFromAlice := alice.SendEventSynced(t, serverRoom.RoomID, b.Event{

(applies to all of the e2, s2, type stuff)

Type: "m.room.name",
StateKey: b.Ptr(""),
Content: map[string]any{
"name": "Alice initial",
},
})
// wait until bob's server sees e
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// wait until bob's server sees e
// wait until bob's server sees the event

serverRoom.WaiterForEvent(e2).Waitf(t, 5*time.Second, "failed to see e2 (%s) on complement server", e2)
if getName() != "Alice initial" {
ct.Fatalf(t, "second initial name not set")
}

// create S2, which kicks alice but don't send it yet
s2 := srv.MustCreateEvent(t, serverRoom, federation.Event{
Type: "m.room.member",
StateKey: b.Ptr(alice.UserID),
Sender: bob,
Content: map[string]any{
"membership": "ban",
},
})
serverRoom.AddEvent(s2)

// Alice now sets the room name agian, which is allowed currently, but should be removed
e3 := alice.SendEventSynced(t, serverRoom.RoomID, b.Event{
Type: "m.room.name",
StateKey: b.Ptr(""),
Content: map[string]any{
"name": "Bad name",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"name": "Bad name",
"name": "another name",

It's not a bad name so let's not mark it as such. It's "some state that will be rolled back because we will later do xxx"

},
})
// wait until bob's server sees e
serverRoom.WaiterForEvent(e3).Waitf(t, 5*time.Second, "failed to see e3 (%s) on complement server", e3)
if getName() != "Bad name" {
ct.Fatalf(t, "First updated name not set")
}

// Now send s2, which has prev event e2 same as e3, resulting in two extremeties in the DAG that
// must be resolved, which should undo e3.
srv.MustSendTransaction(t, deployment, deployment.GetFullyQualifiedHomeserverName(t, hsName), []json.RawMessage{s2.JSON()}, nil)
// wait until we see S2 to ensure the server has processed this.
charlie.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(serverRoom.RoomID, s2.EventID()))

// Check the name was reverted or dropped. Although this seems counterintuitive the name being
// dropped is an unfortunate consequence of the state res v2 algorithm, as described here:
// https://github.com/matrix-org/matrix-spec-proposals/blob/erikj/state_res_msc/proposals/1442-state-resolution.md#state-resets
if name := getName(); name == "Bad name" {
ct.Fatalf(t, "initial name not restored after state resolution, got: %s", name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also show the expected name

} else if name == "" {
t.Log("original name not restored")
}
Comment on lines +174 to +178
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect this check to look more like, if not equal to the name we expect to be rolled back to then error

}
Loading