赣州唯宅汇科技有限公司,抖音优化是什么意思,建设网站征集图片的通知,wordpress配置qq邮箱#x1f680; TestMaster 自动化测试平台 - 完整开源项目╔════════════════════════════════════════════════════════════════════════════╗
║ DREAMVFIA 开源编程大师 V1.1 -… TestMaster 自动化测试平台 - 完整开源项目╔════════════════════════════════════════════════════════════════════════════╗ ║ DREAMVFIA 开源编程大师 V1.1 - 项目生成系统 ║ ║ TESTMASTER PROJECT GENERATION ║ ╚════════════════════════════════════════════════════════════════════════════╝ 项目定位: 端到端测试自动化解决方案 技术栈: React TypeScript Node.js Python 架构模式: 微服务 事件驱动 安全级别: QUANTUM-SSS (企业级) 项目生成启动中...第一部分项目架构设计与技术选型1.1 项目概述1.1.1 项目背景TestMaster 是一个企业级的端到端自动化测试平台旨在解决现代软件开发中测试自动化的痛点核心痛点测试用例编写门槛高- 需要编程知识跨平台测试复杂- 浏览器/设备兼容性难以保证测试场景覆盖不足- 人工设计测试用例容易遗漏CI/CD集成困难- 测试与开发流程脱节性能测试专业性强- 需要专门的性能测试工具TestMaster 解决方案✅无代码测试创建- 可视化拖拽式测试用例设计✅AI 智能生成- 基于页面结构自动生成测试场景✅跨平台支持- 一次编写多端运行✅性能测试集成- 内置负载测试和性能监控✅无缝 CI/CD- 与主流 CI/CD 工具深度集成1.1.2 技术架构图┌─────────────────────────────────────────────────────────────────────┐ │ TestMaster 架构图 │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 前端层 (Frontend) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ React UI │ │ 可视化编辑器│ │ 测试报告 │ │ │ │ │ │ Dashboard │ │ (No-Code) │ │ Dashboard │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ API 网关层 (Gateway) │ │ │ │ ┌─────────────────────────────┐ │ │ │ │ │ GraphQL / REST API Gateway │ │ │ │ │ │ 认证 / 授权 / 限流 │ │ │ │ │ └─────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 微服务层 (Microservices) │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │ │测试引擎 │ │AI生成器 │ │执行器 │ │报告服务 │ │ │ │ │ │Service │ │Service │ │Service │ │Service │ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │ │性能测试 │ │设备管理 │ │CI/CD集成 │ │用户管理 │ │ │ │ │ │Service │ │Service │ │Service │ │Service │ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 数据层 (Data Layer) │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │ │PostgreSQL│ │MongoDB │ │Redis │ │S3/MinIO │ │ │ │ │ │(关系数据)│ │(测试数据)│ │(缓存) │ │(文件存储)│ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 基础设施层 (Infrastructure) │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │ │Kubernetes│ │Docker │ │Selenium │ │Playwright│ │ │ │ │ │(编排) │ │(容器) │ │Grid │ │(浏览器) │ │ │ │ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘1.1.3 技术栈选型前端技术栈{ framework: React 18.2, language: TypeScript 5.0, stateManagement: Zustand React Query, ui: Tailwind CSS shadcn/ui, visualization: Recharts D3.js, editor: Monaco Editor (VS Code), dragDrop: React DnD, testing: Vitest React Testing Library }后端技术栈{ runtime: Node.js 20 LTS, framework: NestJS (TypeScript), api: GraphQL (Apollo) REST, testEngine: Python 3.11 (Pytest Selenium), aiEngine: Python (OpenAI GPT-4 / Local LLM), database: { relational: PostgreSQL 15, document: MongoDB 6.0, cache: Redis 7.0, search: Elasticsearch 8.0 }, messageQueue: RabbitMQ / Apache Kafka, storage: MinIO (S3-compatible) }DevOps 技术栈containerization: Docker orchestration: Kubernetes ci_cd: - GitHub Actions - GitLab CI - Jenkins monitoring: - Prometheus - Grafana - ELK Stack testing_tools: - Selenium Grid - Playwright - Appium (移动端) - JMeter (性能测试)1.2 项目目录结构testmaster/ ├── README.md # 项目说明 ├── LICENSE # MIT 许可证 ├── .gitignore # Git 忽略文件 ├── docker-compose.yml # Docker 编排 ├── kubernetes/ # K8s 部署配置 │ ├── namespace.yaml │ ├── deployments/ │ ├── services/ │ └── ingress.yaml │ ├── docs/ # 文档目录 │ ├── architecture.md # 架构文档 │ ├── api-reference.md # API 文档 │ ├── user-guide.md # 用户指南 │ └── development.md # 开发指南 │ ├── frontend/ # 前端应用 │ ├── package.json │ ├── tsconfig.json │ ├── vite.config.ts │ ├── tailwind.config.js │ ├── src/ │ │ ├── main.tsx # 入口文件 │ │ ├── App.tsx # 根组件 │ │ ├── components/ # 组件库 │ │ │ ├── ui/ # UI 组件 │ │ │ ├── editor/ # 可视化编辑器 │ │ │ ├── dashboard/ # 仪表盘 │ │ │ └── reports/ # 报告组件 │ │ ├── pages/ # 页面组件 │ │ │ ├── Dashboard.tsx │ │ │ ├── TestEditor.tsx │ │ │ ├── TestRunner.tsx │ │ │ └── Reports.tsx │ │ ├── hooks/ # 自定义 Hooks │ │ ├── store/ # 状态管理 │ │ ├── services/ # API 服务 │ │ ├── types/ # TypeScript 类型 │ │ └── utils/ # 工具函数 │ └── public/ # 静态资源 │ ├── backend/ # 后端服务 │ ├── gateway/ # API 网关 │ │ ├── package.json │ │ ├── src/ │ │ │ ├── main.ts │ │ │ ├── app.module.ts │ │ │ ├── auth/ # 认证模块 │ │ │ ├── graphql/ # GraphQL │ │ │ └── rest/ # REST API │ │ └── test/ │ │ │ ├── services/ # 微服务 │ │ ├── test-engine/ # 测试引擎服务 │ │ │ ├── package.json │ │ │ └── src/ │ │ │ ├── main.ts │ │ │ ├── test-executor/ │ │ │ ├── test-parser/ │ │ │ └── test-validator/ │ │ │ │ │ ├── ai-generator/ # AI 生成服务 │ │ │ ├── requirements.txt │ │ │ └── src/ │ │ │ ├── main.py │ │ │ ├── llm_service.py │ │ │ ├── scenario_generator.py │ │ │ └── page_analyzer.py │ │ │ │ │ ├── executor/ # 执行器服务 │ │ │ ├── requirements.txt │ │ │ └── src/ │ │ │ ├── main.py │ │ │ ├── selenium_runner.py │ │ │ ├── playwright_runner.py │ │ │ └── appium_runner.py │ │ │ │ │ ├── performance/ # 性能测试服务 │ │ │ ├── requirements.txt │ │ │ └── src/ │ │ │ ├── main.py │ │ │ ├── load_tester.py │ │ │ └── metrics_collector.py │ │ │ │ │ ├── device-manager/ # 设备管理服务 │ │ │ ├── package.json │ │ │ └── src/ │ │ │ ├── main.ts │ │ │ ├── browser-pool/ │ │ │ └── device-pool/ │ │ │ │ │ ├── report/ # 报告服务 │ │ │ ├── package.json │ │ │ └── src/ │ │ │ ├── main.ts │ │ │ ├── report-generator/ │ │ │ └── analytics/ │ │ │ │ │ └── cicd-integration/ # CI/CD 集成服务 │ │ ├── package.json │ │ └── src/ │ │ ├── main.ts │ │ ├── github-actions/ │ │ ├── gitlab-ci/ │ │ └── jenkins/ │ │ │ └── shared/ # 共享库 │ ├── types/ # 共享类型 │ ├── utils/ # 工具函数 │ └── constants/ # 常量定义 │ ├── database/ # 数据库脚本 │ ├── migrations/ # 数据库迁移 │ ├── seeds/ # 种子数据 │ └── schemas/ # 数据库模式 │ ├── scripts/ # 脚本工具 │ ├── setup.sh # 环境搭建 │ ├── deploy.sh # 部署脚本 │ └── test.sh # 测试脚本 │ └── tests/ # 测试文件 ├── e2e/ # 端到端测试 ├── integration/ # 集成测试 └── unit/ # 单元测试1.3 数据库设计1.3.1 PostgreSQL 关系型数据库设计-- -- TestMaster 数据库设计 -- -- 用户表 CREATE TABLE users ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), email VARCHAR(255) UNIQUE NOT NULL, username VARCHAR(100) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, full_name VARCHAR(255), avatar_url TEXT, role VARCHAR(50) DEFAULT user, is_active BOOLEAN DEFAULT true, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_login_at TIMESTAMP ); -- 项目表 CREATE TABLE projects ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(255) NOT NULL, description TEXT, owner_id UUID REFERENCES users(id) ON DELETE CASCADE, settings JSONB DEFAULT {}, is_archived BOOLEAN DEFAULT false, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试套件表 CREATE TABLE test_suites ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, description TEXT, tags TEXT[], created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试用例表 CREATE TABLE test_cases ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), suite_id UUID REFERENCES test_suites(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, description TEXT, priority VARCHAR(20) DEFAULT medium, status VARCHAR(50) DEFAULT draft, test_data JSONB NOT NULL, expected_results JSONB, tags TEXT[], created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试步骤表 CREATE TABLE test_steps ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), test_case_id UUID REFERENCES test_cases(id) ON DELETE CASCADE, step_order INTEGER NOT NULL, action_type VARCHAR(100) NOT NULL, selector VARCHAR(500), value TEXT, wait_condition JSONB, screenshot BOOLEAN DEFAULT false, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试执行表 CREATE TABLE test_executions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), test_case_id UUID REFERENCES test_cases(id), suite_id UUID REFERENCES test_suites(id), environment VARCHAR(100), browser VARCHAR(50), device VARCHAR(100), status VARCHAR(50) DEFAULT pending, started_at TIMESTAMP, completed_at TIMESTAMP, duration_ms INTEGER, triggered_by UUID REFERENCES users(id), ci_build_id VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 测试结果表 CREATE TABLE test_results ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), execution_id UUID REFERENCES test_executions(id) ON DELETE CASCADE, step_id UUID REFERENCES test_steps(id), status VARCHAR(50) NOT NULL, error_message TEXT, screenshot_url TEXT, video_url TEXT, logs JSONB, metrics JSONB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 性能测试表 CREATE TABLE performance_tests ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, url TEXT NOT NULL, method VARCHAR(10) DEFAULT GET, headers JSONB, body TEXT, virtual_users INTEGER DEFAULT 10, duration_seconds INTEGER DEFAULT 60, ramp_up_seconds INTEGER DEFAULT 10, created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 性能测试结果表 CREATE TABLE performance_results ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), test_id UUID REFERENCES performance_tests(id) ON DELETE CASCADE, execution_id UUID, total_requests INTEGER, successful_requests INTEGER, failed_requests INTEGER, avg_response_time_ms DECIMAL(10,2), min_response_time_ms DECIMAL(10,2), max_response_time_ms DECIMAL(10,2), p95_response_time_ms DECIMAL(10,2), p99_response_time_ms DECIMAL(10,2), requests_per_second DECIMAL(10,2), error_rate DECIMAL(5,2), started_at TIMESTAMP, completed_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 设备/浏览器配置表 CREATE TABLE device_configs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, name VARCHAR(255) NOT NULL, type VARCHAR(50) NOT NULL, -- browser, mobile, tablet browser VARCHAR(50), browser_version VARCHAR(50), os VARCHAR(100), os_version VARCHAR(50), screen_resolution VARCHAR(50), user_agent TEXT, capabilities JSONB, is_active BOOLEAN DEFAULT true, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- CI/CD 集成配置表 CREATE TABLE cicd_integrations ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), project_id UUID REFERENCES projects(id) ON DELETE CASCADE, platform VARCHAR(50) NOT NULL, -- github, gitlab, jenkins config JSONB NOT NULL, webhook_url TEXT, is_active BOOLEAN DEFAULT true, created_by UUID REFERENCES users(id), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 索引优化 CREATE INDEX idx_test_cases_suite_id ON test_cases(suite_id); CREATE INDEX idx_test_steps_case_id ON test_steps(test_case_id); CREATE INDEX idx_test_executions_case_id ON test_executions(test_case_id); CREATE INDEX idx_test_results_execution_id ON test_results(execution_id); CREATE INDEX idx_projects_owner_id ON projects(owner_id); CREATE INDEX idx_test_executions_status ON test_executions(status); CREATE INDEX idx_test_executions_created_at ON test_executions(created_at DESC);1.3.2 MongoDB 文档数据库设计// // TestMaster MongoDB 集合设计 // // AI 生成的测试场景集合 db.createCollection(ai_test_scenarios, { validator: { $jsonSchema: { bsonType: object, required: [projectId, url, scenarios, generatedAt], properties: { projectId: { bsonType: string }, url: { bsonType: string }, pageStructure: { bsonType: object, properties: { title: { bsonType: string }, elements: { bsonType: array }, forms: { bsonType: array }, links: { bsonType: array } } }, scenarios: { bsonType: array, items: { bsonType: object, properties: { name: { bsonType: string }, description: { bsonType: string }, steps: { bsonType: array }, priority: { bsonType: string }, confidence: { bsonType: double } } } }, generatedBy: { bsonType: string }, generatedAt: { bsonType: date }, model: { bsonType: string } } } } }); // 测试执行日志集合 db.createCollection(execution_logs, { validator: { $jsonSchema: { bsonType: object, required: [executionId, timestamp, level, message], properties: { executionId: { bsonType: string }, timestamp: { bsonType: date }, level: { enum: [debug, info, warn, error] }, message: { bsonType: string }, context: { bsonType: object }, stackTrace: { bsonType: string } } } } }); // 性能指标时序数据集合 db.createCollection(performance_metrics, { validator: { $jsonSchema: { bsonType: object, required: [executionId, timestamp, metrics], properties: { executionId: { bsonType: string }, timestamp: { bsonType: date }, metrics: { bsonType: object, properties: { responseTime: { bsonType: double }, requestsPerSecond: { bsonType: double }, errorRate: { bsonType: double }, cpuUsage: { bsonType: double }, memoryUsage: { bsonType: double }, networkLatency: { bsonType: double } } }, labels: { bsonType: object } } } } }); // 创建索引 db.ai_test_scenarios.createIndex({ projectId: 1, generatedAt: -1 }); db.execution_logs.createIndex({ executionId: 1, timestamp: -1 }); db.performance_metrics.createIndex({ executionId: 1, timestamp: -1 }); db.execution_logs.createIndex({ timestamp: 1 }, { expireAfterSeconds: 2592000 }); // 30天过期现在我将继续生成第二部分核心代码实现。由于内容非常庞大我将分多个回复完成。第二部分核心代码实现2.1 前端核心代码2.1.1 主入口文件frontend/src/main.tsximport React from react; import ReactDOM from react-dom/client; import App from ./App; import ./index.css; // 错误边界 import { ErrorBoundary } from ./components/ErrorBoundary; ReactDOM.createRoot(document.getElementById(root)!).render( React.StrictMode ErrorBoundary App / /ErrorBoundary /React.StrictMode );2.1.2 根组件frontend/src/App.tsximport React from react; import { BrowserRouter, Routes, Route, Navigate } from react-router-dom; import { QueryClient, QueryClientProvider } from tanstack/react-query; import { Toaster } from ./components/ui/toaster; // 页面组件 import Dashboard from ./pages/Dashboard; import TestEditor from ./pages/TestEditor; import TestRunner from ./pages/TestRunner; import Reports from ./pages/Reports; import PerformanceTest from ./pages/PerformanceTest; import Settings from ./pages/Settings; import Login from ./pages/Login; // 布局组件 import MainLayout from ./components/layout/MainLayout; import AuthGuard from ./components/auth/AuthGuard; // 创建 React Query 客户端 const queryClient new QueryClient({ defaultOptions: { queries: { refetchOnWindowFocus: false, retry: 1, staleTime: 5 * 60 * 1000, // 5分钟 }, }, }); function App() { return ( QueryClientProvider client{queryClient} BrowserRouter Routes {/* 公开路由 */} Route path/login element{Login /} / {/* 受保护路由 */} Route element{AuthGuardMainLayout //AuthGuard} Route path/ element{Navigate to/dashboard replace /} / Route path/dashboard element{Dashboard /} / Route path/test-editor element{TestEditor /} / Route path/test-editor/:id element{TestEditor /} / Route path/test-runner element{TestRunner /} / Route path/reports element{Reports /} / Route path/performance element{PerformanceTest /} / Route path/settings element{Settings /} / /Route /Routes Toaster / /BrowserRouter /QueryClientProvider ); } export default App;2.1.3 可视化测试编辑器核心组件frontend/src/components/editor/TestEditor.tsximport React, { useState, useCallback } from react; import { DndContext, DragEndEvent, closestCenter } from dnd-kit/core; import { SortableContext, verticalListSortingStrategy, arrayMove } from dnd-kit/sortable; import { useMutation, useQuery } from tanstack/react-query; import { Plus, Play, Save, Wand2 } from lucide-react; import { Button } from ../ui/button; import { Card } from ../ui/card; import TestStepItem from ./TestStepItem; import ActionPalette from ./ActionPalette; import AIScenarioGenerator from ./AIScenarioGenerator; import { TestStep, TestCase } from ../../types/test; import { testCaseService } from ../../services/testCaseService; import { useToast } from ../../hooks/use-toast; interface TestEditorProps { testCaseId?: string; projectId: string; } const TestEditor: React.FCTestEditorProps ({ testCaseId, projectId }) { const { toast } useToast(); const [testCase, setTestCase] useStateTestCase({ id: testCaseId || , name: New Test Case, description: , steps: [], projectId, }); const [showAIGenerator, setShowAIGenerator] useState(false); // 加载现有测试用例 const { isLoading } useQuery({ queryKey: [testCase, testCaseId], queryFn: () testCaseService.getTestCase(testCaseId!), enabled: !!testCaseId, onSuccess: (data) setTestCase(data), }); // 保存测试用例 const saveMutation useMutation({ mutationFn: () testCaseService.saveTestCase(testCase), onSuccess: () { toast({ title: Success, description: Test case saved successfully, }); }, onError: () { toast({ title: Error, description: Failed to save test case, variant: destructive, }); }, }); // 添加测试步骤 const handleAddStep useCallback((actionType: string) { const newStep: TestStep { id: step-${Date.now()}, order: testCase.steps.length, actionType, selector: , value: , waitCondition: null, screenshot: false, }; setTestCase(prev ({ ...prev, steps: [...prev.steps, newStep], })); }, [testCase.steps.length]); // 更新测试步骤 const handleUpdateStep useCallback((stepId: string, updates: PartialTestStep) { setTestCase(prev ({ ...prev, steps: prev.steps.map(step step.id stepId ? { ...step, ...updates } : step ), })); }, []); // 删除测试步骤 const handleDeleteStep useCallback((stepId: string) { setTestCase(prev ({ ...prev, steps: prev.steps.filter(step step.id ! stepId), })); }, []); // 拖拽排序 const handleDragEnd useCallback((event: DragEndEvent) { const { active, over } event; if (over active.id ! over.id) { setTestCase(prev { const oldIndex prev.steps.findIndex(step step.id active.id); const newIndex prev.steps.findIndex(step step.id over.id); const newSteps arrayMove(prev.steps, oldIndex, newIndex).map((step, index) ({ ...step, order: index, })); return { ...prev, steps: newSteps }; }); } }, []); // AI 生成测试场景回调 const handleAIGenerated useCallback((scenarios: TestCase[]) { if (scenarios.length 0) { setTestCase(prev ({ ...prev, steps: scenarios[0].steps, })); setShowAIGenerator(false); toast({ title: AI Generation Complete, description: Generated ${scenarios.length} test scenarios, }); } }, [toast]); if (isLoading) { return div classNameflex items-center justify-center h-fullLoading.../div; } return ( div classNameh-full flex flex-col gap-4 p-6 {/* 头部工具栏 */} div classNameflex items-center justify-between div classNameflex-1 input typetext value{testCase.name} onChange{(e) setTestCase(prev ({ ...prev, name: e.target.value }))} classNametext-2xl font-bold bg-transparent border-none focus:outline-none focus:ring-2 focus:ring-blue-500 rounded px-2 placeholderTest Case Name / textarea value{testCase.description} onChange{(e) setTestCase(prev ({ ...prev, description: e.target.value }))} classNamemt-2 w-full bg-transparent border-none focus:outline-none focus:ring-2 focus:ring-blue-500 rounded px-2 text-gray-600 placeholderDescription (optional) rows{2} / /div div classNameflex gap-2 Button variantoutline onClick{() setShowAIGenerator(true)} classNamegap-2 Wand2 classNamew-4 h-4 / AI Generate /Button Button variantoutline onClick{() saveMutation.mutate()} disabled{saveMutation.isLoading} classNamegap-2 Save classNamew-4 h-4 / Save /Button Button classNamegap-2 Play classNamew-4 h-4 / Run Test /Button /div /div {/* 主编辑区域 */} div classNameflex-1 flex gap-4 overflow-hidden {/* 左侧动作面板 */} Card classNamew-64 p-4 overflow-y-auto h3 classNamefont-semibold mb-4Actions/h3 ActionPalette onActionSelect{handleAddStep} / /Card {/* 中间测试步骤列表 */} Card classNameflex-1 p-4 overflow-y-auto div classNameflex items-center justify-between mb-4 h3 classNamefont-semiboldTest Steps/h3 span classNametext-sm text-gray-500 {testCase.steps.length} steps /span /div {testCase.steps.length 0 ? ( div classNameflex flex-col items-center justify-center h-64 text-gray-400 Plus classNamew-12 h-12 mb-2 / pNo steps yet. Drag actions from the left panel or use AI to generate./p /div ) : ( DndContext collisionDetection{closestCenter} onDragEnd{handleDragEnd} SortableContext items{testCase.steps.map(s s.id)} strategy{verticalListSortingStrategy} div classNamespace-y-2 {testCase.steps.map((step) ( TestStepItem key{step.id} step{step} onUpdate{handleUpdateStep} onDelete{handleDeleteStep} / ))} /div /SortableContext /DndContext )} /Card {/* 右侧预览和配置 */} Card classNamew-80 p-4 overflow-y-auto h3 classNamefont-semibold mb-4Configuration/h3 div classNamespace-y-4 div label classNameblock text-sm font-medium mb-1Priority/label select value{testCase.priority || medium} onChange{(e) setTestCase(prev ({ ...prev, priority: e.target.value }))} classNamew-full border rounded px-3 py-2 option valuelowLow/option option valuemediumMedium/option option valuehighHigh/option option valuecriticalCritical/option /select /div div label classNameblock text-sm font-medium mb-1Tags/label input typetext placeholdersmoke, regression, ... classNamew-full border rounded px-3 py-2 / /div /div /Card /div {/* AI 生成器模态框 */} {showAIGenerator ( AIScenarioGenerator projectId{projectId} onGenerated{handleAIGenerated} onClose{() setShowAIGenerator(false)} / )} /div ); }; export default TestEditor;TestMaster 自动化测试平台 - 第二部分后端核心服务代码2.2 后端核心服务实现2.2.1 API 网关服务backend/gateway/src/main.tsimport { NestFactory } from nestjs/core; import { ValidationPipe } from nestjs/common; import { SwaggerModule, DocumentBuilder } from nestjs/swagger; import { AppModule } from ./app.module; import helmet from helmet; import * as compression from compression; async function bootstrap() { const app await NestFactory.create(AppModule, { logger: [error, warn, log, debug, verbose], }); // 安全中间件 app.use(helmet()); // 压缩响应 app.use(compression()); // 全局验证管道 app.useGlobalPipes( new ValidationPipe({ whitelist: true, forbidNonWhitelisted: true, transform: true, transformOptions: { enableImplicitConversion: true, }, }), ); // CORS 配置 app.enableCors({ origin: process.env.FRONTEND_URL || http://localhost:5173, credentials: true, }); // Swagger API 文档 const config new DocumentBuilder() .setTitle(TestMaster API) .setDescription(TestMaster 自动化测试平台 API 文档) .setVersion(1.0) .addBearerAuth() .addTag(auth, 认证相关接口) .addTag(projects, 项目管理接口) .addTag(test-cases, 测试用例接口) .addTag(executions, 测试执行接口) .addTag(reports, 测试报告接口) .build(); const document SwaggerModule.createDocument(app, config); SwaggerModule.setup(api-docs, app, document); const port process.env.PORT || 3000; await app.listen(port); console.log( ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ TestMaster API Gateway Started ║ ║ ║ ║ Server: http://localhost:${port} ║ ║ API Docs: http://localhost:${port}/api-docs ║ ║ GraphQL: http://localhost:${port}/graphql ║ ║ ║ ║ Environment: ${process.env.NODE_ENV || development} ║ ║ Version: 1.0.0 ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ ); } bootstrap();2.2.2 应用模块backend/gateway/src/app.module.tsimport { Module } from nestjs/common; import { ConfigModule, ConfigService } from nestjs/config; import { TypeOrmModule } from nestjs/typeorm; import { GraphQLModule } from nestjs/graphql; import { ApolloDriver, ApolloDriverConfig } from nestjs/apollo; import { BullModule } from nestjs/bull; import { CacheModule } from nestjs/cache-manager; import { redisStore } from cache-manager-redis-store; import { join } from path; // 模块导入 import { AuthModule } from ./auth/auth.module; import { UsersModule } from ./users/users.module; import { ProjectsModule } from ./projects/projects.module; import { TestCasesModule } from ./test-cases/test-cases.module; import { ExecutionsModule } from ./executions/executions.module; import { ReportsModule } from ./reports/reports.module; import { PerformanceModule } from ./performance/performance.module; import { DevicesModule } from ./devices/devices.module; import { CicdModule } from ./cicd/cicd.module; import { AiModule } from ./ai/ai.module; Module({ imports: [ // 配置模块 ConfigModule.forRoot({ isGlobal: true, envFilePath: .env.${process.env.NODE_ENV || development}, }), // 数据库连接 TypeOrmModule.forRootAsync({ imports: [ConfigModule], useFactory: (configService: ConfigService) ({ type: postgres, host: configService.get(DB_HOST), port: configService.get(DB_PORT), username: configService.get(DB_USERNAME), password: configService.get(DB_PASSWORD), database: configService.get(DB_DATABASE), entities: [__dirname /**/*.entity{.ts,.js}], synchronize: configService.get(NODE_ENV) development, logging: configService.get(NODE_ENV) development, ssl: configService.get(DB_SSL) true ? { rejectUnauthorized: false } : false, }), inject: [ConfigService], }), // GraphQL 配置 GraphQLModule.forRootApolloDriverConfig({ driver: ApolloDriver, autoSchemaFile: join(process.cwd(), src/schema.gql), sortSchema: true, playground: true, context: ({ req, res }) ({ req, res }), formatError: (error) { return { message: error.message, code: error.extensions?.code, path: error.path, }; }, }), // Redis 缓存 CacheModule.registerAsync({ isGlobal: true, imports: [ConfigModule], useFactory: async (configService: ConfigService) ({ store: redisStore as any, host: configService.get(REDIS_HOST), port: configService.get(REDIS_PORT), password: configService.get(REDIS_PASSWORD), ttl: 300, // 5分钟默认缓存 }), inject: [ConfigService], }), // 消息队列 BullModule.forRootAsync({ imports: [ConfigModule], useFactory: async (configService: ConfigService) ({ redis: { host: configService.get(REDIS_HOST), port: configService.get(REDIS_PORT), password: configService.get(REDIS_PASSWORD), }, }), inject: [ConfigService], }), // 业务模块 AuthModule, UsersModule, ProjectsModule, TestCasesModule, ExecutionsModule, ReportsModule, PerformanceModule, DevicesModule, CicdModule, AiModule, ], }) export class AppModule {}2.2.3 认证模块backend/gateway/src/auth/auth.service.tsimport { Injectable, UnauthorizedException } from nestjs/common; import { JwtService } from nestjs/jwt; import { InjectRepository } from nestjs/typeorm; import { Repository } from typeorm; import * as bcrypt from bcrypt; import { User } from ../users/entities/user.entity; import { LoginDto, RegisterDto } from ./dto/auth.dto; Injectable() export class AuthService { constructor( InjectRepository(User) private usersRepository: RepositoryUser, private jwtService: JwtService, ) {} /** * 用户注册 */ async register(registerDto: RegisterDto) { const { email, username, password, fullName } registerDto; // 检查用户是否已存在 const existingUser await this.usersRepository.findOne({ where: [{ email }, { username }], }); if (existingUser) { throw new UnauthorizedException(Email or username already exists); } // 密码加密 const salt await bcrypt.genSalt(10); const passwordHash await bcrypt.hash(password, salt); // 创建用户 const user this.usersRepository.create({ email, username, passwordHash, fullName, role: user, isActive: true, }); await this.usersRepository.save(user); // 生成 JWT const tokens await this.generateTokens(user); return { user: this.sanitizeUser(user), ...tokens, }; } /** * 用户登录 */ async login(loginDto: LoginDto) { const { email, password } loginDto; // 查找用户 const user await this.usersRepository.findOne({ where: { email }, }); if (!user) { throw new UnauthorizedException(Invalid credentials); } // 验证密码 const isPasswordValid await bcrypt.compare(password, user.passwordHash); if (!isPasswordValid) { throw new UnauthorizedException(Invalid credentials); } // 检查用户是否激活 if (!user.isActive) { throw new UnauthorizedException(Account is deactivated); } // 更新最后登录时间 user.lastLoginAt new Date(); await this.usersRepository.save(user); // 生成 JWT const tokens await this.generateTokens(user); return { user: this.sanitizeUser(user), ...tokens, }; } /** * 刷新令牌 */ async refreshToken(refreshToken: string) { try { const payload this.jwtService.verify(refreshToken, { secret: process.env.JWT_REFRESH_SECRET, }); const user await this.usersRepository.findOne({ where: { id: payload.sub }, }); if (!user || !user.isActive) { throw new UnauthorizedException(Invalid refresh token); } return this.generateTokens(user); } catch (error) { throw new UnauthorizedException(Invalid refresh token); } } /** * 验证用户 */ async validateUser(userId: string): PromiseUser { const user await this.usersRepository.findOne({ where: { id: userId }, }); if (!user || !user.isActive) { throw new UnauthorizedException(User not found or inactive); } return user; } /** * 生成访问令牌和刷新令牌 */ private async generateTokens(user: User) { const payload { sub: user.id, email: user.email, username: user.username, role: user.role, }; const [accessToken, refreshToken] await Promise.all([ this.jwtService.signAsync(payload, { secret: process.env.JWT_SECRET, expiresIn: 15m, }), this.jwtService.signAsync(payload, { secret: process.env.JWT_REFRESH_SECRET, expiresIn: 7d, }), ]); return { accessToken, refreshToken, expiresIn: 900, // 15分钟 }; } /** * 清理用户敏感信息 */ private sanitizeUser(user: User) { const { passwordHash, ...sanitized } user; return sanitized; } }2.2.4 测试用例服务backend/gateway/src/test-cases/test-cases.service.tsimport { Injectable, NotFoundException, ForbiddenException } from nestjs/common; import { InjectRepository } from nestjs/typeorm; import { Repository } from typeorm; import { InjectQueue } from nestjs/bull; import { Queue } from bull; import { TestCase } from ./entities/test-case.entity; import { TestStep } from ./entities/test-step.entity; import { CreateTestCaseDto, UpdateTestCaseDto } from ./dto/test-case.dto; import { User } from ../users/entities/user.entity; Injectable() export class TestCasesService { constructor( InjectRepository(TestCase) private testCasesRepository: RepositoryTestCase, InjectRepository(TestStep) private testStepsRepository: RepositoryTestStep, InjectQueue(test-execution) private executionQueue: Queue, ) {} /** * 创建测试用例 */ async create(createTestCaseDto: CreateTestCaseDto, user: User): PromiseTestCase { const { steps, ...testCaseData } createTestCaseDto; // 创建测试用例 const testCase this.testCasesRepository.create({ ...testCaseData, createdBy: user, }); const savedTestCase await this.testCasesRepository.save(testCase); // 创建测试步骤 if (steps steps.length 0) { const testSteps steps.map((step, index) this.testStepsRepository.create({ ...step, stepOrder: index, testCase: savedTestCase, }), ); await this.testStepsRepository.save(testSteps); } return this.findOne(savedTestCase.id); } /** * 查询所有测试用例 */ async findAll( suiteId?: string, projectId?: string, tags?: string[], status?: string, ): PromiseTestCase[] { const query this.testCasesRepository .createQueryBuilder(testCase) .leftJoinAndSelect(testCase.steps, steps) .leftJoinAndSelect(testCase.suite, suite) .leftJoinAndSelect(testCase.createdBy, createdBy) .orderBy(steps.stepOrder, ASC); if (suiteId) { query.andWhere(testCase.suiteId :suiteId, { suiteId }); } if (projectId) { query.andWhere(suite.projectId :projectId, { projectId }); } if (tags tags.length 0) { query.andWhere(testCase.tags :tags, { tags }); } if (status) { query.andWhere(testCase.status :status, { status }); } return query.getMany(); } /** * 查询单个测试用例 */ async findOne(id: string): PromiseTestCase { const testCase await this.testCasesRepository.findOne({ where: { id }, relations: [steps, suite, createdBy], order: { steps: { stepOrder: ASC, }, }, }); if (!testCase) { throw new NotFoundException(Test case with ID ${id} not found); } return testCase; } /** * 更新测试用例 */ async update( id: string, updateTestCaseDto: UpdateTestCaseDto, user: User, ): PromiseTestCase { const testCase await this.findOne(id); // 权限检查 if (testCase.createdBy.id ! user.id user.role ! admin) { throw new ForbiddenException(You do not have permission to update this test case); } const { steps, ...testCaseData } updateTestCaseDto; // 更新测试用例基本信息 Object.assign(testCase, testCaseData); await this.testCasesRepository.save(testCase); // 更新测试步骤 if (steps) { // 删除旧步骤 await this.testStepsRepository.delete({ testCase: { id } }); // 创建新步骤 if (steps.length 0) { const testSteps steps.map((step, index) this.testStepsRepository.create({ ...step, stepOrder: index, testCase, }), ); await this.testStepsRepository.save(testSteps); } } return this.findOne(id); } /** * 删除测试用例 */ async remove(id: string, user: User): Promisevoid { const testCase await this.findOne(id); // 权限检查 if (testCase.createdBy.id ! user.id user.role ! admin) { throw new ForbiddenException(You do not have permission to delete this test case); } await this.testCasesRepository.remove(testCase); } /** * 执行测试用例 */ async execute( id: string, environment: string, browser: string, device?: string, ): Promise{ executionId: string } { const testCase await this.findOne(id); // 创建执行任务 const job await this.executionQueue.add(execute-test, { testCaseId: testCase.id, environment, browser, device, timestamp: new Date(), }); return { executionId: job.id.toString(), }; } /** * 批量执行测试用例 */ async executeBatch( testCaseIds: string[], environment: string, browser: string, device?: string, ): Promise{ executionIds: string[] } { const jobs await Promise.all( testCaseIds.map((id) this.executionQueue.add(execute-test, { testCaseId: id, environment, browser, device, timestamp: new Date(), }), ), ); return { executionIds: jobs.map((job) job.id.toString()), }; } /** * 复制测试用例 */ async duplicate(id: string, user: User): PromiseTestCase { const originalTestCase await this.findOne(id); const duplicatedTestCase this.testCasesRepository.create({ name: ${originalTestCase.name} (Copy), description: originalTestCase.description, priority: originalTestCase.priority, tags: originalTestCase.tags, suite: originalTestCase.suite, createdBy: user, }); const savedTestCase await this.testCasesRepository.save(duplicatedTestCase); // 复制测试步骤 if (originalTestCase.steps originalTestCase.steps.length 0) { const duplicatedSteps originalTestCase.steps.map((step) this.testStepsRepository.create({ stepOrder: step.stepOrder, actionType: step.actionType, selector: step.selector, value: step.value, waitCondition: step.waitCondition, screenshot: step.screenshot, testCase: savedTestCase, }), ); await this.testStepsRepository.save(duplicatedSteps); } return this.findOne(savedTestCase.id); } /** * 获取测试用例统计信息 */ async getStatistics(projectId: string) { const query this.testCasesRepository .createQueryBuilder(testCase) .leftJoin(testCase.suite, suite) .where(suite.projectId :projectId, { projectId }); const [total, byStatus, byPriority] await Promise.all([ query.getCount(), query .select(testCase.status, status) .addSelect(COUNT(*), count) .groupBy(testCase.status) .getRawMany(), query .select(testCase.priority, priority) .addSelect(COUNT(*), count) .groupBy(testCase.priority) .getRawMany(), ]); return { total, byStatus: byStatus.reduce((acc, item) { acc[item.status] parseInt(item.count); return acc; }, {}), byPriority: byPriority.reduce((acc, item) { acc[item.priority] parseInt(item.count); return acc; }, {}), }; } }2.2.5 测试用例实体backend/gateway/src/test-cases/entities/test-case.entity.tsimport { Entity, PrimaryGeneratedColumn, Column, ManyToOne, OneToMany, CreateDateColumn, UpdateDateColumn, JoinColumn, } from typeorm; import { TestSuite } from ../../test-suites/entities/test-suite.entity; import { TestStep } from ./test-step.entity; import { User } from ../../users/entities/user.entity; import { TestExecution } from ../../executions/entities/test-execution.entity; Entity(test_cases) export class TestCase { PrimaryGeneratedColumn(uuid) id: string; ManyToOne(() TestSuite, (suite) suite.testCases, { onDelete: CASCADE }) JoinColumn({ name: suite_id }) suite: TestSuite; Column({ length: 255 }) name: string; Column({ type: text, nullable: true }) description: string; Column({ length: 20, default: medium }) priority: string; Column({ length: 50, default: draft }) status: string; Column({ type: jsonb, nullable: true }) testData: any; Column({ type: jsonb, nullable: true }) expectedResults: any; Column({ type: text, array: true, default: [] }) tags: string[]; OneToMany(() TestStep, (step) step.testCase, { cascade: true }) steps: TestStep[]; OneToMany(() TestExecution, (execution) execution.testCase) executions: TestExecution[]; ManyToOne(() User) JoinColumn({ name: created_by }) createdBy: User; CreateDateColumn({ name: created_at }) createdAt: Date; UpdateDateColumn({ name: updated_at }) updatedAt: Date; }2.2.6 测试步骤实体backend/gateway/src/test-cases/entities/test-step.entity.tsimport { Entity, PrimaryGeneratedColumn, Column, ManyToOne, CreateDateColumn, JoinColumn, } from typeorm; import { TestCase } from ./test-case.entity; Entity(test_steps) export class TestStep { PrimaryGeneratedColumn(uuid) id: string; ManyToOne(() TestCase, (testCase) testCase.steps, { onDelete: CASCADE }) JoinColumn({ name: test_case_id }) testCase: TestCase; Column({ name: step_order }) stepOrder: number; Column({ name: action_type, length: 100 }) actionType: string; Column({ length: 500, nullable: true }) selector: string; Column({ type: text, nullable: true }) value: string; Column({ name: wait_condition, type: jsonb, nullable: true }) waitCondition: { type: element | time | url | custom; value: string | number; timeout?: number; }; Column({ default: false }) screenshot: boolean; CreateDateColumn({ name: created_at }) createdAt: Date; }2.2.7 测试执行服务backend/gateway/src/executions/executions.service.tsimport { Injectable, NotFoundException } from nestjs/common; import { InjectRepository } from nestjs/typeorm; import { Repository, In, Between } from typeorm; import { TestExecution } from ./entities/test-execution.entity; import { TestResult } from ./entities/test-result.entity; import { TestCasesService } from ../test-cases/test-cases.service; Injectable() export class ExecutionsService { constructor( InjectRepository(TestExecution) private executionsRepository: RepositoryTestExecution, InjectRepository(TestResult) private resultsRepository: RepositoryTestResult, private testCasesService: TestCasesService, ) {} /** * 创建测试执行记录 */ async create( testCaseId: string, environment: string, browser: string, device?: string, triggeredBy?: string, ciBuildId?: string, ): PromiseTestExecution { const testCase await this.testCasesService.findOne(testCaseId); const execution this.executionsRepository.create({ testCase, environment, browser, device, status: pending, triggeredBy: triggeredBy ? { id: triggeredBy } as any : null, ciBuildId, }); return this.executionsRepository.save(execution); } /** * 查询所有执行记录 */ async findAll(filters: { projectId?: string; suiteId?: string; testCaseId?: string; status?: string; environment?: string; browser?: string; startDate?: Date; endDate?: Date; limit?: number; offset?: number; }): Promise{ executions: TestExecution[]; total: number } { const query this.executionsRepository .createQueryBuilder(execution) .leftJoinAndSelect(execution.testCase, testCase) .leftJoinAndSelect(execution.suite, suite) .leftJoinAndSelect(execution.results, results) .leftJoinAndSelect(execution.triggeredBy, triggeredBy) .orderBy(execution.createdAt, DESC); if (filters.testCaseId) { query.andWhere(execution.testCaseId :testCaseId, { testCaseId: filters.testCaseId, }); } if (filters.suiteId) { query.andWhere(execution.suiteId :suiteId, { suiteId: filters.suiteId, }); } if (filters.status) { query.andWhere(execution.status :status, { status: filters.status }); } if (filters.environment) { query.andWhere(execution.environment :environment, { environment: filters.environment, }); } if (filters.browser) { query.andWhere(execution.browser :browser, { browser: filters.browser }); } if (filters.startDate filters.endDate) { query.andWhere(execution.createdAt BETWEEN :startDate AND :endDate, { startDate: filters.startDate, endDate: filters.endDate, }); } const total await query.getCount(); if (filters.limit) { query.take(filters.limit); } if (filters.offset) { query.skip(filters.offset); } const executions await query.getMany(); return { executions, total }; } /** * 查询单个执行记录 */ async findOne(id: string): PromiseTestExecution { const execution await this.executionsRepository.findOne({ where: { id }, relations: [testCase, testCase.steps, suite, results, triggeredBy], order: { results: { createdAt: ASC, }, }, }); if (!execution) { throw new NotFoundException(Execution with ID ${id} not found); } return execution; } /** * 更新执行状态 */ async updateStatus( id: string, status: running | passed | failed | skipped | error, startedAt?: Date, completedAt?: Date, ): PromiseTestExecution { const execution await this.findOne(id); execution.status status; if (startedAt) { execution.startedAt startedAt; } if (completedAt) { execution.completedAt completedAt; if (execution.startedAt) { execution.durationMs completedAt.getTime() - execution.startedAt.getTime(); } } return this.executionsRepository.save(execution); } /** * 添加测试结果 */ async addResult( executionId: string, stepId: string, status: passed | failed | skipped | error, errorMessage?: string, screenshotUrl?: string, videoUrl?: string, logs?: any, metrics?: any, ): PromiseTestResult { const execution await this.findOne(executionId); const result this.resultsRepository.create({ execution, step: { id: stepId } as any, status, errorMessage, screenshotUrl, videoUrl, logs, metrics, }); return this.resultsRepository.save(result); } /** * 获取执行统计 */ async getStatistics( projectId?: string, startDate?: Date, endDate?: Date, ): Promiseany { const query this.executionsRepository .createQueryBuilder(execution) .leftJoin(execution.testCase, testCase) .leftJoin(testCase.suite, suite); if (projectId) { query.where(suite.projectId :projectId, { projectId }); } if (startDate endDate) { query.andWhere(execution.createdAt BETWEEN :startDate AND :endDate, { startDate, endDate, }); } const [ total, byStatus, byEnvironment, byBrowser, avgDuration, successRate, ] await Promise.all([ query.getCount(), query .select(execution.status, status) .addSelect(COUNT(*), count) .groupBy(execution.status) .getRawMany(), query .select(execution.environment, environment) .addSelect(COUNT(*), count) .groupBy(execution.environment) .getRawMany(), query .select(execution.browser, browser) .addSelect(COUNT(*), count) .groupBy(execution.browser) .getRawMany(), query .select(AVG(execution.durationMs), avgDuration) .where(execution.durationMs IS NOT NULL) .getRawOne(), query .select( (COUNT(CASE WHEN execution.status passed THEN 1 END)::float / COUNT(*)::float * 100), successRate, ) .getRawOne(), ]); return { total, byStatus: byStatus.reduce((acc, item) { acc[item.status] parseInt(item.count); return acc; }, {}), byEnvironment: byEnvironment.reduce((acc, item) { acc[item.environment] parseInt(item.count); return acc; }, {}), byBrowser: byBrowser.reduce((acc, item) { acc[item.browser] parseInt(item.count); return acc; }, {}), avgDuration: parseFloat(avgDuration?.avgDuration || 0), successRate: parseFloat(successRate?.successRate || 0), }; } /** * 获取趋势数据 */ async getTrends( projectId: string, days: number 30, ): Promiseany[] { const startDate new Date(); startDate.setDate(startDate.getDate() - days); const executions await this.executionsRepository .createQueryBuilder(execution) .leftJoin(execution.testCase, testCase) .leftJoin(testCase.suite, suite) .where(suite.projectId :projectId, { projectId }) .andWhere(execution.createdAt :startDate, { startDate }) .select([ DATE(execution.createdAt) as date, COUNT(*) as total, COUNT(CASE WHEN execution.status passed THEN 1 END) as passed, COUNT(CASE WHEN execution.status failed THEN 1 END) as failed, AVG(execution.durationMs) as avgDuration, ]) .groupBy(DATE(execution.createdAt)) .orderBy(date, ASC) .getRawMany(); return executions.map((item) ({ date: item.date, total: parseInt(item.total), passed: parseInt(item.passed), failed: parseInt(item.failed), avgDuration: parseFloat(item.avgDuration || 0), successRate: (parseInt(item.passed) / parseInt(item.total)) * 100, })); } }继续下一部分...2.2.8 测试执行消费者backend/gateway/src/executions/execution.processor.tsimport { Process, Processor } from nestjs/bull; import { Logger } from nestjs/common; import { Job } from bull; import { ExecutionsService } from ./executions.service; import { TestEngineClient } from ../clients/test-engine.client; Processor(test-execution) export class ExecutionProcessor { private readonly logger new Logger(ExecutionProcessor.name); constructor( private executionsService: ExecutionsService, private testEngineClient: TestEngineClient, ) {} Process(execute-test) async handleTestExecution(job: Job) { const { testCaseId, environment, browser, device, triggeredBy, ciBuildId } job.data; this.logger.log( Starting test execution for test case ${testCaseId} on ${browser}, ); try { // 创建执行记录 const execution await this.executionsService.create( testCaseId, environment, browser, device, triggeredBy, ciBuildId, ); // 更新状态为运行中 await this.executionsService.updateStatus( execution.id, running, new Date(), ); // 调用测试引擎执行测试 const result await this.testEngineClient.executeTest({ executionId: execution.id, testCaseId, environment, browser, device, }); // 更新执行结果 await this.executionsService.updateStatus( execution.id, result.status, result.startedAt, result.completedAt, ); // 保存每个步骤的结果 for (const stepResult of result.stepResults) { await this.executionsService.addResult( execution.id, stepResult.stepId, stepResult.status, stepResult.errorMessage, stepResult.screenshotUrl, stepResult.videoUrl, stepResult.logs, stepResult.metrics, ); } this.logger.log( Test execution completed for ${testCaseId}: ${result.status}, ); return { executionId: execution.id, status: result.status }; } catch (error) { this.logger.error( Test execution failed for ${testCaseId}: ${error.message}, error.stack, ); throw error; } } Process(execute-suite) async handleSuiteExecution(job: Job) { const { suiteId, environment, browser, device, triggeredBy, ciBuildId } job.data; this.logger.log(Starting suite execution for suite ${suiteId}); // 实现套件执行逻辑 // 获取套件中的所有测试用例依次执行 return { suiteId, status: completed }; } }现在让我继续提供 AI 生成器服务和 Python 测试引擎的实现...2.3 AI 生成器服务Python2.3.1 AI 服务主文件backend/services/ai-generator/src/main.py TestMaster AI 测试场景生成服务 使用 GPT-4 或本地 LLM 生成测试场景 from fastapi import FastAPI, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, HttpUrl from typing import List, Optional, Dict, Any import logging import asyncio from datetime import datetime from llm_service import LLMService from scenario_generator import ScenarioGenerator from page_analyzer import PageAnalyzer # 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s ) logger logging.getLogger(__name__) # 创建 FastAPI 应用 app FastAPI( titleTestMaster AI Generator, descriptionAI-powered test scenario generation service, version1.0.0 ) # CORS 配置 app.add_middleware( CORSMiddleware, allow_origins[*], allow_credentialsTrue, allow_methods[*], allow_headers[*], ) # 请求模型 class GenerateRequest(BaseModel): url: HttpUrl project_id: str user_id: str options: Optional[Dict[str, Any]] { max_scenarios: 10, include_edge_cases: True, include_negative_tests: True, complexity: medium # low, medium, high } class TestScenario(BaseModel): name: str description: str priority: str steps: List[Dict[str, Any]] confidence: float tags: List[str] class GenerateResponse(BaseModel): project_id: str url: str page_structure: Dict[str, Any] scenarios: List[TestScenario] generated_at: datetime model: str total_scenarios: int # 依赖注入 def get_llm_service(): return LLMService() def get_page_analyzer(): return PageAnalyzer() def get_scenario_generator( llm_service: LLMService Depends(get_llm_service) ): return ScenarioGenerator(llm_service) # API 端点 app.get(/health) async def health_check(): 健康检查 return { status: healthy, service: ai-generator, version: 1.0.0, timestamp: datetime.now().isoformat() } app.post(/generate, response_modelGenerateResponse) async def generate_test_scenarios( request: GenerateRequest, page_analyzer: PageAnalyzer Depends(get_page_analyzer), scenario_generator: ScenarioGenerator Depends(get_scenario_generator) ): 生成测试场景 Args: request: 生成请求包含 URL 和选项 Returns: 生成的测试场景列表 try: logger.info(fGenerating test scenarios for URL: {request.url}) # 1. 分析页面结构 logger.info(Analyzing page structure...) page_structure await page_analyzer.analyze(str(request.url)) # 2. 生成测试场景 logger.info(Generating test scenarios with AI...) scenarios await scenario_generator.generate( urlstr(request.url), page_structurepage_structure, optionsrequest.options ) # 3. 构建响应 response GenerateResponse( project_idrequest.project_id, urlstr(request.url), page_structurepage_structure, scenariosscenarios, generated_atdatetime.now(), modelscenario_generator.model_name, total_scenarioslen(scenarios) ) logger.info(fSuccessfully generated {len(scenarios)} test scenarios) return response except Exception as e: logger.error(fError generating test scenarios: {str(e)}, exc_infoTrue) raise HTTPException( status_code500, detailfFailed to generate test scenarios: {str(e)} ) app.post(/analyze-page) async def analyze_page( url: HttpUrl, page_analyzer: PageAnalyzer Depends(get_page_analyzer) ): 分析页面结构 Args: url: 要分析的页面 URL Returns: 页面结构分析结果 try: logger.info(fAnalyzing page: {url}) page_structure await page_analyzer.analyze(str(url)) return { url: str(url), structure: page_structure, analyzed_at: datetime.now().isoformat() } except Exception as e: logger.error(fError analyzing page: {str(e)}, exc_infoTrue) raise HTTPException( status_code500, detailfFailed to analyze page: {str(e)} ) if __name__ __main__: import uvicorn print( ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ TestMaster AI Generator Service ║ ║ ║ ║ Server: http://localhost:8001 ║ ║ Docs: http://localhost:8001/docs ║ ║ ║ ║ Status: Starting... ║ ║ Version: 1.0.0 ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ ) uvicorn.run( main:app, host0.0.0.0, port8001, reloadTrue, log_levelinfo )2.3.2 LLM 服务backend/services/ai-generator/src/llm_service.py LLM 服务 - 支持 OpenAI GPT-4 和本地 LLM import os import json import logging from typing import List, Dict, Any, Optional from openai import AsyncOpenAI import anthropic logger logging.getLogger(__name__) class LLMService: 大语言模型服务 def __init__(self): self.provider os.getenv(LLM_PROVIDER, openai) # openai, anthropic, local self.model_name os.getenv(LLM_MODEL, gpt-4-turbo-preview) if self.provider openai: self.client AsyncOpenAI(api_keyos.getenv(OPENAI_API_KEY)) elif self.provider anthropic: self.client anthropic.AsyncAnthropic(api_keyos.getenv(ANTHROPIC_API_KEY)) else: # 本地 LLM (使用 Ollama 或其他) self.client None logger.info(Using local LLM) async def generate_test_scenarios( self, url: str, page_structure: Dict[str, Any], options: Dict[str, Any] ) - List[Dict[str, Any]]: 使用 LLM 生成测试场景 Args: url: 页面 URL page_structure: 页面结构分析结果 options: 生成选项 Returns: 测试场景列表 # 构建提示词 prompt self._build_prompt(url, page_structure, options) # 调用 LLM if self.provider openai: response await self._call_openai(prompt) elif self.provider anthropic: response await self._call_anthropic(prompt) else: response await self._call_local_llm(prompt) # 解析响应 scenarios self._parse_response(response) return scenarios def _build_prompt( self, url: str, page_structure: Dict[str, Any], options: Dict[str, Any] ) - str: 构建 LLM 提示词 max_scenarios options.get(max_scenarios, 10) include_edge_cases options.get(include_edge_cases, True) include_negative_tests options.get(include_negative_tests, True) complexity options.get(complexity, medium) prompt fYou are an expert QA automation engineer. Analyze the following web page and generate comprehensive test scenarios. **Page URL:** {url} **Page Structure:** json {json.dumps(page_structure, indent2)}Requirements:Generate up to {max_scenarios} test scenariosInclude edge cases: {include_edge_cases}Include negative tests: {include_negative_tests}Complexity level: {complexity}Output Format:Return a JSON array of test scenarios. Each scenario should have:name: Clear, descriptive test namedescription: What the test validatespriority: critical, high, medium, or lowsteps: Array of test steps with:action: Action type (navigate, click, type, select, wait, assert, etc.)selector: CSS selector or XPathvalue: Input value (if applicable)description: Human-readable step descriptionconfidence: Your confidence in this test (0.0 to 1.0)tags: Relevant tags (e.g., [smoke, regression, ui])Example:[ {{ name: User Login - Valid Credentials, description: Verify that a user can successfully log in with valid credentials, priority: critical, steps: [ {{ action: navigate, selector: , value: {url}, description: Navigate to login page }}, {{ action: type, selector: #email, value: testexample.com, description: Enter email address }}, {{ action: type, selector: #password, value: password123, description: Enter password }}, {{ action: click, selector: button[typesubmit], value: , description: Click login button }}, {{ action: assert, selector: .dashboard, value: visible, description: Verify dashboard is displayed }} ], confidence: 0.95, tags: [smoke, authentication, critical] }} ]Generate test scenarios now:return prompt async def _call_openai(self, prompt: str) - str: 调用 OpenAI API try: response await self.client.chat.completions.create( modelself.model_name, messages[ { role: system, content: You are an expert QA automation engineer specializing in web testing. Always respond with valid JSON. }, { role: user, content: prompt } ], temperature0.7, max_tokens4000, response_format{type: json_object} ) return response.choices[0].message.content except Exception as e: logger.error(fOpenAI API error: {str(e)}) raise async def _call_anthropic(self, prompt: str) - str: 调用 Anthropic Claude API try: response await self.client.messages.create( modelself.model_name, max_tokens4000, messages[ { role: user, content: prompt } ] ) return response.content[0].text except Exception as e: logger.error(fAnthropic API error: {str(e)}) raise async def _call_local_llm(self, prompt: str) - str: 调用本地 LLM (Ollama) import aiohttp try: async with aiohttp.ClientSession() as session: async with session.post( http://localhost:11434/api/generate, json{ model: llama2, prompt: prompt, stream: False } ) as response: result await response.json() return result[response] except Exception as e: logger.error(fLocal LLM error: {str(e)}) raise def _parse_response(self, response: str) - List[Dict[str, Any]]: 解析 LLM 响应 try: # 尝试直接解析 JSON data json.loads(response) # 如果返回的是对象而不是数组尝试提取数组 if isinstance(data, dict): if scenarios in data: scenarios data[scenarios] elif test_scenarios in data: scenarios data[test_scenarios] else: # 假设整个对象就是一个场景 scenarios [data] else: scenarios data # 验证和清理场景 validated_scenarios [] for scenario in scenarios: if self._validate_scenario(scenario): validated_scenarios.append(scenario) else: logger.warning(fInvalid scenario skipped: {scenario.get(name, Unknown)}) return validated_scenarios except json.JSONDecodeError as e: logger.error(fFailed to parse LLM response: {str(e)}) logger.error(fResponse: {response}) # 尝试提取 JSON 代码块 import re json_match re.search(rjson\s*(.*?)\s*, response, re.DOTALL) if json_match: try: data json.loads(json_match.group(1)) return self._parse_response(json.dumps(data)) except: pass return [] def _validate_scenario(self, scenario: Dict[str, Any]) - bool: 验证测试场景格式 required_fields [name, description, steps] # 检查必需字段 for field in required_fields: if field not in scenario: return False # 检查步骤 if not isinstance(scenario[steps], list) or len(scenario[steps]) 0: return False # 验证每个步骤 for step in scenario[steps]: if not isinstance(step, dict): return False if action not in step: return False return True继续下一部分... #### 2.3.3 页面分析器 backend/services/ai-generator/src/page_analyzer.py python 页面分析器 - 使用 Playwright 分析页面结构 import logging from typing import Dict, Any, List from playwright.async_api import async_playwright, Page import asyncio logger logging.getLogger(__name__) class PageAnalyzer: 页面结构分析器 def __init__(self): self.timeout 30000 # 30秒超时 async def analyze(self, url: str) - Dict[str, Any]: 分析页面结构 Args: url: 页面 URL Returns: 页面结构信息 async with async_playwright() as p: browser await p.chromium.launch(headlessTrue) context await browser.new_context( viewport{width: 1920, height: 1080}, user_agentMozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ) page await context.new_page() try: # 导航到页面 logger.info(fNavigating to {url}) await page.goto(url, wait_untilnetworkidle, timeoutself.timeout) # 等待页面加载完成 await page.wait_for_load_state(domcontentloaded) # 提取页面信息 structure await self._extract_page_structure(page) return structure except Exception as e: logger.error(fError analyzing page {url}: {str(e)}) raise finally: await browser.close() async def _extract_page_structure(self, page: Page) - Dict[str, Any]: 提取页面结构 # 执行 JavaScript 提取页面元素 structure await page.evaluate( () { const result { title: document.title, url: window.location.href, meta: {}, forms: [], inputs: [], buttons: [], links: [], headings: [], images: [], tables: [], interactiveElements: [] }; // Meta 信息 document.querySelectorAll(meta).forEach(meta { const name meta.getAttribute(name) || meta.getAttribute(property); const content meta.getAttribute(content); if (name content) { result.meta[name] content; } }); // 表单 document.querySelectorAll(form).forEach((form, index) { const formData { id: form.id || form-${index}, name: form.name, action: form.action, method: form.method, inputs: [] }; form.querySelectorAll(input, select, textarea).forEach(input { formData.inputs.push({ type: input.type || input.tagName.toLowerCase(), name: input.name, id: input.id, placeholder: input.placeholder, required: input.required, selector: #${input.id} || [name${input.name}] }); }); result.forms.push(formData); }); // 输入框 document.querySelectorAll(input, textarea, select).forEach(input { result.inputs.push({ type: input.type || input.tagName.toLowerCase(), name: input.name, id: input.id, placeholder: input.placeholder, required: input.required, selector: input.id ? #${input.id} : [name${input.name}], label: input.labels?.[0]?.textContent?.trim() }); }); // 按钮 document.querySelectorAll(button, input[typesubmit], input[typebutton]).forEach((btn, index) { result.buttons.push({ text: btn.textContent?.trim() || btn.value, type: btn.type, id: btn.id || button-${index}, selector: btn.id ? #${btn.id} : button:nth-of-type(${index 1}), disabled: btn.disabled }); }); // 链接 document.querySelectorAll(a[href]).forEach((link, index) { result.links.push({ text: link.textContent?.trim(), href: link.href, id: link.id, selector: link.id ? #${link.id} : a:nth-of-type(${index 1}) }); }); // 标题 [h1, h2, h3, h4, h5, h6].forEach(tag { document.querySelectorAll(tag).forEach((heading, index) { result.headings.push({ level: tag, text: heading.textContent?.trim(), id: heading.id, selector: heading.id ? #${heading.id} : ${tag}:nth-of-type(${index 1}) }); }); }); // 图片 document.querySelectorAll(img).forEach((img, index) { result.images.push({ src: img.src, alt: img.alt, id: img.id, selector: img.id ? #${img.id} : img:nth-of-type(${index 1}) }); }); // 表格 document.querySelectorAll(table).forEach((table, index) { result.tables.push({ id: table.id || table-${index}, rows: table.rows.length, columns: table.rows[0]?.cells.length || 0, selector: table.id ? #${table.id} : table:nth-of-type(${index 1}) }); }); // 交互元素 document.querySelectorAll([onclick], [data-action], .clickable, [rolebutton]).forEach((el, index) { result.interactiveElements.push({ tag: el.tagName.toLowerCase(), text: el.textContent?.trim().substring(0, 50), id: el.id, className: el.className, selector: el.id ? #${el.id} : .${el.className.split( )[0]} }); }); return result; } ) # 添加截图 screenshot await page.screenshot(full_pageTrue, typepng) structure[screenshot] screenshot.hex() # 添加页面性能指标 performance await page.evaluate( () { const perfData performance.getEntriesByType(navigation)[0]; return { loadTime: perfData.loadEventEnd - perfData.fetchStart, domContentLoaded: perfData.domContentLoadedEventEnd - perfData.fetchStart, responseTime: perfData.responseEnd - perfData.requestStart }; } ) structure[performance] performance logger.info(fExtracted {len(structure[forms])} forms, f{len(structure[inputs])} inputs, f{len(structure[buttons])} buttons, f{len(structure[links])} links) return structure2.3.4 场景生成器backend/services/ai-generator/src/scenario_generator.py 测试场景生成器 import logging from typing import List, Dict, Any from llm_service import LLMService logger logging.getLogger(__name__) class ScenarioGenerator: 测试场景生成器 def __init__(self, llm_service: LLMService): self.llm_service llm_service self.model_name llm_service.model_name async def generate( self, url: str, page_structure: Dict[str, Any], options: Dict[str, Any] ) - List[Dict[str, Any]]: 生成测试场景 Args: url: 页面 URL page_structure: 页面结构 options: 生成选项 Returns: 测试场景列表 logger.info(fGenerating test scenarios for {url}) # 使用 LLM 生成场景 scenarios await self.llm_service.generate_test_scenarios( url, page_structure, options ) # 后处理场景 processed_scenarios self._post_process_scenarios(scenarios, page_structure) # 排序按优先级和置信度 sorted_scenarios self._sort_scenarios(processed_scenarios) # 限制数量 max_scenarios options.get(max_scenarios, 10) return sorted_scenarios[:max_scenarios] def _post_process_scenarios( self, scenarios: List[Dict[str, Any]], page_structure: Dict[str, Any] ) - List[Dict[str, Any]]: 后处理场景 processed [] for scenario in scenarios: # 验证选择器 scenario self._validate_selectors(scenario, page_structure) # 添加等待条件 scenario self._add_wait_conditions(scenario) # 添加断言 scenario self._enhance_assertions(scenario) processed.append(scenario) return processed def _validate_selectors( self, scenario: Dict[str, Any], page_structure: Dict[str, Any] ) - Dict[str, Any]: 验证和优化选择器 # 提取所有可用的选择器 available_selectors set() for input_elem in page_structure.get(inputs, []): if input_elem.get(selector): available_selectors.add(input_elem[selector]) for button in page_structure.get(buttons, []): if button.get(selector): available_selectors.add(button[selector]) # 验证步骤中的选择器 for step in scenario.get(steps, []): selector step.get(selector, ) # 如果选择器不在可用列表中尝试修复 if selector and selector not in available_selectors: # 尝试通过文本匹配找到更好的选择器 if step[action] in [click, type]: better_selector self._find_better_selector( step, page_structure ) if better_selector: step[selector] better_selector logger.info(fUpdated selector from {selector} to {better_selector}) return scenario def _find_better_selector( self, step: Dict[str, Any], page_structure: Dict[str, Any] ) - str: 查找更好的选择器 action step[action] description step.get(description, ).lower() if action type: # 查找匹配的输入框 for input_elem in page_structure.get(inputs, []): label input_elem.get(label, ).lower() placeholder input_elem.get(placeholder, ).lower() if label in description or placeholder in description: return input_elem[selector] elif action click: # 查找匹配的按钮 for button in page_structure.get(buttons, []): text button.get(text, ).lower() if text in description: return button[selector] return def _add_wait_conditions(self, scenario: Dict[str, Any]) - Dict[str, Any]: 添加等待条件 for i, step in enumerate(scenario.get(steps, [])): # 在导航后添加等待 if step[action] navigate: if i 1 len(scenario[steps]): scenario[steps].insert(i 1, { action: wait, selector: body, value: load, description: Wait for page to load }) # 在点击后添加短暂等待 elif step[action] click: step[waitAfter] 500 # 500ms return scenario def _enhance_assertions(self, scenario: Dict[str, Any]) - Dict[str, Any]: 增强断言 steps scenario.get(steps, []) # 如果最后一步不是断言添加一个 if steps and steps[-1][action] ! assert: steps.append({ action: assert, selector: body, value: visible, description: Verify page is loaded successfully }) return scenario def _sort_scenarios(self, scenarios: List[Dict[str, Any]]) - List[Dict[str, Any]]: 排序场景 priority_order {critical: 0, high: 1, medium: 2, low: 3} return sorted( scenarios, keylambda s: ( priority_order.get(s.get(priority, medium), 2), -s.get(confidence, 0.5) ) )现在继续提供测试执行引擎的实现...2.4 测试执行引擎Python Selenium/Playwright2.4.1 执行器主文件backend/services/executor/src/main.py TestMaster 测试执行引擎 支持 Selenium 和 Playwright from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Dict, Any, Optional import logging from datetime import datetime import asyncio from selenium_runner import SeleniumRunner from playwright_runner import PlaywrightRunner from appium_runner import AppiumRunner # 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s ) logger logging.getLogger(__name__) # 创建 FastAPI 应用 app FastAPI( titleTestMaster Executor, descriptionTest execution engine service, version1.0.0 ) # CORS 配置 app.add_middleware( CORSMiddleware, allow_origins[*], allow_credentialsTrue, allow_methods[*], allow_headers[*], ) # 请求模型 class TestStep(BaseModel): id: str action: str selector: Optional[str] None value: Optional[str] None wait_condition: Optional[Dict[str, Any]] None screenshot: bool False class ExecuteRequest(BaseModel): execution_id: str test_case_id: str steps: List[TestStep] environment: str browser: str chrome device: Optional[str] None headless: bool True video: bool False class ExecuteResponse(BaseModel): execution_id: str status: str started_at: datetime completed_at: datetime duration_ms: int step_results: List[Dict[str, Any]] # 全局执行器实例 selenium_runner SeleniumRunner() playwright_runner PlaywrightRunner() appium_runner AppiumRunner() # API 端点 app.get(/health) async def health_check(): 健康检查 return { status: healthy, service: executor, version: 1.0.0, runners: { selenium: selenium_runner.is_available(), playwright: playwright_runner.is_available(), appium: appium_runner.is_available() } } app.post(/execute, response_modelExecuteResponse) async def execute_test(request: ExecuteRequest, background_tasks: BackgroundTasks): 执行测试 Args: request: 执行请求 background_tasks: 后台任务 Returns: 执行结果 logger.info(fExecuting test {request.test_case_id} on {request.browser}) started_at datetime.now() try: # 选择执行器 if request.device: # 移动设备测试 runner appium_runner elif request.browser in [chrome, firefox, edge, safari]: # Web 浏览器测试 - 使用 Playwright (更快更稳定) runner playwright_runner else: # 默认使用 Selenium runner selenium_runner # 执行测试 step_results await runner.execute( execution_idrequest.execution_id, steps[step.dict() for step in request.steps], browserrequest.browser, devicerequest.device, headlessrequest.headless, videorequest.video ) completed_at datetime.now() duration_ms int((completed_at - started_at).total_seconds() * 1000) # 判断整体状态 status passed for result in step_results: if result[status] in [failed, error]: status failed break response ExecuteResponse( execution_idrequest.execution_id, statusstatus, started_atstarted_at, completed_atcompleted_at, duration_msduration_ms, step_resultsstep_results ) logger.info(fTest execution completed: {status} in {duration_ms}ms) return response except Exception as e: logger.error(fTest execution failed: {str(e)}, exc_infoTrue) completed_at datetime.now() duration_ms int((completed_at - started_at).total_seconds() * 1000) return ExecuteResponse( execution_idrequest.execution_id, statuserror, started_atstarted_at, completed_atcompleted_at, duration_msduration_ms, step_results[{ step_id: error, status: error, error_message: str(e) }] ) app.get(/browsers) async def get_available_browsers(): 获取可用的浏览器 return { browsers: [ {name: chrome, version: latest, available: True}, {name: firefox, version: latest, available: True}, {name: edge, version: latest, available: True}, {name: safari, version: latest, available: False} ] } app.get(/devices) async def get_available_devices(): 获取可用的移动设备 devices await appium_runner.get_available_devices() return {devices: devices} if __name__ __main__: import uvicorn print( ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ TestMaster Executor Service ║ ║ ║ ║ Server: http://localhost:8002 ║ ║ Docs: http://localhost:8002/docs ║ ║ ║ ║ Runners: ║ ║ - Selenium Grid ║ ║ - Playwright ║ ║ - Appium ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ ) uvicorn.run( main:app, host0.0.0.0, port8002, reloadTrue, log_levelinfo )TestMaster 自动化测试平台 - 第三部分Playwright 执行器实现2.4.2 Playwright 执行器backend/services/executor/src/playwright_runner.py Playwright 测试执行器 支持 Chromium, Firefox, WebKit import logging import asyncio from typing import List, Dict, Any, Optional from datetime import datetime import os import json from pathlib import Path from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Error import aiofiles logger logging.getLogger(__name__) class PlaywrightRunner: Playwright 测试执行器 def __init__(self): self.screenshots_dir Path(./screenshots) self.videos_dir Path(./videos) self.logs_dir Path(./logs) # 创建目录 self.screenshots_dir.mkdir(exist_okTrue) self.videos_dir.mkdir(exist_okTrue) self.logs_dir.mkdir(exist_okTrue) self.default_timeout 30000 # 30秒 self.action_timeout 10000 # 10秒 logger.info(PlaywrightRunner initialized) def is_available(self) - bool: 检查 Playwright 是否可用 try: import playwright return True except ImportError: return False async def execute( self, execution_id: str, steps: List[Dict[str, Any]], browser: str chromium, device: Optional[str] None, headless: bool True, video: bool False ) - List[Dict[str, Any]]: 执行测试步骤 Args: execution_id: 执行ID steps: 测试步骤列表 browser: 浏览器类型 (chromium, firefox, webkit) device: 设备模拟 (可选) headless: 是否无头模式 video: 是否录制视频 Returns: 步骤执行结果列表 logger.info(fStarting Playwright execution {execution_id} on {browser}) step_results [] playwright None browser_instance None context None page None try: # 启动 Playwright playwright await async_playwright().start() # 选择浏览器 if browser firefox: browser_instance await playwright.firefox.launch(headlessheadless) elif browser webkit: browser_instance await playwright.webkit.launch(headlessheadless) else: # chromium/chrome browser_instance await playwright.chromium.launch( headlessheadless, args[ --disable-blink-featuresAutomationControlled, --disable-dev-shm-usage, --no-sandbox ] ) # 创建上下文 context_options { viewport: {width: 1920, height: 1080}, user_agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, locale: zh-CN, timezone_id: Asia/Shanghai, permissions: [geolocation, notifications], ignore_https_errors: True } # 设备模拟 if device: device_descriptor playwright.devices.get(device) if device_descriptor: context_options.update(device_descriptor) # 视频录制 if video: context_options[record_video_dir] str(self.videos_dir / execution_id) context_options[record_video_size] {width: 1920, height: 1080} context await browser_instance.new_context(**context_options) # 设置默认超时 context.set_default_timeout(self.default_timeout) context.set_default_navigation_timeout(self.default_timeout) # 创建页面 page await context.new_page() # 设置控制台日志监听 page.on(console, lambda msg: logger.info(fConsole: {msg.text})) page.on(pageerror, lambda err: logger.error(fPage error: {err})) # 执行每个步骤 for i, step in enumerate(steps): step_id step.get(id, fstep-{i}) logger.info(fExecuting step {i1}/{len(steps)}: {step[action]}) try: result await self._execute_step( pagepage, stepstep, execution_idexecution_id, step_indexi ) result[step_id] step_id step_results.append(result) # 如果步骤失败停止执行 if result[status] in [failed, error]: logger.warning(fStep {i1} failed, stopping execution) break except Exception as e: logger.error(fStep {i1} error: {str(e)}, exc_infoTrue) # 截图保存错误状态 screenshot_path await self._take_screenshot( page, execution_id, ferror-step-{i} ) step_results.append({ step_id: step_id, status: error, error_message: str(e), screenshot_url: screenshot_path, timestamp: datetime.now().isoformat() }) break # 保存最终截图 if page: await self._take_screenshot(page, execution_id, final) logger.info(fExecution {execution_id} completed with {len(step_results)} steps) return step_results except Exception as e: logger.error(fExecution {execution_id} failed: {str(e)}, exc_infoTrue) raise finally: # 清理资源 if page: await page.close() if context: await context.close() if browser_instance: await browser_instance.close() if playwright: await playwright.stop() async def _execute_step( self, page: Page, step: Dict[str, Any], execution_id: str, step_index: int ) - Dict[str, Any]: 执行单个测试步骤 Args: page: Playwright 页面对象 step: 步骤配置 execution_id: 执行ID step_index: 步骤索引 Returns: 步骤执行结果 action step[action] selector step.get(selector, ) value step.get(value, ) wait_condition step.get(wait_condition, {}) screenshot step.get(screenshot, False) start_time datetime.now() result { action: action, selector: selector, status: passed, timestamp: start_time.isoformat() } try: # 执行前等待 if wait_condition: await self._handle_wait(page, wait_condition) # 执行动作 if action navigate: await page.goto(value, wait_untilnetworkidle, timeoutself.default_timeout) result[url] page.url elif action click: element page.locator(selector) await element.wait_for(statevisible, timeoutself.action_timeout) await element.click(timeoutself.action_timeout) elif action type or action fill: element page.locator(selector) await element.wait_for(statevisible, timeoutself.action_timeout) await element.clear() await element.fill(value, timeoutself.action_timeout) elif action select: element page.locator(selector) await element.wait_for(statevisible, timeoutself.action_timeout) await element.select_option(value, timeoutself.action_timeout) elif action check or action uncheck: element page.locator(selector) await element.wait_for(statevisible, timeoutself.action_timeout) if action check: await element.check(timeoutself.action_timeout) else: await element.uncheck(timeoutself.action_timeout) elif action hover: element page.locator(selector) await element.wait_for(statevisible, timeoutself.action_timeout) await element.hover(timeoutself.action_timeout) elif action scroll: if selector: element page.locator(selector) await element.scroll_into_view_if_needed(timeoutself.action_timeout) else: # 滚动到页面底部 await page.evaluate(fwindow.scrollTo(0, {value or document.body.scrollHeight})) elif action wait: wait_type wait_condition.get(type, time) wait_value wait_condition.get(value, 1000) if wait_type time: await asyncio.sleep(int(wait_value) / 1000) elif wait_type element: await page.wait_for_selector(selector, timeoutint(wait_value)) elif wait_type url: await page.wait_for_url(value, timeoutint(wait_value)) elif wait_type load: await page.wait_for_load_state(networkidle, timeoutint(wait_value)) elif action assert: await self._handle_assertion(page, selector, value, result) elif action execute_script: script_result await page.evaluate(value) result[script_result] script_result elif action upload: element page.locator(selector) await element.set_input_files(value, timeoutself.action_timeout) elif action press: await page.keyboard.press(value) elif action switch_frame: frame page.frame(selector) if frame: # 在 Playwright 中frame 操作是自动的 result[frame] selector else: raise Exception(fFrame not found: {selector}) elif action switch_window: # 等待新窗口 async with page.expect_popup() as popup_info: page await popup_info.value result[window] page.url elif action accept_alert: page.on(dialog, lambda dialog: dialog.accept()) elif action dismiss_alert: page.on(dialog, lambda dialog: dialog.dismiss()) elif action get_text: element page.locator(selector) text await element.text_content(timeoutself.action_timeout) result[text] text elif action get_attribute: element page.locator(selector) attr_value await element.get_attribute(value, timeoutself.action_timeout) result[attribute_value] attr_value else: raise Exception(fUnknown action: {action}) # 截图 if screenshot or action assert: screenshot_path await self._take_screenshot( page, execution_id, fstep-{step_index} ) result[screenshot_url] screenshot_path # 收集性能指标 if action navigate: metrics await self._collect_performance_metrics(page) result[metrics] metrics # 计算执行时间 end_time datetime.now() result[duration_ms] int((end_time - start_time).total_seconds() * 1000) logger.info(fStep {action} completed successfully in {result[duration_ms]}ms) except Error as e: result[status] failed result[error_message] str(e) logger.error(fStep {action} failed: {str(e)}) # 错误截图 try: screenshot_path await self._take_screenshot( page, execution_id, ferror-step-{step_index} ) result[screenshot_url] screenshot_path except: pass except Exception as e: result[status] error result[error_message] str(e) logger.error(fStep {action} error: {str(e)}, exc_infoTrue) return result async def _handle_wait(self, page: Page, wait_condition: Dict[str, Any]): 处理等待条件 wait_type wait_condition.get(type, time) wait_value wait_condition.get(value, 1000) timeout wait_condition.get(timeout, self.action_timeout) if wait_type time: await asyncio.sleep(int(wait_value) / 1000) elif wait_type element: await page.wait_for_selector(str(wait_value), timeouttimeout) elif wait_type url: await page.wait_for_url(str(wait_value), timeouttimeout) elif wait_type load: await page.wait_for_load_state(networkidle, timeouttimeout) elif wait_type custom: # 自定义 JavaScript 条件 await page.wait_for_function(str(wait_value), timeouttimeout) async def _handle_assertion( self, page: Page, selector: str, expected_value: str, result: Dict[str, Any] ): 处理断言 assertion_type, assertion_value expected_value.split(:, 1) if : in expected_value else (visible, expected_value) try: if assertion_type visible: element page.locator(selector) await element.wait_for(statevisible, timeoutself.action_timeout) is_visible await element.is_visible() if not is_visible: raise AssertionError(fElement {selector} is not visible) elif assertion_type hidden: element page.locator(selector) is_hidden await element.is_hidden() if not is_hidden: raise AssertionError(fElement {selector} is not hidden) elif assertion_type text: element page.locator(selector) text await element.text_content(timeoutself.action_timeout) if assertion_value not in text: raise AssertionError(fExpected text {assertion_value} not found in {text}) elif assertion_type value: element page.locator(selector) actual_value await element.input_value(timeoutself.action_timeout) if actual_value ! assertion_value: raise AssertionError(fExpected value {assertion_value}, got {actual_value}) elif assertion_type count: elements page.locator(selector) count await elements.count() expected_count int(assertion_value) if count ! expected_count: raise AssertionError(fExpected {expected_count} elements, found {count}) elif assertion_type url: current_url page.url if assertion_value not in current_url: raise AssertionError(fExpected URL to contain {assertion_value}, got {current_url}) elif assertion_type title: title await page.title() if assertion_value not in title: raise AssertionError(fExpected title to contain {assertion_value}, got {title}) elif assertion_type attribute: attr_name, expected_attr_value assertion_value.split(, 1) element page.locator(selector) actual_attr_value await element.get_attribute(attr_name, timeoutself.action_timeout) if actual_attr_value ! expected_attr_value: raise AssertionError( fExpected attribute {attr_name}{expected_attr_value}, fgot {actual_attr_value} ) result[assertion_passed] True except AssertionError as e: result[status] failed result[error_message] str(e) result[assertion_passed] False raise async def _take_screenshot( self, page: Page, execution_id: str, name: str ) - str: 截图 try: screenshot_path self.screenshots_dir / execution_id / f{name}.png screenshot_path.parent.mkdir(parentsTrue, exist_okTrue) await page.screenshot( pathstr(screenshot_path), full_pageTrue, typepng ) # 返回相对路径 return f/screenshots/{execution_id}/{name}.png except Exception as e: logger.error(fFailed to take screenshot: {str(e)}) return async def _collect_performance_metrics(self, page: Page) - Dict[str, Any]: 收集性能指标 try: # 使用 Performance API 收集指标 metrics await page.evaluate( () { const perfData performance.getEntriesByType(navigation)[0]; const paintData performance.getEntriesByType(paint); return { // 导航时间 dns_lookup: perfData.domainLookupEnd - perfData.domainLookupStart, tcp_connection: perfData.connectEnd - perfData.connectStart, request_time: perfData.responseStart - perfData.requestStart, response_time: perfData.responseEnd - perfData.responseStart, dom_loading: perfData.domInteractive - perfData.domLoading, dom_interactive: perfData.domInteractive - perfData.fetchStart, dom_complete: perfData.domComplete - perfData.fetchStart, load_event: perfData.loadEventEnd - perfData.loadEventStart, // 总时间 total_load_time: perfData.loadEventEnd - perfData.fetchStart, // 绘制时间 first_paint: paintData.find(p p.name first-paint)?.startTime || 0, first_contentful_paint: paintData.find(p p.name first-contentful-paint)?.startTime || 0, // 资源 transfer_size: perfData.transferSize, encoded_body_size: perfData.encodedBodySize, decoded_body_size: perfData.decodedBodySize }; } ) # 获取内存使用如果可用 try: memory await page.evaluate(() performance.memory) metrics[memory] { used_js_heap_size: memory.get(usedJSHeapSize, 0), total_js_heap_size: memory.get(totalJSHeapSize, 0), js_heap_size_limit: memory.get(jsHeapSizeLimit, 0) } except: pass return metrics except Exception as e: logger.error(fFailed to collect performance metrics: {str(e)}) return {} async def get_available_devices(self) - List[Dict[str, str]]: 获取可用的设备模拟列表 async with async_playwright() as p: devices [] # Playwright 内置设备 for device_name in [ iPhone 12, iPhone 12 Pro, iPhone 13, iPhone 13 Pro, iPhone 14, iPhone 14 Pro, Pixel 5, Pixel 6, Galaxy S9, Galaxy S21, iPad Pro, iPad Mini, Desktop Chrome, Desktop Firefox, Desktop Safari ]: if device_name in p.devices: device p.devices[device_name] devices.append({ name: device_name, viewport: f{device[viewport][width]}x{device[viewport][height]}, user_agent: device.get(user_agent, )[:50] ... }) return devices2.4.3 Selenium 执行器backend/services/executor/src/selenium_runner.py Selenium WebDriver 测试执行器 支持 Chrome, Firefox, Edge, Safari import logging from typing import List, Dict, Any, Optional from datetime import datetime import os from pathlib import Path import time from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.select import Select from selenium.common.exceptions import ( TimeoutException, NoSuchElementException, WebDriverException ) logger logging.getLogger(__name__) class SeleniumRunner: Selenium 测试执行器 def __init__(self): self.screenshots_dir Path(./screenshots) self.logs_dir Path(./logs) # 创建目录 self.screenshots_dir.mkdir(exist_okTrue) self.logs_dir.mkdir(exist_okTrue) self.default_timeout 30 self.action_timeout 10 logger.info(SeleniumRunner initialized) def is_available(self) - bool: 检查 Selenium 是否可用 try: import selenium return True except ImportError: return False async def execute( self, execution_id: str, steps: List[Dict[str, Any]], browser: str chrome, device: Optional[str] None, headless: bool True, video: bool False ) - List[Dict[str, Any]]: 执行测试步骤 Args: execution_id: 执行ID steps: 测试步骤列表 browser: 浏览器类型 device: 设备模拟 headless: 是否无头模式 video: 是否录制视频 Returns: 步骤执行结果列表 logger.info(fStarting Selenium execution {execution_id} on {browser}) step_results [] driver None try: # 创建 WebDriver driver self._create_driver(browser, headless, device) # 设置隐式等待 driver.implicitly_wait(self.action_timeout) # 执行每个步骤 for i, step in enumerate(steps): step_id step.get(id, fstep-{i}) logger.info(fExecuting step {i1}/{len(steps)}: {step[action]}) try: result self._execute_step( driverdriver, stepstep, execution_idexecution_id, step_indexi ) result[step_id] step_id step_results.append(result) # 如果步骤失败停止执行 if result[status] in [failed, error]: logger.warning(fStep {i1} failed, stopping execution) break except Exception as e: logger.error(fStep {i1} error: {str(e)}, exc_infoTrue) # 截图保存错误状态 screenshot_path self._take_screenshot( driver, execution_id, ferror-step-{i} ) step_results.append({ step_id: step_id, status: error, error_message: str(e), screenshot_url: screenshot_path, timestamp: datetime.now().isoformat() }) break # 保存最终截图 if driver: self._take_screenshot(driver, execution_id, final) logger.info(fExecution {execution_id} completed with {len(step_results)} steps) return step_results except Exception as e: logger.error(fExecution {execution_id} failed: {str(e)}, exc_infoTrue) raise finally: # 清理资源 if driver: driver.quit() def _create_driver( self, browser: str, headless: bool, device: Optional[str] ) - webdriver.Remote: 创建 WebDriver 实例 if browser chrome: options webdriver.ChromeOptions() if headless: options.add_argument(--headlessnew) options.add_argument(--no-sandbox) options.add_argument(--disable-dev-shm-usage) options.add_argument(--disable-blink-featuresAutomationControlled) options.add_experimental_option(excludeSwitches, [enable-automation]) options.add_experimental_option(useAutomationExtension, False) # 设备模拟 if device: mobile_emulation self._get_mobile_emulation(device) if mobile_emulation: options.add_experimental_option(mobileEmulation, mobile_emulation) driver webdriver.Chrome(optionsoptions) elif browser firefox: options webdriver.FirefoxOptions() if headless: options.add_argument(--headless) driver webdriver.Firefox(optionsoptions) elif browser edge: options webdriver.EdgeOptions() if headless: options.add_argument(--headlessnew) driver webdriver.Edge(optionsoptions) else: raise ValueError(fUnsupported browser: {browser}) # 设置窗口大小 if not device: driver.set_window_size(1920, 1080) return driver def _get_mobile_emulation(self, device: str) - Optional[Dict[str, Any]]: 获取移动设备模拟配置 devices { iPhone 12: { deviceMetrics: {width: 390, height: 844, pixelRatio: 3.0}, userAgent: Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) }, iPhone 13: { deviceMetrics: {width: 390, height: 844, pixelRatio: 3.0}, userAgent: Mozilla/5.0 (iPhone; CPU iPhone OS 15_0 like Mac OS X) }, Pixel 5: { deviceMetrics: {width: 393, height: 851, pixelRatio: 2.75}, userAgent: Mozilla/5.0 (Linux; Android 11; Pixel 5) }, iPad Pro: { deviceMetrics: {width: 1024, height: 1366, pixelRatio: 2.0}, userAgent: Mozilla/5.0 (iPad; CPU OS 14_0 like Mac OS X) } } return devices.get(device) def _execute_step( self, driver: webdriver.Remote, step: Dict[str, Any], execution_id: str, step_index: int ) - Dict[str, Any]: 执行单个测试步骤 action step[action] selector step.get(selector, ) value step.get(value, ) wait_condition step.get(wait_condition, {}) screenshot step.get(screenshot, False) start_time datetime.now() result { action: action, selector: selector, status: passed, timestamp: start_time.isoformat() } try: # 执行前等待 if wait_condition: self._handle_wait(driver, wait_condition) # 执行动作 if action navigate: driver.get(value) result[url] driver.current_url elif action click: element self._find_element(driver, selector) element.click() elif action type or action fill: element self._find_element(driver, selector) element.clear() element.send_keys(value) elif action select: element self._find_element(driver, selector) select Select(element) select.select_by_visible_text(value) elif action check: element self._find_element(driver, selector) if not element.is_selected(): element.click() elif action uncheck: element self._find_element(driver, selector) if element.is_selected(): element.click() elif action hover: element self._find_element(driver, selector) ActionChains(driver).move_to_element(element).perform() elif action scroll: if selector: element self._find_element(driver, selector) driver.execute_script(arguments[0].scrollIntoView(true);, element) else: driver.execute_script(fwindow.scrollTo(0, {value or document.body.scrollHeight});) elif action wait: wait_type wait_condition.get(type, time) wait_value wait_condition.get(value, 1000) if wait_type time: time.sleep(int(wait_value) / 1000) elif wait_type element: WebDriverWait(driver, int(wait_value) / 1000).until( EC.presence_of_element_located(self._parse_selector(selector)) ) elif action assert: self._handle_assertion(driver, selector, value, result) elif action execute_script: script_result driver.execute_script(value) result[script_result] script_result elif action upload: element self._find_element(driver, selector) element.send_keys(value) elif action press: ActionChains(driver).send_keys(getattr(Keys, value.upper())).perform() elif action switch_frame: driver.switch_to.frame(selector) elif action switch_window: driver.switch_to.window(driver.window_handles[-1]) elif action accept_alert: driver.switch_to.alert.accept() elif action dismiss_alert: driver.switch_to.alert.dismiss() elif action get_text: element self._find_element(driver, selector) result[text] element.text elif action get_attribute: element self._find_element(driver, selector) result[attribute_value] element.get_attribute(value) else: raise Exception(fUnknown action: {action}) # 截图 if screenshot or action assert: screenshot_path self._take_screenshot( driver, execution_id, fstep-{step_index} ) result[screenshot_url] screenshot_path # 计算执行时间 end_time datetime.now() result[duration_ms] int((end_time - start_time).total_seconds() * 1000) logger.info(fStep {action} completed successfully in {result[duration_ms]}ms) except (TimeoutException, NoSuchElementException) as e: result[status] failed result[error_message] str(e) logger.error(fStep {action} failed: {str(e)}) # 错误截图 try: screenshot_path self._take_screenshot( driver, execution_id, ferror-step-{step_index} ) result[screenshot_url] screenshot_path except: pass except Exception as e: result[status] error result[error_message] str(e) logger.error(fStep {action} error: {str(e)}, exc_infoTrue) return result def _find_element(self, driver: webdriver.Remote, selector: str): 查找元素 by, value self._parse_selector(selector) wait WebDriverWait(driver, self.action_timeout) element wait.until(EC.presence_of_element_located((by, value))) return element def _parse_selector(self, selector: str) - tuple: 解析选择器 if selector.startswith(//): return (By.XPATH, selector) elif selector.startswith(#): return (By.ID, selector[1:]) elif selector.startswith(.): return (By.CLASS_NAME, selector[1:]) elif selector.startswith([): return (By.CSS_SELECTOR, selector) else: return (By.CSS_SELECTOR, selector) def _handle_wait(self, driver: webdriver.Remote, wait_condition: Dict[str, Any]): 处理等待条件 wait_type wait_condition.get(type, time) wait_value wait_condition.get(value, 1000) timeout wait_condition.get(timeout, self.action_timeout) if wait_type time: time.sleep(int(wait_value) / 1000) elif wait_type element: by, value self._parse_selector(str(wait_value)) WebDriverWait(driver, timeout).until( EC.presence_of_element_located((by, value)) ) elif wait_type url: WebDriverWait(driver, timeout).until( EC.url_contains(str(wait_value)) ) def _handle_assertion( self, driver: webdriver.Remote, selector: str, expected_value: str, result: Dict[str, Any] ): 处理断言 assertion_type, assertion_value expected_value.split(:, 1) if : in expected_value else (visible, expected_value) try: if assertion_type visible: element self._find_element(driver, selector) if not element.is_displayed(): raise AssertionError(fElement {selector} is not visible) elif assertion_type text: element self._find_element(driver, selector) if assertion_value not in element.text: raise AssertionError( fExpected text {assertion_value} not found in {element.text} ) elif assertion_type value: element self._find_element(driver, selector) if element.get_attribute(value) ! assertion_value: raise AssertionError( fExpected value {assertion_value}, fgot {element.get_attribute(value)} ) elif assertion_type url: if assertion_value not in driver.current_url: raise AssertionError( fExpected URL to contain {assertion_value}, fgot {driver.current_url} ) elif assertion_type title: if assertion_value not in driver.title: raise AssertionError( fExpected title to contain {assertion_value}, fgot {driver.title} ) result[assertion_passed] True except AssertionError as e: result[status] failed result[error_message] str(e) result[assertion_passed] False raise def _take_screenshot( self, driver: webdriver.Remote, execution_id: str, name: str ) - str: 截图 try: screenshot_path self.screenshots_dir / execution_id / f{name}.png screenshot_path.parent.mkdir(parentsTrue, exist_okTrue) driver.save_screenshot(str(screenshot_path)) # 返回相对路径 return f/screenshots/{execution_id}/{name}.png except Exception as e: logger.error(fFailed to take screenshot: {str(e)}) return TestMaster 自动化测试平台 - 第四部分Appium 移动端执行器实现2.4.4 Appium 移动端执行器backend/services/executor/src/appium_runner.py Appium 移动端测试执行器 支持 iOS 和 Android 原生应用、混合应用和移动 Web 测试 import logging from typing import List, Dict, Any, Optional from datetime import datetime import os from pathlib import Path import time import asyncio import subprocess import json from appium import webdriver from appium.webdriver.common.appiumby import AppiumBy from appium.options.android import UiAutomator2Options from appium.options.ios import XCUITestOptions from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import ( TimeoutException, NoSuchElementException, WebDriverException ) logger logging.getLogger(__name__) class AppiumRunner: Appium 移动端测试执行器 def __init__(self): self.screenshots_dir Path(./screenshots) self.videos_dir Path(./videos) self.logs_dir Path(./logs) # 创建目录 self.screenshots_dir.mkdir(exist_okTrue) self.videos_dir.mkdir(exist_okTrue) self.logs_dir.mkdir(exist_okTrue) self.default_timeout 30 self.action_timeout 10 # Appium 服务器配置 self.appium_server_url os.getenv(APPIUM_SERVER_URL, http://localhost:4723) logger.info(AppiumRunner initialized) def is_available(self) - bool: 检查 Appium 是否可用 try: import appium # 检查 Appium 服务器是否运行 import requests response requests.get(f{self.appium_server_url}/status, timeout5) return response.status_code 200 except: return False async def execute( self, execution_id: str, steps: List[Dict[str, Any]], browser: str chrome, device: Optional[str] None, headless: bool True, video: bool False ) - List[Dict[str, Any]]: 执行移动端测试步骤 Args: execution_id: 执行ID steps: 测试步骤列表 browser: 浏览器类型移动端忽略 device: 设备配置必需 headless: 是否无头模式移动端忽略 video: 是否录制视频 Returns: 步骤执行结果列表 if not device: raise ValueError(Device configuration is required for mobile testing) logger.info(fStarting Appium execution {execution_id} on device: {device}) step_results [] driver None try: # 解析设备配置 device_config self._parse_device_config(device) # 创建 Appium Driver driver await self._create_driver(device_config, video) # 设置隐式等待 driver.implicitly_wait(self.action_timeout) # 执行每个步骤 for i, step in enumerate(steps): step_id step.get(id, fstep-{i}) logger.info(fExecuting step {i1}/{len(steps)}: {step[action]}) try: result await self._execute_step( driverdriver, stepstep, execution_idexecution_id, step_indexi, device_configdevice_config ) result[step_id] step_id step_results.append(result) # 如果步骤失败停止执行 if result[status] in [failed, error]: logger.warning(fStep {i1} failed, stopping execution) break except Exception as e: logger.error(fStep {i1} error: {str(e)}, exc_infoTrue) # 截图保存错误状态 screenshot_path self._take_screenshot( driver, execution_id, ferror-step-{i} ) step_results.append({ step_id: step_id, status: error, error_message: str(e), screenshot_url: screenshot_path, timestamp: datetime.now().isoformat() }) break # 保存最终截图 if driver: self._take_screenshot(driver, execution_id, final) logger.info(fExecution {execution_id} completed with {len(step_results)} steps) return step_results except Exception as e: logger.error(fExecution {execution_id} failed: {str(e)}, exc_infoTrue) raise finally: # 清理资源 if driver: try: driver.quit() except: pass def _parse_device_config(self, device: str) - Dict[str, Any]: 解析设备配置 支持格式: - android:emulator-5554 - Android 模拟器 - android:real:device-id - Android 真机 - ios:simulator:iPhone 14 - iOS 模拟器 - ios:real:udid - iOS 真机 - JSON 字符串配置 # 尝试解析 JSON try: config json.loads(device) return config except: pass # 解析简单格式 parts device.split(:) if len(parts) 2: raise ValueError(fInvalid device format: {device}) platform parts[0].lower() device_type parts[1].lower() config { platform: platform, deviceType: device_type } if platform android: config[platformName] Android if device_type emulator: config[deviceName] parts[2] if len(parts) 2 else emulator-5554 config[avd] parts[3] if len(parts) 3 else None else: config[deviceName] parts[2] if len(parts) 2 else Android Device config[udid] parts[2] if len(parts) 2 else None elif platform ios: config[platformName] iOS if device_type simulator: config[deviceName] parts[2] if len(parts) 2 else iPhone 14 config[platformVersion] parts[3] if len(parts) 3 else 16.0 else: config[deviceName] iPhone config[udid] parts[2] if len(parts) 2 else None return config async def _create_driver( self, device_config: Dict[str, Any], video: bool ) - webdriver.Remote: 创建 Appium Driver 实例 platform device_config[platform] if platform android: return await self._create_android_driver(device_config, video) elif platform ios: return await self._create_ios_driver(device_config, video) else: raise ValueError(fUnsupported platform: {platform}) async def _create_android_driver( self, device_config: Dict[str, Any], video: bool ) - webdriver.Remote: 创建 Android Driver options UiAutomator2Options() # 基础配置 options.platform_name Android options.automation_name UiAutomator2 options.device_name device_config.get(deviceName, Android Device) # 设备 ID if device_config.get(udid): options.udid device_config[udid] # AVD模拟器 if device_config.get(avd): options.avd device_config[avd] options.avd_launch_timeout 120000 # 应用配置 if device_config.get(app): options.app device_config[app] elif device_config.get(appPackage) and device_config.get(appActivity): options.app_package device_config[appPackage] options.app_activity device_config[appActivity] # 浏览器测试 if device_config.get(browserName): options.browser_name device_config[browserName] # 性能配置 options.new_command_timeout 300 options.no_reset device_config.get(noReset, False) options.full_reset device_config.get(fullReset, False) options.auto_grant_permissions True # 视频录制 if video: options.video_quality medium options.video_fps 30 # 其他配置 options.unicode_keyboard True options.reset_keyboard True options.disable_window_animation True logger.info(fCreating Android driver with options: {options.to_capabilities()}) # 创建 driver driver webdriver.Remote( command_executorself.appium_server_url, optionsoptions ) return driver async def _create_ios_driver( self, device_config: Dict[str, Any], video: bool ) - webdriver.Remote: 创建 iOS Driver options XCUITestOptions() # 基础配置 options.platform_name iOS options.automation_name XCUITest options.device_name device_config.get(deviceName, iPhone) options.platform_version device_config.get(platformVersion, 16.0) # 设备 UDID if device_config.get(udid): options.udid device_config[udid] # 应用配置 if device_config.get(app): options.app device_config[app] elif device_config.get(bundleId): options.bundle_id device_config[bundleId] # 浏览器测试 if device_config.get(browserName): options.browser_name device_config[browserName] # 性能配置 options.new_command_timeout 300 options.no_reset device_config.get(noReset, False) options.full_reset device_config.get(fullReset, False) # WebDriverAgent 配置 options.wda_launch_timeout 120000 options.wda_connection_timeout 60000 # 视频录制 if video: options.video_quality medium options.video_fps 30 logger.info(fCreating iOS driver with options: {options.to_capabilities()}) # 创建 driver driver webdriver.Remote( command_executorself.appium_server_url, optionsoptions ) return driver async def _execute_step( self, driver: webdriver.Remote, step: Dict[str, Any], execution_id: str, step_index: int, device_config: Dict[str, Any] ) - Dict[str, Any]: 执行单个测试步骤 action step[action] selector step.get(selector, ) value step.get(value, ) wait_condition step.get(wait_condition, {}) screenshot step.get(screenshot, False) start_time datetime.now() result { action: action, selector: selector, status: passed, timestamp: start_time.isoformat() } try: # 执行前等待 if wait_condition: self._handle_wait(driver, wait_condition) # 执行动作 if action navigate: # 移动端导航打开 URL 或启动应用 if value.startswith(http): driver.get(value) else: # 启动应用 if device_config[platform] android: driver.start_activity( app_packagevalue.split(/)[0], app_activityvalue.split(/)[1] if / in value else .MainActivity ) else: driver.activate_app(value) elif action click or action tap: element self._find_element(driver, selector, device_config[platform]) element.click() elif action type or action fill: element self._find_element(driver, selector, device_config[platform]) element.clear() element.send_keys(value) elif action swipe: # 滑动操作 # value 格式: direction 或 x1,y1,x2,y2 if value in [up, down, left, right]: self._swipe_direction(driver, value) else: coords [int(c) for c in value.split(,)] driver.swipe(coords[0], coords[1], coords[2], coords[3], 500) elif action scroll: # 滚动到元素 if selector: element self._find_element(driver, selector, device_config[platform]) driver.execute_script(mobile: scroll, {element: element, toVisible: True}) else: # 滚动方向 self._swipe_direction(driver, value or down) elif action long_press: element self._find_element(driver, selector, device_config[platform]) from appium.webdriver.common.touch_action import TouchAction TouchAction(driver).long_press(element).perform() elif action double_tap: element self._find_element(driver, selector, device_config[platform]) from appium.webdriver.common.multi_action import MultiAction from appium.webdriver.common.touch_action import TouchAction action1 TouchAction(driver).tap(element) action2 TouchAction(driver).tap(element) MultiAction(driver).add(action1, action2).perform() elif action pinch or action zoom: # 缩放手势 element self._find_element(driver, selector, device_config[platform]) if action pinch: driver.pinch(elementelement) else: driver.zoom(elementelement) elif action hide_keyboard: if driver.is_keyboard_shown(): driver.hide_keyboard() elif action rotate: # 旋转设备 orientation value.upper() # PORTRAIT, LANDSCAPE driver.orientation orientation elif action shake: # 摇晃设备仅 iOS if device_config[platform] ios: driver.shake() elif action background_app: # 将应用置于后台 seconds int(value) if value else 3 driver.background_app(seconds) elif action install_app: driver.install_app(value) elif action remove_app: driver.remove_app(value) elif action launch_app: driver.launch_app() elif action close_app: driver.close_app() elif action reset_app: driver.reset() elif action wait: wait_type wait_condition.get(type, time) wait_value wait_condition.get(value, 1000) if wait_type time: await asyncio.sleep(int(wait_value) / 1000) elif wait_type element: WebDriverWait(driver, int(wait_value) / 1000).until( EC.presence_of_element_located( self._parse_selector(selector, device_config[platform]) ) ) elif action assert: self._handle_assertion(driver, selector, value, result, device_config[platform]) elif action execute_script: script_result driver.execute_script(value) result[script_result] script_result elif action get_text: element self._find_element(driver, selector, device_config[platform]) result[text] element.text elif action get_attribute: element self._find_element(driver, selector, device_config[platform]) result[attribute_value] element.get_attribute(value) elif action switch_context: # 切换上下文Native/WebView if value: driver.switch_to.context(value) else: # 自动切换到 WebView contexts driver.contexts for context in contexts: if WEBVIEW in context: driver.switch_to.context(context) break result[context] driver.current_context elif action get_contexts: result[contexts] driver.contexts elif action set_network: # 设置网络状态仅 Android if device_config[platform] android: # value: wifi, data, airplane, none driver.set_network_connection(self._get_network_type(value)) elif action set_location: # 设置地理位置 # value 格式: latitude,longitude lat, lon value.split(,) driver.set_location(float(lat), float(lon), 0) else: raise Exception(fUnknown action: {action}) # 截图 if screenshot or action assert: screenshot_path self._take_screenshot( driver, execution_id, fstep-{step_index} ) result[screenshot_url] screenshot_path # 计算执行时间 end_time datetime.now() result[duration_ms] int((end_time - start_time).total_seconds() * 1000) logger.info(fStep {action} completed successfully in {result[duration_ms]}ms) except (TimeoutException, NoSuchElementException) as e: result[status] failed result[error_message] str(e) logger.error(fStep {action} failed: {str(e)}) # 错误截图 try: screenshot_path self._take_screenshot( driver, execution_id, ferror-step-{step_index} ) result[screenshot_url] screenshot_path except: pass except Exception as e: result[status] error result[error_message] str(e) logger.error(fStep {action} error: {str(e)}, exc_infoTrue) return result def _find_element( self, driver: webdriver.Remote, selector: str, platform: str ): 查找元素 by, value self._parse_selector(selector, platform) wait WebDriverWait(driver, self.action_timeout) element wait.until(EC.presence_of_element_located((by, value))) return element def _parse_selector(self, selector: str, platform: str) - tuple: 解析选择器 支持的选择器类型: - id:element_id - 通过 ID - xpath://path - 通过 XPath - class:class_name - 通过 Class Name - text:visible_text - 通过文本 - accessibility:accessibility_id - 通过 Accessibility ID - android:uiautomator - Android UiAutomator (仅 Android) - ios:predicate - iOS Predicate (仅 iOS) - ios:chain - iOS Class Chain (仅 iOS) if : in selector: selector_type, selector_value selector.split(:, 1) if selector_type id: return (AppiumBy.ID, selector_value) elif selector_type xpath: return (AppiumBy.XPATH, selector_value) elif selector_type class: return (AppiumBy.CLASS_NAME, selector_value) elif selector_type text: if platform android: return (AppiumBy.ANDROID_UIAUTOMATOR, fnew UiSelector().text({selector_value})) else: return (AppiumBy.IOS_PREDICATE, flabel {selector_value}) elif selector_type accessibility: return (AppiumBy.ACCESSIBILITY_ID, selector_value) elif selector_type android and platform android: return (AppiumBy.ANDROID_UIAUTOMATOR, selector_value) elif selector_type ios and platform ios: return (AppiumBy.IOS_PREDICATE, selector_value) elif selector_type chain and platform ios: return (AppiumBy.IOS_CLASS_CHAIN, selector_value) # 默认使用 XPath if selector.startswith(//): return (AppiumBy.XPATH, selector) # 默认使用 ID return (AppiumBy.ID, selector) def _swipe_direction(self, driver: webdriver.Remote, direction: str): 按方向滑动 size driver.get_window_size() width size[width] height size[height] # 计算滑动坐标从屏幕中心开始 start_x width // 2 start_y height // 2 if direction up: end_x start_x end_y height // 4 elif direction down: end_x start_x end_y height * 3 // 4 elif direction left: end_x width // 4 end_y start_y elif direction right: end_x width * 3 // 4 end_y start_y else: raise ValueError(fInvalid swipe direction: {direction}) driver.swipe(start_x, start_y, end_x, end_y, 500) def _get_network_type(self, network: str) - int: 获取网络类型常量 network_types { none: 0, airplane: 1, wifi: 2, data: 4, all: 6 } return network_types.get(network.lower(), 6) def _handle_wait(self, driver: webdriver.Remote, wait_condition: Dict[str, Any]): 处理等待条件 wait_type wait_condition.get(type, time) wait_value wait_condition.get(value, 1000) timeout wait_condition.get(timeout, self.action_timeout) if wait_type time: time.sleep(int(wait_value) / 1000) elif wait_type element: platform driver.capabilities.get(platformName, ).lower() by, value self._parse_selector(str(wait_value), platform) WebDriverWait(driver, timeout).until( EC.presence_of_element_located((by, value)) ) def _handle_assertion( self, driver: webdriver.Remote, selector: str, expected_value: str, result: Dict[str, Any], platform: str ): 处理断言 assertion_type, assertion_value expected_value.split(:, 1) if : in expected_value else (visible, expected_value) try: if assertion_type visible: element self._find_element(driver, selector, platform) if not element.is_displayed(): raise AssertionError(fElement {selector} is not visible) elif assertion_type text: element self._find_element(driver, selector, platform) if assertion_value not in element.text: raise AssertionError( fExpected text {assertion_value} not found in {element.text} ) elif assertion_type enabled: element self._find_element(driver, selector, platform) if not element.is_enabled(): raise AssertionError(fElement {selector} is not enabled) elif assertion_type selected: element self._find_element(driver, selector, platform) if not element.is_selected(): raise AssertionError(fElement {selector} is not selected) elif assertion_type count: by, value self._parse_selector(selector, platform) elements driver.find_elements(by, value) expected_count int(assertion_value) if len(elements) ! expected_count: raise AssertionError( fExpected {expected_count} elements, found {len(elements)} ) result[assertion_passed] True except AssertionError as e: result[status] failed result[error_message] str(e) result[assertion_passed] False raise def _take_screenshot( self, driver: webdriver.Remote, execution_id: str, name: str ) - str: 截图 try: screenshot_path self.screenshots_dir / execution_id / f{name}.png screenshot_path.parent.mkdir(parentsTrue, exist_okTrue) driver.save_screenshot(str(screenshot_path)) # 返回相对路径 return f/screenshots/{execution_id}/{name}.png except Exception as e: logger.error(fFailed to take screenshot: {str(e)}) return async def get_available_devices(self) - List[Dict[str, Any]]: 获取可用的移动设备列表 devices [] # 获取 Android 设备 try: android_devices await self._get_android_devices() devices.extend(android_devices) except Exception as e: logger.error(fFailed to get Android devices: {str(e)}) # 获取 iOS 设备 try: ios_devices await self._get_ios_devices() devices.extend(ios_devices) except Exception as e: logger.error(fFailed to get iOS devices: {str(e)}) return devices async def _get_android_devices(self) - List[Dict[str, Any]]: 获取 Android 设备列表 devices [] try: # 使用 adb 获取设备列表 result subprocess.run( [adb, devices, -l], capture_outputTrue, textTrue, timeout10 ) lines result.stdout.strip().split(\n)[1:] # 跳过标题行 for line in lines: if line.strip(): parts line.split() device_id parts[0] # 获取设备详细信息 device_info await self._get_android_device_info(device_id) devices.append({ id: device_id, name: device_info.get(model, Unknown Android Device), platform: android, type: emulator if emulator in device_id else real, os_version: device_info.get(version, Unknown), manufacturer: device_info.get(manufacturer, Unknown), config: fandroid:{emulator if emulator in device_id else real}:{device_id} }) except FileNotFoundError: logger.warning(adb not found, skipping Android devices) except Exception as e: logger.error(fError getting Android devices: {str(e)}) return devices async def _get_android_device_info(self, device_id: str) - Dict[str, str]: 获取 Android 设备详细信息 info {} try: # 获取设备型号 result subprocess.run( [adb, -s, device_id, shell, getprop, ro.product.model], capture_outputTrue, textTrue, timeout5 ) info[model] result.stdout.strip() # 获取 Android 版本 result subprocess.run( [adb, -s, device_id, shell, getprop, ro.build.version.release], capture_outputTrue, textTrue, timeout5 ) info[version] result.stdout.strip() # 获取制造商 result subprocess.run( [adb, -s, device_id, shell, getprop, ro.product.manufacturer], capture_outputTrue, textTrue, timeout5 ) info[manufacturer] result.stdout.strip() except Exception as e: logger.error(fError getting device info for {device_id}: {str(e)}) return info async def _get_ios_devices(self) - List[Dict[str, Any]]: 获取 iOS 设备列表 devices [] try: # 使用 xcrun simctl 获取模拟器列表 result subprocess.run( [xcrun, simctl, list, devices, available, --json], capture_outputTrue, textTrue, timeout10 ) data json.loads(result.stdout) for runtime, device_list in data.get(devices, {}).items(): for device in device_list: if device.get(isAvailable, False): devices.append({ id: device[udid], name: device[name], platform: ios, type: simulator, os_version: runtime.split(.)[-1], state: device.get(state, Unknown), config: fios:simulator:{device[name]} }) # 获取真机列表 result subprocess.run( [idevice_id, -l], capture_outputTrue, textTrue, timeout10 ) for udid in result.stdout.strip().split(\n): if udid: # 获取设备名称 name_result subprocess.run( [ideviceinfo, -u, udid, -k, DeviceName], capture_outputTrue, textTrue, timeout5 ) devices.append({ id: udid, name: name_result.stdout.strip() or iPhone, platform: ios, type: real, os_version: Unknown, config: fios:real:{udid} }) except FileNotFoundError: logger.warning(iOS tools not found, skipping iOS devices) except Exception as e: logger.error(fError getting iOS devices: {str(e)}) return devices2.4.5 移动端测试辅助工具backend/services/executor/src/mobile_utils.py 移动端测试辅助工具 提供常用的移动端测试功能 import logging from typing import Dict, Any, Optional, List from appium import webdriver from appium.webdriver.common.appiumby import AppiumBy import time logger logging.getLogger(__name__) class MobileTestUtils: 移动端测试工具类 staticmethod def wait_for_element( driver: webdriver.Remote, selector: str, platform: str, timeout: int 10 ): 等待元素出现 from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC by, value MobileTestUtils._parse_selector(selector, platform) wait WebDriverWait(driver, timeout) return wait.until(EC.presence_of_element_located((by, value))) staticmethod def scroll_to_element( driver: webdriver.Remote, selector: str, platform: str, max_scrolls: int 10 ): 滚动到元素可见 for i in range(max_scrolls): try: element driver.find_element( *MobileTestUtils._parse_selector(selector, platform) ) if element.is_displayed(): return element except: pass # 向下滑动 size driver.get_window_size() driver.swipe( size[width] // 2, size[height] * 3 // 4, size[width] // 2, size[height] // 4, 500 ) time.sleep(0.5) raise Exception(fElement {selector} not found after {max_scrolls} scrolls) staticmethod def get_toast_message(driver: webdriver.Remote, timeout: int 5) - Optional[str]: 获取 Toast 消息仅 Android try: from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC toast_element WebDriverWait(driver, timeout).until( EC.presence_of_element_located(( AppiumBy.XPATH, //android.widget.Toast )) ) return toast_element.text except: return None staticmethod def switch_to_webview(driver: webdriver.Remote) - bool: 切换到 WebView 上下文 try: contexts driver.contexts for context in contexts: if WEBVIEW in context: driver.switch_to.context(context) logger.info(fSwitched to context: {context}) return True return False except Exception as e: logger.error(fFailed to switch to WebView: {str(e)}) return False staticmethod def switch_to_native(driver: webdriver.Remote) - bool: 切换到 Native 上下文 try: driver.switch_to.context(NATIVE_APP) logger.info(Switched to NATIVE_APP context) return True except Exception as e: logger.error(fFailed to switch to Native: {str(e)}) return False staticmethod def take_element_screenshot( driver: webdriver.Remote, selector: str, platform: str, filepath: str ): 截取元素截图 element driver.find_element( *MobileTestUtils._parse_selector(selector, platform) ) element.screenshot(filepath) staticmethod def get_device_info(driver: webdriver.Remote) - Dict[str, Any]: 获取设备信息 capabilities driver.capabilities return { platform: capabilities.get(platformName), platform_version: capabilities.get(platformVersion), device_name: capabilities.get(deviceName), device_udid: capabilities.get(udid), automation_name: capabilities.get(automationName), app: capabilities.get(app), browser: capabilities.get(browserName) } staticmethod def set_clipboard(driver: webdriver.Remote, text: str): 设置剪贴板内容 import base64 encoded_text base64.b64encode(text.encode()).decode() driver.set_clipboard(encoded_text, plaintext) staticmethod def get_clipboard(driver: webdriver.Remote) - str: 获取剪贴板内容 import base64 encoded_text driver.get_clipboard(plaintext) return base64.b64decode(encoded_text).decode() staticmethod def _parse_selector(selector: str, platform: str) - tuple: 解析选择器 if : in selector: selector_type, selector_value selector.split(:, 1) if selector_type id: return (AppiumBy.ID, selector_value) elif selector_type xpath: return (AppiumBy.XPATH, selector_value) elif selector_type class: return (AppiumBy.CLASS_NAME, selector_value) elif selector_type accessibility: return (AppiumBy.ACCESSIBILITY_ID, selector_value) elif selector_type android and platform android: return (AppiumBy.ANDROID_UIAUTOMATOR, selector_value) elif selector_type ios and platform ios: return (AppiumBy.IOS_PREDICATE, selector_value) if selector.startswith(//): return (AppiumBy.XPATH, selector) return (AppiumBy.ID, selector) class AndroidUtils: Android 专用工具 staticmethod def press_back(driver: webdriver.Remote): 按返回键 driver.press_keycode(4) staticmethod def press_home(driver: webdriver.Remote): 按 Home 键 driver.press_keycode(3) staticmethod def press_menu(driver: webdriver.Remote): 按菜单键 driver.press_keycode(82) staticmethod def open_notifications(driver: webdriver.Remote): 打开通知栏 driver.open_notifications() staticmethod def get_network_connection(driver: webdriver.Remote) - int: 获取网络连接状态 return driver.network_connection staticmethod def set_network_connection(driver: webdriver.Remote, connection_type: int): 设置网络连接状态 driver.set_network_connection(connection_type) staticmethod def start_activity( driver: webdriver.Remote, app_package: str, app_activity: str, **kwargs ): 启动 Activity driver.start_activity(app_package, app_activity, **kwargs) staticmethod def get_current_activity(driver: webdriver.Remote) - str: 获取当前 Activity return driver.current_activity staticmethod def get_current_package(driver: webdriver.Remote) - str: 获取当前包名 return driver.current_package class IOSUtils: iOS 专用工具 staticmethod def shake(driver: webdriver.Remote): 摇晃设备 driver.shake() staticmethod def lock(driver: webdriver.Remote, seconds: int 0): 锁定设备 driver.lock(seconds) staticmethod def unlock(driver: webdriver.Remote): 解锁设备 driver.unlock() staticmethod def is_locked(driver: webdriver.Remote) - bool: 检查设备是否锁定 return driver.is_locked() staticmethod def touch_id(driver: webdriver.Remote, match: bool True): 模拟 Touch ID driver.touch_id(match) staticmethod def toggle_touch_id_enrollment(driver: webdriver.Remote, enabled: bool True): 切换 Touch ID 注册状态 driver.toggle_touch_id_enrollment(enabled)TestMaster 自动化测试平台 - 第五部分性能测试模块实现2.5 性能测试模块2.5.1 性能测试服务主文件backend/services/performance/src/main.py TestMaster 性能测试服务 支持 HTTP/HTTPS 压力测试、负载测试、并发测试 集成 Locust 和 JMeter from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, HttpUrl from typing import List, Dict, Any, Optional import logging import asyncio from datetime import datetime import json import uuid from locust_runner import LocustRunner from jmeter_runner import JMeterRunner from performance_analyzer import PerformanceAnalyzer from websocket_manager import WebSocketManager # 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s ) logger logging.getLogger(__name__) # 创建 FastAPI 应用 app FastAPI( titleTestMaster Performance Testing, descriptionPerformance testing service with Locust and JMeter integration, version1.0.0 ) # CORS 配置 app.add_middleware( CORSMiddleware, allow_origins[*], allow_credentialsTrue, allow_methods[*], allow_headers[*], ) # 请求模型 class PerformanceTestRequest(BaseModel): test_id: str name: str target_url: HttpUrl test_type: str # load, stress, spike, endurance, scalability duration: int # 测试持续时间秒 users: int # 并发用户数 spawn_rate: int # 每秒启动的用户数 scenarios: List[Dict[str, Any]] runner: str locust # locust 或 jmeter options: Optional[Dict[str, Any]] {} class LoadPattern(BaseModel): pattern_type: str # constant, ramp_up, step, spike min_users: int max_users: int duration: int steps: Optional[int] None class PerformanceTestResponse(BaseModel): test_id: str status: str started_at: datetime completed_at: Optional[datetime] None duration_seconds: Optional[int] None metrics: Optional[Dict[str, Any]] None report_url: Optional[str] None # 全局实例 locust_runner LocustRunner() jmeter_runner JMeterRunner() performance_analyzer PerformanceAnalyzer() ws_manager WebSocketManager() # 存储运行中的测试 running_tests: Dict[str, Dict[str, Any]] {} # API 端点 app.get(/health) async def health_check(): 健康检查 return { status: healthy, service: performance-testing, version: 1.0.0, runners: { locust: locust_runner.is_available(), jmeter: jmeter_runner.is_available() } } app.post(/tests/start, response_modelPerformanceTestResponse) async def start_performance_test( request: PerformanceTestRequest, background_tasks: BackgroundTasks ): 启动性能测试 Args: request: 性能测试请求 background_tasks: 后台任务 Returns: 测试响应 logger.info(fStarting performance test: {request.name}) # 选择执行器 if request.runner jmeter: runner jmeter_runner else: runner locust_runner # 创建测试配置 test_config { test_id: request.test_id, name: request.name, target_url: str(request.target_url), test_type: request.test_type, duration: request.duration, users: request.users, spawn_rate: request.spawn_rate, scenarios: request.scenarios, options: request.options, started_at: datetime.now() } # 保存到运行中的测试 running_tests[request.test_id] { config: test_config, status: starting, runner: request.runner } # 后台执行测试 background_tasks.add_task( run_performance_test, test_idrequest.test_id, runnerrunner, configtest_config ) return PerformanceTestResponse( test_idrequest.test_id, statusstarting, started_attest_config[started_at] ) app.get(/tests/{test_id}/status) async def get_test_status(test_id: str): 获取测试状态 if test_id not in running_tests: raise HTTPException(status_code404, detailTest not found) test_info running_tests[test_id] return { test_id: test_id, status: test_info[status], config: test_info[config], metrics: test_info.get(metrics), started_at: test_info[config][started_at].isoformat(), completed_at: test_info.get(completed_at).isoformat() if test_info.get(completed_at) else None } app.post(/tests/{test_id}/stop) async def stop_test(test_id: str): 停止测试 if test_id not in running_tests: raise HTTPException(status_code404, detailTest not found) test_info running_tests[test_id] runner_type test_info[runner] if runner_type jmeter: await jmeter_runner.stop_test(test_id) else: await locust_runner.stop_test(test_id) test_info[status] stopped test_info[completed_at] datetime.now() return {message: Test stopped successfully} app.get(/tests/{test_id}/metrics) async def get_test_metrics(test_id: str): 获取测试实时指标 if test_id not in running_tests: raise HTTPException(status_code404, detailTest not found) test_info running_tests[test_id] runner_type test_info[runner] if runner_type jmeter: metrics await jmeter_runner.get_metrics(test_id) else: metrics await locust_runner.get_metrics(test_id) return metrics app.get(/tests/{test_id}/report) async def get_test_report(test_id: str): 获取测试报告 if test_id not in running_tests: raise HTTPException(status_code404, detailTest not found) test_info running_tests[test_id] # 生成详细报告 report await performance_analyzer.generate_report( test_idtest_id, test_infotest_info ) return report app.websocket(/ws/tests/{test_id}) async def websocket_endpoint(websocket: WebSocket, test_id: str): WebSocket 实时推送测试指标 await ws_manager.connect(websocket, test_id) try: while True: # 接收客户端消息保持连接 data await websocket.receive_text() # 可以处理客户端发送的控制命令 if data ping: await websocket.send_text(pong) except WebSocketDisconnect: ws_manager.disconnect(websocket, test_id) app.get(/templates) async def get_test_templates(): 获取性能测试模板 return { templates: [ { id: api_load_test, name: API 负载测试, description: 测试 API 在正常负载下的性能, test_type: load, default_users: 100, default_duration: 300, scenarios: [ { name: GET Request, method: GET, path: /api/endpoint, weight: 70 }, { name: POST Request, method: POST, path: /api/endpoint, body: {key: value}, weight: 30 } ] }, { id: stress_test, name: 压力测试, description: 测试系统在极限负载下的表现, test_type: stress, default_users: 1000, default_duration: 600, load_pattern: { pattern_type: ramp_up, min_users: 100, max_users: 1000, duration: 600 } }, { id: spike_test, name: 尖峰测试, description: 测试系统应对突发流量的能力, test_type: spike, default_users: 500, default_duration: 300, load_pattern: { pattern_type: spike, min_users: 10, max_users: 500, duration: 300 } }, { id: endurance_test, name: 耐久测试, description: 测试系统长时间运行的稳定性, test_type: endurance, default_users: 200, default_duration: 7200, load_pattern: { pattern_type: constant, min_users: 200, max_users: 200, duration: 7200 } } ] } async def run_performance_test( test_id: str, runner: Any, config: Dict[str, Any] ): 后台执行性能测试 try: logger.info(fRunning performance test {test_id}) # 更新状态 running_tests[test_id][status] running # 执行测试 result await runner.run_test(config) # 更新结果 running_tests[test_id][status] completed running_tests[test_id][completed_at] datetime.now() running_tests[test_id][metrics] result[metrics] running_tests[test_id][report_url] result.get(report_url) # 分析结果 analysis await performance_analyzer.analyze_results(result) running_tests[test_id][analysis] analysis # 通过 WebSocket 推送完成通知 await ws_manager.broadcast( test_id, { type: test_completed, test_id: test_id, status: completed, metrics: result[metrics] } ) logger.info(fPerformance test {test_id} completed successfully) except Exception as e: logger.error(fPerformance test {test_id} failed: {str(e)}, exc_infoTrue) running_tests[test_id][status] failed running_tests[test_id][completed_at] datetime.now() running_tests[test_id][error] str(e) # 推送错误通知 await ws_manager.broadcast( test_id, { type: test_failed, test_id: test_id, error: str(e) } ) if __name__ __main__: import uvicorn print( ╔════════════════════════════════════════════════════════════════╗ ║ ║ ║ TestMaster Performance Testing Service ║ ║ ║ ║ Server: http://localhost:8003 ║ ║ Docs: http://localhost:8003/docs ║ ║ WebSocket: ws://localhost:8003/ws/tests/{test_id} ║ ║ ║ ║ Runners: ║ ║ - Locust (Python-based) ║ ║ - JMeter (Java-based) ║ ║ ║ ╚════════════════════════════════════════════════════════════════╝ ) uvicorn.run( main:app, host0.0.0.0, port8003, reloadTrue, log_levelinfo )2.5.2 Locust 执行器backend/services/performance/src/locust_runner.py Locust 性能测试执行器 基于 Python 的分布式负载测试工具 import logging from typing import Dict, Any, List, Optional import asyncio import subprocess import os from pathlib import Path import json import tempfile import signal import psutil logger logging.getLogger(__name__) class LocustRunner: Locust 测试执行器 def __init__(self): self.results_dir Path(./performance_results/locust) self.results_dir.mkdir(parentsTrue, exist_okTrue) self.running_processes: Dict[str, subprocess.Popen] {} logger.info(LocustRunner initialized) def is_available(self) - bool: 检查 Locust 是否可用 try: result subprocess.run( [locust, --version], capture_outputTrue, textTrue, timeout5 ) return result.returncode 0 except: return False async def run_test(self, config: Dict[str, Any]) - Dict[str, Any]: 运行 Locust 测试 Args: config: 测试配置 Returns: 测试结果 test_id config[test_id] logger.info(fStarting Locust test {test_id}) # 生成 Locust 文件 locustfile_path await self._generate_locustfile(config) # 准备输出文件 results_file self.results_dir / f{test_id}_results.json html_report self.results_dir / f{test_id}_report.html csv_prefix self.results_dir / f{test_id} # 构建 Locust 命令 cmd [ locust, -f, str(locustfile_path), --headless, --users, str(config[users]), --spawn-rate, str(config[spawn_rate]), --run-time, f{config[duration]}s, --host, config[target_url], --json, --html, str(html_report), --csv, str(csv_prefix), --loglevel, INFO ] # 添加额外选项 if config.get(options, {}).get(stop_on_error): cmd.append(--stop-on-error) if config.get(options, {}).get(expect_workers): cmd.extend([--expect-workers, str(config[options][expect_workers])]) logger.info(fLocust command: { .join(cmd)}) # 启动 Locust 进程 process subprocess.Popen( cmd, stdoutsubprocess.PIPE, stderrsubprocess.PIPE, textTrue ) self.running_processes[test_id] process # 等待测试完成 try: stdout, stderr await asyncio.wait_for( asyncio.create_subprocess_exec( *cmd, stdoutasyncio.subprocess.PIPE, stderrasyncio.subprocess.PIPE ).communicate(), timeoutconfig[duration] 60 ) # 解析结果 metrics await self._parse_results(test_id, csv_prefix) return { test_id: test_id, status: completed, metrics: metrics, report_url: f/performance_results/locust/{test_id}_report.html } except asyncio.TimeoutError: logger.error(fLocust test {test_id} timed out) await self.stop_test(test_id) raise except Exception as e: logger.error(fLocust test {test_id} failed: {str(e)}, exc_infoTrue) raise finally: if test_id in self.running_processes: del self.running_processes[test_id] async def _generate_locustfile(self, config: Dict[str, Any]) - Path: 生成 Locustfile test_id config[test_id] scenarios config[scenarios] # 构建 Locustfile 内容 locustfile_content f from locust import HttpUser, task, between, constant import json class PerformanceTestUser(HttpUser): wait_time between(1, 3) host {config[target_url]} # 添加场景任务 for i, scenario in enumerate(scenarios): method scenario.get(method, GET).upper() path scenario.get(path, /) weight scenario.get(weight, 1) headers scenario.get(headers, {}) body scenario.get(body) task_name scenario.get(name, ftask_{i}).replace( , _) locustfile_content f task({weight}) def {task_name}(self): headers {json.dumps(headers)} if method GET: locustfile_content f with self.client.get({path}, headersheaders, catch_responseTrue) as response: if response.status_code ! 200: response.failure(fGot status code {{response.status_code}}) elif method POST: body_str json.dumps(body) if body else {} locustfile_content f data {body_str} with self.client.post({path}, jsondata, headersheaders, catch_responseTrue) as response: if response.status_code not in [200, 201]: response.failure(fGot status code {{response.status_code}}) elif method PUT: body_str json.dumps(body) if body else {} locustfile_content f data {body_str} with self.client.put({path}, jsondata, headersheaders, catch_responseTrue) as response: if response.status_code ! 200: response.failure(fGot status code {{response.status_code}}) elif method DELETE: locustfile_content f with self.client.delete({path}, headersheaders, catch_responseTrue) as response: if response.status_code not in [200, 204]: response.failure(fGot status code {{response.status_code}}) # 保存 Locustfile locustfile_path self.results_dir / f{test_id}_locustfile.py with open(locustfile_path, w) as f: f.write(locustfile_content) logger.info(fGenerated Locustfile: {locustfile_path}) return locustfile_path async def _parse_results(self, test_id: str, csv_prefix: Path) - Dict[str, Any]: 解析 Locust 结果 metrics { requests: {}, failures: [], summary: {} } # 解析请求统计 stats_file Path(f{csv_prefix}_stats.csv) if stats_file.exists(): import csv with open(stats_file, r) as f: reader csv.DictReader(f) rows list(reader) for row in rows: if row[Type] Aggregated: metrics[summary] { total_requests: int(row[Request Count]), total_failures: int(row[Failure Count]), median_response_time: float(row[Median Response Time]), average_response_time: float(row[Average Response Time]), min_response_time: float(row[Min Response Time]), max_response_time: float(row[Max Response Time]), requests_per_second: float(row[Requests/s]), failures_per_second: float(row[Failures/s]) } else: metrics[requests][row[Name]] { method: row[Type], count: int(row[Request Count]), failures: int(row[Failure Count]), median_response_time: float(row[Median Response Time]), average_response_time: float(row[Average Response Time]), min_response_time: float(row[Min Response Time]), max_response_time: float(row[Max Response Time]), requests_per_second: float(row[Requests/s]) } # 解析失败记录 failures_file Path(f{csv_prefix}_failures.csv) if failures_file.exists(): import csv with open(failures_file, r) as f: reader csv.DictReader(f) for row in reader: metrics[failures].append({ method: row[Method], name: row[Name], error: row[Error], occurrences: int(row[Occurrences]) }) return metrics async def stop_test(self, test_id: str): 停止测试 if test_id in self.running_processes: process self.running_processes[test_id] try: # 发送 SIGTERM 信号 process.terminate() # 等待进程结束 try: process.wait(timeout10) except subprocess.TimeoutExpired: # 强制杀死进程 process.kill() process.wait() logger.info(fStopped Locust test {test_id}) except Exception as e: logger.error(fError stopping test {test_id}: {str(e)}) finally: del self.running_processes[test_id] async def get_metrics(self, test_id: str) - Dict[str, Any]: 获取实时指标 # Locust 提供了 Web UI 和 REST API # 这里可以通过 HTTP 请求获取实时指标 try: import aiohttp async with aiohttp.ClientSession() as session: async with session.get(fhttp://localhost:8089/stats/requests) as response: if response.status 200: data await response.json() return data except Exception as e: logger.error(fError getting metrics: {str(e)}) return {}2.5.3 JMeter 执行器backend/services/performance/src/jmeter_runner.py JMeter 性能测试执行器 基于 Java 的成熟负载测试工具 import logging from typing import Dict, Any, List, Optional import asyncio import subprocess import os from pathlib import Path import json import xml.etree.ElementTree as ET from xml.dom import minidom import shutil logger logging.getLogger(__name__) class JMeterRunner: JMeter 测试执行器 def __init__(self): self.results_dir Path(./performance_results/jmeter) self.results_dir.mkdir(parentsTrue, exist_okTrue) self.jmeter_home os.getenv(JMETER_HOME, /opt/apache-jmeter) self.jmeter_bin Path(self.jmeter_home) / bin / jmeter self.running_processes: Dict[str, subprocess.Popen] {} logger.info(JMeterRunner initialized) def is_available(self) - bool: 检查 JMeter 是否可用 try: if not self.jmeter_bin.exists(): return False result subprocess.run( [str(self.jmeter_bin), --version], capture_outputTrue, textTrue, timeout5 ) return result.returncode 0 except: return False async def run_test(self, config: Dict[str, Any]) - Dict[str, Any]: 运行 JMeter 测试 Args: config: 测试配置 Returns: 测试结果 test_id config[test_id] logger.info(fStarting JMeter test {test_id}) # 生成 JMX 文件 jmx_path await self._generate_jmx(config) # 准备输出文件 results_file self.results_dir / f{test_id}_results.jtl html_report_dir self.results_dir / f{test_id}_report log_file self.results_dir / f{test_id}.log # 构建 JMeter 命令 cmd [ str(self.jmeter_bin), -n, # 非 GUI 模式 -t, str(jmx_path), # 测试计划文件 -l, str(results_file), # 结果文件 -j, str(log_file), # 日志文件 -e, # 生成报告 -o, str(html_report_dir) # 报告输出目录 ] # 添加系统属性 if config.get(options, {}).get(java_opts): java_opts config[options][java_opts] for key, value in java_opts.items(): cmd.extend([-J, f{key}{value}]) logger.info(fJMeter command: { .join(cmd)}) # 启动 JMeter 进程 process subprocess.Popen( cmd, stdoutsubprocess.PIPE, stderrsubprocess.PIPE, textTrue ) self.running_processes[test_id] process # 等待测试完成 try: stdout, stderr process.communicate(timeoutconfig[duration] 120) if process.returncode ! 0: logger.error(fJMeter test failed: {stderr}) raise Exception(fJMeter test failed: {stderr}) # 解析结果 metrics await self._parse_results(results_file) return { test_id: test_id, status: completed, metrics: metrics, report_url: f/performance_results/jmeter/{test_id}_report/index.html } except subprocess.TimeoutExpired: logger.error(fJMeter test {test_id} timed out) await self.stop_test(test_id) raise except Exception as e: logger.error(fJMeter test {test_id} failed: {str(e)}, exc_infoTrue) raise finally: if test_id in self.running_processes: del self.running_processes[test_id] async def _generate_jmx(self, config: Dict[str, Any]) - Path: 生成 JMX 测试计划文件 test_id config[test_id] # 创建根元素 root ET.Element(jmeterTestPlan, { version: 1.2, properties: 5.0, jmeter: 5.5 }) # 添加哈希树 hash_tree ET.SubElement(root, hashTree) # 测试计划 test_plan ET.SubElement(hash_tree, TestPlan, { guiclass: TestPlanGui, testclass: TestPlan, testname: config[name], enabled: true }) # 测试计划属性 ET.SubElement(test_plan, stringProp, {name: TestPlan.comments}).text ET.SubElement(test_plan, boolProp, {name: TestPlan.functional_mode}).text false ET.SubElement(test_plan, boolProp, {name: TestPlan.serialize_threadgroups}).text false # 用户定义变量 arguments ET.SubElement(test_plan, elementProp, { name: TestPlan.user_defined_variables, elementType: Arguments, guiclass: ArgumentsPanel, testclass: Arguments, enabled: true }) ET.SubElement(arguments, collectionProp, {name: Arguments.arguments}) test_plan_tree ET.SubElement(hash_tree, hashTree) # 线程组 thread_group ET.SubElement(test_plan_tree, ThreadGroup, { guiclass: ThreadGroupGui, testclass: ThreadGroup, testname: Thread Group, enabled: true }) ET.SubElement(thread_group, stringProp, {name: ThreadGroup.on_sample_error}).text continue # 线程属性 thread_props ET.SubElement(thread_group, elementProp, { name: ThreadGroup.main_controller, elementType: LoopController, guiclass: LoopControlPanel, testclass: LoopController, enabled: true }) ET.SubElement(thread_props, boolProp, {name: LoopController.continue_forever}).text false ET.SubElement(thread_props, intProp, {name: LoopController.loops}).text -1 ET.SubElement(thread_group, stringProp, {name: ThreadGroup.num_threads}).text str(config[users]) ET.SubElement(thread_group, stringProp, {name: ThreadGroup.ramp_time}).text str(config[users] // config[spawn_rate]) ET.SubElement(thread_group, boolProp, {name: ThreadGroup.scheduler}).text true ET.SubElement(thread_group, stringProp, {name: ThreadGroup.duration}).text str(config[duration]) ET.SubElement(thread_group, stringProp, {name: ThreadGroup.delay}).text 0 thread_group_tree ET.SubElement(test_plan_tree, hashTree) # 添加场景HTTP 采样器 for scenario in config[scenarios]: sampler self._create_http_sampler(scenario, config[target_url]) thread_group_tree.append(sampler) thread_group_tree.append(ET.Element(hashTree)) # 添加监听器 # 结果树 result_tree ET.SubElement(thread_group_tree, ResultCollector, { guiclass: ViewResultsFullVisualizer, testclass: ResultCollector, testname: View Results Tree, enabled: true }) ET.SubElement(result_tree, boolProp, {name: ResultCollector.error_logging}).text false thread_group_tree.append(ET.Element(hashTree)) # 聚合报告 aggregate ET.SubElement(thread_group_tree, ResultCollector, { guiclass: StatVisualizer, testclass: ResultCollector, testname: Aggregate Report, enabled: true }) ET.SubElement(aggregate, boolProp, {name: ResultCollector.error_logging}).text false thread_group_tree.append(ET.Element(hashTree)) # 格式化 XML xml_str minidom.parseString(ET.tostring(root)).toprettyxml(indent ) # 保存 JMX 文件 jmx_path self.results_dir / f{test_id}.jmx with open(jmx_path, w) as f: f.write(xml_str) logger.info(fGenerated JMX file: {jmx_path}) return jmx_path def _create_http_sampler(self, scenario: Dict[str, Any], base_url: str) - ET.Element: 创建 HTTP 采样器 from urllib.parse import urlparse parsed_url urlparse(base_url) sampler ET.Element(HTTPSamplerProxy, { guiclass: HttpTestSampleGui, testclass: HTTPSamplerProxy, testname: scenario.get(name, HTTP Request), enabled: true }) # 基本属性 ET.SubElement(sampler, stringProp, {name: HTTPSampler.domain}).text parsed_url.netloc ET.SubElement(sampler, stringProp, {name: HTTPSampler.port}).text str(parsed_url.port or (443 if parsed_url.scheme https else 80)) ET.SubElement(sampler, stringProp, {name: HTTPSampler.protocol}).text parsed_url.scheme ET.SubElement(sampler, stringProp, {name: HTTPSampler.path}).text scenario.get(path, /) ET.SubElement(sampler, stringProp, {name: HTTPSampler.method}).text scenario.get(method, GET) # 请求体 if scenario.get(body): body_data json.dumps(scenario[body]) if isinstance(scenario[body], dict) else scenario[body] ET.SubElement(sampler, boolProp, {name: HTTPSampler.postBodyRaw}).text true arguments ET.SubElement(sampler, elementProp, { name: HTTPsampler.Arguments, elementType: Arguments }) collection ET.SubElement(arguments, collectionProp, {name: Arguments.arguments}) argument ET.SubElement(collection, elementProp, { name: , elementType: HTTPArgument }) ET.SubElement(argument, boolProp, {name: HTTPArgument.always_encode}).text false ET.SubElement(argument, stringProp, {name: Argument.value}).text body_data ET.SubElement(argument, stringProp, {name: Argument.metadata}).text # 请求头 if scenario.get(headers): header_manager ET.SubElement(sampler, HeaderManager, { guiclass: HeaderPanel, testclass: HeaderManager, testname: HTTP Header Manager, enabled: true }) collection ET.SubElement(header_manager, collectionProp, {name: HeaderManager.headers}) for key, value in scenario[headers].items(): header ET.SubElement(collection, elementProp, { name: , elementType: Header }) ET.SubElement(header, stringProp, {name: Header.name}).text key ET.SubElement(header, stringProp, {name: Header.value}).text value return sampler async def _parse_results(self, results_file: Path) - Dict[str, Any]: 解析 JMeter 结果文件 (JTL) metrics { requests: {}, summary: { total_requests: 0, total_failures: 0, total_bytes: 0, total_time: 0 } } if not results_file.exists(): return metrics try: # 解析 XML 格式的 JTL 文件 tree ET.parse(results_file) root tree.getroot() response_times [] for sample in root.findall(.//httpSample) or root.findall(.//sample): label sample.get(lb, Unknown) success sample.get(s, true) true time_ms int(sample.get(t, 0)) bytes_received int(sample.get(by, 0)) response_times.append(time_ms) if label not in metrics[requests]: metrics[requests][label] { count: 0, failures: 0, total_time: 0, total_bytes: 0, response_times: [] } metrics[requests][label][count] 1 metrics[requests][label][total_time] time_ms metrics[requests][label][total_bytes] bytes_received metrics[requests][label][response_times].append(time_ms) if not success: metrics[requests][label][failures] 1 metrics[summary][total_requests] 1 metrics[summary][total_time] time_ms metrics[summary][total_bytes] bytes_received if not success: metrics[summary][total_failures] 1 # 计算统计指标 for label, data in metrics[requests].items(): times sorted(data[response_times]) count len(times) if count 0: data[min_response_time] times[0] data[max_response_time] times[-1] data[average_response_time] sum(times) / count data[median_response_time] times[count // 2] data[percentile_90] times[int(count * 0.9)] data[percentile_95] times[int(count * 0.95)] data[percentile_99] times[int(count * 0.99)] # 删除原始响应时间列表太大 del data[response_times] # 计算总体统计 if response_times: times sorted(response_times) count len(times) metrics[summary][min_response_time] times[0] metrics[summary][max_response_time] times[-1] metrics[summary][average_response_time] sum(times) / count metrics[summary][median_response_time] times[count // 2] metrics[summary][percentile_90] times[int(count * 0.9)] metrics[summary][percentile_95] times[int(count * 0.95)] metrics[summary][percentile_99] times[int(count * 0.99)] except Exception as e: logger.error(fError parsing JMeter results: {str(e)}, exc_infoTrue) return metrics async def stop_test(self, test_id: str): 停止测试 if test_id in self.running_processes: process self.running_processes[test_id] try: # JMeter 使用 shutdown.sh 脚本优雅停止 shutdown_script Path(self.jmeter_home) / bin / shutdown.sh if shutdown_script.exists(): subprocess.run([str(shutdown_script)], timeout10) else: process.terminate() # 等待进程结束 try: process.wait(timeout30) except subprocess.TimeoutExpired: process.kill() process.wait() logger.info(fStopped JMeter test {test_id}) except Exception as e: logger.error(fError stopping test {test_id}: {str(e)}) finally: del self.running_processes[test_id] async def get_metrics(self, test_id: str) - Dict[str, Any]: 获取实时指标 # JMeter 在运行时不提供实时指标 API # 可以通过解析日志文件或使用 Backend Listener 插件 return {}继续下一部分...2.5.4 性能分析器backend/services/performance/src/performance_analyzer.py 性能测试结果分析器 提供智能分析和建议 import logging from typing import Dict, Any, List import statistics from datetime import datetime logger logging.getLogger(__name__) class PerformanceAnalyzer: 性能分析器 def __init__(self): # 性能阈值配置 self.thresholds { response_time: { excellent: 100, # ms good: 500, acceptable: 1000, poor: 3000 }, error_rate: { excellent: 0.1, # % good: 1.0, acceptable: 5.0, poor: 10.0 }, throughput: { excellent: 1000, # requests/s good: 500, acceptable: 100, poor: 50 } } async def analyze_results(self, result: Dict[str, Any]) - Dict[str, Any]: 分析性能测试结果 Args: result: 测试结果 Returns: 分析报告 metrics result.get(metrics, {}) summary metrics.get(summary, {}) analysis { overall_score: 0, performance_grade: N/A, bottlenecks: [], recommendations: [], strengths: [], issues: [] } # 分析响应时间 response_time_analysis self._analyze_response_time(summary) analysis[response_time] response_time_analysis # 分析错误率 error_rate_analysis self._analyze_error_rate(summary) analysis[error_rate] error_rate_analysis # 分析吞吐量 throughput_analysis self._analyze_throughput(summary) analysis[throughput] throughput_analysis # 识别瓶颈 bottlenecks self._identify_bottlenecks(metrics) analysis[bottlenecks] bottlenecks # 生成建议 recommendations self._generate_recommendations(analysis) analysis[recommendations] recommendations # 计算总体评分 overall_score self._calculate_overall_score(analysis) analysis[overall_score] overall_score analysis[performance_grade] self._get_performance_grade(overall_score) return analysis def _analyze_response_time(self, summary: Dict[str, Any]) - Dict[str, Any]: 分析响应时间 avg_response_time summary.get(average_response_time, 0) median_response_time summary.get(median_response_time, 0) percentile_95 summary.get(percentile_95, 0) percentile_99 summary.get(percentile_99, 0) analysis { average: avg_response_time, median: median_response_time, p95: percentile_95, p99: percentile_99, rating: N/A, score: 0 } # 评级基于 P95 if percentile_95 self.thresholds[response_time][excellent]: analysis[rating] Excellent analysis[score] 100 elif percentile_95 self.thresholds[response_time][good]: analysis[rating] Good analysis[score] 80 elif percentile_95 self.thresholds[response_time][acceptable]: analysis[rating] Acceptable analysis[score] 60 elif percentile_95 self.thresholds[response_time][poor]: analysis[rating] Poor analysis[score] 40 else: analysis[rating] Very Poor analysis[score] 20 return analysis def _analyze_error_rate(self, summary: Dict[str, Any]) - Dict[str, Any]: 分析错误率 total_requests summary.get(total_requests, 0) total_failures summary.get(total_failures, 0) error_rate (total_failures / total_requests * 100) if total_requests 0 else 0 analysis { total_requests: total_requests, total_failures: total_failures, error_rate_percent: round(error_rate, 2), rating: N/A, score: 0 } # 评级 if error_rate self.thresholds[error_rate][excellent]: analysis[rating] Excellent analysis[score] 100 elif error_rate self.thresholds[error_rate][good]: analysis[rating] Good analysis[score] 80 elif error_rate self.thresholds[error_rate][acceptable]: analysis[rating] Acceptable analysis[score] 60 elif error_rate self.thresholds[error_rate][poor]: analysis[rating] Poor analysis[score] 40 else: analysis[rating] Very Poor analysis[score] 20 return analysis def _analyze_throughput(self, summary: Dict[str, Any]) - Dict[str, Any]: 分析吞吐量 requests_per_second summary.get(requests_per_second, 0) analysis { requests_per_second: round(requests_per_second, 2), rating: N/A, score: 0 } # 评级 if requests_per_second self.thresholds[throughput][excellent]: analysis[rating] Excellent analysis[score] 100 elif requests_per_second self.thresholds[throughput][good]: analysis[rating] Good analysis[score] 80 elif requests_per_second self.thresholds[throughput][acceptable]: analysis[rating] Acceptable analysis[score] 60 elif requests_per_second self.thresholds[throughput][poor]: analysis[rating] Poor analysis[score] 40 else: analysis[rating] Very Poor analysis[score] 20 return analysis def _identify_bottlenecks(self, metrics: Dict[str, Any]) - List[Dict[str, Any]]: 识别性能瓶颈 bottlenecks [] requests metrics.get(requests, {}) # 找出最慢的请求 slowest_requests sorted( requests.items(), keylambda x: x[1].get(average_response_time, 0), reverseTrue )[:5] for name, data in slowest_requests: avg_time data.get(average_response_time, 0) if avg_time self.thresholds[response_time][acceptable]: bottlenecks.append({ type: slow_request, name: name, average_response_time: avg_time, severity: high if avg_time self.thresholds[response_time][poor] else medium, description: fRequest {name} has high average response time: {avg_time}ms }) # 找出错误率最高的请求 error_requests [ (name, data) for name, data in requests.items() if data.get(failures, 0) 0 ] error_requests sorted( error_requests, keylambda x: x[1].get(failures, 0) / x[1].get(count, 1), reverseTrue )[:5] for name, data in error_requests: error_rate (data.get(failures, 0) / data.get(count, 1)) * 100 if error_rate self.thresholds[error_rate][acceptable]: bottlenecks.append({ type: high_error_rate, name: name, error_rate: round(error_rate, 2), severity: high if error_rate self.thresholds[error_rate][poor] else medium, description: fRequest {name} has high error rate: {error_rate}% }) return bottlenecks def _generate_recommendations(self, analysis: Dict[str, Any]) - List[str]: 生成优化建议 recommendations [] # 响应时间建议 response_time analysis.get(response_time, {}) if response_time.get(score, 0) 60: recommendations.append( ⚠️ 响应时间较慢建议\n • 优化数据库查询添加索引\n • 启用缓存机制Redis/Memcached\n • 使用 CDN 加速静态资源\n • 优化代码逻辑减少不必要的计算 ) # 错误率建议 error_rate analysis.get(error_rate, {}) if error_rate.get(score, 0) 60: recommendations.append( ⚠️ 错误率较高建议\n • 检查应用日志定位错误原因\n • 增加错误处理和重试机制\n • 优化资源配置CPU/内存\n • 检查第三方服务依赖 ) # 吞吐量建议 throughput analysis.get(throughput, {}) if throughput.get(score, 0) 60: recommendations.append( ⚠️ 吞吐量较低建议\n • 增加服务器实例水平扩展\n • 优化应用性能垂直扩展\n • 使用负载均衡\n • 启用异步处理 ) # 瓶颈建议 bottlenecks analysis.get(bottlenecks, []) if bottlenecks: slow_requests [b for b in bottlenecks if b[type] slow_request] if slow_requests: recommendations.append( f⚠️ 发现 {len(slow_requests)} 个慢请求建议优先优化\n \n.join([f • {b[name]} ({b[average_response_time]}ms) for b in slow_requests[:3]]) ) error_requests [b for b in bottlenecks if b[type] high_error_rate] if error_requests: recommendations.append( f⚠️ 发现 {len(error_requests)} 个高错误率请求建议优先修复\n \n.join([f • {b[name]} ({b[error_rate]}%) for b in error_requests[:3]]) ) # 如果性能良好给出肯定 if not recommendations: recommendations.append( ✅ 性能表现良好继续保持\n • 定期进行性能测试\n • 监控生产环境指标\n • 持续优化代码质量 ) return recommendations def _calculate_overall_score(self, analysis: Dict[str, Any]) - int: 计算总体评分 scores [] if response_time in analysis: scores.append(analysis[response_time].get(score, 0)) if error_rate in analysis: scores.append(analysis[error_rate].get(score, 0)) if throughput in analysis: scores.append(analysis[throughput].get(score, 0)) if scores: return int(sum(scores) / len(scores)) return 0 def _get_performance_grade(self, score: int) - str: 获取性能等级 if score 90: return A elif score 80: return A elif score 70: return B elif score 60: return B elif score 50: return C else: return D async def generate_report( self, test_id: str, test_info: Dict[str, Any] ) - Dict[str, Any]: 生成完整的性能测试报告 config test_info.get(config, {}) metrics test_info.get(metrics, {}) analysis test_info.get(analysis, {}) report { test_id: test_id, test_name: config.get(name, Unknown), test_type: config.get(test_type, load), target_url: config.get(target_url, ), configuration: { users: config.get(users, 0), spawn_rate: config.get(spawn_rate, 0), duration: config.get(duration, 0) }, execution: { started_at: config.get(started_at).isoformat() if config.get(started_at) else None, completed_at: test_info.get(completed_at).isoformat() if test_info.get(completed_at) else None, status: test_info.get(status, unknown) }, metrics: metrics, analysis: analysis, generated_at: datetime.now().isoformat() } return report2.5.5 WebSocket 管理器backend/services/performance/src/websocket_manager.py WebSocket 连接管理器 用于实时推送性能测试指标 import logging from typing import Dict, List from fastapi import WebSocket import json import asyncio logger logging.getLogger(__name__) class WebSocketManager: WebSocket 连接管理器 def __init__(self): # 存储活动连接 {test_id: [websocket1, websocket2, ...]} self.active_connections: Dict[str, List[WebSocket]] {} async def connect(self, websocket: WebSocket, test_id: str): 接受 WebSocket 连接 await websocket.accept() if test_id not in self.active_connections: self.active_connections[test_id] [] self.active_connections[test_id].append(websocket) logger.info(fWebSocket connected for test {test_id}. Total connections: {len(self.active_connections[test_id])}) # 发送欢迎消息 await websocket.send_json({ type: connected, test_id: test_id, message: Connected to performance test stream }) def disconnect(self, websocket: WebSocket, test_id: str): 断开 WebSocket 连接 if test_id in self.active_connections: if websocket in self.active_connections[test_id]: self.active_connections[test_id].remove(websocket) logger.info(fWebSocket disconnected for test {test_id}. Remaining connections: {len(self.active_connections[test_id])}) # 如果没有连接了删除键 if not self.active_connections[test_id]: del self.active_connections[test_id] async def broadcast(self, test_id: str, message: Dict): 广播消息到所有连接 if test_id not in self.active_connections: return # 移除断开的连接 disconnected [] for websocket in self.active_connections[test_id]: try: await websocket.send_json(message) except Exception as e: logger.error(fError sending message: {str(e)}) disconnected.append(websocket) # 清理断开的连接 for websocket in disconnected: self.disconnect(websocket, test_id) async def send_metrics_update( self, test_id: str, metrics: Dict ): 发送指标更新 await self.broadcast(test_id, { type: metrics_update, test_id: test_id, metrics: metrics, timestamp: asyncio.get_event_loop().time() })后续请阅读DREAMVFIA Test Master 自动化测试平台 - 完整开源项目完整数据代码包 ——部分二