"""Douyin creative-guidance page crawler built on Playwright.

Supports cookie-based login, manual (interactive) login, and crawling a
single category tab. Results are written as JSON, plain text and CSV.
"""
import argparse
import asyncio
import csv
import io
import json
import os
import re
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional

try:
    from playwright.async_api import async_playwright
except ImportError:
    # Deferred failure: ``--help`` and ``save_results`` stay usable without
    # Playwright; ``_require_playwright`` raises when a browser is needed.
    async_playwright = None

if sys.platform == "win32":
    # Force UTF-8 console output so the Chinese messages print correctly.
    os.environ["PYTHONIOENCODING"] = "utf-8"
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

CREATIVE_GUIDANCE_URL = "https://creator.douyin.com/creator-micro/creative-guidance"

# Characters that are not allowed (or are path separators) in file names.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|]')

_CSV_HEADER = "序号,作者,描述,时长,热度,播放量,点赞,评论,标签,作者链接"

# Clicks every "show more" button that is not already in the "收起" state.
# Returns true when at least one button was clicked.
_EXPAND_CATEGORIES_JS = """
() => {
    const showButtons = document.querySelectorAll('.show-button-sDo51G');
    let clicked = false;
    showButtons.forEach(btn => {
        const text = btn.textContent.trim();
        // 如果按钮不是"收起",说明需要展开
        if (!text.includes('收起')) {
            btn.click();
            clicked = true;
        }
    });
    return clicked;
}
"""

# Returns the non-empty text of every category element.
_LIST_CATEGORIES_JS = """
() => {
    const categoryDivs = Array.from(document.querySelectorAll('.each-kind-MR__DN'));
    return categoryDivs.map(div => div.textContent.trim()).filter(text => text);
}
"""

# Clicks the category whose text equals the argument. The name is passed as an
# evaluate() argument rather than spliced into the source, so quotes or
# backslashes in the name cannot break or alter the script.
_CLICK_CATEGORY_JS = """
(category) => {
    const categoryDivs = Array.from(document.querySelectorAll('.each-kind-MR__DN'));
    const targetDiv = categoryDivs.find(div =>
        div.textContent.trim() === category
    );
    if (targetDiv) {
        targetDiv.click();
        return true;
    }
    return false;
}
"""

# Collects link counts, a text preview and the HTML of the first video card.
_DEBUG_INFO_JS = """
() => {
    const authorLinks = document.querySelectorAll('a[href*="iesdouyin.com/share/user/"]');
    const allLinks = document.querySelectorAll('a');

    // 获取第一个视频容器的HTML结构用于调试
    let sampleHTML = '';
    if (authorLinks.length > 0) {
        let container = authorLinks[0].closest('div[class*="video"]') ||
                        authorLinks[0].closest('div[class*="item"]') ||
                        authorLinks[0].closest('div[class*="card"]');
        if (!container) {
            container = authorLinks[0].parentElement;
            for (let i = 0; i < 5; i++) {
                if (container.querySelector('p')) break;
                container = container.parentElement;
                if (!container) break;
            }
        }
        if (container) {
            sampleHTML = container.outerHTML.substring(0, 2000);
        }
    }

    return {
        authorLinksCount: authorLinks.length,
        allLinksCount: allLinks.length,
        pageText: document.body.innerText.substring(0, 500),
        sampleHTML: sampleHTML
    };
}
"""

# Walks up from every author link to its card and scrapes the card's fields.
_EXTRACT_VIDEOS_JS = """
() => {
    const videos = [];

    // 查找所有作者链接
    const authorLinks = Array.from(document.querySelectorAll('a[href*="iesdouyin.com/share/user/"]'));

    authorLinks.forEach((authorLink, index) => {
        try {
            // 获取作者名
            const author = authorLink.textContent.trim();

            // 向上查找包含 contain-info-LpWGHS 的容器
            let container = authorLink.parentElement;
            let maxLevels = 10;
            while (container && maxLevels > 0) {
                if (container.querySelector('.contain-info-LpWGHS')) {
                    break;
                }
                container = container.parentElement;
                maxLevels--;
            }

            if (!container) return;

            // 获取描述
            const paragraphs = Array.from(container.querySelectorAll('p'));
            let description = '';
            for (let p of paragraphs) {
                const text = p.textContent.trim();
                if (text && text !== '|' && text.length > 5 && !text.includes('万') && !text.includes(':')) {
                    description = text;
                    break;
                }
            }

            // 提取互动数据
            let hot = '', plays = '', likes = '', comments = '';
            const infoContainer = container.querySelector('.contain-info-LpWGHS');
            if (infoContainer) {
                const infoItems = infoContainer.querySelectorAll('.each-info-TpmTI0');
                infoItems.forEach(item => {
                    const img = item.querySelector('img');
                    const text = item.textContent.trim();
                    if (img && img.src) {
                        if (img.src.includes('hot_first') || img.src.includes('hot_second') || img.src.includes('hot_third') || img.src.includes('hot_')) {
                            hot = text;
                        } else if (img.src.includes('play')) {
                            plays = text;
                        } else if (img.src.includes('digg')) {
                            likes = text;
                        } else if (img.src.includes('comment')) {
                            comments = text;
                        }
                    }
                });
            }

            // 获取热词
            const hotWords = [];
            const hotWordElements = container.querySelectorAll('.other-text-XeleRf');
            hotWordElements.forEach((el, i) => {
                const text = el.textContent.trim();
                if (i === 0 && text.includes('热词')) {
                    // 跳过"热词 :"标签
                } else if (text && !text.includes('热词')) {
                    hotWords.push(text);
                }
            });

            // 查找标签
            const hashTags = description.match(/#[^\\s#]+/g) || [];

            // 查找视频时长
            let duration = '';
            const timeElements = container.querySelectorAll('.time-text-mask-WmpK85 p');
            if (timeElements.length > 0) {
                duration = timeElements[0].textContent.trim();
            }

            if (author && description) {
                videos.push({
                    index: index + 1,
                    author: author,
                    description: description,
                    authorLink: authorLink.href,
                    duration: duration,
                    hot: hot,
                    plays: plays,
                    likes: likes,
                    comments: comments,
                    hotWords: hotWords,
                    hashTags: hashTags
                });
            }
        } catch (e) {
            // 静默处理错误
        }
    });

    return {
        total: videos.length,
        videos: videos,
        crawlTime: new Date().toISOString(),
        pageTitle: document.title,
        pageUrl: window.location.href
    };
}
"""


def _require_playwright():
    """Return ``async_playwright`` or raise ImportError if it is missing."""
    if async_playwright is None:
        raise ImportError(
            "未安装 Playwright,请先执行: "
            "pip install playwright && playwright install chromium"
        )
    return async_playwright


async def _build_context_options(cookie_file: Optional[str]) -> dict:
    """Build ``browser.new_context`` keyword arguments from a cookie file.

    A JSON list is treated as a bare cookie list. A JSON object that already
    has a ``cookies`` key is passed through unchanged as the storage state.
    Returns an empty dict when no usable cookie file is given.
    """
    options = {}
    if cookie_file and Path(cookie_file).exists():
        cookies = await load_cookies(cookie_file)
        if cookies:
            if isinstance(cookies, dict) and "cookies" in cookies:
                options['storage_state'] = cookies
            else:
                options['storage_state'] = {'cookies': cookies}
    return options


async def get_available_categories(page):
    """Return the category names shown on ``page``.

    Expands the collapsed category list first. Any failure is reported on
    stdout and yields an empty list (best effort).
    """
    try:
        # Only buttons that are not labelled "收起" are clicked; that label
        # means the list is already expanded.
        if await page.evaluate(_EXPAND_CATEGORIES_JS):
            print("已展开分类列表")
            await asyncio.sleep(1)
        return await page.evaluate(_LIST_CATEGORIES_JS)
    except Exception as e:
        print(f"获取分类列表失败: {e}")
        return []


async def list_categories(cookie_file: Optional[str] = None):
    """Open the page headlessly, print every category, and return the list.

    Args:
        cookie_file: Optional path to a JSON cookie file used for login.

    Raises:
        ImportError: If Playwright is not installed.
    """
    playwright_factory = _require_playwright()
    async with playwright_factory() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            context = await browser.new_context(
                **await _build_context_options(cookie_file)
            )
            page = await context.new_page()

            await page.goto(CREATIVE_GUIDANCE_URL, wait_until="domcontentloaded", timeout=60000)
            await asyncio.sleep(5)

            categories = await get_available_categories(page)

            print("\n可用的分类列表:")
            print("=" * 60)
            for i, cat in enumerate(categories, 1):
                print(f"{i:2d}. {cat}")
            print("=" * 60)
            print(f"总计: {len(categories)} 个分类\n")

            return categories
        finally:
            await browser.close()


async def load_cookies(cookie_file: str):
    """Load and return the JSON content of ``cookie_file``.

    Returns None (after printing the reason) when the file cannot be read or
    does not contain valid JSON.
    """
    try:
        with open(cookie_file, 'r', encoding='utf-8') as f:
            cookies = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"✗ 加载Cookie失败: {e}")
        return None
    print(f"✓ 已加载Cookie文件: {cookie_file}")
    return cookies


async def extract_video_data(page, debug_mode=False):
    """Scrape the video cards currently rendered on ``page``.

    Prints a short diagnostic summary first. With ``debug_mode`` the HTML of
    the first card (up to 2000 characters) is printed as well.

    Returns:
        The dict produced by the in-page script, with keys ``total``,
        ``videos``, ``crawlTime``, ``pageTitle`` and ``pageUrl``.
    """
    print("正在提取视频数据...")

    debug_info = await page.evaluate(_DEBUG_INFO_JS)

    print("调试信息:")
    print(f" 作者链接数量: {debug_info['authorLinksCount']}")
    print(f" 所有链接数量: {debug_info['allLinksCount']}")
    print(f" 页面文本预览: {debug_info['pageText'][:200]}...")

    if debug_mode and debug_info['sampleHTML']:
        print("\n第一个视频容器HTML结构(前2000字符):")
        print(debug_info['sampleHTML'])
        print()

    print()

    return await page.evaluate(_EXTRACT_VIDEOS_JS)


async def _wait_for_manual_login(page, url: str, category: Optional[str]) -> None:
    """Prompt the user to log in, then make sure ``page`` is back on ``url``."""
    print("\n" + "=" * 80)
    print("请在浏览器中完成以下操作:")
    print("1. 登录抖音账号(如果需要)")
    print("2. 等待页面完全加载,确保能看到热门视频列表")
    if category:
        print(f"3. (可选)手动点击【{category}】分类,或者等待脚本自动点击")
        print("4. 完成后,回到这里按回车键继续")
    else:
        print("3. 完成后,回到这里按回车键继续")
    print("=" * 80 + "\n")

    # Deliberately a plain blocking input(): no other task is scheduled on
    # the loop while the user logs in, and Ctrl+C keeps working immediately.
    input("按回车键开始抓取数据...")
    print()

    print("等待页面稳定...")
    await asyncio.sleep(3)

    # Logging in may have navigated away; go back to the target page.
    current_url = page.url
    if current_url != url:
        print(f"检测到页面跳转到: {current_url}")
        print("重新导航到目标页面...")
        await page.goto(url, wait_until="domcontentloaded", timeout=60000)
        await asyncio.sleep(5)


async def _select_category(page, category: str) -> None:
    """Expand the category list and click the tab named ``category``."""
    print(f"正在切换到【{category}】分类...")

    await page.evaluate(_EXPAND_CATEGORIES_JS)
    await asyncio.sleep(1)

    try:
        category_clicked = await page.evaluate(_CLICK_CATEGORY_JS, category)
        if category_clicked:
            print(f"✓ 已点击【{category}】分类")
            print("等待分类内容加载...")
            await asyncio.sleep(8)
        else:
            print(f"⚠ 未找到【{category}】分类标签")
            print("提示:使用 --list-categories 查看所有可用分类")
    except Exception as e:
        # Best effort: fall back to whatever the page currently shows.
        print(f"⚠ 点击分类失败: {e}")
        print("将抓取当前显示的内容")


def _print_no_data_hints(cookie_file: Optional[str]) -> None:
    """Print troubleshooting hints shown when no video was extracted."""
    print("⚠ 未提取到视频数据")
    print("提示:")
    print(" 1. 确保已登录抖音账号")
    print(" 2. 确保页面已完全加载")
    print(" 3. 尝试使用 --manual 参数手动控制")
    if not cookie_file:
        print(" 4. 或者使用 --cookie 参数提供Cookie文件")


async def crawl_creative_guidance(output_dir: str, headless: bool = True,
                                  manual_mode: bool = False,
                                  cookie_file: Optional[str] = None,
                                  category: Optional[str] = None,
                                  debug_mode: bool = False):
    """Crawl the creative-guidance page and save the extracted videos.

    Args:
        output_dir: Directory for the result files; created if missing.
        headless: Launch Chromium without a visible window.
        manual_mode: Wait for the user to log in and press Enter.
        cookie_file: Optional path to a JSON cookie file used for login.
        category: Category tab to switch to; None or "全部" keeps the default.
        debug_mode: Print the HTML of the first video card.

    Errors raised while crawling are printed with a traceback and swallowed;
    the browser is always closed.

    Raises:
        ImportError: If Playwright is not installed.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    url = CREATIVE_GUIDANCE_URL

    print(f"正在抓取抖音创作指导页面: {url}")
    if category:
        print(f"目标分类: {category}")
    if debug_mode:
        print("调试模式已启用")
    print()

    playwright_factory = _require_playwright()
    async with playwright_factory() as p:
        browser = await p.chromium.launch(headless=headless)

        try:
            context = await browser.new_context(
                **await _build_context_options(cookie_file)
            )
            page = await context.new_page()

            print("正在打开页面...")
            await page.goto(url, wait_until="domcontentloaded", timeout=60000)

            if manual_mode:
                await _wait_for_manual_login(page, url, category)
            else:
                print("等待页面加载...")
                await asyncio.sleep(10)

            if category and category != "全部":
                await _select_category(page, category)

            data = await extract_video_data(page, debug_mode=debug_mode)

            if data['total'] == 0:
                _print_no_data_hints(cookie_file)
                return

            print(f"✓ 提取到 {data['total']} 个视频\n")

            save_results(data['videos'], url, output_path, category)

        except Exception as e:
            print(f"✗ 错误: {e}")
            traceback.print_exc()

        finally:
            await browser.close()


def save_results(videos, url, output_path, category=None):
    """Write ``videos`` to timestamped JSON, text and CSV files.

    Args:
        videos: List of video dicts as produced by ``extract_video_data``.
        url: Page URL recorded in the output.
        output_path: Existing directory (``pathlib.Path``) to write into.
        category: Optional category name. It is stored verbatim in the data;
            characters that are invalid in file names are replaced with ``_``
            in the file name only.
    """
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")

    category_suffix = ""
    if category:
        category_suffix = "_" + _UNSAFE_FILENAME_CHARS.sub("_", category)

    # JSON output.
    json_file = output_path / f"douyin_creative_guidance{category_suffix}_{timestamp}.json"
    result_data = {
        'page_url': url,
        'page_title': '抖音创作指导 - 热门视频',
        'category': category or '全部',
        'platform': 'douyin',
        'crawl_time': now.isoformat(),
        'total_videos': len(videos),
        'videos': videos
    }

    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(result_data, f, ensure_ascii=False, indent=2)
    print(f"✓ JSON已保存: {json_file}")

    # Plain-text output; optional fields are written only when non-empty.
    optional_fields = (
        ('duration', '时长'),
        ('authorLink', '作者链接'),
        ('hot', '热度'),
        ('plays', '播放量'),
        ('likes', '点赞'),
        ('comments', '评论'),
    )
    txt_file = json_file.with_suffix('.txt')
    with open(txt_file, "w", encoding="utf-8") as f:
        f.write("抖音创作指导 - 热门视频列表\n")
        f.write(f"页面: {url}\n")
        if category:
            f.write(f"分类: {category}\n")
        f.write(f"抓取时间: {now:%Y-%m-%d %H:%M:%S}\n")
        f.write(f"视频总数: {len(videos)}\n")
        f.write("=" * 80 + "\n\n")

        for video in videos:
            f.write(f"视频 {video['index']}:\n")
            f.write(f" 作者: {video.get('author', 'N/A')}\n")
            f.write(f" 描述: {video.get('description', 'N/A')}\n")
            for key, label in optional_fields:
                if video.get(key):
                    f.write(f" {label}: {video[key]}\n")
            if video.get('hashTags'):
                f.write(f" 标签: {', '.join(video['hashTags'])}\n")
            if video.get('hotWords'):
                f.write(f" 热词: {', '.join(video['hotWords'])}\n")
            f.write("-" * 80 + "\n")
    print(f"✓ 文本已保存: {txt_file}")

    # CSV output. csv.writer escapes embedded quotes, commas and newlines.
    # QUOTE_NONNUMERIC keeps the layout: bare index, every text field quoted.
    csv_file = json_file.with_suffix('.csv')
    with open(csv_file, "w", encoding="utf-8-sig", newline="") as f:
        f.write(_CSV_HEADER + "\n")
        writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
        for video in videos:
            writer.writerow([
                video['index'],
                video.get("author", ""),
                video.get("description", ""),
                video.get("duration", ""),
                video.get("hot", ""),
                video.get("plays", ""),
                video.get("likes", ""),
                video.get("comments", ""),
                '|'.join(video.get('hashTags', [])),
                video.get("authorLink", ""),
            ])
    print(f"✓ CSV已保存: {csv_file}\n")

    print("统计:")
    print(f" 总视频数: {len(videos)}")
    print(f" 有标签: {sum(1 for v in videos if v.get('hashTags'))}")
    print(f" 有互动数据: {sum(1 for v in videos if v.get('plays') or v.get('likes'))}")


def main():
    """Parse the command line and run the requested crawl or listing."""
    parser = argparse.ArgumentParser(description="抖音创作指导页面爬虫 (Playwright版)")
    parser.add_argument("--output-dir", "-o", default="douyin_data", help="输出目录")
    parser.add_argument("--headless", action="store_true", help="无头模式(不显示浏览器)")
    parser.add_argument("--manual", "-m", action="store_true", help="手动模式:等待用户登录后按回车继续")
    parser.add_argument("--cookie", "-c", help="Cookie文件路径(JSON格式)")
    parser.add_argument("--category", "-t", help="分类标签")
    parser.add_argument("--debug", "-d", action="store_true", help="调试模式:显示页面HTML结构")
    parser.add_argument("--list-categories", "-l", action="store_true", help="列出所有可用的分类")
    args = parser.parse_args()

    print("=" * 80)
    print("抖音创作指导页面爬虫 (Playwright版)")
    print("=" * 80)
    print()

    # Listing mode: print the categories and exit.
    if args.list_categories:
        try:
            asyncio.run(list_categories(cookie_file=args.cookie))
        except KeyboardInterrupt:
            print("\n已中断")
        except Exception as e:
            print(f"\n错误: {e}")
            traceback.print_exc()
        return

    if args.cookie:
        if Path(args.cookie).exists():
            print(f"✓ 将使用Cookie文件: {args.cookie}")
        else:
            print(f"⚠ Cookie文件不存在: {args.cookie}")
        print()

    if args.category:
        print(f"✓ 目标分类: {args.category}")
        print()

    if not args.headless:
        print("⚠ 浏览器模式已启用")
        if args.manual:
            print(" 手动模式:登录后按回车键继续抓取")
        else:
            print(" 自动模式:将自动等待10秒后抓取")
        print()

    try:
        asyncio.run(crawl_creative_guidance(
            output_dir=args.output_dir,
            headless=args.headless,
            manual_mode=args.manual,
            cookie_file=args.cookie,
            category=args.category,
            debug_mode=args.debug,
        ))
    except KeyboardInterrupt:
        print("\n已中断")
    except Exception as e:
        print(f"\n错误: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    main()