-
Notifications
You must be signed in to change notification settings - Fork 53
Expand file tree
/
Copy pathtest_optimization_api.py
More file actions
136 lines (116 loc) · 5.04 KB
/
test_optimization_api.py
File metadata and controls
136 lines (116 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2.0 License.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019-Present Datadog, Inc.
from __future__ import annotations
import collections
from typing import Any, Dict, Union
from datadog_api_client.api_client import ApiClient, Endpoint as _Endpoint
from datadog_api_client.configuration import Configuration
from datadog_api_client.model_utils import (
set_attribute_from_path,
get_attribute_from_path,
UnsetType,
unset,
)
from datadog_api_client.v2.model.update_flaky_tests_response import UpdateFlakyTestsResponse
from datadog_api_client.v2.model.update_flaky_tests_request import UpdateFlakyTestsRequest
from datadog_api_client.v2.model.flaky_tests_search_response import FlakyTestsSearchResponse
from datadog_api_client.v2.model.flaky_tests_search_request import FlakyTestsSearchRequest
from datadog_api_client.v2.model.flaky_test import FlakyTest
class TestOptimizationApi:
    """
    Search and manage flaky tests through Test Optimization.

    See the `Test Optimization page <https://docs.datadoghq.com/tests/>`_ for more information.
    """

    def __init__(self, api_client=None):
        """Bind an API client and declare the endpoints exposed by this API.

        :param api_client: Client handed to every endpoint. When ``None``, an
            ``ApiClient`` built from a default ``Configuration`` is created.
        """
        client = ApiClient(Configuration()) if api_client is None else api_client
        self.api_client = client

        # POST endpoint: the request body is optional (no "required" flag).
        search_settings = {
            "response_type": (FlakyTestsSearchResponse,),
            "auth": ["apiKeyAuth", "appKeyAuth", "AuthZ"],
            "endpoint_path": "/api/v2/test/flaky-test-management/tests",
            "operation_id": "search_flaky_tests",
            "http_method": "POST",
            "version": "v2",
        }
        search_params = {
            "body": {
                "openapi_types": (FlakyTestsSearchRequest,),
                "location": "body",
            },
        }
        self._search_flaky_tests_endpoint = _Endpoint(
            settings=search_settings,
            params_map=search_params,
            headers_map={"accept": ["application/json"], "content_type": ["application/json"]},
            api_client=client,
        )

        # PATCH endpoint on the same path: the request body is mandatory.
        update_settings = {
            "response_type": (UpdateFlakyTestsResponse,),
            "auth": ["apiKeyAuth", "appKeyAuth", "AuthZ"],
            "endpoint_path": "/api/v2/test/flaky-test-management/tests",
            "operation_id": "update_flaky_tests",
            "http_method": "PATCH",
            "version": "v2",
        }
        update_params = {
            "body": {
                "required": True,
                "openapi_types": (UpdateFlakyTestsRequest,),
                "location": "body",
            },
        }
        self._update_flaky_tests_endpoint = _Endpoint(
            settings=update_settings,
            params_map=update_params,
            headers_map={"accept": ["application/json"], "content_type": ["application/json"]},
            api_client=client,
        )

    def search_flaky_tests(
        self,
        *,
        body: Union[FlakyTestsSearchRequest, UnsetType] = unset,
    ) -> FlakyTestsSearchResponse:
        """Search flaky tests.

        List endpoint returning flaky tests from Flaky Test Management. Results are paginated.

        :type body: FlakyTestsSearchRequest, optional
        :rtype: FlakyTestsSearchResponse
        """
        # Forward ``body`` only when the caller actually supplied one.
        call_args: Dict[str, Any] = {} if body is unset else {"body": body}
        return self._search_flaky_tests_endpoint.call_with_http_info(**call_args)

    def search_flaky_tests_with_pagination(
        self,
        *,
        body: Union[FlakyTestsSearchRequest, UnsetType] = unset,
    ) -> collections.abc.Iterable[FlakyTest]:
        """Search flaky tests.

        Provide a paginated version of :meth:`search_flaky_tests`, returning all items.

        :type body: FlakyTestsSearchRequest, optional

        :return: A generator of paginated results.
        :rtype: collections.abc.Iterable[FlakyTest]
        """
        request_kwargs: Dict[str, Any] = {} if body is unset else {"body": body}
        search_endpoint = self._search_flaky_tests_endpoint

        # Read the page limit from the request (10 is the fallback passed to the
        # lookup helper) and write the resolved value back into the request.
        limit_path = "body.data.attributes.page.limit"
        page_size = get_attribute_from_path(request_kwargs, limit_path, 10)
        set_attribute_from_path(request_kwargs, limit_path, page_size, search_endpoint.params_map)

        # Descriptor consumed by the endpoint's paginated caller; the meaning of
        # each key is defined by ``call_with_http_info_paginated`` (not shown here).
        return search_endpoint.call_with_http_info_paginated(
            {
                "limit_value": page_size,
                "results_path": "data",
                "cursor_param": "body.data.attributes.page.cursor",
                "cursor_path": "meta.pagination.next_page",
                "endpoint": search_endpoint,
                "kwargs": request_kwargs,
            }
        )

    def update_flaky_tests(
        self,
        body: UpdateFlakyTestsRequest,
    ) -> UpdateFlakyTestsResponse:
        """Update flaky test states.

        Update the state of multiple flaky tests in Flaky Test Management.

        :type body: UpdateFlakyTestsRequest
        :rtype: UpdateFlakyTestsResponse
        """
        # ``body`` is mandatory for this operation, so it is always forwarded.
        return self._update_flaky_tests_endpoint.call_with_http_info(body=body)