Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 169 additions & 51 deletions tests/msc4140/delayed_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package tests

import (
"fmt"
"io"
"math"
"net/http"
"net/url"
"testing"
Expand Down Expand Up @@ -50,7 +52,7 @@ func TestDelayedEvents(t *testing.T) {
user2.MustJoinRoom(t, roomID, nil)

t.Run("delayed events are empty on startup", func(t *testing.T) {
matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I needed some less than/greater than comparisons in the refactored test logic, I decided to make matchDelayedEvents like the MustSyncUntil where we can pass in a bevy of check opts.

})

t.Run("delayed event lookups are authenticated", func(t *testing.T) {
Expand Down Expand Up @@ -100,14 +102,14 @@ func TestDelayedEvents(t *testing.T) {
}

countExpected = 0
matchDelayedEvents(t, user, numEvents)
matchDelayedEvents(t, user, delayedEventsNumberEqual(numEvents))

t.Run("cannot get delayed events of another user", func(t *testing.T) {
matchDelayedEvents(t, user2, 0)
matchDelayedEvents(t, user2, delayedEventsNumberEqual(0))
})

time.Sleep(1 * time.Second)
matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
queryParams := url.Values{}
queryParams.Set("dir", "f")
queryParams.Set("from", token)
Expand Down Expand Up @@ -149,7 +151,7 @@ func TestDelayedEvents(t *testing.T) {
getDelayQueryParam("900"),
)

matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))

res = getDelayedEvents(t, user)
must.MatchResponse(t, res, match.HTTPResponse{
Expand All @@ -172,7 +174,7 @@ func TestDelayedEvents(t *testing.T) {
})

time.Sleep(1 * time.Second)
matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
must.MatchResponse(t, res, match.HTTPResponse{
JSON: []match.JSON{
Expand Down Expand Up @@ -244,7 +246,7 @@ func TestDelayedEvents(t *testing.T) {
delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id")

time.Sleep(1 * time.Second)
matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
must.MatchResponse(t, res, match.HTTPResponse{
StatusCode: 404,
Expand All @@ -256,7 +258,7 @@ func TestDelayedEvents(t *testing.T) {
getPathForUpdateDelayedEvent(delayID, DelayedEventActionCancel),
client.WithJSONBody(t, map[string]interface{}{}),
)
matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))

time.Sleep(1 * time.Second)
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
Expand Down Expand Up @@ -286,7 +288,7 @@ func TestDelayedEvents(t *testing.T) {
delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id")

time.Sleep(1 * time.Second)
matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
must.MatchResponse(t, res, match.HTTPResponse{
StatusCode: 404,
Expand All @@ -298,7 +300,7 @@ func TestDelayedEvents(t *testing.T) {
getPathForUpdateDelayedEvent(delayID, DelayedEventActionSend),
client.WithJSONBody(t, map[string]interface{}{}),
)
matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
must.MatchResponse(t, res, match.HTTPResponse{
JSON: []match.JSON{
Expand Down Expand Up @@ -328,7 +330,7 @@ func TestDelayedEvents(t *testing.T) {
delayID := client.GetJSONFieldStr(t, client.ParseJSON(t, res), "delay_id")

time.Sleep(1 * time.Second)
matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
must.MatchResponse(t, res, match.HTTPResponse{
StatusCode: 404,
Expand All @@ -342,14 +344,14 @@ func TestDelayedEvents(t *testing.T) {
)

time.Sleep(1 * time.Second)
matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))
res = user.Do(t, "GET", getPathForState(roomID, eventType, stateKey))
must.MatchResponse(t, res, match.HTTPResponse{
StatusCode: 404,
})

time.Sleep(1 * time.Second)
matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
must.MatchResponse(t, res, match.HTTPResponse{
JSON: []match.JSON{
Expand All @@ -376,7 +378,7 @@ func TestDelayedEvents(t *testing.T) {
}),
getDelayQueryParam("900"),
)
matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))

user.MustDo(
t,
Expand All @@ -386,7 +388,7 @@ func TestDelayedEvents(t *testing.T) {
setterKey: "manual",
}),
)
matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))

time.Sleep(1 * time.Second)
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
Expand Down Expand Up @@ -415,7 +417,7 @@ func TestDelayedEvents(t *testing.T) {
}),
getDelayQueryParam("900"),
)
matchDelayedEvents(t, user, 1)
matchDelayedEvents(t, user, delayedEventsNumberEqual(1))

setterExpected := "manual"
user2.MustDo(
Expand All @@ -426,7 +428,7 @@ func TestDelayedEvents(t *testing.T) {
setterKey: setterExpected,
}),
)
matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))

time.Sleep(1 * time.Second)
res = user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey))
Expand All @@ -446,41 +448,89 @@ func TestDelayedEvents(t *testing.T) {
stateKey1 := "1"
stateKey2 := "2"

numberOfDelayedEvents := 0

// Send an initial delayed event that will be ready to send as soon as the server
// comes back up.
user.MustDo(
t,
"PUT",
getPathForState(roomID, eventType, stateKey1),
client.WithJSONBody(t, map[string]interface{}{}),
getDelayQueryParam("900"),
)
beforeScheduleStateTimestamp2 := time.Now()
user.MustDo(
t,
"PUT",
getPathForState(roomID, eventType, stateKey2),
client.WithJSONBody(t, map[string]interface{}{}),
getDelayQueryParam("9900"),
)
matchDelayedEvents(t, user, 2)
numberOfDelayedEvents++

// Previously, this was naively using a single delayed event with a 10 second delay.
// But because we're stopping and starting servers here, it could take up to
// `deployment.GetConfig().SpawnHSTimeout` (defaults to 30 seconds) for the server
// to start up again so by the time the server is back up, the delayed event may
// have already been sent invalidating our assertions below (which expect some
// delayed events to still be pending and then see one of them be sent after the
// server is back up).
//
// We could account for this by setting the delayed event delay to be longer than
// `deployment.GetConfig().SpawnHSTimeout` but that would make the test suite take
// longer to run in all cases even for homeservers that are quick to restart because
// we have to wait for that large delay.
//
// We instead account for this by scheduling many delayed events at short intervals
// (we chose 10 seconds because that's what the test naively chose before). Then
// whenever the servers comes back, we can just check until it decrements by 1.
//
// We add 1 to the number of intervals to ensure that we have at least one interval
// to check against no matter how things are configured.
numberOf10SecondIntervals := int(math.Ceil(deployment.GetConfig().SpawnHSTimeout.Seconds()/10)) + 1
for i := 0; i < numberOf10SecondIntervals; i++ {
// +1 as we want to start at 10 seconds and so we don't end up with -100ms delay
// on the first one.
delay := time.Duration(i+1)*10*time.Second - 100*time.Millisecond

user.MustDo(
t,
"PUT",
// Avoid clashing state keys as that would cancel previous delayed events on the
// same key (start at 2).
getPathForState(roomID, eventType, fmt.Sprintf("%d", i+2)),
client.WithJSONBody(t, map[string]interface{}{}),
getDelayQueryParam(fmt.Sprintf("%d", delay.Milliseconds())),
)
numberOfDelayedEvents++
}
// We expect all of the delayed events to be scheduled and not sent yet.
matchDelayedEvents(t, user, delayedEventsNumberEqual(numberOfDelayedEvents))

// Restart the server and wait until it's back up.
deployment.StopServer(t, hsName)
// Wait one second which will cause the first delayed event to be ready to be sent
// when the server is back up.
time.Sleep(1 * time.Second)
deployment.StartServer(t, hsName)

// The rest of the test assumes the second delayed event (10 second delay) still
// hasn't been sent yet.
if time.Now().Sub(beforeScheduleStateTimestamp2) > 10*time.Second {
t.Fatalf(
"Test took too long to run, cannot guarantee delayed event timings. " +
"More than 10 seconds elapsed between scheduling the delayed event and now when we're about to check for it.",
)
}

matchDelayedEvents(t, user, 1)
delayedEventResponse := matchDelayedEvents(t, user,
// We should still see some delayed events left after the restart.
delayedEventsNumberGreaterThan(0),
// We should see at-least one less than we had before the restart (the first
// delayed event should have been sent). Other delayed events may have been sent
// by the time the server actually came back up.
delayedEventsNumberLessThan(numberOfDelayedEvents-1),
)
// Capture whatever number of delayed events are remaining after the server restart.
remainingDelayedEventCount := countDelayedEvents(t, delayedEventResponse)
// Sanity check that the room state was updated correctly with the delayed events
// that were sent.
user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey1))

time.Sleep(9 * time.Second)
matchDelayedEvents(t, user, 0)
// Wait until we see another delayed event being sent (ensure things resumed and are continuing).
time.Sleep(10 * time.Second)
matchDelayedEvents(t, user,
delayedEventsNumberLessThan(remainingDelayedEventCount),
)
// Sanity check that the other delayed events also updated the room state correctly.
//
// FIXME: Ideally, we'd check specifically for the last one that was sent but it
// will be a bit of a juggle and fiddly to get this right so for now we just check
// one.
user.MustDo(t, "GET", getPathForState(roomID, eventType, stateKey2))
})
}
Expand Down Expand Up @@ -512,25 +562,93 @@ func getDelayedEvents(t *testing.T, user *client.CSAPI) *http.Response {
return user.MustDo(t, "GET", getPathForDelayedEvents())
}

// Checks if the number of delayed events match the given number. This will
// countDelayedEvents counts the number of delayed events in the response. Assumes the
// response is well-formed.
func countDelayedEventsInternal(res *http.Response) (int, error) {
body, err := io.ReadAll(res.Body)
if err != nil {
return 0, fmt.Errorf("countDelayedEventsInternal: Failed to read response body: %s", err)
}

parsedBody := gjson.ParseBytes(body)
return len(parsedBody.Get("delayed_events").Array()), nil
}

func countDelayedEvents(t *testing.T, res *http.Response) int {
t.Helper()
count, err := countDelayedEventsInternal(res)
if err != nil {
t.Fatalf("countDelayedEvents: %s", err)
}
return count
}

type delayedEventsCheckOpt func(res *http.Response) error

// delayedEventsNumberEqual returns a check option that checks if the number of delayed events
// is equal to the given number.
func delayedEventsNumberEqual(wantNumber int) delayedEventsCheckOpt {
return func(res *http.Response) error {
_, err := should.MatchResponse(res, match.HTTPResponse{
StatusCode: 200,
JSON: []match.JSON{
match.JSONKeyArrayOfSize("delayed_events", wantNumber),
},
})
Comment on lines +592 to +597
Copy link
Collaborator Author

@MadLittleMods MadLittleMods Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, the other delayedEventsNumberGreaterThan/delayedEventsNumberLessThan checks would also use the JSON matching like we do here so we get maximum nice debug output when things go wrong. But there isn't a matcher for that yet. We could add one but I've let it be for now. Things fit nice enough as-is.

if err == nil {
return nil
}
return fmt.Errorf("delayedEventsNumberEqual(%d): %s", wantNumber, err)
}
}

// delayedEventsNumberLessThan returns a check option that checks if the number of delayed events
// is greater than the given number.
func delayedEventsNumberGreaterThan(target int) delayedEventsCheckOpt {
return func(res *http.Response) error {
count, err := countDelayedEventsInternal(res)
if err != nil {
return fmt.Errorf("delayedEventsNumberGreaterThan(%d): %s", target, err)
}
if count > target {
return nil
}
return fmt.Errorf("delayedEventsNumberGreaterThan(%d): got %d", target, count)
}
}

// delayedEventsNumberLessThan returns a check option that checks if the number of delayed events
// is less than the given number.
func delayedEventsNumberLessThan(target int) delayedEventsCheckOpt {
return func(res *http.Response) error {
count, err := countDelayedEventsInternal(res)
if err != nil {
return fmt.Errorf("delayedEventsNumberLessThan(%d): %s", target, err)
}
if count < target {
return nil
}
return fmt.Errorf("delayedEventsNumberLessThan(%d): got %d", target, count)
}
}

// matchDelayedEvents will run the given checks on the delayed events response. This will
// retry to handle replication lag.
func matchDelayedEvents(t *testing.T, user *client.CSAPI, wantNumber int) {
func matchDelayedEvents(t *testing.T, user *client.CSAPI, checks ...delayedEventsCheckOpt) *http.Response {
t.Helper()

// We need to retry this as replication can sometimes lag.
user.MustDo(t, "GET", getPathForDelayedEvents(),
return user.MustDo(t, "GET", getPathForDelayedEvents(),
client.WithRetryUntil(
500*time.Millisecond,
func(res *http.Response) bool {
_, err := should.MatchResponse(res, match.HTTPResponse{
StatusCode: 200,
JSON: []match.JSON{
match.JSONKeyArrayOfSize("delayed_events", wantNumber),
},
})
if err != nil {
t.Log(err)
return false
for _, check := range checks {
err := check(res)

if err != nil {
t.Log(err)
return false
}
}
return true
},
Expand All @@ -553,5 +671,5 @@ func cleanupDelayedEvents(t *testing.T, user *client.CSAPI) {
)
}

matchDelayedEvents(t, user, 0)
matchDelayedEvents(t, user, delayedEventsNumberEqual(0))
}
Loading