Skip to content

Commit 4bce8a2

Browse files
committed
fix macos tests
1 parent fc618c8 commit 4bce8a2

File tree

4 files changed

+96
-169
lines changed

4 files changed

+96
-169
lines changed

slick_queue_py.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ def __getitem__(self, index: int) -> memoryview:
311311
off = self._data_offset + (index & self.mask) * self.element_size
312312
return self._buf[off: off + self.element_size]
313313

314-
def read(self, read_index: Union[int, AtomicCursor]) -> Union[Tuple[Optional[bytes], int, int], Tuple[Optional[bytes], int]]:
314+
def read(self, read_index: Union[int, AtomicCursor]) -> Union[Tuple[Optional[bytes], int, int], Tuple[Optional[bytes], int, int]]:
315315
"""
316316
Read data from the queue.
317317
@@ -345,7 +345,7 @@ def read(self, read_index: Union[int, AtomicCursor]) -> Union[Tuple[Optional[byt
345345
346346
# Multi-consumer work-stealing
347347
cursor = AtomicCursor(cursor_shm.buf, 0)
348-
data, size = q.read(cursor) # Atomically claim next item
348+
data, size, index = q.read(cursor) # Atomically claim next item
349349
"""
350350
if isinstance(read_index, AtomicCursor):
351351
return self._read_atomic_cursor(read_index)
@@ -396,7 +396,7 @@ def _read_single_consumer(self, read_index: int) -> Tuple[Optional[bytes], int,
396396
new_read_index = data_index + slot_size
397397
return data, slot_size, new_read_index
398398

399-
def _read_atomic_cursor(self, read_index: AtomicCursor) -> Tuple[Optional[bytes], int]:
399+
def _read_atomic_cursor(self, read_index: AtomicCursor) -> Tuple[Optional[bytes], int, int]:
400400
"""
401401
Multi-consumer read using a shared atomic cursor (work-stealing pattern).
402402
@@ -407,8 +407,8 @@ def _read_atomic_cursor(self, read_index: AtomicCursor) -> Tuple[Optional[bytes]
407407
read_index: Shared AtomicCursor for coordinating multiple consumers
408408
409409
Returns:
410-
Tuple of (data_bytes or None, item_size).
411-
If no data available returns (None, 0).
410+
Tuple of (data_bytes or None, item_size, data_index).
411+
If no data available returns (None, 0, -1).
412412
"""
413413
if self._buf is None:
414414
raise RuntimeError("Queue buffer is not initialized")
@@ -433,7 +433,7 @@ def _read_atomic_cursor(self, read_index: AtomicCursor) -> Tuple[Optional[bytes]
433433

434434
# Check if data is ready (C++ lines 296-299)
435435
if data_index == (2**64 - 1) or data_index < current_index:
436-
return None, 0
436+
return None, 0, -1
437437

438438
# Check for wrap (C++ lines 300-304)
439439
if data_index > current_index and ((data_index & self.mask) != idx):
@@ -449,7 +449,7 @@ def _read_atomic_cursor(self, read_index: AtomicCursor) -> Tuple[Optional[bytes]
449449
# Successfully claimed the item, read and return it
450450
data_off = self._data_offset + (current_index & self.mask) * self.element_size
451451
data = bytes(self._buf[data_off: data_off + slot_size * self.element_size])
452-
return data, slot_size
452+
return data, slot_size, current_index
453453

454454
# CAS failed, another consumer claimed it, retry
455455

tests/test_atomic_cursor.py

Lines changed: 29 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def test_atomic_cursor_local_mode_basic():
5959
# Consume with atomic cursor
6060
consumed = []
6161
for _ in range(10):
62-
data, size = q.read(cursor)
62+
data, size, _ = q.read(cursor)
6363
if data is not None:
6464
value = struct.unpack("<I", data[:4])[0]
6565
consumed.append(value)
@@ -68,7 +68,7 @@ def test_atomic_cursor_local_mode_basic():
6868
assert consumed == list(range(10)), f"Expected {list(range(10))}, got {consumed}"
6969

7070
# Verify no more data
71-
data, size = q.read(cursor)
71+
data, size, _ = q.read(cursor)
7272
assert data is None, "Expected no more data"
7373

7474
q.close()
@@ -104,7 +104,7 @@ def consumer_worker(worker_id, results):
104104
max_consecutive_none = 20 # Give more chances
105105
time.sleep(random.uniform(0.03, 0.05))
106106
while len(consumed) < num_items and consecutive_none < max_consecutive_none:
107-
data, _ = q.read(cursor)
107+
data, size, data_index = q.read(cursor)
108108
if data is None:
109109
consecutive_none += 1
110110
else:
@@ -173,15 +173,15 @@ def consumer_worker(worker_id, results):
173173
consumed = []
174174
no_data_count = 0
175175
while len(consumed) < num_items // num_threads + 50: # Over-subscribe
176-
data, size = q.read(cursor)
176+
data, size, _ = q.read(cursor)
177177
if data is not None:
178178
no_data_count = 0
179179
value = struct.unpack("<I", data[:4])[0]
180180
consumed.append(value)
181181
time.sleep(random.uniform(0.0003, 0.0005))
182182
else:
183183
no_data_count += 1
184-
if no_data_count > 1000:
184+
if no_data_count > 10000:
185185
break
186186
time.sleep(0.00001) # Wait for producer
187187
results[worker_id] = consumed
@@ -234,19 +234,21 @@ def consumer_process_worker(queue_name, cursor_name, worker_id, num_items, resul
234234
cursor = AtomicCursor(cursor_shm.buf, 0)
235235

236236
consumed = []
237-
num_no_data = 0
238237

239-
with open('ready', 'a'):
240-
# Keep consuming until we've collectively consumed all items
241-
while cursor.load() < num_items:
242-
data, size = q.read(cursor)
243-
if data is not None:
244-
num_no_data = 0
245-
value = struct.unpack("<I", data[:4])[0]
246-
consumed.append(value)
247-
time.sleep(random.uniform(0.003, 0.005))
248-
else:
249-
time.sleep(0.000001)
238+
# Signal that consumer is ready (create/touch the file, then close it immediately)
239+
with open('ready', 'a') as f:
240+
pass # Just create/touch the file
241+
242+
# Keep consuming until we've collectively consumed all items
243+
while cursor.load() < num_items:
244+
data, size, data_index = q.read(cursor)
245+
if data is not None:
246+
value = struct.unpack("<I", data[:4])[0]
247+
print(f"Process {worker_id} consumed an item {value} @ index {data_index}")
248+
consumed.append(value)
249+
time.sleep(random.uniform(0.0003, 0.0005))
250+
else:
251+
time.sleep(0.000001)
250252
results.put((worker_id, consumed))
251253

252254
cursor_shm.close()
@@ -267,6 +269,7 @@ def producer_worker(num_items: int, q: SlickQueue):
267269
data = struct.pack("<I", i)
268270
q[idx][:len(data)] = data
269271
q.publish(idx)
272+
print(f"Produced item {idx} {i}")
270273
time.sleep(random.uniform(0.001, 0.003)) # Slow producer
271274

272275
def test_atomic_cursor_shared_memory_mode():
@@ -394,7 +397,7 @@ def test_atomic_cursor_compare_with_int_cursor():
394397

395398
consumed_atomic = []
396399
for _ in range(num_items):
397-
data, size = q2.read(cursor)
400+
data, size, _ = q2.read(cursor)
398401
if data is not None:
399402
value = struct.unpack("<I", data[:4])[0]
400403
consumed_atomic.append(value)
@@ -436,7 +439,7 @@ def test_atomic_cursor_wraparound():
436439
q.publish(idx, sz)
437440

438441
# Consume immediately
439-
data, size = q.read(cursor)
442+
data, size, _ = q.read(cursor)
440443
if data is not None:
441444
offset = 0
442445
while size > 0:
@@ -538,93 +541,6 @@ def test_atomic_cursor_python_cpp_work_stealing_cursor_created_by_py():
538541
print(f"[PASS] C++ work-stealing consumer cursor created by Python test passed: {num_items} items consumed")
539542

540543

541-
def test_atomic_cursor_python_cpp_work_stealing_cursor_created_by_py():
542-
"""Test AtomicCursor with a C++ work-stealing consumer, cursor created by Python."""
543-
print("\n=== Test: AtomicCursor with C++ Work-Stealing Consumer, cursor created by Python ===")
544-
545-
queue_name = 'test_atomic_cursor_cpp_queue'
546-
cursor_name = 'test_cur_cpp_py_created'
547-
num_items = 100
548-
549-
try:
550-
os.remove('ready')
551-
except Exception:
552-
pass
553-
554-
# Create shared queue and cursor
555-
q = SlickQueue(name=queue_name, size=128, element_size=32)
556-
cursor_shm = SharedMemory(name=cursor_name, create=True, size=8)
557-
cursor = AtomicCursor(cursor_shm.buf, 0)
558-
cursor.store(0)
559-
560-
actual_name = q.get_shm_name()
561-
actual_cursor_name = cursor_shm._name
562-
563-
# Start C++ consumer process
564-
cpp_consumer = find_cpp_executable("cpp_work_stealing_consumer")
565-
output_file = Path(__file__).parent / "cpp_work_stealing_consumer_output.txt"
566-
cpp_consumer_proc = subprocess.Popen([
567-
cpp_consumer,
568-
actual_name,
569-
str(num_items),
570-
str(32), # element size
571-
actual_cursor_name,
572-
str(output_file)
573-
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
574-
575-
# Start consumer processes
576-
results = MPQueue()
577-
py_consumer_proc = Process(
578-
target=consumer_process_worker,
579-
args=(queue_name, cursor_name, 0, num_items, results)
580-
)
581-
py_consumer_proc.start()
582-
583-
# Start producer
584-
producer = Thread(target=producer_worker, args=(num_items, q))
585-
producer.start()
586-
587-
producer.join()
588-
py_consumer_proc.join()
589-
590-
stdout, stderr = cpp_consumer_proc.communicate()
591-
592-
print(stdout.decode())
593-
print(stderr.decode())
594-
if cpp_consumer_proc.returncode != 0:
595-
print(stderr, file=sys.stderr)
596-
raise RuntimeError(f"C++ consumer failed with code {cpp_consumer_proc.returncode}")
597-
598-
# Verify consumed data
599-
consumed = []
600-
with open(output_file, 'r') as f:
601-
for line in f:
602-
item, _ = map(int, line.strip().split())
603-
consumed.append(item)
604-
605-
py_result = results.get(timeout=5)
606-
py_consumed = py_result[1]
607-
print("Python consumer consumed:", len(py_consumed))
608-
consumed.extend(item for item in py_consumed)
609-
610-
# Check all items consumed
611-
assert len(consumed) == num_items, f"Expected {num_items} items, got {len(consumed)}"
612-
613-
# Check data integrity
614-
expected = set(range(num_items))
615-
actual = set(consumed)
616-
assert expected == actual, f"Data mismatch between produced and consumed"
617-
618-
# Cleanup
619-
cursor_shm.close()
620-
cursor_shm.unlink()
621-
q.close()
622-
q.unlink()
623-
if output_file.exists():
624-
output_file.unlink()
625-
626-
print(f"[PASS] C++ work-stealing consumer cursor created by Python test passed: {num_items} items consumed")
627-
628544
def test_atomic_cursor_python_cpp_work_stealing_cursor_created_by_cpp():
629545
"""Test AtomicCursor with a C++ work-stealing consumer, cursor created by C++."""
630546
print("\n=== Test: AtomicCursor with C++ Work-Stealing Consumer, cursor created by C++ ===")
@@ -641,15 +557,20 @@ def test_atomic_cursor_python_cpp_work_stealing_cursor_created_by_cpp():
641557
# Create shared queue and cursor
642558
q = SlickQueue(name=queue_name, size=128, element_size=32)
643559

560+
actual_queue_name = q.get_shm_name()
561+
562+
from slick_queue_py.atomic_ops import validate_shm_name
563+
actual_cursor_name = validate_shm_name(cursor_name) # Created by C++ process
564+
644565
# Start C++ consumer process
645566
cpp_consumer = find_cpp_executable("cpp_work_stealing_consumer")
646567
output_file = Path(__file__).parent / "cpp_work_stealing_consumer_output.txt"
647568
cpp_consumer_proc = subprocess.Popen([
648569
cpp_consumer,
649-
queue_name,
570+
actual_queue_name,
650571
str(num_items),
651572
str(32), # element size
652-
cursor_name,
573+
actual_cursor_name,
653574
str(output_file)
654575
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
655576

tests/test_interop.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -70,34 +70,37 @@ def python_producer(queue_name, num_items, worker_id, results_queue, size=64):
7070
def python_consumer(queue_name, num_items, results_queue, starting_index):
7171
"""Python consumer process."""
7272
try:
73-
with open('ready', 'a'):
74-
# On Windows, we need to specify size when opening existing shared memory
75-
# q = SlickQueue(name=queue_name, size=size, element_size=32)
76-
q = SlickQueue(name=queue_name, element_size=32)
77-
consumed = []
78-
# read_index = 0
79-
read_index = starting_index
80-
attempts = 0
81-
max_attempts = 10000
82-
83-
while len(consumed) < num_items and attempts < max_attempts:
84-
attempts += 1
85-
prev_read_index = read_index
86-
data, size, read_index = q.read(read_index)
87-
88-
if data is not None:
89-
worker_id, item_num = struct.unpack("<I I", data[:8])
90-
consumed.append((worker_id, item_num))
91-
# print(f"consume {worker_id} {item_num} (from index {prev_read_index}, consumed: {len(consumed)})", flush=True)
92-
else:
93-
time.sleep(0.001)
94-
95-
# print(f"CONSUMER: About to put {len(consumed)} items into results queue", flush=True)
96-
results_queue.put(('success', consumed))
97-
# print(f"CONSUMER: Successfully put results into queue", flush=True)
98-
q.close()
99-
time.sleep(1)
100-
# print(f"CONSUMER: Exiting normally", flush=True)
73+
# Signal that consumer is ready (create/touch the file, then close it immediately)
74+
with open('ready', 'a') as f:
75+
pass # Just create/touch the file
76+
77+
# On Windows, we need to specify size when opening existing shared memory
78+
# q = SlickQueue(name=queue_name, size=size, element_size=32)
79+
q = SlickQueue(name=queue_name, element_size=32)
80+
consumed = []
81+
# read_index = 0
82+
read_index = starting_index
83+
attempts = 0
84+
max_attempts = 10000
85+
86+
while len(consumed) < num_items and attempts < max_attempts:
87+
attempts += 1
88+
prev_read_index = read_index
89+
data, size, read_index = q.read(read_index)
90+
91+
if data is not None:
92+
worker_id, item_num = struct.unpack("<I I", data[:8])
93+
consumed.append((worker_id, item_num))
94+
# print(f"consume {worker_id} {item_num} (from index {prev_read_index}, consumed: {len(consumed)})", flush=True)
95+
else:
96+
time.sleep(0.001)
97+
98+
# print(f"CONSUMER: About to put {len(consumed)} items into results queue", flush=True)
99+
results_queue.put(('success', consumed))
100+
# print(f"CONSUMER: Successfully put results into queue", flush=True)
101+
q.close()
102+
time.sleep(1)
103+
# print(f"CONSUMER: Exiting normally", flush=True)
101104
except Exception as e:
102105
print(f"CONSUMER: Exception - {str(e)}", flush=True)
103106
results_queue.put(('error', str(e)))

0 commit comments

Comments (0)