Conversation

@binary-husky
Collaborator

No description provided.

@binary-husky binary-husky merged commit b80bc21 into main Feb 9, 2026
0 of 3 checks passed
@gemini-code-assist

Summary of Changes

Hello @binary-husky, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a novel 'swarm training mode' to AgentJet, fundamentally changing how reinforcement learning for large language models can be conducted. By decoupling the agent's execution logic from the core training loop, it allows for distributed computation where a central GPU server handles model training and inference, while numerous lightweight clients can run agent workflows and report rewards. This architecture significantly enhances flexibility, enabling agent development and rollout on diverse hardware, including machines without GPUs, while still leveraging powerful remote training infrastructure.

Highlights

  • Introduction of Swarm Training Mode: A new 'swarm training mode' has been implemented, enabling a client-server architecture for distributed reinforcement learning. This allows local clients (potentially without GPUs) to perform agent rollouts by interacting with a remote GPU server for LLM inference and training.
  • Enhanced Client-Server Communication: New FastAPI endpoints have been added to the server to manage episodes, including claiming, ending, syncing training configurations, starting the engine, and monitoring status. ZeroMQ is utilized for efficient internal communication between the FastAPI server and the training loop.
  • Streaming Dataset Support: The RouterTaskReader and HuggingFaceTaskReader have been updated to leverage Python generators for task generation, facilitating the use of potentially infinite or very large datasets without loading them entirely into memory.
  • Granular Context Tracker Control: The BaseTracker and MultiAgentContextTracker now feature _read_only and _discarded flags, along with distinct should_interrupt_soft_fn and should_interrupt_hard_fn for more precise control over episode termination within the swarm environment.
  • New Swarm-Specific Files and Documentation: Several new files have been added, including ajet/tuner_lib/weight_tuner/experimental/as_swarm_client.py, ajet/tuner_lib/weight_tuner/experimental/as_swarm_server.py, ajet/tuner_lib/weight_tuner/experimental/interchange_utils.py, ajet/task_runner/swarm_runner.py, and associated documentation and example scripts to support and explain the new swarm mode.
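The episode lifecycle implied by these highlights (register an episode, wait for the server to claim it, run the agent workflow locally, report the reward) can be sketched as follows. This is an illustrative sketch only, not AgentJet's actual API: the `SwarmClientSketch` class, the endpoint paths, and the payload fields are all assumptions, with the transport injected so the loop is self-contained.

```python
import itertools
from dataclasses import dataclass, field
from typing import Callable, Optional

# Hypothetical episode states mirroring the swarm lifecycle described above.
REGISTERED, CLAIMED, FINISHED = "registered", "claimed", "finished"

@dataclass
class SwarmClientSketch:
    """Minimal client-side episode lifecycle: register -> check claim -> run -> report."""
    post: Callable[[str, dict], dict]  # transport (e.g. an HTTP POST helper), injected
    _uuid_counter: itertools.count = field(default_factory=itertools.count)

    def run_episode(self, agent_fn: Callable[[], float]) -> Optional[float]:
        episode_uuid = f"ep-{next(self._uuid_counter)}"
        # 1. Register the episode with the central GPU server.
        self.post("/register_episode", {"uuid": episode_uuid})
        # 2. Only proceed if the trainer has claimed this episode for the current batch.
        status = self.post("/episode_status", {"uuid": episode_uuid})
        if status.get("state") != CLAIMED:
            return None  # discarded: the server did not need this rollout
        # 3. Run the local agent workflow; LLM calls are served by the remote server.
        reward = agent_fn()
        # 4. Report the scalar reward so the trainer can assemble its training batch.
        self.post("/end_episode", {"uuid": episode_uuid, "reward": reward})
        return reward
```

In the real system the claim check would poll rather than fire once, but the state sequence is the same.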

Changelog
  • ajet/__init__.py
    • Implemented lazy imports for various modules to improve startup performance.
  • ajet/backbone/main_verl.py
    • Updated PPO training entry point to use DictConfig type hints.
    • Removed loguru import from global scope, now imported locally where needed.
    • Adjusted dataset loading to utilize generators for streaming data.
  • ajet/backbone/main_vllm.py
    • Added enable_swarm_mode configuration.
    • Modified rollout mode to switch to 'sample-ts' when swarm mode is active.
    • Included status updates for the interchange server.
  • ajet/backbone/trainer_trinity.py
    • Modified dataset loading to use generate_training_tasks and generate_validation_tasks for streaming datasets.
  • ajet/backbone/trainer_verl.py
    • Introduced discard_original_batch parameter to union_gen_batch_via_task_id for swarm mode.
    • Added status updates for the interchange server during rollout steps.
    • Conditionally skipped validation/testing in swarm mode.
  • ajet/context_tracker/base_tracker.py
    • Refactored BaseTracker initialization.
    • Added _read_only and _discarded flags for internal state management.
    • Introduced a reset method to clear tracker state.
  • ajet/context_tracker/basic_tracker.py
    • Removed terminal_rewards_dict attribute.
    • Corrected a typo in split_prompt_response_index.
    • Added an assertion to verify reward step count consistency.
  • ajet/context_tracker/multiagent_tracking.py
    • Updated __init__ to accept separate should_interrupt_soft_fn and should_interrupt_hard_fn.
    • Modified step_track to pop from timeline cache and increment round_cnt.
    • Added a _read_only check to prevent saving timelines when in read-only mode.
  • ajet/copilot/job.py
    • Updated AgentJetJob constructor to default to 'verl' backbone.
    • Added grpo_n, batch_size, and swarm_mode parameters for flexible configuration.
    • Adjusted default YAML loading based on swarm_mode activation.
  • ajet/default_config/ajet_default.py
    • Added num_repeat to AjetRollout dataclass.
  • ajet/default_config/ajet_default.yaml
    • Added enable_swarm_mode and already_started fields to ajet.interchange_server configuration.
    • Increased max_fastapi_threads for improved concurrency.
  • ajet/default_config/ajet_ts_default.yaml
    • Added a new default configuration file specifically for swarm training mode.
  • ajet/launcher.py
    • Moved utility functions (check_debugpy_version, check_avail_gpu, get_backbone_target, setup_environment_vars, dict_to_namespace) to ajet/utils/launch_utils.py.
    • Added --swarm-server argument for launching the server in swarm mode.
    • Introduced start_swarm_server function.
    • Modified configuration loading logic to support swarm server mode.
  • ajet/schema/task.py
    • Added descriptive comments to Task fields for better clarity.
  • ajet/task_reader/__init__.py
    • Introduced list_to_generator utility function.
    • Updated task_to_standard_dataset to accept a generator, enabling streaming datasets.
    • Modified RouterTaskReader to use generate_training_tasks and generate_validation_tasks.
  • ajet/task_reader/document_reader/doc_reader.py
    • Changed logger.warning to logger.info for unstructured import failure messages.
  • ajet/task_reader/hf_dataset_reader.py
    • Modified _load_dataset_split to return a generator instead of a list, supporting streaming datasets.
    • Added generate_training_tasks and generate_validation_tasks methods.
  • ajet/task_rollout/native_parallel_worker.py
    • Imported threading, wait, ALL_COMPLETED, FIRST_COMPLETED for advanced thread management.
    • Added rollout_swarm method to handle swarm-specific rollout logic and interchange server communication.
    • Updated rollout_dynamic to check _discarded instead of discarded for consistency.
  • ajet/task_rollout/single_worker.py
    • Imported threading, time, SwarmReceiveAbortException, SwarmRunner.
    • Added enable_swarm_mode attribute to conditionally instantiate SwarmRunner or GeneralRunner.
    • Introduced rollout_env_worker_loop for continuous task processing in swarm mode.
  • ajet/task_runner/base_runner.py
    • Imported is_episode_claimed for episode status checks.
    • Modified runner_hooks to return separate should_interrupt_soft_fn and should_interrupt_hard_fn.
  • ajet/task_runner/general_runner.py
    • Updated imports to use ajet.tuner and ajet.schema.task directly for clarity.
  • ajet/task_runner/swarm_runner.py
    • Added SwarmRunner for handling episode registration, claiming, and result submission via HTTP and ZeroMQ in swarm mode.
  • ajet/tuner.py
    • Updated imports to use ajet.context_tracker.multiagent_tracking directly.
  • ajet/tuner_lib/weight_tuner/as_oai_baseurl_apikey.py
    • Introduced OpenaiBaseUrlAndApiKey Pydantic model for API credentials.
    • Modified __init__ to use config.ajet.interchange_server.interchange_server_port for port configuration.
  • ajet/tuner_lib/weight_tuner/experimental/as_oai_model_client.py
    • Updated __init__ to use get_zmq_socket for address generation.
    • Modified should_terminate into should_soft_terminate and should_hard_terminate for finer control.
    • Added logic to handle termination signals during service startup.
  • ajet/tuner_lib/weight_tuner/experimental/as_oai_model_server.py
    • Imported Manager, Process, Coroutine, Optional, Tuple, find_free_port, get_host_ip, EpisodeStatus.
    • Modified get_app to accept enable_swarm_mode, shared_mem_dict, shared_mem_dict_lock.
    • Added logic to check engine status and update activity timestamp for swarm mode.
    • Updated start_interchange_server to handle blocking mode and enable_swarm_mode.
  • ajet/tuner_lib/weight_tuner/experimental/as_swarm_client.py
    • Added SwarmClient for interacting with the Swarm server, including methods for episode management, configuration sync, and engine control.
  • ajet/tuner_lib/weight_tuner/experimental/as_swarm_server.py
    • Implemented the Swarm server logic, including FastAPI routes for managing episodes, engine status, and handling client requests. Includes shared memory management for episode states and process tree killing for engine shutdown.
  • ajet/tuner_lib/weight_tuner/experimental/interchange_utils.py
    • Defined Pydantic models for various request/response objects used in swarm client-server communication.
    • Provided utility functions like get_interchange_server_url, http_change_engine_status, is_episode_claimed, http_register_episode, and get_zmq_socket.
  • ajet/utils/config_utils.py
    • Added override_param_callback to read_ajet_hierarchical_config and prepare_experiment_config for dynamic configuration adjustments.
  • ajet/utils/core_env_vars.py
    • Added copy import.
    • Adjusted AJET_DAT_INTERCHANGE_PORT and MASTER_NODE_IP environment variable handling.
    • Added logic to limit printed environment variable length for readability.
  • ajet/utils/launch_utils.py
    • Moved several utility functions (get_backbone_target, setup_environment_vars, check_debugpy_version, check_avail_gpu, dict_to_namespace) from ajet/launcher.py to this file.
  • ajet/utils/retry.py
    • Introduced SwarmReceiveAbortException.
    • Modified retry_with_backoff to catch SwarmReceiveAbortException and silently return None.
  • ajet/utils/thread_executors.py
    • Imported threading module.
    • Added BoundedThreadPoolExecutor for managing thread pools with a bounded queue.
  • ajet/workflow.py
    • Updated imports to use ajet.tuner directly.
  • ajet_swarm_threading.py
    • Added an example script demonstrating how to use the SwarmClient for distributed agent rollouts.
  • ajet_tinkerscript.md
    • Added a markdown file with a command example.
  • docs/en/platform_comparison.md
    • Updated 'Tinker' to 'Swarm' in the platform comparison documentation.
  • docs/en/workflow.md
    • Renamed 'TinkerJet' to 'Swarm' in the workflow documentation.
  • docs/index.md
    • Updated 'Tinker' to 'Swarm' in the main documentation index.
  • tests/bench/benchmark_learn2ask/benchmark_learn2ask.py
    • Adjusted a benchmark value for the 'learn2ask' test.
  • tinkerscript.md
    • Added a detailed design blueprint for Swarm, explaining its overview, core architecture, detailed workflow, episode state machine, and implementation example.
  • tinkerscript_1.md
    • Added another detailed design blueprint for Swarm, similar to tinkerscript.md but with slightly different emphasis on components and workflow.
  • tutorial/demo_tinkerjet/README.md
    • Added a README for the TinkerJet demo, explaining Swarm's capabilities and core training code concept.
  • tutorial/demo_tinkerjet/ajet_tinkerscript_default.yaml
    • Added a default YAML configuration file for the TinkerJet demo.
  • tutorial/example_academic_trans/trans.py
    • Added an example agent for academic translation, demonstrating the use of OpenAI API calls and a grader.
  • tutorial/example_academic_trans/trans_reward.py
    • Added reward computation logic for academic translation, including a TranslationQualityGrader.
  • tutorial/example_academic_trans/trans_roll.py
    • Added an example script demonstrating academic translation rollout using SwarmClient.

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant new feature: "swarm training mode". This mode enables a client-server architecture for distributed rollouts, allowing agent logic to run on client machines (potentially without GPUs) while the model training and inference are offloaded to a remote server. The changes are extensive and well-structured, touching upon data loading, context tracking, configuration, and the core training loop.

Key changes include:

  • Swarm Architecture: Addition of a FastAPI-based swarm server for managing episodes and a Python client library for interacting with it.
  • Efficient Data Loading: The task readers have been refactored to use Python generators and datasets.Dataset.from_generator, which significantly improves memory efficiency for large datasets.
  • Enhanced Context Tracker: The BaseTracker now includes a reset() method, making it reusable across multiple episodes, a key requirement for the swarm worker lifecycle.
  • Lazy Loading: The root __init__.py now uses lazy loading to improve the library's initial import time.

Overall, this is a well-executed feature addition that greatly enhances the framework's flexibility. I've identified one critical bug and a couple of areas for improvement related to performance and robustness.
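The generator-based loading called out above can be illustrated with a minimal sketch. `list_to_generator` and `generate_training_tasks` are names taken from the changelog, but the signatures and task fields here are assumptions; downstream, a callable like this can feed `datasets.Dataset.from_generator` without materializing the full dataset.

```python
from typing import Callable, Iterable, Iterator

def list_to_generator(items: list) -> Callable[[], Iterator[dict]]:
    """Wrap an in-memory list so it presents the same interface as a streaming source."""
    def gen() -> Iterator[dict]:
        yield from items
    return gen

def generate_training_tasks(source: Iterable[dict]) -> Iterator[dict]:
    """Yield tasks one at a time; the full dataset is never held in memory at once."""
    for raw in source:
        # Field names here are illustrative, not AgentJet's actual Task schema.
        yield {"prompt": raw["prompt"], "task_id": raw.get("id")}
```

Because both layers are generators, an infinite or very large source composes the same way as a small list.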

"""
assert yaml_path.endswith(".yaml"), "Configuration file must be a YAML file"
exp_base = os.path.dirname(yaml_path)
exp_base = os.path.exists(os.path.dirname(yaml_path))


critical

This line appears to contain a bug. os.path.exists() returns a boolean value, but the intention here seems to be to get the directory path of the yaml_path. This will likely lead to a FileNotFoundError in subsequent operations. It should be restored to use os.path.dirname().

Suggested change
exp_base = os.path.exists(os.path.dirname(yaml_path))
exp_base = os.path.dirname(yaml_path)

# DEBUG = True
DEBUG = False
RCVTIMEO = 2 * 1000
RCVTIMEO_OUT = 300 * 1000


high

A 5-minute timeout (300 * 1000 ms) for a blocking operation within a server thread is quite long and could pose a risk to server stability. If multiple clients time out, this could lead to thread exhaustion. It's recommended to reduce RCVTIMEO_OUT to a more conservative value, such as 30 seconds, to enhance server responsiveness and prevent resource contention.

Suggested change
RCVTIMEO_OUT = 300 * 1000
RCVTIMEO_OUT = 30 * 1000

return True
if (observation_window["stop"] is not None) and observation_window["stop"][task_thread_index]: # check soft condition
# if soft condition met, check if episode is claimed
has_claimed = is_episode_claimed(self.config, workflow_task.episode_uuid)


medium

The is_episode_claimed function executes an HTTP request to check the episode's status. Since should_interrupt_hard_fn may be called frequently within the rollout loop, these network calls could introduce noticeable latency and impact performance. To mitigate this, consider implementing a caching mechanism with a short time-to-live (TTL) for the result of is_episode_claimed. This would significantly reduce the number of HTTP requests and improve overall efficiency.
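A minimal sketch of the suggested mitigation: wrap the status check in a short-lived per-episode TTL cache. The wrapper name and signature are hypothetical (the real `is_episode_claimed` also takes a config argument), and the clock is injected so the behavior is deterministic.

```python
import time
from typing import Callable, Dict, Tuple

def ttl_cached(check: Callable[[str], bool], ttl_s: float = 2.0,
               clock: Callable[[], float] = time.monotonic) -> Callable[[str], bool]:
    """Reuse the last result of a per-episode status check for up to ttl_s seconds."""
    cache: Dict[str, Tuple[float, bool]] = {}

    def wrapper(episode_uuid: str) -> bool:
        now = clock()
        hit = cache.get(episode_uuid)
        if hit is not None and now - hit[0] < ttl_s:
            return hit[1]  # fresh enough: skip the HTTP round-trip entirely
        result = check(episode_uuid)
        cache[episode_uuid] = (now, result)
        return result

    return wrapper
```

With a TTL of one or two seconds, a hot rollout loop issues at most one HTTP request per episode per interval instead of one per iteration.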
