Multicluster.jl is a package for deploying multicluster computing systems in Julia, built on top of the multilevel version of Distributed.jl and MPIClusterManagers.jl. It introduces new functions and methods¹ tailored for multicluster computing.
By comparing the code below with its counterpart implemented using the multilevel version of Distributed.jl, it becomes evident—both visually and by line count—that the configuration of the multicluster environment is significantly simpler with Multicluster.jl. In fact, the resulting code is comparable in complexity to a single-cluster implementation.
using Multicluster
using Distributed   # multilevel version; provides @everywhere, @spawnat, workers
SUM = Ref(0)   # accumulator for the global sum, living at the driver process
# reduce0 runs at the driver process, adding each cluster's partial sum into SUM
function reduce0(x)
    SUM[] += x
end
# create four clusters
cid1 = addcluster(<IP1>, 16; access_node_args = <A1>, compute_node_args = <C1>)
cid2 = addcluster(<IP2>, 32; access_node_args = <A2>, compute_node_args = <C2>)
cid3 = addcluster(<IP3>, 512; access_node_args = <A3>, compute_node_args = <C3>)
cid4 = addcluster(<IP4>, 64; access_node_args = <A4>, compute_node_args = <C4>)
# create the reduce1 function at each entry process
@everywhere workers() reduce1(x) = @spawnat role = :worker 1 reduce0(x)
# run the computation logic at each computing process
for cid in clusters()
    @cluster_everywhere cid begin
        using MPI                  # ensure MPI is loaded on the computing processes
        MPI.Init()
        size = MPI.Comm_size(MPI.COMM_WORLD)
        rank = MPI.Comm_rank(MPI.COMM_WORLD)
        @info "my info: rank=$rank, size=$size, cluster=$clusterid"
        X = rand(1:10)
        r = MPI.Reduce(X, (x, y) -> x + y, 0, MPI.COMM_WORLD)
        # only rank 0 forwards this cluster's partial sum to the entry process
        rank == 0 && @spawnat role = :worker 1 reduce1(r)
        MPI.Finalize()
    end
end
@info "The sum across all clusters is $(SUM[])"The addcluster function receives:
- The address of the access node of a cluster
- An integer N representing the number of computing processes
- Keyword arguments to configure authentication and execution environments for both the access and compute nodes
Internally, addcluster:
- Creates an entry process on the access node
- Launches a team of N computing processes across the compute nodes
- Connects those processes via an MPI communicator using MPIWorkerManager
- Returns a cluster handle
A cluster handle is a structure with two fields:
- cid: the process ID of the entry process
- xid: a context identifier (Union{Nothing, Integer}), initially set to nothing
A context is a set of computing-process PIDs that can communicate through a shared MPI communicator.
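Putting these pieces together, a cluster handle can be pictured as a small record. The following is only a sketch inferred from the field descriptions above; the actual definition in Multicluster.jl may differ:

# sketch of a cluster handle, based on the two fields described above
struct Cluster
    cid::Int                      # process ID of the entry process
    xid::Union{Nothing, Integer}  # context identifier, initially nothing
end

This is consistent with the REPL session below, where a freshly created cluster prints as Cluster(2, nothing).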
julia> ch0 = addcluster(
"$user_login@$cluster_address",
16;
access_node_args = [
:sshflags => `-i $(homedir())/publickey`,
:tunnel => true
],
compute_node_args = [
:master_tcp_interface => master_tcp_interface,
:exeflags => `--threads=32`,
:threadlevel => :multiple,
:mpiflags => `--hostfile /home/$user_login/hostfile`
]
)
Cluster(2, nothing)

This example creates 16 computing processes distributed across the cluster’s compute nodes. Each process runs with 32 threads, for a total of 512 threads. The returned handle ch0 pairs the entry-process PID with an initial xid of nothing.
The addworkers function creates a new context within an existing cluster. It:
- Receives a cluster handle and an integer N
- Launches N new computing processes
- Creates a new MPI communicator for those processes
- Returns a new cluster handle with the same cid and a new xid
julia> addworkers(ch0, 4)
Cluster(2, 2)
julia> addworkers(ch0, 8)
Cluster(2, 3)

Inspecting the contexts:
julia> contexts(ch0)
3-element Vector{Union{Nothing, Vector{Integer}}}:
Integer[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
Integer[18, 19, 20, 21]
Integer[22, 23, 24, 25, 26, 27, 28, 29]

Only processes within the same context share an MPI communicator and can interact via MPI.
The rmcluster function removes cluster resources:
- If xid is specified, only the corresponding context is removed
- If xid is nothing, the entire cluster is removed
julia> rmcluster(ch0, 2)
julia> rmcluster(ch0)

The first call removes the second context only, while the second call removes all remaining contexts and the cluster itself.
Multicluster.jl provides several introspection functions:
- clusters() → list of all cluster handles
- nclusters() → number of clusters
- nodes(cid) → set of node handles for a cluster
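For illustration, assuming the four clusters from the opening example are still active, an introspection session might look like this (hypothetical output):

julia> nclusters()
4

julia> for ch in clusters()
           @info "cluster $(ch.cid): $(length(nodes(ch.cid))) node handles"
       end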
A node handle is a record with:
- cid: PID of the entry process
- pid: PID of a computing process
Node handles are used as arguments in several Multicluster.jl operations.
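By analogy with the cluster handle, a node handle might be sketched as follows (field names taken from the description above; the real definition may differ):

# sketch of a node handle, based on the two fields described above
struct Node
    cid::Int   # PID of the cluster's entry process
    pid::Int   # PID of a computing process
end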
Several Distributed.jl functions have been extended:
- procs, nprocs applied to cluster handles return entry-process information
- workers, nworkers applied to cluster handles account for contexts
If a cluster handle includes an xid, only the processes in that context are considered.
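For example, returning to the session above before the rmcluster calls, and assuming the handle Cluster(2, 2) returned by the first addworkers call was saved as ch1, one would expect output consistent with contexts(ch0) (hypothetical session):

julia> nworkers(ch1)    # only the four processes of context 2 are counted
4

julia> workers(ch1)
4-element Vector{Int64}:
 18
 19
 20
 21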
All cluster-management extensions are implemented in cluster.jl, extending functions originally defined in Distributed.jl. Additional communication helpers are defined in remotecall.jl.
The following functions now accept node handles and cluster handles:
- remotecall
- remotecall_fetch
- remotecall_wait
- remote_do
Behavior:
- Node handle → executes on a single computing process
- Cluster handle → executes in parallel across computing processes and returns a list of results
A new helper function, cluster_fetch, is provided to retrieve results from these calls. It may also accept a reducer function to aggregate results from multiple futures.
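As a sketch of how these fit together (hypothetical session; myid comes from Distributed.jl, and the argument order of the reducer-accepting cluster_fetch form is an assumption):

nd = first(nodes(ch0.cid))      # a single node handle
remotecall_fetch(myid, nd)      # runs on exactly one computing process

futs = remotecall(myid, ch0)    # cluster handle: one future per computing process
cluster_fetch(futs)             # retrieve all results
cluster_fetch(+, futs)          # assumed form: aggregate the results with a reducer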
New macros and helper functions extend the familiar Distributed.jl workflow:
- @cluster_spawnat
- @node_spawnat
- fetchfrom_cluster
- fetchfrom_node
- @cluster_everywhere
- @cluster_distributed
These constructs apply spawnat, everywhere, and distributed semantics directly to a cluster’s computing processes.
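For illustration, a sketch of how these might combine (only @cluster_everywhere appears in the opening example; the call shapes of the other constructs are assumptions based on their names):

# hypothetical usage; exact argument shapes may differ
futs = @cluster_spawnat ch0 myid()   # spawn on every computing process of ch0
nd   = first(nodes(ch0.cid))
fut  = @node_spawnat nd myid()       # spawn on a single computing process
@cluster_everywhere ch0 begin        # as in the opening example
    @info "hello from process $(myid())"
end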
Multicluster.jl simplifies the deployment, management, and programming of multicluster computing systems in Julia. By abstracting away complex orchestration details and extending familiar Distributed.jl constructs, it enables scalable, readable, and maintainable multicluster applications.
Footnotes
1. In Julia, methods are specific implementations of a function, each specialized for a particular combination of argument types, enabling multiple dispatch.