This repository contains a job dispatcher that creates standardized analysis dispatch models for large-scale data analysis workflows. It's designed to query a metadata database, locate relevant data assets in cloud storage, and prepare them for parallel processing by analysis workers.
The job dispatcher:
- Queries a metadata database to find datasets matching your criteria
- Locates the corresponding data files in S3 cloud storage
- Creates standardized analysis dispatch models containing file locations and metadata
- Distributes these jobs across multiple parallel workers for efficient processing
- Analysis Dispatch Model: A standardized JSON structure (`AnalysisDispatchModel`) that contains all the information needed to process a dataset, including S3 file locations, asset IDs, and analysis parameters.
- Data Asset: A collection of related files (e.g., experimental recordings, processed data) stored in cloud storage with associated metadata.
- Document Database Query: A MongoDB-style filter that specifies which datasets to include in your analysis (e.g., `{"subject.subject_id": "774659", "data_description.process_name": "processed"}`).
- Parallel Workers: Independent processing units that each handle a subset of the total jobs, enabling large-scale analysis.
- Distributed Parameters: Analysis parameters to apply for each data asset.
This is a Code Ocean capsule. To use it:
- Access the capsule at the link above
- Configure your query using the app panel or by providing input files
- Configure your analysis parameters by providing the JSON parameters to apply to each asset
- Run the capsule to generate analysis dispatch models
- Use the output with downstream analysis workflows
You have two ways to specify which data assets to process:
Provide a MongoDB-style query to filter datasets:
Examples:
- Simple query: `{"subject.subject_id": "774659"}`
- Complex query: `{"data_description.project_name": "Ephys Platform", "subject.subject_id": {"$in": ["643634", "774659"]}}`
- Query from file: Provide the path to a JSON file containing your query
Using the Query Generator: Use the metadata query portal (currently in development) to build queries interactively. You can also use GAMER to generate queries interactively.
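The two query styles above, and the query-from-file option, can be sketched in Python. This is a minimal illustration using the example values from this README; the filename `my_query.json` is a placeholder, not a path the dispatcher requires.

```python
import json
from pathlib import Path

# Simple query: match a single subject (example values from this README)
simple_query = {"subject.subject_id": "774659"}

# Complex query: match a project and any of several subjects using
# MongoDB's $in operator
complex_query = {
    "data_description.project_name": "Ephys Platform",
    "subject.subject_id": {"$in": ["643634", "774659"]},
}

# Query from file: write the query to a JSON file and pass its path
# to the dispatcher instead of an inline query
query_path = Path("my_query.json")
query_path.write_text(json.dumps(complex_query, indent=2))

# A dispatcher could load it back roughly like this
loaded = json.loads(query_path.read_text())
```

Because the queries are plain JSON, any query you can express inline can also be stored in a file and versioned alongside your analysis.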
Provide a CSV file with specific data asset IDs:
- Set `--use_data_asset_csv=1`
- Include a CSV file named `data_asset_input.csv` in the `/data` folder
- The CSV must have a column named `asset_id` with the data asset IDs
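The steps above can be sketched with the standard library `csv` module. The asset IDs here are made-up placeholders, and the file is written to the current directory purely to show the required shape; in the capsule it would live at `/data/data_asset_input.csv`.

```python
import csv
from pathlib import Path

# Hypothetical asset IDs for illustration only
asset_ids = ["abc-123", "def-456"]

# Write a CSV with the single required column, asset_id
csv_path = Path("data_asset_input.csv")
with csv_path.open("w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["asset_id"])
    writer.writeheader()
    for asset_id in asset_ids:
        writer.writerow({"asset_id": asset_id})

# Reading it back the way a dispatcher might
with csv_path.open(newline="") as f:
    rows = list(csv.DictReader(f))
```

Extra columns are harmless with this reading pattern, since each row is looked up by the `asset_id` key.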
- File Extension Filtering: Use `--file_extension` to only process specific file types (e.g., `.nwb`, `.zarr`)
- File Grouping:
  - `--split_files=1`: Create separate jobs for each file (default)
  - `--split_files=0`: Group all files from the same asset into one job
There are also options to specify which DocDB field to group by and whether or not to fetch only the latest records. See the `group_by` and `filter_latest` settings in the app panel.
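The effect of the `--split_files` flag can be illustrated with a toy grouping function. This is a sketch of the behavior described above, not the dispatcher's actual implementation; the asset IDs and S3 paths are hypothetical.

```python
def make_jobs(asset_files, split_files=True):
    """Toy sketch of job grouping: one job per file when split_files
    is True (the default), one job per asset otherwise."""
    jobs = []
    for asset_id, files in asset_files.items():
        if split_files:
            # --split_files=1: a separate job for each file
            jobs.extend({"asset_id": asset_id, "files": [f]} for f in files)
        else:
            # --split_files=0: all files from the asset in one job
            jobs.append({"asset_id": asset_id, "files": files})
    return jobs

# Hypothetical example: one asset with two files
assets = {"asset-1": ["s3://bucket/a.nwb", "s3://bucket/b.nwb"]}
split_jobs = make_jobs(assets, split_files=True)
grouped_jobs = make_jobs(assets, split_files=False)
```

With two files in one asset, splitting yields two jobs while grouping yields a single job carrying both files.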
The dispatcher creates analysis dispatch models that conform to the AnalysisDispatchModel schema.
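The authoritative schema is the `AnalysisDispatchModel` definition itself. As a rough, hypothetical illustration only (these field names are assumptions based on the contents described above, not the actual schema), a dispatch model might carry information like:

```json
{
    "s3_locations": ["s3://bucket/path/to/file.nwb"],
    "asset_ids": ["abc-123"],
    "analysis_parameters": {"method": "isolation_distance"}
}
```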
For parallelization, the output creates:
- One folder per worker (0, 1, 2, ... up to `num_parallel_workers`)
- One JSON file per job within each worker folder
- Unique UUID filenames for each job
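The output layout above can be sketched as follows. The round-robin assignment of jobs to workers is an assumption for illustration; only the folder-per-worker, one-JSON-per-job, UUID-filename structure comes from this README.

```python
import json
import uuid
from pathlib import Path

def write_jobs(jobs, output_dir, num_parallel_workers):
    """Sketch: distribute jobs across numbered worker folders,
    writing one JSON file per job with a unique UUID filename."""
    out = Path(output_dir)
    for i, job in enumerate(jobs):
        # Assumed round-robin assignment across workers 0..N-1
        worker_dir = out / str(i % num_parallel_workers)
        worker_dir.mkdir(parents=True, exist_ok=True)
        (worker_dir / f"{uuid.uuid4()}.json").write_text(json.dumps(job))

# Hypothetical jobs: five assets spread over two workers
jobs = [{"asset_id": f"asset-{i}"} for i in range(5)]
write_jobs(jobs, "results", num_parallel_workers=2)
```

Each downstream worker can then pick up exactly the folder matching its index and process every JSON file inside it.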
This job dispatcher is typically used as the first step in a larger analysis pipeline:
- Job Dispatch (this repository) → Creates analysis dispatch models
- Analysis Wrapper → Processes each job using the input models
See the aind-analysis-pipeline-template for a complete workflow example.
```bash
# Clone the repository
git clone <repository-url>
cd aind-analysis-job-dispatch/code

# Install dependencies
pip install -e .[dev]

# Run tests
python -m unittest discover tests/
```

- `run_capsule.py`: Main entry point and orchestration logic
- `utils.py`: Database queries and S3 operations
- `tests/`: Unit tests for key functionality
- No files found: Ensure your file extension is correct and files exist in the S3 locations
- Query returns no results: Verify your database query syntax and field names
- Permission errors: Check that AWS credentials are properly configured
You can specify analysis parameters by including an analysis_parameters.json file in the /data/analysis_parameters/ folder. This file contains two keys:
- `fixed_parameters`: A single dictionary following your analysis input schema. Use this when you want to run the same analysis parameters on all data assets (N assets → N jobs).
- `distributed_parameters`: A list of dictionaries, each following your analysis input schema. Use this when you want to run multiple different analyses (N assets × M parameter sets → N×M jobs).
Example:
```json
{
    "fixed_parameters": {
        "analysis_name": "Unit Quality Filtering",
        "analysis_tag": "baseline_v1.0",
        "isi_violations_cutoff": 0.05
    },
    "distributed_parameters": [
        {
            "method": "isolation_distance"
        },
        {
            "method": "amplitude_cutoff"
        }
    ]
}
```

The analysis wrapper capsule merges these, together with any command-line or app-panel inputs, and the final merged parameter set should follow the analysis input schema specified in that capsule.
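The N × M fan-out described above can be sketched as a product of assets and distributed parameter sets. The merge order (distributed values layered over fixed ones) and the asset IDs are assumptions for illustration; the actual merge is performed by the analysis wrapper.

```python
import itertools

# The example parameters from above
fixed_parameters = {
    "analysis_name": "Unit Quality Filtering",
    "analysis_tag": "baseline_v1.0",
    "isi_violations_cutoff": 0.05,
}
distributed_parameters = [
    {"method": "isolation_distance"},
    {"method": "amplitude_cutoff"},
]

# Hypothetical assets: N = 2 assets with M = 2 parameter sets
# yields N x M = 4 jobs
asset_ids = ["asset-1", "asset-2"]

jobs = [
    {"asset_id": asset_id, "parameters": {**fixed_parameters, **distributed}}
    for asset_id, distributed in itertools.product(asset_ids, distributed_parameters)
]
```

Each resulting job carries the shared fixed parameters plus exactly one of the distributed parameter sets.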
- aind-analysis-wrapper: Processes the analysis dispatch models created by this dispatcher
- aind-analysis-pipeline-template: Complete pipeline template combining dispatcher and wrapper