郑州网站开发建设,wordpress企业网站实例,网站改版 总结,域名代备案7.3 综合实战#xff1a;基于MCP实现的金融投资Agent在本节的实例中#xff0c; 实现了一个基于模型上下文协议#xff08;MCP#xff09;的服务器#xff0c;旨在为大型语言模型提供全面的金融洞察与分析支持。本实例整合了实时市场数据、基本面及技术面分析#xff0c…7.3 综合实战基于MCP实现的金融投资Agent在本节的实例中 实现了一个基于模型上下文协议MCP的服务器旨在为大型语言模型提供全面的金融洞察与分析支持。本实例整合了实时市场数据、基本面及技术面分析能为用户提供多维度的金融信息涵盖市场动态、个股分析、期权数据、历史数据等多个关键领域。实例7-1基于MCP实现的金融投资Agent源码路径codes\7\investor-agent7.3.1 项目介绍本实例旨在构建一个基于模型上下文协议MCP的智能投资代理系统通过集成多种先进的技术为投资分析和决策提供强大的支持。1. 核心功能技术分析本实例集成了著名的TA-Lib技术分析库提供了150多种常见的技术指标计算函数如移动平均线MA、相对强弱指数RSI、布林带Bollinger Bands等。这使得用户可以直接在模型中使用这些经过验证的技术指标无需自行实现复杂的金融公式。多模态数据解析采用改进的Transformer架构结合跨模态注意力机制实现财报、研报、电话会议录音的语义对齐。此外通过对抗生成网络GAN创建带噪声的财报样本提升模型在模糊表格、缺失数据场景下的鲁棒性。联邦学习与合规架构整合了多个数据源构建了54维动态风险矩阵覆盖财务异常、供应链风险等。同时采用同态加密技术确保在极端市场环境下的高预警准确率满足监管审计要求。动态计算引擎基于PyTorch动态计算图支持Heston模型波动率曲面参数的毫秒级调整并在NVIDIA H100 GPU集群上实现每秒3万次蒙特卡洛模拟大幅提高了计算效率。2. 技术架构动态计算引擎通过PyTorch和YahooQL实现深度学习与金融时序数据的深度融合支持实时参数优化和时序数据加速。多模态数据解析结合Transformer架构和GAN技术处理财报、研报等多模态数据确保数据的一致性和鲁棒性。联邦学习与合规架构整合多个数据源构建动态风险矩阵并采用同态加密技术确保数据安全和合规性。7.3.2 获取恐惧与贪婪指数数据文件sentiment.py定义了一个异步函数fetch_fng_data用于从CNN网站获取原始的Fear Greed恐惧与贪婪指数数据。该函数首先设置了请求头包括用户代理、接受的内容类型和引用页以模拟浏览器请求。然后使用hishel库进行缓存安装以避免频繁获取相同的数据。通过httpx.AsyncClient发起异步HTTP GET请求从指定的CNN API端点获取数据。如果请求成功将返回解析后的JSON数据如果请求失败则会抛出异常。import logging import httpx import hishel logger logging.getLogger(__name__) async def fetch_fng_data() - dict | None: 从CNN获取原始的恐惧与贪婪指数数据。 headers { User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Accept: application/json, text/plain, */*, Referer: https://www.cnn.com/markets/fear-and-greed, } # 使用hishel缓存以避免过于频繁地重复获取相同数据 hishel.install_cache() async with httpx.AsyncClient() as client: response await client.get( https://production.dataviz.cnn.io/index/fearandgreed/graphdata, headersheaders ) response.raise_for_status() return response.json()7.3.3 金融数据分析文件yfinance_utils.py为使用yfinance库进行金融数据分析提供了一组函数包括获取股票信息、日历事件、分析师数据、新闻、价格历史、财务报表、机构持股、收益历史、内部交易、期权链和过滤后的期权数据等。另外还定义了一个装饰器retry_on_rate_limit用于在遇到速率限制错误时重试函数调用并采用指数退避策略。此外还使用了ThreadPoolExecutor来并行获取期权数据以提高效率。def retry_on_rate_limit(max_retries: int 3, base_delay: float 5.0, success_delay: float 1.5): 装饰器用于在遇到速率限制错误时通过指数退避策略重试函数调用。 基于2025年yfinance的实操经验 - 雅虎财经已大幅收紧速率限制 - 推荐的延迟5秒、15秒、45秒以获得更好的成功率 - 用户反馈较短的延迟1-2秒通常不够 - 在成功调用后添加延迟有助于防止速率限制 def decorator(func): wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: result func(*args, **kwargs) # 在成功请求后添加一个小延迟以防止速率限制 if success_delay 0: time.sleep(success_delay) return result except (YFRateLimitError, Exception) as e: if isinstance(e, YFRateLimitError) or rate limit in str(e).lower() or too many requests in str(e).lower(): if attempt max_retries - 1: # 使用2025年yfinance社区推荐的更长延迟 wait_time base_delay * (3 ** attempt) # 5秒、15秒、45秒的进程 logger.warning(f在尝试 {attempt 1} 时遇到速率限制等待 {wait_time} 秒后重试) time.sleep(wait_time) continue else: logger.error(f达到速率限制的最大重试次数{max_retries}) raise else: # 对于非速率限制错误不重试 raise return None return wrapper return decorator retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_ticker_info(ticker: str) - dict | None: return yf.Ticker(ticker).get_info() retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_calendar(ticker: str) - dict | None: 获取包括收益和股息日期在内的日历事件。 return yf.Ticker(ticker).get_calendar() retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_analyst_data(ticker: str, data_type: Literal[recommendations, upgrades], limit: int 5) - pd.DataFrame | None: 获取分析师推荐或升级/降级数据。 t yf.Ticker(ticker) if data_type recommendations: df t.get_recommendations() else: # upgrades df t.get_upgrades_downgrades() if df is not None: df df.sort_index(ascendingFalse) return df.head(limit) if df is not None else None def get_news(ticker: str, limit: int 10) - list[dict] | None: 以 [日期,标题,来源,网址] 字典的形式返回最新新闻。 try: items yf.Ticker(ticker).get_news()[:limit] if not items: return None out: list[dict] [] for it in items: c it.get(content, {}) raw_date c.get(pubDate) or c.get(displayTime) or try: date datetime.fromisoformat(raw_date.replace(Z, )).strftime(%Y-%m-%d) except Exception: date raw_date[:10] if raw_date else N/A out.append({ date: date, title: c.get(title) or 无标题, source: c.get(provider, {}).get(displayName, 未知), url: c.get(canonicalUrl, {}).get(url) or c.get(clickThroughUrl, {}).get(url) }) return out except Exception: return None retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_price_history( ticker: str, period: Literal[1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max] 1mo, interval: Literal[1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo] 1d ) - pd.DataFrame | None: return yf.Ticker(ticker).history(periodperiod, intervalinterval) retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_financial_statements( ticker: str, statement_type: Literal[income, balance, cash] income, frequency: Literal[quarterly, annual] quarterly ) - pd.DataFrame | None: t yf.Ticker(ticker) statements { income: {annual: t.income_stmt, quarterly: t.quarterly_income_stmt}, balance: {annual: t.balance_sheet, quarterly: t.quarterly_balance_sheet}, cash: {annual: t.cashflow, quarterly: t.quarterly_cashflow} } return statements[statement_type][frequency] retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_institutional_holders(ticker: str, top_n: int 20) - tuple[pd.DataFrame | None, pd.DataFrame | None]: t yf.Ticker(ticker) inst t.get_institutional_holders() fund t.get_mutualfund_holders() return (inst.head(top_n) if inst is not None else None, fund.head(top_n) if fund is not None else None) retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_earnings_history(ticker: str, limit: int 12) - pd.DataFrame | None: 获取原始收益历史数据。 默认限制为12显示3年的季度收益。 df yf.Ticker(ticker).get_earnings_history() return df.head(limit) if df is not None else None retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_insider_trades(ticker: str, limit: int 30) - pd.DataFrame | None: df yf.Ticker(ticker).get_insider_transactions() return df.head(limit) if df is not None else None retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_options_chain( ticker: str, expiry: str | None None, option_type: Literal[C, P] | None None ) - tuple[pd.DataFrame | None, str | None]: 获取特定到期日的原始期权链数据的辅助函数。 参数: ticker: 股票代码 expiry: 到期日期 option_type: C 表示认购期权P 表示认沽期权None 表示两者都要 try: if not expiry: return None, 未提供到期日期 chain yf.Ticker(ticker).option_chain(expiry) if option_type C: return chain.calls, None elif option_type P: return chain.puts, None return pd.concat([chain.calls, chain.puts]), None except Exception as e: return None, str(e) retry_on_rate_limit(max_retries3, base_delay5.0, success_delay1.5) def get_filtered_options( ticker: str, start_date: str | None None, end_date: str | None None, strike_lower: float | None None, strike_upper: float | None None, option_type: Literal[C, P] | None None, ) - tuple[pd.DataFrame | None, str | None]: 高效获取过滤后的期权数据。 try: # 在处理前验证日期格式 if start_date: try: datetime.strptime(start_date, %Y-%m-%d) except ValueError: return None, f无效的 start_date 格式。请使用 YYYY-MM-DD if end_date: try: datetime.strptime(end_date, %Y-%m-%d) except ValueError: return None, f无效的 end_date 格式。请使用 YYYY-MM-DD t yf.Ticker(ticker) expirations t.options if not expirations: return None, f{ticker} 没有可用的期权 # 一次性将日期字符串转换为 datetime 对象 start_date_obj datetime.strptime(start_date, %Y-%m-%d).date() if start_date else None end_date_obj datetime.strptime(end_date, %Y-%m-%d).date() if end_date else None # 在进行 API 调用前过滤到期日期 valid_expirations [] for exp in expirations: exp_date datetime.strptime(exp, %Y-%m-%d).date() if ((not start_date_obj or exp_date start_date_obj) and (not end_date_obj or exp_date end_date_obj)): valid_expirations.append(exp) if not valid_expirations: return None, f在指定日期范围内未找到 {ticker} 的期权 # 并行获取期权使用 ThreadPoolExecutor filtered_option_chains [] with ThreadPoolExecutor() as executor: options_results list(executor.map( lambda exp: get_options_chain(ticker, exp, option_type), valid_expirations )) for (chain, error), expiry in zip(options_results, valid_expirations): if error: continue if chain is not None: filtered_option_chains.append(chain.assign(expiryDateexpiry)) if not filtered_option_chains: return None, f未找到符合 {ticker} 条件的期权 df pd.concat(filtered_option_chains, ignore_indexTrue) # 应用行权价过滤 if strike_lower is not None or strike_upper is not None: mask pd.Series(True, indexdf.index) if strike_lower is not None: mask df[strike] strike_lower if strike_upper is not None: mask df[strike] strike_upper df df[mask] return df.sort_values([openInterest, volume], ascending[False, False]), None except Exception as e: return None, f获取期权数据失败{str(e)}