""" 阿里云百炼AI分析模块 用于分析抖音数据 """ import os import json from pathlib import Path from typing import List, Dict, Optional import dashscope from dashscope import Generation from dotenv import load_dotenv # 加载环境变量 load_dotenv() class AIAnalyzer: """AI数据分析器""" def __init__(self, api_key: str = None, model: str = "qwen-plus"): """ 初始化AI分析器 Args: api_key: 阿里云百炼API Key,如果不提供则从环境变量读取 model: 使用的模型名称,默认qwen-plus """ self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY") if not self.api_key: raise ValueError("未提供API Key,请设置DASHSCOPE_API_KEY环境变量或传入api_key参数") dashscope.api_key = self.api_key self.model = model def load_prompt(self, prompt_file: str) -> str: """ 加载提示词文件 Args: prompt_file: 提示词文件路径 Returns: 提示词内容 """ try: with open(prompt_file, 'r', encoding='utf-8') as f: return f.read() except Exception as e: raise Exception(f"加载提示词文件失败: {e}") def format_video_data(self, videos: List[Dict]) -> str: """ 格式化视频数据为文本 Args: videos: 视频数据列表 Returns: 格式化后的文本 """ data_text = f"## 数据总览\n总视频数: {len(videos)}\n\n## 视频详情\n\n" for i, video in enumerate(videos, 1): data_text += f"### 视频 {i}\n" data_text += f"- 作者: {video.get('author', 'N/A')}\n" data_text += f"- 描述: {video.get('description', 'N/A')}\n" if video.get('duration'): data_text += f"- 时长: {video.get('duration')}\n" if video.get('hot'): data_text += f"- 热度: {video.get('hot')}\n" if video.get('plays'): data_text += f"- 播放量: {video.get('plays')}\n" if video.get('likes'): data_text += f"- 点赞: {video.get('likes')}\n" if video.get('comments'): data_text += f"- 评论: {video.get('comments')}\n" if video.get('hashTags'): data_text += f"- 标签: {', '.join(video.get('hashTags'))}\n" if video.get('hotWords'): data_text += f"- 热词: {', '.join(video.get('hotWords'))}\n" data_text += "\n" return data_text def analyze(self, videos: List[Dict], prompt_file: str = "prompts/analyze_prompt.md", custom_instruction: str = None) -> Dict: """ 分析视频数据 Args: videos: 视频数据列表 prompt_file: 提示词文件路径 custom_instruction: 自定义分析指令(可选) Returns: 分析结果字典 """ try: # 加载提示词 system_prompt = self.load_prompt(prompt_file) # 格式化视频数据 video_data_text = self.format_video_data(videos) # 构建用户消息 user_message = f"{video_data_text}\n\n" if custom_instruction: user_message += f"特殊要求: {custom_instruction}\n\n" user_message += "请根据以上数据进行分析。" # 调用API messages = [ {'role': 'system', 'content': system_prompt}, {'role': 'user', 'content': user_message} ] response = Generation.call( model=self.model, messages=messages, result_format='message' ) if response.status_code == 200: analysis_result = response.output.choices[0].message.content return { "success": True, "analysis": analysis_result, "model": self.model, "video_count": len(videos), "usage": { "input_tokens": response.usage.input_tokens, "output_tokens": response.usage.output_tokens, "total_tokens": response.usage.total_tokens } } else: return { "success": False, "error": f"API调用失败: {response.code} - {response.message}", "video_count": len(videos) } except Exception as e: return { "success": False, "error": f"分析过程出错: {str(e)}", "video_count": len(videos) } def analyze_from_file(self, json_file: str, prompt_file: str = "prompts/analyze_prompt.md", custom_instruction: str = None) -> Dict: """ 从JSON文件读取数据并分析 Args: json_file: JSON数据文件路径 prompt_file: 提示词文件路径 custom_instruction: 自定义分析指令(可选) Returns: 分析结果字典 """ try: with open(json_file, 'r', encoding='utf-8') as f: data = json.load(f) videos = data.get('videos', []) if not videos: return { "success": False, "error": "JSON文件中没有视频数据", "video_count": 0 } return self.analyze(videos, prompt_file, custom_instruction) except Exception as e: return { "success": False, "error": f"读取文件失败: {str(e)}", "video_count": 0 } def main(): """测试函数""" import argparse parser = argparse.ArgumentParser(description="AI数据分析工具") parser.add_argument("--file", "-f", required=True, help="JSON数据文件路径") parser.add_argument("--prompt", "-p", default="prompts/analyze_prompt.md", help="提示词文件路径") parser.add_argument("--instruction", "-i", help="自定义分析指令") parser.add_argument("--model", "-m", default="qwen-plus", help="模型名称") parser.add_argument("--output", "-o", help="输出文件路径(可选)") args = parser.parse_args() # 创建分析器 analyzer = AIAnalyzer(model=args.model) # 分析数据 print(f"正在分析文件: {args.file}") print(f"使用模型: {args.model}") print(f"提示词文件: {args.prompt}") print() result = analyzer.analyze_from_file( json_file=args.file, prompt_file=args.prompt, custom_instruction=args.instruction ) if result["success"]: print("=" * 80) print("分析结果:") print("=" * 80) print(result["analysis"]) print() print("=" * 80) print(f"统计信息:") print(f" 视频数量: {result['video_count']}") print(f" 输入Token: {result['usage']['input_tokens']}") print(f" 输出Token: {result['usage']['output_tokens']}") print(f" 总Token: {result['usage']['total_tokens']}") # 保存结果 if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(result["analysis"]) print(f"\n✓ 分析结果已保存到: {args.output}") else: print(f"✗ 分析失败: {result['error']}") if __name__ == "__main__": main()