一个函数写了两天,学成这样确实可以放弃了
更新:写了一个星期
从这个实验(准确来说是上个实验)开始,接下来的三个实验将实现一个 TCP Socket
,其结构如下所示:
实验目标
在本次实验中,需要实现的为 TCP Receiver
中的 Stream Reassembler
部件,其包涵了 lab0
中实现的 ByteStream
,所以如果上个实验的测试点没有全过去(除了最后一个测试点),那在这个实验中可能还需要修改上一次实验的代码,建议把 lab0
完全通关再来写这个。
StreamReassembler
的工作原理在 handout
中描述的不算很详细,在这里解释一下它的原理:
Info
TCP 发送方将其字节流划分为多个短段 (每个不超过 1460 字节的子字符串),以便每个短段都能装入数据报中。
但是网络可能会重新排序这些数据报,或者丢弃它们,或者不止一次地发送它们。接收端必须将这些段重新组装成它们开始时的连续字节流。
接收端必须将这些段重新组装成它们开始时的连续字节流。
本质上来说,就是使用不可靠的数据报提供一对可靠的字节流,从加粗的字可以看出实际上接受方收到的数据段可能是重复乱序的,所以需要对这些数据段进行取舍,以拼接出一份正确的数据报。
接下来回答一些思路上的疑问:
FAQ
-
传入的参数
index
的意义当前数据段
data
第一个字符在整个数据包中的位置 -
capacity
到底指代什么还是这张图
因此我们需要记录当前已经输出了的数据报的索引,以此来确定我们应该留下哪些数据段,丢弃哪些数据段
-
重组后如何输出结果
每次收到数据段后就尝试进行输出
实验过程
成员定义
在这之前,因为我们需要存储暂时不能输出的数据段,这里我的做法是使用 set
来存储,并按照 index
的大小进行排序(红黑树),时间复杂度为 ,应该会有快的方法但我懒得写了,思考了一下可能用位图和队列来存储会更快。
这里我的存储结构定义如下:
struct Node{ std::string data = ""; size_t idx = 0; bool operator < (const Node b) const{ return idx < b.idx; } }; //!< 消息结构
std::set<Node> _sequence = {}; //!< 消息序列 size_t _first_unread; //!< 第一个未读字节的索引 size_t _unassembled_bytes; //!< 未组装字节的数量 bool _eof_flag; //!< eof标志
在构造函数中初始化如下:
StreamReassembler::StreamReassembler(const size_t capacity) : _first_unread(0), _unassembled_bytes(0),_eof_flag(false), _output(capacity), _capacity(capacity) {}
push_substring
实验最难的部分就是这个函数(感觉是废话), push_substring
的难点在于子串的合并,暴力的话需要讨论情况太多,相当于一个大模拟,所以我的流程是:
- 判断是否超出容量
- 处理前后冗余部分
- 合并子串
- 写入字节流
- 处理
EOF
判断容量
显然我们应该扔掉索引超过 first_unacceptable
的内容,因为这种我们不能进行暂存,并且网络会重复发,我们不必担心之后会收不到
if(index >= this->_first_unread + this->_capacity) return;
处理冗余
冗余是指类似如下的情况:
- 传入数据段
{data="ghX", idx=6}
,然而capacity=8
因此我们无法存下最后一个字符X
,所以X
在这里需要被去除 - 传入数据段
{data="abcde", idx=0}
,然而first_unread=2
,因此字符ab
实际上已经被输出,我们不应该再次存储
显然,因此在这里我们需要对传入的数据段的首尾进行判断,截取其子串进行存储。
Node node;if(index + data.length() < this->_first_unread) _handle_eof(eof);else { size_t head_offset = 0, tail_offset = 0; if(index < this->_first_unread){ head_offset = this->_first_unread - index; node.idx = this->_first_unread; } else if(index + data.length() > this->_first_unread + this->_capacity){ tail_offset = index + data.length() - this->_first_unread - this->_capacity; node.idx = index; } else { node.idx = index; } node.data.assign(data.begin() + head_offset, data.end() - tail_offset);}this->_unassembled_bytes += node.data.length();
合并子串
假设在 set
中已经存储了如下两个结构:
{data="abc", idx=0}, {data="gh", idx=6}
现在传入的数据段为:{data="cdefh", idx=2}
显然这个数据段是可以被合并的,并且前后均需要被合并,于是我们在这里使用 lower_bound
进行二分合并:
int merge = 0;auto iter = this->_sequence.lower_bound(node);while((iter != this->_sequence.end()) && (merge = _rectifying(node, *iter)) >= 0){ this->_unassembled_bytes -= merge; this->_sequence.erase(iter); iter = this->_sequence.lower_bound(node);}
while(iter != this->_sequence.begin()){ iter--; merge = _rectifying(node, *iter); if(merge >= 0){ this->_unassembled_bytes -= merge; this->_sequence.erase(iter); iter = this->_sequence.lower_bound(node); }}
if(node.data.length() > 0) this->_sequence.insert(node);
其中 _rectifying
将 b
合并至 a
中,并更新其 idx
,如下:
int StreamReassembler::_rectifying(StreamReassembler::Node &a, const StreamReassembler::Node &b) { int merge = -1; Node x, y; if(a.idx > b.idx) x = b, y = a; else x = a, y = b;
if(x.idx + x.data.length() < y.idx) merge = -1; else if(x.idx + x.data.length() >= y.idx + y.data.length()){ a = x; merge = int(y.data.length()); } else { merge = int(x.idx + x.data.length() - y.idx); a.data = x.data + y.data.substr(merge); a.idx = x.idx; } return merge;}
注意到,最后我们对 node.data
进行空串的判断,我们需要忽略空串而不是将其加入 set
中
为什么不在一开始就进行忽略,这是因为空串可能包含着 EOF
标识,我们需要对其进行判断。
处理输出
简单的处理,只需要对 first_unread
进行判断,并将 set
中的内容写入 ByteStream
中即可
while(!this->_sequence.empty() && this->_sequence.begin()->idx == this->_first_unread){ const Node &x = *this->_sequence.begin(); size_t write = this->_output.write(x.data); this->_first_unread += write; this->_unassembled_bytes -= write; this->_sequence.erase(this->_sequence.begin());}
判断 eof
对 eof
的判断并不像想象中的那样无脑,因为 eof
只是数据段中可能包涵的一个标识,并且这个 eof
标识有可能先到达了而中间数据端还没有到(或者还没有被翻译出去),因此真实的 eof
判断应该由传入的标识与 StreamReassembler
是否存在未翻译的字符共同决定。
void StreamReassembler::_handle_eof(const bool eof) { if(eof) this->_eof_flag = true; if(this->_eof_flag && this->_unassembled_bytes == 0) this->_output.end_input();}
当完全达到 eof
后,我们需要关闭 ByteStream
的写端。
其他函数
size_t StreamReassembler::unassembled_bytes() const { return this->_unassembled_bytes;}
bool StreamReassembler::empty() const { return this->_unassembled_bytes == 0; }
实验结果
运行 make -j4
后,运行 make check_lab1
,如下所示:
最后一个错误仍然是因为我不是真实的 Linux
系统(WSL1),所以会导致这个