用Kotlin实现一个超简陋的BitTorrent客户端(二)

用Kotlin实现一个超简陋的BitTorrent客户端(二)

于是上次提到的那个客户端的进展……大概是完工了(。这里记录一下第二阶段踩的一些坑,一些关于BitTorrent协议的有趣的地方,也有一些我自己觉得值得记录下来的一些实现细节吧。

续篇

首先提一下在解析PeerResponse的时候,遇到了值的类型为List<Result<*>>但是需要获取Result<List<*>>的情况。这个是很经典的Result和List的互操作了,于是导入了库里面的sequenceLeft(list:)函数。原来的库用的是自行实现的链表数据结构,但是稍微改造一下就可以把函数用在Kotlin内建的List上了。

1
2
3
4
5
6
fun <A> sequenceLeft(list: List<Result<A>>): Result<List<A>> =
list.fold<Result<A>, Result<List<A>>>(
Result(listOf())
) { x, y ->
map2(y, x) { a: A -> { b: List<A> -> b + a }}
}.map { it.reversed() }

注意这里的fold要显式注明参数类型。

于是又接着继续写下去了。本着从简单到难的原则,首先实现了几个data class。

  • BitTorrentMessage

    这个data class代表一个在peer之间传来传去的数据,Message的数据结构基于这里的描述。注意这个类的toString方法被重载了,首先会生成包含用4个Byte encode的消息长度,然后是一个Byte表示的Message Id,紧接着是这个Message的payload。

    重载的方法是用的List<Byte>.map { it.toChar() }.joinToString(""),这样的写法可以安全地把字符串按照每个字符的字面量来串接起来。

  • Block

    这个类是对每个分块(最小的传输单元)的简单包装,加上了piece、offset、length等数据,以及一个代表该Block是否已经传输完毕的Status。

  • Piece

    这个类表示由一组分块组成的分片,每个分片对应着pieces hash里面的一个hash值。这个值会在该分片下载完毕后和该分片的hash值进行匹配。如果不匹配的话,每个分块都会被重新标pending然后重新回到下载流程。注意这里的getData()方法只需要把每个block的data join起来就行了。

    这里在进行hash值匹配的时候有一个坑,之前提取的hashValue虽然长度也是20,但是底层的字符数组的长度变成了40,每一个byte后面紧跟一个0。所以跟getData().hash().hexDecode()后的值(字符数组长度为20)不匹配了。对hashValue使用map { it.toByte() }可以消除这些0,不过这样一来两边都要变成字符数组来进行匹配。

    至于为什么会多出一堆0来,还是跟字符串的encoding有关吧。

    顺便解释一下BitTorrent里面分片和分块的概念。用BitTorrent下载文件的时候,数据被分成大小相等(除了最后一个)的若干个分片,每个分片的大小由我们之前解析的info字典里的piece length指定。但是,实际进行传输的时候使用的是更小的分块,因此每个分片又可以被细分为若干个大小相等(除了最后一个)的分块。分块的大小一般是2.pow(14)也就是16384字节。

和Peers沟通

从Tracker那里得到peer的信息后,接下来要做的自然就是和每个peer建立连接请求下载数据。这里建立TCP连接就直接用了Ktor的Socket库了。这个Socket附带着一个ByteReadChannel和一个ByteWriteChannel,因为只能各打开一次,所以就在初始化socket的时候把它们顺便也开启了。

注意openWriteChannel()的时候要把autoFlush设为true。

在发送数据的时候,按照老样子,要用map { it.code.toByte() }获取字节数组,然后可以转换为ByteArray以便和writeAvailable()的函数签名对得上。

在接受数据的时候,有两种情况,一种是已经知道要接受多少数据了(在握手的时候),只要按照这个大小建立一个buffer然后传给ByteReadChannel去读取即可。另一种情况是不知道要读取多少数据,这时候就先读前4个字节,获取一个Int(接下来要读的长度),然后根据这个长度去开buffer。注意无论哪种情况在Ktor里面都要readFully(),不然channel里面残留的字节会在下一次读取的时候被读到,然后导致接下来读的都是错的。

BitTorrent握手

建立好TCP连接后,就可以向Peer发送一个BitTorrent的handshake消息了。这个消息包括5个部分:

  • 协议头长度,永远是19(0x13)。
  • 协议头,永远是BitTorrent protocol。
  • 8个保留字符,用于指示是否使用某些扩展协议。但是这里不支持任何扩展协议,所以全部写0x0就可。
  • 先前计算得出的infohash,用于指示想要哪些文件。
  • 客户端自己的Peer ID

一个handshake消息可能长这样:

1
\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00\x86\xd4\xc8\x00\x24\xa4\x69\xbe\x4c\x50\xbc\x5a\x10\x2c\xf7\x17\x80\x31\x00\x74-TR2940-k8hj0wgej6ch

这里就使用了iterate函数来生成Peer ID最后的12位随机字串了。iterate的原理大概是给定一个种子、一个变换函数和一个长度生成一个列表,和随机数的生成原理相近。

然后就是发送握手消息,如果一切顺利的话应该马上就可以收到Peer回复相同格式的消息,接下来比对两方的info_hash是否一致,再进行下一步。

交换信息

完成handshake之后,就可以发送和接受消息了……还不能,除非对方已经做好了准备。这个时候,客户端就可以被看作是Choked了。只有对方给客户端发送一个Unchoke之后,才可以向对方请求数据。要让对方把这边Unchoke的话,需要先向对方发送一条Interested信息。

除此之外,客户端还会收到一条BitField消息,这条消息会告诉这边对方有哪些分片(Pieces)。BitField的存储方式就是一条ByteArray,如果第index个bit的值为1,则说明对方拥有这个分片。如果值为0,就说明对方没有这个分片。

整个流程大约如下(不一定准确,因为貌似后面还有坑)

1
2
3
4
5
客户端 ----- Handshake  ----> Peer
客户端 <---- Handshake ----- Peer
客户端 <---- BitField ----- Peer
客户端 ----- Interested ----> Peer
客户端 <---- Unchoke ----- Peer

有关建立连接、和Peer握手、进行信息传输的部分实现在Worker类里面。这里有一个点是socket和那两个channel如果创建失败,会返回null,所以不适合用lateinit var而要直接用nullable,如果是null,就不需要关闭了。

发起请求

一个BitTorrent Request的结构如下:

1
<len=0013><id=6><index><begin><length>

这里的index是指明要下载的是第几个Piece。begin,或者说offset,则是这个Block在Piece里的起始位置,length是这个Block的实际大小。

比如说,要下载一个大小为135168字节的文件,每个Piece长度49152,那么要发送的Request如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
piece 0:
request block 0: <index=0><begin=0><length=16384>
request block 1: <index=0><begin=16384><length=16384>
request block 2: <index=0><begin=32768><length=16384>

piece 1:
request block 0: <index=1><begin=0><length=16384>
request block 1: <index=1><begin=16384><length=16384>
request block 2: <index=1><begin=32768><length=16384>

piece 2:
request block 0: <index=2><begin=0><length=16384>
request block 1: <index=2><begin=16384><length=16384>
request block 2: <index=2><begin=32768><length=4096>

之前提到的info字典表里面的pieces,就是每一个piece的SHA1 hash值拼接在一起的结果。想要确认是否下载到了正确的分块,只要计算一下它的Hash并和这个Hash值进行比较即可。

这里有几个需要注意的点:

  • Java内部使用的是网络序,就不需要像C/C++那样用htonl来进行转换了。

  • 然后是个大坑:我一开始用的代码是类似这样的:

    1
    2
    3
    4
    val index = block.piece.intToBytes().map { it.toByte() }
    val offset = block.offset.intToBytes().map { it.toByte() }
    val length = block.length.intToBytes().map { it.toByte() }
    val payload = (index + offset + length).joinToString("")

    这里有两个坑:

    • indexoffsetlength长度不一定为完整的Int(4个字节),如果值很小的话可能只有1、2个字节,就会出现长度不一致。这里需要补前导0。
    • joinToString() 函数会把字面量直接变为字符串,比如说[0x64,0x0]会直接变成 "640"

    解决方案是写一个expandToByteInts()函数来补前导0,然后把转换得到的List换成Char的List,再join。这个思路模仿了Long.expandWithRem(divideBy:)的写法(后面有提到)

    1
    2
    3
    4
    5
    6
    fun Int.expandToByteInts(): List<Int> = unfold(this to 4) {
    if (it.second == 0)
    null
    else
    it.first % 256 to (it.first / 256 to it.second - 1)
    }.reversed()

    有关fold/unfold/iterate这几个辅助函数的实现可以参考utils/fp包。这几个函数提供了把许多操作写成FP的写法的可能性。

管理Pieces

有了这些信息,还要去管理记录到底哪些Peer有哪些Piece,已经下载了哪些Piece还有还需要下载哪些Piece,所以需要一个PieceManager。这个PieceManager的主要任务是:

  • 创建并存储所有的Piece对象
  • 创建待下载的文件
  • 在和新的Peer进行连接/断开连接的时候添加/更新/移除对应的Peer的信息(ID和BitField)
  • 记录已下载的Block,在一个Piece的所有Block都下载完成的时候检查它的SHA1 Hash是否与info里面的值匹配,如果匹配,就把Piece写入文件
  • 找到下一个需要下载的Block
  • 检查文件是否已经下载完成

除此之外,也提供追踪进展的功能。

这里需要注意的有几点:

  • 使用了RandomAccessFile(因为可以随机在不同的offset上进行读写),在初始化的时候要setLength为种子文件里面提取出的length值。在每次写入操作前要先计算出写入文件的offset(Piece的index乘以pieceLength),然后seek到这个offset再写入。这里的offset并不是RandomAccessFile.write(data:off:len)里面的offset,后者指的是对于写入的data的offset。

  • 因为要对Piece对象调用各种方法,所以AtomicReference是不能用的了。需要使用到Mutex锁。但是与此同时,也需要小心Mutex互锁的情况。例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    suspend fun isComplete(): Boolean = mutex.withLock {
    ...
    }

    suspend fun displayProgressBar() {
    mutex.withLock {
    ...
    if (isComplete()) {
    ...
    }
    }
    }

    这里的isComplete会把mutex锁住,然后在displayProgressBar()里又有一个大锁,结果就是程序被彻底锁死。解决方案是在displayProgressBar()里不用withLock而是把lock的过程拆开,在进入函数的时候上锁,在调用isComplete()前把锁解开。

  • blockReceived()函数里面有个从pendingRequests里移除包含当前Block的Request的过程,但是如果pendingRequests里面不包含当前的Block也不应该报错,因为当前的Block很可能并不是pending的。

  • 关于获取下一个Block的策略,大致上是这样的:

    1. 如果之前请求的Block超时了,重新请求
    2. 请求现在正在下载的Piece里的下一个Block
    3. 如果当前没有正在下载的Piece,那么找到最「稀有」(被最少Peer所拥有)的Piece并请求下载
  • 这里在initiatePieces()里面定义了一个Long.expandWithRem(divideBy:)函数,主要是简单调用unfold来expand一个整数到一个列表,这个列表的每一项(除了最后一项)都是除数然后最后一项是余数,把这个列表的每一项加起来可以得到这个整数。例如,17 expandWith 5 可以得到 [5,5,5,2]。这里的注意事项是最后一项如果是0的时候应该被吃掉。

    然后就可以利用这个函数来生成Piece和对每个Piece生成Block了。

  • nextRequest()函数里面,为了方便流式调用,把nullable转换成Option然后再转回来,这样并不是最好的写法,而且因为只在这个地方使用Option,所以需要写好几个帮助函数来根据不同的情况生成对应的各种Option值。不过这样一来,函数本体就可以很简单地这样写出来了:

    1
    2
    3
    4
    5
    6
    7
    mutex.withLock {
    return missingPiecesEmpty().flatMap { cld ->
    peersContainsIdEmpty().flatMap {
    expiredRequestOpt().mapNone(::nextOngoingOpt).mapNone { getRarestPieceOpt(Option.Some(cld)) }
    }
    }.toNullable()
    }

    比一堆if套娃可读性要高不知道哪里去了(bushi

    转换函数大概都长这样,就是简单地生成一个Option,作为桥接器:

    1
    2
    3
    4
    fun expiredRequestOpt(): Option<Block> = when (val er = expiredRequest(peerId)) {
    null -> Option.None
    else -> Option.Some(er)
    }

多线程管理

既然上了Kotlin怎么能不用可爱的协程呢?(划掉

有几个不同的地方用了协程的不同特性:

  • PeerRetriever生产Peer并push到一个队列中,然后每个Worker从队列里提取Peer,这是一个生产-消费模型,用Channel就可以很好的协调起来了。

    首先在TorrentClient里生产Peer:

    1
    2
    3
    4
    5
    6
    PeerRetriever(...)
    .retrievePeers(...)
    .let {
    it.first.forEach { peers.send(it) }
    // 这里的第一个it是个Peer的列表对Interval的Pair,取first
    }

    然后把peers传给每一个Worker,

    最后在Worker的主入口函数里消费Peer:peer = peers.receive()

    Channel的容量设置为Channel.UNLIMITED,因为不想在生产端进行阻塞。

  • 每一个Worker都是一个协程,这一点做起来很简单,Worker的start()函数本就是suspend的,只需要在workers.onEach()里面的it.start()外面包裹一层scope就可以了:

    1
    2
    3
    CoroutineScope(Dispatchers.IO).launch {
    it.start()
    }

    当然也可以不包裹起来,这时候就是单线程模式,第一个协程下载完毕后才会交出控制权然后其他协程依次被启动。

    单线程对于debug相对多线程要友好许多(雾

  • 进度条也在一个单独的协程里,实现起来只要简单地把pieceManager.trackProgress()包裹在一个CoroutineScope下就可以了。

当然了,启动协程后还需要各线程之间进行交流,控制每个Worker在未完成下载的时候进行循环,在循环里反复请求下载下一个块,最后下载完毕后退出。这些主要是通过Worker类里面的terminatedrequestPending等flag,PieceManager里的isComplete()来进行控制的。

相对应的,PieceManager里面的missingPiecesonGoingPiecesfinishedPiecespendingRequests都使用了ConcurrentLinkedDeque,配合Mutex来避免产生race condition。

记录日志

C++里面的loguru感觉很好用,但是Java里没有对应的库。对应的,选用了Logback库,配置xml文件参考了这里还有这里。Logback在打日志的时候会把自己的记录也打印出来,要关闭的话可以参考这里,不过必须把debug调为false才行。

然后为了能被全局开启关闭,就写了个带参单例Log类,在Log类里面设置了个flag(根据这里的提示设为@Volatile),然后把info/warn/error/debug分别包装起来。在Kotlin里面实现单例模式很简单,只要所有东西都写在companion object里面就好了。

这里有个看起来不是很漂亮的地方是Log里重复写了4次几乎相同的函数,但是考虑到反射的性能开销还有Log是高频调用的开销,就觉得还是这样好了(

关于Tracker的回复……Compact或者不是

想了很久没有想到怎样用orElse或者getOrElse来串接non-compact和compact的运算逻辑,最后还是添加了一个Result.isSuccess()方法来提取尝试提取non-compact的结果,如果提取成功就按照non-compact来走,不然就提取compact的结果并且开始解码。

具体的实现倒是平淡无奇,只是简单地照着描述去实现而已,不过感觉写起来相当的丑啊(

记一个很隐蔽的bug的发现过程

起因是下载的时候总是进度为0,开了日志后发现虽然有不断调用下一个Piece,但是offset永远为0。

  • 检查nextOngoing函数,怀疑是peer的BitField存错了(因为看到了一堆-1)
  • 首先检查addPeer函数,发现问题不在这里
  • 然后检查hasPiece函数,发现也没问题,而且-1实际上就是ff,换句话说就是每一个Piece都有的意思
  • 回过头再看nextOngoing函数,给这个函数打断点,这里必须拆分出next变量并判断变量是否为空,然后在if和else里分别打断点了
  • 发现了两个问题
    • nextOngoing里面的linq可能写错了,因为firstOrNull { ba.hasPiece(it.index) }只测试这个Piece被Peer所拥有,但是这个Piece不一定有Missing块
    • pendingRequest总是不会把每一个Piece的第一项包含进去
  • 仔细想的话,这里的问题是在pendingRequest不包含每个Piece的第一项所以它会一直处于Pending但是无法被提取的状态。如果所有的Pending的Block都在pendingRequest队列里面的话,那么确实不需要filter if any is Missing,反过来说,即使filter了any is Missing,仍然会有Piece因为含有Pending而被pass掉……
  • 最后的解决方案是在getRarestPieceOpt()里面加了一行?.also { block -> block.appendToPendingRequests() }。同时,把pending的间隔调短为1秒(因为会反复调取来下载在队列中但未被完全下载的块,所以间隔不能调太长)

尾声

基本上到这里就差不多了。断断续续修了几个bug改了一些东西又花了一些时间,现在的客户端可以用来下载小型文件,大文件的话,还不确定能不能跑起来。

因为算是第一次接触网络编程还有用协程来写,所以感觉磕磕碰碰的。代码方面,倒是试图利用linq和一些基础的FP的概念来让代码可读性变得更好一点。不过感觉最重要的还是对网络还有协议的理解了。除此之外,Kotlin/Java的字符串的实现和C/C++的很不一样,用起来坑也很多,尤其是对于Bencode这种把二进制结构和字符串缝合在一起的数据结构。不夸张地说,这个项目里起码三分之一的时间都用来处理字符串相关的问题了。

实现的功能也就只有最基本的下载,不支持断点续传、多个文件下载以及做种,也没有pipeline。不过两周时间内磕磕碰碰写出一个能跑的客户端还是很有成就感的w

已知的问题

  • 和特定的Peer连接的时候Handshake会失败(socket试图读68个字节但读了个EOF)
  • 线程没吃满
  • 建立TCP时候的Timeout太长了

参考