Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fast_llm/data/dataset/streaming.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import json
import logging
import time
import typing

Expand All @@ -14,6 +15,8 @@
from fast_llm.data.document.token_data import TokenDataDocument
from fast_llm.utils import Assert

logger = logging.getLogger(__name__)


@config_class()
class RedisStreamingDocumentData(Config):
Expand Down
2 changes: 1 addition & 1 deletion fast_llm/layers/ssm/gdn.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def __init__(
self._value_head_dim, lr_scale=self._lr_scale, peft=self._peft
)

if _fast_gdn_available:
if _fast_gdn_available and distributed_config.use_cuda:
self.chunk_gated_delta_rule = chunk_gated_delta_rule
else:
logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion fast_llm_external_models/apriel2/modeling_apriel2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2839,7 +2839,7 @@ def forward(

# Reshape back to [batch, num_patches, text_hidden]
image_features = image_features.squeeze(0).view(batch_size, num_patches_per_image, -1)
return image_features, (*all_hidden_states, hidden_states, image_features)
return image_features, (*all_hidden_states, hidden_states, image_features) if output_hidden_states else None


class SimpleMLP(nn.Module):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ def test_batch_processing_behavior(self, model_pair):
with torch.no_grad():
# Batch processing
batch_src = get_pixtral_vision_features(source, pixel_values)
batch_tgt, _ = target.get_image_features(pixel_values).view(-1, batch_src.shape[-1])
batch_tgt = target.get_image_features(pixel_values)[0].view(-1, batch_src.shape[-1])

# Sequential processing
singles_src = [get_pixtral_vision_features(source, pixel_values[i : i + 1]) for i in range(3)]
Expand Down
3 changes: 2 additions & 1 deletion tests/models/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _run_model_streaming_configs(
model_testing_config,
None,
updates={
("data", "datasets"): {"training": {"port": port}},
("data", "datasets"): {"training": {"port": port, "timeout": 1.0}},
("training", "export"): {"format": model_testing_config.checkpoint_format.name, "interval": 1},
"callbacks": {
"streaming": {
Expand All @@ -143,6 +143,7 @@ def _run_model_streaming_configs(
"external_world_size": config.consumer_count,
},
"export": {"format": model_testing_config.checkpoint_format.name},
"timeout": 1.0,
}
},
# Disable tensor logging.
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/distributed_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def get_config(relative: float = 0, absolute: float = 0, **kwargs) -> CompareCon
if torch.cuda.is_available()
else {
(None, "norm"): get_config(ignore_tensors=True),
(None, "word_embeddings_weight"): get_config(8e-2, 1e-4),
(None, "embeddings_weight"): get_config(8e-2, 1e-4),
}
),
(None, "bias"): get_config(2e-2, 1e-3) if torch.cuda.is_available() else get_config(2e-2, 2e-3),
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/model_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ def update_and_add_testing_config(
# note: tp is excluded because there is currently no gradient reductions implemented for tp norm in gdn.py (STP works though).
# we should be using STP with this model, not TP!
skip_tests=("sdp", "ms", TP_NO_STP),
requires_cuda=False,
requires_cuda=True, # GDN available on CPU, but not in the converted model (also runs very slow).
)

_gdn_block = MODEL_CONFIGS["apriel2_gdn"].config_dict["model"]["base_model"]["decoder"]["block"]
Expand Down
32 changes: 30 additions & 2 deletions tests/utils/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ def producer_loop():

@contextlib.contextmanager
def fake_redis_server(config: RedisConfig):
# We search for free port as port from previous test can still be not free even after server shutdown

# ----- Monkey-patch handler to suppress broken pipes -----
orig_handle = fakeredis._tcp_server.TCPFakeRequestHandler.handle

Expand All @@ -83,6 +81,34 @@ def safe_handle(self):

fakeredis._tcp_server.TCPFakeRequestHandler.handle = safe_handle

# ----- Monkey-patch setup to use Resp2Writer instead of Resp3Writer -----
# fakeredis 2.34+ hardcodes Resp3Writer for all connections, causing blocked
# XREADGROUP timeouts to return RESP3 null (b'_\r\n') even on RESP2 connections
# (i.e. clients that never sent HELLO 3). The redis-py RESP2 parser raises
# Protocol Error: b'_' on this byte. Fix: replace with Resp2Writer at setup time.
# The Resp2Writer class was introduced alongside the bug in 2.34, so use its
# presence as the version guard.
orig_setup = fakeredis._tcp_server.TCPFakeRequestHandler.setup
if hasattr(fakeredis._tcp_server, "Resp3Writer"):
# fakeredis 2.34+ hardcodes Resp3Writer for all connections, causing blocked
# XREADGROUP timeouts to return RESP3 null (b'_\r\n') even on RESP2 connections
# (i.e. clients that never sent HELLO 3). The redis-py RESP2 parser raises
# Protocol Error: b'_' on this byte. Fix: replace with Resp2Writer at setup time.
if not hasattr(fakeredis._tcp_server, "Resp2Writer"):
raise RuntimeError(
f"fakeredis {fakeredis.__version__} has Resp3Writer but not Resp2Writer — "
"the workaround for the RESP2/RESP3 null encoding bug no longer applies. "
"See tests/utils/redis.py for details."
)

def resp2_setup(self):
orig_setup(self)
if not isinstance(self.writer, fakeredis._tcp_server.Resp2Writer):
self.writer = fakeredis._tcp_server.Resp2Writer(self.client_address, self.wfile, self)
self.current_client.writer = self.writer

fakeredis._tcp_server.TCPFakeRequestHandler.setup = resp2_setup

server = fakeredis.TcpFakeServer((config.host, config.port), server_type="redis")
print(f"Starting fake redis server at {config.host}:{config.port}")
thread = threading.Thread(target=server.serve_forever, daemon=True)
Expand All @@ -96,3 +122,5 @@ def safe_handle(self):
server.shutdown()
server.server_close()
thread.join()
fakeredis._tcp_server.TCPFakeRequestHandler.setup = orig_setup
fakeredis._tcp_server.TCPFakeRequestHandler.handle = orig_handle
Loading