如何构建自己的谷歌NotebookLM

随着音频内容消费的日益普及,将文档或书面内容转换为逼真的音频格式的能力最近变得越来越流行。

虽然谷歌的NotebookLM在这一领域引起了关注,但我想探索使用现代云服务构建一个类似的系统。在本文中,我将带您了解如何创建一个可扩展的云原生系统,该系统使用FastAPI、Firebase、Google Cloud Pub/Sub和Azure的文本转语音服务将文档转换为高质量的播客。

挑战

将文档转换为播客并不像简单地将文本通过文本转语音引擎处理那么简单。这需要仔细的处理、自然语言理解,以及处理各种文档格式的能力,同时保持流畅的用户体验。系统需要:

  • 高效处理多种文档格式
  • 生成自然听起来的音频,并支持多种声音
  • 在不影响用户体验的情况下处理大规模文档
  • 向用户提供实时状态更新
  • 保持高可用性和可扩展性

架构深度剖析

让我们分解关键组件,了解它们是如何协同工作的:

图片描述

1. FastAPI 后端

FastAPI 作为我们的后端框架,选择它的原因有几个:

  • 异步支持:基于 Starlette 构建,FastAPI 的异步能力允许高效处理并发请求
  • 自动生成 OpenAPI 文档:开箱即用生成交互式 API 文档
  • 类型安全:利用 Python 的类型提示进行运行时验证
  • 高性能:在速度上可与 Node.js 和 Go 相媲美

以下是我们上传端点的详细介绍:


@app.post(
    '/upload'
)
async def upload_files(
    token: Annotated[ParsedToken, Depends(verify_firebase_token)],
    project_name: str,
    description: str,
    website_link: str,
    host_count: int,
    files: Optional[List[UploadFile]] = File(None)
):

    # 验证令牌
    user_id = token['uid']

    # 生成唯一标识符
    project_id = str(uuid.uuid4())
    podcast_id = str(uuid.uuid4())

    # 处理并存储文件
    file_urls = await process_uploads(files, user_id, project_id)

    # 创建 Firestore 文档
    await create_project_document(user_id, project_id, {
        'status': '待处理',
        'created_at': datetime.now(),
        'project_name': project_name,
        'description': description,
        'files_urls': file_urls
    })

    # 触发异步处理
    await publish_to_pubsub(user_id, project_id, podcast_id, file_urls)
    return {'project_id': project_id, 'status': 'processing'}
这段代码定义了一个异步的文件上传接口,使用了FastAPI框架。接口的路径为`/upload`,并且需要一个经过Firebase验证的token。函数接收项目名称、描述、网站链接、主机数量以及可选的文件列表作为参数。

2. Firebase 集成

Firebase 为我们的应用程序提供了两个关键服务:

Firebase 存储
  • 处理安全文件上传并自动扩展
  • 为生成的音频文件提供CDN支持的分发
  • 支持大文件的可恢复上传
Firestore
  • 用于项目状态跟踪的实时数据库
  • 基于文档的结构,非常适合项目元数据
  • 自动扩展,无需手动分片

以下是我们如何实现实时状态更新的:

async def update_status(user_id: str, project_id: str, status: str, metadata: dict = None):
    doc_ref = db.collection('projects').document(f'{user_id}/{project_id}')
    update_data = {
        'status': status,
        'updated_at': datetime.now()
    }
    if metadata:
        update_data.update(metadata)
    await doc_ref.update(update_data)

在这段代码中,`doc_ref` 变量用于引用数据库中 `projects` 集合下特定用户和项目的文档。接着,`update_data` 字典包含了要更新的状态和更新时间。如果存在 `metadata`,则会将其更新到 `update_data` 字典中。

3. Google Cloud Pub/Sub

Pub/Sub 作为我们的消息传递骨干,提供了以下功能:

  • 解耦架构以提高可扩展性
  • 至少一次的交付保证
  • 自动消息保留和重放
  • 失败消息的死信队列

消息结构示例:

  {
    'user_id': 'uid_123',
    'project_id': 'proj_456',
    'podcast_id': 'pod_789',
    'file_urls': ['gs://bucket/file1.pdf'],
    'description': '关于云架构的技术博客文章',
    'host_count': 2,
  }

4. 使用 Azure 语音服务进行语音生成

我们音频生成的核心使用了 Azure 的认知服务语音 SDK。让我们来看看如何实现自然听起来的语音合成:

import azure.cognitiveservices.speech as speechsdk
from pathlib import Path
class SpeechGenerator:
    def __init__(self):
        self.speech_config = speechsdk.SpeechConfig(
            subscription=os.getenv("AZURE_SPEECH_KEY"),
            region=os.getenv("AZURE_SPEECH_REGION")
        )
    async def create_speech_segment(self, text, voice, output_file):
        try:
            self.speech_config.speech_synthesis_voice_name = voice
            synthesizer = speechsdk.SpeechSynthesizer(
                speech_config=self.speech_config,
                audio_config=None
            )
            # 从文本生成语音
            result = synthesizer.speak_text_async(text).get()
            if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted:
                with open(output_file, "wb") as audio_file:
                    audio_file.write(result.audio_data)
                return True
            return False
    except Exception as e:
        logger.error(f"语音合成失败: {str(e)}")
        return False

这段代码定义了一个名为 `SpeechGenerator` 的类。在其初始化方法中,使用 `speechsdk` 库的 `SpeechConfig` 类来配置语音服务的相关参数,包括订阅密钥和区域信息。这些信息通过环境变量 `AZURE_SPEECH_KEY` 和 `AZURE_SPEECH_REGION` 获取。 我们系统的一个独特功能是能够使用人工智能生成多语音播客。以下是我们如何处理不同主持人的脚本生成:
async def generate_podcast_script(outline: str, analysis: str, host_count: int):
    # 针对不同播客格式的系统指令
    system_instructions = TWO_HOST_SYSTEM_PROMPT if host_count > 1 else ONE_HOST_SYSTEM_PROMPT
    # 我们如何构建AI对话的示例
    if host_count > 1:
        script_format = """
            **Alex**: “你好,欢迎来到 MyPodify!我是你的主持人 Alex,和我一起的是…”
            **Jane**: “大家好!我是 Jane,今天我们将深入探讨 {topic}…”
    else:
        script_format = """
            **Alex**: “欢迎来到 MyPodify!今天我们将探索 {topic}…”
        """

# 使用 AI 生成完整脚本
script = await generate_content_from_openai(
        content=f"{大纲}\n\n内容详情:{分析}",
        system_instructions=system_instructions,,
        purpose="播客脚本"
    )
    return script

对于语音合成,我们将不同的说话者映射到特定的Azure语音:

VOICE_MAPPING = {
    'Alex': 'en-US-AndrewMultilingualNeural',  # 男性主持人
    'Jane': 'en-US-AvaMultilingualNeural',     # 女性主持人
    'Narrator': 'en-US-BrandonMultilingualNeural'  # 中性声音
}

5. 背景处理工作者

工作者组件负责处理繁重的任务:
文档分析

  • 从各种文档格式中提取文本
  • 分析文档的结构和内容
  • 识别关键主题和章节

内容处理

  • 生成自然的对话流程
  • 将内容拆分为发言者段落
  • 在主题之间创建过渡

音频生成

  • 使用Azure的神经语音将文本转换为语音
  • 处理多个发言者的声音
  • 应用音频后期处理

以下是我们工作者逻辑的简化视图:

async def process_document(message_data: dict):

try:
        # 从文档中提取内容
        content = await extract_document_content(message_data['file_urls'])
        # 分析并结构化内容
        document_analysis = await analyze_content(content)
        # 生成播客脚本
        script = await generate_script(
            document_analysis,
            speaker_count=message_data['host_count']
        )
        # 转换为音频
        audio_segments = await generate_audio_segments(script)
        # 后处理音频
        final_audio = await post_process_audio(audio_segments)
        # 上传并更新状态
        audio_url = await upload_to_storage(final_audio)
        await update_status(
            message_data['user_id'],
            message_data['project_id'],
            'completed',
            {'audio_url': audio_url}
        )
except Exception as e:
    await handle_processing_error(message_data, e)

 

错误处理与可靠性

系统实现了全面的错误处理:

  1. 重试逻辑
    • 对失败的API调用进行指数退避
    • 最大重试次数配置
    • 失败消息的死信队列
  2. 状态跟踪
    • 详细的错误消息存储在Firestore中
    • 实时状态更新给用户
    • 错误聚合以便监控
  3. 资源清理
    • 自动临时文件删除
    • 失败上传的清理
    • 孤立资源检测

扩展和性能优化

为了处理生产负载,我们实施了几项优化:

  1. 工作者扩展
    • 基于队列长度的横向扩展
    • 基于资源的自动扩展
    • 区域部署以降低延迟
  2. 存储优化
    • 内容去重
    • 压缩音频存储
    • CDN集成以便交付
  3. 处理优化
    • 对相似文档进行批处理
    • 对重复内容进行缓存
    • 在可能的情况下进行并行处理

监控与可观察性

系统包括全面的监控:

async def track_metrics(stage: str, duration: float, metadata: dict = None):
    metrics = {
        'stage': stage,
        'duration_ms': duration * 1000,
        'timestamp': datetime.now()
    }
    if metadata:
        metrics.update(metadata)
    await publish_metrics(metrics)

未来增强功能

虽然当前系统运行良好,但未来有几个令人兴奋的改进可能性:

  1. 增强音频处理
    • 背景音乐集成
    • 高级音频效果
    • 自定义语音训练
  2. 内容增强
    • 自动章节标记
    • 互动式转录
    • 多语言支持
  3. 平台集成
    • 直接发布到播客平台
    • 生成RSS订阅源
    • 社交媒体分享

构建文档到播客的转换器是一次令人兴奋的现代云架构之旅。FastAPI、Firebase、Google Cloud Pub/Sub和Azure的文本转语音服务的结合,为处理复杂文档处理提供了强大的基础,能够满足大规模的需求。

事件驱动架构确保系统在负载下保持响应,而使用托管服务则减少了运营开销。无论您是构建类似系统还是仅仅在探索云原生架构,我希望这次深入探讨能为构建可扩展、生产就绪的应用程序提供有价值的见解。

 

更多