## Introduction: Challenges and Opportunities in Design Asset Collection

In the era of digital design, high-quality assets are the foundation of a designer's work. Yet with so many design asset websites, downloading by hand is slow and hard to scale. Python crawling technology gives us an automated way to collect assets in bulk. This article walks through how to use modern asynchronous Python crawling techniques to harvest design asset sites efficiently and compliantly.

## Technology Stack Overview

- **Python 3.8+**: core programming language (3.9+ is needed for the `asyncio.to_thread` calls used later)
- **aiohttp**: asynchronous HTTP client/server framework
- **asyncio**: Python's built-in asynchronous I/O framework
- **BeautifulSoup4**: HTML parsing library
- **aiofiles**: asynchronous file operations
- **Redis**: distributed cache and task queue (optional)
- **Proxy IP pool**: for coping with anti-scraping measures

## Project Structure

```text
design-material-crawler/
├── src/
│   ├── crawler.py          # Main crawler logic
│   ├── parser.py           # Page parsers
│   ├── storage.py          # Data storage module
│   ├── proxy_manager.py    # Proxy manager
│   └── utils.py            # Utility functions
├── config/
│   └── settings.py         # Configuration
├── data/                   # Collected data
├── logs/                   # Log files
└── requirements.txt        # Dependency list
```

## Complete Code Implementation

### 1. Environment setup and dependencies

`requirements.txt`:

```txt
aiohttp==3.9.1
beautifulsoup4==4.12.2
aiofiles==23.2.1
redis==5.0.1
asyncio-throttle==1.0.2
fake-useragent==1.4.0
python-dotenv==1.0.0
pillow==10.1.0
opencv-python==4.9.0.80
```

### 2. Configuration file

`config/settings.py`:

```python
import os
from dotenv import load_dotenv

load_dotenv()


class Config:
    # Crawler settings
    MAX_CONCURRENT_REQUESTS = 10
    REQUEST_TIMEOUT = 30
    RETRY_ATTEMPTS = 3
    DELAY_BETWEEN_REQUESTS = 1.0

    # Target site settings
    TARGET_SITES = {
        "unsplash": {
            "base_url": "https://unsplash.com",
            "search_url": "https://unsplash.com/napi/search/photos",
            "per_page": 20
        },
        "pexels": {
            "base_url": "https://www.pexels.com",
            "search_url": "https://www.pexels.com/api/v3/search",
            "api_key": os.getenv("PEXELS_API_KEY", "")
        }
    }

    # Storage settings
    STORAGE_PATH = "./data"
    IMAGE_FORMATS = ["jpg", "png", "webp", "svg"]
    MAX_FILE_SIZE = 50 * 1024 * 1024  # 50 MB

    # Proxy settings
    USE_PROXY = True
    PROXY_POOL_URL = os.getenv("PROXY_POOL_URL", "")

    # Redis settings
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

    # Request headers
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "Accept-Encoding": "gzip, deflate",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1"
    }
```
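`config/settings.py` reads several values from the environment via `python-dotenv`. For reference, a minimal `.env` file could look like the sketch below; the variable names match the `os.getenv` calls above, the API key and proxy URL values are placeholders you would replace with your own, and the Redis URL is simply the default already used in the config:

```txt
PEXELS_API_KEY=your-pexels-api-key
PROXY_POOL_URL=http://your-proxy-pool.example.com/api/proxies
REDIS_URL=redis://localhost:6379/0
```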
### 3. Main crawler class

`src/crawler.py`:

```python
import asyncio
import aiohttp
import aiofiles
import logging
from typing import List, Dict, Any, Optional
from pathlib import Path
from urllib.parse import urljoin, urlparse
import hashlib
import json
from datetime import datetime

from config.settings import Config
from src.proxy_manager import ProxyManager
from src.parser import Parser
from src.storage import StorageManager
from src.utils import rate_limiter, retry_handler, generate_file_hash

logger = logging.getLogger(__name__)


class DesignMaterialCrawler:
    """Asynchronous crawler for design asset websites."""

    def __init__(self, config: Config):
        self.config = config
        self.session: Optional[aiohttp.ClientSession] = None
        self.proxy_manager = ProxyManager(config) if config.USE_PROXY else None
        self.parser = Parser()
        self.storage = StorageManager(config)
        self.semaphore = asyncio.Semaphore(config.MAX_CONCURRENT_REQUESTS)
        self.visited_urls = set()

    async def __aenter__(self):
        """Async context manager entry."""
        timeout = aiohttp.ClientTimeout(total=self.config.REQUEST_TIMEOUT)
        connector = aiohttp.TCPConnector(limit=100, force_close=True)
        self.session = aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            headers=self.config.HEADERS
        )
        await self.storage.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if self.session:
            await self.session.close()
        await self.storage.close()

    @retry_handler(max_retries=3, delay=1.0)
    @rate_limiter(max_calls=10, period=1.0)
    async def fetch_page(self, url: str, params: Dict = None) -> str:
        """Fetch page content."""
        if url in self.visited_urls:
            return ""

        proxy = None
        if self.proxy_manager:
            proxy = await self.proxy_manager.get_proxy()

        try:
            async with self.semaphore:
                async with self.session.get(
                    url, params=params, proxy=proxy, ssl=False
                ) as response:
                    response.raise_for_status()
                    content = await response.text()
                    self.visited_urls.add(url)

                    # Log the successful request
                    logger.info(f"Fetched page: {url}, status: {response.status}")
                    return content

        except aiohttp.ClientError as e:
            logger.error(f"Request failed {url}: {str(e)}")
            # Mark the proxy as failed
            if proxy and self.proxy_manager:
                await self.proxy_manager.mark_proxy_failed(proxy)
            raise
        except asyncio.TimeoutError:
            logger.error(f"Request timed out: {url}")
            raise

    async def download_file(self, url: str, filepath: Path) -> bool:
        """Download a file to local storage."""
        if filepath.exists():
            logger.info(f"File already exists: {filepath}")
            return True

        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    # Check file type and size
                    content_type = response.headers.get("Content-Type", "")
                    content_length = int(response.headers.get("Content-Length", 0))

                    if not self._is_valid_file(content_type, content_length):
                        logger.warning(f"Invalid file type or size: {url}")
                        return False

                    # Write the file asynchronously
                    async with aiofiles.open(filepath, "wb") as f:
                        async for chunk in response.content.iter_chunked(8192):
                            await f.write(chunk)

                    logger.info(f"File downloaded: {filepath}")
                    return True
                else:
                    logger.error(f"Download failed {url}: status {response.status}")
                    return False

        except Exception as e:
            logger.error(f"Download error {url}: {str(e)}")
            return False

    def _is_valid_file(self, content_type: str, content_length: int) -> bool:
        """Validate file type and size."""
        # Check file size
        if content_length > self.config.MAX_FILE_SIZE:
            return False

        # Check file type
        valid_types = [f"image/{fmt}" for fmt in self.config.IMAGE_FORMATS]
        valid_types.extend(["application/zip", "application/x-rar-compressed"])

        return any(valid_type in content_type for valid_type in valid_types)

    async def crawl_site(self, site_name: str, keywords: List[str], max_items: int = 100):
        """Crawl design assets from a given site."""
        site_config = self.config.TARGET_SITES.get(site_name)
        if not site_config:
            logger.error(f"No configuration found for site: {site_name}")
            return

        logger.info(f"Start crawling {site_name}, keywords: {keywords}")

        all_items = []
        for keyword in keywords:
            logger.info(f"Searching keyword: {keyword}")

            page = 1
            collected_items = 0
            while collected_items < max_items:
                try:
                    # Build request parameters
                    params = self._build_search_params(site_name, keyword, page)

                    # Fetch search results
                    if site_name == "unsplash":
                        search_url = site_config["search_url"]
                        content = await self.fetch_page(search_url, params)
                        data = json.loads(content)
                        items = self.parser.parse_unsplash_results(data)
                    elif site_name == "pexels":
                        if not site_config.get("api_key"):
                            logger.error("Pexels requires an API key")
                            break
                        search_url = site_config["search_url"]
                        headers = {"Authorization": site_config["api_key"]}
                        async with self.session.get(search_url, params=params, headers=headers) as response:
                            data = await response.json()
                        items = self.parser.parse_pexels_results(data)
                    else:
                        logger.error(f"Unsupported site: {site_name}")
                        break

                    if not items:
                        break

                    # Process each asset item
                    tasks = []
                    for item in items:
                        if collected_items >= max_items:
                            break
                        tasks.append(self._process_material_item(item, site_name, keyword))
                        collected_items += 1

                    # Process concurrently
                    results = await asyncio.gather(*tasks, return_exceptions=True)

                    # Collect successful results
                    for result in results:
                        if isinstance(result, dict):
                            all_items.append(result)

                    page += 1
                    await asyncio.sleep(1)  # Polite delay

                except Exception as e:
                    logger.error(f"Crawl error: {str(e)}")
                    break

        # Save metadata
        if all_items:
            metadata_file = self.storage.save_metadata(all_items, site_name)
            logger.info(f"Crawl finished: collected {len(all_items)} items, metadata saved to {metadata_file}")

    def _build_search_params(self, site_name: str, keyword: str, page: int) -> Dict:
        """Build search parameters."""
        if site_name == "unsplash":
            return {
                "query": keyword,
                "page": page,
                "per_page": min(20, self.config.TARGET_SITES["unsplash"]["per_page"])
            }
        elif site_name == "pexels":
            return {
                "query": keyword,
                "page": page,
                "per_page": 15
            }
        return {}

    async def _process_material_item(self, item: Dict, site_name: str, keyword: str) -> Dict:
        """Process a single asset item."""
        try:
            # Download the main image
            image_url = item.get("image_url")
            if not image_url:
                return {}

            # Generate a file name
            filename = self._generate_filename(item, site_name)
            filepath = self.storage.get_filepath(site_name, "images", filename)

            # Download the file
            success = await self.download_file(image_url, filepath)
            if success:
                # Compute the file hash
                file_hash = await generate_file_hash(filepath)

                # Build metadata
                metadata = {
                    "id": item.get("id", ""),
                    "title": item.get("title", ""),
                    "description": item.get("description", ""),
                    "image_url": image_url,
                    "download_url": item.get("download_url", ""),
                    "author": item.get("author", {}),
                    "tags": item.get("tags", []),
                    "keywords": [keyword],
                    "site": site_name,
                    "file_path": str(filepath),
                    "file_hash": file_hash,
                    "file_size": filepath.stat().st_size,
                    "crawled_at": datetime.now().isoformat(),
                    "metadata": item
                }

                # Save a thumbnail
                await self.storage.create_thumbnail(filepath)

                return metadata

        except Exception as e:
            logger.error(f"Failed to process item: {str(e)}")
            return {}

        return {}

    def _generate_filename(self, item: Dict, site_name: str) -> str:
        """Generate a file name."""
        item_id = item.get("id", "")
        title = item.get("title", "untitled").lower()

        # Sanitize the title
        import re
        title_clean = re.sub(r"[^\w\-_\. ]", "_", title)
        title_clean = re.sub(r"\s+", "_", title_clean)

        # Extract the extension
        image_url = item.get("image_url", "")
        ext = Path(urlparse(image_url).path).suffix if image_url else ".jpg"

        return f"{site_name}_{item_id}_{title_clean}{ext}"

    async def crawl_batch(self, sites_keywords: Dict[str, List[str]], max_items_per_site: int = 50):
        """Crawl several sites in one batch."""
        tasks = []
        for site_name, keywords in sites_keywords.items():
            task = self.crawl_site(site_name, keywords, max_items_per_site)
            tasks.append(task)

        # Run all tasks concurrently
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Handle results
        for site_name, result in zip(sites_keywords.keys(), results):
            if isinstance(result, Exception):
                logger.error(f"Crawling {site_name} failed: {str(result)}")
            else:
                logger.info(f"Crawling {site_name} finished")
```
### 4. Page parser

`src/parser.py`:

```python
from bs4 import BeautifulSoup
import json
from typing import List, Dict, Any
import re


class Parser:
    """Page parser."""

    def parse_unsplash_results(self, data: Dict) -> List[Dict]:
        """Parse results returned by the Unsplash API."""
        results = []

        if "results" in data:
            for item in data["results"]:
                result = {
                    "id": item.get("id", ""),
                    "title": item.get("description", "Untitled"),
                    "description": item.get("alt_description", ""),
                    "image_url": item.get("urls", {}).get("regular", ""),
                    "download_url": item.get("links", {}).get("download", ""),
                    "author": {
                        "name": item.get("user", {}).get("name", ""),
                        "username": item.get("user", {}).get("username", ""),
                        "profile_url": item.get("user", {}).get("links", {}).get("html", "")
                    },
                    "tags": [tag.get("title", "") for tag in item.get("tags", [])],
                    "width": item.get("width", 0),
                    "height": item.get("height", 0),
                    "color": item.get("color", ""),
                    "likes": item.get("likes", 0)
                }
                results.append(result)

        return results

    def parse_pexels_results(self, data: Dict) -> List[Dict]:
        """Parse results returned by the Pexels API."""
        results = []

        if "photos" in data:
            for item in data["photos"]:
                result = {
                    "id": str(item.get("id", "")),
                    "title": item.get("alt", "Untitled"),
                    "description": "",
                    "image_url": item.get("src", {}).get("large", ""),
                    "download_url": item.get("url", ""),
                    "author": {
                        "name": item.get("photographer", ""),
                        "profile_url": item.get("photographer_url", "")
                    },
                    "tags": [],
                    "width": item.get("width", 0),
                    "height": item.get("height", 0),
                    "avg_color": item.get("avg_color", ""),
                    "liked": item.get("liked", False)
                }
                results.append(result)

        return results

    def parse_html_page(self, html: str, site_type: str) -> List[Dict]:
        """Parse an HTML page (fallback method)."""
        soup = BeautifulSoup(html, "html.parser")
        results = []

        if site_type == "unsplash":
            # Parse the Unsplash page structure
            image_elements = soup.select("figure img")
            for img in image_elements:
                src = img.get("src") or img.get("data-src")
                if src and "images.unsplash.com" in src:
                    result = {
                        "image_url": src,
                        "title": img.get("alt", ""),
                        "author": self._extract_author_unsplash(img)
                    }
                    results.append(result)

        return results

    def _extract_author_unsplash(self, element) -> Dict:
        """Extract author information from an Unsplash element."""
        # Adjust the selectors to the actual page structure
        author_info = {
            "name": "",
            "username": "",
            "profile_url": ""
        }

        # Try to locate the author link
        parent = element.find_parent("a", href=re.compile(r"/"))
        if parent and parent.get("href"):
            author_info["profile_url"] = "https://unsplash.com" + parent["href"]
            author_info["username"] = parent["href"].split("/")[-1]

        return author_info
```
### 5. Storage manager

`src/storage.py`:

```python
import json
import csv
import sqlite3
from pathlib import Path
from typing import List, Dict, Any, Optional
import aiofiles
import asyncio
from datetime import datetime
import hashlib
from PIL import Image
import io


class StorageManager:
    """Storage manager."""

    def __init__(self, config):
        self.config = config
        self.base_path = Path(config.STORAGE_PATH)
        self.db_conn: Optional[sqlite3.Connection] = None

    async def initialize(self):
        """Initialize storage directories and the database."""
        # Create the directory structure
        directories = ["images", "thumbnails", "metadata", "logs"]
        for dir_name in directories:
            (self.base_path / dir_name).mkdir(parents=True, exist_ok=True)

        # Initialize the database
        self._init_database()

    def _init_database(self):
        """Initialize the SQLite database."""
        db_path = self.base_path / "materials.db"
        self.db_conn = sqlite3.connect(str(db_path))

        cursor = self.db_conn.cursor()

        # Materials table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS materials (
                id TEXT PRIMARY KEY,
                title TEXT,
                description TEXT,
                image_url TEXT,
                download_url TEXT,
                author_name TEXT,
                author_url TEXT,
                tags TEXT,
                keywords TEXT,
                site TEXT,
                file_path TEXT,
                file_hash TEXT UNIQUE,
                file_size INTEGER,
                width INTEGER,
                height INTEGER,
                color TEXT,
                likes INTEGER,
                crawled_at TEXT,
                metadata TEXT
            )
        """)

        # Download log table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS download_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                material_id TEXT,
                file_path TEXT,
                download_time TEXT,
                status TEXT,
                error_message TEXT,
                FOREIGN KEY (material_id) REFERENCES materials (id)
            )
        """)

        self.db_conn.commit()

    def get_filepath(self, site_name: str, file_type: str, filename: str) -> Path:
        """Build the path for a downloaded file."""
        date_str = datetime.now().strftime("%Y/%m/%d")
        filepath = self.base_path / site_name / file_type / date_str / filename

        # Make sure the directory exists
        filepath.parent.mkdir(parents=True, exist_ok=True)

        return filepath

    def save_metadata(self, items: List[Dict], site_name: str) -> Path:
        """Save metadata."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        metadata_file = self.base_path / "metadata" / f"{site_name}_{timestamp}.json"

        # Save as JSON
        with open(metadata_file, "w", encoding="utf-8") as f:
            json.dump(items, f, ensure_ascii=False, indent=2)

        # Also save to the database
        self._save_to_database(items)

        # Save as CSV (optional)
        csv_file = metadata_file.with_suffix(".csv")
        self._save_to_csv(items, csv_file)

        return metadata_file

    def _save_to_database(self, items: List[Dict]):
        """Save items to the database."""
        cursor = self.db_conn.cursor()

        for item in items:
            cursor.execute("""
                INSERT OR REPLACE INTO materials (
                    id, title, description, image_url, download_url,
                    author_name, author_url, tags, keywords, site,
                    file_path, file_hash, file_size, width, height,
                    color, likes, crawled_at, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                item.get("id"),
                item.get("title"),
                item.get("description"),
                item.get("image_url"),
                item.get("download_url"),
                item.get("author", {}).get("name"),
                item.get("author", {}).get("profile_url"),
                json.dumps(item.get("tags", [])),
                json.dumps(item.get("keywords", [])),
                item.get("site"),
                item.get("file_path"),
                item.get("file_hash"),
                item.get("file_size"),
                item.get("metadata", {}).get("width"),
                item.get("metadata", {}).get("height"),
                item.get("metadata", {}).get("color"),
                item.get("metadata", {}).get("likes"),
                item.get("crawled_at"),
                json.dumps(item.get("metadata", {}))
            ))

        self.db_conn.commit()

    def _save_to_csv(self, items: List[Dict], csv_file: Path):
        """Save items to a CSV file."""
        if not items:
            return

        # CSV columns
        fieldnames = [
            "id", "title", "description", "site", "author_name",
            "keywords", "tags", "file_path", "file_size", "crawled_at"
        ]

        with open(csv_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()

            for item in items:
                row = {
                    "id": item.get("id", ""),
                    "title": item.get("title", ""),
                    "description": item.get("description", ""),
                    "site": item.get("site", ""),
                    "author_name": item.get("author", {}).get("name", ""),
                    "keywords": ";".join(item.get("keywords", [])),
                    "tags": ";".join(item.get("tags", [])),
                    "file_path": item.get("file_path", ""),
                    "file_size": item.get("file_size", 0),
                    "crawled_at": item.get("crawled_at", "")
                }
                writer.writerow(row)

    async def create_thumbnail(self, image_path: Path, size: tuple = (200, 200)):
        """Create a thumbnail."""
        try:
            thumb_path = self.base_path / "thumbnails" / image_path.name

            # Run the image processing in a worker thread
            await asyncio.to_thread(self._generate_thumbnail, image_path, thumb_path, size)

        except Exception as e:
            print(f"Failed to create thumbnail {image_path}: {str(e)}")

    def _generate_thumbnail(self, src_path: Path, dst_path: Path, size: tuple):
        """Generate a thumbnail (runs in a separate thread)."""
        with Image.open(src_path) as img:
            img.thumbnail(size, Image.Resampling.LANCZOS)

            # Make sure the thumbnail directory exists
            dst_path.parent.mkdir(parents=True, exist_ok=True)

            # Save the thumbnail
            img.save(dst_path, "JPEG", quality=85)

    async def close(self):
        """Close storage resources."""
        if self.db_conn:
            self.db_conn.close()
```
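Once a crawl has finished, the metadata can be queried straight from SQLite. A quick sketch, assuming the default `STORAGE_PATH` of `./data` so the database created by `_init_database` lives at `./data/materials.db`:

```python
import sqlite3

# Count collected materials per site, using the materials table defined above
conn = sqlite3.connect("./data/materials.db")
cursor = conn.cursor()
cursor.execute("SELECT site, COUNT(*) FROM materials GROUP BY site")
for site, count in cursor.fetchall():
    print(f"{site}: {count} items")
conn.close()
```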
### 6. Utility functions

`src/utils.py`:

```python
import asyncio
import hashlib
import time
from functools import wraps
from pathlib import Path
from typing import Callable, Any
import aiofiles


def rate_limiter(max_calls: int, period: float):
    """Rate-limiting decorator."""
    def decorator(func):
        last_reset = time.time()
        call_count = 0

        @wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal last_reset, call_count

            current_time = time.time()
            if current_time - last_reset >= period:
                last_reset = current_time
                call_count = 0

            if call_count >= max_calls:
                wait_time = period - (current_time - last_reset)
                if wait_time > 0:
                    await asyncio.sleep(wait_time)
                last_reset = time.time()
                call_count = 0

            call_count += 1
            return await func(*args, **kwargs)

        return wrapper
    return decorator


def retry_handler(max_retries: int = 3, delay: float = 1.0):
    """Retry decorator."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries:
                        wait_time = delay * (2 ** attempt)  # Exponential backoff
                        await asyncio.sleep(wait_time)

            raise last_exception

        return wrapper
    return decorator


async def generate_file_hash(filepath: Path, algorithm: str = "sha256") -> str:
    """Compute a file hash."""
    hash_func = hashlib.new(algorithm)

    async with aiofiles.open(filepath, "rb") as f:
        while chunk := await f.read(8192):
            hash_func.update(chunk)

    return hash_func.hexdigest()


def validate_url(url: str) -> bool:
    """Validate a URL."""
    from urllib.parse import urlparse
    try:
        result = urlparse(url)
        return all([result.scheme, result.netloc])
    except Exception:
        return False


class ProgressTracker:
    """Progress tracker."""

    def __init__(self, total: int):
        self.total = total
        self.completed = 0
        self.start_time = time.time()

    def update(self, increment: int = 1):
        """Update progress."""
        self.completed += increment

        # Progress percentage
        percentage = (self.completed / self.total) * 100

        # Estimated remaining time
        elapsed = time.time() - self.start_time
        if self.completed > 0:
            estimated_total = elapsed * (self.total / self.completed)
            remaining = estimated_total - elapsed
        else:
            remaining = 0

        print(f"Progress: {percentage:.1f}% | Done: {self.completed}/{self.total} | "
              f"Remaining: {remaining:.0f}s")
```

### 7. Main entry point

`main.py`:

```python
import asyncio
import logging
from pathlib import Path
import sys

from config.settings import Config
from src.crawler import DesignMaterialCrawler


def setup_logging():
    """Configure logging."""
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)

    log_file = log_dir / f"crawler_{asyncio.get_event_loop().time()}.log"

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(sys.stdout)
        ]
    )


async def main():
    """Main entry point."""
    # Configure logging
    setup_logging()
    logger = logging.getLogger(__name__)

    # Load configuration
    config = Config()

    # Define crawl tasks
    sites_keywords = {
        "unsplash": ["design", "background", "texture", "pattern", "minimal"],
        # "pexels": ["design", "creative", "art", "graphic"]  # Requires an API key
    }

    try:
        async with DesignMaterialCrawler(config) as crawler:
            # Batch crawl
            await crawler.crawl_batch(sites_keywords, max_items_per_site=20)

            # Or crawl a single site
            # await crawler.crawl_site("unsplash", ["nature", "technology"], max_items=50)

    except KeyboardInterrupt:
        logger.info("Crawl interrupted by user")
    except Exception as e:
        logger.error(f"Crawl failed: {str(e)}", exc_info=True)
    finally:
        logger.info("Crawler finished")


if __name__ == "__main__":
    # Set the event loop policy (Windows compatibility)
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    # Run the main program
    asyncio.run(main())
```
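The project layout and `crawler.py` both reference `src/proxy_manager.py`, but the article never shows that module. Below is a minimal, illustrative sketch that only matches the interface the crawler actually uses (`get_proxy()` and `mark_proxy_failed()`); the assumption that `PROXY_POOL_URL` returns JSON of the form `{"proxies": [...]}` is mine and will differ depending on your proxy provider.

```python
# src/proxy_manager.py -- illustrative sketch, not from the original article
import aiohttp
from typing import Optional


class ProxyManager:
    """Tiny proxy-pool wrapper matching the calls made in crawler.py."""

    def __init__(self, config):
        self.config = config
        self.proxies = []      # proxy URLs such as "http://1.2.3.4:8080"
        self.failed = set()    # proxies that produced errors

    async def refresh(self):
        """Fetch a fresh proxy list from PROXY_POOL_URL.

        Assumes the pool endpoint returns JSON like {"proxies": ["http://...", ...]}.
        """
        if not self.config.PROXY_POOL_URL:
            return
        async with aiohttp.ClientSession() as session:
            async with session.get(self.config.PROXY_POOL_URL) as resp:
                data = await resp.json()
        self.proxies = [p for p in data.get("proxies", []) if p not in self.failed]

    async def get_proxy(self) -> Optional[str]:
        """Return the next usable proxy, or None to connect directly."""
        if not self.proxies:
            await self.refresh()
        return self.proxies[0] if self.proxies else None

    async def mark_proxy_failed(self, proxy: str):
        """Drop a proxy that produced an error so it is not reused."""
        self.failed.add(proxy)
        if proxy in self.proxies:
            self.proxies.remove(proxy)
```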
## Advanced Features and Optimizations

### 1. Distributed crawler extension

```python
# src/distributed.py
import asyncio
import redis.asyncio as redis
from typing import List, Dict
import json


class DistributedCrawler:
    """Distributed crawler manager."""

    def __init__(self, config):
        self.config = config
        self.redis_client = None
        self.queue_name = "crawler:tasks"
        self.result_name = "crawler:results"

    async def initialize(self):
        """Initialize the Redis connection."""
        # redis.asyncio.from_url returns a client object synchronously
        self.redis_client = redis.from_url(
            self.config.REDIS_URL,
            decode_responses=True
        )

    async def push_tasks(self, tasks: List[Dict]):
        """Push tasks onto the queue."""
        for task in tasks:
            await self.redis_client.lpush(
                self.queue_name,
                json.dumps(task)
            )

    async def get_results(self, count: int = 100) -> List[Dict]:
        """Fetch processed results."""
        results = []
        for _ in range(count):
            result = await self.redis_client.rpop(self.result_name)
            if result:
                results.append(json.loads(result))
        return results
```

### 2. Image content analysis

```python
# src/image_analyzer.py
import cv2
import numpy as np
from PIL import Image
from pathlib import Path
import asyncio
from typing import Dict


class ImageAnalyzer:
    """Image content analyzer."""

    @staticmethod
    async def analyze_image(image_path: Path) -> Dict:
        """Analyze image features."""
        # Run the CPU-bound work in a separate thread
        return await asyncio.to_thread(
            ImageAnalyzer._analyze_image_sync, image_path
        )

    @staticmethod
    def _analyze_image_sync(image_path: Path) -> Dict:
        """Synchronous image analysis."""
        try:
            # Analyze with OpenCV
            image = cv2.imread(str(image_path))
            if image is None:
                return {}

            # Color histogram
            hist = cv2.calcHist([image], [0, 1, 2], None, [8, 8, 8],
                                [0, 256, 0, 256, 0, 256])
            hist = cv2.normalize(hist, hist).flatten()

            # Average color
            avg_color = cv2.mean(image)[:3]

            # Edge detection
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            edges = cv2.Canny(gray, 100, 200)
            edge_density = np.sum(edges > 0) / edges.size

            return {
                "dominant_colors": hist.tolist(),
                "average_color": avg_color,
                "edge_density": float(edge_density),
                "resolution": f"{image.shape[1]}x{image.shape[0]}"
            }

        except Exception as e:
            print(f"Image analysis failed: {str(e)}")
            return {}
```

## Crawler Ethics and Compliance

### 1. Respecting robots.txt

```python
# src/robots_checker.py
import urllib.robotparser
import aiohttp


class RobotsChecker:
    """robots.txt checker."""

    def __init__(self):
        self.parser = urllib.robotparser.RobotFileParser()

    async def can_fetch(self, url: str, user_agent: str = "*") -> bool:
        """Check whether crawling the URL is allowed."""
        base_url = self._extract_base_url(url)
        robots_url = f"{base_url}/robots.txt"

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(robots_url) as response:
                    if response.status == 200:
                        content = await response.text()
                        self.parser.parse(content.splitlines())
                        return self.parser.can_fetch(user_agent, url)
        except Exception:
            pass

        return True  # If robots.txt cannot be fetched, allow by default

    def _extract_base_url(self, url: str) -> str:
        """Extract the base URL."""
        from urllib.parse import urlparse
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}"
```

### 2. Usage guidelines

- **Respect copyright**: only download assets whose license permits your intended (including commercial) use.
- **Follow the terms**: read the target site's Terms of Service carefully.
- **Throttle requests**: add appropriate delays so you do not put pressure on the server.
- **Set a User-Agent**: clearly identify your crawler.
- **Handle errors**: deal gracefully with HTTP status codes such as 404 and 429.
- **Deduplicate**: avoid downloading the same content twice.

## Performance Optimization Tips

- **Connection reuse**: reuse HTTP connections to cut TCP handshake overhead.
- **Async file I/O**: use aiofiles so file operations never block the event loop.
- **Memory management**: release large objects promptly and process large datasets with generators.
- **Error recovery**: implement a checkpoint mechanism so interrupted crawls can resume (a minimal sketch follows this list).
- **Caching**: cache pages and data that have already been parsed.
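As a concrete illustration of the checkpoint idea above, here is a minimal sketch. The file location (`./data/checkpoint.json`) and JSON layout are assumptions rather than part of the original project; `fetch_page` could consult `is_done()` in addition to its in-memory `visited_urls` set, and call `mark_done()` after each successful download.

```python
# Illustrative checkpoint helper -- assumed design, not from the original article
import json
from pathlib import Path


class Checkpoint:
    """Persist crawl progress so an interrupted run can resume later."""

    def __init__(self, path: Path = Path("./data/checkpoint.json")):
        self.path = path
        self.state = {"done_urls": []}
        if self.path.exists():
            self.state = json.loads(self.path.read_text(encoding="utf-8"))

    def is_done(self, url: str) -> bool:
        """Return True if this URL was already processed in a previous run."""
        return url in self.state["done_urls"]

    def mark_done(self, url: str):
        """Record a finished URL and flush the state to disk."""
        if url not in self.state["done_urls"]:
            self.state["done_urls"].append(url)
            self.path.parent.mkdir(parents=True, exist_ok=True)
            self.path.write_text(json.dumps(self.state), encoding="utf-8")
```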