1 | 1 | """ |
2 | | -Implementation of gradient descent algorithm for minimizing cost of a linear hypothesis |
3 | | -function. |
| 2 | +Gradient descent helpers for a simple linear hypothesis function. |
| 3 | +
|
| 4 | +Time complexity: O(iterations * n_samples * n_features) |
| 5 | +Space complexity: O(n_features) |
4 | 6 | """ |
5 | 7 |
|
| 8 | +from __future__ import annotations |
| 9 | + |
| 10 | +from collections.abc import Sequence |
| 11 | + |
6 | 12 | import numpy as np |
7 | 13 |
|
8 | | -# List of input, output pairs |
9 | | -train_data = ( |
10 | | - ((5, 2, 3), 15), |
11 | | - ((6, 5, 9), 25), |
12 | | - ((11, 12, 13), 41), |
13 | | - ((1, 1, 1), 8), |
14 | | - ((11, 12, 13), 41), |
| 14 | +# List of input, output pairs (bias term handled separately) |
| 15 | +train_data: tuple[tuple[tuple[float, ...], float], ...] = ( |
| 16 | + ((5.0, 2.0, 3.0), 15.0), |
| 17 | + ((6.0, 5.0, 9.0), 25.0), |
| 18 | + ((11.0, 12.0, 13.0), 41.0), |
| 19 | + ((1.0, 1.0, 1.0), 8.0), |
| 20 | + ((11.0, 12.0, 13.0), 41.0), |
| 21 | +) |
| 22 | +test_data: tuple[tuple[tuple[float, ...], float], ...] = ( |
| 23 | + ((515.0, 22.0, 13.0), 555.0), |
| 24 | + ((61.0, 35.0, 49.0), 150.0), |
15 | 25 | ) |
16 | | -test_data = (((515, 22, 13), 555), ((61, 35, 49), 150)) |
17 | | -parameter_vector = [2, 4, 1, 5] |
18 | | -m = len(train_data) |
| 26 | +parameter_vector: list[float] = [2.0, 4.0, 1.0, 5.0] |
19 | 27 | LEARNING_RATE = 0.009 |
20 | 28 |
|
21 | 29 |
|
22 | | -def _error(example_no, data_set="train"): |
23 | | - """ |
24 | | - :param data_set: train data or test data |
25 | | - :param example_no: example number whose error has to be checked |
26 | | - :return: error in example pointed by example number. |
27 | | - """ |
28 | | - return calculate_hypothesis_value(example_no, data_set) - output( |
29 | | - example_no, data_set |
30 | | - ) |
| 30 | +def _get_dataset(data_set: str) -> tuple[tuple[tuple[float, ...], float], ...]: |
+    """
+    Return the requested dataset or raise for unknown keys.
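+
+    Illustrative doctest, assuming only the module-level datasets defined above:
+
+    >>> _get_dataset("train") is train_data
+    True
+    >>> _get_dataset("unknown")
+    Traceback (most recent call last):
+        ...
+    ValueError: data_set must be 'train' or 'test'
+    """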
+    if data_set == "train":
+        return train_data
+    if data_set == "test":
+        return test_data
+    msg = "data_set must be 'train' or 'test'"
+    raise ValueError(msg)


-def _hypothesis_value(data_input_tuple):
+def predict_from_parameters(
+    parameters: Sequence[float], features: Sequence[float]
+) -> float:
    """
-    Calculates hypothesis function value for a given input
-    :param data_input_tuple: Input tuple of a particular example
-    :return: Value of hypothesis function at that point.
-    Note that there is an 'biased input' whose value is fixed as 1.
-    It is not explicitly mentioned in input data.. But, ML hypothesis functions use it.
-    So, we have to take care of it separately. Line 36 takes care of it.
+    Evaluate the linear hypothesis, treating the first coefficient as the bias term.
+
+    >>> predict_from_parameters([1.0, 2.0, -1.0], (3.0, 0.5))
+    6.5
    """
-    hyp_val = 0
-    for i in range(len(parameter_vector) - 1):
-        hyp_val += data_input_tuple[i] * parameter_vector[i + 1]
-    hyp_val += parameter_vector[0]
-    return hyp_val
+    if len(parameters) != len(features) + 1:
+        raise ValueError("parameters must include a bias term and match feature count")
+    return float(parameters[0] + np.dot(parameters[1:], features))


-def output(example_no, data_set):
+def output(example_no: int, data_set: str = "train") -> float:
    """
-    :param data_set: test data or train data
-    :param example_no: example whose output is to be fetched
-    :return: output for that example
+    Retrieve the label for an example from the requested dataset.
+
+    >>> output(0, data_set="train")
+    15.0
    """
-    if data_set == "train":
-        return train_data[example_no][1]
-    elif data_set == "test":
-        return test_data[example_no][1]
-    return None
+    dataset = _get_dataset(data_set)
+    return dataset[example_no][1]


-def calculate_hypothesis_value(example_no, data_set):
+def calculate_hypothesis_value(
+    example_no: int,
+    data_set: str = "train",
+    parameters: Sequence[float] | None = None,
+) -> float:
    """
-    Calculates hypothesis value for a given example
-    :param data_set: test data or train_data
-    :param example_no: example whose hypothesis value is to be calculated
-    :return: hypothesis value for that example
+    Calculate the hypothesis value for a specific example.
+
+    >>> calculate_hypothesis_value(0, parameters=[2.0, 1.0, 0.0, 0.0])
+    7.0
    """
-    if data_set == "train":
-        return _hypothesis_value(train_data[example_no][0])
-    elif data_set == "test":
-        return _hypothesis_value(test_data[example_no][0])
-    return None
+    dataset = _get_dataset(data_set)
+    params = parameter_vector if parameters is None else parameters
+    return predict_from_parameters(params, dataset[example_no][0])


-def summation_of_cost_derivative(index, end=m):
+def _error(
+    example_no: int, data_set: str = "train", parameters: Sequence[float] | None = None
+) -> float:
+    """
+    Compute the prediction error for one example.
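+
+    Illustrative doctest, assuming the module-level ``train_data`` above; with
+    all-zero parameters the prediction is 0.0, so the error is the negated label:
+
+    >>> _error(0, parameters=[0.0, 0.0, 0.0, 0.0])
+    -15.0
+    """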
+    return calculate_hypothesis_value(example_no, data_set, parameters) - output(
+        example_no, data_set
+    )
+
+
+def summation_of_cost_derivative(
+    index: int,
+    end: int | None = None,
+    parameters: Sequence[float] | None = None,
+    data_set: str = "train",
+    dataset: Sequence[tuple[Sequence[float], float]] | None = None,
+) -> float:
    """
-    Calculates the sum of cost function derivative
-    :param index: index wrt derivative is being calculated
-    :param end: value where summation ends, default is m, number of examples
-    :return: Returns the summation of cost derivative
-    Note: If index is -1, this means we are calculating summation wrt to biased
-    parameter.
+    Calculate the summed derivative of the cost function for a parameter index.
+
+    ``index=-1`` represents the bias term.
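+
+    Illustrative doctest, assuming the module-level ``train_data`` above; with
+    all-zero parameters every error is the negated label, so the bias-term sum
+    is -(15 + 25 + 41 + 8 + 41):
+
+    >>> summation_of_cost_derivative(-1, parameters=[0.0, 0.0, 0.0, 0.0])
+    -130.0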
    """
-    summation_value = 0
-    for i in range(end):
+    working_dataset = _get_dataset(data_set) if dataset is None else dataset
+    params = parameter_vector if parameters is None else parameters
+    limit = len(working_dataset) if end is None else end
+
+    summation_value = 0.0
+    for i in range(limit):
+        features, label = working_dataset[i]
+        error = predict_from_parameters(params, features) - label
        if index == -1:
-            summation_value += _error(i)
+            summation_value += error
        else:
-            summation_value += _error(i) * train_data[i][0][index]
+            summation_value += error * features[index]
    return summation_value


-def get_cost_derivative(index):
+def get_cost_derivative(
+    index: int,
+    data_set: str = "train",
+    parameters: Sequence[float] | None = None,
+    dataset: Sequence[tuple[Sequence[float], float]] | None = None,
+) -> float:
    """
-    :param index: index of the parameter vector wrt to derivative is to be calculated
-    :return: derivative wrt to that index
-    Note: If index is -1, this means we are calculating summation wrt to biased
-    parameter.
+    Return the average cost derivative for one parameter.
+
+    ``index=-1`` represents the bias term.
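+
+    Illustrative doctest, assuming the module-level ``train_data`` above (the
+    summed bias-term error of -130.0 averaged over the 5 training examples):
+
+    >>> get_cost_derivative(-1, parameters=[0.0, 0.0, 0.0, 0.0])
+    -26.0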
    """
-    cost_derivative_value = summation_of_cost_derivative(index, m) / m
-    return cost_derivative_value
+    working_dataset = _get_dataset(data_set) if dataset is None else dataset
+    return summation_of_cost_derivative(
+        index, len(working_dataset), parameters, data_set, working_dataset
+    ) / len(working_dataset)


-def run_gradient_descent():
+def batch_gradient_descent_step(
+    parameters: Sequence[float],
+    learning_rate: float,
+    data: Sequence[tuple[Sequence[float], float]] | None = None,
+) -> list[float]:
+    """
+    Perform one batch gradient descent step.
+
+    >>> dataset = (((1.0, 0.0, 0.0), 1.0), ((0.0, 1.0, 0.0), 1.0))
+    >>> batch_gradient_descent_step([0.0, 0.0, 0.0, 0.0], 0.1, dataset)
+    [0.1, 0.05, 0.05, 0.0]
+    """
+    dataset = train_data if data is None else data
+    updated_parameters: list[float] = []
+    for i, parameter in enumerate(parameters):
+        cost_derivative = get_cost_derivative(
+            i - 1, data_set="train", parameters=parameters, dataset=dataset
+        )
+        updated_parameters.append(parameter - learning_rate * cost_derivative)
+    return updated_parameters
+
+
+def run_gradient_descent(
+    learning_rate: float = LEARNING_RATE,
+    max_iterations: int = 10_000,
+    atol: float = 2e-6,
+    rtol: float = 0.0,
+) -> tuple[list[float], int]:
+    """
+    Repeatedly apply gradient descent until the parameter vector stabilizes.
+
+    >>> params, iterations = run_gradient_descent(max_iterations=5)
+    >>> len(params)
+    4
+    >>> iterations >= 1
+    True
+    """
    global parameter_vector
-    # Tune these values to set a tolerance value for predicted output
-    absolute_error_limit = 0.000002
-    relative_error_limit = 0
-    j = 0
-    while True:
-        j += 1
-        temp_parameter_vector = [0, 0, 0, 0]
-        for i in range(len(parameter_vector)):
-            cost_derivative = get_cost_derivative(i - 1)
-            temp_parameter_vector[i] = (
-                parameter_vector[i] - LEARNING_RATE * cost_derivative
-            )
-        if np.allclose(
-            parameter_vector,
-            temp_parameter_vector,
-            atol=absolute_error_limit,
-            rtol=relative_error_limit,
-        ):
+    iterations = 0
+    current_parameters = parameter_vector[:]
+    for iteration in range(1, max_iterations + 1):
+        iterations = iteration
+        next_parameters = batch_gradient_descent_step(current_parameters, learning_rate)
+        if np.allclose(current_parameters, next_parameters, atol=atol, rtol=rtol):
+            current_parameters = next_parameters
            break
-        parameter_vector = temp_parameter_vector
-    print(("Number of iterations:", j))
+        current_parameters = next_parameters
+
+    parameter_vector = current_parameters
+    return current_parameters, iterations


-def test_gradient_descent():
+def test_gradient_descent() -> None:
+    """Run a quick prediction check against the test dataset."""
+    params, iterations = run_gradient_descent()
+    print(f"Converged in {iterations} iterations -> {params}")
    for i in range(len(test_data)):
        print(("Actual output value:", output(i, "test")))
        print(("Hypothesis output:", calculate_hypothesis_value(i, "test")))


if __name__ == "__main__":
-    run_gradient_descent()
    print("\nTesting gradient descent for a linear hypothesis function.\n")
    test_gradient_descent()