Skip to content

Commit 0af30fe

Browse files
committed
feat(status_events): use DB pagination so we're not loading all modules in one request
1 parent 15ce210 commit 0af30fe

3 files changed

Lines changed: 27 additions & 5 deletions

File tree

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
ARG CRYSTAL_VERSION=latest
22

3-
FROM placeos/crystal:$CRYSTAL_VERSION as build
3+
FROM placeos/crystal:$CRYSTAL_VERSION AS build
44
WORKDIR /app
55

66
# Set the commit via a build arg

src/source/publishing/mqtt_broker_manager.cr

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ module PlaceOS::Source
131131
end
132132
end
133133

134-
# Safe to update iff fields in SAFE_ATTRIBUTES changed
134+
# Safe to update if fields in SAFE_ATTRIBUTES changed
135135
#
136136
def self.safe_update?(model : Model::Broker)
137137
# Take the union of the changed fields and the safe fields

src/source/status_events.cr

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module PlaceOS::Source
1212
Log = ::Log.for(self)
1313

1414
STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*"
15-
MAX_CONTAINER_SIZE = 50_000
15+
MAX_CONTAINER_SIZE = 40_000
1616
BATCH_SIZE = 100
1717
PROCESSING_INTERVAL = 100.milliseconds
1818
CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8
@@ -66,11 +66,33 @@ module PlaceOS::Source
6666
redis.close
6767
end
6868

69+
def paginate_modules(&)
70+
batch_size = 64
71+
last_created_at = Time.unix(0)
72+
last_id = ""
73+
74+
loop do
75+
modules = PlaceOS::Model::Module
76+
.where("created_at > ? OR (created_at = ? AND id > ?)", last_created_at, last_created_at, last_id)
77+
.order(created_at: :asc, id: :asc)
78+
.limit(batch_size)
79+
.to_a
80+
81+
# process
82+
break if modules.empty?
83+
yield modules
84+
break if modules.size < batch_size
85+
86+
last_created_at = modules.last.created_at
87+
last_id = modules.last.id
88+
end
89+
end
90+
6991
def update_values
7092
mods_mapped = 0_u64
7193
status_updated = 0_u64
7294
pattern = "initial_sync"
73-
PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
95+
paginate_modules do |modules|
7496
modules.each do |mod|
7597
next unless mod
7698
mods_mapped += 1_u64
@@ -109,7 +131,7 @@ module PlaceOS::Source
109131
status_updated = 0_u64
110132
pattern = "broker_resync"
111133

112-
PlaceOS::Model::Module.order(id: :asc).all.in_groups_of(64, reuse: true) do |modules|
134+
paginate_modules do |modules|
113135
modules.each do |mod|
114136
next unless mod
115137
mods_mapped += 1_u64

0 commit comments

Comments
 (0)