RAG(检索增强生成)系统需要高质量的数据进行训练。YouTube 是一个巨大的结构化内容来源:带有字幕、元数据和评论的视频。在本文中,我们将讨论如何有效地收集 YouTube 数据以用于 RAG,避免封锁并遵守 API 限制。
什么是 RAG,YouTube 数据有什么用
RAG(检索增强生成)是一种构建 AI 系统的方法,其中语言模型通过知识库进行补充。RAG 不仅依赖于模型训练的数据,而是从外部来源提取相关信息并用于生成答案。
YouTube 包含数百万小时的不同语言字幕内容。这使得该平台成为 RAG 系统在各个领域的宝贵数据来源:
- 教育系统 — 讲座、教程、带时间戳的课程
- 技术文档 — 编程、DevOps、软件设置的视频指南
- 医学知识库 — 医生讲座、临床案例分析
- 商业分析 — 专家访谈、案例研究、市场概述
- 产品支持 — 产品评测、视频格式的常见问题解答
YouTube 数据的优势在于其结构:带有时间戳的字幕、元数据(类别、标签)、社会背景(评论、点赞)。所有这些都有助于 RAG 系统理解信息的内容和上下文。
哪些 YouTube 数据对 RAG 系统有用
为了有效地工作,RAG 系统需要收集几种类型的数据。每种类型在信息提取和生成过程中都有其特定的任务。
字幕(Transcripts)
主要的文本数据来源。YouTube 提供两种类型的字幕:
- 自动生成 — 由 Google 的语音识别算法生成。大多数英语和其他流行语言的视频都可以使用。准确率为 85-95%,具体取决于音质。
- 手动生成 — 由作者或社区上传。更准确,通常包含格式化和额外的上下文。
字幕包括时间戳,这使得文本与视频的特定时刻相联系。这对于在 RAG 答复中创建准确的来源引用至关重要。
视频元数据
元数据帮助 RAG 系统理解信息的上下文和相关性:
| 数据类型 | 在 RAG 中的应用 |
|---|---|
| 标题和描述 | 语义搜索,主题识别 |
| 标签和类别 | 内容分类,过滤 |
| 发布日期 | 信息的相关性(对技术主题重要) |
| 时长 | 评估主题的深入程度 |
| 统计数据(观看次数、点赞) | 评估来源的质量和受欢迎程度 |
| 频道信息 | 确定来源的权威性 |
评论
评论包含额外的上下文:观众的问题、作者的澄清、讨论。对于 RAG 系统来说,这非常有价值,因为:
- 评论通常包含视频主题的常见问题解答
- 作者可以发布修正和补充
- 讨论揭示了对问题的不同观点
使用 YouTube Data API v3:设置和限制
YouTube Data API v3 是获取数据的官方方式。它提供对元数据、统计数据和评论的访问。字幕通过单独的方法获取。
获取 API 密钥
使用 API 需要从 Google Cloud Console 获取密钥:
- 访问 console.cloud.google.com
- 创建新项目或选择现有项目
- 在“API 和服务”部分启用 YouTube Data API v3
- 创建凭据(Credentials)→ API 密钥
- 复制密钥 — 该密钥在所有请求中都需要
限制和配额
YouTube API 使用配额系统。每个请求“消耗”一定数量的单位:
| 操作 | 配额成本 |
|---|---|
| 搜索视频(search.list) | 100 单位 |
| 获取视频数据(videos.list) | 1 单位 |
| 获取评论(commentThreads.list) | 1 单位 |
默认的每日限制为 10,000 单位。这大约相当于 100 个搜索请求或 10,000 个元数据请求。要增加配额,需要向 Google 提交申请。
API 的基本示例
import requests
API_KEY = '你的_api_密钥'
BASE_URL = 'https://www.googleapis.com/youtube/v3'
# 根据查询搜索视频
def search_videos(query, max_results=10):
url = f'{BASE_URL}/search'
params = {
'part': 'snippet',
'q': query,
'type': 'video',
'maxResults': max_results,
'key': API_KEY
}
response = requests.get(url, params=params)
return response.json()
# 获取视频的元数据
def get_video_details(video_id):
url = f'{BASE_URL}/videos'
params = {
'part': 'snippet,contentDetails,statistics',
'id': video_id,
'key': API_KEY
}
response = requests.get(url, params=params)
return response.json()
# 示例用法
results = search_videos('机器学习教程', max_results=5)
for item in results.get('items', []):
video_id = item['id']['videoId']
title = item['snippet']['title']
print(f'ID: {video_id}, 标题: {title}')
# 获取详细信息
details = get_video_details(video_id)
stats = details['items'][0]['statistics']
print(f"观看次数: {stats.get('viewCount')}, 点赞: {stats.get('likeCount')}")
视频字幕解析:自动和手动
YouTube Data API v3 不提供对字幕的直接访问。获取字幕使用替代方法。
使用 youtube-transcript-api 库
最简单的方法是使用 Python 的 youtube-transcript-api 库。它直接提取字幕,无需 API 密钥:
from youtube_transcript_api import YouTubeTranscriptApi
# 获取字幕
video_id = 'dQw4w9WgXcQ'
try:
# 尝试获取俄语字幕,如果没有,则获取英语字幕
transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['ru', 'en'])
# 输出带时间戳的字幕
for entry in transcript:
start_time = entry['start']
duration = entry['duration']
text = entry['text']
print(f"[{start_time:.2f}s] {text}")
except Exception as e:
print(f"获取字幕时出错: {e}")
# 获取可用语言列表
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
for transcript in transcript_list:
print(f"语言: {transcript.language}, 自动生成: {transcript.is_generated}")
该库自动识别可用的字幕,并可以将其翻译成其他语言(如果 YouTube 提供此功能)。
处理时间戳以用于 RAG
对于 RAG 系统,保持文本与时间戳的关联非常重要。这使得创建准确的来源引用成为可能:
def format_timestamp(seconds):
"""将秒转换为 MM:SS 格式"""
minutes = int(seconds // 60)
secs = int(seconds % 60)
return f"{minutes:02d}:{secs:02d}"
def create_chunks_with_timestamps(transcript, chunk_size=500):
"""将字幕拆分为带时间戳的块"""
chunks = []
current_chunk = ""
chunk_start_time = 0
for i, entry in enumerate(transcript):
if len(current_chunk) == 0:
chunk_start_time = entry['start']
current_chunk += entry['text'] + " "
# 如果达到所需大小或字幕结束
if len(current_chunk) >= chunk_size or i == len(transcript) - 1:
chunks.append({
'text': current_chunk.strip(),
'start_time': chunk_start_time,
'timestamp': format_timestamp(chunk_start_time),
'video_id': video_id
})
current_chunk = ""
return chunks
# 使用
transcript = YouTubeTranscriptApi.get_transcript(video_id)
chunks = create_chunks_with_timestamps(transcript)
for chunk in chunks[:3]: # 前 3 个块
print(f"[{chunk['timestamp']}] {chunk['text'][:100]}...")
收集元数据:标题、描述、标签
元数据为 RAG 系统提供了丰富的上下文。以下是收集所有必要数据的完整示例:
import requests
from datetime import datetime
def collect_video_metadata(video_id, api_key):
"""收集视频的完整元数据"""
url = f'https://www.googleapis.com/youtube/v3/videos'
params = {
'part': 'snippet,contentDetails,statistics,topicDetails',
'id': video_id,
'key': api_key
}
response = requests.get(url, params=params)
data = response.json()
if 'items' not in data or len(data['items']) == 0:
return None
item = data['items'][0]
snippet = item['snippet']
stats = item.get('statistics', {})
content = item.get('contentDetails', {})
metadata = {
'video_id': video_id,
'title': snippet['title'],
'description': snippet['description'],
'channel_title': snippet['channelTitle'],
'channel_id': snippet['channelId'],
'published_at': snippet['publishedAt'],
'tags': snippet.get('tags', []),
'category_id': snippet.get('categoryId'),
'duration': content.get('duration'),
'view_count': int(stats.get('viewCount', 0)),
'like_count': int(stats.get('likeCount', 0)),
'comment_count': int(stats.get('commentCount', 0)),
'topics': item.get('topicDetails', {}).get('topicCategories', [])
}
return metadata
# 示例用法
metadata = collect_video_metadata('dQw4w9WgXcQ', API_KEY)
print(f"标题: {metadata['title']}")
print(f"频道: {metadata['channel_title']}")
print(f"观看次数: {metadata['view_count']:,}")
print(f"标签: {', '.join(metadata['tags'][:5])}")
确定内容的相关性
对于技术主题,信息的新鲜度非常重要。我们将添加一个评估相关性的函数:
from datetime import datetime, timedelta
def calculate_content_freshness(published_date_str):
"""评估内容的相关性"""
published_date = datetime.fromisoformat(published_date_str.replace('Z', '+00:00'))
age_days = (datetime.now(published_date.tzinfo) - published_date).days
if age_days < 30:
return 'very_fresh'
elif age_days < 180:
return 'fresh'
elif age_days < 365:
return 'moderate'
else:
return 'old'
def calculate_quality_score(metadata):
"""计算来源的质量评分"""
score = 0
# 受欢迎程度
views = metadata['view_count']
if views > 100000:
score += 3
elif views > 10000:
score += 2
elif views > 1000:
score += 1
# 参与度(点赞相对于观看次数)
if views > 0:
like_ratio = metadata['like_count'] / views
if like_ratio > 0.05:
score += 2
elif like_ratio > 0.02:
score += 1
# 相关性
freshness = calculate_content_freshness(metadata['published_at'])
if freshness == 'very_fresh':
score += 2
elif freshness == 'fresh':
score += 1
return score
# 使用
metadata = collect_video_metadata('dQw4w9WgXcQ', API_KEY)
quality = calculate_quality_score(metadata)
freshness = calculate_content_freshness(metadata['published_at'])
print(f"质量评分: {quality}/7")
print(f"相关性: {freshness}")
评论解析以进行上下文分析
评论可能包含有价值的信息:视频中的错误修正、额外资源、常见问题。对于 RAG 系统来说,这是额外的上下文。
def get_video_comments(video_id, api_key, max_results=100):
"""获取视频的评论"""
url = 'https://www.googleapis.com/youtube/v3/commentThreads'
comments = []
next_page_token = None
while len(comments) < max_results:
params = {
'part': 'snippet',
'videoId': video_id,
'maxResults': min(100, max_results - len(comments)),
'order': 'relevance', # 按相关性排序
'key': api_key
}
if next_page_token:
params['pageToken'] = next_page_token
response = requests.get(url, params=params)
data = response.json()
if 'items' not in data:
break
for item in data['items']:
top_comment = item['snippet']['topLevelComment']['snippet']
comments.append({
'author': top_comment['authorDisplayName'],
'text': top_comment['textDisplay'],
'like_count': top_comment['likeCount'],
'published_at': top_comment['publishedAt'],
'reply_count': item['snippet']['totalReplyCount']
})
next_page_token = data.get('nextPageToken')
if not next_page_token:
break
return comments
def filter_valuable_comments(comments, min_likes=5):
"""过滤有价值的评论"""
valuable = []
for comment in comments:
# 价值标准:
# 1. 很多点赞(受欢迎程度)
# 2. 有回复(引发讨论)
# 3. 长文本(详细评论)
if (comment['like_count'] >= min_likes or
comment['reply_count'] > 0 or
len(comment['text']) > 200):
valuable.append(comment)
return valuable
# 使用
comments = get_video_comments('dQw4w9WgXcQ', API_KEY, max_results=50)
valuable_comments = filter_valuable_comments(comments)
print(f"总评论数: {len(comments)}")
print(f"有价值的评论数: {len(valuable_comments)}")
for comment in valuable_comments[:3]:
print(f"\n[{comment['like_count']} 点赞] {comment['author']}:")
print(comment['text'][:200])
使用代理扩展数据收集
在大规模数据收集时会出现两个问题:YouTube API 的限制(每天 10,000 配额)和在解析字幕时的封锁。代理可以帮助解决这两个问题。
何时需要 YouTube 解析的代理
- 超出 API 配额 — 通过不同 IP 使用多个 API 密钥,可以增加每日限制
- 绕过 API 解析字幕 — youtube-transcript-api 库进行直接请求,频率过高可能会被封锁
- 从不同地区收集数据 — 某些视频仅在特定国家可用
- 并行收集 — 将负载分配到多个 IP 以加快过程
选择代理类型
| 代理类型 | 优点 | 何时使用 |
|---|---|---|
| 数据中心 | 高速,低成本 | 与 API 交互,小数据量 |
| 住宅 | 低封锁风险,真实 IP | 大规模解析字幕,绕过限制 |
| 移动 | 最大信任,少量封锁 | 从移动应用收集数据,关键任务 |
对于大多数 RAG 系统任务,住宅代理 是合适的选择 — 它们在大规模解析时提供了成本和可靠性之间的平衡。
在代码中设置代理
import requests
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._api import TranscriptListFetcher
# 设置代理
PROXY = {
'http': 'http://username:password@proxy-server:port',
'https': 'http://username:password@proxy-server:port'
}
# 通过代理获取 API 数据
def get_video_details_with_proxy(video_id, api_key, proxy):
url = f'https://www.googleapis.com/youtube/v3/videos'
params = {
'part': 'snippet,statistics',
'id': video_id,
'key': api_key
}
response = requests.get(url, params=params, proxies=proxy, timeout=10)
return response.json()
# 通过代理解析字幕
class ProxiedTranscriptApi:
def __init__(self, proxy):
self.proxy = proxy
def get_transcript(self, video_id, languages=['en']):
# 创建带代理的自定义会话
session = requests.Session()
session.proxies = self.proxy
# 使用会话进行请求
fetcher = TranscriptListFetcher(session)
transcript_list = fetcher.fetch(video_id)
# 获取所需语言
for lang in languages:
try:
transcript = transcript_list.find_transcript([lang])
return transcript.fetch()
except:
continue
raise Exception(f"未找到语言的字幕: {languages}")
# 使用
api = ProxiedTranscriptApi(PROXY)
transcript = api.get_transcript('dQw4w9WgXcQ', languages=['ru', 'en'])
print(f"获取了 {len(transcript)} 段字幕")
代理轮换以进行扩展
在从数千个视频收集数据时,重要的是将负载分配到多个代理:
import random
import time
class ProxyRotator:
def __init__(self, proxy_list):
self.proxies = proxy_list
self.current_index = 0
def get_next_proxy(self):
"""顺序轮换"""
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def get_random_proxy(self):
"""随机轮换"""
return random.choice(self.proxies)
# 代理列表
PROXY_LIST = [
{'http': 'http://user:pass@proxy1:port', 'https': 'http://user:pass@proxy1:port'},
{'http': 'http://user:pass@proxy2:port', 'https': 'http://user:pass@proxy2:port'},
{'http': 'http://user:pass@proxy3:port', 'https': 'http://user:pass@proxy3:port'},
]
rotator = ProxyRotator(PROXY_LIST)
def collect_data_with_rotation(video_ids):
results = []
for video_id in video_ids:
proxy = rotator.get_next_proxy()
try:
# 获取元数据
metadata = get_video_details_with_proxy(video_id, API_KEY, proxy)
# 获取字幕
api = ProxiedTranscriptApi(proxy)
transcript = api.get_transcript(video_id)
results.append({
'video_id': video_id,
'metadata': metadata,
'transcript': transcript
})
# 请求之间的延迟
time.sleep(1)
except Exception as e:
print(f"{video_id} 的错误: {e}")
continue
return results
# 使用
video_ids = ['video1', 'video2', 'video3', 'video4', 'video5']
data = collect_data_with_rotation(video_ids)
print(f"收集到 {len(data)} 个视频的数据")
处理和准备 RAG 数据
收集数据后,需要对其进行处理和结构化,以便 RAG 系统有效工作。
创建向量嵌入
RAG 系统使用向量搜索来查找相关片段。需要将文本转换为嵌入:
from sentence_transformers import SentenceTransformer
import numpy as np
# 加载用于创建嵌入的模型
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
def create_embeddings_from_transcript(transcript_chunks):
"""为字幕块创建嵌入"""
embeddings = []
for chunk in transcript_chunks:
# 将文本与元数据结合以提供更好的上下文
text_with_context = f"{chunk['title']} | {chunk['text']}"
# 创建嵌入
embedding = model.encode(text_with_context)
embeddings.append({
'video_id': chunk['video_id'],
'timestamp': chunk['timestamp'],
'text': chunk['text'],
'embedding': embedding.tolist(),
'metadata': {
'title': chunk['title'],
'channel': chunk['channel'],
'views': chunk['views']
}
})
return embeddings
# 准备数据
def prepare_rag_data(video_data):
"""准备所有数据以用于 RAG"""
all_chunks = []
for video in video_data:
metadata = video['metadata']
transcript = video['transcript']
# 将字幕拆分为块
chunks = create_chunks_with_timestamps(transcript)
# 将元数据添加到每个块
for chunk in chunks:
chunk['title'] = metadata['title']
chunk['channel'] = metadata['channel_title']
chunk['views'] = metadata['view_count']
all_chunks.append(chunk)
# 创建嵌入
embeddings = create_embeddings_from_transcript(all_chunks)
return embeddings
# 使用
rag_data = prepare_rag_data(collected_videos)
print(f"为 RAG 准备了 {len(rag_data)} 个片段")
保存到向量数据库
为了有效搜索,嵌入被保存到专用数据库中。流行的选项包括:Pinecone、Weaviate、Qdrant、ChromaDB。
import chromadb
from chromadb.config import Settings
# 初始化 ChromaDB(本地向量数据库)
client = chromadb.Client(Settings(
chroma_db_impl="duckdb+parquet",
persist_directory="./youtube_rag_db"
))
# 创建集合
collection = client.create_collection(
name="youtube_transcripts",
metadata={"description": "YouTube 视频字幕用于 RAG"}
)
def store_in_vector_db(embeddings_data, collection):
"""将嵌入保存到向量数据库中"""
ids = []
embeddings = []
documents = []
metadatas = []
for i, item in enumerate(embeddings_data):
ids.append(f"{item['video_id']}_{i}")
embeddings.append(item['embedding'])
documents.append(item['text'])
metadatas.append({
'video_id': item['video_id'],
'timestamp': item['timestamp'],
'title': item['metadata']['title'],
'channel': item['metadata']['channel'],
'views': str(item['metadata']['views']),
'youtube_url': f"https://youtube.com/watch?v={item['video_id']}&t={int(float(item['timestamp'].split(':')[0])*60 + float(item['timestamp'].split(':')[1]))}s"
})
# 添加到集合中
collection.add(
ids=ids,
embeddings=embeddings,
documents=documents,
metadatas=metadatas
)
print(f"已保存 {len(ids)} 个嵌入到向量数据库中")
# 使用
store_in_vector_db(rag_data, collection)
搜索和生成答案
最后一步是实现 RAG 搜索和生成答案:
def search_youtube_knowledge(query, collection, model, top_k=3):
"""从 YouTube 中搜索相关片段"""
# 创建查询的嵌入
query_embedding = model.encode(query).tolist()
# 在向量数据库中搜索
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k
)
# 格式化结果
sources = []
for i in range(len(results['ids'][0])):
sources.append({
'text': results['documents'][0][i],
'metadata': results['metadatas'][0][i],
'distance': results['distances'][0][i] if 'distances' in results else None
})
return sources
def generate_rag_answer(query, sources, llm_api_key):
"""基于找到的来源生成答案"""
# 从找到的来源中形成上下文
context = "\n\n".join([
f"来源: {s['metadata']['title']} ({s['metadata']['timestamp']})\n{s['text']}"
for s in sources
])
# LLM 的提示
prompt = f"""根据以下来自 YouTube 视频的片段回答用户的问题。
务必提供带时间戳的来源。
上下文:
{context}
问题: {query}
答案:"""
# 这里调用你的 LLM(OpenAI、Claude、本地模型)
# 示例使用 OpenAI:
# response = openai.ChatCompletion.create(
# model="gpt-4",
# messages=[{"role": "user", "content": prompt}]
# )
# answer = response.choices[0].message.content
# 示例返回提示
return {
'answer': '这里将是 LLM 的答案',
'sources': sources
}
# 使用
query = "如何在 Python 中设置代理?"
sources = search_youtube_knowledge(query, collection, model, top_k=3)
print("找到的来源:")
for source in sources:
print(f"\n{source['metadata']['title']}")
print(f"时间: {source['metadata']['timestamp']}")
print(f"链接: {source['metadata']['youtube_url']}")
print(f"文本: {source['text'][:200]}...")
优化 RAG 的质量
提高 YouTube 数据 RAG 系统质量的几个建议:
- 过滤低质量内容 — 使用观看次数、点赞、发布时间等指标
- 保持上下文 — 将视频和频道的名称添加到每个文本块中
- 优化块的大小 — 对于技术内容,300-500 字是最佳选择
- 使用元数据进行排名 — 更新和受欢迎的内容可以优先考虑
- 添加评论 — 它们通常包含重要的澄清和常见问题解答
- 检查视频的可用性 — 某些视频可能已被删除或变为私有
建议: 对于大规模数据收集,建议结合使用官方 API(用于元数据)和通过代理解析(用于字幕)。这可以绕过限制并获取最多的信息。
结论
为 RAG 系统收集 YouTube 数据是一个多步骤的过程,包括与 API 的交互、解析字幕、处理元数据和创建向量嵌入。关键点:
- YouTube Data API v3 提供元数据、统计数据和评论,每天限制为 10,000 配额
- 字幕通过 youtube-transcript-api 库或直接请求进行解析
- 时间戳对创建准确的来源引用至关重要
- 元数据(观看次数、点赞、日期)有助于评估内容的质量和相关性
- 评论提供上下文,通常包含常见问题解答
- 代理对于扩展和绕过限制是必要的
- 向量嵌入和专用数据库提供快速的语义搜索
通过正确设置流程,可以每天收集数万条高质量内容片段,为任何主题领域的 RAG 系统创建强大的知识库。
如果您计划进行大规模的 YouTube 数据收集以绕过限制和封锁,建议使用 住宅代理 — 它们在解析数千个视频时提供稳定性,并最小化 YouTube 封锁的风险。