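Live streaming has become an indispensable part of many kinds of applications. From social media and online education to e-commerce and gaming, live streaming features are now in wide use.
This article shows how to build a live stream push/pull system with the SpringBoot framework.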
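1. Live Streaming Fundamentals
1.1 Push and Pull Concepts
The two core stages of a live streaming system are push and pull:
- Push: the streamer sends the captured audio and video data to the streaming media server over a specific protocol
- Pull: the viewer fetches the audio and video data from the streaming media server and plays it
1.2 Common Live Streaming Protocols
The main live streaming protocols in use today are: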
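| Protocol | Advantages | Disadvantages | Typical use cases |
|---|---|---|---|
| RTMP | Low latency (1-3 s), mature and stable | Originally tied to Flash, needs an extra port (1935) | Streamer push (ingest), low-latency live streaming |
| HLS | Good compatibility, uses HTTP | High latency | Large-scale distribution, mobile clients |
| WebRTC | Ultra-low latency (<1 s), P2P communication | Traversing complex networks is difficult, complex deployment | Real-time interaction, small video conferences |
| HTTP-FLV | Low latency, fairly good compatibility | No variable (adaptive) bitrate support | Large audiences, latency-sensitive scenarios |
| SRT | Low latency, high reliability | Relatively young ecosystem | Professional broadcasting, cross-border streaming |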
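1.3 Live Streaming System Architecture Overview
A complete live streaming system usually contains the following components:
- Capture client: captures and encodes audio and video data
- Streaming media server: forwards, transcodes and distributes the audio and video streams
- CDN: provides content distribution so that large numbers of users can be served
- Player: decodes and plays the audio and video content
- Signaling service: manages live room information, user state and similar data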
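2. System Design
2.1 Overall Architecture
The core components are:
- API service layer: a RESTful API built on SpringBoot that handles live room management, user authentication and so on
- Media server integration layer: integrates an open-source streaming media server (such as SRS)
- Stream forwarding layer: forwards, transcodes and adapts media streams
- Storage layer: stores recordings for live replay and video on demand
- WebSocket service: provides interactive features for the live room
2.2 Technology Choices
- Backend framework: SpringBoot 3
- Streaming media server: SRS (Simple Realtime Server, originally named Simple RTMP Server)
- Database: MySQL + Redis
- WebSocket: Spring WebSocket
- Storage system: MinIO (object storage)
- Frontend players: Video.js, flv.js, hls.js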
3. Implementing the Live Streaming Service with SpringBoot
3.1 Project Dependencies
First configure the pom.xml file and add the required dependencies:
<dependencies>
    <!-- Spring Boot core dependency -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- WebSocket support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- Redis support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- MySQL support -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.27</version>
    </dependency>
    <!-- MyBatis-Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
        <version>3.5.5</version>
    </dependency>
    <!-- MinIO client -->
    <dependency>
        <groupId>io.minio</groupId>
        <artifactId>minio</artifactId>
        <version>8.4.6</version>
    </dependency>
    <!-- HTTP client -->
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.14</version>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
3.2 Application Configuration
Configure the application parameters in application.yml:
server:
  port: 8080

spring:
  application:
    name: live-streaming-service
  # Database configuration
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/live_streaming?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai
    username: root
    password: root
  # Redis configuration (SpringBoot 3 reads these keys under spring.data.redis)
  data:
    redis:
      host: localhost
      port: 6379
      database: 0

# MyBatis-Plus configuration
mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.example.livestream.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

# Streaming media server configuration
live:
  srs:
    server-url: rtmp://192.168.195.100:1935/live
    api-url: http://192.168.195.100:1985/api
    http-flv-url: http://192.168.195.100:8080/live
    hls-url: http://192.168.195.100:8080/live/hls
  record:
    save-path: /data/record
  push:
    key-check-enabled: true
    auth-expire: 86400 # 24 hours, in seconds
    auth-key: your_secret_key

# MinIO configuration
minio:
  endpoint: http://localhost:9000
  access-key: minioadmin
  secret-key: minioadmin
  bucket: live-recordings
3.3 Entity Classes
First define the live room entity:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("live_room")
public class LiveRoom {
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;
    private String title;
    private String coverUrl;
    private Long userId;
    private String streamKey;        // push (stream) key
    private String streamUrl;        // push URL
    private String hlsUrl;           // HLS playback URL
    private String flvUrl;           // HTTP-FLV playback URL
    private Integer status;          // 0: not started, 1: live, 2: ended
    private Long viewCount;          // number of viewers
    private Long likeCount;          // number of likes
    private LocalDateTime startTime; // time the stream started
    private LocalDateTime endTime;   // time the stream ended
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}
The live stream entity (it uses the same imports as LiveRoom):
@Data
@TableName("live_stream")
public class LiveStream {
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;
    private Long roomId;
    private String streamId;   // stream ID
    private String protocol;   // protocol type: rtmp/hls/flv
    private Integer bitrate;   // bitrate
    private String resolution; // resolution
    private Integer status;    // 0: not started, 1: active, 2: ended
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}
The live recording (replay) entity:
@Data
@TableName("live_recording")
public class LiveRecording {
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;
    private Long roomId;
    private String fileName;
    private String fileUrl;
    private Long fileSize;
    private Integer duration; // duration in seconds
    private LocalDateTime startTime;
    private LocalDateTime endTime;
    private Integer status;   // 0: recording, 1: recorded, 2: processing, 3: available, 4: deleted
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}
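The integer status codes above are easy to mix up once they are scattered across services and controllers. As a minimal sketch, the live room codes could be wrapped in an enum. The name `RoomStatus` and its helper methods are assumptions made for illustration and are not part of the original project; the `canEnd` rule simply mirrors the check that `endLiveStream` performs later in the article.

```java
import java.util.Arrays;

/** Hypothetical enum wrapping the live_room.status codes (0, 1, 2). */
enum RoomStatus {
    NOT_STARTED(0),
    LIVE(1),
    ENDED(2);

    private final int code;

    RoomStatus(int code) {
        this.code = code;
    }

    /** The integer stored in the status column. */
    int code() {
        return code;
    }

    /** Maps a stored integer back to the enum and rejects unknown codes. */
    static RoomStatus fromCode(int code) {
        return Arrays.stream(values())
                .filter(s -> s.code == code)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown room status: " + code));
    }

    /** Only a room that is currently live can be ended. */
    boolean canEnd() {
        return this == LIVE;
    }
}
```

With such an enum, a check like `liveRoom.getStatus() != 1` could be written as `!RoomStatus.fromCode(liveRoom.getStatus()).canEnd()`, which keeps the meaning of each code in one place.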
3.4 Database Tables
Create the database tables that correspond to the entity classes:
-- Live room table
CREATE TABLE `live_room` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `title` varchar(255) NOT NULL COMMENT 'Live stream title',
  `cover_url` varchar(255) DEFAULT NULL COMMENT 'Cover image URL',
  `user_id` bigint NOT NULL COMMENT 'Streamer user ID',
  `stream_key` varchar(64) NOT NULL COMMENT 'Push key',
  `stream_url` varchar(255) DEFAULT NULL COMMENT 'Push URL',
  `hls_url` varchar(255) DEFAULT NULL COMMENT 'HLS playback URL',
  `flv_url` varchar(255) DEFAULT NULL COMMENT 'HTTP-FLV playback URL',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT 'Status: 0 not started, 1 live, 2 ended',
  `view_count` bigint NOT NULL DEFAULT '0' COMMENT 'Number of viewers',
  `like_count` bigint NOT NULL DEFAULT '0' COMMENT 'Number of likes',
  `start_time` datetime DEFAULT NULL COMMENT 'Start time',
  `end_time` datetime DEFAULT NULL COMMENT 'End time',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_stream_key` (`stream_key`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Live room table';

-- Live stream table
CREATE TABLE `live_stream` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `room_id` bigint NOT NULL COMMENT 'Live room ID',
  `stream_id` varchar(64) NOT NULL COMMENT 'Stream ID',
  `protocol` varchar(20) NOT NULL COMMENT 'Protocol type',
  `bitrate` int DEFAULT NULL COMMENT 'Bitrate',
  `resolution` varchar(20) DEFAULT NULL COMMENT 'Resolution',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT 'Status: 0 not started, 1 active, 2 ended',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_stream_id` (`stream_id`),
  KEY `idx_room_id` (`room_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Live stream table';

-- Live recording table
CREATE TABLE `live_recording` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `room_id` bigint NOT NULL COMMENT 'Live room ID',
  `file_name` varchar(255) NOT NULL COMMENT 'File name',
  `file_url` varchar(255) COMMENT 'File URL',
  `file_size` bigint DEFAULT NULL COMMENT 'File size (bytes)',
  `duration` int DEFAULT NULL COMMENT 'Duration (seconds)',
  `start_time` datetime NOT NULL COMMENT 'Start time',
  `end_time` datetime DEFAULT NULL COMMENT 'End time',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT 'Status: 0 recording, 1 recorded, 2 processing, 3 available, 4 deleted',
  `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_room_id` (`room_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Live recording table';
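3.5 Mapper Interfaces
Create the Mapper interfaces with MyBatis-Plus:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;

@Mapper
public interface LiveRoomMapper extends BaseMapper<LiveRoom> {
    // Custom query: rooms that are currently live, ordered by viewer count
    @Select("SELECT * FROM live_room WHERE status = 1 ORDER BY view_count DESC LIMIT #{limit}")
    List<LiveRoom> findHotLiveRooms(@Param("limit") int limit);
}

@Mapper
public interface LiveStreamMapper extends BaseMapper<LiveStream> {
    // Basic CRUD methods are provided by BaseMapper
}

@Mapper
public interface LiveRecordingMapper extends BaseMapper<LiveRecording> {
    // Basic CRUD methods are provided by BaseMapper
}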
3.6 Service Layer
3.6.1 Live Stream Service
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.DigestUtils;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
// Imports of the project's own entity and mapper classes are omitted.

@Service
@Slf4j
public class LiveStreamService {

    @Autowired
    private LiveRoomMapper liveRoomMapper;

    @Autowired
    private LiveStreamMapper liveStreamMapper;

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Value("${live.srs.server-url}")
    private String srsServerUrl;

    @Value("${live.srs.api-url}")
    private String srsApiUrl;

    @Value("${live.srs.http-flv-url}")
    private String httpFlvUrl;

    @Value("${live.srs.hls-url}")
    private String hlsUrl;

    @Value("${live.push.key-check-enabled}")
    private boolean keyCheckEnabled;

    @Value("${live.push.auth-expire}")
    private long authExpire;

    @Value("${live.push.auth-key}")
    private String authKey;

    private RestTemplate restTemplate = new RestTemplate();

    /** Create a live room. */
    @Transactional
    public LiveRoom createLiveRoom(LiveRoom liveRoom) {
        // Generate the push key
        String streamKey = generateStreamKey(liveRoom.getUserId());
        liveRoom.setStreamKey(streamKey);

        // Build the push URL
        String pushUrl = buildPushUrl(streamKey);
        liveRoom.setStreamUrl(pushUrl);

        // Build the playback URLs
        liveRoom.setHlsUrl(hlsUrl + "/" + streamKey + ".m3u8");
        liveRoom.setFlvUrl(httpFlvUrl + "/" + streamKey + ".flv");

        // Set the initial state
        liveRoom.setStatus(0);
        liveRoom.setViewCount(0L);
        liveRoom.setLikeCount(0L);
        liveRoom.setCreatedAt(LocalDateTime.now());
        liveRoom.setUpdatedAt(LocalDateTime.now());

        // Save to the database
        liveRoomMapper.insert(liveRoom);
        return liveRoom;
    }

    /** Generate the push key. */
    private String generateStreamKey(Long userId) {
        // Derive a unique key from the user ID and the current timestamp
        String baseKey = userId + "_" + System.currentTimeMillis();
        return DigestUtils.md5DigestAsHex(baseKey.getBytes(StandardCharsets.UTF_8));
    }

    /** Build the push URL. */
    private String buildPushUrl(String streamKey) {
        StringBuilder sb = new StringBuilder(srsServerUrl);
        sb.append("/").append(streamKey);

        // Append the auth token if push authentication is enabled
        if (keyCheckEnabled) {
            long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;
            String authString = streamKey + "-" + expireTimestamp + "-" + authKey;
            String authToken = DigestUtils.md5DigestAsHex(authString.getBytes(StandardCharsets.UTF_8));
            sb.append("?auth_key=").append(authToken)
              .append("&expire=").append(expireTimestamp);
        }
        return sb.toString();
    }

    /** Start a live stream. */
    @Transactional
    public LiveRoom startLiveStream(Long roomId) {
        LiveRoom liveRoom = liveRoomMapper.selectById(roomId);
        if (liveRoom == null) {
            throw new IllegalArgumentException("Live room does not exist");
        }

        // Mark the room as live
        liveRoom.setStatus(1);
        liveRoom.setStartTime(LocalDateTime.now());
        liveRoomMapper.updateById(liveRoom);

        // Create the stream record. stream_id has a unique index, so starting
        // the same room a second time fails on this insert.
        LiveStream liveStream = new LiveStream();
        liveStream.setRoomId(roomId);
        liveStream.setStreamId(liveRoom.getStreamKey());
        liveStream.setProtocol("rtmp");
        liveStream.setStatus(1);
        liveStream.setCreatedAt(LocalDateTime.now());
        liveStream.setUpdatedAt(LocalDateTime.now());
        liveStreamMapper.insert(liveStream);

        // Add the room to the set of active rooms cached in Redis
        redisTemplate.opsForSet().add("live:active_rooms", String.valueOf(roomId));
        return liveRoom;
    }

    /** End a live stream. */
    @Transactional
    public LiveRoom endLiveStream(Long roomId) {
        LiveRoom liveRoom = liveRoomMapper.selectById(roomId);
        if (liveRoom == null || liveRoom.getStatus() != 1) {
            throw new IllegalArgumentException("Live room does not exist or is not live");
        }

        // Mark the room as ended
        liveRoom.setStatus(2);
        liveRoom.setEndTime(LocalDateTime.now());
        liveRoomMapper.updateById(liveRoom);

        // Update the stream status
        QueryWrapper<LiveStream> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("room_id", roomId).eq("status", 1);
        LiveStream liveStream = liveStreamMapper.selectOne(queryWrapper);
        if (liveStream != null) {
            liveStream.setStatus(2);
            liveStream.setUpdatedAt(LocalDateTime.now());
            liveStreamMapper.updateById(liveStream);
        }

        // Remove the room from the active set in Redis
        redisTemplate.opsForSet().remove("live:active_rooms", String.valueOf(roomId));
        return liveRoom;
    }

    /** Get the list of currently active live rooms. */
    public List<LiveRoom> getActiveLiveRooms(int page, int size) {
        Page<LiveRoom> pageParam = new Page<>(page, size);
        QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("status", 1).orderByDesc("view_count");
        return liveRoomMapper.selectPage(pageParam, queryWrapper).getRecords();
    }

    /** Get the hot live rooms. */
    public List<LiveRoom> getHotLiveRooms(int limit) {
        return liveRoomMapper.findHotLiveRooms(limit);
    }

    /** Increment the viewer count of a live room. */
    public void incrementViewCount(Long roomId) {
        // Count in Redis
        String key = "live:room:" + roomId + ":view_count";
        redisTemplate.opsForValue().increment(key);

        // Sync to the database with 10% probability to reduce database load
        if (Math.random() < 0.1) {
            String countStr = redisTemplate.opsForValue().get(key);
            if (countStr != null) {
                long count = Long.parseLong(countStr);
                LiveRoom room = new LiveRoom();
                room.setId(roomId);
                room.setViewCount(count);
                liveRoomMapper.updateById(room);
            }
        }
    }

    /** Validate the push key. */
    public boolean validateStreamKey(String streamKey, String token, String expire) {
        if (!keyCheckEnabled) {
            return true;
        }
        try {
            long expireTimestamp = Long.parseLong(expire);
            long currentTime = System.currentTimeMillis() / 1000;

            // Reject expired tokens
            if (currentTime > expireTimestamp) {
                return false;
            }

            // Recompute the token and compare
            String authString = streamKey + "-" + expire + "-" + authKey;
            String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes(StandardCharsets.UTF_8));
            return calculatedToken.equals(token);
        } catch (Exception e) {
            log.error("Failed to validate push key", e);
            return false;
        }
    }

    /** Handle the SRS callback for stream publish. */
    public void handleStreamPublish(String app, String stream) {
        try {
            // Find the matching live room
            QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("stream_key", stream);
            LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);
            if (liveRoom != null && liveRoom.getStatus() == 0) {
                // Update the room status
                startLiveStream(liveRoom.getId());
                log.info("Stream published: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());
            }
        } catch (Exception e) {
            log.error("Failed to handle stream publish callback", e);
        }
    }

    /** Handle the SRS callback for stream close. */
    public void handleStreamClose(String app, String stream) {
        try {
            // Find the matching live room
            QueryWrapper<LiveRoom> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("stream_key", stream);
            LiveRoom liveRoom = liveRoomMapper.selectOne(queryWrapper);
            if (liveRoom != null && liveRoom.getStatus() == 1) {
                // Update the room status
                endLiveStream(liveRoom.getId());
                log.info("Stream closed: app={}, stream={}, roomId={}", app, stream, liveRoom.getId());
            }
        } catch (Exception e) {
            log.error("Failed to handle stream close callback", e);
        }
    }

    /** Get SRS server information. */
    public Map<String, Object> getSrsServerInfo() {
        try {
            String url = srsApiUrl + "/v1/summaries";
            ResponseEntity<Map> response = restTemplate.getForEntity(url, Map.class);
            return response.getBody();
        } catch (Exception e) {
            log.error("Failed to get SRS server information", e);
            return Collections.emptyMap();
        }
    }
}
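The push authentication above boils down to `md5(streamKey-expire-secret)` plus an expiry check. The following stand-alone sketch reproduces that scheme with only the JDK so it can be exercised outside Spring. The class name `StreamAuth` and its method signatures are assumptions made for illustration. Unlike the service, it compares tokens with `MessageDigest.isEqual`, a constant-time comparison that is a small hardening step and not something the original code does.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical stand-alone version of the push-token scheme used by LiveStreamService. */
final class StreamAuth {

    private StreamAuth() {
    }

    /** Returns md5(streamKey-expire-secret) as lowercase hex. */
    static String sign(String streamKey, long expireEpochSeconds, String secret) {
        String payload = streamKey + "-" + expireEpochSeconds + "-" + secret;
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(payload.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 must be available on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    /** Checks expiry first, then compares the tokens in constant time. */
    static boolean verify(String streamKey, String token, long expireEpochSeconds,
                          String secret, long nowEpochSeconds) {
        if (token == null || nowEpochSeconds > expireEpochSeconds) {
            return false;
        }
        byte[] expected = sign(streamKey, expireEpochSeconds, secret).getBytes(StandardCharsets.UTF_8);
        byte[] actual = token.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(expected, actual);
    }
}
```

Passing the current time as a parameter keeps the check deterministic in tests. For production use, a keyed construction such as HMAC-SHA256 would be a stronger choice than plain MD5 over a concatenated secret, but that would be a change to the scheme described in this article.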
3.6.2 Live Recording Service
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.minio.GetPresignedObjectUrlArgs;
import io.minio.MinioClient;
import io.minio.UploadObjectArgs;
import io.minio.http.Method;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
// Imports of the project's own entity and mapper classes are omitted.

@Service
@Slf4j
public class LiveRecordingService {

    @Autowired
    private LiveRoomMapper liveRoomMapper;

    @Autowired
    private LiveRecordingMapper recordingMapper;

    // A MinioClient bean has to be defined elsewhere in the application
    @Autowired
    private MinioClient minioClient;

    @Value("${minio.bucket}")
    private String minioBucket;

    @Value("${live.record.save-path}")
    private String recordSavePath;

    /** Start recording a live stream. */
    @Transactional
    public LiveRecording startRecording(Long roomId) {
        LiveRoom liveRoom = liveRoomMapper.selectById(roomId);
        if (liveRoom == null || liveRoom.getStatus() != 1) {
            throw new IllegalArgumentException("Live room does not exist or is not live");
        }

        // Create the recording record
        LiveRecording recording = new LiveRecording();
        recording.setRoomId(roomId);
        recording.setFileName(liveRoom.getStreamKey() + "_" + System.currentTimeMillis() + ".mp4");
        recording.setStatus(0); // recording
        recording.setStartTime(LocalDateTime.now());
        recording.setCreatedAt(LocalDateTime.now());
        recording.setUpdatedAt(LocalDateTime.now());
        recordingMapper.insert(recording);

        // Start the recording process asynchronously
        startRecordingProcess(liveRoom, recording);
        return recording;
    }

    /** Stop recording a live stream. */
    @Transactional
    public LiveRecording stopRecording(Long recordingId) {
        LiveRecording recording = recordingMapper.selectById(recordingId);
        if (recording == null || recording.getStatus() != 0) {
            throw new IllegalArgumentException("Recording task does not exist or has already ended");
        }

        // Update the recording status
        recording.setStatus(1); // recorded
        recording.setEndTime(LocalDateTime.now());
        recording.setUpdatedAt(LocalDateTime.now());
        recordingMapper.updateById(recording);

        // Stop the recording process asynchronously and upload the file
        stopRecordingProcess(recording);
        return recording;
    }

    /** Get the list of recordings for a live room. */
    public List<LiveRecording> getRecordings(Long roomId, int page, int size) {
        Page<LiveRecording> pageParam = new Page<>(page, size);
        QueryWrapper<LiveRecording> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("room_id", roomId)
                .eq("status", 3) // available
                .orderByDesc("start_time");
        return recordingMapper.selectPage(pageParam, queryWrapper).getRecords();
    }

    /** Start the recording process. */
    private void startRecordingProcess(LiveRoom liveRoom, LiveRecording recording) {
        // Run asynchronously on the common thread pool
        CompletableFuture.runAsync(() -> {
            try {
                File saveDir = new File(recordSavePath);
                if (!saveDir.exists()) {
                    saveDir.mkdirs();
                }
                String outputPath = recordSavePath + "/" + recording.getFileName();
                String inputUrl = liveRoom.getFlvUrl();

                // Record with FFmpeg
                ProcessBuilder pb = new ProcessBuilder(
                        "ffmpeg",
                        "-i", inputUrl,
                        "-c:v", "copy",
                        "-c:a", "aac",
                        "-strict", "-2",
                        outputPath);
                Process process = pb.start();

                // Key under which the process ID could be stored so it can be stopped later
                // (the key is not persisted anywhere in this example)
                String processKey = "live:recording:process:" + recording.getId();
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    process.destroy();
                }));
                log.info("Recording started, roomId={}, recordingId={}", liveRoom.getId(), recording.getId());

                // Wait for the process to finish
                int exitCode = process.waitFor();
                log.info("Recording process finished, roomId={}, recordingId={}, exitCode={}",
                        liveRoom.getId(), recording.getId(), exitCode);

                // On a normal exit, update the status and upload the file
                if (exitCode == 0) {
                    uploadRecording(recording, new File(outputPath));
                }
            } catch (Exception e) {
                log.error("Failed to record live stream", e);
                // Mark the recording as failed. Status 4 is used for failures here,
                // although the entity and table comments label 4 as "deleted".
                LiveRecording failedRecording = new LiveRecording();
                failedRecording.setId(recording.getId());
                failedRecording.setStatus(4);
                failedRecording.setUpdatedAt(LocalDateTime.now());
                recordingMapper.updateById(failedRecording);
            }
        });
    }

    /** Stop the recording process. */
    private void stopRecordingProcess(LiveRecording recording) {
        // Stopping the specific FFmpeg process can be implemented here.
        // A real implementation needs to store the process ID and stop the process
        // through an operating system command.
        log.info("Recording stopped manually, recordingId={}", recording.getId());
    }

    /** Upload the recorded file to MinIO. */
    private void uploadRecording(LiveRecording recording, File file) {
        try {
            // Set the status to processing
            LiveRecording processingRecording = new LiveRecording();
            processingRecording.setId(recording.getId());
            processingRecording.setStatus(2); // processing
            processingRecording.setUpdatedAt(LocalDateTime.now());
            recordingMapper.updateById(processingRecording);

            // Read the file metadata
            long fileSize = file.length();

            // Use ffprobe to get the video duration
            String[] cmd = {
                    "ffprobe",
                    "-v", "error",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    file.getAbsolutePath()
            };
            Process process = Runtime.getRuntime().exec(cmd);
            String durationStr;
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                durationStr = reader.readLine();
            }
            // ffprobe prints nothing if it cannot read the file; fall back to 0 seconds
            int duration = durationStr == null ? 0 : (int) Float.parseFloat(durationStr.trim());

            // Upload to MinIO
            String objectName = "recordings/" + recording.getFileName();
            minioClient.uploadObject(
                    UploadObjectArgs.builder()
                            .bucket(minioBucket)
                            .object(objectName)
                            .filename(file.getAbsolutePath())
                            .contentType("video/mp4")
                            .build());

            // Build the access URL. Presigned URLs expire, so the stored URL
            // is only valid for a limited time.
            String fileUrl = minioClient.getPresignedObjectUrl(
                    GetPresignedObjectUrlArgs.builder()
                            .bucket(minioBucket)
                            .object(objectName)
                            .method(Method.GET)
                            .build());

            // Update the recording record
            LiveRecording updatedRecording = new LiveRecording();
            updatedRecording.setId(recording.getId());
            updatedRecording.setFileUrl(fileUrl);
            updatedRecording.setFileSize(fileSize);
            updatedRecording.setDuration(duration);
            updatedRecording.setStatus(3); // available
            updatedRecording.setUpdatedAt(LocalDateTime.now());
            recordingMapper.updateById(updatedRecording);
            log.info("Recording uploaded, recordingId={}, fileSize={}, duration={}s",
                    recording.getId(), fileSize, duration);

            // Delete the local file
            file.delete();
        } catch (Exception e) {
            log.error("Failed to upload recording", e);
            // Mark the recording as failed (status 4, see the note in startRecordingProcess)
            LiveRecording failedRecording = new LiveRecording();
            failedRecording.setId(recording.getId());
            failedRecording.setStatus(4);
            failedRecording.setUpdatedAt(LocalDateTime.now());
            recordingMapper.updateById(failedRecording);
        }
    }
}
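Two pieces of the recording flow are easy to isolate and test without FFmpeg installed: building the argument list and interpreting the single line that ffprobe prints. The sketch below does exactly that. `RecordingCommands` and its method names are hypothetical helpers, not part of the original service, and returning -1 for an unreadable duration is an assumption chosen for illustration.

```java
import java.util.List;

/** Hypothetical helpers extracted from the recording flow; the names are illustrative. */
final class RecordingCommands {

    private RecordingCommands() {
    }

    /** Builds the same FFmpeg arguments that the service passes to ProcessBuilder. */
    static List<String> recordCommand(String inputUrl, String outputPath) {
        return List.of("ffmpeg", "-i", inputUrl,
                "-c:v", "copy", "-c:a", "aac", "-strict", "-2", outputPath);
    }

    /** Parses the line printed by ffprobe; returns -1 when it is missing or not a number. */
    static int parseDurationSeconds(String ffprobeLine) {
        if (ffprobeLine == null || ffprobeLine.isBlank()) {
            return -1;
        }
        try {
            // Truncates the fractional part, as the cast in the service does
            return (int) Double.parseDouble(ffprobeLine.trim());
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```

The list returned by `recordCommand` can be handed directly to `new ProcessBuilder(...)`, which accepts a `List<String>`.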
3.7 Controllers
3.7.1 Live Controller
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
@RequestMapping("/api/live")
@Slf4j
public class LiveController {

    @Autowired
    private LiveStreamService liveStreamService;

    @Autowired
    private LiveRecordingService recordingService;

    @Autowired
    private LiveRoomMapper liveRoomMapper;

    /** Create a live room. */
    @PostMapping("/room")
    public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {
        try {
            LiveRoom createdRoom = liveStreamService.createLiveRoom(liveRoom);
            return ResponseEntity.ok(createdRoom);
        } catch (Exception e) {
            log.error("Failed to create live room", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Get live room details. */
    @GetMapping("/room/{roomId}")
    public ResponseEntity<LiveRoom> getLiveRoom(@PathVariable Long roomId) {
        try {
            LiveRoom liveRoom = liveRoomMapper.selectById(roomId);
            if (liveRoom == null) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.ok(liveRoom);
        } catch (Exception e) {
            log.error("Failed to get live room details", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Start a live stream. */
    @PostMapping("/room/{roomId}/start")
    public ResponseEntity<LiveRoom> startLiveStream(@PathVariable Long roomId) {
        try {
            LiveRoom liveRoom = liveStreamService.startLiveStream(roomId);
            return ResponseEntity.ok(liveRoom);
        } catch (IllegalArgumentException e) {
            return ResponseEntity.badRequest().build();
        } catch (Exception e) {
            log.error("Failed to start live stream", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** End a live stream. */
    @PostMapping("/room/{roomId}/end")
    public ResponseEntity<LiveRoom> endLiveStream(@PathVariable Long roomId) {
        try {
            LiveRoom liveRoom = liveStreamService.endLiveStream(roomId);
            return ResponseEntity.ok(liveRoom);
        } catch (IllegalArgumentException e) {
            return ResponseEntity.badRequest().build();
        } catch (Exception e) {
            log.error("Failed to end live stream", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Get the list of active live rooms. */
    @GetMapping("/rooms/active")
    public ResponseEntity<List<LiveRoom>> getActiveLiveRooms(
            @RequestParam(defaultValue = "1") int page,
            @RequestParam(defaultValue = "10") int size) {
        try {
            List<LiveRoom> rooms = liveStreamService.getActiveLiveRooms(page, size);
            return ResponseEntity.ok(rooms);
        } catch (Exception e) {
            log.error("Failed to get active live rooms", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Get the hot live rooms. */
    @GetMapping("/rooms/hot")
    public ResponseEntity<List<LiveRoom>> getHotLiveRooms(
            @RequestParam(defaultValue = "10") int limit) {
        try {
            List<LiveRoom> rooms = liveStreamService.getHotLiveRooms(limit);
            return ResponseEntity.ok(rooms);
        } catch (Exception e) {
            log.error("Failed to get hot live rooms", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Increment the viewer count. */
    @PostMapping("/room/{roomId}/view")
    public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {
        try {
            liveStreamService.incrementViewCount(roomId);
            return ResponseEntity.ok().build();
        } catch (Exception e) {
            log.error("Failed to increment viewer count", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Start recording a live stream. */
    @PostMapping("/room/{roomId}/record/start")
    public ResponseEntity<LiveRecording> startRecording(@PathVariable Long roomId) {
        try {
            LiveRecording recording = recordingService.startRecording(roomId);
            return ResponseEntity.ok(recording);
        } catch (IllegalArgumentException e) {
            return ResponseEntity.badRequest().build();
        } catch (Exception e) {
            log.error("Failed to start recording", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Stop recording a live stream. */
    @PostMapping("/record/{recordingId}/stop")
    public ResponseEntity<LiveRecording> stopRecording(@PathVariable Long recordingId) {
        try {
            LiveRecording recording = recordingService.stopRecording(recordingId);
            return ResponseEntity.ok(recording);
        } catch (IllegalArgumentException e) {
            return ResponseEntity.badRequest().build();
        } catch (Exception e) {
            log.error("Failed to stop recording", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /** Get the list of recordings for a live room. */
    @GetMapping("/room/{roomId}/recordings")
    public ResponseEntity<List<LiveRecording>> getRecordings(
            @PathVariable Long roomId,
            @RequestParam(defaultValue = "1") int page,
            @RequestParam(defaultValue = "10") int size) {
        try {
            List<LiveRecording> recordings = recordingService.getRecordings(roomId, page, size);
            return ResponseEntity.ok(recordings);
        } catch (Exception e) {
            log.error("Failed to get recordings", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }
}
3.7.2 SRS Callback Controller
import cn.hutool.http.HttpUtil; // Hutool; needs the hutool dependency in addition to those in pom.xml
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api/srs/callback")
@Slf4j
public class SrsCallbackController {

    @Autowired
    private LiveStreamService liveStreamService;

    /**
     * Handle the SRS on_publish callback.
     * SRS calls this endpoint when a push starts.
     */
    @PostMapping("/on_publish")
    public ResponseEntity<Map<String, Object>> onPublish(@RequestBody SrsCallbackDto callbackDto) {
        log.info("SRS on_publish callback: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());
        Map<String, Object> result = new HashMap<>();

        // Extract the token and expiry from the query string of the push URL
        String param = callbackDto.getParam();
        Map<String, String> paramMap = HttpUtil.decodeParamMap(param, StandardCharsets.UTF_8);
        String token = paramMap.get("auth_key");
        String expire = paramMap.get("expire");
        callbackDto.setToken(token);
        callbackDto.setExpire(expire);

        // Validate the push key
        boolean valid = liveStreamService.validateStreamKey(
                callbackDto.getStream(), callbackDto.getToken(), callbackDto.getExpire());
        if (!valid) {
            result.put("code", 403);
            result.put("message", "Forbidden");
            return ResponseEntity.status(HttpStatus.FORBIDDEN).body(result);
        }

        // Handle the stream publish event
        liveStreamService.handleStreamPublish(callbackDto.getApp(), callbackDto.getStream());
        result.put("code", 0);
        result.put("message", "Success");
        return ResponseEntity.ok(result);
    }

    /**
     * Handle the SRS on_unpublish callback.
     * SRS calls this endpoint when a push ends.
     */
    @PostMapping("/on_unpublish")
    public ResponseEntity<Map<String, Object>> onUnpublish(@RequestBody SrsCallbackDto callbackDto) {
        log.info("SRS on_unpublish callback: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());

        // Handle the stream close event
        liveStreamService.handleStreamClose(callbackDto.getApp(), callbackDto.getStream());
        Map<String, Object> result = new HashMap<>();
        result.put("code", 0);
        result.put("message", "Success");
        return ResponseEntity.ok(result);
    }

    /**
     * Handle the SRS on_play callback.
     * SRS calls this endpoint when playback of a stream starts.
     */
    @PostMapping("/on_play")
    public ResponseEntity<Map<String, Object>> onPlay(@RequestBody SrsCallbackDto callbackDto) {
        log.info("SRS on_play callback: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());
        Map<String, Object> result = new HashMap<>();
        result.put("code", 0);
        result.put("message", "Success");
        return ResponseEntity.ok(result);
    }

    /**
     * Handle the SRS on_stop callback.
     * SRS calls this endpoint when playback of a stream ends.
     */
    @PostMapping("/on_stop")
    public ResponseEntity<Map<String, Object>> onStop(@RequestBody SrsCallbackDto callbackDto) {
        log.info("SRS on_stop callback: app={}, stream={}", callbackDto.getApp(), callbackDto.getStream());
        Map<String, Object> result = new HashMap<>();
        result.put("code", 0);
        result.put("message", "Success");
        return ResponseEntity.ok(result);
    }

    /**
     * Handle the SRS on_dvr callback.
     * SRS calls this endpoint when a DVR recording file is closed.
     */
    @PostMapping("/on_dvr")
    public ResponseEntity<Map<String, Object>> onDvr(@RequestBody SrsCallbackDto callbackDto) {
        log.info("SRS on_dvr callback: app={}, stream={}, file={}",
                callbackDto.getApp(), callbackDto.getStream(), callbackDto.getFile());
        Map<String, Object> result = new HashMap<>();
        result.put("code", 0);
        result.put("message", "Success");
        return ResponseEntity.ok(result);
    }
}
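The controller relies on an `SrsCallbackDto` class that the article does not list. A minimal sketch is given here, assuming only the properties the controller actually touches (`app`, `stream`, `param`, `file`, `token`, `expire`) plus the `action` field that SRS includes in its callback body. The static `parseParam` helper is a hypothetical JDK-only alternative to the Hutool call, added so the query-string handling can be tried in isolation.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical DTO for the SRS HTTP callback body; token and expire are filled in by the controller. */
class SrsCallbackDto {
    private String action;  // e.g. "on_publish"
    private String app;     // application name, e.g. "live"
    private String stream;  // stream name, used as the push key in this article
    private String param;   // query string of the push URL, e.g. "?auth_key=...&expire=..."
    private String file;    // recording file path, only present for on_dvr
    private String token;   // extracted from param
    private String expire;  // extracted from param

    public String getAction() { return action; }
    public void setAction(String action) { this.action = action; }
    public String getApp() { return app; }
    public void setApp(String app) { this.app = app; }
    public String getStream() { return stream; }
    public void setStream(String stream) { this.stream = stream; }
    public String getParam() { return param; }
    public void setParam(String param) { this.param = param; }
    public String getFile() { return file; }
    public void setFile(String file) { this.file = file; }
    public String getToken() { return token; }
    public void setToken(String token) { this.token = token; }
    public String getExpire() { return expire; }
    public void setExpire(String expire) { this.expire = expire; }

    /** Splits a query string such as "?a=1&b=2" into decoded key/value pairs. */
    static Map<String, String> parseParam(String param) {
        Map<String, String> result = new LinkedHashMap<>();
        if (param == null || param.isEmpty()) {
            return result;
        }
        String query = param.startsWith("?") ? param.substring(1) : param;
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            result.put(URLDecoder.decode(key, StandardCharsets.UTF_8),
                    URLDecoder.decode(value, StandardCharsets.UTF_8));
        }
        return result;
    }
}
```

SRS sends further fields that are not modelled here. SpringBoot's default Jackson configuration ignores unknown JSON properties, so leaving them out should not break deserialization.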
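3.8 WebSocket Implementation
3.8.1 WebSocket Configuration
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Enable the simple in-memory message broker
        registry.enableSimpleBroker("/topic");
        // Set the prefix for messages routed to @MessageMapping methods
        registry.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Register the STOMP endpoint
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}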
3.8.2 WebSocket Controller
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

@Controller
public class LiveChatController {

    @Autowired
    private LiveRoomMapper liveRoomMapper;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private StringRedisTemplate redisTemplate;

    /** Send a chat message. */
    @MessageMapping("/chat/{roomId}")
    public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {
        // Check that the live room exists and is live
        LiveRoom liveRoom = liveRoomMapper.selectById(roomId);
        if (liveRoom == null || liveRoom.getStatus() != 1) {
            return;
        }

        // Set the message time
        message.setTimestamp(LocalDateTime.now());

        // Send the message to all clients subscribed to this live room
        messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);
    }

    /** Notify clients that the live room status changed. */
    public void notifyRoomStatusChange(Long roomId, int status) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("roomId", roomId);
        payload.put("status", status);
        payload.put("timestamp", LocalDateTime.now());
        messagingTemplate.convertAndSend("/topic/room/" + roomId + "/status", payload);
    }

    /** Send a like notification. */
    @MessageMapping("/like/{roomId}")
    public void sendLike(@DestinationVariable Long roomId, Map<String, Object> payload) {
        // Check that the live room exists and is live
        LiveRoom liveRoom = liveRoomMapper.selectById(roomId);
        if (liveRoom == null || liveRoom.getStatus() != 1) {
            return;
        }

        // Increment the like count
        String key = "live:room:" + roomId + ":like_count";
        Long likeCount = redisTemplate.opsForValue().increment(key);

        // Sync to the database once every 100 likes
        if (likeCount % 100 == 0) {
            LiveRoom room = new LiveRoom();
            room.setId(roomId);
            room.setLikeCount(likeCount);
            liveRoomMapper.updateById(room);
        }

        // Add the timestamp and the current count
        payload.put("timestamp", LocalDateTime.now());
        payload.put("likeCount", likeCount);

        // Send the like notification
        messagingTemplate.convertAndSend("/topic/room/" + roomId + "/like", payload);
    }

    @Data
    public static class ChatMessage {
        private String username;
        private String userId;
        private String content;
        private String avatar;
        private LocalDateTime timestamp;
    }
}
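The like counter above writes to the database only on every 100th increment, which trades a little staleness for far fewer writes. The same idea can be sketched without Redis, using an in-memory counter and a callback that stands in for the database update. `BatchedCounter` and its constructor parameters are hypothetical names used only for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical in-memory stand-in for the Redis counter with periodic database sync. */
final class BatchedCounter {
    private final AtomicLong value = new AtomicLong();
    private final long batchSize;
    private final LongConsumer flush;

    BatchedCounter(long batchSize, LongConsumer flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flush = flush;
    }

    /** Increments the counter and passes the new total to flush on every batchSize-th call. */
    long increment() {
        long current = value.incrementAndGet();
        if (current % batchSize == 0) {
            flush.accept(current);
        }
        return current;
    }
}
```

Compared with the 10% random sync used for the viewer count, a fixed batch size gives a predictable upper bound on how far the database can lag behind the counter: at most `batchSize - 1` increments.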
4. SRS Server Configuration
SRS is an excellent open-source streaming media server. A basic srs.conf configuration file looks like this:
listen 1935;
max_connections 1000;
daemon off;
srs_log_tank file;
srs_log_file ./objs/srs.log;
http_api {
    enabled on;
    listen 1985;
}
http_server {
    enabled on;
    listen 8080;
    dir ./objs/nginx/html;
}
vhost __defaultVhost__ {
    hls {
        enabled on;
        hls_path ./objs/nginx/html/hls;
        hls_fragment 10;
        hls_window 60;
    }
    http_remux {
        enabled on;
        mount [vhost]/[app]/[stream].flv;
    }
    dvr {
        enabled on;
        dvr_path ./objs/nginx/html/dvr/[app]/[stream].[timestamp].mp4;
        dvr_plan segment;
        dvr_duration 30;
    }
    http_hooks {
        enabled on;
        on_publish http://192.168.195.1:8080/api/srs/callback/on_publish;
        on_unpublish http://192.168.195.1:8080/api/srs/callback/on_unpublish;
        on_play http://192.168.195.1:8080/api/srs/callback/on_play;
        on_stop http://192.168.195.1:8080/api/srs/callback/on_stop;
        on_dvr http://192.168.195.1:8080/api/srs/callback/on_dvr;
    }
}
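The playback URLs stored for each room have to match the paths that this configuration produces. The sketch below derives them, assuming SRS keeps its default playlist naming of `[app]/[stream].m3u8` beneath `hls_path` (worth verifying against the SRS version in use). `SrsPlaybackUrls` and the `hlsSubDir` parameter are hypothetical names introduced for illustration.

```java
/** Hypothetical helper deriving playback URLs from the srs.conf layout; names are illustrative. */
final class SrsPlaybackUrls {

    private SrsPlaybackUrls() {
    }

    /** HTTP-FLV URL produced by the http_remux mount [vhost]/[app]/[stream].flv. */
    static String flv(String httpBase, String app, String stream) {
        return httpBase + "/" + app + "/" + stream + ".flv";
    }

    /**
     * HLS playlist URL, where hlsSubDir is the part of hls_path below the
     * http_server dir ("hls" for the configuration in this section).
     */
    static String hls(String httpBase, String hlsSubDir, String app, String stream) {
        return httpBase + "/" + hlsSubDir + "/" + app + "/" + stream + ".m3u8";
    }
}
```

Under that assumption, a stream pushed to the `live` app would be playable at `http://192.168.195.100:8080/hls/live/<stream>.m3u8`, whereas the `live.srs.hls-url` value in application.yml leads to `.../live/hls/<stream>.m3u8`. If HLS playback returns 404, this path difference is a likely place to look.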
5. Frontend Player
5.1 A Player Based on Video.js
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Live Player</title>
    <link href="https://vjs.zencdn.net/7.20.3/video-js.css" rel="stylesheet" />
    <script src="https://vjs.zencdn.net/7.20.3/video.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1.5.0/dist/sockjs.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/stompjs@2.3.3/lib/stomp.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/videojs-contrib-hls@5.15.0/dist/videojs-contrib-hls.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/flv.js@1.6.2/dist/flv.min.js"></script>
    <script src="https://cdn.jsdelivr.net/npm/videojs-flvjs@0.2.0/dist/videojs-flvjs.min.js"></script>
    <style>
        .video-container {
            max-width: 800px;
            margin: 0 auto;
        }
        .video-js {
            width: 100%;
            height: 450px;
        }
        .room-info {
            margin-top: 20px;
            padding: 15px;
            background-color: #f8f9fa;
            border-radius: 5px;
        }
        .room-title {
            font-size: 24px;
            font-weight: bold;
            margin-bottom: 10px;
        }
        .streamer-info {
            display: flex;
            align-items: center;
            margin-bottom: 10px;
        }
        .streamer-avatar {
            width: 40px;
            height: 40px;
            border-radius: 50%;
            margin-right: 10px;
        }
        .streamer-name {
            font-weight: bold;
        }
        .room-stats {
            display: flex;
            gap: 20px;
            color: #666;
        }
    </style>
</head>
<body>
<div class="video-container">
    <video id="live-player" class="video-js vjs-default-skin vjs-big-play-centered" controls preload="auto">
        <p class="vjs-no-js">
            To view this video please enable JavaScript, and consider upgrading to a web browser that
            <a href="https://videojs.com/html5-video-support/" target="_blank">supports HTML5 video</a>
        </p>
    </video>
    <div class="room-info">
        <div class="room-title" id="room-title">Live room title</div>
        <div class="streamer-info">
            <img class="streamer-avatar" id="streamer-avatar" src="https://api.dicebear.com/7.x/avataaars/svg?seed=user1" alt="Streamer avatar">
            <span class="streamer-name" id="streamer-name">Streamer nickname</span>
        </div>
        <div class="room-stats">
            <div><i class="icon-eye"></i> <span id="view-count">0</span> views</div>
            <div><i class="icon-heart"></i> <span id="like-count">0</span> likes</div>
        </div>
    </div>
</div>
<script>
    // Read a URL query parameter
    function getQueryParam(name) {
        const urlParams = new URLSearchParams(window.location.search);
        return urlParams.get(name);
    }

    // Initialize the player
    function initPlayer() {
        const roomId = getQueryParam('roomId');
        if (!roomId) {
            alert('Please specify a live room ID');
            return;
        }

        // Fetch the live room information
        fetch(`/api/live/room/${roomId}`)
            .then(response => response.json())
            .then(room => {
                // Update the page
                document.getElementById('room-title').textContent = room.title;
                document.getElementById('streamer-name').textContent = `Streamer ID: ${room.userId}`;
                document.getElementById('view-count').textContent = room.viewCount;
                document.getElementById('like-count').textContent = room.likeCount;

                // Check the live status
                if (room.status !== 1) {
                    alert('The live stream has not started or has already ended');
                    return;
                }

                // Create the player
                const player = videojs('live-player', {
                    autoplay: true,
                    liveui: true,
                    controls: true,
                    preload: 'auto',
                    responsive: true,
                    fluid: true,
                    sources: [/*{
                        src: room.flvUrl,
                        type: 'video/x-flv'
                    },*/ {
                        src: room.hlsUrl,
                        type: 'application/x-mpegURL'
                    }]
                });

                // Prefer FLV.js when available
                /*if (flvjs.isSupported()) {
                    player.flvjs({
                        mediaDataSource: {
                            type: 'flv',
                            url: room.flvUrl
                        }
                    });
                }*/

                // Player error handling
                player.on('error', function() {
                    console.error('Player error, trying to switch the playback source');
                    // Try switching to HLS
                    player.src({
                        src: room.hlsUrl,
                        type: 'application/x-mpegURL'
                    });
                });

                // Count the viewer
                fetch(`/api/live/room/${roomId}/view`, {method: 'POST'});

                // Connect to WebSocket to receive live status updates
                connectWebSocket(roomId);
            })
            .catch(error => {
                console.error('Failed to fetch live room information:', error);
                alert('Failed to fetch live room information');
            });
    }

    // Connect to WebSocket
    function connectWebSocket(roomId) {
        const socket = new SockJS('/ws');
        const stompClient = Stomp.over(socket);
        stompClient.connect({}, function(frame) {
            console.log('Connected to WebSocket');

            // Subscribe to live room status changes
            stompClient.subscribe(`/topic/room/${roomId}/status`, function(message) {
                const data = JSON.parse(message.body);
                if (data.status !== 1) {
                    alert('The live stream has ended');
                    location.reload();
                }
            });

            // Subscribe to like updates
            stompClient.subscribe(`/topic/room/${roomId}/like`, function(message) {
                const data = JSON.parse(message.body);
                document.getElementById('like-count').textContent = data.likeCount;
            });
        });
    }

    // Initialize once the page has loaded
    document.addEventListener('DOMContentLoaded', initPlayer);
</script>
</body>
</html>
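5.2 Choosing a Push Tool
On the streamer side, the following tools can push the stream:
- OBS Studio: open-source, full-featured streaming software
- FFmpeg: command-line tool, well suited to automated pushing
- WebRTC: captures and pushes directly from the browser (low latency, but limited compatibility)
This article uses OBS Studio.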
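For automated tests, the FFmpeg option can be driven from Java. The sketch below only assembles an argument list that loops a local file to an RTMP address. The class name `FfmpegPushCommand`, the input file, and the RTMP URL are illustrative assumptions, and FFmpeg is assumed to be installed and on the PATH.

```java
import java.io.IOException;
import java.util.List;

final class FfmpegPushCommand {

    // Builds the FFmpeg arguments for pushing a local file as a live stream.
    static List<String> build(String inputFile, String rtmpUrl) {
        return List.of(
            "ffmpeg",
            "-re",                 // read the input at its native frame rate, like a live source
            "-stream_loop", "-1",  // loop the input file indefinitely
            "-i", inputFile,
            "-c:v", "libx264",     // encode video as H.264
            "-c:a", "aac",         // encode audio as AAC
            "-f", "flv",           // RTMP carries an FLV container
            rtmpUrl);
    }

    // Starts FFmpeg as a child process; its output goes to this process's console.
    static Process start(String inputFile, String rtmpUrl) throws IOException {
        return new ProcessBuilder(build(inputFile, rtmpUrl)).inheritIO().start();
    }
}
```

Calling `FfmpegPushCommand.start("demo.mp4", streamUrl)` with the `streamUrl` of a room would then push until the process is destroyed.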
6. Performance Optimization and Scaling
6.1 Caching Strategy
Under high concurrency, sensible use of caching is essential:
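import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class CacheConfig {

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {
        // Default: 10-minute TTL, string keys, JSON values
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(10))
            .serializeKeysWith(RedisSerializationContext.SerializationPair
                .fromSerializer(new StringRedisSerializer()))
            .serializeValuesWith(RedisSerializationContext.SerializationPair
                .fromSerializer(new GenericJackson2JsonRedisSerializer()));

        Map<String, RedisCacheConfiguration> cacheConfigurations = new HashMap<>();
        // Live room list: cached for 1 minute
        cacheConfigurations.put("liveRoomList", config.entryTtl(Duration.ofMinutes(1)));
        // Live room details: cached for 5 minutes
        cacheConfigurations.put("liveRoom", config.entryTtl(Duration.ofMinutes(5)));
        // Recording list: cached for 30 minutes
        cacheConfigurations.put("recordingList", config.entryTtl(Duration.ofMinutes(30)));

        return RedisCacheManager.builder(redisConnectionFactory)
            .cacheDefaults(config)
            .withInitialCacheConfigurations(cacheConfigurations)
            .build();
    }
}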
Add the caching annotations in the service layer:
@Cacheable(value = "liveRoomList", key = "'active_page_' + #page + '_' + #size")
public List<LiveRoom> getActiveLiveRooms(int page, int size) {
    // existing implementation...
}

@Cacheable(value = "liveRoom", key = "#roomId")
public LiveRoom getLiveRoomDetail(Long roomId) {
    return liveRoomMapper.selectById(roomId);
}

// A status change evicts the room's detail entry and every cached list page
@Caching(evict = {
    @CacheEvict(value = "liveRoom", key = "#roomId"),
    @CacheEvict(value = "liveRoomList", allEntries = true)
})
public LiveRoom updateLiveRoomStatus(Long roomId, int status) {
    // update logic...
}
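6.2 Load Balancing and Cluster Deployment
A single machine cannot meet the needs of a large live streaming system, so a cluster deployment should be considered: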
# Example Docker Compose configuration
version: '3'
services:
  # Application server cluster
  app1:
    image: live-streaming-service:latest
    ports:
      - "8081:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=prod
      - SERVER_PORT=8080
      - LIVE_SRS_SERVER_URL=rtmp://srs1:1935/live
    depends_on:
      - mysql
      - redis
  app2:
    image: live-streaming-service:latest
    ports:
      - "8082:8080"
    environment:
      - SPRING_PROFILES_ACTIVE=prod
      - SERVER_PORT=8080
      - LIVE_SRS_SERVER_URL=rtmp://srs2:1935/live
    depends_on:
      - mysql
      - redis
  # Streaming media server cluster
  srs1:
    image: ossrs/srs:latest
    ports:
      - "1935:1935"
      - "1985:1985"
      - "8080:8080"
    volumes:
      - ./srs1.conf:/usr/local/srs/conf/srs.conf
  srs2:
    image: ossrs/srs:latest
    ports:
      - "1936:1935"
      - "1986:1985"
      # Host port changed from 8081, which app1 already publishes
      - "8083:8080"
    volumes:
      - ./srs2.conf:/usr/local/srs/conf/srs.conf
  # Load balancer
  nginx:
    image: nginx:latest
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/ssl:/etc/nginx/ssl
    depends_on:
      - app1
      - app2
      - srs1
      - srs2
  # Database and middleware
  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=password
      - MYSQL_DATABASE=live_streaming
    volumes:
      - mysql-data:/var/lib/mysql
  redis:
    image: redis:6.2
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
volumes:
  mysql-data:
  redis-data:
NGINX load balancing configuration:
# Required top-level block; the defaults are sufficient here
events {}

http {
    upstream app_servers {
        # IP-hash load balancing keeps requests from the same client on the same server
        ip_hash;
        server app1:8080;
        server app2:8080;
    }
    upstream srs_http_servers {
        server srs1:8080;
        server srs2:8080;
    }
    server {
        listen 80;
        server_name live.example.com;
        # API requests
        location /api/ {
            proxy_pass http://app_servers;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }
        # WebSocket connections
        location /ws {
            proxy_pass http://app_servers;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
        # HLS delivery
        location /hls/ {
            proxy_pass http://srs_http_servers;
            proxy_set_header Host $host;
        }
        # HTTP-FLV delivery
        location /live/ {
            proxy_pass http://srs_http_servers;
            proxy_set_header Host $host;
        }
    }
}

# TCP-level load balancing for RTMP push
stream {
    upstream rtmp_servers {
        server srs1:1935;
        server srs2:1935;
    }
    server {
        listen 1935;
        proxy_pass rtmp_servers;
    }
}
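6.3 Traffic Control and Rate Limiting
To protect the server from malicious traffic and overload, add a rate limiting mechanism:
import java.util.Collections;
import java.util.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

// Registered once through @Component. Declaring an additional @Bean method for
// the same class would create a second bean with the same name.
@Component
public class RedisRateLimiter {

    // Lua script implementing a sliding-window limit on a sorted set.
    // Returns the remaining capacity, or -1 when the request is rejected.
    private static final String LUA_SCRIPT =
        "local key = KEYS[1] " +
        "local capacity = tonumber(ARGV[1]) " +
        "local period = tonumber(ARGV[2]) " +
        "local now = tonumber(ARGV[3]) " +
        "local requested = tonumber(ARGV[4]) " +
        // Remove timestamps that have left the window
        "redis.call('zremrangebyscore', key, 0, now - period) " +
        // Count the requests still inside the window
        "local currentCount = redis.call('zcard', key) " +
        // Reject with -1 (0 is a valid "no capacity left" result for an accepted request)
        "if currentCount + requested > capacity then return -1 end " +
        // Record the new requests; ARGV[5] keeps members unique within the same second
        "for i = 1, requested do " +
        "  redis.call('zadd', key, now, ARGV[5] .. ':' .. i) " +
        "end " +
        // Let the key expire once the window has passed
        "redis.call('expire', key, period) " +
        // Return the remaining capacity
        "return capacity - currentCount - requested";

    private static final RedisScript<Long> SCRIPT = RedisScript.of(LUA_SCRIPT, Long.class);

    private final StringRedisTemplate redisTemplate;

    public RedisRateLimiter(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Tries to acquire tokens.
     * @param key rate limit key
     * @param capacity maximum number of requests per window
     * @param period window length in seconds
     * @param requested number of tokens requested
     * @return whether the tokens were acquired
     */
    public boolean tryAcquire(String key, int capacity, int period, int requested) {
        long now = System.currentTimeMillis() / 1000;
        Long remainingTokens = redisTemplate.execute(SCRIPT,
            Collections.singletonList(key),
            String.valueOf(capacity),
            String.valueOf(period),
            String.valueOf(now),
            String.valueOf(requested),
            UUID.randomUUID().toString());
        return remainingTokens != null && remainingTokens >= 0;
    }
}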
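The sliding-window logic of the Lua script can be checked without Redis. The following is a minimal in-memory sketch of the same idea, not the article's implementation: the class name `InMemorySlidingWindowLimiter` is hypothetical, the current time is passed in explicitly so the behavior is deterministic, callers are assumed to pass non-decreasing timestamps, and a rejection is signaled with -1 so it cannot be confused with "accepted, zero capacity left".

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class InMemorySlidingWindowLimiter {

    // One queue of request timestamps (in seconds) per rate limit key
    private final Map<String, Deque<Long>> windows = new HashMap<>();

    /**
     * Returns the remaining capacity after the request, or -1 if it is rejected.
     */
    synchronized long tryAcquire(String key, int capacity, long periodSeconds,
                                 int requested, long nowSeconds) {
        Deque<Long> timestamps = windows.computeIfAbsent(key, k -> new ArrayDeque<>());

        // Drop timestamps at or before (now - period), like ZREMRANGEBYSCORE key 0 now-period
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= nowSeconds - periodSeconds) {
            timestamps.pollFirst();
        }

        // Reject when the window would overflow
        if (timestamps.size() + requested > capacity) {
            return -1;
        }

        // Record each granted token with the current timestamp
        for (int i = 0; i < requested; i++) {
            timestamps.addLast(nowSeconds);
        }
        return capacity - timestamps.size();
    }
}
```

With a capacity of 2 and a 10-second window, two requests are accepted, a third inside the window is rejected, and capacity is freed once the oldest timestamp leaves the window.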
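Use an aspect to apply rate limiting to the API:
import jakarta.servlet.http.HttpServletRequest;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

@Aspect
@Component
public class RateLimitAspect {

    private final RedisRateLimiter rateLimiter;

    public RateLimitAspect(RedisRateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    @Around("@annotation(rateLimit)")
    public Object rateLimit(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
        // Resolve the client IP of the current request
        HttpServletRequest request =
            ((ServletRequestAttributes) RequestContextHolder.currentRequestAttributes()).getRequest();
        String ip = getClientIp(request);
        // Build the rate limit key
        String key = "rate_limit:" + rateLimit.key() + ":" + ip;
        // Try to acquire one token
        boolean allowed = rateLimiter.tryAcquire(key, rateLimit.capacity(), rateLimit.period(), 1);
        if (allowed) {
            return joinPoint.proceed();
        }
        // TooManyRequestsException is a custom exception assumed to be defined in the project
        throw new TooManyRequestsException("Too many requests, please try again later");
    }

    // Note: these headers are client-controlled unless a trusted proxy overwrites them
    private String getClientIp(HttpServletRequest request) {
        String ip = request.getHeader("X-Forwarded-For");
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ip == null || ip.isEmpty() || "unknown".equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        // Use the first address when several are listed
        if (ip != null && ip.contains(",")) {
            ip = ip.split(",")[0].trim();
        }
        return ip;
    }
}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
    String key();                // rate limit key
    int capacity() default 10;   // maximum requests per window
    int period() default 60;     // window length in seconds
}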
Apply the rate limit annotation in the controller:
@PostMapping("/room")
@RateLimit(key = "create_room", capacity = 5, period = 3600) // at most 5 rooms created per hour
public ResponseEntity<LiveRoom> createLiveRoom(@RequestBody LiveRoom liveRoom) {
    // implementation...
}

@PostMapping("/room/{roomId}/view")
@RateLimit(key = "view_increment", capacity = 1, period = 5) // at most once every 5 seconds
public ResponseEntity<Void> incrementViewCount(@PathVariable Long roomId) {
    // implementation...
}
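6.4 Monitoring and Alerting
Integrate Prometheus and Grafana for monitoring:
@Configuration
public class MonitoringConfig {

    // Tag every metric with the application name so dashboards can filter by service
    @Bean
    MeterRegistryCustomizer<MeterRegistry> metricsCommonTags() {
        return registry -> registry.config().commonTags("application", "live-streaming-service");
    }
}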
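Add custom metrics:
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.util.function.Supplier;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class LiveStreamMonitoringService {

    private final MeterRegistry meterRegistry;
    private final StringRedisTemplate redisTemplate;
    private final Counter liveStartCounter;
    private final Counter liveEndCounter;
    private final Gauge activeStreamGauge;
    private final Timer streamProcessingTimer;

    // redisTemplate is injected here; the gauge below reads from it
    public LiveStreamMonitoringService(MeterRegistry meterRegistry, StringRedisTemplate redisTemplate) {
        this.meterRegistry = meterRegistry;
        this.redisTemplate = redisTemplate;
        // Create the metrics
        this.liveStartCounter = Counter.builder("live.stream.start")
            .description("Number of streams started")
            .register(meterRegistry);
        this.liveEndCounter = Counter.builder("live.stream.end")
            .description("Number of streams ended")
            .register(meterRegistry);
        this.activeStreamGauge = Gauge.builder("live.stream.active", this::getActiveStreamCount)
            .description("Number of currently active streams")
            .register(meterRegistry);
        this.streamProcessingTimer = Timer.builder("live.stream.processing")
            .description("Stream processing time")
            .register(meterRegistry);
    }

    // Number of active live rooms, read from Redis
    private long getActiveStreamCount() {
        Long size = redisTemplate.opsForSet().size("live:active_rooms");
        return size != null ? size : 0L;
    }

    // Record a stream start
    public void recordLiveStart() {
        liveStartCounter.increment();
    }

    // Record a stream end
    public void recordLiveEnd() {
        liveEndCounter.increment();
    }

    // Record processing time
    public <T> T recordProcessingTime(Supplier<T> supplier) {
        return streamProcessingTimer.record(supplier);
    }

    // Record an error, tagged by type
    public void recordError(String errorType) {
        meterRegistry.counter("live.stream.errors", "type", errorType).increment();
    }
}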
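7. Security Considerations
7.1 Push Authentication
Push authentication based on a timestamp and a signature was implemented earlier. To strengthen it further, an IP whitelist can be added:
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;

@Service
public class StreamSecurityService {

    @Value("${live.security.ip-whitelist-enabled:false}")
    private boolean ipWhitelistEnabled;

    @Value("#{'${live.security.ip-whitelist:}'.split(',')}")
    private List<String> ipWhitelist;

    // The original snippet uses authKey without declaring it. The property name
    // below is an assumption; reuse the key configured for push authentication.
    @Value("${live.security.auth-key}")
    private String authKey;

    /**
     * Checks whether the IP is on the whitelist.
     */
    public boolean isIpAllowed(String ip) {
        if (!ipWhitelistEnabled) {
            return true;
        }
        // Splitting an empty property yields one empty element, so drop blanks first
        List<String> entries = ipWhitelist.stream()
            .map(String::trim)
            .filter(entry -> !entry.isEmpty())
            .toList();
        if (entries.isEmpty()) {
            return true;
        }
        return entries.contains(ip) || entries.contains("0.0.0.0");
    }

    /**
     * Generates a push URL with an expiry time.
     */
    public String generateSecureStreamUrl(String baseUrl, String streamKey, long expireSeconds) {
        long expireTimestamp = System.currentTimeMillis() / 1000 + expireSeconds;
        String authString = streamKey + "-" + expireTimestamp + "-" + authKey;
        String authToken = DigestUtils.md5DigestAsHex(authString.getBytes(StandardCharsets.UTF_8));
        return baseUrl + "?auth_key=" + authToken + "&expire=" + expireTimestamp;
    }
}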
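One detail of a comma-separated whitelist property is easy to miss: in Java, splitting an empty string yields a single empty element, not an empty list, so an "empty whitelist" check on the raw split result never fires. The sketch below shows one tolerant way to parse such a value; the class and method names are hypothetical and the "0.0.0.0" wildcard follows the convention used in the service above.

```java
import java.util.Arrays;
import java.util.List;

final class IpWhitelistParser {

    // Splits a comma-separated list, trimming entries and dropping blanks.
    static List<String> parse(String raw) {
        if (raw == null) {
            return List.of();
        }
        return Arrays.stream(raw.split(","))
            .map(String::trim)
            .filter(entry -> !entry.isEmpty())
            .toList();
    }

    // An empty whitelist means "no restriction"; "0.0.0.0" acts as a wildcard.
    static boolean isAllowed(List<String> whitelist, String ip) {
        return whitelist.isEmpty() || whitelist.contains(ip) || whitelist.contains("0.0.0.0");
    }
}
```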
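7.2 Playback Authentication
Playback URLs can be protected in the same way:
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;

@Service
public class PlayAuthService {

    @Value("${live.play.auth-enabled}")
    private boolean authEnabled;

    @Value("${live.play.auth-key}")
    private String authKey;

    // Token lifetime in seconds
    @Value("${live.play.auth-expire}")
    private long authExpire;

    /**
     * Generates a playback URL with authentication parameters.
     */
    public String generateAuthPlayUrl(String baseUrl, String streamKey, Long userId) {
        if (!authEnabled) {
            return baseUrl;
        }
        long expireTimestamp = System.currentTimeMillis() / 1000 + authExpire;
        String authString = streamKey + "-" + userId + "-" + expireTimestamp + "-" + authKey;
        String authToken = DigestUtils.md5DigestAsHex(authString.getBytes(StandardCharsets.UTF_8));
        return baseUrl + "?auth_key=" + authToken + "&user_id=" + userId + "&expire=" + expireTimestamp;
    }

    /**
     * Validates a playback URL.
     */
    public boolean validatePlayUrl(String streamKey, String authToken, String userId, String expireStr) {
        if (!authEnabled) {
            return true;
        }
        if (authToken == null) {
            return false;
        }
        try {
            long expireTimestamp = Long.parseLong(expireStr);
            long currentTime = System.currentTimeMillis() / 1000;
            // Reject expired URLs
            if (currentTime > expireTimestamp) {
                return false;
            }
            // Recompute the token and compare in constant time
            String authString = streamKey + "-" + userId + "-" + expireStr + "-" + authKey;
            String calculatedToken = DigestUtils.md5DigestAsHex(authString.getBytes(StandardCharsets.UTF_8));
            return MessageDigest.isEqual(
                calculatedToken.getBytes(StandardCharsets.UTF_8),
                authToken.getBytes(StandardCharsets.UTF_8));
        } catch (NumberFormatException e) {
            return false;
        }
    }
}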
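The article signs URLs with MD5 over a string that includes the secret. As a swapped-in alternative, not the article's method, the sketch below signs the same fields with HMAC-SHA256 from the JDK and compares tokens in constant time. The class name `HmacPlayToken`, the payload layout, and passing the current time as a parameter are assumptions made for illustration; `HexFormat` requires Java 17 or later. If the media server validates tokens itself, it would need to use the same scheme.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.HexFormat;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class HmacPlayToken {

    // Signs "streamKey-userId-expire" with HMAC-SHA256 and returns lowercase hex.
    static String sign(String secret, String streamKey, long userId, long expireEpochSeconds) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            String payload = streamKey + "-" + userId + "-" + expireEpochSeconds;
            return HexFormat.of().formatHex(mac.doFinal(payload.getBytes(StandardCharsets.UTF_8)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 is not available", e);
        }
    }

    // Accepts the token only if it has not expired and matches the recomputed signature.
    static boolean verify(String secret, String streamKey, long userId, long expireEpochSeconds,
                          String token, long nowEpochSeconds) {
        if (token == null || nowEpochSeconds > expireEpochSeconds) {
            return false;
        }
        String expected = sign(secret, streamKey, userId, expireEpochSeconds);
        // MessageDigest.isEqual compares in constant time
        return MessageDigest.isEqual(
            expected.getBytes(StandardCharsets.UTF_8),
            token.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because the user ID and expiry are part of the signed payload, changing either one in the URL invalidates the token.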
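7.3 Content Security
User-generated content needs to be reviewed and filtered:
import jakarta.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class ContentSecurityService {

    @Value("${live.content.sensitive-words-file}")
    private String sensitiveWordsFile;

    private final Set<String> sensitiveWords = new HashSet<>();

    @PostConstruct
    public void init() {
        // Load the sensitive word list, one word per line
        try {
            File file = new File(sensitiveWordsFile);
            if (file.exists()) {
                for (String line : Files.readAllLines(file.toPath())) {
                    // Skip blank lines; an empty word would match everywhere
                    if (!line.isBlank()) {
                        sensitiveWords.add(line.trim());
                    }
                }
            }
        } catch (IOException e) {
            // Fall back to placeholder words if loading fails
            sensitiveWords.add("敏感词1");
            sensitiveWords.add("敏感词2");
        }
    }

    /**
     * Masks sensitive words.
     */
    public String filterContent(String content) {
        if (content == null || content.isEmpty()) {
            return content;
        }
        String filteredContent = content;
        for (String word : sensitiveWords) {
            // replace() matches literally; replaceAll() would treat the word as a regex
            filteredContent = filteredContent.replace(word, "***");
        }
        return filteredContent;
    }

    /**
     * Checks whether the content contains sensitive words.
     */
    public boolean containsSensitiveWords(String content) {
        if (content == null || content.isEmpty()) {
            return false;
        }
        for (String word : sensitiveWords) {
            if (content.contains(word)) {
                return true;
            }
        }
        return false;
    }
}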
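Use it in the WebSocket message handler:
@MessageMapping("/chat/{roomId}")
public void sendMessage(@DestinationVariable Long roomId, ChatMessage message) {
    // Keep the original text so the violation log shows what was actually sent
    String originalContent = message.getContent();
    // Mask sensitive content
    String filteredContent = contentSecurityService.filterContent(originalContent);
    // Drop messages that consist entirely of masked words
    if (filteredContent != null && filteredContent.matches("\\*+")) {
        // Log the violation
        log.warn("User sent prohibited content: userId={}, content={}", message.getUserId(), originalContent);
        return;
    }
    message.setContent(filteredContent);
    // Send the message as usual
    messagingTemplate.convertAndSend("/topic/chat/" + roomId, message);
}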
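8. Demo Walkthrough
To make the live streaming system easy to understand and test, the complete demo steps follow.
8.1 Environment Setup
First, prepare the following environment:
- Java 21+: runs the SpringBoot application
- MySQL 5.7+: stores live stream information
- Redis: caching and message communication
- SRS: streaming media server
- FFmpeg: video processing tool (records live replays)
- OBS Studio: used to test pushing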
8.2 Installing SRS
Docker makes it quick to deploy an SRS server. Before starting the container, save the SRS configuration file provided earlier as conf/srs.conf.
# Pull the SRS image
docker pull ossrs/srs:4

# Run the SRS container
docker run --name srs -d --restart=always \
  -p 1935:1935 \
  -p 1985:1985 \
  -p 8080:8080 \
  -v $(pwd)/conf:/usr/local/srs/conf \
  ossrs/srs:4 objs/srs -c conf/srs.conf
8.3 Live Streaming Demo Flow
8.3.1 Creating a Live Room
Create a live room through the API:
curl -X POST http://localhost:8080/api/live/room \
  -H "Content-Type: application/json" \
  -d '{"title": "测试直播间", "userId": 1, "coverUrl": "https://example.com/cover.jpg"}'
Example response:
{
  "id": 1,
  "title": "测试直播间",
  "userId": 1,
  "coverUrl": "https://example.com/cover.jpg",
  "streamKey": "8f7d6e5c4b3a2f1e0d9c8b7a",
  "streamUrl": "rtmp://localhost:1935/live/8f7d6e5c4b3a2f1e0d9c8b7a",
  "hlsUrl": "http://localhost:8080/hls/8f7d6e5c4b3a2f1e0d9c8b7a.m3u8",
  "flvUrl": "http://localhost:8080/live/8f7d6e5c4b3a2f1e0d9c8b7a.flv",
  "status": 0,
  "viewCount": 0,
  "likeCount": 0
}
Note the streamKey and streamUrl; they are needed for the push settings.
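The same call can be issued from Java test code with the JDK's built-in HTTP client. This is a minimal sketch: the class name `CreateRoomRequest` is hypothetical, the endpoint and field names are taken from the curl example, and the JSON is assembled with simple string formatting on the assumption that the values contain no quotes or backslashes.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class CreateRoomRequest {

    // Builds the POST request for the create-room endpoint.
    static HttpRequest build(String baseUrl, String title, long userId, String coverUrl) {
        // Assumes the values need no JSON escaping; use a JSON library otherwise
        String json = String.format(
            "{\"title\": \"%s\", \"userId\": %d, \"coverUrl\": \"%s\"}",
            title, userId, coverUrl);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/live/room"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(json))
            .build();
    }
}
```

Sending it is then a matter of `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and reading `streamKey` and `streamUrl` from the response body.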
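8.3.2 Push Test
1. Open OBS Studio and configure the stream settings:
   - Settings → Stream
   - Service: Custom
   - Server: the RTMP address up to the application name, for example rtmp://192.168.195.100:1935/live
   - Stream Key: the streamKey obtained above, followed by the authentication parameters when push authentication is enabled, for example 2feabf98b5ee6bb0c3dbc6cd423084a3?auth_key=9875b710e26914eff9f9aa1cc1df0093&expire=1749220798
2. Add a video source (for example a camera or a screen capture)
3. Click "Start Streaming" to go live
The system detects the start of the push automatically and, through the SRS callback, updates the room status to "live".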
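OBS takes the server address and the stream key as two separate fields, while the API returns one complete push URL. A small helper can split the URL at the last path separator so that any authentication query string stays with the stream key. This is a sketch under that assumption; the record name `ObsPushSettings` is hypothetical.

```java
// Holds the two values entered in OBS under Settings → Stream.
record ObsPushSettings(String server, String streamKey) {

    // Splits e.g. rtmp://host:1935/live/key?auth_key=... into server and stream key.
    static ObsPushSettings fromPushUrl(String pushUrl) {
        // Ignore the query string when looking for the last path separator
        int query = pushUrl.indexOf('?');
        String withoutQuery = query >= 0 ? pushUrl.substring(0, query) : pushUrl;

        int schemeEnd = withoutQuery.indexOf("://");
        int lastSlash = withoutQuery.lastIndexOf('/');
        // The last slash must belong to the path, not to the "://" after the scheme
        if (schemeEnd < 0 || lastSlash <= schemeEnd + 2) {
            throw new IllegalArgumentException("Not a push URL with a stream name: " + pushUrl);
        }
        return new ObsPushSettings(
            pushUrl.substring(0, lastSlash),
            pushUrl.substring(lastSlash + 1));
    }
}
```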
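8.3.3 Playback Test
- Open the front-end page: http://localhost:8080/play.html?roomId=1
- The page loads the live stream automatically and starts playing
- To test the stream directly:
  - HLS playback: http://192.168.195.100:8080/hls/live/2feabf98b5ee6bb0c3dbc6cd423084a3/2feabf98b5ee6bb0c3dbc6cd423084a3.m3u8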
8.3.4 Interaction Test
- Refresh the page several times and check whether the view count increases
8.3.5 Replay Test
Start recording the stream:
curl -X POST http://localhost:8080/api/live/room/1/record/start
After a while, stop the recording:
# Use the recordingId returned by the previous call
curl -X POST http://localhost:8080/api/live/record/1/stop
List the recordings:
curl http://localhost:8080/api/live/room/1/recordings
Play the replay using the returned fileUrl.
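8.3.6 Ending the Stream
- In OBS Studio, click "Stop Streaming"
- The system detects the end of the push automatically and updates the room status to "ended"
- The stream can also be ended manually through the API:
curl -X POST http://localhost:8080/api/live/room/1/end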