本文将深入探讨如何使用Kotlin和RandomAccessFile实现高效的断点续传功能,涵盖原理分析、完整代码实现、性能优化及工程实践要点。
一、断点续传核心原理
1.1 HTTP断点续传协议
1.2 RandomAccessFile核心优势
特性 | 传统FileInputStream | RandomAccessFile |
---|---|---|
随机访问能力 | ❌ | ✅ |
大文件处理效率 | ⭐⭐ | ⭐⭐⭐⭐ |
内存占用 | 高 | 低 |
断点续传实现复杂度 | 高 | 低 |
文件修改能力 | ❌ | ✅ |
二、服务端完整实现(Kotlin + Spring Boot)
2.1 依赖配置
// build.gradle.kts
dependencies {implementation("org.springframework.boot:spring-boot-starter-web")implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
}
2.2 控制器实现
@RestController
class DownloadController {@GetMapping("/download/{filename}")suspend fun downloadFile(@PathVariable filename: String,request: HttpServletRequest,response: HttpServletResponse) {val file = File("/data/files/$filename").takeIf { it.exists() } ?: throw FileNotFoundException("File not found")// 解析Range请求头val (start, end) = parseRangeHeader(request, file.length())// 设置HTTP响应头response.configureHeaders(file, start, end)// 使用RandomAccessFile进行文件传输transferFileContent(file, response, start, end)}private fun parseRangeHeader(request: HttpServletRequest, fileLength: Long): Pair<Long, Long> {val rangeHeader = request.getHeader("Range")?.takeIf { it.startsWith("bytes=") }?: return 0L to fileLength - 1val range = rangeHeader.substring(6).split("-")val start = range[0].toLongOrNull() ?: 0Lval end = range.getOrNull(1)?.toLongOrNull() ?: fileLength - 1return start to min(end, fileLength - 1)}private fun HttpServletResponse.configureHeaders(file: File, start: Long, end: Long) {val fileLength = file.length()val contentLength = end - start + 1status = if (start > 0) HttpStatus.PARTIAL_CONTENT.value() else HttpStatus.OK.value()contentType = "application/octet-stream"setHeader("Accept-Ranges", "bytes")setHeader("Content-Disposition", "attachment; filename=\"${file.name}\"")setHeader("Content-Length", contentLength.toString())if (status == HttpStatus.PARTIAL_CONTENT.value()) {setHeader("Content-Range", "bytes $start-$end/$fileLength")}}private suspend fun transferFileContent(file: File, response: HttpServletResponse,start: Long, end: Long) = withContext(Dispatchers.IO) {RandomAccessFile(file, "r").use { raf ->raf.seek(start)val output = response.outputStreamval buffer = ByteArray(8192)var bytesRemaining = end - start + 1while (bytesRemaining > 0) {val readSize = min(bytesRemaining, buffer.size.toLong()).toInt()val bytesRead = raf.read(buffer, 0, readSize)if (bytesRead == -1) breakoutput.write(buffer, 0, bytesRead)output.flush()bytesRemaining -= bytesRead}}}
}
2.3 关键代码解析
1. 文件指针定位
raf.seek(start) // 将文件指针移动到断点位置
2. 分块传输逻辑
while (bytesRemaining > 0) {val readSize = min(bytesRemaining, buffer.size.toLong()).toInt()val bytesRead = raf.read(buffer, 0, readSize)// ... 写入输出流
}
3. HTTP头处理
// 部分内容响应
setHeader("Content-Range", "bytes $start-$end/$fileLength")
status = HttpStatus.PARTIAL_CONTENT.value()
三、客户端完整实现(Kotlin)
3.1 文件下载器类
class ResumableDownloader(private val url: String,private val savePath: String,private val chunkSize: Int = 8192
) {private var downloadedBytes: Long = 0private val progressListeners = mutableListOf<(Long, Long) -> Unit>()fun addProgressListener(listener: (Long, Long) -> Unit) {progressListeners.add(listener)}suspend fun startDownload() = withContext(Dispatchers.IO) {val file = File(savePath)downloadedBytes = if (file.exists()) file.length() else 0Lwhile (true) {try {val connection = URL(url).openConnection() as HttpURLConnectionconnection.setRequestProperty("Range", "bytes=$downloadedBytes-")if (connection.responseCode !in 200..299) {if (connection.responseCode == 416) { // 范围请求错误file.delete() // 删除无效文件downloadedBytes = 0continue}throw IOException("HTTP error: ${connection.responseCode}")}// 获取文件总大小val contentRange = connection.getHeaderField("Content-Range")val totalSize = contentRange?.split("/")?.last()?.toLongOrNull() ?: connection.contentLengthLong.takeIf { it > 0 } ?: -1// 执行下载downloadChunks(connection, file, totalSize)break} catch (e: SocketTimeoutException) {println("Timeout, retrying...")} catch (e: IOException) {if (e.message?.contains("reset") == true) {println("Connection reset, retrying...")} else {throw e}}}}private suspend fun downloadChunks(connection: HttpURLConnection,file: File,totalSize: Long) {RandomAccessFile(file, "rw").use { raf ->raf.seek(downloadedBytes)val input = connection.inputStreamval buffer = ByteArray(chunkSize)while (true) {val bytesRead = input.read(buffer)if (bytesRead == -1) breakraf.write(buffer, 0, bytesRead)downloadedBytes += bytesRead// 更新进度if (totalSize > 0) {progressListeners.forEach { it(downloadedBytes, totalSize) }}}}}
}
3.2 使用示例
fun main() = runBlocking {val downloader = ResumableDownloader(url = "https://example.com/large-file.zip",savePath = "downloads/large-file.zip")downloader.addProgressListener { current, total ->val percent = (current.toDouble() / total * 100).toInt()println("Downloaded: $current/$total ($percent%)")}try {downloader.startDownload()println("Download completed successfully!")} catch (e: Exception) {println("Download failed: ${e.message}")println("Resume position: ${File("downloads/large-file.zip").length()} bytes")}
}
四、性能优化策略
4.1 内存映射文件加速
private fun transferWithMemoryMap(file: File, start: Long, end: Long, output: OutputStream) {RandomAccessFile(file, "r").use { raf ->val channel = raf.channelval buffer = channel.map(FileChannel.MapMode.READ_ONLY, start, end - start + 1)output.write(buffer.array(), buffer.arrayOffset(), buffer.remaining())}
}
4.2 零拷贝技术(Linux系统)
private fun transferZeroCopy(file: File, response: HttpServletResponse, start: Long, end: Long) {FileInputStream(file).use { fis ->val channel = fis.channelval outputChannel = Channels.newChannel(response.outputStream)var position = startval totalBytes = end - start + 1var remaining = totalByteswhile (remaining > 0) {val transferred = channel.transferTo(position, remaining, outputChannel)position += transferredremaining -= transferred}}
}
五、工程实践要点
5.1 断点存储设计
// 断点信息数据类
data class DownloadState(val url: String,val filePath: String,val downloaded: Long,val totalSize: Long,val timestamp: Long = System.currentTimeMillis()
)// 持久化存储
class DownloadStateRepository {private val states = ConcurrentHashMap<String, DownloadState>()fun saveState(key: String, state: DownloadState) {states[key] = state// 实际项目应持久化到数据库或文件}fun loadState(key: String): DownloadState? {return states[key]}
}
5.2 多线程下载实现
class MultiThreadDownloader(private val url: String,private val savePath: String,private val threadCount: Int = 4
) {suspend fun download() = coroutineScope {val totalSize = getFileSize()val chunkSize = totalSize / threadCount// 创建临时文件RandomAccessFile(savePath, "rw").use {it.setLength(totalSize) // 预分配空间}// 启动多个下载协程(0 until threadCount).map { threadId ->async(Dispatchers.IO) {val start = threadId * chunkSizeval end = if (threadId == threadCount - 1) {totalSize - 1} else {(threadId + 1) * chunkSize - 1}downloadChunk(start, end)}}.awaitAll()}private suspend fun downloadChunk(start: Long, end: Long) {val connection = URL(url).openConnection() as HttpURLConnectionconnection.setRequestProperty("Range", "bytes=$start-$end")RandomAccessFile(savePath, "rw").use { raf ->raf.seek(start)connection.inputStream.use { input ->input.copyTo(raf.channel)}}}
}
六、完整解决方案对比
方案 | 实现复杂度 | 大文件支持 | 内存效率 | 适用场景 |
---|---|---|---|---|
RandomAccessFile | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 通用文件传输 |
内存映射 | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 超大文件读取 |
NIO零拷贝 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 高性能服务器 |
多线程分块下载 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 高速下载环境 |
七、总结与最佳实践
核心要点总结:
- HTTP协议:正确处理
Range
请求头和Content-Range
响应头 - 文件定位:使用
RandomAccessFile.seek()
实现精确跳转 - 分块传输:采用8-16KB缓冲区平衡内存与IO效率
- 错误恢复:
- 捕获
ClientAbortException
处理客户端中断 - 实现自动重试机制(3次重试策略)
- 捕获
- 进度监控:实时回调下载进度用于UI更新
生产环境建议:
// 1. 添加超时控制
connection.connectTimeout = 30_000
connection.readTimeout = 120_000// 2. 限流保护
val maxSpeed = 1024 * 1024 // 1MB/s
val startTime = System.currentTimeMillis()
var bytesTransferred = 0Lwhile (/*...*/) {// ... 传输逻辑bytesTransferred += bytesRead// 限速控制val elapsed = System.currentTimeMillis() - startTimeval expectedTime = bytesTransferred * 1000 / maxSpeedif (elapsed < expectedTime) {delay(expectedTime - elapsed)}
}// 3. 文件校验
fun verifyFile(file: File, expectedHash: String): Boolean {val digest = MessageDigest.getInstance("SHA-256")file.forEachBlock { buffer, bytesRead ->digest.update(buffer, 0, bytesRead)}return digest.digest().joinToString("") { "%02x".format(it) } == expectedHash
}