openai 脚本
- stream脚本
import os

from openai import OpenAI

# Streaming example: call the OpenAI-compatible gateway and print the reply
# incrementally as chunks arrive.
client = OpenAI(
    base_url="http://127.0.0.1:9117/api/v1",
    api_key=os.environ["ACCESS_TOKEN"],  # raises KeyError if ACCESS_TOKEN is not set
)

stream = client.chat.completions.create(
    model="Qwen/Qwen2-7B-Instruct",
    messages=[{"role": "user", "content": "介绍一下自己"}],
    stream=True,
)

for chunk in stream:
    # A chunk may carry no choices at all, and role-only / final chunks have
    # delta.content set to None; skip those instead of printing "None".
    if not chunk.choices:
        continue
    piece = chunk.choices[0].delta.content
    if piece:
        print(piece, end="", flush=True)
- api脚本
import os

from openai import OpenAI

# Non-streaming example: call the OpenAI-compatible gateway and print the
# complete reply once the whole response has been received.
client = OpenAI(
    base_url="http://127.0.0.1:9117/api/v1",
    api_key=os.environ["ACCESS_TOKEN"],  # raises KeyError if ACCESS_TOKEN is not set
)

response = client.chat.completions.create(
    model="Qwen/Qwen2-7B-Instruct",
    messages=[{"role": "user", "content": "介绍一下自己"}],
    stream=False,
)

# With stream=False each choice exposes the full text under `message`;
# `delta` only exists on streaming chunks.
print(response.choices[0].message.content)
api/v1/chat/completions实现
- ChatController
@RequestMapping("/api/v1/chat")
public class ChatController {
    ...

    /**
     * Handles POST /api/v1/chat/completions.
     *
     * <p>When the request asks for streaming, the emitter produced by
     * {@code chatService.sendStreamEmitter} is returned directly; otherwise the
     * result of {@code chatService.call} is wrapped in a 200 response with an
     * {@code application/json} content type.
     *
     * @param req validated chat request body
     * @return the streaming emitter, or a JSON {@code ResponseEntity}
     */
    @PostMapping("/completions")
    public Object send(@Valid @RequestBody ChatSendStreamReq req) {
        // Resolve the caller identity once; both branches need it.
        String uid = AuthContextHolderHelper.getUid();
        String username = AuthContextHolderHelper.getUsername();

        if (!req.isStream()) {
            Object result = chatService.call(req, uid, username);
            return ResponseEntity.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(result);
        }
        return chatService.sendStreamEmitter(req, uid, username);
    }

    ...
}
- WebClientConfig
Configuration public class WebClientConfig {@Beanpublic WebClient webClient() throws SSLException {// 配置 SSLContext,信任所有证书SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();HttpClient httpClient = HttpClient.create().secure(ssl -> ssl.sslContext(sslContext));return WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)).build();// return WebClient.builder().build();} }
- ChatService
@Service
@Slf4j
public class ChatServiceImpl implements ChatService {
    ...

    /**
     * Emitter timeout in milliseconds. A value of 0 would mean "never time out"
     * (not recommended). 300 s matches the {@code proxy_read_timeout} of the
     * nginx proxy in front of this endpoint, so neither side cuts a long
     * generation short before the other.
     */
    private static final long SSE_TIMEOUT_MS = 300_000L;

    /**
     * Relays the upstream streaming chat endpoint to the caller as server-sent events.
     *
     * <p>The upstream request is issued through {@code webClient}; every element of
     * the upstream body is forwarded as one SSE data event. The upstream
     * subscription is disposed when the emitter completes, times out or errors,
     * and when forwarding an event to the client fails.
     *
     * @param req      chat request forwarded as the upstream request body
     * @param uid      caller id, passed to {@code buildHeaders}
     * @param username caller name, passed to {@code buildHeaders}
     * @return the emitter the servlet layer streams to the client
     */
    @Override
    public SseEmitter sendStreamEmitter(ChatSendStreamReq req, String uid, String username) {
        Map<String, String> headers = buildHeaders(uid, username);
        // Captured here because the reactive callbacks below run on other threads.
        String traceId = MDC.get("trace-id");

        SseEmitter emitter = new SseEmitter(SSE_TIMEOUT_MS);
        // Holds the WebClient subscription so any callback can cancel it.
        AtomicReference<Disposable> disposableRef = new AtomicReference<>();

        // subscribe() only starts the exchange and returns immediately, so the
        // request thread is not blocked and no extra executor is needed. (The
        // previous per-request single-thread executor was never shut down.)
        try {
            log.debug("[sse] SseEmitter创建开始");
            String url = "https://localhost:8888/api/v1/chat/send_stream";
            WebClient.RequestHeadersSpec<?> requestSpec = webClient.post()
                    .uri(url)
                    .accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_NDJSON, MediaType.APPLICATION_JSON)
                    .body(BodyInserters.fromValue(req))
                    .headers(httpHeaders -> {
                        headers.forEach(httpHeaders::set);
                        httpHeaders.set(HttpHeaders.ACCEPT_CHARSET, StandardCharsets.UTF_8.name());
                    });

            // Subscribe to the remote SSE stream.
            // NOTE(review): a callback may fire before disposableRef is set below;
            // cancelUpstream tolerates the reference still being null.
            Disposable disposable = requestSpec.retrieve()
                    .bodyToFlux(String.class)
                    .doOnNext(line -> {
                        setTraceIdToMDC(traceId);
                        try {
                            log.debug("[sse] 收到数据: {}", line);
                            emitter.send(SseEmitter.event().data(line));
                        } catch (IOException e) {
                            log.error("[sse] SSE发送失败: {}", e.getMessage());
                            // Stop the WebClient stream right away.
                            cancelUpstream(disposableRef, "[sse] WebClient 流已取消");
                            emitter.completeWithError(e);
                        }
                    })
                    .doOnError(error -> {
                        setTraceIdToMDC(traceId);
                        log.error("[sse] 流式请求失败: {}", error.toString());
                        cancelUpstream(disposableRef, "[sse] WebClient 流已取消 error");
                        emitter.completeWithError(error);
                    })
                    .doOnComplete(() -> {
                        setTraceIdToMDC(traceId);
                        log.info("[sse] 流式请求完成");
                        emitter.complete();
                    })
                    .subscribe();
            disposableRef.set(disposable);
        } catch (Exception e) {
            log.error("[sse] 异常: {}", e.getMessage(), e);
            emitter.completeWithError(e);
        }

        // Invoked when the emitter completes, including when the client disconnects.
        emitter.onCompletion(() -> {
            log.info("[sse] SSE完成/客户端断开");
            cancelUpstream(disposableRef, "[sse] WebClient 订阅已关闭");
        });
        emitter.onTimeout(() -> {
            log.info("[sse] SSE超时");
            emitter.complete();
        });
        emitter.onError((ex) -> {
            log.error("[sse] SSE异常: {}", ex.getMessage());
            cancelUpstream(disposableRef, "[sse] WebClient 订阅已关闭(异常触发)");
        });
        return emitter;
    }

    /** Disposes the upstream subscription if one is stored and still active, then logs {@code message}. */
    private void cancelUpstream(AtomicReference<Disposable> disposableRef, String message) {
        Disposable disposable = disposableRef.get();
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
            log.info(message);
        }
    }

    /** Puts {@code traceId} into the MDC under "trace-id" when it is not blank. */
    private void setTraceIdToMDC(String traceId) {
        if (StrUtil.isNotBlank(traceId)) {
            MDC.put("trace-id", traceId);
        }
    }

    ....
}
NGINX 配置
- proxy_pass
# Exact-match location for the chat completions endpoint.
location = /api/v1/chat/completions {
    # Shared proxy settings for the AI endpoints.
    include proxy_ai.conf;
    proxy_pass http://...;
}
- proxy_ai.conf
# --- Upstream protocol and response handling ---
proxy_http_version 1.1;
proxy_redirect off;
proxy_cache off;
# Pass the upstream response through as it arrives instead of buffering it.
proxy_buffering off;
proxy_max_temp_file_size 0;

# Do not add this header for the AI endpoints:
#proxy_set_header Connection 'keep-alive';

# --- Headers forwarded to the upstream ---
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
#proxy_set_header X-Real-IP $http_x_forwarded_for;
#proxy_set_header X-Forwarded-For $http_x_forwarded_for;

# --- Request body limits ---
client_max_body_size 2048M;
client_body_buffer_size 128k;

# --- Timeouts (seconds) ---
#proxy_connect_timeout 100;
proxy_read_timeout 300;
proxy_send_timeout 300;

# --- Buffer sizing ---
proxy_buffer_size 16k;
proxy_buffers 16 128k;
proxy_busy_buffers_size 256k;
proxy_temp_file_write_size 256k;

# --- Header hash tables ---
proxy_headers_hash_max_size 512;
proxy_headers_hash_bucket_size 512;