
feat(telegram): add media group (album) support #4893

Merged
Soulter merged 3 commits into AstrBotDevs:master from DDZS987:feat/telegram-media-group-support on Feb 8, 2026

Conversation

DDZS987 (Contributor) commented Feb 5, 2026

Feature overview

Support Telegram media group (album) messages by merging multiple photos/videos into a single message for processing, instead of handling each one separately.

Main changes

1. Media group cache initialization (__init__)

  • Add a media_group_cache dict to store pending media group messages
  • Collect media group messages behind a 2.5-second timeout (community best practice)
  • Cap the total wait at 10 seconds to prevent waiting forever (see the sketch below)
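A minimal sketch of this initialization, assuming the fields live on the adapter object; the names follow the PR description, and the cache-entry shape (a list of buffered update/context pairs) is an assumption:

```python
# In the adapter's __init__ (sketch only; the cache-entry shape is assumed)
self.media_group_cache: dict[str, list] = {}  # media_group_id -> buffered (update, context) pairs
self.media_group_timeout = 2.5    # debounce window in seconds (community best practice)
self.media_group_max_wait = 10.0  # hard cap on the total wait, so a group can never stall forever
```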

2. Message handling flow (message_handler)

  • Check media_group_id to detect media group messages
  • Route media group messages through a dedicated path so they are not processed one by one

3. Media group message caching (handle_media_group_message)

  • Cache each incoming media group message
  • Debounce with APScheduler
  • Reset the timeout timer whenever a new message arrives
  • Process the whole group once the timeout fires (see the debounce sketch below)
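A minimal sketch of this debounce step, assuming an APScheduler async scheduler is available as self.scheduler (the same add_job/replace_existing pattern appears in the review comments further down); the method's exact signature is a guess:

```python
from datetime import datetime, timedelta

async def handle_media_group_message(self, media_group_id, update, context):
    # Buffer the message under its media_group_id.
    self.media_group_cache.setdefault(media_group_id, []).append((update, context))

    # Debounce: re-scheduling with the same job id and replace_existing=True
    # cancels any previously scheduled run for this group, so the timer
    # effectively resets on every new message.
    self.scheduler.add_job(
        self.process_media_group,
        "date",
        run_date=datetime.now() + timedelta(seconds=self.media_group_timeout),
        args=[media_group_id],
        id=f"media_group_{media_group_id}",
        replace_existing=True,
    )
```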

4. Media group merging (process_media_group)

  • Pop all buffered items for the group from the cache
  • Use the first message as the base (it carries the caption, reply info, etc.)
  • Append every photo, video, and document to the message chain in order
  • Hand the merged message to the normal processing pipeline (see the merge sketch below)
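And a minimal sketch of the merge step; convert_message and handle_msg are the adapter methods named in the review comments further down, so treat this as an illustration rather than the merged implementation:

```python
async def process_media_group(self, media_group_id):
    # Pop all buffered (update, context) pairs for this group.
    items = self.media_group_cache.pop(media_group_id, [])
    if not items:
        return

    # The first message is the base; it carries the caption and reply info.
    first_update, first_context = items[0]
    abm = await self.convert_message(first_update, first_context)
    if not abm:
        return

    # Append the media components of the remaining messages to the chain.
    for update, context in items[1:]:
        extra = await self.convert_message(update, context)
        if extra:
            abm.message.extend(extra.message)

    # Hand the merged message to the normal pipeline.
    await self.handle_msg(abm)
```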

Technical justification

The Telegram Bot API has inherent design limits around media groups:

  1. Each message in a media group is delivered as an independent update
  2. Every update carries the same media_group_id
  3. It provides no group count, no end-of-group marker, and no way to receive the whole group at once

A bot therefore has to collect the messages itself and wait, behind a hardcoded timeout/delay, for messages that may still arrive late. This is currently the only reliable approach, and it is widely used by the official implementation, mainstream frameworks, and the developer community.

Official and community evidence:

  1. The Telegram Bot API server implementation (tdlib) explicitly notes that media groups have no end marker and no total count

    The current implementation only groups multiple Message updates by media_group_id; it provides no completion signal and no message count. Developers must collect the messages themselves and wait for a while, or the full group cannot be handled reliably. This shows there is no alternative to waiting on a timeout.

    Link: Give us some more information on grouped media tdlib/telegram-bot-api#643

  2. A Telegram Bot API server issue discusses how inconvenient media group handling is

    A bot that waits for a fixed number of messages without a timeout can hang forever, because network delays may keep some messages from ever arriving. A timeout is therefore considered mandatory, which directly supports a hardcoded wait as the necessary approach.

    Link: Inconvenient way to handle Media Groups tdlib/telegram-bot-api#339

  3. Telegraf (a popular Node.js Telegram bot framework) ships a dedicated media group middleware

    The middleware explicitly uses a timeout option (with a default or custom hardcoded value) to control how long to wait for the next message of a media group, and only processes the collected messages after the timeout fires. This shows the mainstream community approach is a hardcoded wait, with no dynamic alternative.

    Link: https://github.com/DieTime/telegraf-media-group

  4. StackOverflow discussion: all files of a media group cannot be fetched at once and must be collected manually

    When a user sends a media group, the bot receives the messages piecemeal (each carrying a media_group_id) and has to wait for or look up the rest itself. Countless developers confirm that a timer or delay mechanism is needed to collect them; the API has no native support for delivering the complete group.

    Link: https://stackoverflow.com/questions/50180048/telegram-api-get-all-uploaded-photos-by-media-group-id

  5. python-telegram-bot community discussion: media group messages arrive individually and must be handled manually

    Even with a mature library, a media group still shows up as multiple independent messages (all sharing one media_group_id), and developers must implement the collection logic themselves, typically combined with a timeout.

    Link: Get files in media group by media_group_id python-telegram-bot/python-telegram-bot#3143

  6. The official Telegram Bot API documentation's description of media_group_id

    The docs only define media_group_id as "the unique identifier of a media message group" (an optional field) and offer no API for fetching the complete group or detecting its end, implying developers must implement collection and waiting themselves.

    Link: https://core.telegram.org/bots/api#message

Summary: the evidence above consistently shows that, given the Telegram Bot API's design limits (under both polling and webhook), there is no way to handle media groups other than collecting possibly delayed messages behind a hardcoded fixed wait (commonly 3-10 seconds). This is the best and only reliable approach today, and it is widely adopted by the official implementation, library authors, and the developer community.

Implementation details

  • Collect media group messages behind a 2.5-second timeout (community best practice)
  • Cap the total wait at 10 seconds to prevent waiting forever
  • Debounce: reset the timer whenever a new message arrives
  • Use APScheduler for the delayed processing and job scheduling

Testing

  • ✅ A 5-photo album is merged into a single message
  • ✅ The original caption and reply info are preserved
  • ✅ Mixed media groups (photos, videos, documents) work
  • ✅ Logs show Processing media group <media_group_id> with 5 items

Motivation

The Telegram adapter currently handles each photo of a multi-photo album (media group) as an independent message, which means:

  • The bot replies to the same album multiple times (send 5 photos, get 5 replies)
  • The bot cannot treat the photos as one whole (e.g. "today's travel log")
  • API calls and tokens are wasted

This PR merges media groups into one message, fixing that experience problem.

Modifications

Core file: astrbot/core/platform/sources/telegram/tg_adapter.py (+124 lines)

Implemented features:

  1. Media group detection: check media_group_id in message_handler
  2. Message caching: buffer media group messages in the media_group_cache dict
  3. Debouncing: delay processing via APScheduler, resetting the timer on every new message
  4. Merging: after the 2.5-second timeout, merge all photos/videos/documents into one message
  5. New methods
    • handle_media_group_message() - cache media group messages
    • process_media_group() - merge and process the media group

  • This is NOT a breaking change.

Screenshots / Test Results

[Screenshots: 2026-02-05 21:19:33, 21:14:27, 21:14:46]

Checklist

  • 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.

Summary by Sourcery

Add consolidated handling of Telegram media group (album) messages in the Telegram adapter.

New Features:

  • Support Telegram media group (album) messages by aggregating multiple updates into a single logical message before processing.

Enhancements:

  • Introduce a media group cache with configurable timeouts to debounce and schedule processing of grouped Telegram messages via the existing async scheduler.

auto-assign bot requested review from Fridemn and Soulter on Feb 5, 2026
dosubot bot added the size:L label (This PR changes 100-499 lines, ignoring generated files.) on Feb 5, 2026
sourcery-ai bot left a comment

Hey - I've found 3 issues, and left some high-level feedback:

  • The media_group_max_wait configuration is defined but never used; either wire it into the debounce logic (e.g., as an upper bound on how long a group can be delayed) or remove it to avoid confusion.
  • In handle_media_group_message, instead of iterating over self.scheduler.get_jobs() to find and cancel the existing job, you can use self.scheduler.get_job(job_id) directly to simplify the logic and avoid scanning all jobs on every media-group message.

Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The `media_group_max_wait` configuration is defined but never used; either wire it into the debounce logic (e.g., as an upper bound on how long a group can be delayed) or remove it to avoid confusion.
- In `handle_media_group_message`, instead of iterating over `self.scheduler.get_jobs()` to find and cancel the existing job, you can use `self.scheduler.get_job(job_id)` directly to simplify the logic and avoid scanning all jobs on every media-group message.

## Individual Comments

### Comment 1
<location> `astrbot/core/platform/sources/telegram/tg_adapter.py:97-98` </location>
<code_context>
+        self.media_group_timeout = self.config.get(
+            "telegram_media_group_timeout", 2.5
+        )  # seconds
+        self.media_group_max_wait = self.config.get(
+            "telegram_media_group_max_wait", 10.0
+        )  # max seconds
+
</code_context>

<issue_to_address>
**issue (bug_risk):** Configured `media_group_max_wait` is never used, so media groups can be indefinitely delayed if messages keep arriving within timeout.

`self.media_group_max_wait` is set but never used in the scheduling logic. As a result, if messages in a media group keep arriving within `media_group_timeout` of each other, processing can be postponed indefinitely. Consider tracking the first message time per `media_group_id` and capping the scheduled run to something like `min(first_message_time + media_group_max_wait, now + media_group_timeout)` so the group is always processed within the maximum wait.
</issue_to_address>

### Comment 2
<location> `astrbot/core/platform/sources/telegram/tg_adapter.py:444-447` </location>
<code_context>
+
+        # Cancel any existing scheduled job for this media group
+        job_id = f"media_group_{media_group_id}"
+        existing_jobs = self.scheduler.get_jobs()
+        for job in existing_jobs:
+            if job.id == job_id:
+                job.remove()
+                logger.debug(f"取消媒体组 {media_group_id} 的旧任务")
+
</code_context>

<issue_to_address>
**suggestion (performance):** Manual job removal plus `replace_existing=True` is redundant and scales poorly with many jobs.

Since `add_job(..., id=job_id, replace_existing=True)` will automatically replace any existing job with the same ID, the explicit `get_jobs()` scan and `job.remove()` loop is unnecessary and adds O(N) overhead per media message. You can remove the loop and rely solely on `replace_existing=True` when adding the job.

Suggested implementation:

```python
        # Use a stable job_id; scheduling with replace_existing=True replaces any old job automatically
        job_id = f"media_group_{media_group_id}"

```

You should also update the corresponding `self.scheduler.add_job(...)` call for this media group to pass `id=job_id` and `replace_existing=True`, for example:

```python
self.scheduler.add_job(
    self._flush_media_group,
    "date",
    run_date=run_at,
    args=[media_group_id],
    id=job_id,
    replace_existing=True,
)
```

so that any previously scheduled job with the same `job_id` is automatically replaced without scanning `get_jobs()`.
</issue_to_address>

### Comment 3
<location> `astrbot/core/platform/sources/telegram/tg_adapter.py:442` </location>
<code_context>
+            f"当前共 {len(self.media_group_cache[media_group_id])} 条"
+        )
+
+        # Cancel any existing scheduled job for this media group
+        job_id = f"media_group_{media_group_id}"
+        existing_jobs = self.scheduler.get_jobs()
</code_context>

<issue_to_address>
**issue (complexity):** Consider simplifying the media-group scheduling and processing logic by relying on `replace_existing`, reusing `convert_message`, and either using or removing the unused `media_group_max_wait` configuration.

You can trim a fair bit of complexity without changing behavior:

### 1. Remove manual job lookup/removal

The explicit scan/removal makes the flow harder to follow and is redundant with `replace_existing=True`.

Current:

```python
job_id = f"media_group_{media_group_id}"
existing_jobs = self.scheduler.get_jobs()
for job in existing_jobs:
    if job.id == job_id:
        job.remove()
        logger.debug(f"取消媒体组 {media_group_id} 的旧任务")

self.scheduler.add_job(
    self.process_media_group,
    "date",
    run_date=datetime.now() + timedelta(seconds=self.media_group_timeout),
    args=[media_group_id],
    id=job_id,
    replace_existing=True,
)
```

Simpler, same behavior:

```python
job_id = f"media_group_{media_group_id}"

self.scheduler.add_job(
    self.process_media_group,
    "date",
    run_date=datetime.now() + timedelta(seconds=self.media_group_timeout),
    args=[media_group_id],
    id=job_id,
    replace_existing=True,  # this already cancels/replaces the old job
)
logger.debug(f"已安排媒体组 {media_group_id}{self.media_group_timeout} 秒后处理")
```

If you want an explicit check, use `get_job` instead of iterating:

```python
job_id = f"media_group_{media_group_id}"
if self.scheduler.get_job(job_id):
    logger.debug(f"替换媒体组 {media_group_id} 的旧任务")

self.scheduler.add_job(
    self.process_media_group,
    "date",
    run_date=datetime.now() + timedelta(seconds=self.media_group_timeout),
    args=[media_group_id],
    id=job_id,
    replace_existing=True,
)
```


### 2. Reuse `convert_message` for all media-group items

`process_media_group` is partly reimplementing `convert_message` for the extra items, which splits the conversion logic and is easy to forget when you add new media types.

Current (simplified):

```python
first_update, first_context = updates_and_contexts[0]
abm = await self.convert_message(first_update, first_context)

# ...
for update, _context in updates_and_contexts[1:]:
    if not update.message:
        continue

    if update.message.photo:
        ...
    elif update.message.video:
        ...
    elif update.message.document:
        ...
```

You can centralize conversion via `convert_message` and just merge the `message` lists, keeping all special cases (captions, new media types, etc.) in one place:

```python
first_update, first_context = updates_and_contexts[0]
abm = await self.convert_message(first_update, first_context)
if not abm:
    logger.warning(f"转换媒体组 {media_group_id} 的第一条消息失败")
    return

for update, ctx in updates_and_contexts[1:]:
    extra = await self.convert_message(update, ctx, get_reply=False)
    if not extra:
        continue
    # merge only the components, keep base session/meta from the first message
    abm.message.extend(extra.message)

await self.handle_msg(abm)
```

If you’re worried about overhead, you can factor out a small helper that `convert_message` and `process_media_group` both call, but even the direct reuse above already removes the duplicated media-type branches.


### 3. `media_group_max_wait` configuration

`self.media_group_max_wait` is currently unused, which makes the behavior harder to infer from config.

If you don’t intend to enforce a hard upper bound yet, consider dropping the config until you do:

```python
# remove this if not used anywhere
# self.media_group_max_wait = self.config.get(
#     "telegram_media_group_max_wait", 10.0
# )
```

Or, if you want the hard cap now, you can incorporate it with minimal extra logic, e.g. store a `created_at` timestamp with the cache entry and skip rescheduling once it exceeds `max_wait`. For example:

```python
if media_group_id not in self.media_group_cache:
    self.media_group_cache[media_group_id] = {
        "created_at": datetime.now(),
        "items": [],
    }

entry = self.media_group_cache[media_group_id]
entry["items"].append((update, context))

elapsed = (datetime.now() - entry["created_at"]).total_seconds()
delay = 0 if elapsed >= self.media_group_max_wait else self.media_group_timeout

self.scheduler.add_job(
    self.process_media_group,
    "date",
    run_date=datetime.now() + timedelta(seconds=delay),
    args=[media_group_id],
    id=job_id,
    replace_existing=True,
)
```

This way the config meaning matches the actual behavior, and the control flow is still relatively simple.
</issue_to_address>

Sourcery is free for open source - if you like our reviews, please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

dosubot bot added the area:platform label (The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on.) on Feb 5, 2026
refactor(telegram): optimize media group handling performance and reliability

Improvements based on the code review feedback:

1. Implement media_group_max_wait to prevent unbounded delay
   - Track each media group's creation time and process it immediately once the maximum wait is exceeded
   - Worst case, a group is processed within 10 seconds, even if messages keep arriving (see the sketch below)

2. Remove the manual job lookup to improve performance
   - Drop the O(N) get_jobs() scan
   - Rely on replace_existing=True to replace the job automatically

3. Reuse convert_message to reduce duplication
   - All media type conversion now goes through one place
   - Adding a new media type in the future only needs one change

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
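For illustration, a minimal sketch of how the hard cap in point 1 can be wired into the debounce step (essentially the reviewer's suggestion above; the created_at bookkeeping on the cache entry is an assumed shape):

```python
from datetime import datetime, timedelta

# Cache entries now carry a creation timestamp (assumed shape).
entry = self.media_group_cache.setdefault(
    media_group_id, {"created_at": datetime.now(), "items": []}
)
entry["items"].append((update, context))

# Once the group has already waited media_group_max_wait in total, flush
# immediately; otherwise debounce for another media_group_timeout seconds.
elapsed = (datetime.now() - entry["created_at"]).total_seconds()
delay = 0 if elapsed >= self.media_group_max_wait else self.media_group_timeout

self.scheduler.add_job(
    self.process_media_group,
    "date",
    run_date=datetime.now() + timedelta(seconds=delay),
    args=[media_group_id],
    id=f"media_group_{media_group_id}",
    replace_existing=True,
)
```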
DDZS987 (Contributor, Author) commented Feb 5, 2026

The screenshots below show everything still running correctly after the changes requested by the AI reviewer:

[Screenshots: 2026-02-05 21:39:14, 21:39:39]

DDZS987 (Contributor, Author) commented Feb 5, 2026

One point in my earlier PR description was unclear: I have changed the timing values to be hardcoded; I don't think these settings need to be exposed in the config file.

dosubot bot added the lgtm label (This PR has been approved by a maintainer) on Feb 8, 2026
Soulter merged commit dbe8e33 into AstrBotDevs:master on Feb 8, 2026
5 checks passed
dosubot bot commented Feb 8, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.


united-pooh pushed a commit to united-pooh/AstrBot that referenced this pull request Feb 19, 2026
…tDevs#4893)

* feat(telegram): add media group (album) support

* refactor(telegram): optimize media group handling performance and reliability

* fix(telegram): handle missing message in media group processing and improve logging messages

---------

Co-authored-by: Ubuntu <ubuntu@localhost.localdomain>
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
Co-authored-by: Soulter <905617992@qq.com>

Labels

  • area:platform - The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on.
  • lgtm - This PR has been approved by a maintainer
  • size:L - This PR changes 100-499 lines, ignoring generated files.
