Push taskId filter down to storage layer in AsyncProfilerTaskLog query #13787
@mrproliu Could you check this? Why did it only filter in memory?
Pull request overview
This PR updates the async-profiler task log query flow so taskId filtering happens in the storage layer (JDBC / Elasticsearch / BanyanDB) rather than fetching all logs and filtering in memory within AsyncProfilerQueryService.
Changes:
- Add taskId parameter to IAsyncProfilerTaskLogQueryDAO.getTaskLogList(...).
- Apply taskId filtering in JDBC (WHERE task_id = ?), Elasticsearch (term query), and BanyanDB (eq(task_id, ...)).
- Simplify AsyncProfilerQueryService by removing in-memory filtering and passing taskId directly to the DAO.
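
The difference between the two call shapes can be pictured with a small in-memory sketch. FakeTaskLogStore, LogRow, and the rowsReturned counter are hypothetical stand-ins rather than SkyWalking classes; they only illustrate that a filtered DAO call hands back the matching rows instead of the whole table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Hypothetical in-memory store standing in for JDBC / Elasticsearch / BanyanDB. */
final class FakeTaskLogStore {
    /** Minimal log row: only the fields needed for the illustration. */
    static final class LogRow {
        final String taskId;
        final String instanceId;

        LogRow(String taskId, String instanceId) {
            this.taskId = taskId;
            this.instanceId = instanceId;
        }
    }

    private final List<LogRow> rows = new ArrayList<>();

    /** Number of rows handed to the caller, a rough proxy for transfer cost. */
    int rowsReturned = 0;

    void add(String taskId, String instanceId) {
        rows.add(new LogRow(taskId, instanceId));
    }

    /** Old shape: no parameter, so every row leaves the store. */
    List<LogRow> getTaskLogList() {
        rowsReturned += rows.size();
        return new ArrayList<>(rows);
    }

    /** New shape: the store applies the taskId predicate itself. */
    List<LogRow> getTaskLogList(String taskId) {
        List<LogRow> matched = rows.stream()
            .filter(r -> Objects.equals(r.taskId, taskId))
            .collect(Collectors.toList());
        rowsReturned += matched.size();
        return matched;
    }
}
```

With three stored rows of which two belong to one task, the parameterless call returns all three to the caller, while the filtered call returns only the two that match.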
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCAsyncProfilerTaskLogQueryDAO.java | Adds task_id predicate to SQL generation for task log queries. |
| oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AsyncProfilerTaskLogQueryEsDAO.java | Adds task_id term filter to the ES query. |
| oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAsyncProfilerTaskLogQueryDAO.java | Adds task_id equality condition to the BanyanDB stream query. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java | Updates DAO API to require taskId when listing task logs. |
| oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/asyncprofiler/AsyncProfilerQueryService.java | Passes taskId to DAO and removes in-memory filtering logic. |
| docs/en/changes/changes.md | Documents the behavioral change in the changelog. |
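
As a rough illustration of the JDBC row above, the predicate can be added as a placeholder with the value carried in a separate parameter list. This is a minimal sketch, not the code in JDBCAsyncProfilerTaskLogQueryDAO: the class name TaskLogSqlSketch and the table name passed in are assumptions, and only the WHERE task_id = ? shape comes from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for a SQL string plus its bind parameters. */
final class TaskLogSqlSketch {
    final String sql;
    final List<Object> parameters;

    private TaskLogSqlSketch(String sql, List<Object> parameters) {
        this.sql = sql;
        this.parameters = parameters;
    }

    /**
     * Builds a query that filters by task_id in the database.
     * The table name is supplied by the caller and is an assumption here.
     */
    static TaskLogSqlSketch build(String tableName, String taskId) {
        List<Object> parameters = new ArrayList<>();
        StringBuilder sql = new StringBuilder("select * from ").append(tableName);
        // The value is bound through a placeholder, never concatenated into the SQL.
        sql.append(" where task_id = ?");
        parameters.add(taskId);
        return new TaskLogSqlSketch(sql.toString(), parameters);
    }
}
```

Keeping the value out of the SQL text lets the driver bind it, which is the usual way to avoid injection and quoting problems.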
      public List<AsyncProfilerTaskLog> queryAsyncProfilerTaskLogs(String taskId) throws IOException {
    -     List<AsyncProfilerTaskLog> taskLogList = getTaskLogQueryDAO().getTaskLogList();
    -     return findMatchedLogs(taskId, taskLogList);
    - }
    -
    - private List<AsyncProfilerTaskLog> findMatchedLogs(final String taskID, final List<AsyncProfilerTaskLog> allLogs) {
    -     return allLogs.stream()
    -                   .filter(l -> Objects.equals(l.getId(), taskID))
    +     return getTaskLogQueryDAO().getTaskLogList(taskId).stream()
                        .map(this::extendTaskLog)
                        .collect(Collectors.toList());
getTaskLogList(taskId) is now called directly and the ES implementation uses Query.term(..., taskId) which throws a NullPointerException when taskId is null. To keep behavior consistent across storage implementations (JDBC would just bind null) and avoid unexpected NPEs, add a fast-fail/guard here (e.g., validate taskId is non-null/non-blank and return empty list or throw an IllegalArgumentException with a clear message) before invoking the DAO.
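
One way to read this suggestion is a guard in the service that short-circuits before the DAO is reached. The sketch below takes the "return empty list" option; TaskIdGuardSketch and its TaskLogDao interface are hypothetical stand-ins for AsyncProfilerQueryService and the real DAO, and log entries are reduced to strings for brevity.

```java
import java.util.Collections;
import java.util.List;

final class TaskIdGuardSketch {
    /** Hypothetical stand-in for the DAO; only the call shape matters here. */
    interface TaskLogDao {
        List<String> getTaskLogList(String taskId);
    }

    /** Returns an empty list for a null or blank taskId instead of calling the DAO. */
    static List<String> queryTaskLogs(String taskId, TaskLogDao dao) {
        if (taskId == null || taskId.trim().isEmpty()) {
            // Guard: no storage implementation ever sees a null or blank id.
            return Collections.emptyList();
        }
        return dao.getTaskLogList(taskId);
    }
}
```

Because the guard sits above the DAO, all three storage implementations behave the same way for a missing taskId.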
...kywalking/oap/server/core/storage/profiling/asyncprofiler/IAsyncProfilerTaskLogQueryDAO.java
Outdated
      @Override
    - public List<AsyncProfilerTaskLog> getTaskLogList() throws IOException {
    + public List<AsyncProfilerTaskLog> getTaskLogList(String taskId) throws IOException {
          final String index = IndexController.LogicIndicesRegister.getPhysicalTableName(
              AsyncProfilerTaskLogRecord.INDEX_NAME);
          final BoolQueryBuilder query = Query.bool();
          if (IndexController.LogicIndicesRegister.isMergedTable(AsyncProfilerTaskLogRecord.INDEX_NAME)) {
              query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME, AsyncProfilerTaskLogRecord.INDEX_NAME));
          }
    +     query.must(Query.term(AsyncProfilerTaskLogRecord.TASK_ID, taskId));
          final SearchBuilder search =
Query.term(..., taskId) will throw a NullPointerException with a generic message ("value") if taskId is null. Consider validating taskId at the start of this method (e.g., requireNonNull / checkArgument) and throwing an IllegalArgumentException with a clearer message, so failures are easier to diagnose and consistent with other storage implementations.
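
The alternative named here, failing fast with a descriptive exception, might look like the helper below. It is a minimal sketch in plain Java: TaskIdCheckSketch and requireTaskId are hypothetical names, and the message text is only an example of what a clearer diagnostic could say.

```java
final class TaskIdCheckSketch {
    /**
     * Validates the taskId before any query is built.
     * Throws IllegalArgumentException with an explicit message rather than
     * letting a later call fail with a generic NullPointerException.
     */
    static String requireTaskId(String taskId) {
        if (taskId == null || taskId.isEmpty()) {
            throw new IllegalArgumentException(
                "taskId must not be null or empty when querying async-profiler task logs");
        }
        return taskId;
    }
}
```

Calling such a helper as the first statement of getTaskLogList(taskId) would surface the problem at the DAO boundary, before the term query is constructed.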
Please resolve conflicts.
139d82e to 0e804ca
What this PR does
IAsyncProfilerTaskLogQueryDAO.getTaskLogList() had no parameters, causing all implementations (JDBC, Elasticsearch, BanyanDB) to fetch every task log from storage. The service layer then filtered by taskId in memory.
This PR pushes the filter down to the storage layer by adding taskId as a parameter to getTaskLogList() and updating all three implementations to apply DB-level filtering.
Changes
- IAsyncProfilerTaskLogQueryDAO: add taskId parameter to getTaskLogList()
- JDBCAsyncProfilerTaskLogQueryDAO: add WHERE task_id = ? to SQL
- AsyncProfilerTaskLogQueryEsDAO: add term query on task_id
- BanyanDBAsyncProfilerTaskLogQueryDAO: add eq(TASK_ID, taskId) condition
- AsyncProfilerQueryService: pass taskId directly to DAO, remove findMatchedLogs() in-memory filtering
Related
Related to #13593