CS144 Lab3 TCPSender

发布于:2024-06-28 ⋅ 阅读:(17) ⋅ 点赞:(0)
#ifndef SPONGE_LIBSPONGE_TCP_SENDER_HH
#define SPONGE_LIBSPONGE_TCP_SENDER_HH

#include <functional>
#include <queue>

#include "byte_stream.hh"
#include "tcp_config.hh"
#include "tcp_segment.hh"
#include "wrapping_integers.hh"

//! \brief Countdown timer used for timeout-based retransmission (ARQ).
//!
//! The timer is idle until start() arms it with a countdown. Each tick()
//! consumes part of the countdown; once the countdown is used up the timer
//! reports itself as expired until it is re-armed with start() or disarmed
//! with stop().
class Timer
{
public:
    //! \brief Arm (or re-arm) the timer with a fresh countdown.
    //! \param[in] rto length of the countdown, in the same unit later passed to tick()
    void start(const unsigned &rto);

    //! \brief Disarm the timer and clear any pending expiry.
    void stop();

    //! \brief Advance the timer; has no effect while the timer is not running.
    //! \param[in] ms amount of time that has elapsed since the previous tick
    void tick(const unsigned &ms);

    //! \return true while the timer is armed
    bool is_active() const { return _active; }

    //! \return true only when the timer is armed and its countdown has run out
    bool is_expired() const
    {
        if (!_active) return false;
        return _expried;
    }

private:
    bool _active{false};     //!< set by start(), cleared by stop()
    bool _expried{false};    //!< set by tick() once the countdown has run out
    unsigned _timer_odd{0};  //!< time left on the current countdown
};

//! Accepts a ByteStream, divides it up into segments and sends the
//! segments, keeps track of which segments are still in-flight,
//! maintains the Retransmission Timer, and retransmits in-flight
//! segments if the retransmission timer expires.
class TCPSender
{
private:
    //! our initial sequence number, the number for our SYN.
    WrappingInt32 _isn;

    //! outbound queue of segments that the TCPSender wants sent
    std::queue<TCPSegment> _segments_out{};

    //! initial retransmission timeout (milliseconds), taken from the
    //! constructor's retx_timeout; the current RTO is reset to this value
    //! whenever a new acknowledgment is accepted
    unsigned int _initial_retransmission_timeout;

    //! outgoing stream of bytes that have not yet been sent
    ByteStream _stream;

    //! the (absolute) sequence number for the next byte to be sent
    uint64_t _next_seqno{0};

    //! retransmission timer driving timeout-based retransmission (ARQ)
    Timer _timer{};

    //! current retransmission timeout (RTO); doubled by tick() on a
    //! retransmission while the window is nonzero.
    //! NOTE: this default initializer reads _initial_retransmission_timeout,
    //! so this member must stay declared after it.
    unsigned int _rto{_initial_retransmission_timeout};

    //! number of consecutive retransmissions since the last new acknowledgment
    unsigned int _consecutive_retransmissions_count{0};

    //! FIFO of segments that were sent but are not yet fully acknowledged
    //! (oldest at the front); the front segment is the one retransmitted
    std::queue<TCPSegment> _flight_buffer{};

    //! most recent window size advertised by the receiver (starts at 1)
    uint16_t _window_size{1};

    //! highest absolute ackno accepted so far
    uint64_t _last_ackno{0};

    //! \brief Build a segment at the next sequence number, enqueue it for
    //! sending, and record it as in-flight.
    //! \param[in] payload bytes to carry in the segment (moved from)
    //! \param[in] syn whether to set the SYN flag
    //! \param[in] fin whether to set the FIN flag
    //! \return false if the segment would occupy no sequence numbers (in
    //! which case nothing is sent), true otherwise
    bool _send_segment(std::string &&payload, bool syn = false, bool fin = false);

public:
    //! Initialize a TCPSender
    TCPSender(const size_t capacity       = TCPConfig::DEFAULT_CAPACITY,
              const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
              const std::optional<WrappingInt32> fixed_isn = {});

    //! \name "Input" interface for the writer
    //!@{
    ByteStream &stream_in() { return _stream; }
    const ByteStream &stream_in() const { return _stream; }
    //!@}

    //! \name Methods that can cause the TCPSender to send a segment
    //!@{

    //! \brief A new acknowledgment was received
    void ack_received(const WrappingInt32 ackno, const uint16_t window_size);

    //! \brief Generate an empty-payload segment (useful for creating empty ACK
    //! segments)
    void send_empty_segment();

    //! \brief create and send segments to fill as much of the window as possible
    void fill_window();

    //! \brief Notifies the TCPSender of the passage of time
    void tick(const size_t ms_since_last_tick);
    //!@}

    //! \name Accessors
    //!@{

    //! \brief How many sequence numbers are occupied by segments sent but not
    //! yet acknowledged? \note count is in "sequence space," i.e. SYN and FIN
    //! each count for one byte (see TCPSegment::length_in_sequence_space())
    size_t bytes_in_flight() const;

    //! \brief Number of consecutive retransmissions that have occurred in a row
    unsigned int consecutive_retransmissions() const;

    //! \brief TCPSegments that the TCPSender has enqueued for transmission.
    //! \note These must be dequeued and sent by the TCPConnection,
    //! which will need to fill in the fields that are set by the TCPReceiver
    //! (ackno and window size) before sending.
    std::queue<TCPSegment> &segments_out() { return _segments_out; }
    //!@}

    //! \name What is the next sequence number? (used for testing)
    //!@{

    //! \brief absolute seqno for the next byte to be sent
    uint64_t next_seqno_absolute() const { return _next_seqno; }

    //! \brief relative seqno for the next byte to be sent
    WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
    //!@}
};

#endif // SPONGE_LIBSPONGE_TCP_SENDER_HH
#include "tcp_sender.hh"
#include "tcp_config.hh"
#include <random>
#include <cassert>
#include <string>
#include <iostream>

using namespace std;

//! Arm the timer with a countdown of \p rto, discarding any previous
//! countdown and any pending expiry.
void Timer::start(const unsigned &rto)
{
    _timer_odd = rto;
    _expried   = false;
    _active    = true;
}

//! Disarm the timer: it is neither active nor expired afterwards and has
//! no time left on its countdown.
void Timer::stop()
{
    _timer_odd = 0;
    _expried   = false;
    _active    = false;
}

//! Advance the timer by \p ms. A stopped timer ignores the call; otherwise
//! the elapsed time is deducted from the countdown, and the timer is
//! flagged as expired once the elapsed time covers everything that was left.
void Timer::tick(const unsigned &ms)
{
    if (!_active) return;

    // Still time left after this tick: just consume it.
    if (ms < _timer_odd)
    {
        _timer_odd -= ms;
        return;
    }

    // The countdown has been used up.
    _timer_odd = 0;
    _expried   = true;
}

//! \brief Construct a sender with an empty outgoing stream.
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before
//! retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set
//! (otherwise a value drawn from std::random_device is used)
TCPSender::TCPSender(const size_t capacity,
                     const uint16_t retx_timeout,
                     const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{std::random_device{}()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
{
}

//! bytes in flight which not ack
size_t TCPSender::bytes_in_flight() const
{
    return _next_seqno - _last_ackno;
}

//! \param[in] the payload for tcp segment
//! \param[in] tcp head flag SYN
//! \param[in] tcp head flag fin
bool TCPSender::_send_segment(std::string &&payload, bool syn, bool fin)
{
    // Construct TCP Header
    TCPSegment segment;
    segment.header().syn   = syn;
    segment.header().fin   = fin;
    segment.header().seqno = next_seqno();

    // Payload
    segment.payload()   = Buffer(move(payload));
    size_t segment_size = segment.length_in_sequence_space();
    if (segment_size == 0) return false;

    // update
    _next_seqno += segment_size;
    _flight_buffer.push(segment);
    _segments_out.emplace(segment);

    // Set Timer
    if (!_timer.is_active()) _timer.start(_rto);

    return true;
}

void TCPSender::fill_window()
{
    // CLOSED (waiting for stream to begin no SYN sent)
    if (next_seqno_absolute() == 0)
    {
        // send SYN
        _send_segment("", true);
        return;
    }
    // SYN_SEND (stream started but nothing acknowledged)
    else if (next_seqno_absolute() > 0
             && next_seqno_absolute() == bytes_in_flight())
    {
        return;
    }
    size_t cur_window_size = (_window_size == 0) ? 1 : _window_size;
    while (cur_window_size > bytes_in_flight())
    {
        // SYN_ACKED (stream ongoing)
        if (next_seqno_absolute() > bytes_in_flight() && !stream_in().eof())
        {
            // flag for send success
            bool success_send = false;

            size_t payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE,
                                      cur_window_size - bytes_in_flight());
            string payload      = move(_stream.read(payload_size));
            // stream reached EOF and  remaining window size can insert FIN flag
            if (stream_in().eof()
                && cur_window_size - bytes_in_flight() - payload.size() > 0)
                success_send = _send_segment(move(payload), false, true);
            else
                success_send = _send_segment(move(payload));
            // Nothing to send cause segment length is zore,break.
            if (!success_send) break;
        }
        else if (stream_in().eof())
        {
            // SYN_ACK (stream ongoing, stream has reached EOF, but FIN flag hasn't been sent yet)
            if (next_seqno_absolute() < stream_in().bytes_written() + 2)
            {
                _send_segment("", false, true);
            }
            else
                // FIN_SENT
                break;
        }
    }
}

//! \brief Process an acknowledgment from the receiver.
//!
//! An ackno beyond anything that has been sent is ignored entirely (the
//! advertised window is not taken from it either). Otherwise the window is
//! recorded, and if the ackno advances past the previous one, fully
//! acknowledged segments are dropped from the in-flight buffer, the RTO and
//! the consecutive-retransmission counter are reset, and the timer is
//! restarted (or stopped when nothing remains in flight). Finally the window
//! is refilled.
//!
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size)
{
    // Work in 64-bit absolute sequence space throughout.
    const uint64_t abs_ackno = unwrap(ackno, _isn, _last_ackno);

    // Acknowledges something that was never sent: ignore the whole segment.
    if (abs_ackno > next_seqno_absolute()) return;

    _window_size = window_size;

    if (abs_ackno > _last_ackno)
    {
        _last_ackno = abs_ackno;

        // Drop every segment that is now fully acknowledged. The comparison
        // is done on absolute sequence numbers: comparing the raw 32-bit
        // values breaks as soon as a segment ends at or past the 2^32 wrap
        // point (e.g. a SYN whose ISN is 0xFFFFFFFF).
        while (!_flight_buffer.empty())
        {
            const TCPSegment &seg = _flight_buffer.front();
            const uint64_t seg_end =
                unwrap(seg.header().seqno, _isn, _last_ackno) + seg.length_in_sequence_space();
            if (seg_end > abs_ackno) break;
            _flight_buffer.pop();
        }

        // New data was acknowledged: reset the back-off state.
        _consecutive_retransmissions_count = 0;
        _rto                               = _initial_retransmission_timeout;
        if (!_flight_buffer.empty())
            _timer.start(_rto);
        else
            _timer.stop();
    }

    fill_window();
}

//! \brief Notify the sender that time has passed, retransmitting the oldest
//! in-flight segment if the retransmission timer has expired.
//!
//! When a retransmission happens and the receiver's last advertised window
//! was nonzero, the RTO is doubled and the consecutive-retransmission counter
//! is incremented; with a zero window the segment is resent without backing
//! off. The timer is stopped whenever nothing is in flight.
//!
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick)
{
    // Timer::tick takes an `unsigned`; saturate instead of silently
    // truncating when size_t is wider and the value does not fit.
    constexpr unsigned max_tick = ~0U;
    const unsigned elapsed =
        (ms_since_last_tick > max_tick) ? max_tick : static_cast<unsigned>(ms_since_last_tick);
    _timer.tick(elapsed);

    // Nothing outstanding: there is nothing to retransmit and no need for a timer.
    if (_flight_buffer.empty())
    {
        _timer.stop();
        return;
    }

    if (!_timer.is_expired()) return;

    // Timeout: resend the oldest unacknowledged segment.
    _segments_out.push(_flight_buffer.front());

    // Back off only when the receiver's window is nonzero.
    if (_window_size > 0)
    {
        _rto *= 2;
        ++_consecutive_retransmissions_count;
    }
    _timer.start(_rto);
}

//! \return the number of retransmissions counted since the counter was last
//! reset by a new acknowledgment
unsigned int TCPSender::consecutive_retransmissions() const
{
    const unsigned int retransmit_streak = _consecutive_retransmissions_count;
    return retransmit_streak;
}

void TCPSender::send_empty_segment()
{
    TCPSegment empty_segment;
    empty_segment.header().seqno = next_seqno();
    _segments_out.push(empty_segment);
}

        改了三天,终于通关收工,真的太多坑了。过几天来总结复盘。