Skip to content

Commit a4c4fed

Browse files
committed
Better observer state handling.
1 parent b3024a7 commit a4c4fed

7 files changed

Lines changed: 123 additions & 173 deletions

File tree

lib/async/utilization/metric.rb

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,26 @@ module Utilization
1111
# including the buffer, offset, and type. When the observer changes, the cache
1212
# is invalidated and rebuilt on the next access.
1313
class Metric
14+
# Create a new metric for the given field and observer.
15+
#
16+
# @parameter field [Symbol] The field name for this metric.
17+
# @parameter observer [Observer | Nil] The observer to associate with this metric.
18+
# @returns [Metric] A new metric instance.
19+
def self.for(field, observer)
20+
self.new(field).tap do |metric|
21+
metric.observer = observer
22+
end
23+
end
24+
1425
# Initialize a new metric.
1526
#
1627
# @parameter name [Symbol] The field name for this metric.
17-
# @parameter registry [Registry] The registry instance to use.
18-
def initialize(name, registry)
28+
def initialize(name)
1929
@name = name.to_sym
20-
@registry = registry
2130
@value = 0
22-
@cache_valid = false
23-
@cached_field_info = nil
31+
32+
@observer = nil
33+
@cached_field = nil
2434
@cached_buffer = nil
2535
@guard = Mutex.new
2636
end
@@ -34,19 +44,35 @@ def initialize(name, registry)
3444
# @attribute [Mutex] The mutex for thread safety.
3545
attr :guard
3646

37-
# Invalidate the cached field information.
47+
# Set the observer and rebuild cache.
3848
#
39-
# Called when the observer changes to force cache rebuild.
40-
def invalidate
41-
@cache_valid = false
42-
@cached_field_info = nil
43-
@cached_buffer = nil
49+
# This is called when the registry assigns a new observer (or removes it).
50+
# The cache is invalidated and then immediately recomputed so that the
51+
# fast write path doesn't need to re-check the observer on the first write.
52+
#
53+
# @parameter observer [Observer | Nil] The new observer (or nil).
54+
def observer=(observer)
55+
@guard.synchronize do
56+
@observer = observer
57+
58+
if @observer
59+
if field = @observer.schema[@name]
60+
if buffer = @observer.buffer
61+
@cached_field = field
62+
@cached_buffer = buffer
63+
64+
return write_direct(@value)
65+
end
66+
end
67+
end
68+
69+
@cached_field = nil
70+
@cached_buffer = nil
71+
end
4472
end
4573

4674
# Increment the metric value.
4775
#
48-
# Uses the fast path (direct buffer write) when cache is valid and observer is available.
49-
#
5076
# @returns [Integer] The new value of the field.
5177
def increment
5278
@guard.synchronize do
@@ -103,28 +129,6 @@ def set(value)
103129

104130
protected
105131

106-
# Check if the cache is valid and rebuild if necessary.
107-
#
108-
# Always attempts to build the cache if it's invalid. Returns true if cache
109-
# is now valid (observer exists, field is in schema, and buffer is available), false otherwise.
110-
#
111-
# @returns [bool] True if cache is valid, false otherwise.
112-
def ensure_cache_valid!
113-
unless @cache_valid
114-
if observer = @registry.observer
115-
if field = observer.schema[@name]
116-
if buffer = observer.buffer
117-
@cached_field_info = field
118-
@cached_buffer = buffer
119-
end
120-
end
121-
end
122-
123-
# Once we've validated the cache, even if there was no observer or buffer, we mark it as valid, so that we don't try to revalidate it again:
124-
@cache_valid = true
125-
end
126-
end
127-
128132
# Write directly to the cached buffer if available.
129133
#
130134
# This is the fast path that avoids hash lookups. Always ensures cache is valid
@@ -133,16 +137,12 @@ def ensure_cache_valid!
133137
# @parameter value [Numeric] The value to write.
134138
# @returns [Boolean] Whether the write succeeded.
135139
def write_direct(value)
136-
self.ensure_cache_valid!
137-
138140
if @cached_buffer
139-
@cached_buffer.set_value(@cached_field_info.type, @cached_field_info.offset, value)
141+
@cached_buffer.set_value(@cached_field.type, @cached_field.offset, value)
140142
end
141143

142144
return true
143145
rescue => error
144-
# If write fails, log warning but don't invalidate cache
145-
# The error might be transient, and invalidating would force hash lookups
146146
Console.warn(self, "Failed to write metric value!", metric: {name: @name, value: value}, exception: error)
147147

148148
return false

lib/async/utilization/observer.rb

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
# Released under the MIT License.
44
# Copyright, 2026, by Samuel Williams.
55

6-
require "console"
76
require_relative "schema"
87

98
module Async
@@ -71,21 +70,6 @@ def initialize(schema, buffer)
7170

7271
# @attribute [IO::Buffer] The mapped buffer for shared memory.
7372
attr :buffer
74-
75-
# Set a field value.
76-
#
77-
# Writes the value to shared memory at the offset defined by the schema.
78-
# Only fields defined in the schema will be written.
79-
#
80-
# @parameter field [Symbol] The field name to set.
81-
# @parameter value [Numeric] The value to set.
82-
def set(field, value)
83-
if field = @schema[field]
84-
@buffer.set_value(field.type, field.offset, value)
85-
end
86-
rescue => error
87-
Console.warn(self, "Failed to set field in shared memory!", field: field, exception: error)
88-
end
8973
end
9074
end
9175
end

lib/async/utilization/registry.rb

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# Released under the MIT License.
44
# Copyright, 2026, by Samuel Williams.
55

6+
require "console"
7+
68
module Async
79
module Utilization
810
# Registry for emitting utilization metrics.
@@ -61,26 +63,23 @@ def values
6163

6264
# Set the observer for the registry.
6365
#
64-
# When an observer is set, it is notified of all current metric values
65-
# so it can sync its state. The observer must implement `set(field, value)`.
66-
# All cached metrics are invalidated when the observer changes.
66+
# When an observer is set, all cached metrics are updated so they write
67+
# directly to the observer's buffer. The observer must expose `schema`
68+
# and `buffer` attributes.
6769
#
68-
# @parameter observer [#set] The observer to set.
70+
# @parameter observer [Observer | Nil] The observer to set.
6971
def observer=(observer)
7072
@guard.synchronize do
71-
# Invalidate all cached metrics
73+
@observer = observer
74+
75+
# Invalidate all cached metrics with new observer (or nil)
7276
@metrics.each_value do |metric|
73-
metric.invalidate
77+
metric.observer = observer
7478
end
7579

76-
@observer = observer
80+
# Console.info(self, "Observer assigned", observer: observer, metric_count: @metrics.size)
7781
end
7882

79-
# Notify observer of all current metric values (outside guard to avoid deadlock)
80-
@metrics.each do |name, metric|
81-
value = metric.guard.synchronize{metric.value}
82-
observer.set(name, value)
83-
end
8483
end
8584

8685
# Set a field value.
@@ -135,7 +134,7 @@ def metric(field)
135134
field = field.to_sym
136135

137136
@guard.synchronize do
138-
@metrics[field] ||= Metric.new(field, self)
137+
@metrics[field] ||= Metric.for(field, @observer)
139138
end
140139
end
141140
end

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Better observer state handling.
6+
37
## v0.3.1
48

59
- Remove unused `thread-local` dependency.

test/async/utilization/metric.rb

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,8 @@
154154
end
155155

156156
it "writes directly to shared memory when observer is set" do
157-
registry.observer = observer
158157
metric = registry.metric(:total_requests)
158+
registry.observer = observer
159159

160160
metric.set(42)
161161

@@ -216,33 +216,13 @@
216216
expect(metric1).to be == metric2
217217
end
218218

219-
it "falls back to observer.set when write_direct fails" do
220-
registry.observer = observer
221-
metric = registry.metric(:total_requests)
222-
223-
# Force cache to be invalid by invalidating it
224-
metric.invalidate
225-
226-
# Set a value - should fall back to observer.set
227-
metric.set(42)
228-
expect(metric.value).to be == 42
229-
230-
# Verify it was written to shared memory
231-
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
232-
expect(buffer.get_value(:u64, 0)).to be == 42
233-
end
234-
235219
it "handles write errors gracefully" do
236220
registry.observer = observer
237221
metric = registry.metric(:total_requests)
238222

239223
# Set a value first to build the cache
240224
metric.set(10)
241225

242-
# Verify cache is built
243-
expect(metric.instance_variable_get(:@cache_valid)).to be == true
244-
cached_buffer = metric.instance_variable_get(:@cached_buffer)
245-
246226
# Create an invalid buffer that will raise an error
247227
invalid_buffer = Object.new
248228
def invalid_buffer.set_value(type, offset, value)
@@ -251,18 +231,38 @@ def invalid_buffer.set_value(type, offset, value)
251231

252232
metric.instance_variable_set(:@cached_buffer, invalid_buffer)
253233

254-
# Should not raise, but log warning and keep cache valid
234+
# Should not raise, but log warning
255235
metric.set(42)
256236
expect(metric.value).to be == 42
257237

258-
# Cache should remain valid (not invalidated on error)
259-
expect(metric.instance_variable_get(:@cache_valid)).to be == true
260-
261238
# Assert that a warning was logged
262239
expect_console.to have_logged(
263240
severity: be == :warn,
264241
subject: be_a(Async::Utilization::Metric),
265242
message: be == "Failed to write metric value!"
266243
)
267244
end
245+
246+
it "clears cache when observer is removed" do
247+
registry.observer = observer
248+
metric = registry.metric(:total_requests)
249+
metric.set(10)
250+
251+
# Remove observer — cache should be cleared
252+
registry.observer = nil
253+
254+
# Write should not go to the old buffer
255+
metric.set(99)
256+
257+
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
258+
expect(buffer.get_value(:u64, 0)).to be == 10
259+
260+
# In-memory value is still updated
261+
expect(metric.value).to be == 99
262+
263+
# Re-attaching observer should sync the current value
264+
registry.observer = observer
265+
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
266+
expect(buffer.get_value(:u64, 0)).to be == 99
267+
end
268268
end

test/async/utilization/observer.rb

Lines changed: 9 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@
44
# Copyright, 2026, by Samuel Williams.
55

66
require "sus"
7-
require "sus/fixtures/console/captured_logger"
87
require "sus/fixtures/temporary_directory_context"
98
require "async/utilization"
109
require "fileutils"
1110

1211
describe Async::Utilization::Observer do
13-
include Sus::Fixtures::Console::CapturedLogger
1412
include Sus::Fixtures::TemporaryDirectoryContext
1513

1614
let(:shm_path) {File.join(root, "test.shm")}
@@ -41,68 +39,30 @@
4139
expect(observer.schema).to be == schema
4240
end
4341

44-
it "can write values to shared memory" do
45-
observer.set(:total_requests, 42)
46-
observer.set(:active_requests, 5)
47-
48-
# Read back from file to verify
49-
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
50-
expect(buffer.get_value(:u64, 0)).to be == 42
51-
expect(buffer.get_value(:u32, 8)).to be == 5
52-
end
53-
5442
with "non-page-aligned offsets" do
5543
let(:file_size) {IO::Buffer::PAGE_SIZE * 2}
5644
let(:offset) {100} # Not page-aligned
5745

58-
it "handles non-page-aligned offsets" do
59-
observer.set(:total_requests, 100)
60-
observer.set(:active_requests, 20)
46+
it "maps values at the correct offset" do
47+
observer.buffer.set_value(:u64, schema[:total_requests].offset, 100)
48+
observer.buffer.set_value(:u32, schema[:active_requests].offset, 20)
6149

62-
# Read back from file at the correct offset
50+
# Read back from file at the correct byte position
6351
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
64-
expect(buffer.get_value(:u64, offset)).to be == 100
65-
expect(buffer.get_value(:u32, offset + 8)).to be == 20
52+
expect(buffer.get_value(:u64, offset + schema[:total_requests].offset)).to be == 100
53+
expect(buffer.get_value(:u32, offset + schema[:active_requests].offset)).to be == 20
6654
end
6755
end
6856

69-
it "ignores fields not in schema" do
70-
# Should not raise an error
71-
observer.set(:unknown_field, 999)
72-
73-
# Verify nothing was written
74-
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
75-
expect(buffer.get_value(:u64, 0)).to be == 0
76-
end
77-
7857
with "page-aligned offsets" do
7958
let(:file_size) {page_size * 2}
8059
let(:segment_size) {page_size}
8160

82-
it "handles page-aligned offsets without slicing" do
83-
expect(observer).to be_a(Async::Utilization::Observer)
84-
observer.set(:total_requests, 123)
61+
it "maps values at the correct offset" do
62+
observer.buffer.set_value(:u64, schema[:total_requests].offset, 123)
8563

86-
# Verify value was written
8764
buffer = IO::Buffer.map(File.open(shm_path, "r+b"), file_size, 0)
88-
expect(buffer.get_value(:u64, 0)).to be == 123
65+
expect(buffer.get_value(:u64, schema[:total_requests].offset)).to be == 123
8966
end
9067
end
91-
92-
it "handles errors gracefully when setting values" do
93-
# Create an invalid buffer that will cause an error
94-
# We'll mock the buffer to raise an error
95-
buffer = observer.instance_variable_get(:@buffer)
96-
expect(buffer).to receive(:set_value).and_raise(IOError, "Buffer error")
97-
98-
# Should not raise, but log a warning
99-
observer.set(:total_requests, 42)
100-
101-
# Assert that a warning was logged
102-
expect_console.to have_logged(
103-
severity: be == :warn,
104-
subject: be_a(Async::Utilization::Observer),
105-
message: be == "Failed to set field in shared memory!"
106-
)
107-
end
10868
end

0 commit comments

Comments
 (0)