-
Notifications
You must be signed in to change notification settings - Fork 49
Don't create Host instances with random host_id #623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
0173fc3
12fdb74
2784e3f
a92be80
eb1cfb2
c381c19
c306d61
2d17c1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1683,14 +1683,7 @@ def protocol_downgrade(self, host_endpoint, previous_version): | |
| "http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version", self.protocol_version, new_version, host_endpoint) | ||
| self.protocol_version = new_version | ||
|
|
||
| def _add_resolved_hosts(self): | ||
| for endpoint in self.endpoints_resolved: | ||
| host, new = self.add_host(endpoint, signal=False) | ||
| if new: | ||
| host.set_up() | ||
| for listener in self.listeners: | ||
| listener.on_add(host) | ||
|
|
||
| def _populate_hosts(self): | ||
| self.profile_manager.populate( | ||
| weakref.proxy(self), self.metadata.all_hosts()) | ||
| self.load_balancing_policy.populate( | ||
|
|
@@ -1717,17 +1710,10 @@ def connect(self, keyspace=None, wait_for_all_pools=False): | |
| self.contact_points, self.protocol_version) | ||
| self.connection_class.initialize_reactor() | ||
| _register_cluster_shutdown(self) | ||
|
|
||
| self._add_resolved_hosts() | ||
|
|
||
| try: | ||
| self.control_connection.connect() | ||
|
|
||
| # we set all contact points up for connecting, but we won't infer state after this | ||
| for endpoint in self.endpoints_resolved: | ||
| h = self.metadata.get_host(endpoint) | ||
| if h and self.profile_manager.distance(h) == HostDistance.IGNORED: | ||
| h.is_up = None | ||
| self._populate_hosts() | ||
|
|
||
| log.debug("Control connection created") | ||
| except Exception: | ||
|
|
@@ -3534,28 +3520,22 @@ def _set_new_connection(self, conn): | |
| if old: | ||
| log.debug("[control connection] Closing old connection %r, replacing with %r", old, conn) | ||
| old.close() | ||
| def _connect_host_in_lbp(self): | ||
|
|
||
| def _try_connect_to_hosts(self): | ||
| errors = {} | ||
| lbp = ( | ||
| self._cluster.load_balancing_policy | ||
| if self._cluster._config_mode == _ConfigMode.LEGACY else | ||
| self._cluster._default_load_balancing_policy | ||
| ) | ||
|
|
||
| for host in lbp.make_query_plan(): | ||
| lbp = self._cluster.load_balancing_policy \ | ||
| if self._cluster._config_mode == _ConfigMode.LEGACY else self._cluster._default_load_balancing_policy | ||
|
|
||
| for endpoint in chain((host.endpoint for host in lbp.make_query_plan()), self._cluster.endpoints_resolved): | ||
| try: | ||
| return (self._try_connect(host), None) | ||
| except ConnectionException as exc: | ||
| errors[str(host.endpoint)] = exc | ||
| log.warning("[control connection] Error connecting to %s:", host, exc_info=True) | ||
| self._cluster.signal_connection_failure(host, exc, is_host_addition=False) | ||
| return (self._try_connect(endpoint), None) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you remove the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's do that in a separate PR |
||
| except Exception as exc: | ||
| errors[str(host.endpoint)] = exc | ||
| log.warning("[control connection] Error connecting to %s:", host, exc_info=True) | ||
| errors[str(endpoint)] = exc | ||
| log.warning("[control connection] Error connecting to %s:", endpoint, exc_info=True) | ||
| if self._is_shutdown: | ||
| raise DriverException("[control connection] Reconnection in progress during shutdown") | ||
|
|
||
| return (None, errors) | ||
|
|
||
| def _reconnect_internal(self): | ||
|
|
@@ -3567,43 +3547,43 @@ def _reconnect_internal(self): | |
| to the exception that was raised when an attempt was made to open | ||
| a connection to that host. | ||
| """ | ||
| (conn, _) = self._connect_host_in_lbp() | ||
| (conn, _) = self._try_connect_to_hosts() | ||
| if conn is not None: | ||
| return conn | ||
|
|
||
| # Try to re-resolve hostnames as a fallback when all hosts are unreachable | ||
| self._cluster._resolve_hostnames() | ||
|
|
||
| self._cluster._add_resolved_hosts() | ||
| self._cluster._populate_hosts() | ||
|
|
||
| (conn, errors) = self._connect_host_in_lbp() | ||
| (conn, errors) = self._try_connect_to_hosts() | ||
| if conn is not None: | ||
| return conn | ||
|
|
||
| raise NoHostAvailable("Unable to connect to any servers", errors) | ||
|
|
||
| def _try_connect(self, host): | ||
| def _try_connect(self, endpoint): | ||
sylwiaszunejko marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """ | ||
| Creates a new Connection, registers for pushed events, and refreshes | ||
| node/token and schema metadata. | ||
| """ | ||
| log.debug("[control connection] Opening new connection to %s", host) | ||
| log.debug("[control connection] Opening new connection to %s", endpoint) | ||
|
|
||
| while True: | ||
| try: | ||
| connection = self._cluster.connection_factory(host.endpoint, is_control_connection=True) | ||
| connection = self._cluster.connection_factory(endpoint, is_control_connection=True) | ||
| if self._is_shutdown: | ||
| connection.close() | ||
| raise DriverException("Reconnecting during shutdown") | ||
| break | ||
| except ProtocolVersionUnsupported as e: | ||
| self._cluster.protocol_downgrade(host.endpoint, e.startup_version) | ||
| self._cluster.protocol_downgrade(endpoint, e.startup_version) | ||
| except ProtocolException as e: | ||
| # protocol v5 is out of beta in C* >=4.0-beta5 and is now the default driver | ||
| # protocol version. If the protocol version was not explicitly specified, | ||
| # and that the server raises a beta protocol error, we should downgrade. | ||
| if not self._cluster._protocol_version_explicit and e.is_beta_protocol_error: | ||
| self._cluster.protocol_downgrade(host.endpoint, self._cluster.protocol_version) | ||
| self._cluster.protocol_downgrade(endpoint, self._cluster.protocol_version) | ||
| else: | ||
| raise | ||
|
|
||
|
|
@@ -3821,7 +3801,10 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None, | |
| tokens = local_row.get("tokens") | ||
|
|
||
| host = self._cluster.metadata.get_host(connection.original_endpoint) | ||
| if host: | ||
| if not host: | ||
| log.info("[control connection] Local host %s not found in metadata, adding it", connection.original_endpoint) | ||
| peers_result.append(local_row) | ||
| else: | ||
| datacenter = local_row.get("data_center") | ||
| rack = local_row.get("rack") | ||
| self._update_location_info(host, datacenter, rack) | ||
|
|
@@ -4177,8 +4160,9 @@ def _get_peers_query(self, peers_query_type, connection=None): | |
| query_template = (self._SELECT_SCHEMA_PEERS_TEMPLATE | ||
| if peers_query_type == self.PeersQueryType.PEERS_SCHEMA | ||
| else self._SELECT_PEERS_NO_TOKENS_TEMPLATE) | ||
| host_release_version = self._cluster.metadata.get_host(connection.original_endpoint).release_version | ||
| host_dse_version = self._cluster.metadata.get_host(connection.original_endpoint).dse_version | ||
| original_endpoint_host = self._cluster.metadata.get_host(connection.original_endpoint) | ||
| host_release_version = None if original_endpoint_host is None else original_endpoint_host.release_version | ||
| host_dse_version = None if original_endpoint_host is None else original_endpoint_host.dse_version | ||
| uses_native_address_query = ( | ||
| host_dse_version and Version(host_dse_version) >= self._MINIMUM_NATIVE_ADDRESS_DSE_VERSION) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -139,6 +139,10 @@ def export_schema_as_string(self): | |
| def refresh(self, connection, timeout, target_type=None, change_type=None, fetch_size=None, | ||
| metadata_request_timeout=None, **kwargs): | ||
|
|
||
| # If the host is not in metadata, we can't proceed, hosts should be added after succesfully establishing control connection | ||
| if not self.get_host(connection.original_endpoint): | ||
| return | ||
|
|
||
|
Comment on lines
+143
to
+145
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a comment explaining what is going on here.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is wrong fix, we need to address it in a separate PR, correct fix would be to pull version from the system.local when this information is absent.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added a comment, let me know if this can stay for now, or should I change it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Lorak-mmk , this issue is triggered only on first metadata loading, at the time there is no known hosts, so we can't pull version info from them, but we need to pull metadata, so the best case would be to pull versions from the |
||
| server_version = self.get_host(connection.original_endpoint).release_version | ||
| dse_version = self.get_host(connection.original_endpoint).dse_version | ||
| parser = get_schema_parser(connection, server_version, dse_version, timeout, metadata_request_timeout, fetch_size) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -176,7 +176,7 @@ def __init__(self, endpoint, conviction_policy_factory, datacenter=None, rack=No | |
| self.endpoint = endpoint if isinstance(endpoint, EndPoint) else DefaultEndPoint(endpoint) | ||
| self.conviction_policy = conviction_policy_factory(self) | ||
| if not host_id: | ||
| host_id = uuid.uuid4() | ||
| raise ValueError("host_id may not be None") | ||
| self.host_id = host_id | ||
|
Comment on lines
177
to
180
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Commit: "Don't create Host instances with random host_id" The change here is the one that the commit message explains. Perhaps the When writing commits, please assume that a reader won't be as familiar with the relevant code as you are. It is almost always true - even if reviewer is an active maintainer, there is high chance they did not work with this specific area recently. |
||
| self.set_location_info(datacenter, rack) | ||
| self.lock = RLock() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.