Building a Tenant-Isolated, Asynchronous RAG Architecture with OIDC, Azure Service Bus, and LlamaIndex


Building a retrieval-augmented generation (RAG) system for enterprise use involves far more than model selection and prompt engineering. The hard part is designing an architecture that supports multiple tenants, guarantees data isolation, stays highly available, and absorbs load spikes gracefully. A naive, synchronous API endpoint quickly shows its fragility in production: long-running indexing jobs block the API, bursts of queries can bring the service down, and, most critically, enforcing strict tenant data isolation on shared infrastructure takes far more careful design than a basic CRUD framework provides.

This article documents the architectural decision process for addressing those challenges. We ultimately abandoned the traditional synchronous API model in favor of an event-driven, asynchronous processing pipeline. The trade-offs behind its core components are as follows:

  • Identity and authorization: OpenID Connect (OIDC) serves as the authentication protocol. Its standardized JWT claims let the API identify the user and their tenant securely and statelessly right at the request boundary, providing a trusted identity context for every downstream operation (an example payload follows this list).
  • Service decoupling and resilience: Azure Service Bus acts as the message broker. The API layer only validates requests and enqueues jobs; the core RAG logic runs in independent background worker processes that consume messages asynchronously. This decoupling buys substantial elasticity and scalability.
  • Retrieval and generation core: LlamaIndex implements the RAG pipeline. Its flexible indexing and query interfaces, and especially its metadata filtering, are the key to logically isolating tenant data at the vector-store layer.
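
For orientation, the decoded payload of such a JWT might look like the following. This is an illustration only: the exact claim names depend on the OIDC provider (tid and sub shown here follow Azure AD conventions), and every value is a placeholder.

{
  "iss": "https://login.microsoftonline.com/<tenant-guid>/v2.0",
  "aud": "<client-id>",
  "sub": "<user-unique-id>",
  "tid": "<tenant-guid>",
  "exp": 1735689600
}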

We will weigh the pros and cons of two architectures, explain why we chose the asynchronous model, and walk through the core implementation of its key components.

Option A: The Synchronous API Architecture, a Fragile Shortcut

Early in a project, the most obvious design is a single RESTful API service. Its workflow looks roughly like this:

  1. The client sends an HTTP request with an authentication credential (e.g., an API key).
  2. The API server validates the credential and runs the LlamaIndex query logic.
  3. It searches the vector database for relevant documents.
  4. It feeds the documents to a large language model (LLM) to generate an answer.
  5. It returns the answer to the client in the HTTP response.
sequenceDiagram
    participant Client
    participant API as API Server (FastAPI + LlamaIndex)
    participant VectorDB
    participant LLM

    Client->>API: POST /query (with API Key)
    API->>API: Validate API Key
    API->>VectorDB: Search relevant documents
    VectorDB-->>API: Documents
    API->>LLM: Generate response with context
    LLM-->>API: Generated Text
    API-->>Client: 200 OK (with response)

Advantages:

  • Simple to implement: all logic lives in one service, the code structure is direct, and it is easy to develop quickly and debug locally.
  • Low latency (in the ideal case): for simple, fast queries, synchronous handling gives the lowest end-to-end latency.

Disadvantages:

  • Poor user experience: RAG queries, especially those involving complex retrieval and LLM inference, can take seconds or even tens of seconds. A synchronous API forces the client to wait the entire time, making HTTP request timeouts likely.
  • Resource coupling and poor scalability: the API server's web processes carry both network I/O and heavy CPU/memory work (embedding, LLM inference). The two load types (e.g., many concurrent lightweight requests vs. a few heavyweight, long-running ones) cannot be scaled independently.
  • Low reliability: if a transient failure occurs mid-request (say, while calling the vector database or the LLM API), the whole request fails. There is no built-in retry or recovery mechanism unless you hand-roll complex retry logic in the API code.
  • Tenant isolation is hard: a plain API key is a poor vehicle for securely conveying tenant context. Hard-coding isolation logic into the API service couples the security model to the business logic, making it difficult to maintain and audit.

In a real project, this architecture hits its ceiling quickly. A single 30-second document indexing request is enough to occupy a worker process and degrade every other user's queries. That is unacceptable in production.

Option B: An Asynchronous OIDC + Message Queue Architecture, Built for Production

To overcome the weaknesses of the synchronous design, we drafted a second architecture. Its core idea is to separate request intake from job processing.

graph TD
    subgraph "User Realm"
        User
    end

    subgraph "Security Realm"
        IdP[OIDC Identity Provider]
    end

    subgraph "Application Realm"
        A[API Gateway] --> B{FastAPI Endpoint}
        B -- 1. Publish Job --> C[Azure Service Bus Queue]
        D[RAG Worker Process] -- 2. Consume Job --> E{LlamaIndex Core}
        E -- Query/Index --> F[Vector Database]
        E -- Inference --> G[LLM Service]
    end

    User -- Authenticates --> IdP
    IdP -- Issues OIDC Token --> User
    User -- Calls API with Token --> A

Workflow:

  1. The user authenticates against an OIDC provider (e.g., Azure AD or Auth0) and obtains a JWT containing claims such as tid (tenant ID) and sub (user ID).
  2. The user calls the FastAPI application with that JWT.
  3. The FastAPI application is configured with OIDC validation middleware, which verifies the JWT's signature and claims, confirms the request is legitimate, and extracts the tenant and user information.
  4. Instead of executing the RAG logic directly, the API endpoint packages the query together with the tenant context (tenant_id) extracted from the JWT into a message and sends it to a dedicated Azure Service Bus queue.
  5. The API immediately returns 202 Accepted, signaling that the job has been received and is being processed in the background.
  6. One or more independent RAG worker processes listen on the queue; one of them picks up the message.
  7. The worker parses the message and runs a strictly isolated LlamaIndex query for that tenant_id. In practice this means attaching a metadata_filter to the vector database query so that only documents belonging to that tenant are retrieved.
  8. When processing finishes, the worker notifies the user of the result through some channel (e.g., WebSocket, webhook, or a separate result queue).

Advantages:

  • High availability and resilience: Service Bus provides at-least-once delivery and a dead-letter queue. If a worker fails mid-job, the message is safely retried, or moved to the dead-letter queue for manual intervention after repeated failures. Brief outages of the API or worker services do not lose data.
  • Scalability: the API layer and the worker layer scale independently. If query traffic grows, add API instances; if processing load grows, add workers; neither affects the other.
  • Better user experience: the API responds quickly, so users never wait on a long request. The client obtains the final result by polling a status endpoint or subscribing to a notification.
  • A clean security boundary: OIDC decouples authentication from application logic. The API layer acts as a security gateway with a single, clear responsibility: verify identity and inject context. Workers fully trust the tenant information passed from upstream and focus on business processing.

Disadvantages:

  • More architectural complexity: the message queue and background processes add deployment, monitoring, and debugging overhead, and you must handle the usual distributed-systems concerns such as message idempotency and distributed tracing (a minimal idempotency sketch follows this list).
  • Eventual consistency: results are not returned immediately; system state is eventually consistent. Clients must be designed for this asynchronous interaction model.
  • Extra latency: time spent queued adds to end-to-end latency. For scenarios requiring sub-second responses, this architecture may not fit.
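
As a concrete illustration of the idempotency concern above, a minimal sketch follows. It assumes a reachable Redis instance; the key prefix and TTL are arbitrary choices for this example, and the worker would call claim_job before doing any real work.

# idempotency.py (sketch)
import redis

r = redis.Redis(host="localhost", port=6379)

def claim_job(job_id: str, ttl_seconds: int = 86400) -> bool:
    """Atomically claim a job_id. Returns False if it was already claimed.

    SET with nx=True succeeds only when the key does not exist yet, which
    makes the claim safe even with multiple competing workers.
    """
    return bool(r.set(f"rag:job:{job_id}", "1", nx=True, ex=ttl_seconds))

# Worker usage:
#   if not claim_job(data["job_id"]):
#       logging.info("Duplicate delivery detected, skipping job.")
#       return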

Decision: given that an enterprise RAG system values reliability, scalability, and security far above instant responses, Option B is clearly the better choice. The remaining sections show the code for the key components of this architecture.

Core Implementation Overview

We will demonstrate with the Python ecosystem: FastAPI as the web framework, the azure-servicebus SDK for talking to the message queue, and llama-index as the core of the RAG pipeline.

1. Configuration and Environment Variables

In a real project, all configuration should be managed through environment variables.

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

# OIDC Configuration (e.g., for Azure AD)
OIDC_TENANT_ID = os.environ.get("OIDC_TENANT_ID")
OIDC_CLIENT_ID = os.environ.get("OIDC_CLIENT_ID")
# This is the 'issuer' URL from your OIDC provider's metadata
OIDC_ISSUER = f"https://login.microsoftonline.com/{OIDC_TENANT_ID}/v2.0"
# This corresponds to the 'aud' claim in the JWT
OIDC_AUDIENCE = os.environ.get("OIDC_AUDIENCE", OIDC_CLIENT_ID) 

# Azure Service Bus Configuration
SERVICE_BUS_CONNECTION_STRING = os.environ.get("SERVICE_BUS_CONNECTION_STRING")
SERVICE_BUS_QUEUE_NAME = os.environ.get("SERVICE_BUS_QUEUE_NAME", "rag-query-jobs")

# OpenAI/LLM Configuration
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# Vector DB Configuration (example for Pinecone)
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_INDEX_NAME = os.environ.get("PINECONE_INDEX_NAME", "multitenant-rag-index")
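
For reference, a matching .env file might look like this; every value below is a placeholder:

# .env (example)
OIDC_TENANT_ID=00000000-0000-0000-0000-000000000000
OIDC_CLIENT_ID=11111111-1111-1111-1111-111111111111
OIDC_AUDIENCE=api://my-rag-api
SERVICE_BUS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<name>;SharedAccessKey=<key>
SERVICE_BUS_QUEUE_NAME=rag-query-jobs
OPENAI_API_KEY=<openai-key>
PINECONE_API_KEY=<pinecone-key>
PINECONE_INDEX_NAME=multitenant-rag-index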

2. The OIDC Security Dependency (FastAPI)

We need a reusable FastAPI dependency that handles OIDC token validation. This code is the cornerstone of the whole security model.

# security.py
import httpx
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel
from typing import Dict, Any, Optional
import logging

from config import OIDC_ISSUER, OIDC_AUDIENCE

logger = logging.getLogger(__name__)

# This just defines the mechanism, not the logic.
# The actual token is expected in the Authorization header: Bearer <token>
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Cache for OIDC configuration to avoid fetching it on every request.
_oidc_config_cache: Optional[Dict[str, Any]] = None
_jwks_cache: Optional[Dict[str, Any]] = None

class UserContext(BaseModel):
    user_id: str
    tenant_id: str
    raw_claims: Dict[str, Any]

async def get_oidc_config() -> Dict[str, Any]:
    """Fetches and caches OIDC provider configuration."""
    global _oidc_config_cache
    if _oidc_config_cache is None:
        try:
            async with httpx.AsyncClient() as client:
                # The .well-known URL is a standard OIDC discovery endpoint
                response = await client.get(f"{OIDC_ISSUER}/.well-known/openid-configuration")
                response.raise_for_status()
                _oidc_config_cache = response.json()
        except httpx.RequestError as e:
            logger.error(f"Failed to fetch OIDC config: {e}")
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="OIDC provider unavailable")
    return _oidc_config_cache

async def get_jwks(oidc_config: Dict[str, Any]) -> Dict[str, Any]:
    """Fetches and caches JSON Web Key Set (JWKS) for token signature verification."""
    global _jwks_cache
    if _jwks_cache is None:
        jwks_uri = oidc_config.get("jwks_uri")
        if not jwks_uri:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="jwks_uri not found in OIDC config")
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(jwks_uri)
                response.raise_for_status()
                _jwks_cache = response.json()
        except httpx.RequestError as e:
            logger.error(f"Failed to fetch JWKS: {e}")
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="OIDC provider's JWKS endpoint unavailable")
    return _jwks_cache

async def get_current_user(token: str = Depends(oauth2_scheme)) -> UserContext:
    """
    FastAPI dependency that validates the OIDC JWT and extracts user/tenant context.
    This is the security gate for our API.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        oidc_config = await get_oidc_config()
        jwks = await get_jwks(oidc_config)
        
        unverified_header = jwt.get_unverified_header(token)
        rsa_key = {}
        for key in jwks["keys"]:
            if key["kid"] == unverified_header["kid"]:
                rsa_key = {
                    "kty": key["kty"],
                    "kid": key["kid"],
                    "use": key["use"],
                    "n": key["n"],
                    "e": key["e"],
                }
        if not rsa_key:
            raise credentials_exception

        payload = jwt.decode(
            token,
            rsa_key,
            algorithms=["RS256"],
            audience=OIDC_AUDIENCE,
            issuer=OIDC_ISSUER,
        )

        # In Azure AD, 'tid' is the tenant ID and 'sub' is a unique user identifier.
        # These might be different claims depending on your OIDC provider.
        tenant_id = payload.get("tid")
        user_id = payload.get("sub")

        if tenant_id is None or user_id is None:
            logger.warning("Token is missing 'tid' or 'sub' claim.")
            raise credentials_exception
        
        return UserContext(user_id=user_id, tenant_id=tenant_id, raw_claims=payload)

    except JWTError as e:
        logger.error(f"JWT validation failed: {e}")
        raise credentials_exception

The key point is that get_current_user does more than validate the token: it parses it into a structured UserContext object. That object is then passed through the application's business logic, preserving the integrity of the identity context.

3. The API Endpoint and Message Publishing

The endpoint's job becomes very light: authenticate the user, build a message, and send it.

# main.py
import logging
import uuid
import json
from datetime import datetime, timezone
from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel, Field
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient
from azure.core.exceptions import AzureError

from config import SERVICE_BUS_CONNECTION_STRING, SERVICE_BUS_QUEUE_NAME
from security import get_current_user, UserContext

# Setup basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

app = FastAPI(title="Async RAG API")

class QueryRequest(BaseModel):
    query: str = Field(..., min_length=10, max_length=1000, description="User's query for the RAG system.")

class JobSubmissionResponse(BaseModel):
    job_id: str
    status: str
    message: str

@app.post("/v1/query", status_code=status.HTTP_202_ACCEPTED, response_model=JobSubmissionResponse)
async def submit_query(
    request: QueryRequest,
    user: UserContext = Depends(get_current_user)
):
    """
    Receives a user query, validates OIDC token, and dispatches a job to the Service Bus.
    """
    job_id = str(uuid.uuid4())
    
    # The message payload must contain all necessary context for the worker.
    message_payload = {
        "job_id": job_id,
        "tenant_id": user.tenant_id,
        "user_id": user.user_id,
        "query": request.query,
        "timestamp_utc": datetime.utcnow().isoformat()
    }

    try:
        # Using 'async with' ensures the client is properly closed.
        async with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING) as client:
            async with client.get_queue_sender(SERVICE_BUS_QUEUE_NAME) as sender:
                # Create a message object and serialize the payload.
                message = ServiceBusMessage(body=json.dumps(message_payload), content_type="application/json", correlation_id=job_id)
                await sender.send_messages(message)
                logging.info(f"Job {job_id} for tenant {user.tenant_id} dispatched successfully.")
    except AzureError as e:
        logging.error(f"Failed to send message to Service Bus for job {job_id}: {e}")
        # In a real system, you might have a fallback or retry mechanism here.
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Backend messaging system is currently unavailable.")
    
    return JobSubmissionResponse(
        job_id=job_id,
        status="submitted",
        message="Your query has been accepted and is being processed."
    )

Note that the API returns a job_id. The client keeps this ID to poll for processing status or fetch the result later.
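
A status endpoint is not part of the core flow, but a minimal sketch is shown below. It assumes some job store that the worker updates on completion; get_job_record is a hypothetical async lookup against that store.

# main.py (status endpoint sketch)
from typing import Optional

class JobStatusResponse(BaseModel):
    job_id: str
    status: str  # e.g. "submitted" | "completed" | "failed"
    result: Optional[str] = None

@app.get("/v1/query/{job_id}", response_model=JobStatusResponse)
async def get_query_status(
    job_id: str,
    user: UserContext = Depends(get_current_user)
):
    # get_job_record is a hypothetical lookup against the store the worker writes to.
    record = await get_job_record(job_id)
    # Enforce tenant isolation on the read path too: a job is only visible to
    # the tenant that submitted it. Return 404 (not 403) to avoid leaking the
    # existence of other tenants' jobs.
    if record is None or record["tenant_id"] != user.tenant_id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
    return JobStatusResponse(job_id=job_id, status=record["status"], result=record.get("result"))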

4. The RAG Worker Implementation

This is the heart of the asynchronous design. The worker is a long-running process that pulls messages from Service Bus and executes the LlamaIndex logic.

# worker.py
import asyncio
import json
import logging
import os
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.exceptions import ServiceBusError
from llama_index.core import VectorStoreIndex, Settings
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from pinecone import Pinecone

from config import (
    SERVICE_BUS_CONNECTION_STRING, SERVICE_BUS_QUEUE_NAME, OPENAI_API_KEY,
    PINECONE_API_KEY, PINECONE_INDEX_NAME
)

# Configure logging for the worker
logging.basicConfig(level=logging.INFO, format='%(asctime)s - WORKER - %(levelname)s - %(message)s')

class RagProcessor:
    def __init__(self):
        # Initialize connections once.
        # A common mistake is to re-initialize these for every message.
        # Note: the current Pinecone client (v3+) takes only the API key; the
        # 'environment' parameter from pinecone-client v2 no longer applies.
        self.pc = Pinecone(api_key=PINECONE_API_KEY)
        pinecone_index = self.pc.Index(PINECONE_INDEX_NAME)
        
        # Setup LlamaIndex global settings
        Settings.llm = OpenAI(model="gpt-4-turbo-preview", api_key=OPENAI_API_KEY)
        Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=OPENAI_API_KEY)

        self.vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
        # We create the index object from the store, but we won't query it directly yet.
        self.index = VectorStoreIndex.from_vector_store(self.vector_store)

    async def process_message(self, message_body: str):
        """
        The core logic for processing a single RAG job.
        """
        try:
            data = json.loads(message_body)
            job_id = data['job_id']
            tenant_id = data['tenant_id']
            query_text = data['query']
            logging.info(f"Processing job {job_id} for tenant {tenant_id}.")

            # This is the most critical part for tenant isolation.
            # We construct a query engine with a metadata filter that scopes
            # the vector search to ONLY documents with a matching 'tenant_id'.
            # Assume documents were indexed with a 'tenant_id' in their metadata.
            query_engine = self.index.as_query_engine(
                filters=MetadataFilters(
                    filters=[ExactMatchFilter(key="tenant_id", value=tenant_id)]
                ),
                similarity_top_k=3 # Retrieve top 3 relevant chunks
            )

            response = await query_engine.aquery(query_text)

            logging.info(f"Successfully processed job {job_id}. Response: {str(response)[:100]}...")
            
            # In a real application, the result would be stored in a database
            # or sent to a result queue/webhook, associated with the job_id.
            # For this example, we just log it.
            # E.g., store_result(job_id, tenant_id, response.response)

        except json.JSONDecodeError:
            logging.error("Received a malformed JSON message.")
        except KeyError as e:
            logging.error(f"Message is missing required key: {e}")
        except Exception as e:
            # General catch-all to prevent the worker from crashing.
            logging.error(f"An unexpected error occurred while processing message: {e}", exc_info=True)
            # This message will eventually be dead-lettered by Service Bus if it fails repeatedly.
            raise

async def main():
    processor = RagProcessor()
    
    while True:
        try:
            async with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING) as client:
                async with client.get_queue_receiver(SERVICE_BUS_QUEUE_NAME, max_wait_time=30) as receiver:
                    logging.info("Worker started. Waiting for messages...")
                    async for msg in receiver:
                        try:
                            await processor.process_message(str(msg))
                            # Complete the message so it is not received again.
                            await receiver.complete_message(msg)
                        except Exception:
                            # If process_message fails, we abandon the message.
                            # Service Bus will re-deliver it based on the queue's policy.
                            # After max delivery count, it moves to the dead-letter queue.
                            logging.warning(f"Abandoning message {msg.correlation_id} due to processing failure.")
                            await receiver.abandon_message(msg)
        except ServiceBusError as e:
            logging.error(f"Service Bus connection error: {e}. Reconnecting in 60 seconds...")
            await asyncio.sleep(60)

if __name__ == "__main__":
    asyncio.run(main())

In RagProcessor, index.as_query_engine(filters=...) is the linchpin of tenant data isolation. It tells LlamaIndex that every vector search must satisfy the condition that the tenant_id metadata field exactly matches the tenant_id from the current message context. This eliminates cross-tenant data leakage at the root, even though all tenants' data lives in the same Pinecone index.
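
A quick way to sanity-check the isolation is to apply the same MetadataFilters at the retriever level and inspect the returned nodes directly. This is a sketch: it assumes documents were indexed with tenant_id metadata as described, and "tenant-a" is a placeholder value.

# isolation_check.py (sketch)
retriever = processor.index.as_retriever(
    filters=MetadataFilters(
        filters=[ExactMatchFilter(key="tenant_id", value="tenant-a")]
    ),
    similarity_top_k=5,
)
nodes = retriever.retrieve("any probe query")
# Every returned node must belong to tenant-a, no matter what other
# tenants have stored in the same Pinecone index.
assert all(n.node.metadata.get("tenant_id") == "tenant-a" for n in nodes)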

Extensibility and Limitations of the Architecture

This architectural pattern is no silver bullet, but it provides a solid foundation for the problem at hand.

Extension paths:

  1. Document indexing: add a second queue, say rag-indexing-jobs, with a matching indexing worker. When a user uploads a document, the API sends the document metadata (including tenant_id) and its storage location (e.g., a Blob Storage URL) to that queue; the indexing worker downloads, chunks, and embeds the content and writes it to the vector database, again attaching the tenant_id metadata (see the sketch after this list).
  2. Result notification: push results in real time over SignalR, WebSocket, or a Service Bus topic instead of having clients poll. After finishing a job, the worker publishes the result to a topic associated with the specific user or session.
  3. Priority queues: Azure Service Bus supports message sessions and priority handling, enabling richer scheduling logic, for example processing queries from paying tenants first.
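
The query-side filter in worker.py only works if documents were written with tenant_id metadata in the first place. Below is a minimal sketch of the indexing worker's core step; index_document_for_tenant is a hypothetical helper that reuses the same PineconeVectorStore as the query worker.

# indexing_worker.py (core step, sketch)
from llama_index.core import Document, VectorStoreIndex

def index_document_for_tenant(raw_text: str, tenant_id: str, vector_store) -> None:
    """Attach tenant_id to the document so query-side MetadataFilters can scope retrieval."""
    index = VectorStoreIndex.from_vector_store(vector_store)
    # Chunks produced during ingestion inherit the parent document's metadata,
    # so every node in the vector store ends up carrying tenant_id.
    index.insert(Document(text=raw_text, metadata={"tenant_id": tenant_id}))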

Limitations of the current design:

  • Debugging complexity: the longer call chain makes end-to-end debugging harder. In a real project you must integrate distributed tracing (e.g., OpenTelemetry), carrying the job_id as the trace identifier between the API and the workers, so the full request lifecycle is visible in tools like Jaeger or Datadog (a propagation sketch follows this list).
  • Vector store challenges: logical isolation works, but at very large multi-tenant scale a shared index can suffer from noisy-neighbor effects, where one tenant's data degrades another tenant's retrieval quality. Stricter isolation, a dedicated index or namespace per tenant, raises infrastructure management complexity and cost.
  • Cold starts: if the workers run as serverless functions (e.g., Azure Functions), cold-start latency may be an issue. For workloads that need consistently low processing latency, keeping one or more long-running container instances (e.g., on AKS or Container Apps) is usually the better choice.
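
For the tracing point above, one possible shape of the context propagation through Service Bus is sketched below, using the OpenTelemetry propagation API. The span name and the use of application_properties as the carrier are assumptions made for this example.

# tracing.py (sketch)
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer("rag-pipeline")

def attach_trace_context(message) -> None:
    """API side: inject the current trace context into the outgoing message."""
    carrier: dict = {}
    inject(carrier)  # writes the W3C 'traceparent' header into the dict
    message.application_properties = carrier

def traced_processing(msg, process_fn):
    """Worker side: continue the trace that started at the API."""
    raw = msg.application_properties or {}
    # Service Bus may hand property keys/values back as bytes; normalize to str.
    carrier = {
        (k.decode() if isinstance(k, bytes) else k): (v.decode() if isinstance(v, bytes) else v)
        for k, v in raw.items()
    }
    ctx = extract(carrier)
    with tracer.start_as_current_span("process_rag_job", context=ctx):
        return process_fn(msg)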
