From 86fc175eb943dc6370dc6fd3f96c043acffed0af Mon Sep 17 00:00:00 2001 From: "Mykael A." Date: Sat, 18 Jan 2025 12:18:49 -0300 Subject: [PATCH] add gcp support --- src/cluster_providers/gcp/gcp_backend.jl | 646 ++++++++++++----------- src/cluster_providers/gcp/gcp_deploy.jl | 126 ++++- src/cluster_providers/gcp/gcp_persist.jl | 145 +++++ 3 files changed, 610 insertions(+), 307 deletions(-) create mode 100644 src/cluster_providers/gcp/gcp_persist.jl diff --git a/src/cluster_providers/gcp/gcp_backend.jl b/src/cluster_providers/gcp/gcp_backend.jl index a8e4a93..3322064 100644 --- a/src/cluster_providers/gcp/gcp_backend.jl +++ b/src/cluster_providers/gcp/gcp_backend.jl @@ -3,118 +3,98 @@ using AWS: @service using Serialization using Base64 using Sockets -@service Ec2 -@service Efs + +using JSON + +import GoogleCloud as GCPAPI + +gcp_session = Ref{Any}(nothing) + +function gcp_check_session() + if isnothing(gcp_session[]) + # Creates a GCP session and stores it. + gcp_session[] = GCPAPI.GoogleSession(ENV["GOOGLE_APPLICATION_CREDENTIALS"], ["cloud-platform"]) + GCPAPI.set_session!(GCPAPI.compute, gcp_session[]) + end +end + #= Estrutura para Armazenar as informações e função de criação do cluster =# - mutable struct GCPManagerWorkers <: ManagerWorkers #Cluster name::String - instance_type_manager::String - instance_type_worker::String - count::Int image_id_manager::String image_id_worker::String - subnet_id::Union{String, Nothing} - placement_group::Union{String, Nothing} - auto_pg::Bool - security_group_id::Union{String, Nothing} - auto_sg::Bool + count::Int + instance_type_manager::String + instance_type_worker::String + user_manager::String + user_worker::String + zone::String + project::String cluster_nodes::Union{Dict{Symbol, String}, Nothing} - shared_fs::Bool features::Dict{Symbol, Any} end mutable struct GCPPeerWorkers <: PeerWorkers # Cluster name::String - instance_type::String - count::Int image_id::String - subnet_id::Union{String, Nothing} - 
placement_group::Union{String, Nothing} - auto_pg::Bool - security_group_id::Union{String,Nothing} - auto_sg::Bool + count::Int + instance_type::String + user::String + zone::String + project::String cluster_nodes::Union{Dict{Symbol, String}, Nothing} - shared_fs::Bool features::Dict{Symbol, Any} end mutable struct GCPPeerWorkersMPI <: PeerWorkersMPI # Cluster name::String - instance_type::String - count::Int image_id::String - subnet_id::Union{String, Nothing} - placement_group::Union{String, Nothing} - auto_pg::Bool - security_group_id::Union{String,Nothing} - auto_sg::Bool + count::Int + instance_type::String + zone::String cluster_nodes::Union{Dict{Symbol, String}, Nothing} - shared_fs::Bool features::Dict{Symbol, Any} end # PUBLIC +""" +Creates a compute instances cluster and returns it. +""" function gcp_create_cluster(cluster::Cluster) - + gcp_check_session() cluster.cluster_nodes = gcp_create_instances(cluster) cluster end - -function gcp_get_ips_instance(instance_id::String) - public_ip = Ec2.describe_instances(Dict("InstanceId" => instance_id))["reservationSet"]["item"]["instancesSet"]["item"]["ipAddress"] - private_ip = Ec2.describe_instances(Dict("InstanceId" => instance_id))["reservationSet"]["item"]["instancesSet"]["item"]["privateIpAddress"] - Dict(:public_ip => public_ip, :private_ip => private_ip) +function gcp_get_ips_instance(cluster::Cluster, name) + public_ip = gcp_get_instance_dict(cluster, name)["networkInterfaces"][1]["accessConfigs"][1]["natIP"] + private_ip = gcp_get_instance_dict(cluster, name)["networkInterfaces"][1]["networkIP"] + + return Dict(:public_ip => public_ip, :private_ip => private_ip) end # PUBLIC function gcp_terminate_cluster(cluster::Cluster) - gcp_delete_instances(cluster.cluster_nodes) - for instance in cluster.cluster_nodes - status = gcp_get_instance_status(instance[2]) - while status != "terminated" - println("Waiting for instances to terminate...") - sleep(2) - status = gcp_get_instance_status(instance[2]) - end - 
end - - cluster.shared_fs && gcp_delete_efs(cluster.file_system_id) - cluster.auto_sg && gcp_delete_security_group(cluster.security_group_id) - cluster.auto_pg && gcp_delete_placement_group(cluster.placement_group) - - return + gcp_delete_instances(cluster) + gcp_await_status(cluster, cluster.cluster_nodes, "notfound") end - #= Grupo de Alocação =# # PUBLIC function gcp_create_placement_group(name) - params = Dict( - "GroupName" => name, - "Strategy" => "cluster", - "TagSpecification" => - Dict( - "ResourceType" => "placement-group", - "Tag" => [Dict("Key" => "cluster", "Value" => name), - Dict("Key" => "Name", "Value" => name)] - ) - ) - Ec2.create_placement_group(params)["placementGroup"]["groupName"] + @warn "CALLED NOT IMPLEMENTED METHOD!" end function gcp_delete_placement_group(name) - params = Dict("GroupName" => name) - Ec2.delete_placement_group(name) + @warn "CALLED NOT IMPLEMENTED METHOD!" end #= @@ -122,37 +102,11 @@ Grupo de Segurança =# # PUBLIC function gcp_create_security_group(name, description) - # Criamos o grupo - params = Dict( - "TagSpecification" => - Dict( - "ResourceType" => "security-group", - "Tag" => [Dict("Key" => "cluster", "Value" => name), - Dict("Key" => "Name", "Value" => name)] - ) - ) - id = Ec2.create_security_group(name, description, params)["groupId"] - - # Liberamos o SSH. - params = Dict( - "GroupId" => id, - "CidrIp" => "0.0.0.0/0", - "IpProtocol" => "tcp", - "FromPort" => 22, - "ToPort" => 22) - Ec2.authorize_security_group_ingress(params) - - # Liberamos o tráfego interno do grupo. - sg_name = Ec2.describe_security_groups(Dict("GroupId" => id))["securityGroupInfo"]["item"]["groupName"] - params = Dict( - "GroupId" => id, - "SourceSecurityGroupName" => sg_name) - Ec2.authorize_security_group_ingress(params) - id + @warn "CALLED NOT IMPLEMENTED METHOD!" end function gcp_delete_security_group(id) - Ec2.delete_security_group(Dict("GroupId" => id)) + @warn "CALLED NOT IMPLEMENTED METHOD!" 
end #= @@ -160,162 +114,239 @@ Criação de Instâncias =# # Funções auxiliares. -function gcp_set_up_ssh_connection(cluster_name) +function gcp_set_up_ssh_connection(cluster_name, comment) + + internal_key_name = cluster_name + + ssh_path = joinpath(homedir(), ".ssh") + + !isdir(ssh_path) && mkdir(ssh_path) + + keypath = joinpath(ssh_path, "$internal_key_name.key") + pubpath = joinpath(ssh_path, "$internal_key_name.key.pub") + # Criar chave interna pública e privada do SSH. # chars = ['a':'z'; 'A':'Z'; '0':'9'] # random_suffix = join(chars[Random.rand(1:length(chars), 5)]) - internal_key_name = cluster_name - run(`ssh-keygen -f /tmp/$internal_key_name -N ""`) - private_key = base64encode(read("/tmp/$internal_key_name", String)) - public_key = base64encode(read("/tmp/$internal_key_name.pub", String)) + run(`ssh-keygen -t rsa -b 2048 -f $keypath -C $comment -N ""`) + run(`chmod 400 $keypath`) + private_key = base64encode(read(keypath, String)) + public_key = base64encode(read(pubpath, String)) + + private_key, public_key +end +function gcp_get_user_data(cluster_name, user, private_key, public_key) + # Define o script que irá instalar a chave pública e privada no headnode e workers. 
user_data = "#!/bin/bash -echo $private_key | base64 -d > /home/ubuntu/.ssh/$cluster_name -echo $public_key | base64 -d > /home/ubuntu/.ssh/$cluster_name.pub +echo $private_key | base64 -d > /home/$user/.ssh/$cluster_name +echo $public_key | base64 -d > /home/$user/.ssh/$cluster_name.pub echo 'Host * - IdentityFile /home/ubuntu/.ssh/$cluster_name - StrictHostKeyChecking no' > /home/ubuntu/.ssh/config -cat /home/ubuntu/.ssh/$cluster_name.pub >> /home/ubuntu/.ssh/authorized_keys -chown -R ubuntu:ubuntu /home/ubuntu/.ssh -chmod 600 /home/ubuntu/.ssh/* -sed -i 's/#ClientAliveInterval 0/ClientAliveInterval 1000/g' /etc/ssh/sshd_config -sed -i 's/#ClientAliveCountMax 3/ClientAliveCountMax 100/g' /etc/ssh/sshd_config -systemctl restart ssh + IdentityFile /home/$user/.ssh/$cluster_name + StrictHostKeyChecking no' > /home/$user/.ssh/config +cat /home/$user/.ssh/$cluster_name.pub >> /home/$user/.ssh/authorized_keys +chown -R $user:$user /home/$user/.ssh +chmod 600 /home/$user/.ssh/* +sudo sed -i 's/#ClientAliveInterval 0/ClientAliveInterval 1000/g' /etc/ssh/sshd_config +sudo sed -i 's/#ClientAliveCountMax 3/ClientAliveCountMax 100/g' /etc/ssh/sshd_config +sudo systemctl restart ssh " - [internal_key_name, user_data] -end - -function gcp_create_params(cluster::ManagerWorkers, user_data_base64) - params_manager = Dict( - "InstanceType" => cluster.instance_type_manager, - "ImageId" => cluster.image_id_manager, - "TagSpecification" => - Dict( - "ResourceType" => "instance", - "Tag" => [Dict("Key" => "cluster", "Value" => cluster.name), - Dict("Key" => "Name", "Value" => "manager") ] - ), - "UserData" => user_data_base64, - ) - params_workers = Dict( - "InstanceType" => cluster.instance_type_worker, - "ImageId" => cluster.image_id_worker, - "TagSpecification" => - Dict( - "ResourceType" => "instance", - "Tag" => [Dict("Key" => "cluster", "Value" => cluster.name), - Dict("Key" => "Name", "Value" => "worker") ] - ), - "UserData" => user_data_base64, - ) - - if 
!isnothing(cluster.subnet_id) - params_manager["SubnetId"] = cluster.subnet_id - params_workers["SubnetId"] = cluster.subnet_id - end - - if !isnothing(cluster.placement_group) - params_manager["Placement"] = Dict("GroupName" => cluster.placement_group) - params_workers["Placement"] = Dict("GroupName" => cluster.placement_group) - end + return user_data +end - if !isnothing(cluster.security_group_id) - params_manager["SecurityGroupId"] = [cluster.security_group_id] - params_workers["SecurityGroupId"] = [cluster.security_group_id] +function gcp_create_params(cluster::ManagerWorkers, cluster_nodes, internal_key_name, user_data, private_key, public_key) + + user_data_manager, user_data_worker = user_data + + ssh_path = joinpath(homedir(), ".ssh") + pubpath = joinpath(ssh_path, "$internal_key_name.key.pub") + + params_manager = Vector{Dict}() + + user_manager = cluster.user_manager + user_worker = cluster.user_worker + + push!(params_manager, Dict( + "disks" => [Dict( + "autoDelete" => true, + "boot" => true, + "initializeParams" => Dict( + "diskSizeGb" => 50, + "sourceImage" => "projects/$(cluster.image_id_manager)" + ), + "mode" => "READ_WRITE", + "type" => "PERSISTENT" + )], + "zone" => cluster.zone, + "machineType" => "zones/$(cluster.zone)/machineTypes/$(cluster.instance_type_manager)", + "name" => cluster_nodes[:manager], + "networkInterfaces" => [Dict( + "accessConfigs" => [Dict( + "name" => "external-nat", + "type" => "ONE_TO_ONE_NAT" + )], + "network" => "https://www.googleapis.com/compute/v1/projects/$(cluster.project)/global/networks/default" + )], + "metadata" => + "items" => [Dict( + "key" => "startup-script", + "value" => user_data_manager + ), + Dict( + "key" => "ssh-keys", + "value" => "$user_manager:$pubpath" + )] + )) + + params_workers = Vector{Dict}() + + for i = 1:cluster.count + push!(params_workers, Dict( + "disks" => [Dict( + "autoDelete" => true, + "boot" => true, + "initializeParams" => Dict( + "diskSizeGb" => 50, + "sourceImage" => 
"projects/$(cluster.image_id_worker)" + ), + "mode" => "READ_WRITE", + "type" => "PERSISTENT" + )], + "zone" => cluster.zone, + "machineType" => "zones/$(cluster.zone)/machineTypes/$(cluster.instance_type_worker)", + "name" => cluster_nodes[Symbol("worker$i")], + "networkInterfaces" => [Dict( + "accessConfigs" => [Dict( + "name" => "external-nat", + "type" => "ONE_TO_ONE_NAT" + )], + "network" => "https://www.googleapis.com/compute/v1/projects/$(cluster.project)/global/networks/default" + )], + "metadata" => + "items" => [Dict( + "key" => "startup-script", + "value" => user_data_worker + ), + Dict( + "key" => "ssh-keys", + "value" => "$user_worker:$pubpath" + )] + )) end - params_manager, params_workers + return params_manager, params_workers end -function gcp_create_params(cluster::PeerWorkers, user_data_base64) - params = Dict( - "InstanceType" => cluster.instance_type, - "ImageId" => cluster.image_id, - "TagSpecification" => - Dict( - "ResourceType" => "instance", - "Tag" => [Dict("Key" => "cluster", "Value" => cluster.name), - Dict("Key" => "Name", "Value" => "peer") ] - ), - "UserData" => user_data_base64, - ) +function gcp_create_params(cluster::PeerWorkers, cluster_nodes, internal_key_name, user_data, private_key, public_key) - if !isnothing(cluster.subnet_id) - params["SubnetId"] = cluster.subnet_id - end + ssh_path = joinpath(homedir(), ".ssh") + pubpath = joinpath(ssh_path, "$internal_key_name.key.pub") - if !isnothing(cluster.placement_group) - params["Placement"] = Dict("GroupName" => cluster.placement_group) - end + user = cluster.user - if !isnothing(cluster.security_group_id) - params["SecurityGroupId"] = [cluster.security_group_id] + params = Vector{Dict}() + + for instance in values(cluster_nodes) + push!(params, Dict( + "disks" => [Dict( + "autoDelete" => true, + "boot" => true, + "initializeParams" => Dict( + "diskSizeGb" => 50, + "sourceImage" => "projects/$(cluster.image_id)" + ), + "mode" => "READ_WRITE", + "type" => "PERSISTENT" + )], + 
"zone" => cluster.zone, + "machineType" => "zones/$(cluster.zone)/machineTypes/$(cluster.instance_type)", + "name" => instance, + "networkInterfaces" => [Dict( + "accessConfigs" => [Dict( + "name" => "external-nat", + "type" => "ONE_TO_ONE_NAT" + )], + "network" => "https://www.googleapis.com/compute/v1/projects/$(cluster.project)/global/networks/default" + )], + "metadata" => + "items" => [Dict( + "key" => "startup-script", + "value" => user_data + ), + Dict( + "key" => "ssh-keys", + "value" => "$user:$pubpath" + )] + )) end - params + return params end function gcp_remove_temp_files(internal_key_name) - run(`rm /tmp/$internal_key_name`) - run(`rm /tmp/$internal_key_name.pub`) + ssh_path = joinpath(homedir(), ".ssh") + keypath = joinpath(ssh_path, "$internal_key_name.key") + pubpath = joinpath(ssh_path, "$internal_key_name.key.pub") + rm(keypath) + rm(pubpath) end +function gcp_set_hostfile(cluster::Cluster, cluster_nodes, internal_key_name, user) + gcp_set_hostfile(cluster, cluster_nodes, internal_key_name, user, user) +end -function gcp_set_hostfile(cluster_nodes, internal_key_name) +function gcp_set_hostfile(cluster::Cluster, cluster_nodes, internal_key_name, user_manager, user_worker) + # Testando se a conexão SSH está ativa. - for instance in keys(cluster_nodes) - public_ip = Ec2.describe_instances(Dict("InstanceId" => cluster_nodes[instance]))["reservationSet"]["item"]["instancesSet"]["item"]["ipAddress"] + for (name, instance) in cluster_nodes + public_ip = gcp_get_ips_instance(cluster, instance)[:public_ip] connection_ok = false + print("Waiting for $name to become accessible .") while !connection_ok try connect(public_ip, 22) connection_ok = true catch e - println("Waiting for $instance to be accessible...") + print(".") end end + println("ok") end # Criando o arquivo hostfile. 
hostfile_content = "127.0.0.1 localhost\n" hostfilefile_content = "" - for instance in keys(cluster_nodes) - private_ip = Ec2.describe_instances(Dict("InstanceId" => cluster_nodes[instance]))["reservationSet"]["item"]["instancesSet"]["item"]["privateIpAddress"] + for (name, instance) in cluster_nodes + private_ip = gcp_get_ips_instance(cluster, instance)[:private_ip] hostfile_content *= "$private_ip $instance\n" - if instance != :manager + if name != :manager hostfilefile_content *= "$instance\n" end end - #=h = Threads.@spawn begin - public_ip = Ec2.describe_instances(Dict("InstanceId" => cluster_nodes[instance]))["reservationSet"]["item"]["instancesSet"]["item"]["ipAddress"] - for instance in keys(cluster_nodes) - for instance_other in keys(cluster_nodes) - @info "--- $instance -> $instance_other" - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "ssh $instance_other uptime"`) - end - end - end=# + ssh_path = joinpath(homedir(), ".ssh") + keypath = joinpath(ssh_path, "$internal_key_name.key") + + home = ENV["HOME"] # Atualiza o hostname e o hostfile. - for instance in keys(cluster_nodes) - public_ip = Ec2.describe_instances(Dict("InstanceId" => cluster_nodes[instance]))["reservationSet"]["item"]["instancesSet"]["item"]["ipAddress"] + for (name, instance) in cluster_nodes + user = name == :manager ? 
user_manager : user_worker + public_ip = gcp_get_ips_instance(cluster, instance)[:public_ip] + run(`ssh-keygen -f $home/.ssh/known_hosts -R $public_ip`) # private_ip = Ec2.describe_instances(Dict("InstanceId" => cluster_nodes[instance]))["reservationSet"]["item"]["instancesSet"]["item"]["privateIpAddress"] - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "sudo hostnamectl set-hostname $instance"`) - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "echo '$hostfilefile_content' > /home/ubuntu/hostfile"`) -# try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "awk '{ print \$2 \" \" \$1 }' hostfile >> hosts.tmp"`) - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "echo '$hostfile_content' >> hosts.tmp"`) - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "sudo chown ubuntu:ubuntu /etc/hosts"`) - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "cat hosts.tmp > /etc/hosts"`) - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "sudo chown root:root /etc/hosts"`) - try_run(`ssh -i /tmp/$internal_key_name -o StrictHostKeyChecking=no ubuntu@$public_ip "rm hosts.tmp"`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip "sudo hostnamectl set-hostname $instance"`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip "echo '$hostfilefile_content' > /home/$user/hostfile"`) +# try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip "awk '{ print \$2 \" \" \$1 }' hostfile >> hosts.tmp"`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip "echo '$hostfile_content' >> hosts.tmp"`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip "sudo chown $user:$user /etc/hosts"`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no 
$user@$public_ip "cat hosts.tmp > /etc/hosts"`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip "sudo chown root:root /etc/hosts"`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip "rm hosts.tmp"`) end - - #wait(h) - end @@ -325,98 +356,92 @@ Cria as instâncias. function gcp_create_instances(cluster::ManagerWorkers) + + new_cluster = cluster + cluster_nodes = Dict() + cluster_nodes[:manager] = lowercase(new_cluster.name) * string(1) + for i in 1:new_cluster.count + cluster_nodes[Symbol("worker$i")] = lowercase(new_cluster.name) * string(i + 1) + end + # Configurando a conexão SSH. - internal_key_name, user_data = gcp_set_up_ssh_connection(cluster.name) + + private_key, public_key = gcp_set_up_ssh_connection(cluster.name, cluster.user_manager) + + user_data_manager = gcp_get_user_data(cluster.name, cluster.user_manager, private_key, public_key) + user_data_worker = gcp_get_user_data(cluster.name, cluster.user_worker, private_key, public_key) + + internal_key_name = cluster.name - user_data_base64 = base64encode(user_data) + try gcp_allow_ssh(cluster.project) catch end # Criando as instâncias - params_manager, params_workers = gcp_create_params(cluster, user_data_base64) + params_manager, params_workers = gcp_create_params(cluster, cluster_nodes, internal_key_name, (user_data_manager, user_data_worker), private_key, public_key) + # Criar o headnode - instance_headnode = Ec2.run_instances(1, 1, params_manager) - cluster_nodes[:manager] = instance_headnode["instancesSet"]["item"]["instanceId"] + gcp_compute_instance_insert(new_cluster, params_manager) # Criar os worker nodes. 
- params_workers["InstanceType"] = cluster.instance_type_worker - params_workers["TagSpecification"]["Tag"][2]["Value"] = "worker" - count = cluster.count - instances_workers = Ec2.run_instances(count, count, params_workers) - workers = count - for i in 1:count - instance = "" - if count > 1 - instance = instances_workers["instancesSet"]["item"][i] - elseif count == 1 - instance = instances_workers["instancesSet"]["item"] - end - instance_id = instance["instanceId"] - cluster_nodes[Symbol("worker$i")] = instance_id - end + gcp_compute_instance_insert(new_cluster, params_workers) - gcp_await_status(cluster_nodes, "running") - gcp_await_check(cluster_nodes, "ok") + gcp_await_status(new_cluster, cluster_nodes, "RUNNING") - gcp_set_hostfile(cluster_nodes, internal_key_name) + gcp_set_hostfile(new_cluster, cluster_nodes, internal_key_name, cluster.user_manager, cluster.user_worker) #gcp_remove_temp_files(internal_key_name) - cluster_nodes + return cluster_nodes end function gcp_create_instances(cluster::PeerWorkers) + + new_cluster = cluster + cluster_nodes = Dict() + for i = 1:new_cluster.count + cluster_nodes[Symbol("peer$i")] = lowercase(new_cluster.name) * string(i) + end # Configurando a conexão SSH. - internal_key_name, user_data = gcp_set_up_ssh_connection(cluster.name) + private_key, public_key = gcp_set_up_ssh_connection(cluster.name, cluster.user) + user_data = gcp_get_user_data(cluster.name, cluster.user, private_key, public_key) + + internal_key_name = cluster.name - user_data_base64 = base64encode(user_data) + try gcp_allow_ssh(cluster.project) catch end # Criando as instâncias - params = gcp_create_params(cluster, user_data_base64) + params = gcp_create_params(new_cluster, cluster_nodes, internal_key_name, user_data, private_key, public_key) # Criar os Peers. 
- count = cluster.count - instances_peers = Ec2.run_instances(count, count, params) - for i in 1:count - instance = "" - if count > 1 - instance = instances_peers["instancesSet"]["item"][i] - elseif count == 1 - instance = instances_peers["instancesSet"]["item"] - end - instance_id = instance["instanceId"] - cluster_nodes[Symbol("peer$i")] = instance_id - end + gcp_compute_instance_insert(new_cluster, params) - gcp_await_status(cluster_nodes, "running") - gcp_await_check(cluster_nodes, "ok") + gcp_await_status(new_cluster, cluster_nodes, "RUNNING") - gcp_set_hostfile(cluster_nodes, internal_key_name) + gcp_set_hostfile(new_cluster, cluster_nodes, internal_key_name, cluster.user) - # gcp_remove_temp_files(internal_key_name) + #gcp_remove_temp_files(internal_key_name) - cluster_nodes + return cluster_nodes end -function gcp_await_status(cluster_nodes, status) - for nodeid in keys(cluster_nodes) - print("Waiting for $nodeid to be $status ...") - while gcp_get_instance_status(cluster_nodes[nodeid]) != status - print(".") - sleep(2) - end - println("successfull") +function gcp_compute_instance_insert(cluster::Cluster, params) + vector_size = size(params, 1) + for i = 1:vector_size + GCPAPI.compute(:Instance, :insert, cluster.project, cluster.zone; data=params[i]) end end -function gcp_await_check(cluster_nodes, status) +function gcp_await_status(cluster::Cluster, cluster_nodes, status) for nodeid in keys(cluster_nodes) print("Waiting for $nodeid to be $status ...") - while gcp_get_instance_check(cluster_nodes[nodeid]) != status + current_status = gcp_get_instance_status(cluster, cluster_nodes[nodeid]) + while current_status != status print(".") sleep(2) + current_status = gcp_get_instance_status(cluster, cluster_nodes[nodeid]) end println("successfull") end @@ -425,8 +450,9 @@ end # PUBLIC function gcp_cluster_status(cluster::Cluster, status_list) cluster_nodes = cluster.cluster_nodes - for nodeid in keys(cluster_nodes) - !(gcp_get_instance_status(cluster_nodes[nodeid]) 
in status_list) && return false + for nodeid in values(cluster_nodes) + current_status = gcp_get_instance_status(cluster, nodeid) + !(current_status in status_list) && return false end return true end @@ -439,43 +465,20 @@ function gcp_cluster_ready(cluster::Cluster; status="ok") return true end -function gcp_delete_instances(cluster_nodes) - for id in values(cluster_nodes) - Ec2.terminate_instances(id) - end -end - -function gcp_get_instance_status(id) - try - description = Ec2.describe_instances(Dict("InstanceId" => id)) - if haskey(description["reservationSet"], "item") - description["reservationSet"]["item"]["instancesSet"]["item"]["instanceState"]["name"] - else - "notfound" - end - catch _ - "notfound" +function gcp_delete_instances(cluster::Cluster) + for id in values(cluster.cluster_nodes) + GCPAPI.compute(:Instance, :delete, cluster.project, cluster.zone, id) end end -function gcp_get_instance_check(id) +function gcp_get_instance_status(cluster::Cluster, id) try - description = Ec2.describe_instance_status(Dict("InstanceId" => id)) - if haskey(description["instanceStatusSet"], "item") - description["instanceStatusSet"]["item"]["instanceStatus"]["status"] - else - "notfound" - end + return gcp_get_instance_dict(cluster, id)["status"] catch _ - "notfound" + return "notfound" end end -function gcp_get_instance_subnet(id) - description = Ec2.describe_instances(Dict("InstanceId" => id)) - description["reservationSet"]["item"]["instancesSet"]["item"]["subnetId"] -end - # PUBLIC gcp_can_interrupt(cluster::Cluster) = gcp_cluster_isrunning(cluster) @@ -483,40 +486,68 @@ gcp_can_interrupt(cluster::Cluster) = gcp_cluster_isrunning(cluster) # PUBLIC function gcp_interrupt_cluster(cluster::Cluster) gcp_stop_instances(cluster) - gcp_await_status(cluster.cluster_nodes, "stopped") + gcp_await_status(cluster, cluster.cluster_nodes, "TERMINATED") end # PUBLIC -gcp_can_resume(cluster::Cluster) = gcp_cluster_status(cluster, ["stopped"]) +gcp_can_resume(cluster::Cluster) = 
gcp_cluster_status(cluster, ["TERMINATED"]) # Start interrupted cluster instances or reboot running cluster instances. # All instances must be in "interrupted" or "running" state. # If some instance is not in "interrupted" or "running" state, raise an exception. # PUBLIC -function gcp_resume_cluster(cluster::Cluster) +function gcp_resume_cluster(cluster::ManagerWorkers) + home = ENV["HOME"] + ssh_path = joinpath(homedir(), ".ssh") + keypath = joinpath(ssh_path, "$(cluster.name).key") + user_manager = cluster.user_manager + user_worker = cluster.user_worker + gcp_start_instances(cluster) - gcp_await_status(cluster.cluster_nodes, "running") - gcp_await_check(cluster.cluster_nodes, "ok") - for instance in keys(cluster.cluster_nodes) - public_ip = Ec2.describe_instances(Dict("InstanceId" => cluster.cluster_nodes[instance]))["reservationSet"]["item"]["instancesSet"]["item"]["ipAddress"] - try_run(`ssh -i /tmp/$(cluster.name) -o StrictHostKeyChecking=no ubuntu@$public_ip uptime`) + gcp_await_status(cluster, cluster.cluster_nodes, "RUNNING") + + public_ip = gcp_get_ips_instance(cluster, cluster.cluster_nodes[:manager])[:public_ip] + + run(`ssh-keygen -f $home/.ssh/known_hosts -R $public_ip`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user_manager@$public_ip uptime`) + + for i in 1:cluster.count + instance = cluster.cluster_nodes[Symbol("worker$i")] + public_ip = gcp_get_ips_instance(cluster, instance)[:public_ip] + run(`ssh-keygen -f $home/.ssh/known_hosts -R $public_ip`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user_worker@$public_ip uptime`) end end +function gcp_resume_cluster(cluster::PeerWorkers) + home = ENV["HOME"] + ssh_path = joinpath(homedir(), ".ssh") + keypath = joinpath(ssh_path, "$(cluster.name).key") + user = cluster.user + + gcp_start_instances(cluster) + gcp_await_status(cluster, cluster.cluster_nodes, "RUNNING") + + for instance in values(cluster.cluster_nodes) + public_ip = gcp_get_ips_instance(cluster, instance)[:public_ip] 
+ run(`ssh-keygen -f $home/.ssh/known_hosts -R $public_ip`) + try_run(`ssh -i $keypath -o StrictHostKeyChecking=no $user@$public_ip uptime`) + end +end # Check if the cluster instances are running or interrupted. -gcp_cluster_isrunning(cluster::Cluster) = gcp_cluster_status(cluster, ["running"]) && gcp_cluster_ready(cluster) -gcp_cluster_isstopped(cluster::Cluster) = gcp_cluster_status(cluster, ["stopped"]) +gcp_cluster_isrunning(cluster::Cluster) = gcp_cluster_status(cluster, ["RUNNING"]) #&& gcp_cluster_ready(cluster) +gcp_cluster_isstopped(cluster::Cluster) = gcp_cluster_status(cluster, ["TERMINATED"]) function gcp_stop_instances(cluster::Cluster) for id in values(cluster.cluster_nodes) - Ec2.stop_instances(id) + GCPAPI.compute(:Instance, :stop, cluster.project, cluster.zone, id) end end function gcp_start_instances(cluster::Cluster) for id in values(cluster.cluster_nodes) - Ec2.start_instances(id) + GCPAPI.compute(:Instance, :start, cluster.project, cluster.zone, id) end end @@ -524,7 +555,30 @@ end function gcp_get_ips(cluster::Cluster) ips = Dict() for (node, id) in cluster.cluster_nodes - ips[node] = gcp_get_ips_instance(id) + ips[node] = gcp_get_ips_instance(cluster, id) end ips +end + +function gcp_get_instance_dict(cluster::Cluster, name) + gcp_check_session() + return JSON.parse(String(GCPAPI.compute(:Instance, :get, cluster.project, cluster.zone, name))) +end + + +function gcp_allow_ssh(project) + firewall_rule = Dict( + "allowed" => [ + Dict("IPProtocol" => "tcp", + "ports" => ["22"])], + "direction" => "INGRESS", + "kind" => "compute#firewall", + "name" => "allow-ssh", + "network" => "projects/$project/global/networks/default", + "priority" => 1000, + "selfLink" => "projects/$project/global/firewalls/allow-ssh", + "sourceRanges" => ["0.0.0.0/0"] + ) + + GCPAPI.compute(:Firewall, :insert, project; data=firewall_rule) end \ No newline at end of file diff --git a/src/cluster_providers/gcp/gcp_deploy.jl b/src/cluster_providers/gcp/gcp_deploy.jl index 
# ==== src/cluster_providers/gcp/gcp_deploy.jl ====

# Registry of the GCP cluster objects known to this session, keyed by cluster handle.
# Entries are added by `deploy_cluster` / `cluster_load` and removed by `terminate_cluster`.
gcp_cluster_info = Dict()

"""
    get_ips(::Type{GoogleCloud}, cluster_handle)

Return `gcp_get_ips` applied to the cluster registered under `cluster_handle`.

Throws a `KeyError` when the handle is not present in `gcp_cluster_info`.
"""
get_ips(::Type{GoogleCloud}, cluster_handle) =
    gcp_get_ips(gcp_cluster_info[cluster_handle])

"""
    deploy_cluster(::Type{GoogleCloud}, ::Type{<:ManagerWorkers}, ::Type{<:CreateMode},
                   cluster_handle, cluster_features, instance_type)

Create a manager-workers cluster on GCP, register it in `gcp_cluster_info` and persist its
description with `gcp_cluster_save`.

# Arguments
- `cluster_handle`: identifier of the cluster; its `string` form becomes the cluster name
  and the handle itself is the registry key.
- `cluster_features`: feature dictionary. `:node_count` (default `1`) and `:zone` are read
  directly; image ids and users are resolved through `extract_mwfeature`.
- `instance_type`: indexable value; element 1 is used as the manager machine type and
  element 2 as the worker machine type.

# Returns
The `GCPManagerWorkers` object after `gcp_create_cluster` has been called on it.
"""
function deploy_cluster(::Type{GoogleCloud},
                        ::Type{<:ManagerWorkers},
                        ::Type{<:CreateMode},
                        cluster_handle,
                        cluster_features,
                        instance_type)
    node_count = get(cluster_features, :node_count, 1)

    imageid_manager, imageid_worker =
        extract_mwfeature(cluster_features, GoogleCloud, :imageid)
    user_manager, user_worker = extract_mwfeature(cluster_features, GoogleCloud, :user)

    gcp_defaults = defaults_dict[GoogleCloud]
    zone = get(cluster_features, :zone, gcp_defaults[:zone])
    # Unlike the zone, the project always comes from the defaults; it is not read from
    # `cluster_features`.
    project = gcp_defaults[:project]

    cluster = GCPManagerWorkers(string(cluster_handle),
                                imageid_manager,
                                imageid_worker,
                                node_count,
                                instance_type[1],   # manager machine type
                                instance_type[2],   # worker machine type
                                user_manager,
                                user_worker,
                                zone,
                                project,
                                nothing,            # cluster_nodes: filled by gcp_create_cluster
                                cluster_features)

    gcp_create_cluster(cluster)
    gcp_cluster_info[cluster_handle] = cluster
    gcp_cluster_save(cluster)

    return cluster
end

"""
    deploy_cluster(::Type{GoogleCloud}, ::Type{<:PeerWorkers}, ::Type{<:CreateMode},
                   cluster_handle, cluster_features, instance_type)

Create a peer-workers cluster on GCP, register it in `gcp_cluster_info` and persist its
description with `gcp_cluster_save`.

# Arguments
- `cluster_handle`: identifier of the cluster; its `string` form becomes the cluster name
  and the handle itself is the registry key.
- `cluster_features`: feature dictionary. `:node_count` (default `1`), `:imageid`, `:user`
  and `:zone` are read from it, the last three falling back to `defaults_dict[GoogleCloud]`.
- `instance_type`: machine type passed unchanged to `GCPPeerWorkers`.

# Returns
The `GCPPeerWorkers` object after `gcp_create_cluster` has been called on it.
"""
function deploy_cluster(::Type{GoogleCloud},
                        ::Type{<:PeerWorkers},
                        ::Type{<:CreateMode},
                        cluster_handle,
                        cluster_features,
                        instance_type)
    gcp_defaults = defaults_dict[GoogleCloud]

    node_count = get(cluster_features, :node_count, 1)
    imageid = get(cluster_features, :imageid, gcp_defaults[:imageid])
    user = get(cluster_features, :user, gcp_defaults[:user])
    zone = get(cluster_features, :zone, gcp_defaults[:zone])
    # Unlike the zone, the project always comes from the defaults; it is not read from
    # `cluster_features`.
    project = gcp_defaults[:project]

    cluster = GCPPeerWorkers(string(cluster_handle),
                             imageid,
                             node_count,
                             instance_type,
                             user,
                             zone,
                             project,
                             nothing,               # cluster_nodes: filled by gcp_create_cluster
                             cluster_features)

    gcp_create_cluster(cluster)
    gcp_cluster_info[cluster_handle] = cluster
    gcp_cluster_save(cluster)

    return cluster
end

"""
    launch_processes(::Type{GoogleCloud}, cluster_type, cluster_handle, ips)

Forward to `launch_processes_ssh`, using the feature dictionary of the cluster registered
under `cluster_handle`, and return its result.
"""
function launch_processes(::Type{GoogleCloud}, cluster_type, cluster_handle, ips)
    features = gcp_cluster_info[cluster_handle].features
    return launch_processes_ssh(features, cluster_type, ips)
end

#==== INTERRUPT CLUSTER ====#

"""
    can_interrupt(::Type{GoogleCloud}, cluster_handle)

Return `gcp_can_interrupt` applied to the cluster registered under `cluster_handle`.
"""
can_interrupt(::Type{GoogleCloud}, cluster_handle) =
    gcp_can_interrupt(gcp_cluster_info[cluster_handle])

"""
    interrupt_cluster(::Type{GoogleCloud}, cluster_handle)

Call `gcp_interrupt_cluster` on the cluster registered under `cluster_handle` and return
whatever that call returns.
"""
function interrupt_cluster(::Type{GoogleCloud}, cluster_handle)
    return gcp_interrupt_cluster(gcp_cluster_info[cluster_handle])
end

#==== CONTINUE CLUSTER ====#

"""
    can_resume(::Type{GoogleCloud}, cluster_handle)

Return `gcp_can_resume` applied to the cluster registered under `cluster_handle`.
"""
can_resume(::Type{GoogleCloud}, cluster_handle) =
    gcp_can_resume(gcp_cluster_info[cluster_handle])

"""
    resume_cluster(::Type{GoogleCloud}, cluster_handle)

Call `gcp_resume_cluster` on the registered cluster, then return `gcp_get_ips(cluster)`.
"""
function resume_cluster(::Type{GoogleCloud}, cluster_handle)
    cluster = gcp_cluster_info[cluster_handle]
    gcp_resume_cluster(cluster)
    return gcp_get_ips(cluster)
end

#==== TERMINATE CLUSTER ====#

"""
    terminate_cluster(::Type{GoogleCloud}, cluster_handle)

Terminate the registered cluster with `gcp_terminate_cluster`, remove its persisted
descriptor through `gcp_delete_cluster`, and drop it from `gcp_cluster_info`.
Returns `nothing`.
"""
function terminate_cluster(::Type{GoogleCloud}, cluster_handle)
    cluster = gcp_cluster_info[cluster_handle]
    gcp_terminate_cluster(cluster)
    gcp_delete_cluster(cluster_handle)
    delete!(gcp_cluster_info, cluster_handle)
    return nothing
end

"""
    cluster_isrunning(::Type{GoogleCloud}, cluster_handle)

Return `gcp_cluster_isrunning` for the registered cluster. Any exception raised while
looking the cluster up or querying it is logged as a warning and reported as `false`.
"""
function cluster_isrunning(::Type{GoogleCloud}, cluster_handle)
    try
        return gcp_cluster_isrunning(gcp_cluster_info[cluster_handle])
    catch e
        # `@warn "msg", e` would log the tuple ("msg", e) as the message; render the
        # exception into the text instead.
        @warn "Erro ao verificar o status do cluster: $(sprint(showerror, e))"
        return false
    end
end

# ==== src/cluster_providers/gcp/gcp_persist.jl (new file) ====

# Write `contents` as TOML to "<name>.cluster" inside the directory given by the
# CLOUD_CLUSTERS_CONFIG environment variable (current directory when unset).
# `DataType` values are stored as their string form; any other non-TOML value is an error.
function _gcp_write_cluster_file(name, contents)
    configpath = get(ENV, "CLOUD_CLUSTERS_CONFIG", pwd())
    open(joinpath(configpath, string(name, ".cluster")), "w") do io
        TOML.print(io, contents) do x
            x isa DataType && return string(x)
            error("unhandled type $(typeof(x))")
        end
    end
end

# Rebuild the node table read from a descriptor with `Symbol` keys, matching the
# `Dict{Symbol, String}` type of the `cluster_nodes` field.
_gcp_symbolize_nodes(nodes) =
    Dict{Symbol,String}(Symbol(node_name) => instance_id for (node_name, instance_id) in nodes)

# Register a cluster rebuilt from disk when `gcp_cluster_status` accepts it for the states
# "RUNNING"/"TERMINATED" and return its features; otherwise delete its descriptor and
# return `nothing`.
function _gcp_register_loaded(cluster, cluster_handle)
    if gcp_cluster_status(cluster, ["RUNNING", "TERMINATED"])
        gcp_cluster_info[cluster_handle] = cluster
        return cluster.features
    end
    gcp_delete_cluster(cluster_handle)
    return nothing
end

"""
    gcp_cluster_save(cluster::ManagerWorkers)

Persist the description of a manager-workers cluster as TOML in `<cluster.name>.cluster`,
under the directory named by `ENV["CLOUD_CLUSTERS_CONFIG"]` (current directory if unset).
"""
function gcp_cluster_save(cluster::ManagerWorkers)
    contents = Dict{String,Any}(
        "type" => ManagerWorkers,
        "timestamp" => string(now()),
        "provider" => GoogleCloud,
        "name" => cluster.name,
        "user_manager" => cluster.user_manager,
        "user_worker" => cluster.user_worker,
        "instance_type_manager" => cluster.instance_type_manager,
        "instance_type_worker" => cluster.instance_type_worker,
        "count" => cluster.count,
        "image_id_manager" => cluster.image_id_manager,
        "image_id_worker" => cluster.image_id_worker,
        "cluster_nodes" => cluster.cluster_nodes,
        "cluster_features" => cluster.features,
        "zone" => cluster.zone,
        "project" => cluster.project,
    )
    return _gcp_write_cluster_file(cluster.name, contents)
end

"""
    gcp_cluster_save(cluster::PeerWorkers)

Persist the description of a peer-workers cluster as TOML in `<cluster.name>.cluster`,
under the directory named by `ENV["CLOUD_CLUSTERS_CONFIG"]` (current directory if unset).
"""
function gcp_cluster_save(cluster::PeerWorkers)
    contents = Dict{String,Any}(
        "type" => PeerWorkers,
        "timestamp" => string(now()),
        "provider" => GoogleCloud,
        "name" => cluster.name,
        "user" => cluster.user,
        "instance_type" => cluster.instance_type,
        "count" => cluster.count,
        "image_id" => cluster.image_id,
        "zone" => cluster.zone,
        "project" => cluster.project,
        "cluster_nodes" => cluster.cluster_nodes,
        "cluster_features" => cluster.features,
    )
    return _gcp_write_cluster_file(cluster.name, contents)
end

"""
    cluster_load(::Type{GoogleCloud}, ::Type{<:ManagerWorkers}, cluster_handle, contents)

Rebuild a `GCPManagerWorkers` object from `contents`, a dictionary holding the keys written
by `gcp_cluster_save` (presumably the parsed `.cluster` file; confirm against the caller).

Returns the cluster's feature dictionary and registers the cluster in `gcp_cluster_info`
when `gcp_cluster_status(cluster, ["RUNNING", "TERMINATED"])` is true. Otherwise the
persisted descriptor is deleted and `nothing` is returned.
"""
function cluster_load(::Type{GoogleCloud}, ::Type{<:ManagerWorkers}, cluster_handle, contents)
    cluster = GCPManagerWorkers(string(cluster_handle),
                                contents["image_id_manager"],
                                contents["image_id_worker"],
                                contents["count"],
                                contents["instance_type_manager"],
                                contents["instance_type_worker"],
                                contents["user_manager"],
                                contents["user_worker"],
                                contents["zone"],
                                contents["project"],
                                _gcp_symbolize_nodes(contents["cluster_nodes"]),
                                gcp_adjusttypefeatures(contents["cluster_features"]))

    return _gcp_register_loaded(cluster, cluster_handle)
end

"""
    gcp_adjusttypefeatures(_cluster_features)

Return a copy of `_cluster_features` with every key converted to a `Symbol`.

Values stored under `:cluster_type`, `:node_machinetype`, `:provider` and `:node_provider`
are passed through `fetchtype`; values under `:worker_features` and `:manager_features`
are processed recursively by this function. All other values are copied unchanged.
"""
function gcp_adjusttypefeatures(_cluster_features)
    # Tuples avoid allocating a fresh array for each membership test.
    type_keys = (:cluster_type, :node_machinetype, :provider, :node_provider)
    nested_keys = (:worker_features, :manager_features)

    adjusted = Dict{Symbol,Any}()
    for (id, raw) in _cluster_features
        key = Symbol(id)
        value = key in type_keys ? fetchtype(raw) : raw
        adjusted[key] = key in nested_keys ? gcp_adjusttypefeatures(value) : value
    end
    return adjusted
end

"""
    cluster_load(::Type{GoogleCloud}, ::Type{<:PeerWorkers}, cluster_handle, contents)

Rebuild a `GCPPeerWorkers` object from `contents`, a dictionary holding the keys written by
`gcp_cluster_save` (presumably the parsed `.cluster` file; confirm against the caller).

Returns the cluster's feature dictionary and registers the cluster in `gcp_cluster_info`
when `gcp_cluster_status(cluster, ["RUNNING", "TERMINATED"])` is true. Otherwise the
persisted descriptor is deleted and `nothing` is returned.
"""
function cluster_load(::Type{GoogleCloud}, ::Type{<:PeerWorkers}, cluster_handle, contents)
    cluster = GCPPeerWorkers(string(cluster_handle),
                             contents["image_id"],
                             contents["count"],
                             contents["instance_type"],
                             contents["user"],
                             contents["zone"],
                             contents["project"],
                             _gcp_symbolize_nodes(contents["cluster_nodes"]),
                             gcp_adjusttypefeatures(contents["cluster_features"]))

    return _gcp_register_loaded(cluster, cluster_handle)
end

"""
    gcp_delete_cluster(cluster_handle)

Remove the persisted descriptor `<cluster_handle>.cluster` from the configuration directory
(`ENV["CLOUD_CLUSTERS_CONFIG"]`, or the current directory if unset) and then call
`gcp_remove_temp_files(cluster_handle)`, returning its result.
"""
function gcp_delete_cluster(cluster_handle)
    configpath = get(ENV, "CLOUD_CLUSTERS_CONFIG", pwd())
    # `force = true`: a descriptor that is already gone must not abort the remaining
    # cleanup here or in `terminate_cluster`.
    rm(joinpath(configpath, "$cluster_handle.cluster"); force = true)
    return gcp_remove_temp_files(cluster_handle)
end