返回博客

如何使用Python收集YouTube数据以构建RAG系统:解析字幕和元数据

逐步指南:收集YouTube数据以训练RAG系统,包括使用API、解析字幕和元数据,以及设置代理以实现扩展。

📅2026年3月6日
```html

RAG(检索增强生成)系统需要高质量的数据进行训练。YouTube 是一个巨大的结构化内容来源:带有字幕、元数据和评论的视频。在本文中,我们将讨论如何有效地收集 YouTube 数据以用于 RAG,避免封锁并遵守 API 限制。

什么是 RAG,YouTube 数据有什么用

RAG(检索增强生成)是一种构建 AI 系统的方法,其中语言模型通过知识库进行补充。RAG 不仅依赖于模型训练的数据,而是从外部来源提取相关信息并用于生成答案。

YouTube 包含数百万小时的不同语言字幕内容。这使得该平台成为 RAG 系统在各个领域的宝贵数据来源:

  • 教育系统 — 讲座、教程、带时间戳的课程
  • 技术文档 — 编程、DevOps、软件设置的视频指南
  • 医学知识库 — 医生讲座、临床案例分析
  • 商业分析 — 专家访谈、案例研究、市场概述
  • 产品支持 — 产品评测、视频格式的常见问题解答

YouTube 数据的优势在于其结构:带有时间戳的字幕、元数据(类别、标签)、社会背景(评论、点赞)。所有这些都有助于 RAG 系统理解信息的内容和上下文。

哪些 YouTube 数据对 RAG 系统有用

为了有效地工作,RAG 系统需要收集几种类型的数据。每种类型在信息提取和生成过程中都有其特定的任务。

字幕(Transcripts)

主要的文本数据来源。YouTube 提供两种类型的字幕:

  • 自动生成 — 由 Google 的语音识别算法生成。大多数英语和其他流行语言的视频都可以使用。准确率为 85-95%,具体取决于音质。
  • 手动生成 — 由作者或社区上传。更准确,通常包含格式化和额外的上下文。

字幕包括时间戳,这使得文本与视频的特定时刻相联系。这对于在 RAG 答复中创建准确的来源引用至关重要。

视频元数据

元数据帮助 RAG 系统理解信息的上下文和相关性:

数据类型 在 RAG 中的应用
标题和描述 语义搜索,主题识别
标签和类别 内容分类,过滤
发布日期 信息的相关性(对技术主题重要)
时长 评估主题的深入程度
统计数据(观看次数、点赞) 评估来源的质量和受欢迎程度
频道信息 确定来源的权威性

评论

评论包含额外的上下文:观众的问题、作者的澄清、讨论。对于 RAG 系统来说,这非常有价值,因为:

  • 评论通常包含视频主题的常见问题解答
  • 作者可以发布修正和补充
  • 讨论揭示了对问题的不同观点

使用 YouTube Data API v3:设置和限制

YouTube Data API v3 是获取数据的官方方式。它提供对元数据、统计数据和评论的访问。字幕通过单独的方法获取。

获取 API 密钥

使用 API 需要从 Google Cloud Console 获取密钥:

  1. 访问 console.cloud.google.com
  2. 创建新项目或选择现有项目
  3. 在“API 和服务”部分启用 YouTube Data API v3
  4. 创建凭据(Credentials)→ API 密钥
  5. 复制密钥 — 该密钥在所有请求中都需要

限制和配额

YouTube API 使用配额系统。每个请求“消耗”一定数量的单位:

操作 配额成本
搜索视频(search.list) 100 单位
获取视频数据(videos.list) 1 单位
获取评论(commentThreads.list) 1 单位

默认的每日限制为 10,000 单位。这大约相当于 100 个搜索请求或 10,000 个元数据请求。要增加配额,需要向 Google 提交申请。

API 的基本示例

import requests

API_KEY = '你的_api_密钥'
BASE_URL = 'https://www.googleapis.com/youtube/v3'

# 根据查询搜索视频
def search_videos(query, max_results=10):
    url = f'{BASE_URL}/search'
    params = {
        'part': 'snippet',
        'q': query,
        'type': 'video',
        'maxResults': max_results,
        'key': API_KEY
    }
    
    response = requests.get(url, params=params)
    return response.json()

# 获取视频的元数据
def get_video_details(video_id):
    url = f'{BASE_URL}/videos'
    params = {
        'part': 'snippet,contentDetails,statistics',
        'id': video_id,
        'key': API_KEY
    }
    
    response = requests.get(url, params=params)
    return response.json()

# 示例用法
results = search_videos('机器学习教程', max_results=5)
for item in results.get('items', []):
    video_id = item['id']['videoId']
    title = item['snippet']['title']
    print(f'ID: {video_id}, 标题: {title}')
    
    # 获取详细信息
    details = get_video_details(video_id)
    stats = details['items'][0]['statistics']
    print(f"观看次数: {stats.get('viewCount')}, 点赞: {stats.get('likeCount')}")

视频字幕解析:自动和手动

YouTube Data API v3 不提供对字幕的直接访问。获取字幕使用替代方法。

使用 youtube-transcript-api 库

最简单的方法是使用 Python 的 youtube-transcript-api 库。它直接提取字幕,无需 API 密钥:

from youtube_transcript_api import YouTubeTranscriptApi

# 获取字幕
video_id = 'dQw4w9WgXcQ'

try:
    # 尝试获取俄语字幕,如果没有,则获取英语字幕
    transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['ru', 'en'])
    
    # 输出带时间戳的字幕
    for entry in transcript:
        start_time = entry['start']
        duration = entry['duration']
        text = entry['text']
        print(f"[{start_time:.2f}s] {text}")
        
except Exception as e:
    print(f"获取字幕时出错: {e}")

# 获取可用语言列表
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
for transcript in transcript_list:
    print(f"语言: {transcript.language}, 自动生成: {transcript.is_generated}")

该库自动识别可用的字幕,并可以将其翻译成其他语言(如果 YouTube 提供此功能)。

处理时间戳以用于 RAG

对于 RAG 系统,保持文本与时间戳的关联非常重要。这使得创建准确的来源引用成为可能:

def format_timestamp(seconds):
    """将秒转换为 MM:SS 格式"""
    minutes = int(seconds // 60)
    secs = int(seconds % 60)
    return f"{minutes:02d}:{secs:02d}"

def create_chunks_with_timestamps(transcript, chunk_size=500):
    """将字幕拆分为带时间戳的块"""
    chunks = []
    current_chunk = ""
    chunk_start_time = 0
    
    for i, entry in enumerate(transcript):
        if len(current_chunk) == 0:
            chunk_start_time = entry['start']
        
        current_chunk += entry['text'] + " "
        
        # 如果达到所需大小或字幕结束
        if len(current_chunk) >= chunk_size or i == len(transcript) - 1:
            chunks.append({
                'text': current_chunk.strip(),
                'start_time': chunk_start_time,
                'timestamp': format_timestamp(chunk_start_time),
                'video_id': video_id
            })
            current_chunk = ""
    
    return chunks

# 使用
transcript = YouTubeTranscriptApi.get_transcript(video_id)
chunks = create_chunks_with_timestamps(transcript)

for chunk in chunks[:3]:  # 前 3 个块
    print(f"[{chunk['timestamp']}] {chunk['text'][:100]}...")

收集元数据:标题、描述、标签

元数据为 RAG 系统提供了丰富的上下文。以下是收集所有必要数据的完整示例:

import requests
from datetime import datetime

def collect_video_metadata(video_id, api_key):
    """收集视频的完整元数据"""
    url = f'https://www.googleapis.com/youtube/v3/videos'
    params = {
        'part': 'snippet,contentDetails,statistics,topicDetails',
        'id': video_id,
        'key': api_key
    }
    
    response = requests.get(url, params=params)
    data = response.json()
    
    if 'items' not in data or len(data['items']) == 0:
        return None
    
    item = data['items'][0]
    snippet = item['snippet']
    stats = item.get('statistics', {})
    content = item.get('contentDetails', {})
    
    metadata = {
        'video_id': video_id,
        'title': snippet['title'],
        'description': snippet['description'],
        'channel_title': snippet['channelTitle'],
        'channel_id': snippet['channelId'],
        'published_at': snippet['publishedAt'],
        'tags': snippet.get('tags', []),
        'category_id': snippet.get('categoryId'),
        'duration': content.get('duration'),
        'view_count': int(stats.get('viewCount', 0)),
        'like_count': int(stats.get('likeCount', 0)),
        'comment_count': int(stats.get('commentCount', 0)),
        'topics': item.get('topicDetails', {}).get('topicCategories', [])
    }
    
    return metadata

# 示例用法
metadata = collect_video_metadata('dQw4w9WgXcQ', API_KEY)
print(f"标题: {metadata['title']}")
print(f"频道: {metadata['channel_title']}")
print(f"观看次数: {metadata['view_count']:,}")
print(f"标签: {', '.join(metadata['tags'][:5])}")

确定内容的相关性

对于技术主题,信息的新鲜度非常重要。我们将添加一个评估相关性的函数:

from datetime import datetime, timedelta

def calculate_content_freshness(published_date_str):
    """评估内容的相关性"""
    published_date = datetime.fromisoformat(published_date_str.replace('Z', '+00:00'))
    age_days = (datetime.now(published_date.tzinfo) - published_date).days
    
    if age_days < 30:
        return 'very_fresh'
    elif age_days < 180:
        return 'fresh'
    elif age_days < 365:
        return 'moderate'
    else:
        return 'old'

def calculate_quality_score(metadata):
    """计算来源的质量评分"""
    score = 0
    
    # 受欢迎程度
    views = metadata['view_count']
    if views > 100000:
        score += 3
    elif views > 10000:
        score += 2
    elif views > 1000:
        score += 1
    
    # 参与度(点赞相对于观看次数)
    if views > 0:
        like_ratio = metadata['like_count'] / views
        if like_ratio > 0.05:
            score += 2
        elif like_ratio > 0.02:
            score += 1
    
    # 相关性
    freshness = calculate_content_freshness(metadata['published_at'])
    if freshness == 'very_fresh':
        score += 2
    elif freshness == 'fresh':
        score += 1
    
    return score

# 使用
metadata = collect_video_metadata('dQw4w9WgXcQ', API_KEY)
quality = calculate_quality_score(metadata)
freshness = calculate_content_freshness(metadata['published_at'])
print(f"质量评分: {quality}/7")
print(f"相关性: {freshness}")

评论解析以进行上下文分析

评论可能包含有价值的信息:视频中的错误修正、额外资源、常见问题。对于 RAG 系统来说,这是额外的上下文。

def get_video_comments(video_id, api_key, max_results=100):
    """获取视频的评论"""
    url = 'https://www.googleapis.com/youtube/v3/commentThreads'
    comments = []
    next_page_token = None
    
    while len(comments) < max_results:
        params = {
            'part': 'snippet',
            'videoId': video_id,
            'maxResults': min(100, max_results - len(comments)),
            'order': 'relevance',  # 按相关性排序
            'key': api_key
        }
        
        if next_page_token:
            params['pageToken'] = next_page_token
        
        response = requests.get(url, params=params)
        data = response.json()
        
        if 'items' not in data:
            break
        
        for item in data['items']:
            top_comment = item['snippet']['topLevelComment']['snippet']
            comments.append({
                'author': top_comment['authorDisplayName'],
                'text': top_comment['textDisplay'],
                'like_count': top_comment['likeCount'],
                'published_at': top_comment['publishedAt'],
                'reply_count': item['snippet']['totalReplyCount']
            })
        
        next_page_token = data.get('nextPageToken')
        if not next_page_token:
            break
    
    return comments

def filter_valuable_comments(comments, min_likes=5):
    """过滤有价值的评论"""
    valuable = []
    
    for comment in comments:
        # 价值标准:
        # 1. 很多点赞(受欢迎程度)
        # 2. 有回复(引发讨论)
        # 3. 长文本(详细评论)
        
        if (comment['like_count'] >= min_likes or 
            comment['reply_count'] > 0 or 
            len(comment['text']) > 200):
            valuable.append(comment)
    
    return valuable

# 使用
comments = get_video_comments('dQw4w9WgXcQ', API_KEY, max_results=50)
valuable_comments = filter_valuable_comments(comments)

print(f"总评论数: {len(comments)}")
print(f"有价值的评论数: {len(valuable_comments)}")

for comment in valuable_comments[:3]:
    print(f"\n[{comment['like_count']} 点赞] {comment['author']}:")
    print(comment['text'][:200])

使用代理扩展数据收集

在大规模数据收集时会出现两个问题:YouTube API 的限制(每天 10,000 配额)和在解析字幕时的封锁。代理可以帮助解决这两个问题。

何时需要 YouTube 解析的代理

  • 超出 API 配额 — 通过不同 IP 使用多个 API 密钥,可以增加每日限制
  • 绕过 API 解析字幕 — youtube-transcript-api 库进行直接请求,频率过高可能会被封锁
  • 从不同地区收集数据 — 某些视频仅在特定国家可用
  • 并行收集 — 将负载分配到多个 IP 以加快过程

选择代理类型

代理类型 优点 何时使用
数据中心 高速,低成本 与 API 交互,小数据量
住宅 低封锁风险,真实 IP 大规模解析字幕,绕过限制
移动 最大信任,少量封锁 从移动应用收集数据,关键任务

对于大多数 RAG 系统任务,住宅代理 是合适的选择 — 它们在大规模解析时提供了成本和可靠性之间的平衡。

在代码中设置代理

import requests
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._api import TranscriptListFetcher

# 设置代理
PROXY = {
    'http': 'http://username:password@proxy-server:port',
    'https': 'http://username:password@proxy-server:port'
}

# 通过代理获取 API 数据
def get_video_details_with_proxy(video_id, api_key, proxy):
    url = f'https://www.googleapis.com/youtube/v3/videos'
    params = {
        'part': 'snippet,statistics',
        'id': video_id,
        'key': api_key
    }
    
    response = requests.get(url, params=params, proxies=proxy, timeout=10)
    return response.json()

# 通过代理解析字幕
class ProxiedTranscriptApi:
    def __init__(self, proxy):
        self.proxy = proxy
    
    def get_transcript(self, video_id, languages=['en']):
        # 创建带代理的自定义会话
        session = requests.Session()
        session.proxies = self.proxy
        
        # 使用会话进行请求
        fetcher = TranscriptListFetcher(session)
        transcript_list = fetcher.fetch(video_id)
        
        # 获取所需语言
        for lang in languages:
            try:
                transcript = transcript_list.find_transcript([lang])
                return transcript.fetch()
            except:
                continue
        
        raise Exception(f"未找到语言的字幕: {languages}")

# 使用
api = ProxiedTranscriptApi(PROXY)
transcript = api.get_transcript('dQw4w9WgXcQ', languages=['ru', 'en'])
print(f"获取了 {len(transcript)} 段字幕")

代理轮换以进行扩展

在从数千个视频收集数据时,重要的是将负载分配到多个代理:

import random
import time

class ProxyRotator:
    def __init__(self, proxy_list):
        self.proxies = proxy_list
        self.current_index = 0
    
    def get_next_proxy(self):
        """顺序轮换"""
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy
    
    def get_random_proxy(self):
        """随机轮换"""
        return random.choice(self.proxies)

# 代理列表
PROXY_LIST = [
    {'http': 'http://user:pass@proxy1:port', 'https': 'http://user:pass@proxy1:port'},
    {'http': 'http://user:pass@proxy2:port', 'https': 'http://user:pass@proxy2:port'},
    {'http': 'http://user:pass@proxy3:port', 'https': 'http://user:pass@proxy3:port'},
]

rotator = ProxyRotator(PROXY_LIST)

def collect_data_with_rotation(video_ids):
    results = []
    
    for video_id in video_ids:
        proxy = rotator.get_next_proxy()
        
        try:
            # 获取元数据
            metadata = get_video_details_with_proxy(video_id, API_KEY, proxy)
            
            # 获取字幕
            api = ProxiedTranscriptApi(proxy)
            transcript = api.get_transcript(video_id)
            
            results.append({
                'video_id': video_id,
                'metadata': metadata,
                'transcript': transcript
            })
            
            # 请求之间的延迟
            time.sleep(1)
            
        except Exception as e:
            print(f"{video_id} 的错误: {e}")
            continue
    
    return results

# 使用
video_ids = ['video1', 'video2', 'video3', 'video4', 'video5']
data = collect_data_with_rotation(video_ids)
print(f"收集到 {len(data)} 个视频的数据")

处理和准备 RAG 数据

收集数据后,需要对其进行处理和结构化,以便 RAG 系统有效工作。

创建向量嵌入

RAG 系统使用向量搜索来查找相关片段。需要将文本转换为嵌入:

from sentence_transformers import SentenceTransformer
import numpy as np

# 加载用于创建嵌入的模型
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

def create_embeddings_from_transcript(transcript_chunks):
    """为字幕块创建嵌入"""
    embeddings = []
    
    for chunk in transcript_chunks:
        # 将文本与元数据结合以提供更好的上下文
        text_with_context = f"{chunk['title']} | {chunk['text']}"
        
        # 创建嵌入
        embedding = model.encode(text_with_context)
        
        embeddings.append({
            'video_id': chunk['video_id'],
            'timestamp': chunk['timestamp'],
            'text': chunk['text'],
            'embedding': embedding.tolist(),
            'metadata': {
                'title': chunk['title'],
                'channel': chunk['channel'],
                'views': chunk['views']
            }
        })
    
    return embeddings

# 准备数据
def prepare_rag_data(video_data):
    """准备所有数据以用于 RAG"""
    all_chunks = []
    
    for video in video_data:
        metadata = video['metadata']
        transcript = video['transcript']
        
        # 将字幕拆分为块
        chunks = create_chunks_with_timestamps(transcript)
        
        # 将元数据添加到每个块
        for chunk in chunks:
            chunk['title'] = metadata['title']
            chunk['channel'] = metadata['channel_title']
            chunk['views'] = metadata['view_count']
            all_chunks.append(chunk)
    
    # 创建嵌入
    embeddings = create_embeddings_from_transcript(all_chunks)
    
    return embeddings

# 使用
rag_data = prepare_rag_data(collected_videos)
print(f"为 RAG 准备了 {len(rag_data)} 个片段")

保存到向量数据库

为了有效搜索,嵌入被保存到专用数据库中。流行的选项包括:Pinecone、Weaviate、Qdrant、ChromaDB。

import chromadb
from chromadb.config import Settings

# 初始化 ChromaDB(本地向量数据库)
client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="./youtube_rag_db"
))

# 创建集合
collection = client.create_collection(
    name="youtube_transcripts",
    metadata={"description": "YouTube 视频字幕用于 RAG"}
)

def store_in_vector_db(embeddings_data, collection):
    """将嵌入保存到向量数据库中"""
    
    ids = []
    embeddings = []
    documents = []
    metadatas = []
    
    for i, item in enumerate(embeddings_data):
        ids.append(f"{item['video_id']}_{i}")
        embeddings.append(item['embedding'])
        documents.append(item['text'])
        metadatas.append({
            'video_id': item['video_id'],
            'timestamp': item['timestamp'],
            'title': item['metadata']['title'],
            'channel': item['metadata']['channel'],
            'views': str(item['metadata']['views']),
            'youtube_url': f"https://youtube.com/watch?v={item['video_id']}&t={int(float(item['timestamp'].split(':')[0])*60 + float(item['timestamp'].split(':')[1]))}s"
        })
    
    # 添加到集合中
    collection.add(
        ids=ids,
        embeddings=embeddings,
        documents=documents,
        metadatas=metadatas
    )
    
    print(f"已保存 {len(ids)} 个嵌入到向量数据库中")

# 使用
store_in_vector_db(rag_data, collection)

搜索和生成答案

最后一步是实现 RAG 搜索和生成答案:

def search_youtube_knowledge(query, collection, model, top_k=3):
    """从 YouTube 中搜索相关片段"""
    
    # 创建查询的嵌入
    query_embedding = model.encode(query).tolist()
    
    # 在向量数据库中搜索
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k
    )
    
    # 格式化结果
    sources = []
    for i in range(len(results['ids'][0])):
        sources.append({
            'text': results['documents'][0][i],
            'metadata': results['metadatas'][0][i],
            'distance': results['distances'][0][i] if 'distances' in results else None
        })
    
    return sources

def generate_rag_answer(query, sources, llm_api_key):
    """基于找到的来源生成答案"""
    
    # 从找到的来源中形成上下文
    context = "\n\n".join([
        f"来源: {s['metadata']['title']} ({s['metadata']['timestamp']})\n{s['text']}"
        for s in sources
    ])
    
    # LLM 的提示
    prompt = f"""根据以下来自 YouTube 视频的片段回答用户的问题。
务必提供带时间戳的来源。

上下文:
{context}

问题: {query}

答案:"""
    
    # 这里调用你的 LLM(OpenAI、Claude、本地模型)
    # 示例使用 OpenAI:
    # response = openai.ChatCompletion.create(
    #     model="gpt-4",
    #     messages=[{"role": "user", "content": prompt}]
    # )
    # answer = response.choices[0].message.content
    
    # 示例返回提示
    return {
        'answer': '这里将是 LLM 的答案',
        'sources': sources
    }

# 使用
query = "如何在 Python 中设置代理?"
sources = search_youtube_knowledge(query, collection, model, top_k=3)

print("找到的来源:")
for source in sources:
    print(f"\n{source['metadata']['title']}")
    print(f"时间: {source['metadata']['timestamp']}")
    print(f"链接: {source['metadata']['youtube_url']}")
    print(f"文本: {source['text'][:200]}...")

优化 RAG 的质量

提高 YouTube 数据 RAG 系统质量的几个建议:

  • 过滤低质量内容 — 使用观看次数、点赞、发布时间等指标
  • 保持上下文 — 将视频和频道的名称添加到每个文本块中
  • 优化块的大小 — 对于技术内容,300-500 字是最佳选择
  • 使用元数据进行排名 — 更新和受欢迎的内容可以优先考虑
  • 添加评论 — 它们通常包含重要的澄清和常见问题解答
  • 检查视频的可用性 — 某些视频可能已被删除或变为私有

建议: 对于大规模数据收集,建议结合使用官方 API(用于元数据)和通过代理解析(用于字幕)。这可以绕过限制并获取最多的信息。

结论

为 RAG 系统收集 YouTube 数据是一个多步骤的过程,包括与 API 的交互、解析字幕、处理元数据和创建向量嵌入。关键点:

  • YouTube Data API v3 提供元数据、统计数据和评论,每天限制为 10,000 配额
  • 字幕通过 youtube-transcript-api 库或直接请求进行解析
  • 时间戳对创建准确的来源引用至关重要
  • 元数据(观看次数、点赞、日期)有助于评估内容的质量和相关性
  • 评论提供上下文,通常包含常见问题解答
  • 代理对于扩展和绕过限制是必要的
  • 向量嵌入和专用数据库提供快速的语义搜索

通过正确设置流程,可以每天收集数万条高质量内容片段,为任何主题领域的 RAG 系统创建强大的知识库。

如果您计划进行大规模的 YouTube 数据收集以绕过限制和封锁,建议使用 住宅代理 — 它们在解析数千个视频时提供稳定性,并最小化 YouTube 封锁的风险。

```