diff --git a/include/maidsafe/routing/routing_node.h b/include/maidsafe/routing/routing_node.h index fa690c58..bc9f1518 100644 --- a/include/maidsafe/routing/routing_node.h +++ b/include/maidsafe/routing/routing_node.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -64,36 +65,33 @@ class RoutingNode { RoutingNode(RoutingNode&&) = delete; RoutingNode& operator=(const RoutingNode&) = delete; RoutingNode& operator=(RoutingNode&&) = delete; - ~RoutingNode(); - // // will return with the data + // normal bootstrap mechanism template - GetReturn Get(Data::NameAndTypeId name_and_type_id, CompletionToken token); + BootstrapReturn Bootstrap(CompletionToken token); + // used where we wish to pass a specific node to bootstrap from + template + BootstrapReturn Bootstrap(Endpoint endpoint, CompletionToken&& token); + + // will return with the data + template + GetReturn Get(Data::NameAndTypeId name_and_type_id, CompletionToken&& token); // will return with allowed or not (error_code only) - template - PutReturn Put(Address to, DataType data, CompletionToken token); + template + PutReturn Put(std::shared_ptr data, CompletionToken&& token); // will return with allowed or not (error_code only) template - PostReturn Post(Address to, FunctorType functor, CompletionToken token); - - void AddBootstrapContact(crux::endpoint /*endpoint*/) { - // bootstrap_handler_.AddBootstrapContact(endpoint); - } - - void AddContact(asio::ip::udp::endpoint endpoint) { - crux_asio_service_.service().post( - [=]() { connection_manager_.AddNode(boost::none, EndpointPair(endpoint)); }); - } - - void StartAccepting(unsigned short port) { - crux_asio_service_.service().post([=]() { connection_manager_.StartAccepting(port); }); - } + PostReturn Post(Address to, FunctorType functor, CompletionToken&& token); - void Shutdown() { - crux_asio_service_.service().post([=]() { connection_manager_.Shutdown(); }); + void AddBootstrapContact(Contact bootstrap_contact) { + bootstrap_handler_.AddBootstrapContacts(std::vector(1, bootstrap_contact)); + // FIXME bootstrap handler may be need to be updated to take one entry always } private: + // tries to bootstrap to network + void StartBootstrap(); + void HandleMessage(Connect connect, MessageHeader original_header); // like connect but add targets endpoint void HandleMessage(ConnectResponse connect_response); @@ -116,100 +114,133 @@ class RoutingNode { // each member of a group needs to send this to the network Address (recieveing needs a Quorum) // filling in public key again. void HandleMessage(routing::Post post, MessageHeader original_header); + + template + void SendSwarmOrParallel(const DestinationAddress& destination, const SourceAddress& source, + const MessageType& message, Authority authority); + void SendSwarmOrParallel(const Address& destination, const SerialisedMessage& serialised_message); + template + void SendToBootstrapNode(const DestinationAddress& destination, const SourceAddress& source, + const MessageType& message, Authority authority); + void SendToBootstrapNode(const SerialisedMessage& serialised_message); + void SendToNonRoutingNode(const Address& destination, + const SerialisedMessage& serialised_message); bool TryCache(MessageTypeTag tag, MessageHeader header, Address name); Authority OurAuthority(const Address& element, const MessageHeader& header) const; - virtual void MessageReceived(Address peer_id, SerialisedMessage serialised_message); - // virtual void ConnectionLost(Address peer) override final; + void MessageReceived(Address peer_id, SerialisedMessage serialised_message); + void ConnectionLost(boost::optional, Address peer); void OnCloseGroupChanged(CloseGroupDifference close_group_difference); SourceAddress OurSourceAddress() const; SourceAddress OurSourceAddress(GroupAddress) const; - // void OnBootstrap(asio::error_code, rudp::Contact, - // std::function); + void OnBootstrap(asio::error_code, Contact, std::function); + void PutOurPublicPmid(); - template - void SendDirect(Address, Message, SendHandler); - EndpointPair NextEndpointPair() { // TODO(dirvine) :23/01/2015 - return EndpointPair(); - } + EndpointPair NextEndpointPair(); // this innocuous looking call will bootstrap the node and also be used if we spot close group // nodes appering or vanishing so its pretty important. void ConnectToCloseGroup(); Address OurId() const { return Address(our_fob_.name()); } - private: using unique_identifier = std::pair; - BoostAsioService crux_asio_service_; AsioService asio_service_; passport::Pmid our_fob_; std::atomic message_id_; boost::optional
bootstrap_node_; - // This crashes for me (PeterJ) on linux. - // BootstrapHandler bootstrap_handler_; + boost::optional our_external_endpoint_; + BootstrapHandler bootstrap_handler_; ConnectionManager connection_manager_; LruCache filter_; Sentinel sentinel_; - LruCache cache_; - std::vector
connected_nodes_; + LruCache cache_; + std::shared_ptr destroy_indicator_; }; template RoutingNode::RoutingNode() - : crux_asio_service_(1), - asio_service_(4), - our_fob_(passport::Pmid(passport::Anpmid())), + : asio_service_(1), + our_fob_(passport::CreatePmidAndSigner().first), message_id_(RandomUint32()), bootstrap_node_(boost::none), - // bootstrap_handler_(), - connection_manager_(crux_asio_service_.service(), passport::PublicPmid(our_fob_)), + bootstrap_handler_(), + connection_manager_(asio_service_.service(), our_fob_.name(), + [=](Address address, SerialisedMessage msg) { + MessageReceived(std::move(address), std::move(msg)); + }, + [=](boost::optional diff, Address peer_id) { + ConnectionLost(std::move(diff), std::move(peer_id)); + }), filter_(std::chrono::minutes(20)), sentinel_([](Address) {}, [](GroupAddress) {}), cache_(std::chrono::minutes(60)), - connected_nodes_() { + destroy_indicator_(new boost::none_t) { + LOG(kInfo) << OurId() << " RoutingNode created"; // store this to allow other nodes to get our ID on startup. IF they have full routing tables they // need Quorum number of these signed anyway. - cache_.Add(our_fob_.name(), Serialise(passport::PublicPmid(our_fob_))); - // try an connect to any local nodes (5483) Expect to be told Node_Id - auto temp_id(MakeIdentity()); - - connection_manager_.SetOnConnectionAdded( - [=](Address addr) { static_cast(this)->HandleConnectionAdded(addr); }); - - // PeterJ: Start listening on ports 5483 and 5433 (why two though?) - // rudp_.Add(rudp::Contact(temp_id, EndpointPair{rudp::Endpoint{GetLocalIp(), 5483}, - // rudp::Endpoint{GetLocalIp(), 5433}}, - // our_fob_.public_key()), - // [this, temp_id](asio::error_code error) { - // if (!error) { - // bootstrap_node_ = temp_id; - // ConnectToCloseGroup(); - // return; - // } - // }); - - // PeterJ: Read endpoints from database and connect to them. - // for (auto& node : bootstrap_handler_.ReadBootstrapContacts()) { - // rudp_.Add(node, [node, this](asio::error_code error) { - // if (!error) { - // bootstrap_node_ = node.id; - // ConnectToCloseGroup(); - // return; - // } - // }); - // if (bootstrap_node_) - // break; - // } + passport::PublicPmid our_public_pmid(our_fob_); + cache_.Add(our_public_pmid.NameAndType(), Serialise(our_public_pmid)); + StartBootstrap(); +} + +template +void RoutingNode::StartBootstrap() { + // try connect to any local nodes (5483) Expect to be told Node_Id + Endpoint live_port_ep(GetLocalIp(), kLivePort); + + // skip trying to bootstrap off self + if (connection_manager_.AcceptingPort() == kLivePort) { + return; + } + + connection_manager_.Connect(live_port_ep, + [=](asio::error_code error, Address peer_addr, Endpoint our_public_endpoint) { + if (error) { + LOG(kWarning) << "Cannot connect to bootstrap endpoint < " << peer_addr << " >" + << error.message(); + // TODO(Team): try connect to bootstrap contacts and other options + // (hardcoded endpoints).on failure keep retrying all options forever + return; + } + LOG(kInfo) << OurId() << " Bootstrapped with " << peer_addr << " his ep:" << live_port_ep; + // FIXME(Team): Thread safety. + bootstrap_node_ = peer_addr; + our_external_endpoint_ = our_public_endpoint; + // bootstrap_endpoint_ = our_endpoint; this will not required if + // connection manager has this connection +// PutOurPublicPmid(); // DISABLED FOR TESTING UNCOMMENT + ConnectToCloseGroup(); + }); + + // auto bootstrap_contacts = bootstrap_handler_.ReadBootstrapContacts(); } template -RoutingNode::~RoutingNode() { - crux_asio_service_.Stop(); +void RoutingNode::PutOurPublicPmid() { + assert(bootstrap_node_); + std::shared_ptr our_public_pmid{new passport::PublicPmid(our_fob_)}; + auto name = our_public_pmid->Name(); + auto type_id = our_public_pmid->TypeId(); + asio::post(asio_service_.service(), [=] { + // FIXME(Prakash) request should be signed and may be sent to ClientManager + PutData put_data_message(type_id, Serialise(our_public_pmid)); + SendToBootstrapNode(std::make_pair(Destination(name), boost::none), OurSourceAddress(), + put_data_message, Authority::client); + }); +} + +template +EndpointPair RoutingNode::NextEndpointPair() { + auto port = connection_manager_.AcceptingPort(); + + return EndpointPair(Endpoint(GetLocalIp(), port), + our_external_endpoint_ ? Endpoint(our_external_endpoint_->address(), port) + : Endpoint()); } template template GetReturn RoutingNode::Get(Data::NameAndTypeId name_and_type_id, - CompletionToken token) { + CompletionToken&& token) { GetHandler handler(std::forward(token)); asio::async_result result(handler); asio::post(asio_service_.service(), [=] { @@ -218,31 +249,40 @@ GetReturn RoutingNode::Get(Data::NameAndTypeId name_and_ GetData request(name_and_type_id, OurSourceAddress()); auto message(Serialise(our_header, MessageToTag::value(), request)); for (const auto& target : connection_manager_.GetTarget(name_and_type_id.name)) { - connection_manager_.FindPeer(target)->Send(message, [](asio::error_code) {}); + connection_manager_.Send(target.id, message, [](asio::error_code) {}); } }); return result.get(); } + // As this is a routing_node this should be renamed to PutPublicPmid one time // and possibly it should be a single type it deals with rather than Put as this call is // special // amongst all node types and is the only unauthorised Put anywhere // nodes have no reason to Put anywhere else template -template -PutReturn RoutingNode::Put(Address to, DataType data, - CompletionToken token) { +template +PutReturn RoutingNode::Put(std::shared_ptr data, + CompletionToken&& token) { PutHandler handler(std::forward(token)); asio::async_result result(handler); - asio::post(asio_service_.service(), [=] { - MessageHeader our_header(std::make_pair(Destination(to), boost::none), OurSourceAddress(), - ++message_id_, Authority::client); - PutData request(DataType::Tag::kValue, data.serialise()); + + asio::post(asio_service_.service(), [=]() mutable { + MessageHeader our_header( + std::make_pair(Destination(OurId()), boost::none), // send to ClientMgr + OurSourceAddress(), ++message_id_, Authority::client); + PutData request(data->TypeId(), Serialise(data)); // FIXME(dirvine) For client in real put this needs signed :08/02/2015 - // fixme data should serialise properly and not require the above call to serialse() auto message(Serialise(our_header, MessageToTag::value(), request)); - for (const auto& target : connection_manager_.GetTarget(to)) { - connection_manager_.FindPeer(target)->Send(message, [](asio::error_code) {}); + auto targets(connection_manager_.GetTarget(OurId())); + for (const auto& target : targets) { + connection_manager_.Send(target.id, message, [](asio::error_code) {}); + } + if (targets.empty() && bootstrap_node_) { + connection_manager_.Send(*bootstrap_node_, message, + [handler](std::error_code ec) mutable { handler(ec); }); + } else { + handler(make_error_code(RoutingErrors::not_connected)); } }); return result.get(); @@ -251,7 +291,7 @@ PutReturn RoutingNode::Put(Address to, DataType data, template template PostReturn RoutingNode::Post(Address to, FunctorType functor, - CompletionToken token) { + CompletionToken&& token) { PostHandler handler(std::forward(token)); asio::async_result result(handler); asio::post(asio_service_.service(), [=] { @@ -262,46 +302,31 @@ PostReturn RoutingNode::Post(Address to, FunctorType fun auto message(Serialise(our_header, MessageToTag::value(), request)); for (const auto& target : connection_manager_.GetTarget(to)) { - // FIXME(PeterJ) Call the above handler when all send handlers finish. - connection_manager_.FindPeer(target)->Send(message, [](asio::error_code) {}); + connection_manager_.Send(target.id, message, [](asio::error_code) {}); } + handler(); }); return result.get(); } template void RoutingNode::ConnectToCloseGroup() { - FindGroup message(NodeAddress(OurId()), OurId()); - MessageHeader header(DestinationAddress(std::make_pair(Destination(OurId()), boost::none)), - SourceAddress{OurSourceAddress()}, ++message_id_, Authority::node); - if (bootstrap_node_) { - auto peer = connection_manager_.FindPeer(*bootstrap_node_); - peer->Send(Serialise(header, MessageToTag::value(), message), - [](asio::error_code error) { - if (error) { - LOG(kWarning) << "rudp cannot send via bootstrap node" << error.message(); - } - }); - return; - } - for (const auto& target : connection_manager_.GetTarget(OurId())) { - auto peer = connection_manager_.FindPeer(target); - peer->Send(Serialise(header, MessageToTag::value(), message), - [](asio::error_code error) { - if (error) { - LOG(kWarning) << "rudp cannot send" << error.message(); - } - }); + FindGroup find_group_message(NodeAddress(OurId()), OurId()); + if (bootstrap_node_) { // TODO(Team) cleanup + SendToBootstrapNode(std::make_pair(Destination(OurId()), boost::none), OurSourceAddress(), + find_group_message, Authority::node); + } else { + SendSwarmOrParallel(std::make_pair(Destination(OurId()), boost::none), OurSourceAddress(), + find_group_message, Authority::node); } } + template -void RoutingNode::MessageReceived(Address /* peer_id */, - SerialisedMessage serialised_message) { +void RoutingNode::MessageReceived(Address peer_id, SerialisedMessage serialised_message) { InputVectorStream binary_input_stream{serialised_message}; MessageHeader header; MessageTypeTag tag; - Identity name; try { Parse(binary_input_stream, header, tag); } catch (const std::exception&) { @@ -314,16 +339,23 @@ void RoutingNode::MessageReceived(Address /* peer_id */, // add to filter as soon as posible filter_.Add({header.FilterValue()}); + LOG(kVerbose) << OurId() + << " Msg from peer " << peer_id + << " tag:" << static_cast::type>(tag) + << " " << header; + // We add these to cache if (tag == MessageTypeTag::GetDataResponse) { auto data = Parse(binary_input_stream); - if (data.data()) - cache_.Add(data.name_and_type_id().name, *data.data()); + if (data.data()) { + std::shared_ptr parsed(Parse>(*data.data())); + cache_.Add(parsed->NameAndType(), *data.data()); + } } // if we can satisfy request from cache we do if (tag == MessageTypeTag::GetData) { auto data = Parse(binary_input_stream); - auto test = cache_.Get(data.name_and_type_id().name); + auto test = cache_.Get(data.name_and_type_id()); // FIXME(dirvine) move to upper lauer :09/02/2015 // if (test) { // GetDataResponse response(data.name(), test); @@ -341,27 +373,71 @@ void RoutingNode::MessageReceived(Address /* peer_id */, } // send to next node(s) even our close group (swarm mode) - for (const auto& target : connection_manager_.GetTarget(header.Destination().first)) { - PeerNode* peer = connection_manager_.FindPeer(target); - peer->Send(serialised_message, [](asio::error_code error) { - if (error) { - LOG(kWarning) << "cannot send" << error.message(); - } - }); + SendSwarmOrParallel(header.Destination().first, serialised_message); + + // TODO(Prakash) cleanup aim to abstract relay logic here and may be use term routed message for + // response messages + + bool relay_request = (header.Source().reply_to_address && + (header.Source().node_address.data == OurId()) && + (header.Destination().first.data != OurId())); + if (relay_request) { // relay request + LOG(kVerbose) << "relay request already fwded"; +// return; // already fwded // Can not return here. THis could be a group message and I might be group member } - // FIXME(dirvine) We need new rudp for this :26/01/2015 - if (header.RelayedMessage() && - std::any_of( - std::begin(connected_nodes_), std::end(connected_nodes_), - [&header](const Address& node) { return node == header.ReplyToAddress()->data; })) { + + bool relay_response = (header.Destination().second && + (header.Destination().first.data == OurId())); + + if (relay_response) { // relay response + //LOG(kVerbose) << OurId() << " relay response try to send to nrt " << (*header.ReplyToAddress()).data; + std::set
connected_non_routing_nodes{connection_manager_.GetNonRoutingNodes()}; + if (std::any_of(std::begin(connected_non_routing_nodes), std::end(connected_non_routing_nodes), + [&header](const Address& node) { return node == header.ReplyToAddress()->data; + })) { // send message to connected node - return; + connection_manager_.SendToNonRoutingNode(*header.ReplyToAddress(), serialised_message, + [](asio::error_code /*error*/) {}); + } + return; // no point progressing further as I was destination and I don't have the replyto node connected } - if (!connection_manager_.AddressInCloseGroupRange(header.Destination().first)) +// if (header.RelayedMessage() && (hjeader.FromNode().data != OurId())) { // skip outgoing msgs +// std::set
connected_non_routing_nodes{connection_manager_.GetNonRoutingNodes()}; +// if (std::any_of( +// std::begin(connected_non_routing_nodes), std::end(connected_non_routing_nodes), +// [&header](const Address& node) { return node == (*header.ReplyToAddress()).data; })) { +// // send message to connected node +// connection_manager_.SendToNonRoutingNode(*header.ReplyToAddress(), serialised_message, +// [](asio::error_code /*error*/) {}); +// return; +// } +// } + + if (!connection_manager_.AddressInCloseGroupRange(header.Destination().first)) { + LOG(kVerbose) << "not for us"; return; // not for us + } + + // Drop message before Sentinel check if it is a direct message type (Connect, ConnectResponse) + // and this node is in the group but the message destination is another group member node. + + if ((tag == MessageTypeTag::Connect) || (tag == MessageTypeTag::ConnectResponse)) { + if (header.Destination().first.data != OurId()) { // not for me + if ((!header.Destination().second.is_initialized())) + return; + if ((header.Destination().second) && (*header.Destination().second).data != OurId()) { + LOG(kVerbose) << "not for me"; + return; + } + } + } // FIXME(dirvine) Sentinel check here!! :19/01/2015 + // auto result = sentinel_.Add(header, tag, serialised_message); + // if (!result) + // return; + switch (tag) { case MessageTypeTag::Connect: HandleMessage(Parse(binary_input_stream), std::move(header)); @@ -416,112 +492,141 @@ Authority RoutingNode::OurAuthority(const Address& element, BOOST_THROW_EXCEPTION(MakeError(CommonErrors::invalid_argument)); } -// TODO(PeterJ): -// template -// void RoutingNode::ConnectionLost(Address peer) { -// connection_manager_.LostNetworkConnection(peer); -// } +template +void RoutingNode::ConnectionLost(boost::optional diff, Address) { + // auto change = connection_manager_.LostNetworkConnection(peer); + if (diff) + static_cast(this)->HandleChurn(*diff); +} // reply with our details; template void RoutingNode::HandleMessage(Connect connect, MessageHeader original_header) { - if (!connection_manager_.IsManaged(connect.requester_id())) + LOG(kInfo) << OurId() << " HandleMessage " << connect; + if (!connection_manager_.SuggestNodeToAdd(connect.requester_id())) return; - auto targets(connection_manager_.GetTarget(connect.requester_id())); + ConnectResponse respond(connect.requester_endpoints(), NextEndpointPair(), connect.requester_id(), OurId(), passport::PublicPmid(our_fob_)); + assert(connect.receiver_id() == OurId()); MessageHeader header(DestinationAddress(original_header.ReturnDestinationAddress()), SourceAddress(OurSourceAddress()), original_header.MessageId(), Authority::node, asymm::Sign(asymm::PlainText(Serialise(respond)), our_fob_.private_key())); - // FIXME(dirvine) Do we need to pass a shared_from_this type object or this may segfault on - // shutdown - // :24/01/2015 - for (auto& target : targets) { - connection_manager_.FindPeer(target) - ->Send(Serialise(header, MessageToTag::value(), respond), - [connect, this](asio::error_code error_code) { - if (error_code) - return; - }); - } + auto message = Serialise(header, MessageToTag::value(), respond); - connection_manager_.AddNode(NodeInfo(connect.requester_id(), connect.requester_fob(), true), - connect.requester_endpoints()); + if (bootstrap_node_) { // TODO cleanup + SendToBootstrapNode(message); + return; + } - // if (added) - // static_cast(this)->HandleChurn(*added); + SendSwarmOrParallel(connect.requester_id(), message); + if (original_header.ReplyToAddress()) + SendToNonRoutingNode((*original_header.ReplyToAddress()).data, message); + + std::weak_ptr destroy_guard = destroy_indicator_; + + connection_manager_.AddNodeAccept( + NodeInfo(connect.requester_id(), connect.requester_fob(), true), + connect.requester_endpoints(), + [=](asio::error_code error, boost::optional added) { + LOG(kError) << " AddNodeAccept "; + if (!destroy_guard.lock()) + return; + if (!error && added) + static_cast(this)->HandleChurn(*added); + }); } template void RoutingNode::HandleMessage(ConnectResponse connect_response) { - if (!connection_manager_.IsManaged(connect_response.requester_id())) + LOG(kInfo) << OurId() << " HandleMessage " << connect_response; + if (!connection_manager_.SuggestNodeToAdd(connect_response.receiver_id())) return; + std::weak_ptr destroy_guard = destroy_indicator_; + + // Workaround because ConnectResponse isn't copyconstructibe. + auto response_ptr = std::make_shared(std::move(connect_response)); + LOG(kError) << OurId() << " calling AddNode " << response_ptr->receiver_id() << " " << response_ptr->receiver_endpoints(); connection_manager_.AddNode( - NodeInfo(connect_response.requester_id(), connect_response.receiver_fob(), true), - connect_response.receiver_endpoints()); - - // auto target = connect_response.requester_id(); - // TODO(PeterJ): - // rudp_.Add( - // rudp::Contact(connect_response.receiver_id(), connect_response.receiver_endpoints(), - // connect_response.receiver_fob().public_key()), - // [target, added, this](asio::error_code error) { - // if (error) { - // this->connection_manager_.DropNode(target); - // return; - // } - // if (added) - // static_cast(this)->HandleChurn(*added); - // if (connection_manager_.Size() >= QuorumSize) { - // rudp_.Remove(*bootstrap_node_, asio::use_future).get(); - // bootstrap_node_ = boost::none; - // } - // }); + NodeInfo(response_ptr->receiver_id(), response_ptr->receiver_fob(), false), + response_ptr->receiver_endpoints(), + [=](asio::error_code error, boost::optional added) { + if (!destroy_guard.lock()) + return; + + auto target = response_ptr->requester_id(); + if (!error && added) + static_cast(this)->HandleChurn(*added); + if (connection_manager_.Size() >= QuorumSize) { + // rudp_.Remove(*bootstrap_node_, asio::use_future).get(); // FIXME (Prakash) + bootstrap_node_ = boost::none; + } + }); } + template void RoutingNode::HandleMessage(FindGroup find_group, MessageHeader original_header) { - auto group = connection_manager_.OurCloseGroup(); + LOG(kInfo) << OurId() << " HandleMessage " << find_group; + auto close_group = connection_manager_.OurCloseGroup(); // add ourselves - group.push_back(passport::PublicPmid(our_fob_)); + std::vector group; + group.reserve(close_group.size() + 1); + for (auto& node_info : close_group) { + group.push_back(std::move(node_info.dht_fob)); + } + group.emplace_back(passport::PublicPmid(our_fob_)); FindGroupResponse response(find_group.target_id(), std::move(group)); MessageHeader header(DestinationAddress(original_header.ReturnDestinationAddress()), SourceAddress(OurSourceAddress(GroupAddress(find_group.target_id()))), original_header.MessageId(), Authority::nae_manager, asymm::Sign(asymm::PlainText(Serialise(response)), our_fob_.private_key())); auto message(Serialise(header, MessageToTag::value(), response)); - for (const auto& node : connection_manager_.GetTarget(original_header.FromNode())) { - connection_manager_.FindPeer(node)->Send(message, [](asio::error_code) {}); + if (bootstrap_node_) { + SendToBootstrapNode(message); + } else { + SendSwarmOrParallel(original_header.FromNode(), message); } + // if node in my group && in non routing list send it to non_routnig list as well + if (original_header.ReplyToAddress()) + SendToNonRoutingNode((*original_header.ReplyToAddress()).data, message); } template -void RoutingNode::HandleMessage(FindGroupResponse find_group_reponse, +void RoutingNode::HandleMessage(FindGroupResponse find_group_response, MessageHeader /* original_header */) { // this is called to get our group on bootstrap, we will try and connect to each of these nodes // Only other reason is to allow the sentinel to check signatures and those calls will just fall // through here. - for (const auto node_pmid : find_group_reponse.group()) { + LOG(kInfo) << OurId() << " HandleMessage " << find_group_response; + for (const auto node_pmid : find_group_response.group()) { Address node_id(node_pmid.Name()); - if (!connection_manager_.IsManaged(node_id)) + //DELETE ME DEBUG CODE + if ((find_group_response.group().size() == 2) &&(node_id == *bootstrap_node_)) { + LOG(kWarning) << "skipping bootstrap guy to connect"; + continue; + } + if (!connection_manager_.SuggestNodeToAdd(node_id)) continue; - Connect message(NextEndpointPair(), OurId(), node_id, passport::PublicPmid(our_fob_)); - MessageHeader header(DestinationAddress(std::make_pair(Destination(node_id), boost::none)), - SourceAddress{OurSourceAddress()}, ++message_id_, Authority::nae_manager); - for (const auto& target : connection_manager_.GetTarget(node_id)) - connection_manager_.FindPeer(target)->Send( - Serialise(header, MessageToTag::value(), message), [](asio::error_code) {}); + Connect connect_message(NextEndpointPair(), OurId(), node_id, passport::PublicPmid(our_fob_)); + if (bootstrap_node_) { // TODO(Team) cleanup + SendToBootstrapNode(std::make_pair(Destination(node_id), boost::none), OurSourceAddress(), + connect_message, Authority::nae_manager); + } else { + SendSwarmOrParallel(std::make_pair(Destination(node_id), boost::none), OurSourceAddress(), + connect_message, Authority::nae_manager); + } } } template void RoutingNode::HandleMessage(GetData get_data, MessageHeader header) { auto result = static_cast(this)->HandleGet( - header.Source(), OurAuthority(get_data.name_and_type_id().name, header), - get_data.name_and_type_id()); + header.Source(), header.FromAuthority(), + OurAuthority(get_data.name_and_type_id().name, header), get_data.name_and_type_id()); if (!result) { // send back error return; @@ -534,7 +639,16 @@ void RoutingNode::HandleMessage(GetData get_data, MessageHeader header) { } template -void RoutingNode::HandleMessage(PutData /*put_data*/, MessageHeader /* original_header */) {} +void RoutingNode::HandleMessage(PutData put_data, MessageHeader original_header) { + std::shared_ptr parsed(Parse>(put_data.data())); + cache_.Add(parsed->NameAndType(), put_data.data()); + auto result = static_cast(this) + ->HandlePut(original_header.Source(), original_header.FromAuthority(), + OurAuthority(parsed->NameAndType().name, original_header), parsed); + if (result) { + // TODO(Fraser#5#): 2015-03-20 - Return error somehow. + } +} template void RoutingNode::HandleMessage(PutDataResponse /*put_data_response*/, @@ -557,24 +671,74 @@ SourceAddress RoutingNode::OurSourceAddress(GroupAddress group) const { return SourceAddress(NodeAddress(OurId()), group, boost::none); } -// template -// void RoutingNode::SendDirect(Address target, Message message, SendHandler handler) { -// MessageHeader header(DestinationAddress(std::make_pair(Destination(target), boost::none)), -// SourceAddress{OurSourceAddress()}, ++message_id_); -// -// rudp_.Send(target, Serialise(header, MessageToTag::value(), message), handler); -// } -// -// void RoutingNode::OnBootstrap(asio::error_code error, rudp::Contact contact, -// std::function handler) { -// if (error) { -// return handler(error, contact); -// } -// -// SendDirect(contact.id, FindGroup(OurId(), contact.id), -// [=](asio::error_code error) { handler(error, contact); }); -// } +template +template +void RoutingNode::SendSwarmOrParallel(const DestinationAddress& destination, + const SourceAddress& source, + const MessageType& message, Authority authority) { + MessageHeader header(destination, source, ++message_id_, authority); + for (const auto& target : connection_manager_.GetTarget(destination.first.data)) { + auto wrapped_message = Serialise(header, MessageToTag::value(), message); + connection_manager_.Send(target.id, std::move(wrapped_message), [](asio::error_code error) { + if (error) { + LOG(kWarning) << "Connection manager cannot send" << error.message(); + } + }); + } +} + +template +void RoutingNode::SendSwarmOrParallel(const Address& destination, + const SerialisedMessage& serialised_message) { + for (const auto& target : connection_manager_.GetTarget(destination)) { + connection_manager_.Send(target.id, serialised_message, [](asio::error_code error) { + if (error) { + LOG(kWarning) << "Connection manager cannot send" << error.message(); + } + }); + } +} + +template +template +void RoutingNode::SendToBootstrapNode(const DestinationAddress& destination, + const SourceAddress& source, + const MessageType& message, Authority authority) { + // assert(source.SourceAddress()== NodeAddress(*bootstrap_node_)); //FIXME(prakash) + MessageHeader header(destination, source, ++message_id_, authority); + auto wrapped_message = Serialise(header, MessageToTag::value(), message); + connection_manager_.Send( + *bootstrap_node_, std::move(wrapped_message), [](asio::error_code error) { + if (error) { + LOG(kWarning) << "Connection manager cannot send to bootstrap node, " << error.message(); + } + }); +} + +template +void RoutingNode::SendToBootstrapNode(const SerialisedMessage& serialised_message) { + auto destination = *bootstrap_node_; + connection_manager_.Send( + destination, serialised_message, [=](asio::error_code error) { + if (error) { + LOG(kWarning) << "Connection manager cannot send to bootstrap node " << destination + << " , error : " << error.message(); + } + }); +} + +template +void RoutingNode::SendToNonRoutingNode(const Address& destination, + const SerialisedMessage& serialised_message) { + connection_manager_.SendToNonRoutingNode(destination, serialised_message, + [=](asio::error_code error) { + if (error) { + LOG(kWarning) << "Connection manager cannot send to destination " << destination + << " , error : " << error.message(); + } + }); +} } // namespace routing diff --git a/include/maidsafe/routing/source_address.h b/include/maidsafe/routing/source_address.h index e04acbed..8c97b70b 100644 --- a/include/maidsafe/routing/source_address.h +++ b/include/maidsafe/routing/source_address.h @@ -86,6 +86,23 @@ inline bool operator>=(const SourceAddress& lhs, const SourceAddress& rhs) { return !operator<(lhs, rhs); } +inline std::ostream& operator<<(std::ostream& os, const SourceAddress& addr) { + os << "(SourceAddress node:" << reinterpret_cast(addr.node_address) << ", "; + if (addr.group_address) { + os << "group:" << reinterpret_cast(*addr.group_address); + } + else { + os << "group:none"; + } + if (addr.reply_to_address) { + os << ", reply_to:" << reinterpret_cast(*addr.reply_to_address); + } + else { + os << ", reply_to:none"; + } + return os << ")"; +} + } // namespace routing } // namespace maidsafe diff --git a/include/maidsafe/routing/types.h b/include/maidsafe/routing/types.h index 4ba0ad4d..9680c467 100644 --- a/include/maidsafe/routing/types.h +++ b/include/maidsafe/routing/types.h @@ -48,7 +48,7 @@ namespace routing { static const size_t GroupSize = 23; static const size_t QuorumSize = 19; -enum class FromType : int32_t { +enum class FromType : std::int32_t { client_manager, nae_manager, node_manager, @@ -58,7 +58,7 @@ enum class FromType : int32_t { node }; -enum class Authority : int32_t { +enum class Authority : std::int32_t { client_manager, nae_manager, node_manager, @@ -68,7 +68,7 @@ enum class Authority : int32_t { }; using Address = Identity; -using MessageId = uint32_t; +using MessageId = std::uint32_t; using Destination = TaggedValue; using ReplyToAddress = TaggedValue; using DestinationAddress = std::pair>; @@ -89,15 +89,23 @@ using HandleGetReturn = maidsafe_error>; using HandlePutPostReturn = boost::expected, maidsafe_error>; using HandlePostReturn = - boost::expected, std::vector>, - maidsafe_error>; + boost::expected, std::vector>, maidsafe_error>; using Endpoint = asio::ip::udp::endpoint; -using Port = uint16_t; +using Port = std::uint16_t; using SerialisedMessage = std::vector; using CloseGroupDifference = std::pair, std::vector
>; using PublicKeyId = std::pair; +template +using AsyncResultHandler = typename asio::handler_type::type, + void(asio::error_code, Args...)>::type; + +template +using AsyncResultReturn = + typename asio::async_result>::type; + +// FIXME: All the below handlers and results can be implemented using the above two. template using BootstrapHandlerHandler = typename asio::handler_type::type; diff --git a/src/maidsafe/routing/apply_tuple.h b/src/maidsafe/routing/apply_tuple.h new file mode 100644 index 00000000..bb79906a --- /dev/null +++ b/src/maidsafe/routing/apply_tuple.h @@ -0,0 +1,59 @@ +/* Copyright 2015 MaidSafe.net limited + + This MaidSafe Software is licensed to you under (1) the MaidSafe.net Commercial License, + version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which + licence you accepted on initial access to the Software (the "Licences"). + + By contributing code to the MaidSafe Software, or to this project generally, you agree to be + bound by the terms of the MaidSafe Contributor Agreement, version 1.0, found in the root + directory of this project at LICENSE, COPYING and CONTRIBUTOR respectively and also + available at: http://www.maidsafe.net/licenses + + Unless required by applicable law or agreed to in writing, the MaidSafe Software distributed + under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. + + See the Licences for the specific language governing permissions and limitations relating to + use of the MaidSafe Software. */ + +#ifndef MAIDSAFE_ROUTING_APPLY_TUPLE_H_ +#define MAIDSAFE_ROUTING_APPLY_TUPLE_H_ + +#include + +namespace maidsafe { + +namespace routing { + +namespace helper { + +template +struct Index {}; + +template +struct GeneratedSequence : GeneratedSequence {}; + +template +struct GeneratedSequence<0, Is...> : Index {}; + +template +inline MAIDSAFE_CONSTEXPR auto ApplyTuple(F&& f, const std::tuple& tup, + helper::Index) + -> decltype(f(std::get(tup)...)) { + return f(std::get(tup)...); +} + +} // namespace helper + +template +inline MAIDSAFE_CONSTEXPR auto ApplyTuple(F&& f, const std::tuple& tup) + -> decltype(helper::ApplyTuple(std::forward(f), tup, + helper::GeneratedSequence{})) { + return helper::ApplyTuple(std::forward(f), tup, helper::GeneratedSequence{}); +} + +} // namespace routing + +} // namespace maidsafe + +#endif // MAIDSAFE_ROUTING_APPLY_TUPLE_H_ diff --git a/src/maidsafe/routing/async_exchange.h b/src/maidsafe/routing/async_exchange.h index 1fe159a3..36868558 100644 --- a/src/maidsafe/routing/async_exchange.h +++ b/src/maidsafe/routing/async_exchange.h @@ -46,7 +46,7 @@ void AsyncExchange(crux::socket& socket, SerialisedMessage our_data, Handler han state->tx_buffer = std::move(our_data); socket.async_send(boost::asio::buffer(state->tx_buffer), - [state, handler](boost::system::error_code error, std::size_t) { + [state, handler](boost::system::error_code error, std::size_t) mutable { if (state->first_error) { if (*state->first_error) { return handler(*state->first_error, SerialisedMessage()); @@ -61,7 +61,7 @@ void AsyncExchange(crux::socket& socket, SerialisedMessage our_data, Handler han }); socket.async_receive(boost::asio::buffer(state->rx_buffer), - [state, handler](boost::system::error_code error, std::size_t size) { + [state, handler](boost::system::error_code error, std::size_t size) mutable { if (state->first_error) { if (*state->first_error) { return handler(*state->first_error, SerialisedMessage()); diff --git a/src/maidsafe/routing/async_queue.h b/src/maidsafe/routing/async_queue.h index dc50bbe6..5b7b8b5b 100644 --- a/src/maidsafe/routing/async_queue.h +++ b/src/maidsafe/routing/async_queue.h @@ -26,41 +26,12 @@ #include "asio/async_result.hpp" #include "maidsafe/common/config.h" +#include "maidsafe/routing/apply_tuple.h" namespace maidsafe { namespace routing { -namespace detail { - -namespace helper { - -template -struct Index {}; - -template -struct GeneratedSequence : GeneratedSequence {}; - -template -struct GeneratedSequence<0, Is...> : Index {}; - -} // namespace helper - -template -inline MAIDSAFE_CONSTEXPR auto ApplyTuple(F&& f, const std::tuple& tup, - helper::Index) - -> decltype(f(std::get(tup)...)) { - return f(std::get(tup)...); -} - -template -inline MAIDSAFE_CONSTEXPR auto ApplyTuple(F&& f, const std::tuple& tup) - -> decltype(ApplyTuple(f, tup, helper::GeneratedSequence{})) { - return ApplyTuple(std::forward(f), tup, helper::GeneratedSequence{}); -} - -} // namespace detail - template class AsyncQueue { private: @@ -111,7 +82,7 @@ class AsyncQueue { values.pop(); } - detail::ApplyTuple(std::move(handler), tuple); + detail::ApplyTuple(handler, std::move(tuple)); return result.get(); } diff --git a/src/maidsafe/routing/connection_manager.cc b/src/maidsafe/routing/connection_manager.cc index d94e0831..9234beb6 100644 --- a/src/maidsafe/routing/connection_manager.cc +++ b/src/maidsafe/routing/connection_manager.cc @@ -31,8 +31,8 @@ #include "maidsafe/routing/peer_node.h" #include "maidsafe/routing/routing_table.h" +#include "maidsafe/routing/timer.h" #include "maidsafe/routing/types.h" -#include "maidsafe/routing/async_exchange.h" namespace maidsafe { @@ -44,32 +44,36 @@ using std::move; using boost::none_t; using boost::optional; -ConnectionManager::ConnectionManager(boost::asio::io_service& ios, PublicPmid our_fob) +ConnectionManager::ConnectionManager(asio::io_service& ios, Address our_id, OnReceive on_receive, + OnConnectionLost on_connection_lost) : io_service_(ios), - our_fob_(std::move(our_fob)), - our_id_(our_fob_.Name()), - peers_(Comparison(our_id_)), + mutex_(), + our_accept_port_(5483), + routing_table_(our_id), + connected_non_routing_nodes_(), + on_receive_(std::move(on_receive)), + on_connection_lost_(std::move(on_connection_lost)), current_close_group_(), - destroy_indicator_(new boost::none_t()) {} + connections_(new Connections(io_service_, our_id)) { + StartReceiving(); + StartAccepting(); +} + +bool ConnectionManager::SuggestNodeToAdd(const Address& node_to_add) const { + return routing_table_.CheckNode(node_to_add); +} -bool ConnectionManager::IsManaged(const Address& node_id) const { - return peers_.find(node_id) != peers_.end(); - // return routing_table_.CheckNode(node_to_add); +std::vector ConnectionManager::GetTarget(const Address& target_node) const { + auto nodes(routing_table_.TargetNodes(target_node)); + nodes.erase(std::remove_if(std::begin(nodes), std::end(nodes), [](NodeInfo& node) { + return !node.connected; + }), std::end(nodes)); + return nodes; } -std::set ConnectionManager::GetTarget( - const Address& target_node) const { - // TODO(PeterJ): The previous code was quite more complicated, so recheck correctness of this one. - return std::set(Comparison(target_node)); - // for (const auto& peer : peers_) { - // result.insert(peer.first); - // } - // return result; - // auto nodes(routing_table_.TargetNodes(target_node)); - //// nodes.erase(std::remove_if(std::begin(nodes), std::end(nodes), - //// [](NodeInfo& node) { return !node.connected(); }), - //// std::end(nodes)); - // return nodes; +std::set
ConnectionManager::GetNonRoutingNodes() const { + std::lock_guard lock(mutex_); + return connected_non_routing_nodes_; } // boost::optional ConnectionManager::LostNetworkConnection( @@ -78,169 +82,187 @@ std::set ConnectionManager::GetTarget( // return GroupChanged(); // } -optional ConnectionManager::DropNode(const Address& their_id) { - // routing_table_.DropNode(their_id); - peers_.erase(their_id); - return GroupChanged(); -} +// optional ConnectionManager::DropNode(const Address& their_id) { +// routing_table_.DropNode(their_id); +// // FIXME(Prakash) remove connection ? +// return GroupChanged(); +// } +void ConnectionManager::DropNode(const Address& their_id) { connections_->Drop(their_id); } -// acceptor_(io_service_, crux::endpoint(boost::asio::ip::udp::v4(), 5483)), -void ConnectionManager::StartAccepting(unsigned short port) { - auto acceptor_i = acceptors_.find(port); +void ConnectionManager::StartAccepting() { + std::weak_ptr weak_connections = connections_; - if (acceptor_i == acceptors_.end()) { - crux::endpoint endpoint(boost::asio::ip::udp::v4(), port); - auto acceptor = std::unique_ptr(new crux::acceptor(io_service_, endpoint)); - auto pair = acceptors_.insert(std::make_pair(port, std::move(acceptor))); - acceptor_i = pair.first; - } - - auto socket = make_shared(io_service_); + connections_->Accept(our_accept_port_, &our_accept_port_, [=](asio::error_code error, + Connections::AcceptResult result) { + if (!weak_connections.lock()) { + return; + } - auto& acceptor = acceptor_i->second; + auto expected_i = expected_accepts_.find(result.his_address); - weak_ptr destroy_guard = destroy_indicator_; + if (expected_i != expected_accepts_.end()) { + LOG(kInfo) << OurId() << " StartAccepting handler 1 " << error.message() << "\n"; + auto expected = std::move(expected_i->second); + expected_accepts_.erase(expected_i); - acceptor->async_accept(*socket, [=](boost::system::error_code error) { - if (!destroy_guard.lock()) - return; + HandleAddNode(error, std::move(expected.node_info), std::move(expected.handler)); - if (error) { - if (error == boost::asio::error::operation_aborted) { + // The handler may have destroyed 'this'. + if (!weak_connections.lock()) { return; } + } else { + LOG(kInfo) << OurId() << " StartAccepting handler 2 " << error.message() << " his_id:" << result.his_address << "\n"; + for (const auto& a : expected_accepts_) { + LOG(kInfo) << "--- " << a.first << "\n"; + } + if (!error) { + connected_non_routing_nodes_.insert(result.his_address); + } } - StartAccepting(port); - - AsyncExchange(*socket, Serialise(our_fob_), - [=](boost::system::error_code error, SerialisedMessage data) { - if (!destroy_guard.lock()) - return; - - if (error) - return; - - PublicPmid their_public_pmid(Parse(std::move(data))); - Address their_id(their_public_pmid.Name()); - InsertPeer(PeerNode(NodeInfo(std::move(their_id), std::move(their_public_pmid), true), - std::move(socket))); - }); + if (error != asio::error::operation_aborted) { + StartAccepting(); + } }); -} -void ConnectionManager::AddNode(optional assumed_node_info, EndpointPair eps) { - static const crux::endpoint unspecified_ep(boost::asio::ip::udp::v4(), 0); - - // TODO(PeterJ): Try the internal endpoint as well - auto endpoint = convert::ToBoost(eps.external); + LOG(kInfo) << "StartAccepting() port " << our_accept_port_; +} - auto pair_i = being_connected_.find(endpoint); +void ConnectionManager::HandleAddNode(asio::error_code error, NodeInfo node_info, + OnAddNode user_handler) { + LOG(kInfo) << OurId() << " HandleAddNode " << error.message(); - if (pair_i == being_connected_.end()) { - bool inserted = false; - auto socket = make_shared(io_service_, unspecified_ep); - std::tie(pair_i, inserted) = being_connected_.insert(std::make_pair(endpoint, socket)); + if (error == asio::error::already_connected) { + connected_non_routing_nodes_.erase(node_info.id); + error = asio::error_code(); } - auto socket = pair_i->second; - weak_ptr weak_socket = socket; - - socket->async_connect(convert::ToBoost(eps.external), [=](boost::system::error_code error) { - auto socket = weak_socket.lock(); + if (error && error != asio::error::timed_out) { + return; + } - if (!socket) - return; + node_info.connected = error != asio::error::timed_out; - if (error) { - being_connected_.erase(endpoint); - return; - } + user_handler(error, AddToRoutingTable(std::move(node_info))); +} - AsyncExchange(*socket, Serialise(our_fob_), - [=](boost::system::error_code error, SerialisedMessage data) { - auto socket = weak_socket.lock(); +void ConnectionManager::AddNodeAccept(NodeInfo node_info, EndpointPair, OnAddNode on_node_added) { + auto id = node_info.id; - if (!socket) - return; + LOG(kInfo) << OurId() << " AddNodeAccept " << node_info.id << "\n"; + auto timer = std::make_shared(io_service_); - being_connected_.erase(endpoint); + auto canceling_handler = + [on_node_added, timer](asio::error_code error, boost::optional diff) { + timer->cancel(); + on_node_added(error, std::move(diff)); + }; - if (error) - return; - - PublicPmid their_public_pmid(Parse(std::move(data))); - Address their_id(their_public_pmid.Name()); - NodeInfo their_node_info(std::move(their_id), std::move(their_public_pmid), true); + auto insert_result = + expected_accepts_.insert(std::make_pair(id, ExpectedAccept{node_info, canceling_handler})); - if (assumed_node_info && *assumed_node_info != their_node_info) - return; + if (!insert_result.second) { + return io_service_.post( + [on_node_added]() { on_node_added(asio::error::already_started, boost::none); }); + } - InsertPeer(PeerNode(std::move(their_node_info), std::move(socket))); - }); + // TODO(Team): Is the timeout value correct? Should it be in defined + // somewhere else? + timer->async_wait(std::chrono::seconds(2), [=]() { + expected_accepts_.erase(id); + HandleAddNode(asio::error::timed_out, node_info, on_node_added); }); } -void ConnectionManager::InsertPeer(PeerNode&& node_arg) { - const auto& id = node_arg.id(); - const auto pair = peers_.insert(std::make_pair(id, std::move(node_arg))); +void ConnectionManager::AddNode(NodeInfo node_to_add, EndpointPair their_endpoint_pair, + OnAddNode on_node_added) { + std::weak_ptr weak_connections = connections_; + + // TODO(PeterJ): Use both endpoints + asio::ip::udp::endpoint endpoint = their_endpoint_pair.external.address().is_unspecified() + ? their_endpoint_pair.local + : their_endpoint_pair.external; + + LOG(kInfo) << "ConnectionManager::Connect " << node_to_add.id << " " << their_endpoint_pair << "\n"; + connections_->Connect(endpoint, + [=](asio::error_code error, Connections::ConnectResult result) { + if (!weak_connections.lock()) { + return on_node_added(asio::error::operation_aborted, boost::none); + } + + LOG(kInfo) << OurId() << " his_address:" << result.his_address << " node_to_add:" << node_to_add.id; + if ((!error || error == asio::error::already_connected) + && (result.his_address != node_to_add.id)) { + error = asio::error::fault; + } + + HandleAddNode(error, node_to_add, on_node_added); + }); +} + +boost::optional ConnectionManager::AddToRoutingTable(NodeInfo node_to_add) { + auto added = routing_table_.AddNode(node_to_add); - if (!pair.second /* = inserted */) { - return; + if (!added.first) { + connections_->Drop(node_to_add.id); + } else if (added.second) { + connections_->Drop(node_to_add.id); } - auto& node = pair.first->second; + // FIXME: It is incorrect to assume the GroupChanged will reflect changes made + // by the previous Drop command because that command will execute its business + // in a separate thread (Same in the AddNodeAccept function and others). + return GroupChanged(); +} - StartReceiving(node); +bool ConnectionManager::CloseGroupMember(const Address& their_id) { + auto close_group(routing_table_.OurCloseGroup()); + return std::any_of(std::begin(close_group), std::end(close_group), + [&their_id](const NodeInfo& node) { return node.id == their_id; }); +} - if (on_connection_added_) { - on_connection_added_(node.id()); +boost::optional ConnectionManager::GroupChanged() { + auto new_nodeinfo_group(routing_table_.OurCloseGroup()); + std::vector
new_group; + for (const auto& nodes : new_nodeinfo_group) + new_group.push_back(nodes.id); + std::lock_guard lock(mutex_); + if (new_group != current_close_group_) { + auto changed = std::make_pair(new_group, current_close_group_); + current_close_group_ = new_group; + return changed; } + return boost::none; } -void ConnectionManager::StartReceiving(PeerNode& node) { - auto node_guard = node.DestroyGuard(); +void ConnectionManager::StartReceiving() { + std::weak_ptr weak_connections = connections_; - node.Receive([=, &node](asio::error_code error, const SerialisedMessage& bytes) { - if (!node_guard.lock()) - return; - if (error) - return; - if (!on_receive_) + connections_->Receive([=](asio::error_code error, Connections::ReceiveResult result) { + if (!weak_connections.lock()) return; - // Complex handler invocation to be safe in cases where the - // handler destroys this object or in case where the handler - // invocation resets the handler to something else. - auto h = move(on_receive_); - h(node.id(), bytes); - if (!node_guard.lock()) - return; - if (!on_receive_) { - on_receive_ = move(h); + if (error) { + return HandleConnectionLost(result.his_address); + } + auto h = std::move(on_receive_); + if (h) { + h(std::move(result.his_address), std::move(result.message)); + if (!weak_connections.lock()) + return; + on_receive_ = std::move(h); } - StartReceiving(node); + StartReceiving(); }); } -// bool ConnectionManager::CloseGroupMember(const Address& their_id) { -// auto close_group(routing_table_.OurCloseGroup()); -// return std::any_of(std::begin(close_group), std::end(close_group), -// [&their_id](const NodeInfo& node) { return node.id == their_id; }); -// } - -optional ConnectionManager::GroupChanged() { - auto new_group(OurCloseGroup()); - std::vector
new_group_ids; - for (const auto& group_member_public_pmid : new_group) - new_group_ids.push_back(group_member_public_pmid.Name()); - - if (new_group_ids != current_close_group_) { - auto changed = std::make_pair(new_group_ids, current_close_group_); - current_close_group_ = new_group_ids; - return changed; +void ConnectionManager::HandleConnectionLost(Address lost_connection) { + routing_table_.DropNode(lost_connection); + connected_non_routing_nodes_.erase(lost_connection); + auto h = std::move(on_connection_lost_); + if (h) { + h(GroupChanged(), lost_connection); } - - return boost::none; } } // namespace routing diff --git a/src/maidsafe/routing/connection_manager.h b/src/maidsafe/routing/connection_manager.h index e20c046d..6731cea5 100644 --- a/src/maidsafe/routing/connection_manager.h +++ b/src/maidsafe/routing/connection_manager.h @@ -52,125 +52,148 @@ destiations. In that case request a close_group message for this node. #include "maidsafe/routing/routing_table.h" #include "maidsafe/routing/types.h" #include "maidsafe/routing/peer_node.h" +#include "maidsafe/routing/connections.h" namespace maidsafe { namespace routing { +class Timer; + class ConnectionManager { using PublicPmid = passport::PublicPmid; - class Comparison { - public: - explicit Comparison(Address our_id) : our_id_(std::move(our_id)) {} - - bool operator()(const Address& lhs, const Address& rhs) const { - return CloserToTarget(lhs, rhs, our_id_); - } + public: + using OnReceive = std::function; + using OnAddNode = std::function)>; + using OnConnectionLost = std::function, Address)>; - private: - const Address our_id_; + private: + struct ExpectedAccept { + NodeInfo node_info; + OnAddNode handler; }; public: - ConnectionManager(boost::asio::io_service& ios, PublicPmid our_fob); + ConnectionManager(asio::io_service& ios, Address our_id, OnReceive on_receive, + OnConnectionLost on_connection_lost); ConnectionManager(const ConnectionManager&) = delete; ConnectionManager(ConnectionManager&&) = delete; - ~ConnectionManager() = default; + ConnectionManager& operator=(const ConnectionManager&) = delete; ConnectionManager& operator=(ConnectionManager&&) = delete; - bool IsManaged(const Address& node_to_add) const; - std::set GetTarget(const Address& target_node) const; - // boost::optional LostNetworkConnection(const Address& node); + bool SuggestNodeToAdd(const Address& node_to_add) const; + std::vector GetTarget(const Address& target_node) const; + std::set
GetNonRoutingNodes() const; + + boost::optional LostNetworkConnection(const Address& node); + // routing wishes to drop a specific node (may be a node we cannot connect to) - boost::optional DropNode(const Address& their_id); - void AddNode(boost::optional node_to_add, EndpointPair); - - std::vector OurCloseGroup() const { - std::vector result; - result.reserve(GroupSize); - size_t i = 0; - for (const auto& pair : peers_) { - if (++i > GroupSize) - break; - result.push_back(pair.second.node_info().dht_fob); - } - return result; - } + // boost::optional DropNode(const Address& their_id); + void DropNode(const Address& their_id); - // size_t CloseGroupBucketDistance() const { - // return routing_table_.BucketIndex(routing_table_.OurCloseGroup().back().id); - // } + void AddNode(NodeInfo node_to_add, EndpointPair their_endpoint_pair, OnAddNode); + void AddNodeAccept(NodeInfo node_to_add, EndpointPair their_endpoint_pair, OnAddNode); + + std::vector OurCloseGroup() const { return routing_table_.OurCloseGroup(); } + + size_t CloseGroupBucketDistance() const { + return routing_table_.BucketIndex(routing_table_.OurCloseGroup().back().id); + } bool AddressInCloseGroupRange(const Address& address) const { - if (peers_.size() < GroupSize) + if (routing_table_.Size() < GroupSize) { return true; - return (static_cast(std::distance(peers_.begin(), peers_.upper_bound(address))) < - GroupSize); + } + return CloserToTarget(address, routing_table_.OurCloseGroup().back().id, + routing_table_.OurId()); } - const Address& OurId() const { return our_id_; } + const Address& OurId() const { return routing_table_.OurId(); } boost::optional GetPublicKey(const Address& node) const { - auto found_i = peers_.find(node); - if (found_i == peers_.end()) { return boost::none; } - return found_i->second.node_info().dht_fob.public_key(); + return routing_table_.GetPublicKey(node); } - // bool CloseGroupMember(const Address& their_id); + bool CloseGroupMember(const Address& their_id); - uint32_t Size() { return static_cast(peers_.size()); } + std::size_t Size() { return routing_table_.Size(); } - PeerNode* FindPeer(Address addr) { - auto i = peers_.find(addr); - if (i == peers_.end()) - return nullptr; - return &i->second; - } + template + void Send(const Address&, const SerialisedMessage&, Handler); - void StartAccepting(unsigned short port); + template + void SendToNonRoutingNode(const Address&, const SerialisedMessage&, Handler); - template - void SetOnConnectionAdded(Handler handler) { - on_connection_added_ = std::move(handler); - } + Port AcceptingPort() const { return our_accept_port_; } - template - void SetOnReceive(Handler handler) { - on_receive_ = std::move(handler); - } + template + void Connect(asio::ip::udp::endpoint, Handler); - void Shutdown() { - acceptors_.clear(); - being_connected_.clear(); - peers_.clear(); - } + void Shutdown() { connections_.reset(); } private: + boost::optional AddToRoutingTable(NodeInfo node_to_add); + void StartReceiving(); + void StartAccepting(); + + void HandleAddNode(asio::error_code, NodeInfo, OnAddNode); + void HandleConnectionLost(Address); + boost::optional GroupChanged(); - void InsertPeer(PeerNode&&); - std::weak_ptr DestroyGuard() { return destroy_indicator_; } - void StartReceiving(PeerNode&); private: - boost::asio::io_service& io_service_; + asio::io_service& io_service_; + mutable std::mutex mutex_; + Port our_accept_port_; + RoutingTable routing_table_; + std::set
connected_non_routing_nodes_; // clients & bootstrapping nodes + OnReceive on_receive_; + OnConnectionLost on_connection_lost_; + std::vector
current_close_group_; + std::map expected_accepts_; + std::shared_ptr connections_; +}; - std::function on_connection_added_; - std::function on_receive_; +template +void ConnectionManager::Send(const Address& addr, const SerialisedMessage& message, + Handler handler) { + std::weak_ptr guard = connections_; + //LOG(kVerbose) << OurId() << " Send to node " << addr << ", msg : " << hex::Substr(message); + connections_->Send(addr, message, [=](asio::error_code error) mutable { + handler(error); - PublicPmid our_fob_; - Address our_id_; + if (!guard.lock()) + return; - std::map> acceptors_; // NOLINT - std::map> being_connected_; - std::map peers_; + if (error) { + HandleConnectionLost(std::move(addr)); + } + }); +} + +template +void ConnectionManager::SendToNonRoutingNode(const Address& addr, + const SerialisedMessage& message, + Handler handler) { + auto found = connected_non_routing_nodes_.find(addr); + if (found != connected_non_routing_nodes_.end()) { + Send(addr, message, handler); + } else { + handler(make_error_code(RoutingErrors::not_connected)); + } +} - std::vector
current_close_group_; - std::shared_ptr destroy_indicator_; -}; +template +void ConnectionManager::Connect(asio::ip::udp::endpoint remote_endpoint, Handler handler) { + connections_->Connect(remote_endpoint, + [=](asio::error_code error, Connections::ConnectResult result) { + handler(error, result.his_address, result.our_endpoint); + }); +} } // namespace routing diff --git a/src/maidsafe/routing/connections.h b/src/maidsafe/routing/connections.h index 60c146c7..b69773dd 100644 --- a/src/maidsafe/routing/connections.h +++ b/src/maidsafe/routing/connections.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include "asio/io_service.hpp" @@ -32,9 +33,10 @@ #include "maidsafe/crux/acceptor.hpp" #include "maidsafe/crux/socket.hpp" -#include "maidsafe/routing/async_queue.h" +#include "maidsafe/routing/apply_tuple.h" #include "maidsafe/routing/async_exchange.h" #include "maidsafe/routing/types.h" +#include "maidsafe/routing/utils.h" namespace maidsafe { @@ -42,8 +44,24 @@ namespace routing { class Connections { public: - Connections(boost::asio::io_service&, const Address& our_node_id); + struct AcceptResult { + Endpoint his_endpoint; + Address his_address; + Endpoint our_endpoint; // As seen by the other end + }; + + struct ConnectResult { + Address his_address; + Endpoint our_endpoint; // As seen by the other end + }; + + struct ReceiveResult { + Address his_address; + SerialisedMessage message; + }; + public: + Connections(asio::io_service&, const Address& our_node_id); Connections() = delete; Connections(const Connections&) = delete; Connections(Connections&&) = delete; @@ -53,17 +71,19 @@ class Connections { ~Connections(); - template - void Send(const Address&, const SerialisedMessage&, Handler); + template + AsyncResultReturn Send(const Address&, const SerialisedMessage&, Token&&); - template - void Receive(Handler); + template + AsyncResultReturn Receive(Token&&); - template - void Connect(asio::ip::udp::endpoint, Handler); + template + AsyncResultReturn Connect(asio::ip::udp::endpoint, Token&&); - template - void Accept(unsigned short port, const Handler); + // The secont argument is an ugly C-style return of the actual port that has been + // chosen. TODO: Try to return it using a proper C++ way. + template + AsyncResultReturn Accept(Port port, Port* chosen_port, Token&&); void Drop(const Address& their_id); @@ -75,38 +95,58 @@ class Connections { boost::asio::io_service& get_io_service(); + void Wait(); + private: void StartReceiving(const Address&, const crux::endpoint&, const std::shared_ptr&); + void OnReceive(asio::error_code, ReceiveResult); + + template + void post(Handler&& handler, Args&&... args); - boost::asio::io_service& service_; + private: + asio::io_service& service; + std::unique_ptr work_; Address our_id_; std::function on_receive_; std::function on_drop_; - std::map> acceptors_; // NOLINT - std::map> connections_; - std::map id_to_endpoint_map_; + std::map> acceptors_; // NOLINT + std::map> connections_; + std::map> connecting_sockets_; + + struct ReceiveInput { + asio::error_code error; + ReceiveResult result; + }; + + using ReceiveOutput = std::function; - AsyncQueue receive_queue_; + std::queue receive_input_; + std::queue receive_output_; + + BoostAsioService runner_; }; -inline Connections::Connections(boost::asio::io_service& ios, const Address& our_node_id) - : service_(ios), our_id_(our_node_id) {} +inline Connections::Connections(asio::io_service& ios, const Address& our_node_id) + : service(ios), work_(new asio::io_service::work(ios)), our_id_(our_node_id), runner_(1) {} -template -void Connections::Send(const Address& remote_id, const SerialisedMessage& bytes, Handler handler) { - service_.post([=]() { - auto remote_endpoint_i = id_to_endpoint_map_.find(remote_id); +template +AsyncResultReturn Connections::Send(const Address& remote_id, const SerialisedMessage& bytes, + Token&& token) { + using Handler = AsyncResultHandler; + Handler handler(std::forward(token)); + asio::async_result result(handler); - if (remote_endpoint_i == id_to_endpoint_map_.end()) { - return handler(asio::error::bad_descriptor); - } + get_io_service().post([=]() mutable { + auto socket_i = connections_.find(remote_id); - auto remote_endpoint = remote_endpoint_i->second; - auto socket_i = connections_.find(remote_endpoint); - assert(socket_i != connections_.end()); + if (socket_i == connections_.end()) { + LOG(kWarning) << "bad_descriptor !! " << remote_id; + return post(handler, asio::error::bad_descriptor); + } auto& socket = socket_i->second; auto buffer = std::make_shared(std::move(bytes)); @@ -114,138 +154,217 @@ void Connections::Send(const Address& remote_id, const SerialisedMessage& bytes, std::weak_ptr weak_socket = socket; socket->async_send(boost::asio::buffer(*buffer), - [=](boost::system::error_code error, std::size_t) { + [=](boost::system::error_code error, std::size_t) mutable { static_cast(buffer); - if (!weak_socket.lock()) { - return handler(asio::error::operation_aborted); + return post(handler, asio::error::operation_aborted); } if (error) { - id_to_endpoint_map_.erase(remote_id); - connections_.erase(remote_endpoint); + connections_.erase(remote_id); } - handler(convert::ToStd(error)); + post(handler, convert::ToStd(error)); }); }); + return result.get(); } -template -void Connections::Receive(Handler handler) { - service_.post([=]() { receive_queue_.AsyncPop(std::move(handler)); }); +template +AsyncResultReturn Connections::Receive(Token&& token) { + using Handler = AsyncResultHandler; + Handler handler(std::forward(token)); + asio::async_result result(handler); + + get_io_service().post([=]() mutable { + if (!receive_input_.empty()) { + auto input = std::move(receive_input_.front()); + receive_input_.pop(); + post(handler, input.error, std::move(input.result)); + } else { + receive_output_.push(std::move(handler)); + } + }); + + return result.get(); } -inline Connections::~Connections() { Shutdown(); } +inline void Connections::OnReceive(asio::error_code error, ReceiveResult result) { + if (!receive_output_.empty()) { + auto handler = std::move(receive_output_.front()); + receive_output_.pop(); + post(handler, error, std::move(result)); + } else { + receive_input_.push(ReceiveInput{error, std::move(result)}); + } +} -template -void Connections::Connect(asio::ip::udp::endpoint endpoint, Handler handler) { - service_.post([=]() { - crux::endpoint unspecified_ep(boost::asio::ip::udp::v4(), 0); - auto socket = std::make_shared(service_, unspecified_ep); +inline Connections::~Connections() { + Shutdown(); + runner_.Stop(); +} - auto insert_result = connections_.insert(std::make_pair(convert::ToBoost(endpoint), socket)); +template +AsyncResultReturn Connections::Connect(Endpoint endpoint, + Token&& token) { + using Handler = AsyncResultHandler; + Handler handler(std::forward(token)); + asio::async_result result(handler); - if (!insert_result.second) { - return handler(asio::error::already_started, Address()); - } + get_io_service().post([=]() mutable { + crux::endpoint unspecified_ep; + + LOG(kInfo) << OurId() << " Connections::Connect(" << endpoint << ")\n"; + if (endpoint.address().is_v4()) + unspecified_ep = crux::endpoint(boost::asio::ip::udp::v4(), 0); + else + unspecified_ep = crux::endpoint(boost::asio::ip::udp::v6(), 0); + + auto socket = std::make_shared(get_io_service(), unspecified_ep); + auto local_endpoint = socket->local_endpoint(); + + auto insert_result = connecting_sockets_.insert(std::make_pair(local_endpoint, socket)); + + assert(insert_result.second); std::weak_ptr weak_socket = socket; - socket->async_connect(convert::ToBoost(endpoint), [=](boost::system::error_code error) { + socket->async_connect(convert::ToBoost(endpoint), [=](boost::system::error_code error) mutable { auto socket = weak_socket.lock(); if (!socket) { - return handler(asio::error::operation_aborted, Address()); + return post(handler, asio::error::operation_aborted, ConnectResult()); } + if (error) { - return handler(convert::ToStd(error), Address()); + connecting_sockets_.erase(local_endpoint); + return post(handler, convert::ToStd(error), ConnectResult()); } auto remote_endpoint = socket->remote_endpoint(); - connections_[remote_endpoint] = socket; - - AsyncExchange(*socket, Serialise(our_id_), - [=](boost::system::error_code error, SerialisedMessage data) { + AsyncExchange(*socket, Serialise(our_id_, convert::ToAsio(remote_endpoint)), + [=](boost::system::error_code error, SerialisedMessage data) mutable { auto socket = weak_socket.lock(); if (!socket) { - return handler(asio::error::operation_aborted, Address()); + return post(handler, asio::error::operation_aborted, ConnectResult()); } + connecting_sockets_.erase(local_endpoint); + if (error) { - connections_.erase(remote_endpoint); - return handler(convert::ToStd(error), Address()); + return post(handler, convert::ToStd(error), ConnectResult()); } InputVectorStream stream(data); Address his_id; - Parse(stream, his_id); + asio::ip::udp::endpoint our_endpoint; + Parse(stream, his_id, our_endpoint); + + auto result = connections_.insert(std::make_pair(his_id, socket)); + + if (!result.second) { + return post(handler, asio::error::already_connected, ConnectResult{his_id, our_endpoint}); + } - id_to_endpoint_map_[his_id] = remote_endpoint; StartReceiving(his_id, remote_endpoint, socket); - handler(convert::ToStd(error), his_id); + post(handler, convert::ToStd(error), ConnectResult{his_id, our_endpoint}); }); }); }); + + return result.get(); } -template -void Connections::Accept(unsigned short port, const Handler handler) { - service_.post([=]() { - auto find_result = acceptors_.insert(std::make_pair(port, std::shared_ptr())); +template +AsyncResultReturn Connections::Accept(Port port, + Port* chosen_port, + Token&& token) { + using Handler = AsyncResultHandler; + Handler handler(std::forward(token)); + asio::async_result result(handler); - auto& acceptor = find_result.first->second; + auto loopback = [](Port port) { return crux::endpoint(boost::asio::ip::udp::v4(), port); }; - if (!acceptor) { - crux::endpoint endpoint(boost::asio::ip::udp::v4(), port); - acceptor.reset(new crux::acceptor(service_, endpoint)); + std::shared_ptr acceptor; + + try { + acceptor = std::make_shared(get_io_service(), loopback(port)); + } catch (...) { + acceptor = std::make_shared(get_io_service(), loopback(0)); + } + + if (chosen_port) { + *chosen_port = acceptor->local_endpoint().port(); + } + + get_io_service().post([=]() mutable { + auto find_result = acceptors_.insert(std::make_pair(port, acceptor)); + + if (!find_result.second /* inserted? */) { + return post(handler, asio::error::already_started, Connections::AcceptResult()); } std::weak_ptr weak_acceptor = acceptor; - auto socket = std::make_shared(service_); + auto socket = std::make_shared(get_io_service()); - acceptor->async_accept(*socket, [=](boost::system::error_code error) { + acceptor->async_accept(*socket, [=](boost::system::error_code error) mutable { if (!weak_acceptor.lock()) { - return handler(asio::error::operation_aborted, asio::ip::udp::endpoint(), Address()); + return post(handler, asio::error::operation_aborted, AcceptResult()); } if (error) { - return handler(asio::error::operation_aborted, asio::ip::udp::endpoint(), Address()); + return post(handler, asio::error::operation_aborted, AcceptResult()); } acceptors_.erase(port); + auto remote_endpoint = socket->remote_endpoint(); - connections_[remote_endpoint] = socket; + auto local_endpoint = socket->local_endpoint(); + + connecting_sockets_[local_endpoint] = socket; std::weak_ptr weak_socket = socket; - AsyncExchange(*socket, Serialise(our_id_), [=](boost::system::error_code error, - SerialisedMessage data) { + AsyncExchange(*socket, Serialise(our_id_, convert::ToAsio(remote_endpoint)), + [=](boost::system::error_code error, SerialisedMessage data) mutable { auto socket = weak_socket.lock(); if (!socket) { - return handler(asio::error::operation_aborted, convert::ToAsio(remote_endpoint), - Address()); + return post(handler, asio::error::operation_aborted, + AcceptResult{convert::ToAsio(remote_endpoint), Address(), Endpoint()}); } if (error) { - connections_.erase(remote_endpoint); - return handler(convert::ToStd(error), convert::ToAsio(remote_endpoint), Address()); + connecting_sockets_.erase(local_endpoint); + return post(handler, convert::ToStd(error), + AcceptResult{convert::ToAsio(remote_endpoint), Address(), Endpoint()}); } InputVectorStream stream(data); Address his_id; - Parse(stream, his_id); + Endpoint our_endpoint; + Parse(stream, his_id, our_endpoint); - id_to_endpoint_map_[his_id] = remote_endpoint; - StartReceiving(his_id, remote_endpoint, socket); + auto result = connections_.insert(std::make_pair(his_id, socket)); - handler(convert::ToStd(error), convert::ToAsio(remote_endpoint), his_id); + if (result.second) { + // Inserted + StartReceiving(his_id, remote_endpoint, socket); + post(handler, convert::ToStd(error), + AcceptResult{convert::ToAsio(remote_endpoint), his_id, our_endpoint}); + } + else { + remote_endpoint = result.first->second->remote_endpoint(); + post(handler, asio::error::already_connected, + AcceptResult{convert::ToAsio(remote_endpoint), his_id, our_endpoint}); + } }); }); }); + + return result.get(); } inline void Connections::StartReceiving(const Address& id, const crux::endpoint& remote_endpoint, @@ -256,20 +375,20 @@ inline void Connections::StartReceiving(const Address& id, const crux::endpoint& auto buffer = std::make_shared(max_message_size()); socket->async_receive( - boost::asio::buffer(*buffer), [=](boost::system::error_code error, size_t size) { + boost::asio::buffer(*buffer), [=](boost::system::error_code error, size_t size) mutable { auto socket = weak_socket.lock(); if (!socket) { - return receive_queue_.Push(asio::error::operation_aborted, id, std::move(*buffer)); + return OnReceive(asio::error::operation_aborted, ReceiveResult{id, std::move(*buffer)}); } if (error) { - id_to_endpoint_map_.erase(id); - connections_.erase(remote_endpoint); + connections_.erase(id); + size = 0; } buffer->resize(size); - receive_queue_.Push(convert::ToStd(error), id, std::move(*buffer)); + OnReceive(convert::ToStd(error), ReceiveResult{id, std::move(*buffer)}); if (error) return; @@ -278,13 +397,28 @@ inline void Connections::StartReceiving(const Address& id, const crux::endpoint& }); } -inline boost::asio::io_service& Connections::get_io_service() { return service_; } +inline boost::asio::io_service& Connections::get_io_service() { return runner_.service(); } inline void Connections::Shutdown() { - service_.post([=]() { + get_io_service().post([=]() { + work_.reset(); acceptors_.clear(); connections_.clear(); - id_to_endpoint_map_.clear(); + connecting_sockets_.clear(); + }); +} + +inline void Connections::Wait() { runner_.Stop(); } + +template +void Connections::post(Handler&& handler, Args&&... args) { + std::tuple::type...> tuple(std::forward(args)...); + service.post([handler, tuple]() mutable { ApplyTuple(handler, tuple); }); +} + +inline void Connections::Drop(const Address& their_id) { + get_io_service().post([=]() { + connections_.erase(their_id); }); } diff --git a/src/maidsafe/routing/message_header.h b/src/maidsafe/routing/message_header.h index 5b03f663..1c417bbb 100644 --- a/src/maidsafe/routing/message_header.h +++ b/src/maidsafe/routing/message_header.h @@ -117,9 +117,30 @@ class MessageHeader { NodeAddress FromNode() const { return source_.node_address; } boost::optional FromGroup() const { return source_.group_address; } Authority FromAuthority() { return authority_; } - bool RelayedMessage() const { return static_cast(source_.reply_to_address); } + + bool RelayedMessage() const { + return static_cast(source_.reply_to_address) || + static_cast(destination_.second); + } + + boost::optional
RelayedTo() const { + if (source_.reply_to_address) { + return static_cast(*source_.reply_to_address); + } + else if (destination_.second) { + return static_cast(*destination_.second); + } + return boost::none; + } + boost::optional ReplyToAddress() const { - return source_.reply_to_address; + if (source_.reply_to_address) { + return source_.reply_to_address; + } + if (destination_.second) { + return destination_.second; + } + return boost::none; } Address FromAddress() const { @@ -171,6 +192,25 @@ class MessageHeader { boost::optional signature_; }; +inline +std::ostream& operator<<(std::ostream& os, const MessageHeader& hdr) { + os << "(Header src:" << hdr.Source() + << ", dst:("; + + auto dst = hdr.Destination(); + + os << reinterpret_cast(dst.first) << ", reply_to:"; + + if (dst.second) { + os << reinterpret_cast(*dst.second) << ")"; + } + else { + os << "none)"; + } + + return os << ", id:" << hdr.MessageId() << ", ...)"; +} + } // namespace routing } // namespace maidsafe diff --git a/src/maidsafe/routing/messages/connect.h b/src/maidsafe/routing/messages/connect.h index bc9079db..fc45a196 100644 --- a/src/maidsafe/routing/messages/connect.h +++ b/src/maidsafe/routing/messages/connect.h @@ -75,6 +75,11 @@ class Connect { passport::PublicPmid requester_fob_; }; +inline std::ostream& operator<<(std::ostream& os, const Connect& msg) { + return os << "(Connect " << msg.requester_endpoints() << ", rq:" + << msg.requester_id() << ", rc:" << msg.receiver_id() << ", (FOB...))"; +} + } // namespace routing } // namespace maidsafe diff --git a/src/maidsafe/routing/messages/connect_response.h b/src/maidsafe/routing/messages/connect_response.h index e66c574a..fc00112f 100644 --- a/src/maidsafe/routing/messages/connect_response.h +++ b/src/maidsafe/routing/messages/connect_response.h @@ -80,6 +80,13 @@ class ConnectResponse { passport::PublicPmid receiver_fob_; }; +inline std::ostream& operator<<(std::ostream& os, const ConnectResponse& msg) { + return os << "(ConnectResponse rq:" << msg.requester_endpoints() << ", " + << "rc:" << msg.receiver_endpoints() + << ", rq:" << msg.requester_id() + << ", rc:" << msg.receiver_id() << ")"; +} + } // namespace routing } // namespace maidsafe diff --git a/src/maidsafe/routing/messages/find_group.h b/src/maidsafe/routing/messages/find_group.h index 89ea2979..63526c86 100644 --- a/src/maidsafe/routing/messages/find_group.h +++ b/src/maidsafe/routing/messages/find_group.h @@ -60,6 +60,11 @@ class FindGroup { Address target_id_; }; +inline std::ostream& operator<<(std::ostream& os, const FindGroup& msg) { + return os << "(FindGroup requester:" << static_cast
(msg.requester_id()) + << ", target:" << msg.target_id() << ")"; +} + } // namespace routing } // namespace maidsafe diff --git a/src/maidsafe/routing/messages/find_group_response.h b/src/maidsafe/routing/messages/find_group_response.h index be35897f..0bb5f66a 100644 --- a/src/maidsafe/routing/messages/find_group_response.h +++ b/src/maidsafe/routing/messages/find_group_response.h @@ -52,23 +52,8 @@ class FindGroupResponse { FindGroupResponse& operator=(const FindGroupResponse&) = delete; template - Archive& load(Archive& archive) { - group_.clear(); - std::size_t group_size(0); - archive(target_id_, group_size); - for (std::size_t i = 0; i < group_size; ++i) { - group_.emplace_back(); - archive(group_.back()); - } - return archive; - } - - template - Archive& save(Archive& archive) const { - archive(target_id_, group_.size()); - for (const auto& public_pmid : group_) - archive(public_pmid); - return archive; + void serialize(Archive& archive) { + archive(target_id_, group_); } Address target_id() const { return target_id_; } @@ -79,6 +64,22 @@ class FindGroupResponse { std::vector group_; }; +inline std::ostream& operator<<(std::ostream& os, const FindGroupResponse& msg) { + os << "(FindGroupResponse target:" << msg.target_id() + << ", group:{"; + + auto group = msg.group(); + + for (auto i = group.begin(); i != group.end();) { + os << i->Name(); + if (++i != group.end()) { + os << ","; + } + } + + return os << "})"; +} + } // namespace routing } // namespace maidsafe diff --git a/src/maidsafe/routing/messages/messages.h b/src/maidsafe/routing/messages/messages.h index 2839457b..0c403591 100644 --- a/src/maidsafe/routing/messages/messages.h +++ b/src/maidsafe/routing/messages/messages.h @@ -33,4 +33,5 @@ #include "maidsafe/routing/messages/put_data.h" #include "maidsafe/routing/messages/put_data_response.h" + #endif // MAIDSAFE_ROUTING_MESSAGES_MESSAGES_H_ diff --git a/src/maidsafe/routing/messages/messages_fwd.h b/src/maidsafe/routing/messages/messages_fwd.h index 1458d202..97525a12 100644 --- a/src/maidsafe/routing/messages/messages_fwd.h +++ b/src/maidsafe/routing/messages/messages_fwd.h @@ -132,6 +132,7 @@ struct MessageToTag { static MessageTypeTag value() { return MessageTypeTag::PostResponse; } }; + } // namespace routing } // namespace maidsafe diff --git a/src/maidsafe/routing/routing_node.cc b/src/maidsafe/routing/routing_node.cc index c54e64d9..db8ea3f6 100644 --- a/src/maidsafe/routing/routing_node.cc +++ b/src/maidsafe/routing/routing_node.cc @@ -18,325 +18,10 @@ #include "maidsafe/routing/routing_node.h" -#include -#include "asio/use_future.hpp" -#include "asio/ip/udp.hpp" -#include "boost/exception/diagnostic_information.hpp" -#include "maidsafe/common/serialisation/binary_archive.h" -#include "maidsafe/common/serialisation/serialisation.h" - -#include "maidsafe/routing/messages/messages.h" -#include "maidsafe/routing/connection_manager.h" -#include "maidsafe/routing/message_header.h" -#include "maidsafe/routing/sentinel.h" -#include "maidsafe/routing/utils.h" - namespace maidsafe { namespace routing { -// -// RoutingNode::RoutingNode(asio::io_service& io_service, boost::filesystem::path db_location, -// const passport::Pmid& pmid, std::shared_ptr listener_ptr) -// : io_service_(io_service), -// our_fob_(pmid), -// bootstrap_node_(boost::none), -// rudp_(), -// bootstrap_handler_(std::move(db_location)), -// connection_manager_(io_service, rudp_, Address(pmid.name()->string())), -// listener_ptr_(listener_ptr), -// filter_(std::chrono::minutes(20)), -// sentinel_(io_service_), -// cache_(std::chrono::minutes(60)) { -// // store this to allow other nodes to get our ID on startup. IF they have full routing tables -// they -// // need Quorum number of these signed anyway. -// cache_.Add(Address(pmid.name()->string()), Serialise(passport::PublicPmid(our_fob_))); -// // try an connect to any local nodes (5483) Expect to be told Node_Id -// auto temp_id(MakeIdentity()); -// rudp_.Add(rudp::Contact(temp_id, rudp::EndpointPair{rudp::Endpoint{GetLocalIp(), 5483}, -// rudp::Endpoint{GetLocalIp(), 5433}}, -// our_fob_.public_key()), -// [this, temp_id](asio::error_code error) { -// if (!error) { -// bootstrap_node_ = temp_id; -// ConnectToCloseGroup(); -// return; -// } -// }); -// for (auto& node : bootstrap_handler_.ReadBootstrapContacts()) { -// rudp_.Add(node, [node, this](asio::error_code error) { -// if (!error) { -// bootstrap_node_ = node.id; -// ConnectToCloseGroup(); -// return; -// } -// }); -// if (bootstrap_node_) -// break; -// } -// } -// -// RoutingNode::~RoutingNode() {} -// -// void RoutingNode::ConnectToCloseGroup() { -// FindGroup message(NodeAddress(OurId()), OurId()); -// MessageHeader header(DestinationAddress(std::make_pair(Destination(OurId()), boost::none)), -// SourceAddress{OurSourceAddress()}, ++message_id_); -// if (bootstrap_node_) { -// rudp_.Send(*bootstrap_node_, Serialise(header, MessageToTag::value(), message), -// [](asio::error_code error) { -// if (error) { -// LOG(kWarning) << "rudp cannot send via bootstrap node" << error.message(); -// } -// }); -// -// return; -// } -// for (const auto& target : connection_manager_.GetTarget(OurId())) -// rudp_.Send(target.id, Serialise(header, MessageToTag::value(), message), -// [](asio::error_code error) { -// if (error) { -// LOG(kWarning) << "rudp cannot send" << error.message(); -// } -// }); -// } -// void RoutingNode::MessageReceived(NodeId /* peer_id */, rudp::ReceivedMessage serialised_message) -// { -// InputVectorStream binary_input_stream{serialised_message}; -// MessageHeader header; -// MessageTypeTag tag; -// try { -// Parse(binary_input_stream, header, tag); -// } catch (const std::exception&) { -// LOG(kError) << "header failure." << boost::current_exception_diagnostic_information(true); -// return; -// } -// -// if (filter_.Check(header.FilterValue())) -// return; // already seen -// // add to filter as soon as posible -// filter_.Add({header.FilterValue()}); -// -// // We add these to cache -// if (tag == MessageTypeTag::GetDataResponse) { -// auto data = Parse(binary_input_stream); -// cache_.Add(data.name(), data.data()); -// } -// // if we can satisfy request from cache we do -// if (tag == MessageTypeTag::GetData) { -// auto data = Parse(binary_input_stream); -// auto test = cache_.Get(data.name()); -// if (test) { -// GetDataResponse response(data.name(), test.value()); -// auto message(Serialise( -// MessageHeader(header.Destination(), OurSourceAddress(), header.MessageId()), -// MessageTypeTag::GetDataResponse, response)); -// for (const auto& target : connection_manager_.GetTarget(header.FromNode())) -// rudp_.Send(target.id, message, [](asio::error_code error) { -// if (error) { -// LOG(kWarning) << "rudp cannot send" << error.message(); -// } -// }); -// return; -// } -// } -// -// // send to next node(s) even our close group (swarm mode) -// for (const auto& target : connection_manager_.GetTarget(header.Destination().first)) -// rudp_.Send(target.id, serialised_message, [](asio::error_code error) { -// if (error) { -// LOG(kWarning) << "rudp cannot send" << error.message(); -// } -// }); -// // FIXME(dirvine) We need new rudp for this :26/01/2015 -// if (header.RelayedMessage() && -// std::any_of(std::begin(connected_nodes_), std::end(connected_nodes_), -// [&header](const Address& node) { return node == *header.RelayedMessage(); })) { -// // send message to connected node -// return; -// } -// -// if (!connection_manager_.AddressInCloseGroupRange(header.Destination().first)) -// return; // not for us -// -// // FIXME(dirvine) Sentinel check here!! :19/01/2015 -// switch (tag) { -// case MessageTypeTag::Connect: -// HandleMessage(Parse(binary_input_stream), std::move(header)); -// break; -// case MessageTypeTag::ConnectResponse: -// HandleMessage(Parse(binary_input_stream)); -// break; -// case MessageTypeTag::FindGroup: -// HandleMessage(Parse(binary_input_stream), std::move(header)); -// break; -// case MessageTypeTag::FindGroupResponse: -// HandleMessage(Parse(binary_input_stream), std::move(header)); -// break; -// case MessageTypeTag::GetData: -// HandleMessage(Parse(binary_input_stream), std::move(header)); -// break; -// case MessageTypeTag::GetDataResponse: -// HandleMessage(Parse(binary_input_stream), std::move(header)); -// break; -// case MessageTypeTag::PutData: -// HandleMessage(Parse(binary_input_stream), std::move(header)); -// break; -// case MessageTypeTag::Post: -// HandleMessage(Parse(binary_input_stream), std::move(header)); -// break; -// default: -// LOG(kWarning) << "Received message of unknown type."; -// break; -// } -// } -// -// -// void RoutingNode::ConnectionLost(NodeId peer) { connection_manager_.LostNetworkConnection(peer); -// } -// -// // reply with our details; -// void RoutingNode::HandleMessage(Connect connect, MessageHeader original_header) { -// if (!connection_manager_.SuggestNodeToAdd(connect.requester_id())) -// return; -// auto targets(connection_manager_.GetTarget(connect.requester_id())); -// ConnectResponse respond(connect.requester_endpoints(), NextEndpointPair(), -// connect.requester_id(), OurId(), passport::PublicPmid(our_fob_)); -// assert(connect.receiver_id() == OurId()); -// -// MessageHeader header(DestinationAddress(original_header.ReturnDestinationAddress()), -// SourceAddress(OurSourceAddress()), original_header.MessageId(), -// asymm::Sign(Serialise(respond), our_fob_.private_key())); -// // FIXME(dirvine) Do we need to pass a shared_from_this type object or this may segfault on -// // shutdown -// // :24/01/2015 -// for (auto& target : targets) { -// rudp_.Send(target.id, Serialise(header, MessageToTag::value(), respond), -// [connect, this](asio::error_code error_code) { -// if (error_code) -// return; -// }); -// } -// auto added = -// connection_manager_.AddNode(NodeInfo(connect.requester_id(), -// connect.requester_fob()), -// connect.requester_endpoints()); -// -// rudp_.Add(rudp::Contact(connect.requester_id(), connect.requester_endpoints(), -// connect.requester_fob().public_key()), -// [connect, added, this](asio::error_code error) mutable { -// if (error) { -// auto target(connect.requester_id()); -// this->connection_manager_.DropNode(target); -// return; -// } -// }); -// if (added) -// listener_ptr_->HandleCloseGroupDifference(*added); -// } -// -// void RoutingNode::HandleMessage(ConnectResponse connect_response) { -// if (!connection_manager_.SuggestNodeToAdd(connect_response.requester_id())) -// return; -// auto added = connection_manager_.AddNode( -// NodeInfo(connect_response.requester_id(), connect_response.receiver_fob()), -// connect_response.receiver_endpoints()); -// auto target = connect_response.requester_id(); -// rudp_.Add( -// rudp::Contact(connect_response.receiver_id(), -// connect_response.receiver_endpoints(), -// connect_response.receiver_fob().public_key()), -// [target, added, this](asio::error_code error) { -// if (error) { -// this->connection_manager_.DropNode(target); -// return; -// } -// if (added) -// listener_ptr_->HandleCloseGroupDifference(*added); -// if (connection_manager_.Size() >= QuorumSize) { -// rudp_.Remove(*bootstrap_node_, asio::use_future).get(); -// bootstrap_node_ = boost::none; -// } -// }); -// } -// void RoutingNode::HandleMessage(FindGroup find_group, MessageHeader original_header) { -// auto node_infos = std::move(connection_manager_.OurCloseGroup()); -// // add ourselves -// node_infos.emplace_back(NodeInfo(OurId(), passport::PublicPmid(our_fob_))); -// FindGroupResponse response(find_group.target_id(), node_infos); -// MessageHeader header(DestinationAddress(original_header.ReturnDestinationAddress()), -// SourceAddress(OurSourceAddress(GroupAddress(find_group.target_id()))), -// original_header.MessageId(), -// asymm::Sign(Serialise(response), our_fob_.private_key())); -// auto message(Serialise(header, MessageToTag::value(), response)); -// for (const auto& node : connection_manager_.GetTarget(original_header.FromNode())) { -// rudp_.Send(node.id, message, asio::use_future).get(); -// } -// } -// -// void RoutingNode::HandleMessage(FindGroupResponse find_group_reponse, -// MessageHeader /* original_header */) { -// // this is called to get our group on bootstrap, we will try and connect to each of these nodes -// // Only other reason is to allow the sentinel to check signatures and those calls will just -// fall -// // through here. -// for (const auto node : find_group_reponse.node_infos()) { -// if (!connection_manager_.SuggestNodeToAdd(node.id)) -// continue; -// Connect message(NextEndpointPair(), OurId(), node.id, passport::PublicPmid(our_fob_)); -// MessageHeader header(DestinationAddress(std::make_pair(Destination(node.id), boost::none)), -// SourceAddress{OurSourceAddress()}, ++message_id_); -// for (const auto& target : connection_manager_.GetTarget(node.id)) -// rudp_.Send(target.id, Serialise(header, MessageToTag::value(), message), -// [](asio::error_code error) { -// if (error) { -// LOG(kWarning) << "rudp cannot send" << error.message(); -// } -// }); -// } -// } -// -// void RoutingNode::HandleMessage(GetData /*get_data*/, MessageHeader /* original_header */) {} -// -// void RoutingNode::HandleMessage(GetDataResponse /* get_data_response */, -// MessageHeader /* original_header */) {} -// -// void RoutingNode::HandleMessage(PutData /*put_data*/, MessageHeader /* original_header */) {} -// -// void RoutingNode::HandleMessage(PutDataResponse /*put_data_response*/, -// MessageHeader /* original_header */) {} -// -// void RoutingNode::HandleMessage(Post /* post */, MessageHeader /* original_header */) {} -// -// SourceAddress RoutingNode::OurSourceAddress() const { -// if (bootstrap_node_) -// return std::make_tuple(NodeAddress(*bootstrap_node_), boost::none, ReplyToAddress(OurId())); -// else -// return std::make_tuple(NodeAddress(OurId()), boost::none, boost::none); -// } -// -// SourceAddress RoutingNode::OurSourceAddress(GroupAddress group) const { -// return std::make_tuple(NodeAddress(OurId()), group, boost::none); -// } -// -// template -// void RoutingNode::SendDirect(Address target, Message message, SendHandler handler) { -// MessageHeader header(DestinationAddress(std::make_pair(Destination(target), boost::none)), -// SourceAddress{OurSourceAddress()}, ++message_id_); -// -// rudp_.Send(target, Serialise(header, MessageToTag::value(), message), handler); -// } -// -// void RoutingNode::OnBootstrap(asio::error_code error, rudp::Contact contact, -// std::function handler) { -// if (error) { -// return handler(error, contact); -// } -// -// SendDirect(contact.id, FindGroup(OurId(), contact.id), -// [=](asio::error_code error) { handler(error, contact); }); -// } -// + } // namespace routing } // namespace maidsafe diff --git a/src/maidsafe/routing/routing_table.cc b/src/maidsafe/routing/routing_table.cc index b7d70144..e7cc7f57 100644 --- a/src/maidsafe/routing/routing_table.cc +++ b/src/maidsafe/routing/routing_table.cc @@ -202,6 +202,8 @@ bool RoutingTable::NewNodeIsBetterThanExisting( void RoutingTable::PushBackThenSort(NodeInfo their_info) { nodes_.push_back(std::move(their_info)); + LOG(kSuccess) << " RT [ "<< our_id_ << " ] addedd [ " << nodes_.back().id << " ] , size : " + << nodes_.size(); std::sort(std::begin(nodes_), std::end(nodes_), comparison_); } diff --git a/src/maidsafe/routing/routing_table.h b/src/maidsafe/routing/routing_table.h index cdc4b9d2..993df9fd 100644 --- a/src/maidsafe/routing/routing_table.h +++ b/src/maidsafe/routing/routing_table.h @@ -37,6 +37,7 @@ namespace routing { struct NodeInfo; +// N:B Bucket 512 is OurId() and Bucket 0 is the furthest bucket. // The RoutingTable class is used to maintain a list of contacts to which we are connected. It is // threadsafe and all public functions offer the strong exception guarantee. Any public function // having an Address or NodeInfo arg will throw if NDEBUG is defined and the passed ID is invalid. @@ -62,8 +63,10 @@ class RoutingTable { // // 1 - if the contact is ourself, or doesn't have a valid public key, or is already in the table, // it will not be added - // 2 - if the routing table is not full (size < OptimalSize()), the contact will be added - // 3 - if the contact is within our close group, it will be added + // 2 - if the routing table is not full (size < OptimalSize()), the contact will be added with no + // nodes removed + // 3 - if the contact is within our close group, it will be added & possibly a node will be + // removed // 4 - if we can find a candidate for removal (a contact in a bucket with more than 'BucketSize()' // contacts, which is also not within our close group), and if the new contact will fit in a // bucket closer to our own bucket, then we add the new contact. diff --git a/src/maidsafe/routing/tests/routing_connections_test.cc b/src/maidsafe/routing/tests/routing_connections_test.cc index ffb31a52..883a8802 100644 --- a/src/maidsafe/routing/tests/routing_connections_test.cc +++ b/src/maidsafe/routing/tests/routing_connections_test.cc @@ -17,6 +17,8 @@ use of the MaidSafe Software. */ +#include "asio/use_future.hpp" + #include "maidsafe/common/test.h" #include "maidsafe/common/utils.h" @@ -29,8 +31,19 @@ namespace routing { namespace test { +static SerialisedMessage str_to_msg(const std::string& str) { + return SerialisedMessage(str.begin(), str.end()); +} + +static std::string msg_to_str(const SerialisedMessage& msg) { + return std::string(msg.begin(), msg.end()); +} + TEST(ConnectionsTest, FUNC_TwoConnections) { - boost::asio::io_service ios; + bool c1_finished = false; + bool c2_finished = false; + + asio::io_service ios; Address c1_id(MakeIdentity()); Address c2_id(MakeIdentity()); @@ -38,47 +51,78 @@ TEST(ConnectionsTest, FUNC_TwoConnections) { Connections c1(ios, c1_id); Connections c2(ios, c2_id); - unsigned short port = 8080; + Port port = 8080; - bool c1_finished = false; - bool c2_finished = false; + c1.Accept(port, nullptr, [&](asio::error_code error, Connections::AcceptResult result) { + ASSERT_FALSE(error); + ASSERT_EQ(result.his_address, c2.OurId()); + ASSERT_EQ(result.our_endpoint.port(), port); - c1.Accept(port, - [&](asio::error_code error, asio::ip::udp::endpoint, Address his_id) { - ASSERT_FALSE(error); - ASSERT_EQ(his_id, c2.OurId()); - std::string msg = "hello"; - - c1.Send(his_id, - std::vector(msg.begin(), msg.end()), - [&](asio::error_code error) { - ASSERT_FALSE(error); - c1.Shutdown(); - c1_finished = true; - }); - }); + c1.Send(result.his_address, str_to_msg("hello"), [&](asio::error_code error) { + ASSERT_FALSE(error); + c1.Shutdown(); + c1_finished = true; + }); + }); c2.Connect(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port), - [&](asio::error_code error, Address his_id) { - ASSERT_FALSE(error); - ASSERT_EQ(his_id, c1.OurId()); + [&](asio::error_code error, Connections::ConnectResult result) { + ASSERT_FALSE(error); + ASSERT_EQ(result.his_address, c1.OurId()); - c2.Receive([&, his_id](asio::error_code error, Address sender_id, - const std::vector& bytes) { - ASSERT_FALSE(error); - ASSERT_EQ(sender_id, his_id); - ASSERT_EQ(std::string(bytes.begin(), bytes.end()), "hello"); + c2.Receive( + [&, result](asio::error_code error, Connections::ReceiveResult recv_result) { + ASSERT_FALSE(error); + ASSERT_EQ(recv_result.his_address, result.his_address); + ASSERT_EQ(msg_to_str(recv_result.message), "hello"); - c2.Shutdown(); - c2_finished = true; - }); - }); + c2.Shutdown(); + c2_finished = true; + }); + }); ios.run(); ASSERT_TRUE(c1_finished && c2_finished); } +TEST(ConnectionsTest, FUNC_TwoConnectionsWithFutures) { + Address c1_id(MakeIdentity()); + Address c2_id(MakeIdentity()); + + asio::io_service ios; + + Connections c1(ios, c1_id); + Connections c2(ios, c2_id); + + std::thread thread([&]() { ios.run(); }); + + Port port = 8080; + + auto accept_f = c1.Accept(port, nullptr, asio::use_future); + auto connect_f = + c2.Connect(asio::ip::udp::endpoint(asio::ip::address_v4::loopback(), port), asio::use_future); + + auto accept_result = accept_f.get(); + auto connect_result = connect_f.get(); + + ASSERT_EQ(accept_result.his_address, c2.OurId()); + ASSERT_EQ(accept_result.our_endpoint.port(), port); + + ASSERT_EQ(connect_result.his_address, c1.OurId()); + + auto recv_f = c2.Receive(asio::use_future); + auto send_f = c1.Send(accept_result.his_address, str_to_msg("hello"), asio::use_future); + + send_f.get(); + recv_f.get(); + + c1.Shutdown(); + c2.Shutdown(); + + thread.join(); +} + } // namespace test } // namespace routing diff --git a/src/maidsafe/routing/tests/routing_fake_vault_facade_test.cc b/src/maidsafe/routing/tests/routing_fake_vault_facade_test.cc new file mode 100644 index 00000000..258bb940 --- /dev/null +++ b/src/maidsafe/routing/tests/routing_fake_vault_facade_test.cc @@ -0,0 +1,102 @@ +/* Copyright 2014 MaidSafe.net limited + + This MaidSafe Software is licensed to you under (1) the MaidSafe.net Commercial License, + version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which + licence you accepted on initial access to the Software (the "Licences"). + + By contributing code to the MaidSafe Software, or to this project generally, you agree to be + bound by the terms of the MaidSafe Contributor Agreement, version 1.0, found in the root + directory of this project at LICENSE, COPYING and CONTRIBUTOR respectively and also + available at: http://www.maidsafe.net/licenses + + Unless required by applicable law or agreed to in writing, the MaidSafe Software distributed + under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. + + See the Licences for the specific language governing permissions and limitations relating to + use of the MaidSafe Software. */ + +#include "maidsafe/routing/tests/utils/fake_vault_facade.h" + +#include "asio/spawn.hpp" + +#include "maidsafe/common/asio_service.h" +#include "maidsafe/common/test.h" +#include "maidsafe/common/data_types/immutable_data.h" + +namespace maidsafe { + +namespace routing { + +namespace test { + +TEST(RoutingFakeVaultFacadeTest, FUNC_Constructor) { + vault::test::FakeVaultFacade vault1; + vault::test::FakeVaultFacade vault2; + Sleep(std::chrono::seconds(5)); + LOG(kInfo) << "================================================================================="; + vault::test::FakeVaultFacade vault3; + Sleep(std::chrono::seconds(15)); + LOG(kInfo) << "================================================================================="; + +// LOG(kInfo) << "================================================================================="; + +// std::shared_ptr immutable_data( +// new ImmutableData(NonEmptyString(RandomAlphaNumericBytes(1, 50)))); +// std::future put_future(vault2.Put(immutable_data, asio::use_future)); +// EXPECT_NO_THROW(put_future.get()); +// passport::MaidAndSigner maid_and_signer(passport::CreateMaidAndSigner()); +// std::shared_ptr public_maid(new passport::PublicMaid(maid_and_signer.first)); +// put_future = vault2.Put(public_maid, asio::use_future); +// EXPECT_NO_THROW(put_future.get()); + + +// AsioService asio_service(1); +// asio::spawn(asio_service.service(), [&](asio::yield_context yield) { +// std::error_code error; +// vault2.Put(immutable_data, yield[error]); +// EXPECT_FALSE(error) << error.message(); +// vault2.Put(public_maid, yield[error]); +// EXPECT_FALSE(error) << error.message(); +// }); + +// Sleep(std::chrono::seconds(5)); +// asio_service.Stop(); +} + +// TEST(RoutingFakeVaultFacadeTest, FUNC_PutGet) { +// using endpoint = asio::ip::udp::endpoint; +// using address = asio::ip::address_v4; +// vault::test::FakeVaultFacade facade1; +// std::vector> vaults(2); +// Port port(5483); +// for (auto& vault : vaults) +// vault.second = port++; +// for (auto& vault : vaults) +// ASSERT_NO_THROW(vault.first.StartAccepting(vault.second)); + +// ASSERT_GE(vaults.size(), 2); + +// for (size_t i = 0; i != vaults.size() - 1; ++i) +// for (size_t j = i + 1; j != vaults.size(); ++j) +// ASSERT_NO_THROW(vaults[j].first.AddContact(endpoint(address::loopback(), vaults[i].second))); + +// auto vault_index(RandomUint32() % vaults.size()); +// ImmutableData data(NonEmptyString(RandomAlphaNumericString(RandomUint32() % 1000))); + +// vaults[vault_index].first.Put(NodeId(RandomString(NodeId::kSize)), data, +// [](maidsafe_error error) { +// ASSERT_EQ(error.code(), make_error_code(CommonErrors::success)); +// }); + +// vaults[vault_index].first.Get(data.name(), +// [](maidsafe_error error) { +// ASSERT_EQ(error.code(), make_error_code(CommonErrors::success)); +// }); +// } + +} // namespace test + +} // namespace routing + +} // namespace maidsafe diff --git a/src/maidsafe/routing/tests/routing_vault_network_test.cc b/src/maidsafe/routing/tests/routing_vault_network_test.cc index a85a1601..4a148d3e 100644 --- a/src/maidsafe/routing/tests/routing_vault_network_test.cc +++ b/src/maidsafe/routing/tests/routing_vault_network_test.cc @@ -291,20 +291,20 @@ TEST(VaultNetworkTest, FUNC_CreateNetPutGetData) { // other async actions (same with the tests below). - LruCache cache(0, std::chrono::seconds(0)); +// LruCache cache(0, std::chrono::seconds(0)); - RoutingNode n; +// RoutingNode n; - NonEmptyString value(RandomAlphaNumericBytes(65)); - Identity name(crypto::Hash(value)); - MutableData a(name, value); - ImmutableData b(value); +// NonEmptyString value(RandomAlphaNumericBytes(65)); +// Identity name(crypto::Hash(value)); +// MutableData a(name, value); +// ImmutableData b(value); - Address from(MakeIdentity()); - Address to(MakeIdentity()); +// Address from(MakeIdentity()); +// Address to(MakeIdentity()); - n.Get(b.NameAndType(), [](asio::error_code /* error */) {}); - n.Get(a.NameAndType(), [](asio::error_code /* error */) {}); +// n.Get(b.NameAndType(), [](asio::error_code /* error */) {}); +// n.Get(a.NameAndType(), [](asio::error_code /* error */) {}); // n.Put(to, b, [](asio::error_code /* error */) {}); // n.Put(to, a, [](asio::error_code /* error */) {}); diff --git a/src/maidsafe/routing/tests/sentinel_test.cc b/src/maidsafe/routing/tests/sentinel_test.cc index a6ba571b..43fd8c1d 100644 --- a/src/maidsafe/routing/tests/sentinel_test.cc +++ b/src/maidsafe/routing/tests/sentinel_test.cc @@ -131,6 +131,7 @@ class SentinelTest : public testing::Test { return group_message; } + protected: void SortPmidNodes(const Address& target) { std::sort(pmid_nodes_.begin(), pmid_nodes_.end(), diff --git a/src/maidsafe/routing/tests/utils/fake_vault_facade.cc b/src/maidsafe/routing/tests/utils/fake_vault_facade.cc new file mode 100644 index 00000000..ace923c6 --- /dev/null +++ b/src/maidsafe/routing/tests/utils/fake_vault_facade.cc @@ -0,0 +1,96 @@ +/* Copyright 2014 MaidSafe.net limited + + This MaidSafe Software is licensed to you under (1) the MaidSafe.net Commercial License, + version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which + licence you accepted on initial access to the Software (the "Licences"). + + By contributing code to the MaidSafe Software, or to this project generally, you agree to be + bound by the terms of the MaidSafe Contributor Agreement, version 1.0, found in the root + directory of this project at LICENSE, COPYING and CONTRIBUTOR respectively and also + available at: http://www.maidsafe.net/licenses + + Unless required by applicable law or agreed to in writing, the MaidSafe Software distributed + under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. + + See the Licences for the specific language governing permissions and limitations relating to + use of the MaidSafe Software. */ + +#include "maidsafe/routing/tests/utils/fake_vault_facade.h" +#include "maidsafe/common/identity.h" +#include "maidsafe/common/types.h" +#include "maidsafe/common/data_types/immutable_data.h" +#include "maidsafe/common/data_types/mutable_data.h" +#include "maidsafe/passport/types.h" + +namespace maidsafe { + +namespace vault { + +namespace test { + +// template <> +// ImmutableData ParseData(const SerialisedData& serialised_data) { +// auto digest_size(crypto::SHA512::DIGESTSIZE); +// std::string name(serialised_data.begin(), serialised_data.begin() + digest_size); +// std::string content(serialised_data.begin() + digest_size, serialised_data.end()); +// return ImmutableData(ImmutableData::Name(Identity(name)), +// ImmutableData::serialised_type(NonEmptyString(content))); +// } + +routing::HandlePutPostReturn FakeVaultFacade::HandlePut(routing::SourceAddress from, + routing::Authority from_authority, + routing::Authority our_authority, + std::shared_ptr data) { + LOG(kVerbose) << "Received Put request from " << from.node_address.data << " with Data name " + << data->Name() << " and type " << static_cast(data->TypeId()); + (void)from_authority; + (void)our_authority; + // switch (authority) { + // case routing::Authority::client_manager: + // if (from_authority != routing::Authority::client) + // break; + // if (name_and_type_id.type_id == detail::TypeId::value) + // return MaidManager::HandlePut(from, Parse(serialised_data)); + // else if (name_and_type_id.type_id == detail::TypeId::value) + // return MaidManager::HandlePut(from, Parse(serialised_data)); + // else if (name_and_type_id.type_id == detail::TypeId::value) + // return MaidManager::HandlePut(from, Parse(serialised_data)); + // case routing::Authority::nae_manager: + // if (from_authority != routing::Authority::client_manager) + // break; + // if (name_and_type_id.type_id == detail::TypeId::value) + // return DataManager::HandlePut(from, Parse(serialised_data)); + // else if (name_and_type_id.type_id == detail::TypeId::value) + // return DataManager::HandlePut(from, Parse(serialised_data)); + // break; + // default: + // break; + // } + return boost::make_unexpected(MakeError(VaultErrors::failed_to_handle_request)); +} + +routing::HandleGetReturn FakeVaultFacade::HandleGet(routing::SourceAddress /*from*/, + routing::Authority /*from_authority*/, + routing::Authority /*authority*/, + Data::NameAndTypeId /*name_and_type_id*/) { + // switch (authority) { + // case routing::Authority::nae_manager: + // if (name_and_type_id.type_id == detail::TypeId::value) + // return DataManager::template HandleGet(from, data_name); + // else if (name_and_type_id.type_id == detail::TypeId::value) + // return DataManager::template HandleGet(from, data_name); + // break; + // default: + // break; + // } + return boost::make_unexpected(MakeError(VaultErrors::failed_to_handle_request)); +} + +void FakeVaultFacade::HandleChurn(routing::CloseGroupDifference /*diff*/) {} + +} // namespace test + +} // namespace vault + +} // namespace maidsafe diff --git a/src/maidsafe/routing/tests/utils/fake_vault_facade.h b/src/maidsafe/routing/tests/utils/fake_vault_facade.h new file mode 100644 index 00000000..5c5aec99 --- /dev/null +++ b/src/maidsafe/routing/tests/utils/fake_vault_facade.h @@ -0,0 +1,144 @@ +/* Copyright 2014 MaidSafe.net limited + + This MaidSafe Software is licensed to you under (1) the MaidSafe.net Commercial License, + version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which + licence you accepted on initial access to the Software (the "Licences"). + + By contributing code to the MaidSafe Software, or to this project generally, you agree to be + bound by the terms of the MaidSafe Contributor Agreement, version 1.0, found in the root + directory of this project at LICENSE, COPYING and CONTRIBUTOR respectively and also + available at: http://www.maidsafe.net/licenses + + Unless required by applicable law or agreed to in writing, the MaidSafe Software distributed + under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. + + See the Licences for the specific language governing permissions and limitations relating to + use of the MaidSafe Software. */ + +#ifndef MAIDSAFE_ROUTING_TESTS_UTILS_FAKE_VAULT_FACADE_H_ +#define MAIDSAFE_ROUTING_TESTS_UTILS_FAKE_VAULT_FACADE_H_ + +#include +#include +#include + +#include "maidsafe/routing/routing_node.h" + +namespace maidsafe { + +namespace vault { + +namespace test { + +template +class MaidManager { + public: + MaidManager() {} + + template + routing::HandlePutPostReturn HandlePut(const routing::SourceAddress& from, const Data& data); +}; + +template +template +routing::HandlePutPostReturn MaidManager::HandlePut( + const routing::SourceAddress& /*source_address*/, const Data& data) { + std::vector result; + result.push_back( + std::make_pair(routing::Destination(routing::Address(data.Name())), boost::none)); + return routing::HandlePutPostReturn(result); +} + + +template +class DataManager { + public: + DataManager() {} + + template + routing::HandlePutPostReturn HandlePut(const routing::SourceAddress& from, const Data& data); + template + routing::HandleGetReturn HandleGet(const routing::SourceAddress& from, const Identity& name); + + private: + std::map, std::vector> data_; + routing::CloseGroupDifference close_group_; +}; + +template +template +routing::HandlePutPostReturn DataManager::HandlePut(const routing::SourceAddress& /*from*/, + const Data& data) { + if (data_.find(data.name().value.string()) == std::end(data_)) + data_.insert(std::make_pair(data.name().value.string(), data.data().string())); + return boost::make_unexpected(MakeError(CommonErrors::success)); +} + +template +template +routing::HandleGetReturn DataManager::HandleGet(const routing::SourceAddress& /*from*/, + const Identity& name) { + auto it(data_.find(name.string())); + if (it != std::end(data_)) + return routing::HandleGetReturn(std::vector(it->second.begin(), it->second.end())); + return boost::make_unexpected(MakeError(CommonErrors::no_such_element)); +} + +// Helper function to parse data name and contents +// FIXME this need discussion, adding it temporarily to progress +// template +// ParsedType ParseData(const SerialisedData& serialised_data) { +// InputVectorStream binary_input_stream{serialised_data}; +// typename ParsedType::Name name; +// typename ParsedType::serialised_type contents; +// Parse(binary_input_stream, name, contents); +// return ParsedType(name, contents); +// } + +// template <> +// ImmutableData ParseData(const SerialisedData& serialised_data); + +class FakeVaultFacade : public MaidManager, + public DataManager, + public routing::RoutingNode { + public: + FakeVaultFacade() + : MaidManager(), + DataManager(), + routing::RoutingNode() {} + + ~FakeVaultFacade() = default; + + void HandleConnectionAdded(routing::Address /*address*/) {} + + routing::HandleGetReturn HandleGet(routing::SourceAddress from, routing::Authority from_authority, + routing::Authority authority, + Data::NameAndTypeId name_and_type_id); + + + routing::HandlePutPostReturn HandlePut(routing::SourceAddress from, + routing::Authority from_authority, + routing::Authority our_authority, + std::shared_ptr data); + + bool HandlePost(const routing::SerialisedMessage& message); + // not in local cache do upper layers have it (called when we are in target group) + template + boost::expected HandleGet(routing::Address) { + return boost::make_unexpected(MakeError(CommonErrors::no_such_element)); + } + // default put is allowed unless prevented by upper layers + bool HandlePut(routing::Address, routing::SerialisedMessage); + // if the implementation allows any put of data in unauthenticated mode + bool HandleUnauthenticatedPut(routing::Address, routing::SerialisedMessage); + void HandleChurn(routing::CloseGroupDifference diff); +}; + +} // namespace test + +} // namespace vault + +} // namespace maidsafe + +#endif // MAIDSAFE_ROUTING_TESTS_UTILS_FAKE_VAULT_FACADE_H_ diff --git a/src/maidsafe/routing/timer.h b/src/maidsafe/routing/timer.h new file mode 100644 index 00000000..b2e64811 --- /dev/null +++ b/src/maidsafe/routing/timer.h @@ -0,0 +1,75 @@ +/* Copyright 2014 MaidSafe.net limited + + This MaidSafe Software is licensed to you under (1) the MaidSafe.net Commercial License, + version 1.0 or later, or (2) The General Public License (GPL), version 3, depending on which + licence you accepted on initial access to the Software (the "Licences"). + + By contributing code to the MaidSafe Software, or to this project generally, you agree to be + bound by the terms of the MaidSafe Contributor Agreement, version 1.0, found in the root + directory of this project at LICENSE, COPYING and CONTRIBUTOR respectively and also + available at: http://www.maidsafe.net/licenses + + Unless required by applicable law or agreed to in writing, the MaidSafe Software distributed + under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. + + See the Licences for the specific language governing permissions and limitations relating to + use of the MaidSafe Software. */ + +#ifndef MAIDSAFE_ROUTING_TIMER_H_ +#define MAIDSAFE_ROUTING_TIMER_H_ + +#include +#include + +#include "asio/steady_timer.hpp" + +namespace maidsafe { + +namespace routing { + +class Timer { + public: + explicit Timer(asio::io_service& ios); + ~Timer(); + + template + void async_wait(asio::steady_timer::duration duration, Handler&& handler); + + void cancel(); + + private: + enum State { idle, running, canceled }; + asio::steady_timer timer_; + std::shared_ptr state_; +}; + +inline Timer::Timer(asio::io_service& ios) : timer_(ios), state_(std::make_shared(idle)) {} + +inline Timer::~Timer() { cancel(); } + +template +void Timer::async_wait(asio::steady_timer::duration duration, Handler&& handler) { + auto state = state_; + assert(*state == idle && "Current implementation allows only one async_wait invocation"); + *state = running; + timer_.expires_from_now(duration); + // FIXME(Team): forward the handler if we're using c++14 + timer_.async_wait([state, handler](const asio::error_code&) { + if (*state != running) { + return; + } + handler(); + }); +} + +inline void Timer::cancel() { + *state_ = canceled; + timer_.cancel(); +} + +} // namespace routing + +} // namespace maidsafe + +#endif // MAIDSAFE_ROUTING_TIMER_H_