Skip to content

perf(datasets): cache OSS bucket, add server-side pagination and --filter for tasks#1064

Open
xdlkc wants to merge 4 commits into
alibaba:masterfrom
xdlkc:perf/datasets-pagination-and-filter
Open

perf(datasets): cache OSS bucket, add server-side pagination and --filter for tasks#1064
xdlkc wants to merge 4 commits into
alibaba:masterfrom
xdlkc:perf/datasets-pagination-and-filter

Conversation

@xdlkc

@xdlkc xdlkc commented Jun 4, 2026

Copy link
Copy Markdown
Collaborator

fixes #1063

Summary

  • Cache oss2.Bucket instance — avoids creating 25+ HTTP sessions per list_all_datasets call (datasets list: 5.0s → 2.7s)
  • Server-side pagination — push offset/limit down to _extract_tasks_from_split with early termination and continuation_token caching (datasets tasks --limit 10: 19.4s → 1.5s)
  • --filter flag — prefix-based task search pushed down to OSS query prefix (datasets tasks --filter 0xerr0r: 1.4s)

Design

Bucket caching

_build_bucket() now lazily initializes and caches the oss2.Bucket instance. All methods on the same OssDatasetRegistry share one HTTP session.

Pagination cache

A _PaginationCache dataclass stores the last query's (query_prefix, tasks, continuation_token, is_exhausted). When the next query matches (same prefix, sequential page access), it resumes from the cached token instead of re-scanning from page 1. This turns sequential pagination (page 1 → 2 → 3) into O(1) per page.

max_keys is adapted per request: min(1000, items_still_needed) instead of always 1000.

Filter push-down

--filter is appended to the OSS query prefix (split_prefix + filter), so OSS only returns matching keys. Task names are extracted relative to the original split_prefix to preserve full names.

Interface changes

list_dataset_tasks gains keyword-only offset, limit, task_filter parameters across the full chain: BaseDatasetRegistryOssDatasetRegistryDatasetClient → CLI. All default to their zero-value, so existing callers are unaffected.

Benchmark

34,732 tasks, oss-ap-southeast-1, ~0.5s RTT.

Scenario Before After Speedup
datasets list 5.0s 2.7s 1.9×
datasets tasks --limit 10 19.4s 1.5s 13×
datasets tasks (no limit) 19.4s 15.5s 1.3×
datasets tasks --filter prefix N/A 1.4s new

Test plan

  • pytest tests/unit/datasets/ — 99 tests pass
  • Manual verification: datasets list, datasets tasks --limit, datasets tasks --filter against rock-agent-pre bucket

Co-Authored-By: Claude Code noreply@anthropic.com

xdlkc and others added 2 commits June 4, 2026 10:57
…lter for tasks

- Cache oss2.Bucket instance to avoid creating 25+ connections per list_all_datasets call (5.0s → 2.7s)
- Add _PaginationCache with continuation_token to resume sequential page access in O(1)
- Push offset/limit down to _extract_tasks_from_split with early termination (19.4s → 1.5s for --limit 10)
- Adapt max_keys to actual limit needed instead of always 1000
- Add --filter flag to tasks command, pushed down as OSS prefix for server-side filtering
- Update BaseDatasetRegistry and DatasetClient interfaces with offset/limit/task_filter params

Co-Authored-By: Claude Code <noreply@anthropic.com>
AI-Model: claude-opus-4-6
AI-Contributed/Feature: 120/540
AI-Contributed/UT: 17/448
The client.py imports TaskFile but its definition was not included
in the previous commit, causing ImportError in CI.

Co-Authored-By: Claude Code <noreply@anthropic.com>
AI-Model: claude-opus-4-6
AI-Contributed/Feature: 6/6
AI-Contributed/UT: 0/0

@alibabarock alibabarock left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review Summary

Verdict: Comment(无 blocking 问题,有几点建议)

⚠️ Warnings

  1. _fs_download 目录下载时 get_task_file 被调了两次 — 先调一次探测"单文件还是目录",失败后再 list + 逐个 get。对大目录来说浪费了一次 RTT。建议先 list 再判断,避免无意义的 get。

  2. _PaginationCachetasks 字段会缓存全量已扫描的 task_ids — 34k tasks 场景下内存里存了 34k 字符串。如果反复切不同 dataset/split 查询,前一个的 cache 会一直占着。建议加个 LRU 或至少 cache 过期。

  3. --ouput typo 作为 alias 被保留add_argument("-o", "--output", "--ouput", ...)。理解是兼容拼写错误,但不太常规,建议注明或直接去掉。

💡 Suggestions

  1. _extract_tasks_from_splitmax_keys 自适应 — OSS 返回的 key 可能有重复(task 目录本身 + 子文件),去重后可能不够。建议 min(1000, items_still_needed + 10) 作为 buffer。

  2. _upload_task_file 返回 int | NoneNone 表示 skipped,1 表示 uploaded,和 _upload_task 返回的 int 混在一起语义模糊。建议统一用 enum 或 dataclass 表示状态。

  3. _normalize_task_path 路径安全校验 — 防 .. 做得好。. 和空段被过滤是正确行为,建议在 docstring 里说明。

✅ Looks Good

  • Bucket 缓存 + 分页下推 + filter prefix 下推,性能提升显著(13x)
  • _PaginationCache 设计合理,顺序翻页 O(1)
  • datasets fs 子命令结构清晰,路径安全校验到位
  • 隐藏文件过滤覆盖了 org/dataset/split 三层
  • JSON 输出统一用 _print_jsoncontextlib.redirect_stdout 隔离 upload 噪音
  • 99 个单测全过,覆盖充分

xdlkc added 2 commits June 4, 2026 18:57
The new pagination loop in _extract_tasks_from_split used `while True` and
read is_truncated/next_continuation_token via getattr, which on a MagicMock
always returns a truthy value. Tests using `return_value` (and any real OSS
response with a non-advancing token) spun forever, starving CPU/memory and
crashing the self-hosted CI runner and local machines.

Add a hard page budget (_MAX_PAGINATION_PAGES) and a non-advancing-token
guard to both pagination paths; make the test helper set explicit pagination
fields so mocks terminate correctly.

refs alibaba#1063

AI-Model: claude-opus-4-8
AI-Contributed/Feature: 39/39
AI-Contributed/UT: 31/31
return ivalue


def _is_json_output(args: argparse.Namespace) -> bool:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

函数都封装到类里面

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature] datasets tasks: server-side pagination, continuation token caching, and --filter support

3 participants