校园网站建设标书,项目管理的软件有哪些,机械设备公司网站制作,廊坊教育云网站建设10.4.2 基于MCP的代理通信模块文件agent_mcp/tools/agent_communication_tools.py是本项目中基于MCP协议的代理通信工具模块#xff0c;主要功能是实现代理间的安全、规范通信。此文件提供了如下所示的三大核心功能#xff1a;代理间消息发送#xff1a;通过权限校验#…10.4.2 基于MCP的代理通信模块文件agent_mcp/tools/agent_communication_tools.py是本项目中基于MCP协议的代理通信工具模块主要功能是实现代理间的安全、规范通信。此文件提供了如下所示的三大核心功能代理间消息发送通过权限校验如管理员可与所有代理通信、普通代理仅能与活跃代理通信等确保通信合法性支持文本、协助请求等多种消息类型可通过tmux会话实时投递或存储待取并记录消息状态。消息检索代理可以获取发送或接收的消息支持按类型、已读状态等筛选且能自动标记已读消息。管理员广播允许管理员向所有活跃代理发送消息适用于系统通知等场景。同时所有通信操作均记录到数据库和审计日志确保可追溯为多代理协作提供了可靠的信息交互支撑。1下面代码的功能是检查两个代理是否允许通信确保代理间交互符合系统规则。原理是基于发送者身份是否为管理员、发送者与接收者是否为同一代理、接收者是否为管理员代理以及双方是否均处于活跃状态等条件判断通信权限并返回结果及原因。def _can_agents_communicate(sender_id: str, recipient_id: str, is_admin: bool) - tuple[bool, str]: 检查两个代理是否允许通信。 参数: sender_id: 发送代理的ID recipient_id: 接收代理的ID is_admin: 发送者是否具有管理员权限 返回: 包含是否允许: bool, 原因: str的元组 # 管理员可以与任何代理通信 if is_admin: return True, 管理员权限 # 不允许自我通信应使用内部方法 if sender_id recipient_id: return False, 不允许自我通信 # 管理员代理始终可以被联系 if recipient_id admin or recipient_id.lower().startswith(admin): return True, 管理员代理始终可被联系 # 检查接收者是否允许来自发送者的通信 # 此处可扩展为数据库中的权限系统 # 目前使用简单规则两个代理均活跃时可通信 if sender_id in g.active_agents and recipient_id in g.active_agents: return True, 两个代理均处于活跃状态 # 检查代理是否处于同一任务上下文中 # 这需要额外的任务关系检查 return False, 这些代理之间不允许通信2下面代码的功能是实现代理间消息发送支持权限校验和多种投递方式。原理是先验证发送者身份和消息合法性通过通信权限检查后生成唯一消息ID并存储到数据库再根据指定方式tmux实时投递、存储待取或两者结合尝试投递消息记录投递状态并返回结果。async def send_agent_message_tool_impl(arguments: Dict[str, Any]) - List[mcp_types.TextContent]: 带权限检查的代理间消息发送。 消息可通过tmux会话投递或存储供后续检索。 sender_token arguments.get(token) recipient_id arguments.get(recipient_id) message_content arguments.get(message) message_type arguments.get(message_type, text) # 文本、协助请求、任务更新 priority arguments.get(priority, normal) # 低、正常、高、紧急 deliver_method arguments.get(deliver_method, tmux) # tmux、存储、两者皆有 # 身份验证 sender_id get_agent_id(sender_token) if not sender_id: return [mcp_types.TextContent(typetext, text未授权需要有效令牌)] # 验证 if not recipient_id or not message_content: return [mcp_types.TextContent(typetext, text错误需要recipient_id和message)] if len(message_content) 4000: # 合理的消息大小限制 return [mcp_types.TextContent(typetext, text错误消息过长最大4000字符)] # 停止命令仅限管理员使用 is_admin verify_token(sender_token, admin) if message_type stop_command and not is_admin: return [mcp_types.TextContent(typetext, text错误仅管理员可发送停止命令)] # 权限检查 can_communicate, reason _can_agents_communicate(sender_id, recipient_id, is_admin) if not can_communicate: return [mcp_types.TextContent(typetext, textf通信被拒绝{reason})] # 创建消息数据 message_id _generate_message_id() timestamp datetime.datetime.now().isoformat() message_data { message_id: message_id, sender_id: sender_id, recipient_id: recipient_id, message_content: message_content, message_type: message_type, priority: priority, timestamp: timestamp, delivered: False, read: False } conn None try: conn get_db_connection() cursor conn.cursor() # 将消息存储到数据库 cursor.execute( INSERT INTO agent_messages (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) , (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, False, False)) # 根据方式尝试投递 delivery_status stored # 默认状态已存储3下面代码的功能是检索代理的消息支持多种筛选条件。原理是验证代理身份后根据是否包含发送/接收消息、消息类型、未读状态等筛选条件从数据库查询消息可自动标记接收的消息为已读最后格式化消息列表并返回。async def get_agent_messages_tool_impl(arguments: Dict[str, Any]) - List[mcp_types.TextContent]: 检索代理的消息。 agent_token arguments.get(token) include_sent arguments.get(include_sent, False) include_received arguments.get(include_received, True) mark_as_read arguments.get(mark_as_read, True) limit arguments.get(limit, 20) message_type_filter arguments.get(message_type) unread_only arguments.get(unread_only, False) # 身份验证 agent_id get_agent_id(agent_token) if not agent_id: return [mcp_types.TextContent(typetext, text未授权需要有效令牌)] # 验证limit参数 try: limit int(limit) if not (1 limit 100): limit 20 except (ValueError, TypeError): limit 20 conn None try: conn get_db_connection() cursor conn.cursor() # 构建查询 query_conditions [] query_params [] if include_received and include_sent: query_conditions.append((recipient_id ? OR sender_id ?)) query_params.extend([agent_id, agent_id]) elif include_received: query_conditions.append(recipient_id ?) query_params.append(agent_id) elif include_sent: query_conditions.append(sender_id ?) query_params.append(agent_id) else: return [mcp_types.TextContent(typetext, text错误必须包含发送或接收的消息)] if message_type_filter: query_conditions.append(message_type ?) query_params.append(message_type_filter) if unread_only: query_conditions.append(read ?) query_params.append(False) where_clause AND .join(query_conditions) query f SELECT message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read FROM agent_messages WHERE {where_clause} ORDER BY timestamp DESC LIMIT ? query_params.append(limit) cursor.execute(query, query_params) messages cursor.fetchall() # 如果需要将接收的消息标记为已读 if mark_as_read and include_received: message_ids_to_mark [msg[message_id] for msg in messages if msg[recipient_id] agent_id and not msg[read]] if message_ids_to_mark: placeholders ,.join(? * len(message_ids_to_mark)) cursor.execute(fUPDATE agent_messages SET read ? WHERE message_id IN ({placeholders}), [True] message_ids_to_mark) conn.commit()