#ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH
#define SPONGE_LIBSPONGE_TCP_SENDER_HH
#include <functional>
#include <queue>
#include "byte_stream.hh"
#include "tcp_config.hh"
#include "tcp_segment.hh"
#include "wrapping_integers.hh"
//! \brief Countdown timer backing the sender's retransmission timeout.
//!
//! The timer is armed with start(), advanced with tick() and disarmed with
//! stop(). Only the two query functions are defined inline here.
class Timer
{
    bool _active = false;     //!< set while the timer is armed
    bool _expried = false;    //!< set once the countdown has run out
    unsigned _timer_odd = 0;  //!< time left on the countdown

  public:
    //! Arm the timer with a countdown of \p rto.
    void start(const unsigned &rto);

    //! Disarm the timer and clear its state.
    void stop();

    //! Advance the countdown by \p ms milliseconds.
    void tick(const unsigned &ms);

    //! \return true while the timer is armed
    bool is_active() const { return _active; }

    //! \return true only if the timer is armed and its countdown has run out
    bool is_expired() const { return _active && _expried; }
};
//! Accepts a ByteStream, divides it up into segments and sends the
//! segments, keeps track of which segments are still in-flight,
//! maintains the Retransmission Timer, and retransmits in-flight
//! segments if the retransmission timer expires.
class TCPSender
{
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{};
//! retransmission timer for the connection
unsigned int _initial_retransmission_timeout;
//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;
//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{0};
//! timer for TCP ACQ
Timer _timer{};
//! RTO timeout
unsigned int _rto{_initial_retransmission_timeout};
//! counter for retransmission
unsigned int _consecutive_retransmissions_count{0};
//! FIFO for retrans segment
std::queue<TCPSegment> _flight_buffer{};
//! window size from receiver
uint16_t _window_size{1};
//! last ackno
uint64_t _last_ackno{0};
// void send_segment(TCPSegment &seg);
//!\breif function for sending tcp payload to stream queue
//!\param[in] info for segment
//!\param[in] head flag SYN
//!\param[in] head flag FIN
bool _send_segment(std::string &&payload, bool syn = false, bool fin = false);
public:
//! Initialize a TCPSender
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {});
//! \name "Input" interface for the writer
//!@{
ByteStream &stream_in() { return _stream; }
const ByteStream &stream_in() const { return _stream; }
//!@}
//! \name Methods that can cause the TCPSender to send a segment
//!@{
//! \brief A new acknowledgment was received
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
//! \brief Generate an empty-payload segment (useful for creating empty ACK
//! segments)
void send_empty_segment();
//! \brief create and send segments to fill as much of the window as possible
void fill_window();
//! \brief Notifies the TCPSender of the passage of time
void tick(const size_t ms_since_last_tick);
//!@}
//! \name Accessors
//!@{
//! \brief How many sequence numbers are occupied by segments sent but not
//! yet acknowledged? \note count is in "sequence space," i.e. SYN and FIN
//! each count for one byte (see TCPSegment::length_in_sequence_space())
size_t bytes_in_flight() const;
//! \brief Number of consecutive retransmissions that have occurred in a row
unsigned int consecutive_retransmissions() const;
//! \brief TCPSegments that the TCPSender has enqueued for transmission.
//! \note These must be dequeued and sent by the TCPConnection,
//! which will need to fill in the fields that are set by the TCPReceiver
//! (ackno and window size) before sending.
std::queue<TCPSegment> &segments_out() { return _segments_out; }
//!@}
//! \name What is the next sequence number? (used for testing)
//!@{
//! \brief absolute seqno for the next byte to be sent
uint64_t next_seqno_absolute() const { return _next_seqno; }
//! \brief relative seqno for the next byte to be sent
WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
//!@}
};
#endif // SPONGE_LIBSPONGE_TCP_SENDER_HH
#include "tcp_sender.hh"
#include "tcp_config.hh"
#include <random>
#include <cassert>
#include <string>
#include <iostream>
using namespace std;
//! Arm (or re-arm) the timer with a fresh countdown of \p rto.
void Timer::start(const unsigned &rto)
{
    _timer_odd = rto;  // time left before the timer expires
    _expried = false;  // forget any earlier expiry
    _active = true;
}
//! Disarm the timer: clears the active and expired flags and the countdown.
void Timer::stop()
{
    _timer_odd = 0;
    _expried = false;
    _active = false;
}
//! \brief Advance the countdown by \p ms milliseconds.
//!
//! Does nothing while the timer is not armed. Once the elapsed time reaches
//! the remaining countdown, the timer is flagged as expired and the
//! countdown is pinned at zero.
void Timer::tick(const unsigned &ms)
{
    if (!_active)
    {
        return;
    }
    if (ms < _timer_odd)
    {
        // Still time left: just count down.
        _timer_odd -= ms;
        return;
    }
    _timer_odd = 0;
    _expried = true;
}
//! \brief Construct a sender with an empty outgoing stream.
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before
//! retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set
//! (otherwise a value drawn from std::random_device is used)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout,
                     const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{std::random_device{}()})),
      _initial_retransmission_timeout(retx_timeout),
      _stream(capacity)
{
}
//! \return how many sequence numbers have been sent but not yet
//! acknowledged: the distance from the last accepted ackno to the next
//! sequence number to be sent.
size_t TCPSender::bytes_in_flight() const
{
    const uint64_t outstanding = _next_seqno - _last_ackno;
    return outstanding;
}
//! \param[in] the payload for tcp segment
//! \param[in] tcp head flag SYN
//! \param[in] tcp head flag fin
bool TCPSender::_send_segment(std::string &&payload, bool syn, bool fin)
{
// Construct TCP Header
TCPSegment segment;
segment.header().syn = syn;
segment.header().fin = fin;
segment.header().seqno = next_seqno();
// Payload
segment.payload() = Buffer(move(payload));
size_t segment_size = segment.length_in_sequence_space();
if (segment_size == 0) return false;
// update
_next_seqno += segment_size;
_flight_buffer.push(segment);
_segments_out.emplace(segment);
// Set Timer
if (!_timer.is_active()) _timer.start(_rto);
return true;
}
void TCPSender::fill_window()
{
// CLOSED (waiting for stream to begin no SYN sent)
if (next_seqno_absolute() == 0)
{
// send SYN
_send_segment("", true);
return;
}
// SYN_SEND (stream started but nothing acknowledged)
else if (next_seqno_absolute() > 0
&& next_seqno_absolute() == bytes_in_flight())
{
return;
}
size_t cur_window_size = (_window_size == 0) ? 1 : _window_size;
while (cur_window_size > bytes_in_flight())
{
// SYN_ACKED (stream ongoing)
if (next_seqno_absolute() > bytes_in_flight() && !stream_in().eof())
{
// flag for send success
bool success_send = false;
size_t payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE,
cur_window_size - bytes_in_flight());
string payload = move(_stream.read(payload_size));
// stream reached EOF and remaining window size can insert FIN flag
if (stream_in().eof()
&& cur_window_size - bytes_in_flight() - payload.size() > 0)
success_send = _send_segment(move(payload), false, true);
else
success_send = _send_segment(move(payload));
// Nothing to send cause segment length is zore,break.
if (!success_send) break;
}
else if (stream_in().eof())
{
// SYN_ACK (stream ongoing, stream has reached EOF, but FIN flag hasn't been sent yet)
if (next_seqno_absolute() < stream_in().bytes_written() + 2)
{
_send_segment("", false, true);
}
else
// FIN_SENT
break;
}
}
}
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size)
{
// update window size
_window_size = window_size;
// get 64-bit absolute ackno
uint64_t abs_ackno = unwrap(ackno, _isn, _last_ackno);
// if something impossible return
if (abs_ackno > next_seqno_absolute()) return;
// if ackno is new ack, check retrans buffer
if (abs_ackno > _last_ackno)
{
// update new 64-bit ackno
_last_ackno = abs_ackno;
while (!_flight_buffer.empty())
{
const TCPSegment &seg = _flight_buffer.front();
if (seg.header().seqno.raw_value() + seg.length_in_sequence_space()
<= ackno.raw_value())
_flight_buffer.pop();
else
break;
}
// update timer setting
_consecutive_retransmissions_count = 0;
_rto = _initial_retransmission_timeout;
if (!_flight_buffer.empty())
_timer.start(_rto);
else
_timer.stop();
}
fill_window();
}
//! \brief Advance the retransmission timer and retransmit on expiry.
//!
//! If nothing is in flight the timer is stopped. If the timer has expired,
//! the oldest in-flight segment is queued again and the timer is restarted;
//! the RTO is doubled and the retransmission counter incremented only when
//! the advertised window is non-zero.
//!
//! \param[in] ms_since_last_tick the number of milliseconds since the last
//! call to this method
void TCPSender::tick(const size_t ms_since_last_tick)
{
    // Timer::tick takes an unsigned value; saturate instead of letting a
    // very large size_t be silently truncated.
    const unsigned max_ms = static_cast<unsigned>(-1);
    const unsigned elapsed =
        (ms_since_last_tick > max_ms) ? max_ms : static_cast<unsigned>(ms_since_last_tick);
    _timer.tick(elapsed);

    // Nothing outstanding: there is nothing to time.
    if (_flight_buffer.empty())
    {
        _timer.stop();
        return;
    }
    if (!_timer.is_expired())
    {
        return;
    }

    // Timeout: send the oldest outstanding segment again.
    _segments_out.push(_flight_buffer.front());

    // Back off only when the receiver advertised a non-zero window.
    if (_window_size > 0)
    {
        _rto *= 2;
        ++_consecutive_retransmissions_count;
    }
    _timer.start(_rto);
}
//! \return the number of retransmissions counted in a row; the counter is
//! reset whenever an acknowledgment advances the last accepted ackno
unsigned int TCPSender::consecutive_retransmissions() const
{
    const unsigned int retx_in_a_row = _consecutive_retransmissions_count;
    return retx_in_a_row;
}
void TCPSender::send_empty_segment()
{
TCPSegment empty_segment;
empty_segment.header().seqno = next_seqno();
_segments_out.push(empty_segment);
}
改了三天,终于通关收工,真的太多坑了。过几天来总结复盘。