SpringBoot实现简易直播

当下直播技术已经成为各类应用不可或缺的一部分,从社交媒体到在线教育,再到电子商务和游戏领域,直播功能正在被广泛应用。

本文将介绍如何使用SpringBoot框架构建一个直播流推拉系统。

在这里插入图片描述

一、直播技术基础

1.1 推流与拉流概念

直播系统的核心环节包括推流和拉流:

  • 推流(Push) : 指主播将采集的音视频数据通过特定协议发送到流媒体服务器的过程
  • 拉流(Pull) : 指观众从流媒体服务器获取音视频数据并播放的过程

1.2 常用直播协议

市面上主要的直播协议包括:

协议优势劣势适用场景
RTMP低延迟(1-3秒)、成熟稳定基于Flash、需要额外端口(1935)主播推流、低延迟直播
HLS兼容性好、使用HTTP协议延迟高大规模直播分发、移动端
WebRTC超低延迟(<1秒)、P2P通信穿透复杂网络困难、部署复杂实时互动、小规模视频会议
HTTP-FLV低延迟、兼容性较好不支持可变码率大规模观看、延迟敏感场景
SRT低延迟、高可靠性生态相对较新专业直播、跨国直播

1.3 直播系统架构概述

一个完整的直播系统通常包含以下组件:

  1. 采集端:负责采集、编码音视频数据
  2. 流媒体服务器:处理音视频流的转发、转码和分发
  3. CDN:提供内容分发服务,解决大规模用户访问问题
  4. 播放器:解码并播放音视频内容
  5. 信令服务:管理直播间信息、用户状态等

二、系统技术设计

2.1 整体架构

核心组件包括:

  1. API服务层:基于SpringBoot构建的RESTful API,处理直播间管理、用户认证等
  2. 媒体服务器集成层:集成开源流媒体服务器(如SRS)
  3. 流转发层:处理媒体流的转发、转码和适配
  4. 存储层:用于直播回放和点播
  5. WebSocket服务:提供直播互动功能

2.2 技术选型

  • 后端框架:SpringBoot 3
  • 流媒体服务器:SRS (Simple RTMP Server)
  • 数据库:MySQL + Redis
  • WebSocket:Spring WebSocket
  • 存储系统:MinIO (对象存储)
  • 前端播放器:Video.js、flv.js、hls.js

三、SpringBoot实现直播服务

3.1 项目依赖配置

首先配置pom.xml文件,引入必要的依赖:

<dependencies><!-- SpringBoot 核心依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- WebSocket 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- Redis 支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- MySQL 支持 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!-- MyBatis Plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-spring-boot3-starter</artifactId><version>3.5.5</version></dependency><!-- MinIO 客户端 --><dependency><groupId>io.minio</groupId><artifactId>minio</artifactId><version>8.4.6</version></dependency><!-- HTTP 客户端 --><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.14</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

3.2 应用配置

application.yml中配置应用参数:

server:port: 8080spring:application:name: live-streaming-service# 数据库配置datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/live_streaming?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghaiusername: rootpassword: root# Redis配置redis:host: localhostport: 6379database: 0# MyBatis Plus配置
mybatis-plus:mapper-locations: classpath:mapper/*.xmltype-aliases-package: com.example.livestream.entityconfiguration:map-underscore-to-camel-case: truelog-impl: org.apache.ibatis.logging.stdout.StdOutImpl# 流媒体服务器配置
live:srs:server-url: rtmp://192.168.195.100:1935/liveapi-url: http://192.168.195.100:1985/apihttp-flv-url: http://192.168.195.100:8080/livehls-url: http://192.168.195.100:8080/live/hlsrecord:save-path: /data/recordpush:key-check-enabled: trueauth-expire: 86400  # 24小时,单位秒auth-key: your_secret_key# MinIO配置
minio:endpoint: http://localhost:9000access-key: minioadminsecret-key: minioadminbucket: live-recordings

3.3 实体类设计

首先定义直播间实体:

@Data
@TableName("live_room")
public class LiveRoom {@TableId(value = "id", type = IdType.AUTO)private Long id;private String title;private String coverUrl;private Long userId;private String streamKey;  // 推流密钥private String streamUrl;  // 推流地址private String hlsUrl;     // HLS播放地址private String flvUrl;     // HTTP-FLV播放地址private Integer status;    // 0:未开播 1:直播中 2:直播结束private Long viewCount;    // 观看人数private Long likeCount;    // 点赞数private LocalDateTime startTime;  // 开播时间private LocalDateTime endTime;    // 结束时间private LocalDateTime createdAt;private LocalDateTime updatedAt;
}

直播流信息实体:

@Data
@TableName("live_stream")
public class LiveStream {@TableId(value = "id", type = IdType.AUTO)private Long id;private Long roomId;private String streamId;  // 流IDprivate String protocol;  // 协议类型:rtmp/hls/flvprivate Integer bitrate;  // 码率private String resolution; // 分辨率private Integer status;   // 0:未启动 1:活跃 2:已结束private LocalDateTime createdAt;private LocalDateTime updatedAt;
}

直播回放实体:

@Data
@TableName("live_recording")
public class LiveRecording {@TableId(value = "id", type = IdType.AUTO)private Long id;private Long roomId;private String fileName;private String fileUrl;private Long fileSize;private Integer duration;  // 时长,单位秒private LocalDateTime startTime;private LocalDateTime endTime;private Integer status;   // 0:录制中 1:录制完成 2:处理中 3:可用 4:删除private LocalDateTime createdAt;private LocalDateTime updatedAt;
}

3.4 数据库表设计

根据实体类,创建对应的数据库表:

-- 直播间表
CREATE TABLE `live_room` (`id` bigint NOT NULL AUTO_INCREMENT,`title` varchar(255) NOT NULL COMMENT '直播标题',`cover_url` varchar(255) DEFAULT NULL COMMENT '封面URL',`user_id` bigint NOT NULL COMMENT '主播用户ID',`stream_key` varchar(64) NOT NULL COMMENT '推流密钥',`stream_url` varchar(255) DEFAULT NULL COMMENT '推流地址',`hls_url` varchar(255) DEFAULT NULL COMMENT 'HLS播放地址',`flv_url` varchar(255) DEFAULT NULL COMMENT 'HTTP-FLV播放地址',`status` tinyint NOT NULL DEFAULT '0' COMMENT '状态:0未开播 1直播中 2直播结束',`view_count` bigint NOT NULL DEFAULT '0' COMMENT '观看人数',`like_count` bigint NOT NULL DEFAULT '0' COMMENT '点赞数',`start_time` datetime DEFAULT NULL COMMENT '开播时间',`end_time` datetime DEFAULT NULL COMMENT '结束时间',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uk_stream_key` (`stream_key`),KEY `idx_user_id` (`user_id`),KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播间信息表';-- 直播流表
CREATE TABLE `live_stream` (`id` bigint NOT NULL AUTO_INCREMENT,`room_id` bigint NOT NULL COMMENT '直播间ID',`stream_id` varchar(64) NOT NULL COMMENT '流ID',`protocol` varchar(20) NOT NULL COMMENT '协议类型',`bitrate` int DEFAULT NULL COMMENT '码率',`resolution` varchar(20) DEFAULT NULL COMMENT '分辨率',`status` tinyint NOT NULL DEFAULT '0' COMMENT '状态:0未启动 1活跃 2已结束',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `uk_stream_id` (`stream_id`),KEY `idx_room_id` (`room_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播流信息表';-- 直播回放表
CREATE TABLE `live_recording` (`id` bigint NOT NULL AUTO_INCREMENT,`room_id` bigint NOT NULL COMMENT '直播间ID',`file_name` varchar(255) NOT NULL COMMENT '文件名',`file_url` varchar(255) COMMENT '文件URL',`file_size` bigint DEFAULT NULL COMMENT '文件大小(字节)',`duration` int DEFAULT NULL COMMENT '时长(秒)',`start_time` datetime NOT NULL COMMENT '开始时间',`end_time` datetime DEFAULT NULL COMMENT '结束时间',`status` tinyint NOT NULL DEFAULT '0' COMMENT '状态:0录制中 1录制完成 2处理中 3可用 4删除',`created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),KEY `idx_room_id` (`room_id`),KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='直播回放表';

3.5 Mapper接口

使用MyBatis-Plus创建Mapper接口:

@Mapper
public interface LiveRoomMapper extends BaseMapper<LiveRoom> {// 自定义查询方法@Select("SELECT * FROM live_room WHERE status = 1 ORDER BY view_count DESC LIMIT #{limit}")List<LiveRoom> findHotLiveRooms(@Param("limit") int limit);
}@Mapper
public interface LiveStreamMapper extends BaseMapper<LiveStream> {// 基础CRUD方法由BaseMapper提供
}@Mapper
public interface LiveRecordingMapper extends BaseMapper<LiveRecording> {// 基础CRUD方法由BaseMapper提供
}

3.6 服务层实现

3.6.1 直播流服务
@Service
@Slf4j
public class LiveStreamService {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate LiveStreamMapper liveStreamMapper;@Autowiredprivate StringRedisTemplate redisTemplate;@Value("${live.srs.server-url}")private String srsServerUrl;@Value("${live.srs.api-url}")private String srsApiUrl;@Value("${live.srs.http-flv-url}")private String httpFlvUrl;@Value("${live.srs.hls-url}")private String hlsUrl;@Value("${live.push.key-check-enabled}")private boolean keyCheckEnabled;@Value("${live.push.auth-expire}")private long authExpire;@Value("${live.push.auth-key}")private String authKey;private RestTemplate restTemplate = new RestTemplate();/*** 创建直播间*/@Transactionalpublic LiveRoom createLiveRoom(LiveRoom liveRoom) {// 生成推流密钥String streamKey = generateStreamKey(liveRoom.getUserId());liveRoom.setStreamKey(streamKey);// 构建推流地址String pushUrl = buildPushUrl(streamKey);liveRoom.setStreamUrl(pushUrl);// 构建播放地址liveRoom.setHlsUrl(hlsUrl + "/" + streamKey + ".m3u8");liveRoom.setFlvUrl(httpFlvUrl + "/" + streamKey + ".flv");// 设置初始状态liveRoom.setStatus(0);liveRoom.setViewCount(0L);liveRoom.setLikeCount(0L);liveRoom.setCreatedAt(LocalDateTime.now());liveRoom.setUpdatedAt(LocalDateTime.now());// 保存到数据库liveRoomMapper.insert(liveRoom);return liveRoom;}/*** 生成推流密钥*/private String generateStreamKey(Long userId) {// 生成基于用户ID和时间戳的唯一密钥String baseKey = userId + "_" + System.currentTimeMillis();return DigestUtils.md5DigestAsHex(baseKey.getBytes());}/*** 构建推流地址*/private String buildPushUrl(String streamKey) {StringBuilder sb = new StringBuilder(srsServerUrl);sb.append("/").append(streamKey);// 如果启用了推流验证if (keyCheckEnabled) {long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;String authString = streamKey + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());sb.append("?auth_key=").append(authToken).append("&expire=").append(expireTimestamp);}return sb.toString();}/*** 开始直播*/@Transactionalpublic LiveRoom startLiveStream(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null) {throw new IllegalArgumentException("直播间不存在");}// 更新直播间状态为直播中liveRoom.setStatus(1);liveRoom.setStartTime(LocalDateTime.now());liveRoomMapper.updateById(liveRoom);// 创建直播流记录LiveStream liveStream = new LiveStream();liveStream.setRoomId(roomId);liveStream.setStreamId(liveRoom.getStreamKey());liveStream.setProtocol("rtmp");liveStream.setStatus(1);liveStream.setCreatedAt(LocalDateTime.now());liveStream.setUpdatedAt(LocalDateTime.now());liveStreamMapper.insert(liveStream);// 更新Redis缓存中的活跃直播间redisTemplate.opsForSet().add("live:active_rooms", String.valueOf(roomId));return liveRoom;}/*** 结束直播*/@Transactionalpublic LiveRoom endLiveStream(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {throw new IllegalArgumentException("直播间不存在或未开播");}// 更新直播间状态为已结束liveRoom.setStatus(2);liveRoom.setEndTime(LocalDateTime.now());liveRoomMapper.updateById(liveRoom);// 更新直播流状态QueryWrapper<LiveStream> queryWrapper = new QueryWrapper<>();queryWrapper.eq("room_id", roomId).eq("status", 1);LiveStream liveStream = liveStreamMapper.selectOne(queryWrapper);if (liveStream != null) {liveStream.setStatus(2);liveStream.setUpdatedAt(LocalDateTime.now());liveStreamMapper.updateById(liveStream);}// 从Redis中移除活跃直播间redisTemplate.opsForSet().remove("live:active_rooms", String.valueOf(roomId));return liveRoom;}/*** 获取当前活跃的直播间列表*/public List<LiveRoom> getActiveLiveRooms(int page, int size) {Page<LiveRoom> pageParam = new Page<>(page, size);QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("status", 1).orderByDesc("view_count");return liveRoomMapper.selectPage(pageParam, queryWrapper).getRecords();}/*** 获取热门直播间*/public List<LiveRoom> getHotLiveRooms(int limit) {return liveRoomMapper.findHotLiveRooms(limit);}/*** 增加直播间观看人数*/public void incrementViewCount(Long roomId) {// 使用Redis进行计数String key = "live:room:" + roomId + ":view_count";redisTemplate.opsForValue().increment(key);// 定期同步到数据库if (Math.random() < 0.1) {  // 10%概率同步,减少数据库压力String countStr = redisTemplate.opsForValue().get(key);if (countStr != null) {long count = Long.parseLong(countStr);LiveRoom room = new LiveRoom();room.setId(roomId);room.setViewCount(count);liveRoomMapper.updateById(room);}}}/*** 校验推流密钥*/public boolean validateStreamKey(String streamKey, String token, String expire) {if (!keyCheckEnabled) {return true;}try {long expireTimestamp = Long.parseLong(expire);long currentTime = System.currentTimeMillis() / 1000;// 检查是否过期if (currentTime > expireTimestamp) {return false;}// 验证tokenString authString = streamKey + "-" + expire + "-" + authKey;String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes());return calculatedToken.equals(token);} catch (Exception e) {log.error("验证推流密钥异常", e);return false;}}/*** 处理SRS回调 - 流发布*/public void handleStreamPublish(String app, String stream) {try {// 查找对应的直播间QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("stream_key", stream);LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);if (liveRoom != null && liveRoom.getStatus() == 0) {// 更新直播间状态startLiveStream(liveRoom.getId());log.info("直播流发布成功: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());}} catch (Exception e) {log.error("处理流发布回调异常", e);}}/*** 处理SRS回调 - 流关闭*/public void handleStreamClose(String app, String stream) {try {// 查找对应的直播间QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();queryWrapper.eq("stream_key", stream);LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);if (liveRoom != null && liveRoom.getStatus() == 1) {// 更新直播间状态endLiveStream(liveRoom.getId());log.info("直播流关闭: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());}} catch (Exception e) {log.error("处理流关闭回调异常", e);}}/*** 获取SRS服务器信息*/public Map<String, Object> getSrsServerInfo() {try {String url = srsApiUrl + "/v1/summaries";ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class);return response.getBody();} catch (Exception e) {log.error("获取SRS服务器信息异常", e);return Collections.emptyMap();}}
}
3.6.2 直播回放服务
@Service
@Slf4j
public class LiveRecordingService {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate LiveRecordingMapper recordingMapper;@Autowiredprivate MinioClient minioClient;@Value("${minio.bucket}")private String minioBucket;@Value("${live.record.save-path}")private String recordSavePath;/*** 开始录制直播*/@Transactionalpublic LiveRecording startRecording(Long roomId) {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {throw new IllegalArgumentException("直播间不存在或未开播");}// 创建录制记录LiveRecording recording = new LiveRecording();recording.setRoomId(roomId);recording.setFileName(liveRoom.getStreamKey() + "_" + System.currentTimeMillis() + ".mp4");recording.setStatus(0);  // 录制中recording.setStartTime(LocalDateTime.now());recording.setCreatedAt(LocalDateTime.now());recording.setUpdatedAt(LocalDateTime.now());recordingMapper.insert(recording);// 异步启动录制进程startRecordingProcess(liveRoom, recording);return recording;}/*** 停止录制直播*/@Transactionalpublic LiveRecording stopRecording(Long recordingId) {LiveRecording recording = recordingMapper.selectById(recordingId);if (recording == null || recording.getStatus() != 0) {throw new IllegalArgumentException("录制任务不存在或已结束");}// 更新录制状态recording.setStatus(1);  // 录制完成recording.setEndTime(LocalDateTime.now());recording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(recording);// 异步停止录制进程并上传文件stopRecordingProcess(recording);return recording;}/*** 获取直播回放列表*/public List<LiveRecording> getRecordings(Long roomId, int page, int size) {Page<LiveRecording> pageParam = new Page<>(page, size);QueryWrapper<LiveRecording> queryWrapper = new QueryWrapper<>();queryWrapper.eq("room_id", roomId).eq("status", 3)  // 可用状态.orderByDesc("start_time");return recordingMapper.selectPage(pageParam, queryWrapper).getRecords();}/*** 启动录制进程*/private void startRecordingProcess(LiveRoom liveRoom, LiveRecording recording) {// 使用线程池异步执行CompletableFuture.runAsync(() -> {try {File saveDir = new File(recordSavePath);if (!saveDir.exists()) {saveDir.mkdirs();}String outputPath = recordSavePath + "/" + recording.getFileName();String inputUrl = liveRoom.getFlvUrl();// 使用FFmpeg录制ProcessBuilder pb = new ProcessBuilder("ffmpeg", "-i", inputUrl,"-c:v", "copy","-c:a", "aac","-strict", "-2",outputPath);Process process = pb.start();// 保存进程ID,以便后续停止String processKey = "live:recording:process:" + recording.getId();Runtime.getRuntime().addShutdownHook(new Thread(() -> {process.destroy();}));log.info("开始录制直播, roomId={}, recordingId={}", liveRoom.getId(), recording.getId());// 等待进程结束int exitCode = process.waitFor();log.info("录制进程结束, roomId={}, recordingId={}, exitCode={}", liveRoom.getId(), recording.getId(), exitCode);// 如果是正常结束,则更新状态并上传文件if (exitCode == 0) {uploadRecording(recording, new File(outputPath));}} catch (Exception e) {log.error("录制直播异常", e);// 更新录制状态为失败LiveRecording failedRecording = new LiveRecording();failedRecording.setId(recording.getId());failedRecording.setStatus(4);  // 失败状态failedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(failedRecording);}});}/*** 停止录制进程*/private void stopRecordingProcess(LiveRecording recording) {// 这里可以实现停止特定的FFmpeg进程// 在实际实现中,需要保存进程ID并通过操作系统命令停止进程log.info("手动停止录制, recordingId={}", recording.getId());}/*** 上传录制文件到MinIO*/private void uploadRecording(LiveRecording recording, File file) {try {// 设置状态为处理中LiveRecording processingRecording = new LiveRecording();processingRecording.setId(recording.getId());processingRecording.setStatus(2);  // 处理中processingRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(processingRecording);// 获取文件元数据long fileSize = file.length();// 使用FFmpeg获取视频时长String[] cmd = {"ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", file.getAbsolutePath()};Process process = Runtime.getRuntime().exec(cmd);BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));String durationStr = reader.readLine();int duration = (int) Float.parseFloat(durationStr);// 上传到MinIOString objectName = "recordings/" + recording.getFileName();minioClient.uploadObject(UploadObjectArgs.builder().bucket(minioBucket).object(objectName).filename(file.getAbsolutePath()).contentType("video/mp4").build());// 构建访问URLString fileUrl = minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder().bucket(minioBucket).object(objectName).method(Method.GET).build());// 更新录制记录LiveRecording updatedRecording = new LiveRecording();updatedRecording.setId(recording.getId());updatedRecording.setFileUrl(fileUrl);updatedRecording.setFileSize(fileSize);updatedRecording.setDuration(duration);updatedRecording.setStatus(3);  // 可用状态updatedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(updatedRecording);log.info("录制文件上传完成, recordingId={}, fileSize={}, duration={}s", recording.getId(), fileSize, duration);// 删除本地文件file.delete();} catch (Exception e) {log.error("上传录制文件异常", e);// 更新录制状态为失败LiveRecording failedRecording = new LiveRecording();failedRecording.setId(recording.getId());failedRecording.setStatus(4);  // 失败状态failedRecording.setUpdatedAt(LocalDateTime.now());recordingMapper.updateById(failedRecording);}}
}

3.7 控制器实现

3.7.1 直播控制器
@RestController
@RequestMapping("/api/live")
@Slf4j
public class LiveController {@Autowiredprivate LiveStreamService liveStreamService;@Autowiredprivate LiveRecordingService recordingService;@Autowiredprivate LiveRoomMapper liveRoomMapper;/*** 创建直播间*/@PostMapping("/room")public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {try {LiveRoom createdRoom = liveStreamService.createLiveRoom(liveRoom);return ResponseEntity.ok(createdRoom);} catch (Exception e) {log.error("创建直播间异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 获取直播间详情*/@GetMapping("/room/{roomId}")public ResponseEntity<LiveRoom> getLiveRoom(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null) {return ResponseEntity.notFound().build();}return ResponseEntity.ok(liveRoom);} catch (Exception e) {log.error("获取直播间详情异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 开始直播*/@PostMapping("/room/{roomId}/start")public ResponseEntity<LiveRoom> startLiveStream(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveStreamService.startLiveStream(roomId);return ResponseEntity.ok(liveRoom);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("开始直播异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 结束直播*/@PostMapping("/room/{roomId}/end")public ResponseEntity<LiveRoom> endLiveStream(@PathVariable Long roomId) {try {LiveRoom liveRoom = liveStreamService.endLiveStream(roomId);return ResponseEntity.ok(liveRoom);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("结束直播异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 获取活跃直播间列表*/@GetMapping("/rooms/active")public ResponseEntity<List<LiveRoom>> getActiveLiveRooms(@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {List<LiveRoom> rooms = liveStreamService.getActiveLiveRooms(page, size);return ResponseEntity.ok(rooms);} catch (Exception e) {log.error("获取活跃直播间列表异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 获取热门直播间*/@GetMapping("/rooms/hot")public ResponseEntity<List<LiveRoom>> getHotLiveRooms(@RequestParam(defaultValue = "10") int limit) {try {List<LiveRoom> rooms = liveStreamService.getHotLiveRooms(limit);return ResponseEntity.ok(rooms);} catch (Exception e) {log.error("获取热门直播间异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 增加观看人数*/@PostMapping("/room/{roomId}/view")public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {try {liveStreamService.incrementViewCount(roomId);return ResponseEntity.ok().build();} catch (Exception e) {log.error("增加观看人数异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 开始录制直播*/@PostMapping("/room/{roomId}/record/start")public ResponseEntity<LiveRecording> startRecording(@PathVariable Long roomId) {try {LiveRecording recording = recordingService.startRecording(roomId);return ResponseEntity.ok(recording);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("开始录制直播异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 停止录制直播*/@PostMapping("/record/{recordingId}/stop")public ResponseEntity<LiveRecording> stopRecording(@PathVariable Long recordingId) {try {LiveRecording recording = recordingService.stopRecording(recordingId);return ResponseEntity.ok(recording);} catch (IllegalArgumentException e) {return ResponseEntity.badRequest().build();} catch (Exception e) {log.error("停止录制直播异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}/*** 获取直播回放列表*/@GetMapping("/room/{roomId}/recordings")public ResponseEntity<List<LiveRecording>> getRecordings(@PathVariable Long roomId,@RequestParam(defaultValue = "1") int page,@RequestParam(defaultValue = "10") int size) {try {List<LiveRecording> recordings = recordingService.getRecordings(roomId, page, size);return ResponseEntity.ok(recordings);} catch (Exception e) {log.error("获取直播回放列表异常", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();}}
}
3.7.2 SRS回调控制器
@RestController
@RequestMapping("/api/srs/callback")
@Slf4j
public class SrsCallbackController {@Autowiredprivate LiveStreamService liveStreamService;/*** 处理SRS on_publish回调* 当推流开始时,SRS会调用此接口*/@PostMapping("/on_publish")public ResponseEntity<Map<String, Object>> onPublish(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_publish回调: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();String param = callbackDto.getParam();Map<String, String> paramMap = HttpUtil.decodeParamMap(param, StandardCharsets.UTF_8);String token = paramMap.get("auth_key");String expire = paramMap.get("expire");callbackDto.setToken(token);callbackDto.setExpire(expire);// 验证推流密钥boolean valid = liveStreamService.validateStreamKey(callbackDto.getStream(), callbackDto.getToken(), callbackDto.getExpire());if (!valid) {result.put("code", 403);result.put("message", "Forbidden");return ResponseEntity.status(HttpStatus.FORBIDDEN).body(result);}// 处理流发布事件liveStreamService.handleStreamPublish(callbackDto.getApp(), callbackDto.getStream());result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 处理SRS on_unpublish回调* 当推流结束时,SRS会调用此接口*/@PostMapping("/on_unpublish")public ResponseEntity<Map<String, Object>> onUnpublish(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_unpublish回调: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());// 处理流关闭事件liveStreamService.handleStreamClose(callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 处理SRS on_play回调* 当播放流开始时,SRS会调用此接口*/@PostMapping("/on_play")public ResponseEntity<Map<String, Object>> onPlay(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_play回调: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 处理SRS on_stop回调* 当播放流结束时,SRS会调用此接口*/@PostMapping("/on_stop")public ResponseEntity<Map<String, Object>> onStop(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_stop回调: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}/*** 处理SRS on_dvr回调* 当DVR录制文件关闭时,SRS会调用此接口*/@PostMapping("/on_dvr")public ResponseEntity<Map<String, Object>> onDvr(@RequestBody SrsCallbackDto callbackDto) {log.info("SRS on_dvr回调: app={}, stream={}, file={}", callbackDto.getApp(), callbackDto.getStream(), callbackDto.getFile());Map<String, Object> result = new HashMap<>();result.put("code", 0);result.put("message", "Success");return ResponseEntity.ok(result);}
}

3.8 WebSocket实现

3.8.1 WebSocket配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 启用简单的消息代理registry.enableSimpleBroker("/topic");// 设置应用程序前缀registry.setApplicationDestinationPrefixes("/app");}@Overridepublic void registerStompEndpoints(StompEndpointRegistry registry) {// 注册STOMP端点registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();}
}
3.8.2 WebSocket控制器
@Controller
public class LiveChatController {@Autowiredprivate LiveRoomMapper liveRoomMapper;@Autowiredprivate SimpMessagingTemplate messagingTemplate;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 发送聊天消息*/@MessageMapping("/chat/{roomId}")public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {// 检查直播间是否存在LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {return;}// 设置消息时间message.setTimestamp(LocalDateTime.now());// 发送消息到订阅该直播间的所有客户端messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);}/*** 直播间状态变更通知*/public void notifyRoomStatusChange(Long roomId, int status) {Map<String, Object> payload = new HashMap<>();payload.put("roomId", roomId);payload.put("status", status);payload.put("timestamp", LocalDateTime.now());messagingTemplate.convertAndSend("/topic/room/" + roomId + "/status", payload);}/*** 发送直播点赞通知*/@MessageMapping("/like/{roomId}")public void sendLike(@DestinationVariable Long roomId, Map<String, Object> payload) {// 检查直播间是否存在LiveRoom liveRoom = liveRoomMapper.selectById(roomId);if (liveRoom == null || liveRoom.getStatus() != 1) {return;}// 增加点赞数String key = "live:room:" + roomId + ":like_count";Long likeCount = redisTemplate.opsForValue().increment(key);// 定期同步到数据库if (likeCount % 100 == 0) {  // 每100个点赞同步一次LiveRoom room = new LiveRoom();room.setId(roomId);room.setLikeCount(likeCount);liveRoomMapper.updateById(room);}// 添加时间戳payload.put("timestamp", LocalDateTime.now());payload.put("likeCount", likeCount);// 发送点赞通知messagingTemplate.convertAndSend("/topic/room/" + roomId + "/like", payload);}@Datapublic static class ChatMessage {private String username;private String userId;private String content;private String avatar;private LocalDateTime timestamp;}
}

四、SRS服务器配置

SRS (Simple RTMP Server) 是一个优秀的开源流媒体服务器,下面是基本配置文件srs.conf

listen              1935;
max_connections     1000;
daemon              off;
srs_log_tank        file;
srs_log_file        ./objs/srs.log;
http_api {enabled         on;listen          1985;
}
http_server {enabled         on;listen          8080;dir             ./objs/nginx/html;
}
vhost __defaultVhost__ {hls {enabled         on;hls_path        ./objs/nginx/html/hls;hls_fragment    10;hls_window      60;}http_remux {enabled     on;mount       [vhost]/[app]/[stream].flv;}dvr {enabled      on;dvr_path     ./objs/nginx/html/dvr/[app]/[stream].[timestamp].mp4;dvr_plan     segment;dvr_duration 30;}http_hooks {enabled         on;on_publish      http://192.168.195.1:8080/api/srs/callback/on_publish;on_unpublish    http://192.168.195.1:8080/api/srs/callback/on_unpublish;on_play         http://192.168.195.1:8080/api/srs/callback/on_play;on_stop         http://192.168.195.1:8080/api/srs/callback/on_stop;on_dvr          http://192.168.195.1:8080/api/srs/callback/on_dvr;}
}

五、前端播放器实现

5.1 基于Video.js的播放器

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>直播播放器</title><link href="https://vjs.zencdn.net/7.20.3/video-js.css" rel="stylesheet" /><script src="https://vjs.zencdn.net/7.20.3/video.min.js"></script><script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.0/dist/sockjs.min.js"></script><script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script><script src="https://cdn.jsdelivr.net/npm/videojs-contrib-hls@5.15.0/dist/videojs-contrib-hls.min.js"></script><script src="https://cdn.jsdelivr.net/npm/flv.js@1.6.2/dist/flv.min.js"></script><script src="https://cdn.jsdelivr.net/npm/videojs-flvjs@0.2.0/dist/videojs-flvjs.min.js"></script><style>.video-container {max-width: 800px;margin: 0 auto;}.video-js {width: 100%;height: 450px;}.room-info {margin-top: 20px;padding: 15px;background-color: #f8f9fa;border-radius: 5px;}.room-title {font-size: 24px;font-weight: bold;margin-bottom: 10px;}.streamer-info {display: flex;align-items: center;margin-bottom: 10px;}.streamer-avatar {width: 40px;height: 40px;border-radius: 50%;margin-right: 10px;}.streamer-name {font-weight: bold;}.room-stats {display: flex;gap: 20px;color: #666;}</style>
</head>
<body>
<div class="video-container"><video id="live-player" class="video-js vjs-default-skin vjs-big-play-centered" controls preload="auto"><p class="vjs-no-js">To view this video please enable JavaScript, and consider upgrading to a web browser that<a href="https://videojs.com/html5-video-support/" target="_blank">supports HTML5 video</a></p></video><div class="room-info"><div class="room-title" id="room-title">直播间标题</div><div class="streamer-info"><img class="streamer-avatar" id="streamer-avatar" src="https://api.dicebear.com/7.x/avataaars/svg?seed=user1" alt="主播头像"><span class="streamer-name" id="streamer-name">主播昵称</span></div><div class="room-stats"><div><i class="icon-eye"></i> <span id="view-count">0</span> 观看</div><div><i class="icon-heart"></i> <span id="like-count">0</span> 点赞</div></div></div>
</div><script>// 获取URL参数function getQueryParam(name) {const urlParams = new URLSearchParams(window.location.search);return urlParams.get(name);}// 初始化播放器function initPlayer() {const roomId = getQueryParam('roomId');if (!roomId) {alert('请指定直播间ID');return;}// 获取直播间信息fetch(`/api/live/room/${roomId}`).then(response => response.json()).then(room => {// 更新页面信息document.getElementById('room-title').textContent = room.title;document.getElementById('streamer-name').textContent = `主播ID: ${room.userId}`;document.getElementById('view-count').textContent = room.viewCount;document.getElementById('like-count').textContent = room.likeCount;// 判断直播状态if (room.status !== 1) {alert('直播未开始或已结束');return;}// 创建播放器const player = videojs('live-player', {autoplay: true,liveui: true,controls: true,preload: 'auto',responsive: true,fluid: true,sources: [/*{src: room.flvUrl,type: 'video/x-flv'},*/ {src: room.hlsUrl,type: 'application/x-mpegURL'}]});// 优先使用FLV.js/*if (flvjs.isSupported()) {player.flvjs({mediaDataSource: {type: 'flv',url: room.flvUrl}});}*/// 播放器错误处理player.on('error', function() {console.error('播放器错误,尝试切换播放源');// 尝试切换到HLSplayer.src({src: room.hlsUrl,type: 'application/x-mpegURL'});});// 统计观看人数fetch(`/api/live/room/${roomId}/view`, {method: 'POST'});// 连接WebSocket接收直播状态更新connectWebSocket(roomId);}).catch(error => {console.error('获取直播间信息失败:', error);alert('获取直播间信息失败');});}// 连接WebSocketfunction connectWebSocket(roomId) {const socket = new SockJS('/ws');const stompClient = Stomp.over(socket);stompClient.connect({}, function(frame) {console.log('Connected to WebSocket');// 订阅直播间状态变更stompClient.subscribe(`/topic/room/${roomId}/status`, function(message) {const data = JSON.parse(message.body);if (data.status !== 1) {alert('直播已结束');location.reload();}});// 订阅点赞更新stompClient.subscribe(`/topic/room/${roomId}/like`, function(message) {const data = JSON.parse(message.body);document.getElementById('like-count').textContent = data.likeCount;});});}// 页面加载完成后初始化document.addEventListener('DOMContentLoaded', initPlayer);
</script>
</body>
</html>

5.2 推流工具选择

对于主播端推流,可以选择以下工具:

  1. OBS Studio:开源、功能强大的推流软件
  2. FFmpeg:命令行工具,适合自动化推流
  3. WebRTC:浏览器直接采集和推流(低延迟但兼容性有限)

此处使用OBS Studio

六、性能优化与扩展

6.1 缓存策略

在高并发场景下,合理使用缓存至关重要:

@Configuration
@EnableCaching
public class CacheConfig {@Beanpublic CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(10)).serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())).serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();// 直播间列表缓存1分钟cacheConfigurations.put("liveRoomList", config.entryTtl(Duration.ofMinutes(1)));// 直播间详情缓存5分钟cacheConfigurations.put("liveRoom", config.entryTtl(Duration.ofMinutes(5)));// 回放列表缓存30分钟cacheConfigurations.put("recordingList", config.entryTtl(Duration.ofMinutes(30)));return RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(config).withInitialCacheConfigurations(cacheConfigurations).build();}
}

在服务层添加缓存注解:

@Cacheable(value = "liveRoomList", key = "'active_page_' + #page + '_' + #size")
public List<LiveRoom> getActiveLiveRooms(int page, int size) {// 原有实现...
}@Cacheable(value = "liveRoom", key = "#roomId")
public LiveRoom getLiveRoomDetail(Long roomId) {return liveRoomMapper.selectById(roomId);
}@CacheEvict(value = "liveRoom", key = "#roomId")
@Caching(evict = {@CacheEvict(value = "liveRoomList", allEntries = true)
})
public LiveRoom updateLiveRoomStatus(Long roomId, int status) {// 更新逻辑...
}

6.2 负载均衡与集群部署

对于大型直播系统,单机部署无法满足需求,需要考虑集群部署:

# Docker Compose示例配置
version: '3'services:# 应用服务器集群app1:image: live-streaming-service:latestports:- "8081:8080"environment:- SPRING_PROFILES_ACTIVE=prod- SERVER_PORT=8080- LIVE_SRS_SERVER_URL=rtmp://srs1:1935/livedepends_on:- mysql- redisapp2:image: live-streaming-service:latestports:- "8082:8080"environment:- SPRING_PROFILES_ACTIVE=prod- SERVER_PORT=8080- LIVE_SRS_SERVER_URL=rtmp://srs2:1935/livedepends_on:- mysql- redis# 流媒体服务器集群srs1:image: ossrs/srs:latestports:- "1935:1935"- "1985:1985"- "8080:8080"volumes:- ./srs1.conf:/usr/local/srs/conf/srs.confsrs2:image: ossrs/srs:latestports:- "1936:1935"- "1986:1985"- "8081:8080"volumes:- ./srs2.conf:/usr/local/srs/conf/srs.conf# 负载均衡器nginx:image: nginx:latestports:- "80:80"- "443:443"volumes:- ./nginx/nginx.conf:/etc/nginx/nginx.conf- ./nginx/ssl:/etc/nginx/ssldepends_on:- app1- app2- srs1- srs2# 数据库和中间件mysql:image: mysql:8.0ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=password- MYSQL_DATABASE=live_streamingvolumes:- mysql-data:/var/lib/mysqlredis:image: redis:6.2ports:- "6379:6379"volumes:- redis-data:/datavolumes:mysql-data:redis-data:

NGINX负载均衡配置:

http {upstream app_servers {# IP哈希负载均衡,确保同一用户请求发送到同一服务器ip_hash;server app1:8080;server app2:8080;}upstream srs_http_servers {server srs1:8080;server srs2:8080;}server {listen 80;server_name live.example.com;# API请求location /api/ {proxy_pass http://app_servers;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;}# WebSocket连接location /ws {proxy_pass http://app_servers;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";}# HLS分发location /hls/ {proxy_pass http://srs_http_servers;proxy_set_header Host $host;}# HTTP-FLV分发location /live/ {proxy_pass http://srs_http_servers;proxy_set_header Host $host;}}
}stream {upstream rtmp_servers {server srs1:1935;server srs2:1935;}server {listen 1935;proxy_pass rtmp_servers;}
}

6.3 流量控制与限流

为防止服务器被恶意攻击或过载,实现限流机制:

@Configuration
public class RateLimitConfig {@Beanpublic RedisRateLimiter redisRateLimiter(StringRedisTemplate redisTemplate) {return new RedisRateLimiter(redisTemplate);}
}@Component
public class RedisRateLimiter {private final StringRedisTemplate redisTemplate;private final String luaScript;public RedisRateLimiter(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// Lua脚本实现滑动窗口限流this.luaScript = "local key = KEYS[1] " +"local capacity = tonumber(ARGV[1]) " +"local period = tonumber(ARGV[2]) " +"local now = tonumber(ARGV[3]) " +"local requested = tonumber(ARGV[4]) " +// 移除过期的时间戳"redis.call('zremrangebyscore', key, 0, now - period) " +// 获取当前请求数"local currentCount = redis.call('zcard', key) " +// 如果请求数超过容量,返回0"if currentCount + requested > capacity then " +"    return 0 " +"end " +// 添加新请求的时间戳"for i = 1, requested do " +"    redis.call('zadd', key, now, now .. i) " +"end " +// 设置过期时间"redis.call('expire', key, period) " +// 返回剩余容量"return capacity - currentCount - requested";}/*** 尝试获取令牌* @param key 限流键* @param capacity 容量* @param period 时间窗口(秒)* @param requested 请求令牌数* @return 是否获取成功*/public boolean tryAcquire(String key, int capacity, int period, int requested) {long now = System.currentTimeMillis() / 1000;RedisScript<Long> script = RedisScript.of(luaScript, Long.class);Long remainingTokens = redisTemplate.execute(script,Collections.singletonList(key),String.valueOf(capacity),String.valueOf(period),String.valueOf(now),String.valueOf(requested));return remainingTokens != null && remainingTokens >= 0;}
}

使用切面实现API限流:

@Aspect
@Component
public class RateLimitAspect {private final RedisRateLimiter rateLimiter;@Autowiredpublic RateLimitAspect(RedisRateLimiter rateLimiter) {this.rateLimiter = rateLimiter;}@Around("@annotation(rateLimit)")public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {// 获取请求IPHttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();String ip = getClientIp(request);// 构建限流键String key = "rate_limit:" + rateLimit.key() + ":" + ip;// 尝试获取令牌boolean allowed = rateLimiter.tryAcquire(key, rateLimit.capacity(), rateLimit.period(), 1);if (allowed) {return joinPoint.proceed();} else {throw new TooManyRequestsException("请求过于频繁,请稍后再试");}}private String getClientIp(HttpServletRequest request) {String ip = request.getHeader("X-Forwarded-For");if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("Proxy-Client-IP");}if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getHeader("WL-Proxy-Client-IP");}if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {ip = request.getRemoteAddr();}// 取第一个IP地址if (ip != null && ip.contains(",")) {ip = ip.split(",")[0].trim();}return ip;}
}@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {String key();           // 限流键int capacity() default 10;  // 容量int period() default 60;    // 时间窗口(秒)
}

在控制器中应用限流注解:

@PostMapping("/room")
@RateLimit(key = "create_room", capacity = 5, period = 3600)  // 每小时限制创建5个直播间
public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {// 实现...
}@PostMapping("/room/{roomId}/view")
@RateLimit(key = "view_increment", capacity = 1, period = 5)  // 每5秒限制一次
public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {// 实现...
}

6.4 监控与告警

整合Prometheus和Grafana实现监控:

@Configuration
public class MonitoringConfig {@BeanMeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {return registry -> registry.config().commonTags("application", "live-streaming-service");}
}

添加自定义指标:

@Service
public class LiveStreamMonitoringService {private final MeterRegistry meterRegistry;private final Counter liveStartCounter;private final Counter liveEndCounter;private final Gauge activeStreamGauge;private final Timer streamProcessingTimer;@Autowiredpublic LiveStreamMonitoringService(MeterRegistry meterRegistry) {this.meterRegistry = meterRegistry;// 创建指标this.liveStartCounter = Counter.builder("live.stream.start").description("直播开始计数").register(meterRegistry);this.liveEndCounter = Counter.builder("live.stream.end").description("直播结束计数").register(meterRegistry);this.activeStreamGauge = Gauge.builder("live.stream.active", this::getActiveStreamCount).description("当前活跃直播数").register(meterRegistry);this.streamProcessingTimer = Timer.builder("live.stream.processing").description("直播处理时间").register(meterRegistry);}// 获取活跃直播数private long getActiveStreamCount() {// 从Redis获取活跃直播间数量return redisTemplate.opsForSet().size("live:active_rooms");}// 记录直播开始public void recordLiveStart() {liveStartCounter.increment();}// 记录直播结束public void recordLiveEnd() {liveEndCounter.increment();}// 记录处理时间public <T> T recordProcessingTime(Supplier<T> supplier) {return streamProcessingTimer.record(supplier);}// 记录错误public void recordError(String errorType) {meterRegistry.counter("live.stream.errors", "type", errorType).increment();}
}

七、安全性考虑

7.1 推流鉴权

前面已经实现了基于时间戳和签名的推流鉴权。为了增强安全性,可以添加IP白名单:

@Service
public class StreamSecurityService {@Value("${live.security.ip-whitelist-enabled}")private boolean ipWhitelistEnabled;@Value("#{'${live.security.ip-whitelist:}'.split(',')}")private List<String> ipWhitelist;/*** 验证IP是否在白名单中*/public boolean isIpAllowed(String ip) {if (!ipWhitelistEnabled) {return true;}if (ipWhitelist.isEmpty()) {return true;}return ipWhitelist.contains(ip) || ipWhitelist.contains("0.0.0.0");}/*** 生成带有过期时间的推流URL*/public String generateSecureStreamUrl(String baseUrl, String streamKey, long expireSeconds) {long expireTimestamp = System.currentTimeMillis() / 1000 + expireSeconds;String authString = streamKey + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());return baseUrl + "?auth_key=" + authToken + "&expire=" + expireTimestamp;}
}

7.2 播放鉴权

同样可以为播放URL添加鉴权:

@Service
public class PlayAuthService {@Value("${live.play.auth-enabled}")private boolean authEnabled;@Value("${live.play.auth-key}")private String authKey;@Value("${live.play.auth-expire}")private long authExpire;/*** 生成带鉴权的播放URL*/public String generateAuthPlayUrl(String baseUrl, String streamKey, Long userId) {if (!authEnabled) {return baseUrl;}long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;String authString = streamKey + "-" + userId + "-" + expireTimestamp + "-" + authKey;String authToken = DigestUtils.md5DigestAsHex(authString.getBytes());return baseUrl + "?auth_key=" + authToken + "&user_id=" + userId + "&expire=" + expireTimestamp;}/*** 验证播放URL*/public boolean validatePlayUrl(String streamKey, String authToken, String userId, String expireStr) {if (!authEnabled) {return true;}try {long expireTimestamp = Long.parseLong(expireStr);long currentTime = System.currentTimeMillis() / 1000;// 检查是否过期if (currentTime > expireTimestamp) {return false;}// 验证tokenString authString = streamKey + "-" + userId + "-" + expireStr + "-" + authKey;String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes());return calculatedToken.equals(authToken);} catch (Exception e) {return false;}}
}

7.3 内容安全

对于用户生成的内容,需要进行审核和过滤:

@Service
public class ContentSecurityService {@Value("${live.content.sensitive-words-file}")private String sensitiveWordsFile;private Set<String> sensitiveWords = new HashSet<>();@PostConstructpublic void init() {// 加载敏感词库try {File file = new File(sensitiveWordsFile);if (file.exists()) {List<String> lines = Files.readAllLines(file.toPath());sensitiveWords.addAll(lines);}} catch (IOException e) {// 如果加载失败,使用默认敏感词sensitiveWords.add("敏感词1");sensitiveWords.add("敏感词2");}}/*** 过滤敏感词*/public String filterContent(String content) {if (content == null || content.isEmpty()) {return content;}String filteredContent = content;for (String word : sensitiveWords) {filteredContent = filteredContent.replaceAll(word, "***");}return filteredContent;}/*** 检查内容是否包含敏感词*/public boolean containsSensitiveWords(String content) {if (content == null || content.isEmpty()) {return false;}for (String word : sensitiveWords) {if (content.contains(word)) {return true;}}return false;}
}

在WebSocket消息处理中使用:

@MessageMapping("/chat/{roomId}")
public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {// 过滤敏感内容String filteredContent = contentSecurityService.filterContent(message.getContent());message.setContent(filteredContent);// 检查是否全是敏感词if (filteredContent.matches("\*+")) {// 记录违规日志log.warn("用户发送违规内容: userId={}, content={}", message.getUserId(), message.getContent());return;}// 正常发送消息messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);
}

八、演示说明

为了帮助大家快速理解和测试直播系统,下面提供完整的演示步骤:

8.1 环境准备

首先,需要准备以下环境:

  1. Java 21+ :运行SpringBoot应用
  2. MySQL 5.7+ :存储直播信息
  3. Redis:缓存和消息通信
  4. SRS:流媒体服务器
  5. FFmpeg:视频处理工具(录制直播回放)
  6. OBS Studio:用于测试推流

8.2 安装SRS

使用Docker可以快速部署SRS服务器:

# 拉取SRS镜像
docker pull ossrs/srs:4# 运行SRS容器
docker run --name srs -d --restart=always \-p 1935:1935 \-p 1985:1985 \-p 8080:8080 \-v $(pwd)/conf:/usr/local/srs/conf \ossrs/srs:4 objs/srs -c conf/srs.conf

将前面提供的SRS配置文件保存为conf/srs.conf

8.3 直播功能演示流程

8.3.1 创建直播间

通过API创建直播间

curl -X POST http://localhost:8080/api/live/room \-H "Content-Type: application/json" \-d '{"title": "测试直播间","userId": 1,"coverUrl": "https://example.com/cover.jpg"}'

响应示例

{"id": 1,"title": "测试直播间","userId": 1,"coverUrl": "https://example.com/cover.jpg","streamKey": "8f7d6e5c4b3a2f1e0d9c8b7a","streamUrl": "rtmp://localhost:1935/live/8f7d6e5c4b3a2f1e0d9c8b7a","hlsUrl": "http://localhost:8080/hls/8f7d6e5c4b3a2f1e0d9c8b7a.m3u8","flvUrl": "http://localhost:8080/live/8f7d6e5c4b3a2f1e0d9c8b7a.flv","status": 0,"viewCount": 0,"likeCount": 0
}

记录下streamKeystreamUrl,这将用于推流设置。

8.3.2 推流测试
  1. 打开OBS Studio,设置推流参数:

    • 设置 → 流
    • 服务:自定义
    • 服务器:rtmp://192.168.195.100:1935/live/2feabf98b5ee6bb0c3dbc6cd423084a3?auth_key=9875b710e26914eff9f9aa1cc1df0093&expire=1749220798
    • 流密钥:刚才获取的streamKey(例如:8f7d6e5c4b3a2f1e0d9c8b7a)
  2. 添加视频源(例如摄像头或屏幕捕获)

  3. 点击"开始推流"按钮开始直播

系统会自动检测到推流开始,并通过SRS回调更新直播间状态为"直播中"。

8.3.3 播放测试
  1. 打开前端页面:http://localhost:8080/play.html?roomId=1
  2. 页面将自动加载直播流并开始播放
  3. 测试播放:
    • HLS播放:http://192.168.195.100:8080/hls/live/2feabf98b5ee6bb0c3dbc6cd423084a3/2feabf98b5ee6bb0c3dbc6cd423084a3.m3u8
8.3.4 直播互动测试
  1. 多次刷新页面,查看观看数是否增加
8.3.5 直播回放测试

开始录制直播

curl -X POST http://localhost:8080/api/live/room/1/record/start

等待一段时间后,停止录制

# 使用返回的recordingId
curl -X POST http://localhost:8080/api/live/record/1/stop

查看回放列表

curl http://localhost:8080/api/live/room/1/recordings

使用返回的fileUrl播放回放视频

8.3.6 结束直播
  1. 在OBS Studio中点击"停止推流"按钮
  2. 系统会自动检测到推流结束,并更新直播间状态为"直播结束"
  3. 也可以通过API手动结束直播
curl -X POST http://localhost:8080/api/live/room/1/end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/84299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xcode 各版本真机调试包下载

下载地址 https://github.com/filsv/iOSDeviceSupport 使用方法&#xff1a; 添加到下面路径中&#xff0c;然后退出重启xcode /Applications/Xcode.app/Contents/Developer/Platforms/iPhoneOS.platform/DeviceSupport

DL00871-基于深度学习YOLOv11的盲人障碍物目标检测含完整数据集

基于深度学习YOLOv11的盲人障碍物目标检测&#xff1a;开启盲人出行新纪元 在全球范围内&#xff0c;盲人及视觉障碍者的出行问题一直是社会关注的重点。尽管技术不断进步&#xff0c;许多城市的无障碍设施依然未能满足盲人出行的实际需求。尤其是在复杂的城市环境中&#xff…

Python 训练 day46

知识点回顾&#xff1a; 不同CNN层的特征图&#xff1a;不同通道的特征图什么是注意力&#xff1a;注意力家族&#xff0c;类似于动物园&#xff0c;都是不同的模块&#xff0c;好不好试了才知道。通道注意力&#xff1a;模型的定义和插入的位置通道注意力后的特征图和热力图 作…

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…

Qwen系列之Qwen3解读:最强开源模型的细节拆解

文章目录 1.1分钟快览2.模型架构2.1.Dense模型2.2.MoE模型 3.预训练阶段3.1.数据3.2.训练3.3.评估 4.后训练阶段S1: 长链思维冷启动S2: 推理强化学习S3: 思考模式融合S4: 通用强化学习 5.全家桶中的小模型训练评估评估数据集评估细节评估效果弱智评估和民间Arena 分析展望 如果…

yolo模型精度提升策略

总结与行动建议 立即行动&#xff1a; 显著增加6种相似BGA的高质量、多样化训练数据&#xff08;2倍以上是合理起点&#xff09;。 实施针对性数据增强&#xff1a; 设计模拟BGA实际成像挑战&#xff08;反光、模糊、视角变化&#xff09;的增强方案。 升级模型与损失函数&am…

Kafka主题运维全指南:从基础配置到故障处理

#作者&#xff1a;张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1&#xff1a;主题删除失败。常见错误2&#xff1a;__consumer_offsets占用太多的磁盘。 主题日常管理 …

使用Docker部署MySQLRedis容器与常见命令

目录 1. 检查WSL配置2. 设置WSL版本3. 拉取MySQL镜像4. 验证镜像5. 运行MySQL容器在WSL环境中使用以下命令启动MySQL容器查看容器/镜像的完整信息显式指定宿主机挂载路径可选&#xff1a;在Windows的cmd中使用以下命令启动MySQL容器 6. 管理容器启动已创建的容器查看运行中的容…

01__C++入门

一、C的语法框架 首先学习一门语言&#xff0c;我们需要了解语言的基本框架&#xff0c;这一小节&#xff0c;我们学习C的历史应用&#xff0c;c和c的区别和c的标准 二、认识C 1、C的历史 所有的主流C编译器都支持这个版本的C&#xff08;1998年的版本&#xff09;。 2、C的应…

2024 CKA题库+详尽解析| 15、备份还原Etcd

目录 免费获取题库配套 CKA_v1.31_模拟系统 15、 备份还原Etcd 题目&#xff1a; 开始操作: 1&#xff09;、切换集群 2&#xff09;、登录master并提权 3&#xff09;、备份Etcd现有数据 4&#xff09;、验证备份数据快照 5&#xff09;、查看节点和Pod状态 6&am…

Flotherm许可的并发用户数限制

在电子产品热设计领域&#xff0c;Flotherm软件以其卓越的性能和精确的仿真能力而受到广大用户的青睐。然而&#xff0c;在使用Flotherm软件时&#xff0c;了解其许可的并发用户数限制对于优化资源配置和提升工作效率至关重要。本文将详细介绍Flotherm软件许可的并发用户数限制…

读取宝塔方法,查找容别名存放位置

可以查到对应方法 根据参数名可知 查找到 得到位置

【1】跨越技术栈鸿沟:字节跳动开源TRAE AI编程IDE的实战体验

2024年初&#xff0c;人工智能编程工具领域发生了一次静默的变革。当字节跳动宣布退出其TRAE项目&#xff08;一款融合大型语言模型能力的云端AI编程IDE&#xff09;时&#xff0c;技术社区曾短暂叹息。然而这一退场并非终点——通过开源社区的接力&#xff0c;TRAE在WayToAGI等…

git连接本地仓库以及gitee

参考:gitee创建新仓库并上传代码_gitee新建仓库导入代码-CSDN博客 git初始化以及添加git分支 在idea查看master主分支 报错 原因gitee推送更新失败问题记录&#xff1a;remote: error: hook declined to update refs/heads/master-CSDN博客 取消邮箱暴露

pocketflow库实现guardrail

目录 代码代码解释1. 系统架构2. 核心组件详解2.1 LLM调用函数2.2 UserInputNode&#xff08;用户输入节点&#xff09;2.3 GuardrailNode&#xff08;安全防护节点&#xff09;2.4 LLMNode&#xff08;LLM处理节点&#xff09; 3. 流程控制机制 示例运行 代码 from pocketflo…

Fetch API 使用详解:Bearer Token 与 localStorage 实践

Fetch API&#xff1a;现代浏览器内置的用于发送 HTTP 请求的 API&#xff0c;Bearer Token&#xff1a;一种基于令牌的身份验证方案&#xff0c;常用于 JWT 认证&#xff0c;localStorage&#xff1a;浏览器提供的持久化存储方案&#xff0c;用于在客户端存储数据。 token是我…

Netty自定义协议解析

目录 自定义协议设计 实现消息解码器 实现消息编码器 自定义消息对象 配置ChannelPipeline Netty提供了强大的编解码器抽象基类,这些基类能够帮助开发者快速实现自定义协议的解析。 自定义协议设计 在实现自定义协议解析之前,需要明确协议的具体格式。例如,一个简单的…

驭码 CodeRider 2.0 产品体验:智能研发的革新之旅

驭码 CodeRider 2.0 产品体验&#xff1a;智能研发的革新之旅 在当今快速发展的软件开发领域&#xff0c;研发效率与质量始终是开发者和企业关注的核心。面对开发协作流程繁琐、代码生成补全不准、代码审核低效、知识协同困难以及部署成本与灵活性难以平衡等问题&#xff0c;…

NLP学习路线图(二十六):自注意力机制

一、为何需要你?序列建模的困境 在你出现之前,循环神经网络(RNN)及其变种LSTM、GRU是处理序列数据(如文本、语音、时间序列)的主流工具。它们按顺序逐个处理输入元素,将历史信息压缩在一个隐藏状态向量中传递。 瓶颈显现: 长程依赖遗忘: 随着序列增长,早期信息在传递…

【渲染】Unity-分析URP的延迟渲染-DeferredShading

我是一名资深游戏开发&#xff0c;小时候喜欢看十万个为什么 介绍 本文旨在搞清楚延迟渲染在unity下如何实现的&#xff0c;为自己写延迟渲染打一个基础&#xff0c;打开从知到行的大门延迟渲染 输出物体表面信息(rt1, rt2, rt3, …) 着色(rt1, rt2, rt3, …)研究完感觉核心…