本文将深入探讨如何使用Kotlin和RandomAccessFile实现高效的断点续传功能,涵盖原理分析、完整代码实现、性能优化及工程实践要点。
一、断点续传核心原理
1.1 HTTP断点续传协议
1.2 RandomAccessFile核心优势
特性 | 传统FileInputStream | RandomAccessFile |
---|---|---|
随机访问能力 | ❌ | ✅ |
大文件处理效率 | ⭐⭐ | ⭐⭐⭐⭐ |
内存占用 | 高 | 低 |
断点续传实现复杂度 | 高 | 低 |
文件修改能力 | ❌ | ✅ |
二、服务端完整实现(Kotlin + Spring Boot)
2.1 依赖配置
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
}
2.2 控制器实现
@RestController
class DownloadController {
@GetMapping("/download/{filename}")
suspend fun downloadFile(
@PathVariable filename: String,
request: HttpServletRequest,
response: HttpServletResponse
) {
val file = File("/data/files/$filename").takeIf { it.exists() }
?: throw FileNotFoundException("File not found")
// 解析Range请求头
val (start, end) = parseRangeHeader(request, file.length())
// 设置HTTP响应头
response.configureHeaders(file, start, end)
// 使用RandomAccessFile进行文件传输
transferFileContent(file, response, start, end)
}
private fun parseRangeHeader(
request: HttpServletRequest,
fileLength: Long
): Pair<Long, Long> {
val rangeHeader = request.getHeader("Range")?.takeIf { it.startsWith("bytes=") }
?: return 0L to fileLength - 1
val range = rangeHeader.substring(6).split("-")
val start = range[0].toLongOrNull() ?: 0L
val end = range.getOrNull(1)?.toLongOrNull() ?: fileLength - 1
return start to min(end, fileLength - 1)
}
private fun HttpServletResponse.configureHeaders(
file: File,
start: Long,
end: Long
) {
val fileLength = file.length()
val contentLength = end - start + 1
status = if (start > 0) HttpStatus.PARTIAL_CONTENT.value() else HttpStatus.OK.value()
contentType = "application/octet-stream"
setHeader("Accept-Ranges", "bytes")
setHeader("Content-Disposition", "attachment; filename=\"${file.name}\"")
setHeader("Content-Length", contentLength.toString())
if (status == HttpStatus.PARTIAL_CONTENT.value()) {
setHeader("Content-Range", "bytes $start-$end/$fileLength")
}
}
private suspend fun transferFileContent(
file: File,
response: HttpServletResponse,
start: Long,
end: Long
) = withContext(Dispatchers.IO) {
RandomAccessFile(file, "r").use { raf ->
raf.seek(start)
val output = response.outputStream
val buffer = ByteArray(8192)
var bytesRemaining = end - start + 1
while (bytesRemaining > 0) {
val readSize = min(bytesRemaining, buffer.size.toLong()).toInt()
val bytesRead = raf.read(buffer, 0, readSize)
if (bytesRead == -1) break
output.write(buffer, 0, bytesRead)
output.flush()
bytesRemaining -= bytesRead
}
}
}
}
2.3 关键代码解析
1. 文件指针定位
raf.seek(start) // 将文件指针移动到断点位置
2. 分块传输逻辑
while (bytesRemaining > 0) {
val readSize = min(bytesRemaining, buffer.size.toLong()).toInt()
val bytesRead = raf.read(buffer, 0, readSize)
// ... 写入输出流
}
3. HTTP头处理
// 部分内容响应
setHeader("Content-Range", "bytes $start-$end/$fileLength")
status = HttpStatus.PARTIAL_CONTENT.value()
三、客户端完整实现(Kotlin)
3.1 文件下载器类
class ResumableDownloader(
private val url: String,
private val savePath: String,
private val chunkSize: Int = 8192
) {
private var downloadedBytes: Long = 0
private val progressListeners = mutableListOf<(Long, Long) -> Unit>()
fun addProgressListener(listener: (Long, Long) -> Unit) {
progressListeners.add(listener)
}
suspend fun startDownload() = withContext(Dispatchers.IO) {
val file = File(savePath)
downloadedBytes = if (file.exists()) file.length() else 0L
while (true) {
try {
val connection = URL(url).openConnection() as HttpURLConnection
connection.setRequestProperty("Range", "bytes=$downloadedBytes-")
if (connection.responseCode !in 200..299) {
if (connection.responseCode == 416) { // 范围请求错误
file.delete() // 删除无效文件
downloadedBytes = 0
continue
}
throw IOException("HTTP error: ${connection.responseCode}")
}
// 获取文件总大小
val contentRange = connection.getHeaderField("Content-Range")
val totalSize = contentRange?.split("/")?.last()?.toLongOrNull()
?: connection.contentLengthLong.takeIf { it > 0 }
?: -1
// 执行下载
downloadChunks(connection, file, totalSize)
break
} catch (e: SocketTimeoutException) {
println("Timeout, retrying...")
} catch (e: IOException) {
if (e.message?.contains("reset") == true) {
println("Connection reset, retrying...")
} else {
throw e
}
}
}
}
private suspend fun downloadChunks(
connection: HttpURLConnection,
file: File,
totalSize: Long
) {
RandomAccessFile(file, "rw").use { raf ->
raf.seek(downloadedBytes)
val input = connection.inputStream
val buffer = ByteArray(chunkSize)
while (true) {
val bytesRead = input.read(buffer)
if (bytesRead == -1) break
raf.write(buffer, 0, bytesRead)
downloadedBytes += bytesRead
// 更新进度
if (totalSize > 0) {
progressListeners.forEach { it(downloadedBytes, totalSize) }
}
}
}
}
}
3.2 使用示例
fun main() = runBlocking {
val downloader = ResumableDownloader(
url = "https://example.com/large-file.zip",
savePath = "downloads/large-file.zip"
)
downloader.addProgressListener { current, total ->
val percent = (current.toDouble() / total * 100).toInt()
println("Downloaded: $current/$total ($percent%)")
}
try {
downloader.startDownload()
println("Download completed successfully!")
} catch (e: Exception) {
println("Download failed: ${e.message}")
println("Resume position: ${File("downloads/large-file.zip").length()} bytes")
}
}
四、性能优化策略
4.1 内存映射文件加速
private fun transferWithMemoryMap(file: File, start: Long, end: Long, output: OutputStream) {
RandomAccessFile(file, "r").use { raf ->
val channel = raf.channel
val buffer = channel.map(
FileChannel.MapMode.READ_ONLY,
start,
end - start + 1
)
output.write(buffer.array(), buffer.arrayOffset(), buffer.remaining())
}
}
4.2 零拷贝技术(Linux系统)
private fun transferZeroCopy(file: File, response: HttpServletResponse, start: Long, end: Long) {
FileInputStream(file).use { fis ->
val channel = fis.channel
val outputChannel = Channels.newChannel(response.outputStream)
var position = start
val totalBytes = end - start + 1
var remaining = totalBytes
while (remaining > 0) {
val transferred = channel.transferTo(position, remaining, outputChannel)
position += transferred
remaining -= transferred
}
}
}
五、工程实践要点
5.1 断点存储设计
// 断点信息数据类
data class DownloadState(
val url: String,
val filePath: String,
val downloaded: Long,
val totalSize: Long,
val timestamp: Long = System.currentTimeMillis()
)
// 持久化存储
class DownloadStateRepository {
private val states = ConcurrentHashMap<String, DownloadState>()
fun saveState(key: String, state: DownloadState) {
states[key] = state
// 实际项目应持久化到数据库或文件
}
fun loadState(key: String): DownloadState? {
return states[key]
}
}
5.2 多线程下载实现
class MultiThreadDownloader(
private val url: String,
private val savePath: String,
private val threadCount: Int = 4
) {
suspend fun download() = coroutineScope {
val totalSize = getFileSize()
val chunkSize = totalSize / threadCount
// 创建临时文件
RandomAccessFile(savePath, "rw").use {
it.setLength(totalSize) // 预分配空间
}
// 启动多个下载协程
(0 until threadCount).map { threadId ->
async(Dispatchers.IO) {
val start = threadId * chunkSize
val end = if (threadId == threadCount - 1) {
totalSize - 1
} else {
(threadId + 1) * chunkSize - 1
}
downloadChunk(start, end)
}
}.awaitAll()
}
private suspend fun downloadChunk(start: Long, end: Long) {
val connection = URL(url).openConnection() as HttpURLConnection
connection.setRequestProperty("Range", "bytes=$start-$end")
RandomAccessFile(savePath, "rw").use { raf ->
raf.seek(start)
connection.inputStream.use { input ->
input.copyTo(raf.channel)
}
}
}
}
六、完整解决方案对比
方案 | 实现复杂度 | 大文件支持 | 内存效率 | 适用场景 |
---|---|---|---|---|
RandomAccessFile | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 通用文件传输 |
内存映射 | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 超大文件读取 |
NIO零拷贝 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 高性能服务器 |
多线程分块下载 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 高速下载环境 |
七、总结与最佳实践
核心要点总结:
- HTTP协议:正确处理
Range
请求头和Content-Range
响应头 - 文件定位:使用
RandomAccessFile.seek()
实现精确跳转 - 分块传输:采用8-16KB缓冲区平衡内存与IO效率
- 错误恢复:
- 捕获
ClientAbortException
处理客户端中断 - 实现自动重试机制(3次重试策略)
- 捕获
- 进度监控:实时回调下载进度用于UI更新
生产环境建议:
// 1. 添加超时控制
connection.connectTimeout = 30_000
connection.readTimeout = 120_000
// 2. 限流保护
val maxSpeed = 1024 * 1024 // 1MB/s
val startTime = System.currentTimeMillis()
var bytesTransferred = 0L
while (/*...*/) {
// ... 传输逻辑
bytesTransferred += bytesRead
// 限速控制
val elapsed = System.currentTimeMillis() - startTime
val expectedTime = bytesTransferred * 1000 / maxSpeed
if (elapsed < expectedTime) {
delay(expectedTime - elapsed)
}
}
// 3. 文件校验
fun verifyFile(file: File, expectedHash: String): Boolean {
val digest = MessageDigest.getInstance("SHA-256")
file.forEachBlock { buffer, bytesRead ->
digest.update(buffer, 0, bytesRead)
}
return digest.digest().joinToString("") { "%02x".format(it) } == expectedHash
}