Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/disallow-formula-randomness.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Disallowed formula-time randomness: `random()` now raises a `RuntimeError`, directing model authors to read random seeds or draws from input variables instead.
46 changes: 8 additions & 38 deletions policyengine_core/commons/formulas.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,48 +293,18 @@ def amount_between(

def random(population):
"""
Generate random values for each entity in the population.
Raise an error for formula-time randomness.

Args:
population: The population object containing simulation data.

Returns:
np.ndarray: Array of random values for each entity.
Random values should be created during data construction and exposed to
formulas as ordinary input variables, so simulations remain reproducible and
calibration outputs stay tied to the records that were calibrated.
"""
# Initialize count of random calls if not already present
if not hasattr(population.simulation, "count_random_calls"):
population.simulation.count_random_calls = 0
population.simulation.count_random_calls += 1

# Get known periods or use default calculation period
known_periods = population.simulation.get_holder(
f"{population.entity.key}_id"
).get_known_periods()
period = (
known_periods[0]
if known_periods
else population.simulation.default_calculation_period
raise RuntimeError(
"Formula-time randomness is not allowed. Create random seeds or draws "
"during microdata construction and read them through input variables "
"inside formulas."
)

# Get entity IDs for the period
entity_ids = population(f"{population.entity.key}_id", period)

# Generate deterministic random values using vectorised hash
seeds = np.abs(entity_ids * 100 + population.simulation.count_random_calls).astype(
np.uint64
)

# PCG-style mixing function for high-quality pseudo-random generation
x = seeds * np.uint64(0x5851F42D4C957F2D)
x = x ^ (x >> np.uint64(33))
x = x * np.uint64(0xC4CEB9FE1A85EC53)
x = x ^ (x >> np.uint64(33))

# Convert to float in [0, 1) using upper 53 bits for full double precision
values = (x >> np.uint64(11)).astype(np.float64) / (2**53)

return values


def is_in(values: ArrayLike, *targets: list) -> ArrayLike:
"""Returns true if the value is in the list of targets.
Expand Down
154 changes: 6 additions & 148 deletions tests/core/commons/test_random_seed.py
Original file line number Diff line number Diff line change
@@ -1,152 +1,10 @@
"""Test the random function with large entity IDs to ensure no overflow."""

import numpy as np
import pytest
from unittest.mock import Mock
from policyengine_core.commons.formulas import random


class TestRandomSeed:
"""Test random seed handling to prevent NumPy overflow errors."""

def test_random_with_large_entity_ids(self):
"""Test that random() handles large entity IDs without overflow."""
# Create a mock population with simulation
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "person"

# Mock the get_holder and get_known_periods
holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Test with very large entity IDs that would cause overflow
# if not handled properly
large_ids = np.array(
[
np.iinfo(np.int64).max - 1000, # Very large positive ID
np.iinfo(np.int64).max // 2, # Large positive ID
1234567890123456789, # Another large ID
]
)

# Mock the population call to return large IDs
population.side_effect = lambda key, period: large_ids

# This should not raise a ValueError about negative seeds
result = random(population)

# Check that we got valid random values
assert isinstance(result, np.ndarray)
assert len(result) == len(large_ids)
assert all(0 <= val <= 1 for val in result)

def test_random_seed_consistency(self):
"""Test that random() produces consistent results for same inputs."""
# Create mock population
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "household"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Use same IDs
ids = np.array([1, 2, 3])
population.side_effect = lambda key, period: ids

# First call
result1 = random(population)

# Reset count to simulate same conditions
population.simulation.count_random_calls = 0

# Second call with same conditions
result2 = random(population)

# Results should be identical
np.testing.assert_array_equal(result1, result2)

def test_random_increments_call_count(self):
"""Test that random() increments the call counter."""
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "person"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

ids = np.array([1, 2, 3])
population.side_effect = lambda key, period: ids

# First call
random(population)
assert population.simulation.count_random_calls == 1

# Second call
random(population)
assert population.simulation.count_random_calls == 2

def test_random_handles_negative_ids(self):
"""Test that random() handles negative IDs properly."""
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 0
population.entity = Mock()
population.entity.key = "person"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Include negative IDs
ids = np.array([-100, -1, 0, 1, 100])
population.side_effect = lambda key, period: ids

# Should handle negative IDs without errors
result = random(population)

assert isinstance(result, np.ndarray)
assert len(result) == len(ids)
assert all(0 <= val <= 1 for val in result)

def test_no_negative_seed_error_with_overflow(self):
"""Test that seed calculation overflow doesn't cause negative seed error."""
population = Mock()
population.simulation = Mock()
population.simulation.count_random_calls = 999999999 # Large count
population.entity = Mock()
population.entity.key = "person"

holder = Mock()
holder.get_known_periods.return_value = []
population.simulation.get_holder.return_value = holder
population.simulation.default_calculation_period = Mock()

# Use the exact ID that would cause overflow in old implementation
# This ID when multiplied by 100 and added to count_random_calls
# would overflow int64 and become negative
overflow_id = np.array([np.iinfo(np.int64).max // 100])
population.side_effect = lambda key, period: overflow_id
from policyengine_core.commons.formulas import random
from policyengine_core.model_api import random as model_api_random

# In the old implementation, this would raise:
# ValueError: Seed must be between 0 and 2**32 - 1
# With the fix using abs(), it should work fine
result = random(population)

assert isinstance(result, np.ndarray)
assert len(result) == 1
assert 0 <= result[0] <= 1
@pytest.mark.parametrize("random_function", [random, model_api_random])
def test_random_raises_for_formula_time_randomness(random_function):
    """Check that calling ``random`` raises instead of returning values.

    The test is parametrized over the two import paths used in this module
    (``policyengine_core.commons.formulas`` and ``policyengine_core.model_api``)
    so both names are exercised.
    """
    # ``match`` is applied as a regex search against the exception message,
    # so the leading sentence of the error is enough to identify it.
    expected_message = "Formula-time randomness is not allowed"
    with pytest.raises(RuntimeError, match=expected_message):
        random_function(None)
6 changes: 3 additions & 3 deletions tests/core/test_stable_hash_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

Python's built-in ``hash()`` is randomized per process for strings, so any seed
derived from it changes from one ``python`` invocation to the next. This module
ensures ``Simulation`` uses a stable hash so results involving ``random()`` are
reproducible across runs (issue C6 in the 2026-04 bug hunt, related to #412).
ensures ``Simulation`` uses a stable hash when it seeds NumPy for existing
deterministic simulation paths.
"""

from __future__ import annotations
Expand Down Expand Up @@ -54,7 +54,7 @@ def test_stable_hash_to_seed_covers_seed_range():

def test_sort_keys_makes_equivalent_inputs_share_a_seed():
# Two equivalent situations constructed with different dict insertion order
# must produce the same hash / seed so calls to ``random()`` are stable.
# must produce the same hash / seed.
a = {"person": {"you": {"employment_income": 1000, "age": 30}}}
b = {"person": {"you": {"age": 30, "employment_income": 1000}}}
seed_a = _stable_hash_to_seed(json.dumps(a, sort_keys=True))
Expand Down
Loading