Zurück zum Blog

Daten von YouTube für RAG-Systeme sammeln: Untertitel und Metadaten mit Python extrahieren

Schritt-für-Schritt-Anleitung zur Datensammlung von YouTube für das Training von RAG-Systemen: Arbeiten mit der API, Parsing von Untertiteln und Metadaten, Proxy-Einrichtung zur Skalierung.

📅6. März 2026

RAG (Retrieval-Augmented Generation) Systeme benötigen qualitativ hochwertige Daten für das Training. YouTube ist eine riesige Quelle für strukturierten Inhalt: Videos mit Untertiteln, Metadaten und Kommentaren. In diesem Artikel werden wir untersuchen, wie man YouTube-Daten effektiv für RAG sammelt, Blockierungen vermeidet und die API-Limits einhält.

Was ist RAG und warum sind YouTube-Daten wichtig

RAG (Retrieval-Augmented Generation) ist ein Ansatz zur Erstellung von KI-Systemen, bei dem ein Sprachmodell mit einer Wissensdatenbank ergänzt wird. Anstatt sich nur auf die Daten zu verlassen, auf denen das Modell trainiert wurde, extrahiert RAG relevante Informationen aus einer externen Quelle und verwendet diese zur Generierung von Antworten.

YouTube enthält Millionen von Stunden an Inhalten mit Untertiteln in verschiedenen Sprachen. Dies macht die Plattform zu einer wertvollen Datenquelle für RAG-Systeme in verschiedenen Bereichen:

  • Bildungssysteme — Vorlesungen, Tutorials, Kurse mit Zeitstempeln
  • Technische Dokumentation — Videoanleitungen zu Programmierung, DevOps, Softwarekonfiguration
  • Medizinische Wissensdatenbanken — Vorlesungen von Ärzten, Fallstudien
  • Business-Analytik — Interviews mit Experten, Fallstudien, Marktanalysen
  • Produktunterstützung — Produktbewertungen, FAQs im Videoformat

Der Vorteil von YouTube-Daten liegt in der vorhandenen Struktur: Untertitel mit Zeitstempeln, Metadaten (Kategorien, Tags), sozialer Kontext (Kommentare, Likes). All dies hilft dem RAG-System, nicht nur den Inhalt, sondern auch den Kontext der Informationen zu verstehen.

Welche YouTube-Daten sind nützlich für RAG-Systeme

Für die effektive Arbeit eines RAG-Systems müssen mehrere Datentypen gesammelt werden. Jeder Typ erfüllt seine eigenen Aufgaben im Prozess der Informationsbeschaffung und -generierung.

Untertitel (Transcripts)

Die Hauptquelle für Textdaten. YouTube bietet zwei Arten von Untertiteln an:

  • Automatische — werden von den Spracherkennungsalgorithmen von Google generiert. Verfügbar für die meisten Videos in Englisch und anderen beliebten Sprachen. Genauigkeit von 85-95% je nach Audioqualität.
  • Manuelle — werden von den Autoren oder der Community hochgeladen. Genauer, enthalten oft Formatierungen und zusätzlichen Kontext.

Untertitel enthalten Zeitstempel, was es ermöglicht, den Text mit bestimmten Momenten im Video zu verknüpfen. Dies ist entscheidend für die Erstellung genauer Verweise auf Quellen in RAG-Antworten.

Metadaten von Videos

Metadaten helfen dem RAG-System, den Kontext und die Relevanz von Informationen zu verstehen:

Datentyp Anwendung in RAG
Titel und Beschreibung Semantische Suche, Themenbestimmung
Tags und Kategorien Inhaltsklassifizierung, Filterung
Veröffentlichungsdatum Aktualität der Informationen (wichtig für technische Themen)
Dauer Bewertung der Tiefe der Themenbehandlung
Statistiken (Aufrufe, Likes) Bewertung der Qualität und Popularität der Quelle
Kanalinformationen Bestimmung der Autorität der Quelle

Kommentare

Kommentare enthalten zusätzlichen Kontext: Fragen der Zuschauer, Klarstellungen der Autoren, Diskussionen. Für RAG-Systeme ist dies wertvoll, da:

  • Kommentare enthalten oft FAQs zum Thema des Videos
  • Autoren können Korrekturen und Ergänzungen veröffentlichen
  • Diskussionen offenbaren verschiedene Perspektiven auf das Problem

Arbeiten mit der YouTube Data API v3: Einrichtung und Limits

Die YouTube Data API v3 ist der offizielle Weg, um Daten zu erhalten. Sie bietet Zugang zu Metadaten, Statistiken und Kommentaren. Untertitel werden über separate Methoden abgerufen.

Erhalt eines API-Schlüssels

Um mit der API zu arbeiten, benötigen Sie einen Schlüssel aus der Google Cloud Console:

  1. Gehen Sie zu console.cloud.google.com
  2. Erstellen Sie ein neues Projekt oder wählen Sie ein bestehendes aus
  3. Aktivieren Sie die YouTube Data API v3 im Bereich "APIs & Services"
  4. Erstellen Sie Anmeldeinformationen (Credentials) → API-Schlüssel
  5. Kopieren Sie den Schlüssel — er wird für alle Anfragen benötigt

Limits und Quoten

Die YouTube API verwendet ein Quotensystem. Jede Anfrage "kostet" eine bestimmte Anzahl von Einheiten:

Operation Kosten in Quoten
Videosuche (search.list) 100 Einheiten
Abrufen von Videodaten (videos.list) 1 Einheit
Abrufen von Kommentaren (commentThreads.list) 1 Einheit

Das tägliche Limit beträgt standardmäßig 10.000 Einheiten. Das sind etwa 100 Suchanfragen oder 10.000 Anfragen für Metadaten. Um die Quote zu erhöhen, muss eine Anfrage an Google gestellt werden.

Ein einfaches Beispiel für die Arbeit mit der API

import requests

API_KEY = 'Ihr_api_schluessel'
BASE_URL = 'https://www.googleapis.com/youtube/v3'

# Videosuche nach Anfrage
def search_videos(query, max_results=10):
    url = f'{BASE_URL}/search'
    params = {
        'part': 'snippet',
        'q': query,
        'type': 'video',
        'maxResults': max_results,
        'key': API_KEY
    }
    
    response = requests.get(url, params=params)
    return response.json()

# Abrufen von Metadaten eines Videos
def get_video_details(video_id):
    url = f'{BASE_URL}/videos'
    params = {
        'part': 'snippet,contentDetails,statistics',
        'id': video_id,
        'key': API_KEY
    }
    
    response = requests.get(url, params=params)
    return response.json()

# Beispiel für die Verwendung
results = search_videos('machine learning tutorial', max_results=5)
for item in results.get('items', []):
    video_id = item['id']['videoId']
    title = item['snippet']['title']
    print(f'ID: {video_id}, Titel: {title}')
    
    # Abrufen von detaillierten Informationen
    details = get_video_details(video_id)
    stats = details['items'][0]['statistics']
    print(f"Aufrufe: {stats.get('viewCount')}, Likes: {stats.get('likeCount')}")

Parsing von Video-Untertiteln: automatisiert und manuell

Die YouTube Data API v3 bietet keinen direkten Zugang zu Untertiteln. Für deren Abruf werden alternative Methoden verwendet.

Verwendung der Bibliothek youtube-transcript-api

Der einfachste Weg ist die Bibliothek youtube-transcript-api für Python. Sie extrahiert Untertitel direkt, ohne API-Schlüssel:

from youtube_transcript_api import YouTubeTranscriptApi

# Abrufen von Untertiteln
video_id = 'dQw4w9WgXcQ'

try:
    # Versuch, russische Untertitel abzurufen, falls nicht, englische
    transcript = YouTubeTranscriptApi.get_transcript(video_id, languages=['ru', 'en'])
    
    # Ausgabe der Untertitel mit Zeitstempeln
    for entry in transcript:
        start_time = entry['start']
        duration = entry['duration']
        text = entry['text']
        print(f"[{start_time:.2f}s] {text}")
        
except Exception as e:
    print(f"Fehler beim Abrufen der Untertitel: {e}")

# Abrufen der verfügbaren Sprachen
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
for transcript in transcript_list:
    print(f"Sprache: {transcript.language}, Automatisch: {transcript.is_generated}")

Die Bibliothek erkennt automatisch verfügbare Untertitel und kann diese in andere Sprachen übersetzen (sofern YouTube diese Möglichkeit bietet).

Verarbeitung von Zeitstempeln für RAG

Für RAG-Systeme ist es wichtig, die Verbindung zwischen Text und Zeitstempeln zu bewahren. Dies ermöglicht die Erstellung genauer Verweise auf Quellen:

def format_timestamp(seconds):
    """Konvertierung von Sekunden in das Format MM:SS"""
    minutes = int(seconds // 60)
    secs = int(seconds % 60)
    return f"{minutes:02d}:{secs:02d}"

def create_chunks_with_timestamps(transcript, chunk_size=500):
    """Aufteilung der Untertitel in Chunks unter Beibehaltung der Zeitstempel"""
    chunks = []
    current_chunk = ""
    chunk_start_time = 0
    
    for i, entry in enumerate(transcript):
        if len(current_chunk) == 0:
            chunk_start_time = entry['start']
        
        current_chunk += entry['text'] + " "
        
        # Wenn die gewünschte Größe erreicht ist oder das Ende der Untertitel
        if len(current_chunk) >= chunk_size or i == len(transcript) - 1:
            chunks.append({
                'text': current_chunk.strip(),
                'start_time': chunk_start_time,
                'timestamp': format_timestamp(chunk_start_time),
                'video_id': video_id
            })
            current_chunk = ""
    
    return chunks

# Verwendung
transcript = YouTubeTranscriptApi.get_transcript(video_id)
chunks = create_chunks_with_timestamps(transcript)

for chunk in chunks[:3]:  # Die ersten 3 Chunks
    print(f"[{chunk['timestamp']}] {chunk['text'][:100]}...")

Sammeln von Metadaten: Titel, Beschreibungen, Tags

Metadaten bereichern den Kontext für das RAG-System. Hier ist ein vollständiges Beispiel zum Sammeln aller erforderlichen Daten:

import requests
from datetime import datetime

def collect_video_metadata(video_id, api_key):
    """Sammeln vollständiger Metadaten eines Videos"""
    url = f'https://www.googleapis.com/youtube/v3/videos'
    params = {
        'part': 'snippet,contentDetails,statistics,topicDetails',
        'id': video_id,
        'key': api_key
    }
    
    response = requests.get(url, params=params)
    data = response.json()
    
    if 'items' not in data or len(data['items']) == 0:
        return None
    
    item = data['items'][0]
    snippet = item['snippet']
    stats = item.get('statistics', {})
    content = item.get('contentDetails', {})
    
    metadata = {
        'video_id': video_id,
        'title': snippet['title'],
        'description': snippet['description'],
        'channel_title': snippet['channelTitle'],
        'channel_id': snippet['channelId'],
        'published_at': snippet['publishedAt'],
        'tags': snippet.get('tags', []),
        'category_id': snippet.get('categoryId'),
        'duration': content.get('duration'),
        'view_count': int(stats.get('viewCount', 0)),
        'like_count': int(stats.get('likeCount', 0)),
        'comment_count': int(stats.get('commentCount', 0)),
        'topics': item.get('topicDetails', {}).get('topicCategories', [])
    }
    
    return metadata

# Beispiel für die Verwendung
metadata = collect_video_metadata('dQw4w9WgXcQ', API_KEY)
print(f"Titel: {metadata['title']}")
print(f"Kanal: {metadata['channel_title']}")
print(f"Aufrufe: {metadata['view_count']:,}")
print(f"Tags: {', '.join(metadata['tags'][:5])}")

Bestimmung der Aktualität des Inhalts

Für technische Themen ist die Aktualität der Informationen wichtig. Fügen wir eine Funktion zur Bewertung der Aktualität hinzu:

from datetime import datetime, timedelta

def calculate_content_freshness(published_date_str):
    """Bewertung der Aktualität des Inhalts"""
    published_date = datetime.fromisoformat(published_date_str.replace('Z', '+00:00'))
    age_days = (datetime.now(published_date.tzinfo) - published_date).days
    
    if age_days < 30:
        return 'very_fresh'
    elif age_days < 180:
        return 'fresh'
    elif age_days < 365:
        return 'moderate'
    else:
        return 'old'

def calculate_quality_score(metadata):
    """Berechnung der Qualitätsbewertung der Quelle"""
    score = 0
    
    # Popularität
    views = metadata['view_count']
    if views > 100000:
        score += 3
    elif views > 10000:
        score += 2
    elif views > 1000:
        score += 1
    
    # Engagement (Likes im Verhältnis zu Aufrufen)
    if views > 0:
        like_ratio = metadata['like_count'] / views
        if like_ratio > 0.05:
            score += 2
        elif like_ratio > 0.02:
            score += 1
    
    # Aktualität
    freshness = calculate_content_freshness(metadata['published_at'])
    if freshness == 'very_fresh':
        score += 2
    elif freshness == 'fresh':
        score += 1
    
    return score

# Verwendung
metadata = collect_video_metadata('dQw4w9WgXcQ', API_KEY)
quality = calculate_quality_score(metadata)
freshness = calculate_content_freshness(metadata['published_at'])
print(f"Qualitätsbewertung: {quality}/7")
print(f"Aktualität: {freshness}")

Parsing von Kommentaren für kontextuelle Analysen

Kommentare können wertvolle Informationen enthalten: Korrekturen von Fehlern im Video, zusätzliche Ressourcen, häufige Fragen. Für RAG-Systeme ist dies zusätzlicher Kontext.

def get_video_comments(video_id, api_key, max_results=100):
    """Abrufen von Kommentaren zu einem Video"""
    url = 'https://www.googleapis.com/youtube/v3/commentThreads'
    comments = []
    next_page_token = None
    
    while len(comments) < max_results:
        params = {
            'part': 'snippet',
            'videoId': video_id,
            'maxResults': min(100, max_results - len(comments)),
            'order': 'relevance',  # Sortierung nach Relevanz
            'key': api_key
        }
        
        if next_page_token:
            params['pageToken'] = next_page_token
        
        response = requests.get(url, params=params)
        data = response.json()
        
        if 'items' not in data:
            break
        
        for item in data['items']:
            top_comment = item['snippet']['topLevelComment']['snippet']
            comments.append({
                'author': top_comment['authorDisplayName'],
                'text': top_comment['textDisplay'],
                'like_count': top_comment['likeCount'],
                'published_at': top_comment['publishedAt'],
                'reply_count': item['snippet']['totalReplyCount']
            })
        
        next_page_token = data.get('nextPageToken')
        if not next_page_token:
            break
    
    return comments

def filter_valuable_comments(comments, min_likes=5):
    """Filtern wertvoller Kommentare"""
    valuable = []
    
    for comment in comments:
        # Kriterien für Wertvoll:
        # 1. Viele Likes (Popularität)
        # 2. Es gibt Antworten (hat eine Diskussion ausgelöst)
        # 3. Langer Text (detaillierter Kommentar)
        
        if (comment['like_count'] >= min_likes or 
            comment['reply_count'] > 0 or 
            len(comment['text']) > 200):
            valuable.append(comment)
    
    return valuable

# Verwendung
comments = get_video_comments('dQw4w9WgXcQ', API_KEY, max_results=50)
valuable_comments = filter_valuable_comments(comments)

print(f"Insgesamt Kommentare: {len(comments)}")
print(f"Wertvolle Kommentare: {len(valuable_comments)}")

for comment in valuable_comments[:3]:
    print(f"\n[{comment['like_count']} Likes] {comment['author']}:")
    print(comment['text'][:200])

Verwendung von Proxys zur Skalierung der Datensammlung

Bei der massenhaften Datensammlung treten zwei Probleme auf: die Limits der YouTube API (10.000 Quoten pro Tag) und Blockierungen beim Parsing von Untertiteln. Proxys helfen, beide Probleme zu lösen.

Wann Proxys für YouTube-Parsing benötigt werden

  • Überschreitung der API-Quoten — durch die Verwendung mehrerer API-Schlüssel über verschiedene IPs kann das tägliche Limit erhöht werden
  • Parsing von Untertiteln ohne API — die Bibliothek youtube-transcript-api führt direkte Anfragen durch, die bei hoher Frequenz blockiert werden können
  • Datensammlung aus verschiedenen Regionen — einige Videos sind nur in bestimmten Ländern verfügbar
  • Parallele Datensammlung — Verteilung der Last auf mehrere IPs zur Beschleunigung des Prozesses

Auswahl des Proxytyps

Proxytyp Vorteile Wann verwenden
Rechenzentrum Hohe Geschwindigkeit, niedriger Preis Arbeiten mit API, kleine Volumen
Residential Geringes Risiko von Blockierungen, echte IPs Massenhaftes Parsing von Untertiteln, Umgehung von Einschränkungen
Mobil Maximales Vertrauen, seltene Blockierungen Datensammlung aus mobilen Anwendungen, kritische Aufgaben

Für die meisten Aufgaben von RAG-Systemen sind Residential Proxys geeignet — sie bieten ein Gleichgewicht zwischen Kosten und Zuverlässigkeit bei massenhaftem Parsing.

Proxy-Konfiguration im Code

import requests
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._api import TranscriptListFetcher

# Proxy-Konfiguration
PROXY = {
    'http': 'http://benutzer:passwort@proxy-server:port',
    'https': 'http://benutzer:passwort@proxy-server:port'
}

# Für die Arbeit mit der API über Proxy
def get_video_details_with_proxy(video_id, api_key, proxy):
    url = f'https://www.googleapis.com/youtube/v3/videos'
    params = {
        'part': 'snippet,statistics',
        'id': video_id,
        'key': api_key
    }
    
    response = requests.get(url, params=params, proxies=proxy, timeout=10)
    return response.json()

# Für das Parsing von Untertiteln über Proxy
class ProxiedTranscriptApi:
    def __init__(self, proxy):
        self.proxy = proxy
    
    def get_transcript(self, video_id, languages=['en']):
        # Erstellen einer benutzerdefinierten Sitzung mit Proxy
        session = requests.Session()
        session.proxies = self.proxy
        
        # Verwendung der Sitzung für Anfragen
        fetcher = TranscriptListFetcher(session)
        transcript_list = fetcher.fetch(video_id)
        
        # Abrufen der gewünschten Sprache
        for lang in languages:
            try:
                transcript = transcript_list.find_transcript([lang])
                return transcript.fetch()
            except:
                continue
        
        raise Exception(f"Untertitel nicht gefunden für Sprachen: {languages}")

# Verwendung
api = ProxiedTranscriptApi(PROXY)
transcript = api.get_transcript('dQw4w9WgXcQ', languages=['ru', 'en'])
print(f"Erhalten {len(transcript)} Segmente von Untertiteln")

Proxy-Rotation zur Skalierung

Bei der Datensammlung von Tausenden von Videos ist es wichtig, die Last zwischen mehreren Proxys zu verteilen:

import random
import time

class ProxyRotator:
    def __init__(self, proxy_list):
        self.proxies = proxy_list
        self.current_index = 0
    
    def get_next_proxy(self):
        """Sequenzielle Rotation"""
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy
    
    def get_random_proxy(self):
        """Zufällige Rotation"""
        return random.choice(self.proxies)

# Liste der Proxys
PROXY_LIST = [
    {'http': 'http://benutzer:passwort@proxy1:port', 'https': 'http://benutzer:passwort@proxy1:port'},
    {'http': 'http://benutzer:passwort@proxy2:port', 'https': 'http://benutzer:passwort@proxy2:port'},
    {'http': 'http://benutzer:passwort@proxy3:port', 'https': 'http://benutzer:passwort@proxy3:port'},
]

rotator = ProxyRotator(PROXY_LIST)

def collect_data_with_rotation(video_ids):
    results = []
    
    for video_id in video_ids:
        proxy = rotator.get_next_proxy()
        
        try:
            # Abrufen von Metadaten
            metadata = get_video_details_with_proxy(video_id, API_KEY, proxy)
            
            # Abrufen von Untertiteln
            api = ProxiedTranscriptApi(proxy)
            transcript = api.get_transcript(video_id)
            
            results.append({
                'video_id': video_id,
                'metadata': metadata,
                'transcript': transcript
            })
            
            # Verzögerung zwischen Anfragen
            time.sleep(1)
            
        except Exception as e:
            print(f"Fehler für {video_id}: {e}")
            continue
    
    return results

# Verwendung
video_ids = ['video1', 'video2', 'video3', 'video4', 'video5']
data = collect_data_with_rotation(video_ids)
print(f"Gesammelt Daten für {len(data)} Videos")

Verarbeitung und Vorbereitung von Daten für RAG

Nach dem Sammeln von Daten müssen diese verarbeitet und strukturiert werden, um eine effektive Arbeit des RAG-Systems zu gewährleisten.

Erstellung von Vektor-Embeddings

RAG-Systeme verwenden Vektorsuche, um relevante Fragmente zu finden. Der Text muss in Embeddings umgewandelt werden:

from sentence_transformers import SentenceTransformer
import numpy as np

# Laden des Modells zur Erstellung von Embeddings
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

def create_embeddings_from_transcript(transcript_chunks):
    """Erstellung von Embeddings für Chunks von Untertiteln"""
    embeddings = []
    
    for chunk in transcript_chunks:
        # Kombinieren von Text mit Metadaten für besseren Kontext
        text_with_context = f"{chunk['title']} | {chunk['text']}"
        
        # Erstellen des Embeddings
        embedding = model.encode(text_with_context)
        
        embeddings.append({
            'video_id': chunk['video_id'],
            'timestamp': chunk['timestamp'],
            'text': chunk['text'],
            'embedding': embedding.tolist(),
            'metadata': {
                'title': chunk['title'],
                'channel': chunk['channel'],
                'views': chunk['views']
            }
        })
    
    return embeddings

# Datenvorbereitung
def prepare_rag_data(video_data):
    """Vorbereitung aller Daten für RAG"""
    all_chunks = []
    
    for video in video_data:
        metadata = video['metadata']
        transcript = video['transcript']
        
        # Aufteilen der Untertitel in Chunks
        chunks = create_chunks_with_timestamps(transcript)
        
        # Hinzufügen von Metadaten zu jedem Chunk
        for chunk in chunks:
            chunk['title'] = metadata['title']
            chunk['channel'] = metadata['channel_title']
            chunk['views'] = metadata['view_count']
            all_chunks.append(chunk)
    
    # Erstellen von Embeddings
    embeddings = create_embeddings_from_transcript(all_chunks)
    
    return embeddings

# Verwendung
rag_data = prepare_rag_data(collected_videos)
print(f"Vorbereitet {len(rag_data)} Fragmente für RAG")

Speichern in einer Vektor-Datenbank

Für eine effektive Suche werden die Embeddings in spezialisierten Datenbanken gespeichert. Beliebte Optionen: Pinecone, Weaviate, Qdrant, ChromaDB.

import chromadb
from chromadb.config import Settings

# Initialisierung von ChromaDB (lokale Vektor-Datenbank)
client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="./youtube_rag_db"
))

# Erstellen einer Sammlung
collection = client.create_collection(
    name="youtube_transcripts",
    metadata={"description": "YouTube-Video-Transkripte für RAG"}
)

def store_in_vector_db(embeddings_data, collection):
    """Speichern von Embeddings in einer Vektor-Datenbank"""
    
    ids = []
    embeddings = []
    documents = []
    metadatas = []
    
    for i, item in enumerate(embeddings_data):
        ids.append(f"{item['video_id']}_{i}")
        embeddings.append(item['embedding'])
        documents.append(item['text'])
        metadatas.append({
            'video_id': item['video_id'],
            'timestamp': item['timestamp'],
            'title': item['metadata']['title'],
            'channel': item['metadata']['channel'],
            'views': str(item['metadata']['views']),
            'youtube_url': f"https://youtube.com/watch?v={item['video_id']}&t={int(float(item['timestamp'].split(':')[0])*60 + float(item['timestamp'].split(':')[1]))}s"
        })
    
    # Hinzufügen zur Sammlung
    collection.add(
        ids=ids,
        embeddings=embeddings,
        documents=documents,
        metadatas=metadatas
    )
    
    print(f"{len(ids)} Embeddings in der Vektor-Datenbank gespeichert")

# Verwendung
store_in_vector_db(rag_data, collection)

Suche und Generierung von Antworten

Der letzte Schritt ist die Implementierung der RAG-Suche und der Generierung von Antworten:

def search_youtube_knowledge(query, collection, model, top_k=3):
    """Suche nach relevanten Fragmenten aus YouTube"""
    
    # Erstellen des Embeddings der Anfrage
    query_embedding = model.encode(query).tolist()
    
    # Suche in der Vektor-Datenbank
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k
    )
    
    # Formatierung der Ergebnisse
    sources = []
    for i in range(len(results['ids'][0])):
        sources.append({
            'text': results['documents'][0][i],
            'metadata': results['metadatas'][0][i],
            'distance': results['distances'][0][i] if 'distances' in results else None
        })
    
    return sources

def generate_rag_answer(query, sources, llm_api_key):
    """Generierung einer Antwort basierend auf gefundenen Quellen"""
    
    # Erstellen des Kontexts aus gefundenen Quellen
    context = "\n\n".join([
        f"Quelle: {s['metadata']['title']} ({s['metadata']['timestamp']})\n{s['text']}"
        for s in sources
    ])
    
    # Prompt für LLM
    prompt = f"""Basierend auf den folgenden Fragmenten aus YouTube-Videos beantworte die Frage des Benutzers.
Vergiss nicht, die Quellen mit Zeitstempeln anzugeben.

Kontext:
{context}

Frage: {query}

Antwort:"""
    
    # Hier wird Ihre LLM aufgerufen (OpenAI, Claude, lokale Modell)
    # Beispiel mit OpenAI:
    # response = openai.ChatCompletion.create(
    #     model="gpt-4",
    #     messages=[{"role": "user", "content": prompt}]
    # )
    # answer = response.choices[0].message.content
    
    # Zum Beispiel geben wir den Prompt zurück
    return {
        'answer': 'Hier wird die Antwort der LLM sein',
        'sources': sources
    }

# Verwendung
query = "Wie konfiguriere ich einen Proxy in Python?"
sources = search_youtube_knowledge(query, collection, model, top_k=3)

print("Gefundene Quellen:")
for source in sources:
    print(f"\n{source['metadata']['title']}")
    print(f"Zeit: {source['metadata']['timestamp']}")
    print(f"Link: {source['metadata']['youtube_url']}")
    print(f"Text: {source['text'][:200]}...")

Optimierung der Qualität von RAG

Einige Tipps zur Verbesserung der Qualität des RAG-Systems mit YouTube-Daten:

  • Filtern Sie minderwertige Inhalte — verwenden Sie Metriken wie Aufrufe, Likes und Veröffentlichungsdatum
  • Bewahren Sie den Kontext — fügen Sie den Titel des Videos und des Kanals zu jedem Textchunk hinzu
  • Optimieren Sie die Chunk-Größe — für technische Inhalte sind 300-500 Wörter optimal
  • Verwenden Sie Metadaten zur Rangordnung — neuere und beliebtere Inhalte können Priorität erhalten
  • Fügen Sie Kommentare hinzu — sie enthalten oft wichtige Klarstellungen und FAQs
  • Überprüfen Sie die Verfügbarkeit von Videos — einige Videos können gelöscht oder privat werden

Tipp: Für die massenhafte Datensammlung von YouTube wird empfohlen, eine Kombination aus der offiziellen API (für Metadaten) und dem Parsing über Proxys (für Untertitel) zu verwenden. Dies ermöglicht es, die Limits zu umgehen und die maximale Menge an Informationen zu erhalten.

Fazit

Das Sammeln von YouTube-Daten für RAG-Systeme ist ein mehrstufiger Prozess, der die Arbeit mit der API, das Parsing von Untertiteln, die Verarbeitung von Metadaten und die Erstellung von Vektor-Embeddings umfasst. Die wichtigsten Punkte sind:

  • Die YouTube Data API v3 bietet Metadaten, Statistiken und Kommentare mit einem Limit von 10.000 Quoten pro Tag
  • Untertitel werden über die Bibliothek youtube-transcript-api oder direkte Anfragen geparsed
  • Zeitstempel sind entscheidend für die Erstellung genauer Verweise auf Quellen
  • Metadaten (Aufrufe, Likes, Datum) helfen, die Qualität und Aktualität des Inhalts zu bewerten
  • Kommentare fügen Kontext hinzu und enthalten oft FAQs
  • Proxys sind notwendig für die Skalierung und Umgehung von Limits
  • Vektor-Embeddings und spezialisierte Datenbanken ermöglichen eine schnelle semantische Suche

Bei richtiger Prozesskonfiguration können täglich Zehntausende qualitativ hochwertiger Inhaltsfragmente gesammelt werden, wodurch eine leistungsstarke Wissensbasis für RAG-Systeme in jedem Fachgebiet geschaffen wird.

Wenn Sie eine umfangreiche Datensammlung von YouTube mit Umgehung von Limits und Blockierungen planen, empfehlen wir die Verwendung von Residential Proxys — sie gewährleisten Stabilität beim Parsing von Tausenden von Videos und minimieren das Risiko von Blockierungen seitens YouTube.