使用python接入腾讯云DeepSeek

news/2025/2/26 12:54:35

本文主要从提供SSE方式接入DeepSeek,并通过fastapi websocket对外提供接入方法。
参考文档:
腾讯云大模型:https://cloud.tencent.com/document/product/1759/109380
fastAPI官网:https://fastapi.tiangolo.com/

WebSocketManager

提供WebsocketManager对websocket连接实例进行管理,代码片段如下:

python">from fastapi import WebSocket


class WsManager:
    """
    websocket manager
    """
    connectors: dict[str, WebSocket] = {}

    @staticmethod
    def get_connector(connector_id: str) -> WebSocket | None:
        """
        获取连接实例
        :param connector_id:
        :return:
        """
        connector = WsManager.connectors.get(connector_id)
        return connector

    @staticmethod
    def add_connector(connector_id: str, connector: WebSocket):
        """
        添加websocket客户端
        :param connector_id: 客户端ID
        :param connector: 客户端连接实例
        :return:
        """
        WsManager.connectors[connector_id] = connector

    @staticmethod
    def remove_connector(connector_id: str):
        """
        移除websocket连接
        :param connector_id: 连接ID
        :return:
        """
        try:
            del WsManager.connectors[connector_id]
        except Exception:
            pass

    @staticmethod
    async def send_message(connector_id: str, message: str):
        connector = WsManager.connectors.get(connector_id)
        if connector is None:
            return
        try:
            await connector.send_text(message)
        except Exception as e:
            print('消息发送失败')

    @staticmethod
    async def send_message_json(connector_id: str, message: dict):
        connector = WsManager.connectors.get(connector_id)
        if connector is None:
            return
        try:
            await connector.send_json(message)
        except Exception as e:
            print('消息发送失败')

接入DeepSeek

python">import uuid
import os
import requests
import json

from src.core.WsManager import WsManager
from .session import get_session

TCLOUD_URL = 'lke.tencentcloudapi.com'
TCLOUD_DeepSeek_SSE_URL = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"

SECRET_ID = "XXXXXXXXXXXXXXXXXXXX"
SECRET_KEY = "XXXXXXXXXXXXXXXXXXXXX"
REGION = "ap-guangzhou"
TYPE = 5


def get_session():
    """
    生成一个 UUID
    :return session id 字符串
    """
    new_uuid = uuid.uuid4()
    # 将 UUID 转换为字符串
    session_id = str(new_uuid)
    return session_id


def _resolve_message(message: str, prev_message: str):
    """
    处理消息
    :param message:
    :param prev_message:
    :return:
    """
    if prev_message == '':
        return None
    if prev_message.startswith("event:"):
        # 生成消息
        message = message.replace("data:", '').strip()
        try:
            message_json = json.loads(message)
            message_type = message_json.get('type')
            payload = message_json.get('payload')
            if message_type == 'token_stat':
                # token统计事件
                return None
            elif message_type == 'thought':
                # 思考事件
                return None
            elif message_type == 'error':
                # 错误事件
                return None
            elif message_type == 'reference':
                # 参考来源事件
                return None
            content = payload.get('content')
            record_id = payload.get('record_id')
            request_id = payload.get('request_id')
            session_id = payload.get('session_id')
            trace_id = payload.get('trace_id')
            message_id = payload.get('message_id')
            return {
                'type': message_type,
                'data': {
                    'content': content,
                    'record_id': record_id,
                    'request_id': request_id,
                    'session_id': session_id,
                    'trace_id': trace_id,
                    'message_id': message_id,
                }
            }
        except Exception as e:
            print(e)
    else:
        return None


async def get_q_a_result(visitor_id: str, question: str):
    """
    获取问答结果
    :param visitor_id: 访客ID
    :param question: 问题内容
    :return: 回答内容
    """
    session_id = get_session()
    req_data = {
        "content": f"{question}",
        "bot_app_key": "XXXXXXXX",  # 可以获取方式见下图
        "visitor_biz_id": visitor_id,
        "session_id": session_id,
        "streaming_throttle": 100
    }
    res = requests.post(TCLOUD_DeepSeek_SSE_URL, data=json.dumps(req_data),
                        stream=True, headers={"Accept": "text/event-stream"})
    prev_message: str = ''  # 上一条消息
    if res.status_code == 200:
        for line in res.iter_lines():
            if line:
                data = line.decode('utf-8')
                message = _resolve_message(data, prev_message)
                if message:
                    await WsManager.send_message_json(visitor_id, message)
                prev_message = data
    else:
        print('Failed to get data. Status code: {response.status_code}')
    # 关闭websocket连接
    connector = WsManager.get_connector(visitor_id)
    if connector:
        await connector.close()
        WsManager.remove_connector(visitor_id)

  1. 进入控制台
    在这里插入图片描述2. 创建应用,并点击调用
    在这里插入图片描述

fastapi_websocket_189">通过fastapi 以websocket方式对外提供接口

这个接口是,前台连接websocket后,后台直接根据连接参数,开始调用问答,问答结束后自动关闭websocket连接

python">import asyncio

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, BackgroundTasks
from fastapi.responses import JSONResponse

from src.core.WsManager import WsManager
from src.utils.tcloudLLM import get_example_recommend
from src.utils.session import get_visitor_id

router = APIRouter(prefix="/api/tcloud", tags=['腾信云大模型接口'])


@router.websocket("/ws/{compound_name}")
async def get_exercises_ws(compound_name: str, websocket: WebSocket, background_tasks: BackgroundTasks):
    await websocket.accept()
    visitor_id = get_visitor_id()
    WsManager.add_connector(visitor_id, websocket)
    # 本来使用BackgroundTask去做后台任务,结果不知道什么原因,调用不起来,然后换成asyncio处理
    asyncio.create_task(get_example_recommend(visitor_id, compound_name))  # 创建对应问答任务
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        WsManager.remove_connector(visitor_id)
    except Exception:
        WsManager.remove_connector(visitor_id)

如果需要进行多轮问答,提供为多次问答设置相关的 request_id 参数进行消息串联。


http://www.niftyadmin.cn/n/5868724.html

相关文章

Python从0到100(八十九):Resnet、LSTM、Shufflenet、CNN四种网络分析及对比

前言: 零基础学Python:Python从0到100最新最全教程。 想做这件事情很久了,这次我更新了自己所写过的所有博客,汇集成了Python从0到100,共一百节课,帮助大家一个月时间里从零基础到学习Python基础语法、Pyth…

STM32MP157A-FSMP1A单片机移植Linux系统SPI总线驱动

SPI总线驱动整体上与I2C总线驱动类型,差别主要在设备树和数据传输上,由于SPI是由4根线实现主从机的通信,在设备树上配置时需要对SPI进行设置。 原理图可知,数码管使用的SPI4对应了单片机上的PE11-->SPI4-NSS,PE12-->SPI4-S…

Java进阶:SpringMVC中放行静态资源

方法一 <mvc:resources mapping"/js/**" location"/js/"/>mapping&#xff1a;映射地址。访问服务器找资源时候的地址。 location&#xff1a;目录。具体资源所在目录。 方式二 <mvc:default-servlet-handler/>mvc如果找不到静态资源&…

娛閑放鬆篇3

2月的立春打算多添加兩隻貓&#xff0c;本來1月份黑漸層死了…但立春那天三花也撐不下去.....然後去了領貓的地方&#xff0c;只捉到了熟睡的三花&#xff0c;捉不到那隻橘白...幸好三天後捉到了...也是到家後一天才轉移到了貓籠&#xff0c;到家後兩天用吸管擼貓才不哈我&…

Spring Boot + Vue 接入腾讯云人脸识别API(SDK版本3.1.830)

一、需求分析 这次是基于一个Spring Boot Vue的在线考试系统进行二次开发&#xff0c;添加人脸识别功能以防止学生替考。其他有对应场景的也可按需接入API&#xff0c;方法大同小异。 主要有以下两个步骤&#xff1a; 人脸录入&#xff1a;将某个角色&#xff08;如学生&…

[算法--前缀和] 矩阵区域和

目录 1. 二维前缀和的知识铺垫2. 以nums[i][j]为中心计算区域大小.3. dp数组与ret数组之间的逻辑关系.4. 细节: 如果[i,j]为中心的数组越界了呢?下面继续分享一道用前缀和思想解决的算法问题 -> 矩阵区域和 1. 二维前缀和的知识铺垫 实际上, 有一道十分类似的基础题 ->…

免费使用 DeepSeek API 教程及资源汇总

免费使用 DeepSeek API 教程及资源汇总 一、DeepSeek API 资源汇总1.1 火山引擎1.2 百度千帆1.3 阿里百炼1.4 腾讯云 二、其他平台2.1 华为云2.2 硅基流动 三、总结 DeepSeek-R1 作为 2025 年初发布的推理大模型&#xff0c;凭借其卓越的逻辑推理能力和成本优势&#xff0c;迅速…

硬件基础(3):三极管(1):理论基础

目录 一、背景 二、定义 三、分类 四、工作原理 NPN三极管工作原理 基本工作原理 电流放大倍数&#xff08;增益&#xff09; 输入特性 1. 输入特性的基本概念 2. 输入特性曲线的形态 3. 输入特性曲线的具体分析 输出特性 1. 输出特性图的基本概念 2. 输出特性曲…