|
17 | 17 |
|
18 | 18 | import java.io.IOException; |
19 | 19 | import java.io.StringReader; |
20 | | -import java.sql.CallableStatement; |
21 | 20 | import java.sql.Clob; |
22 | 21 | import java.sql.Connection; |
23 | 22 | import java.sql.ResultSet; |
24 | | -import java.sql.SQLException; |
25 | 23 | import java.util.List; |
26 | 24 | import java.util.logging.Logger; |
27 | 25 |
|
@@ -151,35 +149,37 @@ public void produceReportWithCoverage(final String realtimeReporterId, final Str |
151 | 149 | } |
152 | 150 |
|
153 | 151 | public void consumeReport(final String reporterId, final RealtimeReporterEventConsumer consumer) { |
| 152 | + consumeReport(reporterId, consumer, 60); |
| 153 | + } |
| 154 | + |
| 155 | + public void consumeReport(final String reporterId, final RealtimeReporterEventConsumer consumer, final int timeoutSeconds) { |
154 | 156 | StringBuilder sb = new StringBuilder(); |
155 | 157 | sb.append("DECLARE\n"); |
156 | 158 | sb.append(" l_reporter ut_realtime_reporter := ut_realtime_reporter();\n"); |
157 | 159 | sb.append("BEGIN\n"); |
158 | 160 | sb.append(" l_reporter.set_reporter_id(?);\n"); |
159 | | - sb.append(" ? := l_reporter.get_lines_cursor();\n"); |
| 161 | + sb.append(" ? := l_reporter.get_lines_cursor(a_initial_timeout => ?);\n"); |
160 | 162 | sb.append("END;"); |
161 | 163 | final String plsql = sb.toString(); |
162 | 164 | jdbcTemplate.setFetchSize(1); |
163 | 165 | try { |
164 | | - jdbcTemplate.execute(plsql, new CallableStatementCallback<Void>() { |
165 | | - @Override |
166 | | - public Void doInCallableStatement(final CallableStatement cs) throws SQLException { |
167 | | - cs.setString(1, reporterId); |
168 | | - cs.registerOutParameter(2, OracleTypes.CURSOR); |
169 | | - cs.execute(); |
170 | | - final ResultSet rs = (ResultSet) cs.getObject(2); |
171 | | - while (rs.next()) { |
172 | | - final String itemType = rs.getString("item_type"); |
173 | | - final Clob textClob = rs.getClob("text"); |
174 | | - final String textString = textClob.getSubString(1, ((int) textClob.length())); |
175 | | - final RealtimeReporterEvent event = convert(itemType, textString); |
176 | | - if (event != null) { |
177 | | - consumer.process(event); |
178 | | - } |
| 166 | + jdbcTemplate.execute(plsql, (CallableStatementCallback<Void>) cs -> { |
| 167 | + cs.setString(1, reporterId); |
| 168 | + cs.setInt(3, timeoutSeconds); |
| 169 | + cs.registerOutParameter(2, OracleTypes.CURSOR); |
| 170 | + cs.execute(); |
| 171 | + final ResultSet rs = (ResultSet) cs.getObject(2); |
| 172 | + while (rs.next()) { |
| 173 | + final String itemType = rs.getString("item_type"); |
| 174 | + final Clob textClob = rs.getClob("text"); |
| 175 | + final String textString = textClob.getSubString(1, ((int) textClob.length())); |
| 176 | + final RealtimeReporterEvent event = convert(itemType, textString); |
| 177 | + if (event != null) { |
| 178 | + consumer.process(event); |
179 | 179 | } |
180 | | - rs.close(); |
181 | | - return null; |
182 | 180 | } |
| 181 | + rs.close(); |
| 182 | + return null; |
183 | 183 | }); |
184 | 184 | } finally { |
185 | 185 | jdbcTemplate.setFetchSize(UtplsqlDao.FETCH_ROWS); |
|
0 commit comments