Skip to content

feat(platform): add CLI Tester for plugin testing and debugging#4787

Open
YukiRa1n wants to merge 22 commits intoAstrBotDevs:masterfrom
YukiRa1n:master
Open

feat(platform): add CLI Tester for plugin testing and debugging#4787
YukiRa1n wants to merge 22 commits intoAstrBotDevs:masterfrom
YukiRa1n:master

Conversation

@YukiRa1n
Copy link
Contributor

@YukiRa1n YukiRa1n commented Jan 31, 2026

CLI Tester - 命令行测试平台

概述

为 AstrBot 添加 CLI 平台适配器,支持通过命令行直接测试插件功能,无需配置 IM 平台。

功能特性

核心功能

  • Socket 模式: 支持 Unix Socket (Linux/macOS) 和 TCP Socket (Windows/容器)
  • TTY 模式: 交互式终端测试
  • File 模式: 批量文件输入测试
  • 会话隔离: 支持 use_isolated_sessions 配置,每个请求独立会话
  • Token 认证: 自动生成安全 Token,支持认证校验

消息处理

  • 富媒体支持: 图片自动转 Base64 编码
  • JSON 响应: 结构化响应格式,包含文本、图片、状态
  • 多轮回复收集: 支持 LLM 多轮响应聚合
  • CLI 客户端: pip 安装后全局可用 astr 命令

AOP 装饰器

  • @handle_exceptions: 统一异常处理
  • @retry: 重试机制
  • @timeout: 超时控制
  • @log_entry_exit: 入口出口日志
  • @log_performance: 性能监控
  • @require_auth: Token 认证
  • @require_whitelist: 白名单校验

文件架构

平台适配器

astrbot/core/platform/sources/cli/
├── __init__.py
├── cli_adapter.py           # 主适配器 (编排层)
├── cli_event.py             # CLI 事件定义
├── interfaces.py            # 接口定义 (IHandler, ITokenValidator, IMessageConverter)
├── platform_detector.py     # 平台检测 (Unix/Windows/容器)
├── connection_info_writer.py # 连接信息写入
│
├── config/                  # 配置层
│   ├── __init__.py
│   ├── config_loader.py     # 配置加载器
│   └── token_manager.py     # Token 管理
│
├── handlers/                # 处理器层 (实现 IHandler 接口)
│   ├── __init__.py
│   ├── socket_handler.py    # Socket 模式处理器
│   ├── tty_handler.py       # TTY 模式处理器
│   └── file_handler.py      # File 模式处理器
│
├── message/                 # 消息层
│   ├── __init__.py
│   ├── converter.py         # 消息转换器
│   ├── image_processor.py   # 图片处理 (Base64 编解码)
│   ├── response_builder.py  # JSON 响应构建
│   └── response_collector.py # 多轮回复收集
│
├── session/                 # 会话层
│   ├── __init__.py
│   └── session_manager.py   # 会话管理 (TTL, 过期清理)
│
├── utils/                   # 工具层 (AOP)
│   ├── __init__.py
│   └── decorators.py        # AOP 装饰器集合
│
├── socket_abstract.py       # Socket 抽象基类
├── socket_factory.py        # Socket 工厂
├── unix_socket_server.py    # Unix Socket 实现
└── tcp_socket_server.py     # TCP Socket 实现

CLI 客户端

astrbot/cli/client/
├── __init__.py              # 客户端包入口
└── __main__.py              # astr 命令入口
    ├── get_data_path()       # 获取数据目录(支持全局调用)
    ├── get_temp_path()       # 获取临时目录
    ├── load_auth_token()     # 加载认证 Token
    ├── load_connection_info() # 加载连接信息
    ├── connect_to_server()   # 连接到服务器
    ├── send_message()        # 发送消息
    ├── get_logs()            # 获取日志
    ├── format_response()     # 格式化输出(图片占位符)
    ├── fix_git_bash_path()   # 兼容 Git Bash 路径转换
    └── main()                # 主函数(argparse 解析)

配置修改

pyproject.toml 添加 astr 命令:

[project.scripts]
astrbot = "astrbot.cli.__main__:cli"
astr = "astrbot.cli.client.__main__:main"  # 新增,全局可用

测试覆盖

共 76 个单元测试,覆盖所有核心模块:

测试文件 测试数 覆盖模块
test_decorators.py 22 AOP 装饰器
test_token_manager.py 8 Token 管理
test_session_manager.py 7 会话管理
test_message_converter.py 6 消息转换
test_image_processor.py 10 图片处理
test_response_builder.py 7 响应构建
test_e2e.py 9 端到端流程
test_cli_event.py 7 CLI 事件

使用方式

安装后全局使用

cd AstrBot
pip install -e .
astr --help

安装后可在任何目录使用 astr 命令:

# 在任何目录下都可以使用
astr 你好
astr /help
astr plugin ls
astr --log

常用命令

# 发送消息
astr 你好
astr /help

# 列出插件
astr plugin ls

# 获取日志
astr --log

# JSON 输出
astr -j "你好"

源码直接运行

cd AstrBot
python -m astrbot.cli.client 你好

配置说明

编辑 data/cmd_config.json

{
  "platform": [{
    "id": "cli_test",
    "type": "cli",
    "enable": true,
    "mode": "socket"
  }]
}

输出说明

  • 默认模式下,图片以 [图片] 占位符显示
  • 分段回复会自动分行显示
  • 使用 -j 参数获取完整 JSON(包含图片 URL/base64)
  • 使用 --log 参数查看详细日志
  • 使用 astr --help 查看完整帮助信息

设计原则

  • 单一职责: 每个模块职责单一,最大文件 < 250 行
  • 依赖倒置: handlers 依赖 IHandler 接口,不依赖具体实现
  • 组合优于继承: 通过组合小组件构建复杂功能
  • AOP: 日志、异常处理、认证等横切关注点抽离到装饰器

兼容性

  • Linux: Unix Socket (默认)
  • macOS: Unix Socket (默认)
  • Windows: TCP Socket (自动检测)
  • Docker 容器: TCP Socket (自动检测)
  • Git Bash: 自动兼容路径转换(/pluginC:/Program Files/Git/plugin 自动还原)

- Add CLI platform adapter with Unix socket mode
- Support isolated sessions with configurable TTL
- Add whitelist exemption for CLI platform
- Include astrbot-cli command-line tool
- Support independent configuration file system
…efault

- Rename "CLI Platform Adapter" to "CLI Tester"
- Set default enable to false (disabled by default)
- Update descriptions to emphasize testing and debugging purpose
- Clarify design goal: build fast feedback loop for vibe coding
@auto-assign auto-assign bot requested review from Fridemn and anka-afk January 31, 2026 18:22
@dosubot dosubot bot added size:XXL This PR changes 1000+ lines, ignoring generated files. area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. labels Jan 31, 2026
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了两个问题,并留下了一些高层面的反馈:

  • 当前 CLI 适配器将诸如 /AstrBot/data/{config_file}/tmp/astrbot.sock 这样的路径写死;建议从现有的配置/根路径工具中派生这些路径,或者允许通过配置覆盖,这样在非 Docker 或非默认部署环境中该特性也能正常工作。
  • 白名单检查对 cli 平台无条件绕过校验;如果你预期有些环境会以更受控的方式使用 CLI,那么将该行为放在一个配置开关后面,可能会比硬编码豁免更安全。
  • 对于 Unix 套接字服务器,你可能需要显式控制文件权限/所有权(例如通过 os.chmod 或 umask),并在 JSON 载荷上增加最小的分帧/大小检查,以避免在多个客户端连接时因部分读取或超大读取导致的问题。
供 AI Agents 使用的提示词
请根据这次代码审查中的评论进行修改:

## 整体评论
- 当前 CLI 适配器将诸如 `/AstrBot/data/{config_file}``/tmp/astrbot.sock` 这样的路径写死;建议从现有的配置/根路径工具中派生这些路径,或者允许通过配置覆盖,这样在非 Docker 或非默认部署环境中该特性也能正常工作。
- 白名单检查对 `cli` 平台无条件绕过校验;如果你预期有些环境会以更受控的方式使用 CLI,那么将该行为放在一个配置开关后面,可能会比硬编码豁免更安全。
- 对于 Unix 套接字服务器,你可能需要显式控制文件权限/所有权(例如通过 `os.chmod` 或 umask),并在 JSON 载荷上增加最小的分帧/大小检查,以避免在多个客户端连接时因部分读取或超大读取导致的问题。

## 单独评论

### 评论 1
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:302` </location>
<code_context>
+            while self._running:
+                try:
+                    # 接受连接(非阻塞)
+                    loop = asyncio.get_event_loop()
+                    client_socket, _ = await loop.sock_accept(server_socket)
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** 在异步代码中使用 asyncio.get_event_loop() 并不推荐;asyncio.get_running_loop() 更安全且更具前向兼容性。

在 Python 3.10+ 中,`asyncio.get_event_loop()` 在异步代码中已被弃用,并且在某些策略下可能返回错误的事件循环。在 `_run_socket_mode` 中(以及类似的 `_handle_socket_client` / `_read_input` 中),请使用 `asyncio.get_running_loop()`,以确保你获取的是当前任务所在的事件循环。

建议实现:

```python
                    # 接受连接(非阻塞)
                    loop = asyncio.get_running_loop()
                    client_socket, _ = await loop.sock_accept(server_socket)

````astrbot/core/platform/sources/cli/cli_adapter.py` 中搜索其他在 `async def` 函数内部使用 `asyncio.get_event_loop()` 的地方,尤其是 `_handle_socket_client``_read_input`,并以类似方式替换:

- 当上下文为运行在事件循环上的异步代码时,将 `loop = asyncio.get_event_loop()` 替换为 `loop = asyncio.get_running_loop()`。

如果在纯同步初始化代码中(不在活动事件循环内)有 `asyncio.get_event_loop()` 的使用,应单独进行审查,因为它们可能需要不同的模式(例如使用 `asyncio.new_event_loop()` 显式创建事件循环,而不是使用 `get_running_loop()`)。
</issue_to_address>

### 评论 2
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:134` </location>
<code_context>
+        logger.info("[ENTRY] CLIPlatformAdapter.run inputs={}")
+        return self._run_loop()
+
+    async def _run_loop(self) -> None:
+        """主运行循环
+
</code_context>

<issue_to_address>
**issue (complexity):** 建议通过抽取小的辅助函数、集中复用逻辑以及将模式映射到处理函数的方式来重构 CLI 适配器,使代码在不改变行为的前提下更易理解。

在不改变行为的前提下,你可以通过抽取一些聚焦的小型辅助函数并整合重复逻辑,显著降低复杂度。下面是一些具体、局部化的重构建议,可以保留现有设计,但让代码更易于推理。

---

### 1. 使用策略映射简化模式选择

`_run_loop` 目前在函数体中同时包含 TTY 检测和分支逻辑。你可以将模式解析抽取到一个小的辅助函数中,并将模式映射到可调用对象:

```python
# in __init__
self._mode_handlers: dict[str, callable[[], Awaitable[None]]] = {
    "tty": self._run_tty_mode,
    "file": self._run_file_mode,
    "socket": self._run_socket_mode,
}

def _resolve_mode(self) -> str:
    has_tty = sys.stdin.isatty()
    if self.mode == "auto":
        return "file" if not has_tty else "tty"
    if self.mode in ("tty", "file", "socket"):
        if self.mode == "tty" and not has_tty:
            logger.warning(
                "[PROCESS] TTY mode requested but no TTY detected. "
                "CLI platform will not start."
            )
            return ""  # or None
        return self.mode
    logger.error("[ERROR] Unknown mode: %s", self.mode)
    return ""
```

```python
async def _run_loop(self) -> None:
    logger.info("[PROCESS] Starting CLI loop")

    if self.use_isolated_sessions:
        self._cleanup_task = asyncio.create_task(self._cleanup_expired_sessions())

    mode = self._resolve_mode()
    if not mode:
        return

    handler = self._mode_handlers.get(mode)
    if handler:
        logger.info("[PROCESS] Starting %s mode", mode)
        await handler()
```

这样可以在不改变行为的前提下,压平分支结构,并把 TTY 逻辑局部化。

---

### 2. 抽取一个小型 `SessionTracker` 辅助类

`_convert_input``_cleanup_expired_sessions` 都会操作 `_session_timestamps`。将它们移入一个小型辅助类中,可以将消息构建和会话生命周期解耦:

```python
class _SessionTracker:
    def __init__(self, ttl: int) -> None:
        import time
        self._ttl = ttl
        self._timestamps: dict[str, float] = {}
        self._time = time

    def ensure_session(self, base_session_id: str, request_id: str | None) -> str:
        if request_id is None:
            return base_session_id
        session_id = f"{base_session_id}_{request_id}"
        if session_id not in self._timestamps:
            self._timestamps[session_id] = self._time.time()
        return session_id

    def collect_expired(self) -> list[str]:
        now = self._time.time()
        expired = [
            s for s, ts in list(self._timestamps.items())
            if now - ts > self._ttl
        ]
        for s in expired:
            self._timestamps.pop(s, None)
        return expired
```

将其接入适配器:

```python
# __init__
self._session_tracker = _SessionTracker(self.session_ttl)
```

```python
def _convert_input(self, text: str, request_id: str | None = None) -> AstrBotMessage:
    ...
    if self.use_isolated_sessions and request_id:
        message.session_id = self._session_tracker.ensure_session(
            base_session_id="cli_session",
            request_id=request_id,
        )
    else:
        message.session_id = self.session_id
    ...
```

```python
async def _cleanup_expired_sessions(self) -> None:
    logger.info("[ENTRY] _cleanup_expired_sessions started, TTL=%s seconds", self.session_ttl)
    while self._running:
        try:
            await asyncio.sleep(10)
            if not self.use_isolated_sessions:
                continue

            expired_sessions = self._session_tracker.collect_expired()
            for session_id in expired_sessions:
                logger.info("[PROCESS] Cleaning expired session: %s", session_id)
                # TODO: DB cleanup if needed

            if expired_sessions:
                logger.info("[PROCESS] Cleaned %d expired sessions", len(expired_sessions))
        except Exception as e:
            logger.error("[ERROR] Session cleanup error: %s", e)
```

这样可以避免在多个位置直接修改 `_session_timestamps`,并将会话策略集中到一个地方。

---

### 3. 去重图片提取/归一化逻辑

你已经在 `_handle_socket_client` 中实现了图片提取和 base64 转换。如果 `CLIMessageEvent.send`(或类似逻辑)中有重叠代码,可以将其集中到一个可复用的辅助函数中:

```python
from astrbot.core.message.components import Image

def normalize_images_from_chain(message_chain: MessageChain) -> list[dict[str, Any]]:
    import base64
    images: list[dict[str, Any]] = []

    for comp in message_chain.chain:
        if not isinstance(comp, Image) or not comp.file:
            continue

        info: dict[str, Any] = {}
        if comp.file.startswith("http"):
            info["type"] = "url"
            info["url"] = comp.file

        elif comp.file.startswith("file:///"):
            info["type"] = "file"
            file_path = comp.file[8:]
            info["path"] = file_path
            try:
                with open(file_path, "rb") as f:
                    raw = f.read()
                info["base64_data"] = base64.b64encode(raw).decode("utf-8")
                info["size"] = len(raw)
            except Exception as e:
                logger.error("[ERROR] Failed to read image file %s: %s", file_path, e)
                info["error"] = str(e)

        elif comp.file.startswith("base64://"):
            info["type"] = "base64"
            base64_data = comp.file[9:]
            info["base64_data"] = base64_data
            info["base64_length"] = len(base64_data)

        images.append(info)

    return images
````_handle_socket_client` 中使用:

```python
from .image_utils import normalize_images_from_chain  # or local helper

...
message_chain = await asyncio.wait_for(response_future, timeout=30.0)
response_text = message_chain.get_plain_text()
images = normalize_images_from_chain(message_chain)

response = json.dumps(
    {
        "status": "success",
        "response": response_text,
        "images": images,
        "request_id": request_id,
    },
    ensure_ascii=False,
)
await loop.sock_sendall(client_socket, response.encode("utf-8"))
```

并在 `CLIMessageEvent` 或其他复用该逻辑的代码路径中同样复用该辅助函数。

---

### 4. 将 `_handle_socket_client` 拆分为更小的步骤

在不改变行为的前提下,你可以把该方法拆分成解析 / 处理 / 序列化等辅助函数,使其更易测试且更短:

```python
def _parse_socket_request(self, raw: bytes) -> tuple[dict[str, Any], str, str]:
    import json
    request = json.loads(raw.decode("utf-8"))
    message_text = request.get("message", "")
    request_id = request.get("request_id", str(uuid.uuid4()))
    return request, message_text, request_id

def _serialize_error(self, request_id: str, msg: str) -> bytes:
    import json
    return json.dumps(
        {"status": "error", "error": msg, "request_id": request_id},
        ensure_ascii=False,
    ).encode("utf-8")

def _serialize_success(self, request_id: str, chain: MessageChain) -> bytes:
    import json
    response_text = chain.get_plain_text()
    images = normalize_images_from_chain(chain)
    return json.dumps(
        {
            "status": "success",
            "response": response_text,
            "images": images,
            "request_id": request_id,
        },
        ensure_ascii=False,
    ).encode("utf-8")
````_handle_socket_client` 中使用它们:

```python
async def _handle_socket_client(self, client_socket) -> None:
    import json
    logger.debug("[ENTRY] _handle_socket_client")
    loop = asyncio.get_event_loop()

    try:
        data = await loop.sock_recv(client_socket, 4096)
        if not data:
            logger.debug("[PROCESS] Empty request, closing connection")
            return

        try:
            _, message_text, request_id = self._parse_socket_request(data)
        except json.JSONDecodeError:
            await loop.sock_sendall(
                client_socket, self._serialize_error("", "Invalid JSON format")
            )
            return

        response_future: asyncio.Future[MessageChain] = asyncio.Future()
        message = self._convert_input(message_text, request_id=request_id)

        message_event = CLIMessageEvent(
            message_str=message.message_str,
            message_obj=message,
            platform_meta=self.meta(),
            session_id=message.session_id,
            output_queue=self._output_queue,
            response_future=response_future,
        )
        self.commit_event(message_event)

        try:
            chain = await asyncio.wait_for(response_future, timeout=30.0)
            await loop.sock_sendall(
                client_socket, self._serialize_success(request_id, chain)
            )
        except asyncio.TimeoutError:
            await loop.sock_sendall(
                client_socket, self._serialize_error(request_id, "Request timeout")
            )
    except Exception as e:
        logger.error("[ERROR] Socket client handler error: %s", e)
        import traceback
        logger.error(traceback.format_exc())
    finally:
        client_socket.close()
        logger.debug("[EXIT] _handle_socket_client return=None")
```

这样可以在保持现有行为的同时,将这个较长的方法拆分为更小、可单独测试的步骤。

---

这些改动在保持现有特性和流程的前提下,可以:

- 将会话策略从消息转换逻辑中分离出来;
- 降低模式选择时的分支复杂度;
- 去除图片处理中的重复代码;
- 将套接字客户端处理逻辑拆分得更短、更清晰。
</issue_to_address>

Sourcery 对开源项目免费——如果你喜欢我们的评审,请考虑分享 ✨
帮我变得更有用!请在每条评论上点击 👍 或 👎,我会根据这些反馈来改进后续的评审。
Original comment in English

Hey - I've found 2 issues, and left some high level feedback:

  • The CLI adapter currently hardcodes paths like /AstrBot/data/{config_file} and /tmp/astrbot.sock; consider deriving these from existing config/root-path utilities or allowing them to be overridden so the feature works in non-Docker or non-default deployments.
  • The whitelist check unconditionally bypasses validation for the cli platform; if you expect some environments to use CLI in a more controlled way, it might be safer to gate this behavior behind a config flag rather than hardcoding the exemption.
  • For the Unix socket server, you may want to explicitly control file permissions/ownership (e.g., via os.chmod or umask) and add minimal framing/size checks on the JSON payload to avoid issues with partial or oversized reads when multiple clients connect.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The CLI adapter currently hardcodes paths like `/AstrBot/data/{config_file}` and `/tmp/astrbot.sock`; consider deriving these from existing config/root-path utilities or allowing them to be overridden so the feature works in non-Docker or non-default deployments.
- The whitelist check unconditionally bypasses validation for the `cli` platform; if you expect some environments to use CLI in a more controlled way, it might be safer to gate this behavior behind a config flag rather than hardcoding the exemption.
- For the Unix socket server, you may want to explicitly control file permissions/ownership (e.g., via `os.chmod` or umask) and add minimal framing/size checks on the JSON payload to avoid issues with partial or oversized reads when multiple clients connect.

## Individual Comments

### Comment 1
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:302` </location>
<code_context>
+            while self._running:
+                try:
+                    # 接受连接(非阻塞)
+                    loop = asyncio.get_event_loop()
+                    client_socket, _ = await loop.sock_accept(server_socket)
+
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Using asyncio.get_event_loop() in async code is discouraged; asyncio.get_running_loop() is safer and future‑proof.

In Python 3.10+, `asyncio.get_event_loop()` is deprecated in async code and can return the wrong loop under some policies. In `_run_socket_mode` (and similarly in `_handle_socket_client` / `_read_input`), please use `asyncio.get_running_loop()` so you reliably get the loop for the current task.

Suggested implementation:

```python
                    # 接受连接(非阻塞)
                    loop = asyncio.get_running_loop()
                    client_socket, _ = await loop.sock_accept(server_socket)

```

Search in `astrbot/core/platform/sources/cli/cli_adapter.py` for other occurrences of `asyncio.get_event_loop()` used inside `async def` functions, especially in `_handle_socket_client` and `_read_input`, and replace them similarly:

- Change `loop = asyncio.get_event_loop()` to `loop = asyncio.get_running_loop()` where the surrounding context is async code running on the event loop.

If there are any uses of `asyncio.get_event_loop()` in purely synchronous initialization code (outside of an active event loop), those should be reviewed separately, as they may need a different pattern (e.g., explicitly creating a loop with `asyncio.new_event_loop()` rather than `get_running_loop()`).
</issue_to_address>

### Comment 2
<location> `astrbot/core/platform/sources/cli/cli_adapter.py:134` </location>
<code_context>
+        logger.info("[ENTRY] CLIPlatformAdapter.run inputs={}")
+        return self._run_loop()
+
+    async def _run_loop(self) -> None:
+        """主运行循环
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider refactoring the CLI adapter by extracting small helpers, centralizing shared logic, and mapping modes to handlers to make the code easier to follow without changing behavior.

You can reduce complexity meaningfully without changing behavior by extracting a few focused helpers and consolidating duplicated logic. Here are concrete, localized refactors that keep the existing design but make it easier to reason about.

---

### 1. Simplify mode selection with a strategy map

`_run_loop` currently mixes TTY detection and branching logic inline. You can pull the mode resolution into a small helper and map modes to callables:

```python
# in __init__
self._mode_handlers: dict[str, callable[[], Awaitable[None]]] = {
    "tty": self._run_tty_mode,
    "file": self._run_file_mode,
    "socket": self._run_socket_mode,
}

def _resolve_mode(self) -> str:
    has_tty = sys.stdin.isatty()
    if self.mode == "auto":
        return "file" if not has_tty else "tty"
    if self.mode in ("tty", "file", "socket"):
        if self.mode == "tty" and not has_tty:
            logger.warning(
                "[PROCESS] TTY mode requested but no TTY detected. "
                "CLI platform will not start."
            )
            return ""  # or None
        return self.mode
    logger.error("[ERROR] Unknown mode: %s", self.mode)
    return ""
```

```python
async def _run_loop(self) -> None:
    logger.info("[PROCESS] Starting CLI loop")

    if self.use_isolated_sessions:
        self._cleanup_task = asyncio.create_task(self._cleanup_expired_sessions())

    mode = self._resolve_mode()
    if not mode:
        return

    handler = self._mode_handlers.get(mode)
    if handler:
        logger.info("[PROCESS] Starting %s mode", mode)
        await handler()
```

This flattens the branching and localizes TTY logic without changing behavior.

---

### 2. Extract a small `SessionTracker` helper

`_convert_input` and `_cleanup_expired_sessions` both manipulate `_session_timestamps`. Move this into a tiny helper to decouple message construction from session lifecycle:

```python
class _SessionTracker:
    def __init__(self, ttl: int) -> None:
        import time
        self._ttl = ttl
        self._timestamps: dict[str, float] = {}
        self._time = time

    def ensure_session(self, base_session_id: str, request_id: str | None) -> str:
        if request_id is None:
            return base_session_id
        session_id = f"{base_session_id}_{request_id}"
        if session_id not in self._timestamps:
            self._timestamps[session_id] = self._time.time()
        return session_id

    def collect_expired(self) -> list[str]:
        now = self._time.time()
        expired = [
            s for s, ts in list(self._timestamps.items())
            if now - ts > self._ttl
        ]
        for s in expired:
            self._timestamps.pop(s, None)
        return expired
```

Wire it into the adapter:

```python
# __init__
self._session_tracker = _SessionTracker(self.session_ttl)
```

```python
def _convert_input(self, text: str, request_id: str | None = None) -> AstrBotMessage:
    ...
    if self.use_isolated_sessions and request_id:
        message.session_id = self._session_tracker.ensure_session(
            base_session_id="cli_session",
            request_id=request_id,
        )
    else:
        message.session_id = self.session_id
    ...
```

```python
async def _cleanup_expired_sessions(self) -> None:
    logger.info("[ENTRY] _cleanup_expired_sessions started, TTL=%s seconds", self.session_ttl)
    while self._running:
        try:
            await asyncio.sleep(10)
            if not self.use_isolated_sessions:
                continue

            expired_sessions = self._session_tracker.collect_expired()
            for session_id in expired_sessions:
                logger.info("[PROCESS] Cleaning expired session: %s", session_id)
                # TODO: DB cleanup if needed

            if expired_sessions:
                logger.info("[PROCESS] Cleaned %d expired sessions", len(expired_sessions))
        except Exception as e:
            logger.error("[ERROR] Session cleanup error: %s", e)
```

This removes direct mutation of `_session_timestamps` from multiple places and keeps session policy in one spot.

---

### 3. Deduplicate image extraction/normalization

You already do image extraction and base64 conversion in `_handle_socket_client`. If `CLIMessageEvent.send` (or similar) has overlapping logic, centralize it in a reusable helper:

```python
from astrbot.core.message.components import Image

def normalize_images_from_chain(message_chain: MessageChain) -> list[dict[str, Any]]:
    import base64
    images: list[dict[str, Any]] = []

    for comp in message_chain.chain:
        if not isinstance(comp, Image) or not comp.file:
            continue

        info: dict[str, Any] = {}
        if comp.file.startswith("http"):
            info["type"] = "url"
            info["url"] = comp.file

        elif comp.file.startswith("file:///"):
            info["type"] = "file"
            file_path = comp.file[8:]
            info["path"] = file_path
            try:
                with open(file_path, "rb") as f:
                    raw = f.read()
                info["base64_data"] = base64.b64encode(raw).decode("utf-8")
                info["size"] = len(raw)
            except Exception as e:
                logger.error("[ERROR] Failed to read image file %s: %s", file_path, e)
                info["error"] = str(e)

        elif comp.file.startswith("base64://"):
            info["type"] = "base64"
            base64_data = comp.file[9:]
            info["base64_data"] = base64_data
            info["base64_length"] = len(base64_data)

        images.append(info)

    return images
```

Then in `_handle_socket_client`:

```python
from .image_utils import normalize_images_from_chain  # or local helper

...
message_chain = await asyncio.wait_for(response_future, timeout=30.0)
response_text = message_chain.get_plain_text()
images = normalize_images_from_chain(message_chain)

response = json.dumps(
    {
        "status": "success",
        "response": response_text,
        "images": images,
        "request_id": request_id,
    },
    ensure_ascii=False,
)
await loop.sock_sendall(client_socket, response.encode("utf-8"))
```

And reuse the same helper wherever you currently replicate that logic in `CLIMessageEvent` or other code paths.

---

### 4. Factor `_handle_socket_client` into smaller steps

Without changing behavior, you can split the method into parsing / processing / serializing helpers to make it testable and shorter:

```python
def _parse_socket_request(self, raw: bytes) -> tuple[dict[str, Any], str, str]:
    import json
    request = json.loads(raw.decode("utf-8"))
    message_text = request.get("message", "")
    request_id = request.get("request_id", str(uuid.uuid4()))
    return request, message_text, request_id

def _serialize_error(self, request_id: str, msg: str) -> bytes:
    import json
    return json.dumps(
        {"status": "error", "error": msg, "request_id": request_id},
        ensure_ascii=False,
    ).encode("utf-8")

def _serialize_success(self, request_id: str, chain: MessageChain) -> bytes:
    import json
    response_text = chain.get_plain_text()
    images = normalize_images_from_chain(chain)
    return json.dumps(
        {
            "status": "success",
            "response": response_text,
            "images": images,
            "request_id": request_id,
        },
        ensure_ascii=False,
    ).encode("utf-8")
```

Use them in `_handle_socket_client`:

```python
async def _handle_socket_client(self, client_socket) -> None:
    import json
    logger.debug("[ENTRY] _handle_socket_client")
    loop = asyncio.get_event_loop()

    try:
        data = await loop.sock_recv(client_socket, 4096)
        if not data:
            logger.debug("[PROCESS] Empty request, closing connection")
            return

        try:
            _, message_text, request_id = self._parse_socket_request(data)
        except json.JSONDecodeError:
            await loop.sock_sendall(
                client_socket, self._serialize_error("", "Invalid JSON format")
            )
            return

        response_future: asyncio.Future[MessageChain] = asyncio.Future()
        message = self._convert_input(message_text, request_id=request_id)

        message_event = CLIMessageEvent(
            message_str=message.message_str,
            message_obj=message,
            platform_meta=self.meta(),
            session_id=message.session_id,
            output_queue=self._output_queue,
            response_future=response_future,
        )
        self.commit_event(message_event)

        try:
            chain = await asyncio.wait_for(response_future, timeout=30.0)
            await loop.sock_sendall(
                client_socket, self._serialize_success(request_id, chain)
            )
        except asyncio.TimeoutError:
            await loop.sock_sendall(
                client_socket, self._serialize_error(request_id, "Request timeout")
            )
    except Exception as e:
        logger.error("[ERROR] Socket client handler error: %s", e)
        import traceback
        logger.error(traceback.format_exc())
    finally:
        client_socket.close()
        logger.debug("[EXIT] _handle_socket_client return=None")
```

This keeps the same behavior but breaks the long method into smaller, independently testable pieces.

---

These changes preserve the existing feature set and flow, but:

- Isolate session policy from message conversion.
- Reduce branching complexity in mode selection.
- Remove duplication in image handling.
- Shorten and clarify the socket client handler into clear sub-steps.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

- Implement automatic token generation on first startup
- Add token validation in socket handler
- Add token loading and sending in CLI client
- Set token file permissions to 600
- Add security logging for rejected requests

Token file: /AstrBot/data/.cli_token
Algorithm: secrets.token_urlsafe(32)
- Replace hardcoded paths with dynamic path resolution
  - Use get_astrbot_data_path() for data directory
  - Use get_astrbot_temp_path() for temp directory
  - Support ASTRBOT_ROOT environment variable

- Fix asyncio deprecation warnings
  - Replace asyncio.get_event_loop() with get_running_loop()
  - Improve Python 3.10+ compatibility

- Add socket file permission control
  - Set socket permissions to 600 (owner-only)
  - Add security logging

- Update astrbot-cli client
  - Add dynamic path resolution functions
  - Match server-side path logic
  - Improve cross-environment compatibility

Addresses code review feedback from PR AstrBotDevs#4787
- Fix client to receive large responses by using loop recv instead of single recv(4096)
- Save base64 images to temp files instead of exposing in JSON response
- Implement adaptive delay mechanism for multi-round reply collection:
  * First send: 5s delay (fast response for simple text)
  * Subsequent sends: 10s delay (auto-switch for tool invocation)
- Support tool invocation scenarios with multiple replies (text + images)
- Add detailed logging for debugging multi-round reply collection
- Add repository check to dashboard_ci.yml
- Only create release in main repository (AstrBotDevs/AstrBot)
- Prevents token error in forked repositories
- Fix invalid parameter 'fetch-tag' in docker-image.yml
- Should be 'fetch-tags' (plural) according to actions/checkout@v6 docs
- Resolves 'Unexpected input' warning in GitHub Actions
@Clhikari
Copy link
Contributor

Clhikari commented Feb 2, 2026

建议加个Windows兼容性支持

- Refactor CLI platform to support both Unix Socket and TCP Socket
- Add platform detection module to auto-detect OS capabilities
- Add socket factory pattern for creating appropriate socket servers
- Split monolithic cli_adapter.py into modular components:
  * platform_detector.py: OS and socket capability detection
  * socket_abstract.py: Abstract base class for socket servers
  * socket_factory.py: Factory for creating socket servers
  * tcp_socket_server.py: TCP socket server implementation
  * unix_socket_server.py: Unix socket server implementation
  * connection_info_writer.py: Connection info file writer
- Update astrbot-cli client to support both socket types
- Fix logger imports to use AstrBot's custom logger system

This enables CLI platform to work on Windows (using TCP Socket)
while maintaining Unix Socket support on Linux/Mac.
@astrbot-doc-agent
Copy link

已为该 PR 生成文档更新 PR(待人工审核):
AstrBotDevs/AstrBot-docs#115


AI 改动摘要:

根据 PR 内容,我已更新了文档,增加了关于 CLI 测试器 (CLI Tester) 的说明。

修改内容:

  1. 新增文档页面
    • zh/dev/cli-tester.md:详细介绍了 CLI 测试器的核心价值、启用方法、使用方式(包括 astrbot-cli 工具)以及核心功能。
    • en/dev/cli-tester.md:上述文档的英文版本。
  2. 更新侧边栏配置
    • .vitepress/config.mjs 中,将 "CLI 测试器" (CLI Tester) 添加到 "开发" (Development) 栏目下,方便开发者快速找到。

文档要点:

  • 定位:专为插件开发者设计,用于构建快速反馈循环,支持 Vibe Coding 模式。
  • 功能:支持 Unix Socket 模式、会话隔离、富媒体(图片)Base64 编码、白名单豁免。
  • 用法:通过 astrbot-cli 工具在命令行直接与机器人交互,无需登录 IM 平台。

@YukiRa1n
Copy link
Contributor Author

YukiRa1n commented Feb 2, 2026

建议加个Windows兼容性支持

现已支持

python astrbot-cli "你好"

在astrbot根目录执行这个就可以了

图片 这是效果

@YukiRa1n
Copy link
Contributor Author

YukiRa1n commented Feb 2, 2026

比如说后续对bot本体进行开发,也可以随时部署,随时就测试,而无需繁琐的适配器配置

astrbot-doc-agent bot pushed a commit to AstrBotDevs/AstrBot-docs that referenced this pull request Feb 2, 2026
@astrbot-doc-agent
Copy link

已为该 PR 生成文档更新 PR(待人工审核):
AstrBotDevs/AstrBot-docs#115


AI 改动摘要:

根据上游 PR #4787 的改动,我已更新了文档以包含新引入的 CLI Tester (CLI测试器)

主要改动:

  1. 新增详细指南页
    • 创建了 zh/dev/star/guides/test-with-cli.mden/dev/star/guides/test-with-cli.md,详细介绍了 CLI Tester 的核心价值、启用步骤、基础用法(包括 Docker 命令示例)以及进阶技巧(如会话隔离)。
  2. 新增平台接入页
    • 创建了 zh/deploy/platform/cli-tester.mden/deploy/platform/cli-tester.md,作为平台接入列表中的入口点。
  3. 更新插件开发指南
    • zh/dev/star/plugin-new.mden/dev/star/plugin-new.md 的“调试插件”章节中添加了关于使用 CLI Tester 的说明和链接。
  4. 更新侧边栏导航
    • .vitepress/config.mjs 中,将 “CLI 测试器” 添加到了“接入到消息平台”和“插件开发”两个侧边栏分类中,方便用户快速找到。

文档摘要:

  • CLI Tester:专为开发者设计的本地测试工具,无需连接 QQ/微信等平台即可通过命令行调试插件。
  • 快速反馈:支持通过 astrbot-cli 工具发送消息并即时获取文本或富媒体(Base64 格式)响应。
  • 会话隔离:支持模拟独立会话,适合并发测试和上下文逻辑验证。
  • 白名单豁免:CLI 平台自动豁免白名单检查,简化测试流程。

@YukiRa1n YukiRa1n force-pushed the master branch 2 times, most recently from 2609c52 to 4cad0a4 Compare February 5, 2026 10:09
## 主要变更
- cli_adapter.py: 897行 → 204行 (77%精简)
- cli_event.py: 194行 → 116行
- 新增76个单元测试,全部通过

## 新增模块
- config/: ConfigLoader, TokenManager
- handlers/: SocketHandler, TTYHandler, FileHandler
- message/: MessageConverter, ImageProcessor, ResponseBuilder
- session/: SessionManager
- utils/: AOP装饰器集合 (异常处理/重试/超时/日志/权限)

## 设计原则
- 单一职责: 每个模块职责单一
- 依赖倒置: IHandler/IMessageConverter等接口
- AOP: 横切关注点从业务代码抽离
- 组合优于继承: 通过组合构建复杂功能

## 修复
- Windows UTF-8输出乱码问题
@LIghtJUNction
Copy link
Member

修改pyproject.toml
把命令行程序入口加在这后面:
[project.scripts]
astrbot = "astrbot.cli.main:cli"
其实现在已经有一个了
往后面新增
调用时前面就不需要加上python
直接astrbot-cli就好了
建议重命名为astr以免和现有的重名
功能划分:
astrbot:拉起daemon进程,以及一些基础功能

astr : 侧重于开发,以及更加复杂的功能,控制daemon进程等

我建议在后面加上
astr = "astrbot.core.platform.sources.cli.......:入口函数名称"

@LIghtJUNction
Copy link
Member

不要在根目录丢一个astrbot-cli
这样不会被打包工具打包
入口文件名建议使用前后双下划线main来命名
python执行包将以这个文件作为入口,参考astrbot/clli/main.py

新增 `astr` 命令作为 CLI Platform 的客户端工具,用于快速测试和调试。

功能:
- `astr "消息"` - 发送消息到 CLI Platform
- `astr --log` - 获取控制台日志
- `astr --log --lines N --level LEVEL --pattern PATTERN` - 日志过滤

变更:
- 新增 `astrbot/cli/client/` 包
  - `__init__.py` - 包入口
  - `__main__.py` - Socket 客户端实现(日志抑制、token 自动认证、日志获取)
- 修改 `pyproject.toml` - 添加 `astr` 命令入口点
- 修改 `socket_handler.py` - 添加 `_get_logs()` 方法处理日志请求
- 修改 `cli_adapter.py` - 传递 data_path 给 SocketClientHandler

安全:
- 所有请求(包括获取日志)都需要 token 认证
- Token 从 `data/.cli_token` 自动读取
已被 usage: astr [-h] [-s SOCKET] [-t TIMEOUT] [-j] [--log] [--lines LINES]
            [--level LEVEL] [--pattern PATTERN]
            [message]

AstrBot CLI Client - Send messages to AstrBot CLI Platform (Unix Socket or TCP Socket)

positional arguments:
  message               Message to send (if not provided, read from stdin)

options:
  -h, --help            show this help message and exit
  -s SOCKET, --socket SOCKET
                        Unix socket path (default: {temp_dir}/astrbot.sock)
  -t TIMEOUT, --timeout TIMEOUT
                        Timeout in seconds (default: 30.0)
  -j, --json            Output raw JSON response
  --log                 Get recent console logs (instead of sending a message)
  --lines LINES         Number of log lines to return (default: 100, max:
                        1000)
  --level LEVEL         Filter logs by level
                        (DEBUG/INFO/WARNING/ERROR/CRITICAL)
  --pattern PATTERN     Filter logs by pattern (substring match)

Examples:
  astr "你好"
  astr "/help"
  astr --socket /tmp/custom.sock "测试消息"
  echo "你好" | astr

Connection:
  Automatically detects connection type from .cli_connection file.
  Falls back to default Unix Socket if file not found.
         命令替代,功能已整合到  包中
…nes LINES]

            [--level LEVEL] [--pattern PATTERN]
            [message]

AstrBot CLI Client - 与 CLI Platform 通信的客户端工具

positional arguments:
  message               Message to send (if not provided, read from stdin)

options:
  -h, --help            show this help message and exit
  -s SOCKET, --socket SOCKET
                        Unix socket path (default: {temp_dir}/astrbot.sock)
  -t TIMEOUT, --timeout TIMEOUT
                        Timeout in seconds (default: 30.0)
  -j, --json            Output raw JSON response
  --log                 Get recent console logs (instead of sending a message)
  --lines LINES         Number of log lines to return (default: 100, max:
                        1000)
  --level LEVEL         Filter logs by level
                        (DEBUG/INFO/WARNING/ERROR/CRITICAL)
  --pattern PATTERN     Filter logs by pattern (substring match)

使用示例:

  发送消息:
    astr "你好"                    # 发送消息给 AstrBot
    astr "/help"                   # 查看内置帮助
    echo "你好" | astr             # 从标准输入读取

  获取日志:
    astr --log                     # 获取最近 100 行日志
    astr --log --lines 50          # 获取最近 50 行
    astr --log --level ERROR       # 只显示 ERROR 级别
    astr --log --pattern "CLI"     # 只显示包含 "CLI" 的日志
    astr --log --json              # 以 JSON 格式输出日志

  高级选项:
    astr -j "测试"                 # 输出原始 JSON 响应
    astr -t 60 "长时间任务"        # 设置超时时间为 60 秒

连接说明:
  - 自动从 data/.cli_connection 文件检测连接类型(Unix Socket 或 TCP)
  - Token 自动从 data/.cli_token 文件读取
  - 必须在 AstrBot 根目录下运行,或设置 ASTRBOT_ROOT 环境变量
         输出

- 添加中文说明
- 补充详细使用示例(发送消息、获取日志、高级选项)
- 添加连接说明
- / 不是 AstrBot 的命令前缀
- help 等内置命令不需要 / 前缀
- 添加内置命令使用示例说明
- 说明带 / 的消息会发给 LLM 处理
使用 argparse.REMAINDER 模式捕获所有剩余参数,
允许 /plugin ls 这类命令正常工作。
在接收 Socket 数据时,如果多字节字符被截断在缓冲区边界会导致解码失败。
使用 errors="replace" 参数来处理截断的 UTF-8 字符。
- 添加 format_response() 函数处理分段回复和图片占位符
- 图片显示为 [图片] 或 [N张图片] 占位符
- 更新 --help 说明,添加输出说明部分
- 修复 socket_handler.py 中 action 变量未定义的问题
- 修复 _get_logs() 方法返回格式问题
当日志文件不存在时,返回中文提示信息,
说明需要在配置中启用 log_file_enable。
- 添加 noqa 注释抑制必要的警告
- 格式化代码符合 ruff 规范
Git Bash (MSYS2) 会把 /plugin ls 转换为 C:/Program Files/Git/plugin ls
添加 fix_git_bash_path() 函数检测并还原原始命令
@YukiRa1n
Copy link
Contributor Author

YukiRa1n commented Feb 5, 2026

修改pyproject.toml 把命令行程序入口加在这后面: [project.scripts] astrbot = "astrbot.cli.main:cli" 其实现在已经有一个了 往后面新增 调用时前面就不需要加上python 直接astrbot-cli就好了 建议重命名为astr以免和现有的重名 功能划分: astrbot:拉起daemon进程,以及一些基础功能

astr : 侧重于开发,以及更加复杂的功能,控制daemon进程等

我建议在后面加上 astr = "astrbot.core.platform.sources.cli.......:入口函数名称"

谢谢建议,我已修改,可以看一下我新的pr描述。
然后入口文件名放在了AstrBot\astrbot\cli\client_main_.py
同时完善了cli指令

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants