feat(telegram): add media group (album) support #4893
Merged
Soulter merged 3 commits into AstrBotDevs:master on Feb 8, 2026
Conversation
## Feature

Support Telegram media group (album) messages: multiple photos/videos are merged into a single message for processing instead of being split across separate messages.

## Main changes

### 1. Initialize the media group cache (`__init__`)
- Add a `media_group_cache` dict to store pending media-group messages
- Collect media-group messages with a 2.5-second timeout (based on community best practice)
- Maximum wait of 10 seconds (prevents waiting forever)

### 2. Message handling flow (`message_handler`)
- Detect `media_group_id` to identify media-group messages
- Media-group messages take a dedicated path to avoid piecemeal handling

### 3. Media group message caching (`handle_media_group_message`)
- Cache incoming media-group messages
- Implement a debounce mechanism with APScheduler
- Reset the timeout timer whenever a new message arrives
- Process the whole group once the timeout fires

### 4. Media group merge processing (`process_media_group`)
- Take all media items out of the cache
- Use the first message as the base (keeping its text, reply info, etc.)
- Append all photos, videos, and documents to the message chain in order
- Send the merged message into the processing pipeline

## Technical rationale

Design limitations of the Telegram Bot API when handling media groups:

1. Each message of a media group is delivered as an independent update
2. Each update carries the same `media_group_id`
3. The API does **not** provide the group's total count, an end marker, or any way to receive the whole group at once

The bot therefore has to collect the messages itself and wait, via a hard-coded timeout/delay, for messages that may arrive late. This is currently the only reliable approach, and it is widely used by the official implementation, mainstream frameworks, and the developer community.

### Official and community evidence:

- **Telegram Bot API server implementation (tdlib)**: explicitly notes the lack of an end marker or total count tdlib/telegram-bot-api#643
- **Telegram Bot API server issue**: discusses the inconvenience of media-group handling and recommends a timeout mechanism tdlib/telegram-bot-api#339
- **Telegraf (Node.js framework)**: its dedicated media-group middleware uses a timeout to control the wait https://github.com/DieTime/telegraf-media-group
- **StackOverflow discussion**: all files of a media group cannot be fetched at once; they must be collected manually https://stackoverflow.com/questions/50180048/telegram-api-get-all-uploaded-photos-by-media-group-id
- **python-telegram-bot community**: confirms media-group messages arrive individually and must be handled manually python-telegram-bot/python-telegram-bot#3143
- **Telegram Bot API official documentation**: only defines `media_group_id` as an optional field and provides no interface to fetch the complete group https://core.telegram.org/bots/api#message

## Implementation details

- Collect media-group messages with a 2.5-second timeout (based on community best practice)
- Maximum wait of 10 seconds (prevents waiting forever)
- Debounce mechanism: every new message resets the timer
- APScheduler handles the delayed processing and task scheduling

## Testing

- ✅ Sending a 5-photo album is successfully merged into one message
- ✅ Original caption and reply info are preserved
- ✅ Mixed media groups of photos, videos, and documents are supported
- ✅ Logs show Processing media group <media_group_id> with 5 items

## Code changes

- File: astrbot/core/platform/sources/telegram/tg_adapter.py
- Added code: 124 lines
- New methods: handle_media_group_message(), process_media_group()

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
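The debounce collection described above (a cache keyed by `media_group_id`, a timer that is reset on every new message, and a flush once the group goes quiet) can be sketched independently of the adapter. The PR itself uses APScheduler; this minimal sketch uses plain asyncio instead, and all names here (`MediaGroupCollector`, `on_group`) are illustrative, not the adapter's actual API.

```python
import asyncio

# Minimal sketch of the debounce idea: collect items per media_group_id and
# flush the group after `timeout` seconds of silence. Uses plain asyncio;
# the PR itself schedules the flush with APScheduler.
class MediaGroupCollector:
    def __init__(self, timeout: float, on_group):
        self.timeout = timeout            # quiet period before flushing
        self.on_group = on_group          # called as on_group(group_id, items)
        self.cache: dict[str, list] = {}  # media_group_id -> collected items
        self.timers: dict[str, asyncio.Task] = {}

    def add(self, group_id: str, item) -> None:
        self.cache.setdefault(group_id, []).append(item)
        # Debounce: every new item cancels the pending flush and restarts it.
        timer = self.timers.pop(group_id, None)
        if timer is not None:
            timer.cancel()
        self.timers[group_id] = asyncio.create_task(self._flush_later(group_id))

    async def _flush_later(self, group_id: str) -> None:
        await asyncio.sleep(self.timeout)
        self.timers.pop(group_id, None)
        items = self.cache.pop(group_id, [])
        if items:
            self.on_group(group_id, items)
```

Because the timer restarts on every `add`, two photos arriving within the window are delivered together in a single `on_group` call, which is exactly the merge behavior the PR describes.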
Contributor
Hey - I've found 3 issues and left some high-level feedback:
- The `media_group_max_wait` configuration is defined but never used; consider wiring it into the current debounce logic (e.g., as an upper bound on how long a media group can be delayed), or remove it to avoid confusion.
- In `handle_media_group_message`, the existing job is currently found and cancelled by iterating over `self.scheduler.get_jobs()`; `self.scheduler.get_job(job_id)` can be used directly instead, simplifying the logic and avoiding a scan of all jobs on every media-group message.

Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `media_group_max_wait` configuration is defined but never used; either wire it into the debounce logic (e.g., as an upper bound on how long a group can be delayed) or remove it to avoid confusion.
- In `handle_media_group_message`, instead of iterating over `self.scheduler.get_jobs()` to find and cancel the existing job, you can use `self.scheduler.get_job(job_id)` directly to simplify the logic and avoid scanning all jobs on every media-group message.
## Individual Comments
### Comment 1
<location> `astrbot/core/platform/sources/telegram/tg_adapter.py:97-98` </location>
<code_context>
+ self.media_group_timeout = self.config.get(
+ "telegram_media_group_timeout", 2.5
+ ) # seconds
+ self.media_group_max_wait = self.config.get(
+ "telegram_media_group_max_wait", 10.0
+ ) # max seconds
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Configured `media_group_max_wait` is never used, so media groups can be indefinitely delayed if messages keep arriving within timeout.
`self.media_group_max_wait` is set but never used in the scheduling logic. As a result, if messages in a media group keep arriving within `media_group_timeout` of each other, processing can be postponed indefinitely. Consider tracking the first message time per `media_group_id` and capping the scheduled run to something like `min(first_message_time + media_group_max_wait, now + media_group_timeout)` so the group is always processed within the maximum wait.
</issue_to_address>
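The capping rule suggested in this comment can be sketched as a small pure function. The name `next_run_time` and the defaults are illustrative only, not the adapter's actual code:

```python
from datetime import datetime, timedelta

# Sketch of the suggested cap: the next scheduled run is the debounce
# deadline (now + timeout), but never later than the hard deadline
# (first_message_time + max_wait), so the group is always processed
# within the maximum wait.
def next_run_time(
    first_message_time: datetime,
    now: datetime,
    timeout: float = 2.5,    # debounce window, seconds
    max_wait: float = 10.0,  # hard cap from the group's first message, seconds
) -> datetime:
    debounce_deadline = now + timedelta(seconds=timeout)
    hard_deadline = first_message_time + timedelta(seconds=max_wait)
    return min(debounce_deadline, hard_deadline)
```

Early in the group's life the debounce window wins; once messages have been trickling in for close to `max_wait`, the hard deadline wins and the group is flushed regardless of further arrivals.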
### Comment 2
<location> `astrbot/core/platform/sources/telegram/tg_adapter.py:444-447` </location>
<code_context>
+
+ # Cancel any existing scheduled job for this media group
+ job_id = f"media_group_{media_group_id}"
+ existing_jobs = self.scheduler.get_jobs()
+ for job in existing_jobs:
+ if job.id == job_id:
+ job.remove()
+ logger.debug(f"取消媒体组 {media_group_id} 的旧任务")
+
</code_context>
<issue_to_address>
**suggestion (performance):** Manual job removal plus `replace_existing=True` is redundant and scales poorly with many jobs.
Since `add_job(..., id=job_id, replace_existing=True)` will automatically replace any existing job with the same ID, the explicit `get_jobs()` scan and `job.remove()` loop is unnecessary and adds O(N) overhead per media message. You can remove the loop and rely solely on `replace_existing=True` when adding the job.
Suggested implementation:
```python
# 使用稳定的 job_id,在调度任务时通过 replace_existing=True 自动替换旧任务
job_id = f"media_group_{media_group_id}"
```
You should also update the corresponding `self.scheduler.add_job(...)` call for this media group to pass `id=job_id` and `replace_existing=True`, for example:
```python
self.scheduler.add_job(
self._flush_media_group,
"date",
run_date=run_at,
args=[media_group_id],
id=job_id,
replace_existing=True,
)
```
so that any previously scheduled job with the same `job_id` is automatically replaced without scanning `get_jobs()`.
</issue_to_address>
### Comment 3
<location> `astrbot/core/platform/sources/telegram/tg_adapter.py:442` </location>
<code_context>
+ f"当前共 {len(self.media_group_cache[media_group_id])} 条"
+ )
+
+ # Cancel any existing scheduled job for this media group
+ job_id = f"media_group_{media_group_id}"
+ existing_jobs = self.scheduler.get_jobs()
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the media-group scheduling and processing logic by relying on `replace_existing`, reusing `convert_message`, and either using or removing the unused `media_group_max_wait` configuration.
You can trim a fair bit of complexity without changing behavior:
### 1. Remove manual job lookup/removal
The explicit scan/removal makes the flow harder to follow and is redundant with `replace_existing=True`.
Current:
```python
job_id = f"media_group_{media_group_id}"
existing_jobs = self.scheduler.get_jobs()
for job in existing_jobs:
if job.id == job_id:
job.remove()
logger.debug(f"取消媒体组 {media_group_id} 的旧任务")
self.scheduler.add_job(
self.process_media_group,
"date",
run_date=datetime.now() + timedelta(seconds=self.media_group_timeout),
args=[media_group_id],
id=job_id,
replace_existing=True,
)
```
Simpler, same behavior:
```python
job_id = f"media_group_{media_group_id}"
self.scheduler.add_job(
self.process_media_group,
"date",
run_date=datetime.now() + timedelta(seconds=self.media_group_timeout),
args=[media_group_id],
id=job_id,
replace_existing=True, # this already cancels/replaces the old job
)
logger.debug(f"已安排媒体组 {media_group_id} 在 {self.media_group_timeout} 秒后处理")
```
If you want an explicit check, use `get_job` instead of iterating:
```python
job_id = f"media_group_{media_group_id}"
if self.scheduler.get_job(job_id):
logger.debug(f"替换媒体组 {media_group_id} 的旧任务")
self.scheduler.add_job(
self.process_media_group,
"date",
run_date=datetime.now() + timedelta(seconds=self.media_group_timeout),
args=[media_group_id],
id=job_id,
replace_existing=True,
)
```
### 2. Reuse `convert_message` for all media-group items
`process_media_group` is partly reimplementing `convert_message` for the extra items, which splits the conversion logic and is easy to forget when you add new media types.
Current (simplified):
```python
first_update, first_context = updates_and_contexts[0]
abm = await self.convert_message(first_update, first_context)
# ...
for update, _context in updates_and_contexts[1:]:
if not update.message:
continue
if update.message.photo:
...
elif update.message.video:
...
elif update.message.document:
...
```
You can centralize conversion via `convert_message` and just merge the `message` lists, keeping all special cases (captions, new media types, etc.) in one place:
```python
first_update, first_context = updates_and_contexts[0]
abm = await self.convert_message(first_update, first_context)
if not abm:
logger.warning(f"转换媒体组 {media_group_id} 的第一条消息失败")
return
for update, ctx in updates_and_contexts[1:]:
extra = await self.convert_message(update, ctx, get_reply=False)
if not extra:
continue
# merge only the components, keep base session/meta from the first message
abm.message.extend(extra.message)
await self.handle_msg(abm)
```
If you’re worried about overhead, you can factor out a small helper that `convert_message` and `process_media_group` both call, but even the direct reuse above already removes the duplicated media-type branches.
### 3. `media_group_max_wait` configuration
`self.media_group_max_wait` is currently unused, which makes the behavior harder to infer from config.
If you don’t intend to enforce a hard upper bound yet, consider dropping the config until you do:
```python
# remove this if not used anywhere
# self.media_group_max_wait = self.config.get(
# "telegram_media_group_max_wait", 10.0
# )
```
Or, if you want the hard cap now, you can incorporate it with minimal extra logic, e.g. store a `created_at` timestamp with the cache entry and skip rescheduling once it exceeds `max_wait`. For example:
```python
if media_group_id not in self.media_group_cache:
self.media_group_cache[media_group_id] = {
"created_at": datetime.now(),
"items": [],
}
entry = self.media_group_cache[media_group_id]
entry["items"].append((update, context))
elapsed = (datetime.now() - entry["created_at"]).total_seconds()
delay = 0 if elapsed >= self.media_group_max_wait else self.media_group_timeout
self.scheduler.add_job(
self.process_media_group,
"date",
run_date=datetime.now() + timedelta(seconds=delay),
args=[media_group_id],
id=job_id,
replace_existing=True,
)
```
This way the config meaning matches the actual behavior, and the control flow is still relatively simple.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Improvements based on code review feedback:

1. Implement media_group_max_wait to prevent indefinite delay
   - Track each media group's creation time and process it immediately once the maximum wait is exceeded
   - Worst case, the group is processed within 10 seconds, even if messages keep arriving
2. Remove the manual job lookup to improve performance
   - Drop the O(N) get_jobs() loop scan
   - Rely on replace_existing=True to replace the old job automatically
3. Reuse convert_message to reduce code duplication
   - All media-type conversion logic is unified in one place
   - Adding a new media type in the future only requires one change

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Author
Part of my earlier PR description was unclear: I have changed the timing configuration to hard-coded values, as I don't think these options need to be exposed in the config file.
fix(telegram): handle missing message in media group processing and improve logging messages
Soulter approved these changes on Feb 8, 2026
united-pooh pushed a commit to united-pooh/AstrBot that referenced this pull request on Feb 19, 2026
feat(telegram): add media group (album) support (AstrBotDevs#4893)

Co-authored-by: Ubuntu <ubuntu@localhost.localdomain>
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
Co-authored-by: Soulter <905617992@qq.com>
## Feature

Support Telegram media group (album) messages: multiple photos/videos are merged into a single message for processing instead of being split across separate messages.

## Main changes

### 1. Initialize the media group cache (`__init__`)
- Add a `media_group_cache` dict to store pending media-group messages

### 2. Message handling flow (`message_handler`)
- Detect `media_group_id` to identify media-group messages

### 3. Media group message caching (`handle_media_group_message`)

### 4. Media group merge processing (`process_media_group`)

## Technical rationale

Design limitations of the Telegram Bot API when handling media groups:

1. Each message of a media group is delivered as an independent update
2. Each update carries the same `media_group_id`
3. The API provides no group total, end marker, or way to receive the whole group at once

The bot therefore has to collect the messages itself and wait, via a hard-coded timeout/delay, for messages that may arrive late. This is currently the only reliable approach, and it is widely used by the official implementation, mainstream frameworks, and the developer community.
## Official and community evidence

**Telegram Bot API server implementation (tdlib)**: explicitly notes that media groups lack an end marker or total count.
The current implementation only groups multiple Message updates via `media_group_id`, with no group-completion signal or message count. Developers must collect the messages themselves and wait, otherwise the complete group cannot be handled reliably. This shows there is no alternative to timeout-based waiting.
Link: Give us some more information on grouped media tdlib/telegram-bot-api#643

**Telegram Bot API server issue** discussing the inconvenience of handling media groups:
If a bot tries to wait for a fixed number of messages without a timeout, it can hang forever (the messages may never all arrive due to network delays). Introducing a timeout is recommended as mandatory, which directly supports hard-coded waiting as a necessary approach.
Link: Inconvenient way to handle Media Groups tdlib/telegram-bot-api#339

**Telegraf (popular Node.js Telegram bot framework)**, dedicated media-group middleware:
The middleware explicitly uses a timeout option to control how long to wait for the next media-group message (a default or custom hard-coded value), and only processes the collected messages after the timeout fires. This shows the mainstream community approach is hard-coded waiting, with no dynamic alternative.
Link: https://github.com/DieTime/telegraf-media-group

**StackOverflow discussion**: all files of a media group cannot be fetched at once; they must be collected manually.
When a user sends a media group, the bot receives the messages piecemeal (each carrying a media_group_id) and must wait for the rest itself. Countless developers confirm using a timer/delay mechanism to collect them; the API has no native support for complete groups.
Link: https://stackoverflow.com/questions/50180048/telegram-api-get-all-uploaded-photos-by-media-group-id

**python-telegram-bot community discussion**: media-group messages arrive individually and must be handled manually.
Even with a mature library, a media group still appears as multiple independent messages (each carrying a media_group_id), so developers must implement collection logic, usually combined with a timeout.
Link: Get files in media group by media_group_id python-telegram-bot/python-telegram-bot#3143

**Telegram Bot API official documentation** on `media_group_id`:
The docs only define `media_group_id` as "the unique identifier of a media message group" (an optional field) and provide no interface to fetch the complete group or detect its end, implying that developers must implement collection and waiting themselves.
Link: https://core.telegram.org/bots/api#message

Summary: the evidence above consistently shows that, due to the Telegram Bot API's design limitations (with both polling and webhooks), there is no way to handle media groups other than collecting possibly delayed messages over a hard-coded fixed wait (commonly 3-10 seconds). This is the current best and only reliable approach, widely adopted by the official implementation, library authors, and the developer community.
## Implementation details

- Collect media-group messages with a 2.5-second timeout (based on community best practice)
- Maximum wait of 10 seconds (prevents waiting forever)
- Debounce mechanism: every new message resets the timer
- APScheduler handles the delayed processing and task scheduling

## Testing

- Logs show `正在处理媒体组 <media_group_id>,共 5 项` ("Processing media group <media_group_id> with 5 items")

## Motivation / 动机

When handling a multi-photo album (media group) sent by a user, the current Telegram adapter processes each photo as an independent message, fragmenting the conversation. This PR implements merged media-group handling to fix that experience.
Modifications / 改动点
核心文件:
astrbot/core/platform/sources/telegram/tg_adapter.py(+124 行)实现功能:
message_handler中检测media_group_idmedia_group_cache字典临时存储媒体组消息handle_media_group_message()- 缓存媒体组消息process_media_group()- 合并并处理媒体组3
## Screenshots or Test Results / 运行截图或测试结果

## Checklist / 检查清单

- [x] I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.

## Summary by Sourcery

Add consolidated handling of Telegram media group (album) messages in the Telegram adapter.

New Features:

Enhancements: