"""Douyin search-page crawler.

Searches videos on douyin.com by keyword with Playwright and scrapes
per-video metadata (URL, title, author, publish time, duration, play
count, hashtags), saving the results as JSON under ``douyin_data/``.

Usage:
    python crawler.py <keyword> [--max-scroll N] [--headless] [--cookie FILE]
"""
import asyncio
import json
import re
from datetime import datetime
from pathlib import Path
from playwright.async_api import async_playwright
import argparse


class DouyinSearchCrawler:
    """Drives a Chromium instance to scrape Douyin search results."""

    def __init__(self, headless=False):
        self.headless = headless
        self.browser = None
        self.context = None
        self.page = None

    async def init_browser(self):
        """Launch Chromium and prepare a desktop-profile page.

        Sets a desktop viewport/user-agent and zh-CN Accept-Language so
        the search page serves the expected desktop layout.
        """
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=self.headless,
            args=[
                # Hide the webdriver automation fingerprint from basic bot checks.
                '--disable-blink-features=AutomationControlled',
                '--no-sandbox',
                '--disable-setuid-sandbox'
            ]
        )
        # Fresh browser context with a desktop viewport and UA string.
        self.context = await self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        )
        self.page = await self.context.new_page()
        # Default timeout for all page operations (ms).
        self.page.set_default_timeout(30000)
        await self.page.set_extra_http_headers({
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        })

    async def load_cookies(self, cookie_file='douyin_cookie.json'):
        """Load cookies from a JSON file into the browser context.

        Returns True on success, False if the file is missing or invalid.
        Failure is non-fatal: the crawl proceeds without authentication.
        """
        cookie_path = Path(cookie_file)
        if cookie_path.exists():
            try:
                with open(cookie_path, 'r', encoding='utf-8') as f:
                    cookies = json.load(f)
                await self.context.add_cookies(cookies)
                print(f"✓ 成功加载 cookies: {cookie_file}")
                return True
            except Exception as e:
                # Best-effort: a corrupt cookie file should not abort the run.
                print(f"✗ 加载 cookies 失败: {e}")
                return False
        else:
            print(f"✗ Cookie 文件不存在: {cookie_file}")
            return False

    async def search_videos(self, keyword, max_scroll=5):
        """Search *keyword* and collect videos over up to *max_scroll* scrolls.

        Scrolls to the page bottom between extraction passes to trigger
        lazy loading; stops early after 3 consecutive passes with no new
        videos. Returns a list of video dicts (deduplicated by URL).
        """
        print(f"\n开始搜索关键词: {keyword}")
        search_url = f"https://www.douyin.com/search/{keyword}?type=video"
        print(f"访问URL: {search_url}")
        try:
            await self.page.goto(search_url, wait_until='networkidle', timeout=60000)
            print("✓ 页面加载完成")
        except Exception as e:
            # networkidle often times out on busy pages; content may still be usable.
            print(f"⚠ 页面加载超时,继续尝试: {e}")

        # Extra settle time so dynamically rendered results appear.
        print("等待页面内容加载...")
        await asyncio.sleep(5)

        try:
            await self.page.wait_for_selector('a[href*="/video/"]', timeout=10000)
            print("✓ 检测到视频元素")
        except Exception as e:
            print(f"⚠ 未检测到视频元素: {e}")

        all_videos = []
        seen_urls = set()        # O(1) URL dedup (was an O(n^2) list rescan)
        no_new_data_count = 0    # consecutive passes that yielded nothing new

        for scroll_count in range(max_scroll):
            print(f"\n第 {scroll_count + 1}/{max_scroll} 次滚动加载...")
            previous_count = len(all_videos)

            videos = await self.extract_videos()
            for video in videos:
                if video['url'] not in seen_urls:
                    seen_urls.add(video['url'])
                    all_videos.append(video)

            new_count = len(all_videos) - previous_count
            print(f"本次新增 {new_count} 个视频,当前已获取 {len(all_videos)} 个视频")

            # Stop after 3 consecutive empty passes — the feed is exhausted.
            if new_count == 0:
                no_new_data_count += 1
                if no_new_data_count >= 3:
                    print("连续3次未获取到新数据,停止滚动")
                    break
            else:
                no_new_data_count = 0

            # Scroll to the bottom to trigger lazy loading, then wait.
            await self.page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
            await asyncio.sleep(3)

        return all_videos

    async def extract_videos(self):
        """Extract video metadata from all result links on the current page.

        Parsing runs in the browser: each ``a[href*="/video/"]`` link's
        concatenated text is mined with regexes for duration, play count,
        publish time, author and hashtags; the title is whatever remains.
        Entries missing a URL or title are dropped.
        """
        # Give the DOM a moment to settle after scrolling.
        await asyncio.sleep(1)

        videos_data = await self.page.evaluate('''() => {
            const links = Array.from(document.querySelectorAll('a[href*="/video/"]'));
            return links.map(link => {
                const text = link.textContent?.trim() || '';
                const url = link.getAttribute('href') || '';

                // Duration (format: HH:MM or MM:SS)
                const durationMatch = text.match(/(\\d{1,2}:\\d{2})/);
                const duration = durationMatch ? durationMatch[1] : '';

                // Play count (e.g. "3.4万" or a bare number not part of a timestamp)
                const playCountMatch = text.match(/([\\d.]+万|[\\d]+)(?!:)/);
                const playCount = playCountMatch ? playCountMatch[1] : '';

                // Publish time (relative "N天前/小时前/分钟前" or absolute date)
                const timeMatch = text.match(/(\\d+天前|\\d+小时前|\\d+分钟前|\\d{4}-\\d{2}-\\d{2})/);
                const publishTime = timeMatch ? timeMatch[1] : '';

                // Author handle — extracted first, then scrubbed of any
                // time fragments that got glued onto it in the link text.
                const authorMatch = text.match(/@([^\\s@]+)/);
                let author = authorMatch ? authorMatch[1] : '';
                if (author && publishTime) {
                    author = author.replace(publishTime, '').trim();
                }
                author = author.replace(/\\d+(天|小时|分钟)前$/, '').trim();

                // Title: strip duration, counts, author, time and "合集" markers.
                let title = text;
                title = title.replace(/(\\d{1,2}:\\d{2})/g, '');
                title = title.replace(/([\\d.]+万|[\\d]+)(?!:)/g, '');
                title = title.replace(/@[^\\s@]+/g, '');
                title = title.replace(/(\\d+天前|\\d+小时前|\\d+分钟前|\\d{4}-\\d{2}-\\d{2})/g, '');
                title = title.replace(/合集/g, '');
                title = title.trim();

                // Hashtags remaining in the cleaned title.
                const tags = [];
                const tagMatches = title.matchAll(/#([^#\\s]+)/g);
                for (const match of tagMatches) {
                    tags.push(match[1]);
                }

                return {
                    url: url.startsWith('//') ? 'https:' + url : url,
                    title: title,
                    author: author,
                    publishTime: publishTime,
                    duration: duration,
                    playCount: playCount,
                    tags: tags
                };
            });
        }''')

        # Drop entries with no URL or no title.
        videos_data = [v for v in videos_data if v['url'] and v['title']]
        return videos_data

    async def save_results(self, keyword, videos):
        """Write the crawl results to a timestamped JSON file.

        Files land in ``douyin_data/douyin_search_<keyword>_<ts>.json``.
        Returns the Path of the file written.
        """
        output_dir = Path('douyin_data')
        output_dir.mkdir(exist_ok=True)

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = output_dir / f'douyin_search_{keyword}_{timestamp}.json'

        data = {
            'keyword': keyword,
            'crawl_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'total_count': len(videos),
            'videos': videos
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        # BUG FIX: previously printed a literal placeholder instead of the
        # actual output path that was just written.
        print(f"\n✓ 数据已保存到: {filename}")
        print(f"✓ 共爬取 {len(videos)} 个视频")
        return filename

    async def close(self):
        """Tear down page, context, browser and the Playwright driver."""
        if self.page:
            await self.page.close()
        if self.context:
            await self.context.close()
        if self.browser:
            await self.browser.close()
        # self.playwright only exists if init_browser() ran.
        if hasattr(self, 'playwright'):
            await self.playwright.stop()


async def main():
    """CLI entry point: parse args, run the crawl, save results."""
    parser = argparse.ArgumentParser(description='抖音搜索页面爬虫')
    parser.add_argument('keyword', help='搜索关键词')
    parser.add_argument('--max-scroll', type=int, default=5, help='最大滚动次数 (默认: 5)')
    parser.add_argument('--headless', action='store_true', help='无头模式运行')
    parser.add_argument('--cookie', default='douyin_cookie.json', help='Cookie文件路径')
    args = parser.parse_args()

    crawler = DouyinSearchCrawler(headless=args.headless)
    try:
        await crawler.init_browser()
        # Cookies are optional; failure to load is non-fatal.
        await crawler.load_cookies(args.cookie)
        videos = await crawler.search_videos(args.keyword, max_scroll=args.max_scroll)
        if videos:
            await crawler.save_results(args.keyword, videos)
        else:
            print("\n✗ 未获取到任何视频数据")
    except Exception as e:
        print(f"\n✗ 爬取过程中出错: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Always release the browser, even after an error.
        await crawler.close()


if __name__ == '__main__':
    asyncio.run(main())