|
24 | 24 | from json.decoder import JSONDecodeError |
25 | 25 | from io import BytesIO |
26 | 26 |
|
| 27 | +from ibm_cloud_sdk_core import BaseService |
27 | 28 | from ibm_cloud_sdk_core.authenticators import Authenticator |
28 | 29 | from requests import Response, Session |
29 | 30 | from requests.cookies import RequestsCookieJar |
30 | 31 |
|
31 | 32 | from .common import get_sdk_headers |
32 | | -from .cloudant_v1 import CloudantV1 |
33 | 33 | from .couchdb_session_authenticator import CouchDbSessionAuthenticator |
34 | 34 |
|
35 | 35 | # pylint: disable=missing-docstring |
@@ -72,52 +72,99 @@ def __hash__(self): |
72 | 72 | # Since Py3.6 dict is ordered so use a key only dict for our set |
73 | 73 | rules_by_operation.setdefault(operation_id, dict()).setdefault(rule) |
74 | 74 |
|
75 | | -_old_init = CloudantV1.__init__ |
76 | | - |
77 | | -def new_init(self, authenticator: Authenticator = None): |
78 | | - _old_init(self, authenticator) |
79 | | - # Overwrite default read timeout to 2.5 minutes |
80 | | - if not ('timeout' in self.http_config): |
81 | | - new_http_config = self.http_config.copy() |
82 | | - new_http_config['timeout'] = (CONNECT_TIMEOUT, READ_TIMEOUT) |
83 | | - self.set_http_config(new_http_config) |
84 | | - # Custom actions for CouchDbSessionAuthenticator |
85 | | - if isinstance(authenticator, CouchDbSessionAuthenticator): |
86 | | - # Replacing BaseService's http.cookiejar.CookieJar as RequestsCookieJar supports update(CookieJar) |
87 | | - self.jar = RequestsCookieJar(self.jar) |
88 | | - self.authenticator.set_jar(self.jar) # Authenticators don't have access to cookie jars by default |
89 | | - add_hooks(self) |
90 | | - |
91 | | -_old_set_service_url = CloudantV1.set_service_url |
92 | | - |
93 | | -def new_set_service_url(self, service_url: str): |
94 | | - _old_set_service_url(self, service_url) |
95 | | - try: |
| 75 | +class CloudantBaseService(BaseService): |
| 76 | + """ |
| 77 | + The base class for service classes. |
| 78 | + """ |
| 79 | + def __init__( |
| 80 | + self, |
| 81 | + service_url: str = None, |
| 82 | + authenticator: Authenticator = None, |
| 83 | + ) -> None: |
| 84 | + """ |
| 85 | + Construct a new client for the Cloudant service. |
| 86 | +
|
| 87 | + :param Authenticator authenticator: The authenticator specifies the authentication mechanism. |
| 88 | + Get up to date information from https://github.com/IBM/python-sdk-core/blob/main/README.md |
| 89 | + about initializing the authenticator of your choice. |
| 90 | + """ |
| 91 | + BaseService.__init__(self, service_url=service_url, authenticator=authenticator) |
| 92 | + # Overwrite default read timeout to 2.5 minutes |
| 93 | + if not ('timeout' in self.http_config): |
| 94 | + new_http_config = self.http_config.copy() |
| 95 | + new_http_config['timeout'] = (CONNECT_TIMEOUT, READ_TIMEOUT) |
| 96 | + self.set_http_config(new_http_config) |
| 97 | + # Custom actions for CouchDbSessionAuthenticator |
| 98 | + if isinstance(authenticator, CouchDbSessionAuthenticator): |
| 99 | + # Replacing BaseService's http.cookiejar.CookieJar as RequestsCookieJar supports update(CookieJar) |
| 100 | + self.jar = RequestsCookieJar(self.jar) |
| 101 | + self.authenticator.set_jar(self.jar) # Authenticators don't have access to cookie jars by default |
| 102 | + add_hooks(self) |
| 103 | + |
| 104 | + def set_service_url(self, service_url: str): |
| 105 | + super().set_service_url(service_url) |
| 106 | + try: |
| 107 | + if isinstance(self.authenticator, CouchDbSessionAuthenticator): |
| 108 | + self.authenticator.token_manager.set_service_url(service_url) |
| 109 | + except AttributeError: |
| 110 | + pass # in case no authenticator is configured yet, pass |
| 111 | + |
| 112 | + def set_default_headers(self, headers: Dict[str, str]): |
| 113 | + super().set_default_headers(headers) |
96 | 114 | if isinstance(self.authenticator, CouchDbSessionAuthenticator): |
97 | | - self.authenticator.token_manager.set_service_url(service_url) |
98 | | - except AttributeError: |
99 | | - pass # in case no authenticator is configured yet, pass |
100 | | - |
101 | | -_old_set_default_headers = CloudantV1.set_default_headers |
102 | | - |
103 | | -def new_set_default_headers(self, headers: Dict[str, str]): |
104 | | - _old_set_default_headers(self, headers) |
105 | | - if isinstance(self.authenticator, CouchDbSessionAuthenticator): |
106 | | - combined_headers = {} |
107 | | - combined_headers.update(headers) |
108 | | - combined_headers.update(get_sdk_headers( |
109 | | - service_name=self.DEFAULT_SERVICE_NAME, |
110 | | - service_version='V1', |
111 | | - operation_id='authenticator_post_session') |
112 | | - ) |
113 | | - self.authenticator.token_manager.set_default_headers(combined_headers) |
114 | | - |
115 | | -_old_set_disable_ssl_verification = CloudantV1.set_disable_ssl_verification |
116 | | - |
117 | | -def new_set_disable_ssl_verification(self, status: bool = False) -> None: |
118 | | - _old_set_disable_ssl_verification(self, status) |
119 | | - if isinstance(self.authenticator, CouchDbSessionAuthenticator): |
120 | | - self.authenticator.token_manager.set_disable_ssl_verification(status) |
| 115 | + combined_headers = {} |
| 116 | + combined_headers.update(headers) |
| 117 | + combined_headers.update(get_sdk_headers( |
| 118 | + service_name=self.DEFAULT_SERVICE_NAME, |
| 119 | + service_version='V1', |
| 120 | + operation_id='authenticator_post_session') |
| 121 | + ) |
| 122 | + self.authenticator.token_manager.set_default_headers(combined_headers) |
| 123 | + |
| 124 | + def set_disable_ssl_verification(self, status: bool = False) -> None: |
| 125 | + super().set_disable_ssl_verification(status) |
| 126 | + if isinstance(self.authenticator, CouchDbSessionAuthenticator): |
| 127 | + self.authenticator.token_manager.set_disable_ssl_verification(status) |
| 128 | + |
| 129 | + def set_http_client(self, http_client: Session) -> None: |
| 130 | + super().set_http_client(http_client) |
| 131 | + add_hooks(self) |
| 132 | + |
| 133 | + def prepare_request(self, |
| 134 | + method: str, |
| 135 | + url: str, |
| 136 | + *args, |
| 137 | + headers: Optional[dict] = None, |
| 138 | + params: Optional[dict] = None, |
| 139 | + data: Optional[Union[str, dict]] = None, |
| 140 | + files: Optional[Union[Dict[str, Tuple[str]], |
| 141 | + List[Tuple[str, |
| 142 | + Tuple[str, |
| 143 | + ...]]]]] = None, |
| 144 | + **kwargs) -> dict: |
| 145 | + # Extract the operation ID from the request headers. |
| 146 | + operation_id = None |
| 147 | + header = headers.get('X-IBMCloud-SDK-Analytics') |
| 148 | + if header is not None: |
| 149 | + for element in header.split(';'): |
| 150 | + if element.startswith('operation_id'): |
| 151 | + operation_id = element.split('=')[1] |
| 152 | + break |
| 153 | + if operation_id is not None: |
| 154 | + # Check each validation rule that applies to the operation. |
| 155 | + # Until the request URL is passed to old_prepare_request it does not include the |
| 156 | + # service URL and is relative to it |
| 157 | + request_url_path_segments = urlsplit(url).path.strip('/').split('/') |
| 158 | + if len(request_url_path_segments) == 1 and request_url_path_segments[0] == '': |
| 159 | + request_url_path_segments = [] |
| 160 | + # Note the get returns a value-less dict, we are iterating only the keys |
| 161 | + for rule in rules_by_operation.get(operation_id, {}): |
| 162 | + if len(request_url_path_segments) > rule.path_segment_index: |
| 163 | + segment_to_validate = request_url_path_segments[rule.path_segment_index] |
| 164 | + if segment_to_validate.startswith('_'): |
| 165 | + raise ValueError('{0} {1} starts with the invalid _ character.'.format(rule.error_parameter_name, |
| 166 | + unquote(segment_to_validate))) |
| 167 | + return super().prepare_request(method, url, *args, headers=headers, params=params, data=data, files=files, **kwargs) |
121 | 168 |
|
122 | 169 | def _error_response_hook(response:Response, *args, **kwargs) -> Optional[Response]: |
123 | 170 | # pylint: disable=W0613 |
@@ -186,52 +233,7 @@ def _error_response_hook(response:Response, *args, **kwargs) -> Optional[Respons |
186 | 233 | # so the exception can surface elsewhere. |
187 | 234 | pass |
188 | 235 | return response |
189 | | - |
190 | | -_old_prepare_request = CloudantV1.prepare_request |
191 | | - |
192 | | -def new_prepare_request(self, |
193 | | - method: str, |
194 | | - url: str, |
195 | | - *args, |
196 | | - headers: Optional[dict] = None, |
197 | | - params: Optional[dict] = None, |
198 | | - data: Optional[Union[str, dict]] = None, |
199 | | - files: Optional[Union[Dict[str, Tuple[str]], |
200 | | - List[Tuple[str, |
201 | | - Tuple[str, |
202 | | - ...]]]]] = None, |
203 | | - **kwargs) -> dict: |
204 | | - # Extract the operation ID from the request headers. |
205 | | - operation_id = None |
206 | | - header = headers.get('X-IBMCloud-SDK-Analytics') |
207 | | - if header is not None: |
208 | | - for element in header.split(';'): |
209 | | - if element.startswith('operation_id'): |
210 | | - operation_id = element.split('=')[1] |
211 | | - break |
212 | | - if operation_id is not None: |
213 | | - # Check each validation rule that applies to the operation. |
214 | | - # Until the request URL is passed to old_prepare_request it does not include the |
215 | | - # service URL and is relative to it |
216 | | - request_url_path_segments = urlsplit(url).path.strip('/').split('/') |
217 | | - if len(request_url_path_segments) == 1 and request_url_path_segments[0] == '': |
218 | | - request_url_path_segments = [] |
219 | | - # Note the get returns a value-less dict, we are iterating only the keys |
220 | | - for rule in rules_by_operation.get(operation_id, {}): |
221 | | - if len(request_url_path_segments) > rule.path_segment_index: |
222 | | - segment_to_validate = request_url_path_segments[rule.path_segment_index] |
223 | | - if segment_to_validate.startswith('_'): |
224 | | - raise ValueError('{0} {1} starts with the invalid _ character.'.format(rule.error_parameter_name, |
225 | | - unquote(segment_to_validate))) |
226 | | - return _old_prepare_request(self, method, url, *args, headers=headers, params=params, data=data, files=files, **kwargs) |
227 | | - |
228 | 236 | def add_hooks(self): |
229 | 237 | response_hooks = self.get_http_client().hooks['response'] |
230 | 238 | if _error_response_hook not in response_hooks: |
231 | 239 | response_hooks.append(_error_response_hook) |
232 | | - |
233 | | -_old_set_http_client = CloudantV1.set_http_client |
234 | | - |
235 | | -def new_set_http_client(self, http_client: Session) -> None: |
236 | | - _old_set_http_client(self, http_client) |
237 | | - add_hooks(self) |
|
0 commit comments