#include "rcp.h"
#include "ip.h"
#include "flags.h"
#include "random.h"
#include "tcp-sink.h" // for MWM only

/*
 * NOTE(review): this file reached review with its line structure collapsed
 * and with several spans of text missing.  Every damaged span sits where a
 * '<' immediately followed by an identifier would be expected and resumes
 * right after a '>', which looks like the work of an HTML tag stripper.
 * One span (the loop header in RcpAgent::sendpkt) could be restored with
 * confidence.  Three others are left exactly as received and marked
 * FIXME(review); they must be restored from a pristine copy before this
 * file can compile:
 *   - Racker::resequence()  loss-detection loop
 *   - Racker::loseit()      body, plus whatever followed it
 *   - RcpAgent::timeout()   tail of the RTX branch
 */

/*
 * Tcl linkage: registers the OTcl class name "Agent/TCP/RCP" and creates
 * an RcpAgent that is handed a freshly allocated Racker.
 */
static class RcpClass : public TclClass {
public:
    RcpClass() : TclClass("Agent/TCP/RCP") {} // or Agent/RCP
    TclObject* create(int , const char*const*) {
        return (new RcpAgent(new Racker));
    }
} class_rcp;

/*****************************************************************************
 *
 * Racker
 *
 *****************************************************************************/

/*
 * Allocate and zero MWS per-packet records and start with an empty window
 * (nothing requested, nothing seen).
 */
Racker::Racker() : wndmask_(MWM), pipe_(0)
{
    rcvnext_ = 0;
    maxseen_ = maxreq_ = -1;
    buf_ = new PKI[MWS];    /* resequencing & request */
    memset(buf_, 0, sizeof(PKI)*(MWS));
    bufsize_ = MWM;
    blength_ = npacket_ = 0;
}

Racker::~Racker()
{
    delete[] buf_;
}

/*
 * Forget all per-packet state and set the buffer size to (int) wnd.
 * wnd must not exceed wndmask_ (asserted).
 *
 * NOTE(review): pipe_, blength_ and npacket_ are not reset here -- confirm
 * that is intended.
 */
void Racker::reset(double wnd)
{
    rcvnext_ = 0;
    maxseen_ = maxreq_ = -1;
    memset(buf_, 0, sizeof(PKI)*(wndmask_+1));
    bufsize_ = (int) wnd;
    assert(bufsize_<=wndmask_);
}

/*
 * Note request() introduces a very small time difference among packets
 * bursting out at the same time such that the transmission order can
 * be inferred when performing loss detection (out-of-order) at resequence()
 * -- hopefully it should not be necessary in real-life implementation
 *
 * Ideally, this does not have the effects of using the "burst_send" or
 * "delay_send" timer that tries to smooth the burstiness,
 * since the time gap does not prevent packets from non-overlapping
 *
 * Parameters:
 *   fid    not used in this function (only in the disabled trace below)
 *   seq    sequence number to request; a negative value is rejected
 *   ts     out: the send time recorded for this request
 *   force  not used in this function
 * Returns seq on success, -1 when seq is negative.
 *
 * reqt_/reqi_ are function-local statics, so the "same instant" spacing is
 * shared by every caller of this function, not kept per Racker object.
 */
int Racker::request(int fid, int seq, double* ts, int force)
{
#define TEENY_TIME 0.000001 // 1us
    static double reqt_=-1.0, reqi_=0.0;    // need re-entry
    int xmt = seq;
    if (xmt<0) return -1;    // could be due to flow control
    if (seq > maxreq_) {maxreq_=seq; assert(maxreq_-rcvnext_ < wndmask_);}
    // First request at this NOW gets offset 0; each further request at the
    // same NOW is pushed one TEENY_TIME later.
    if (reqt_ != NOW) {reqt_=NOW; reqi_=0.0;}
    else {reqi_+=TEENY_TIME;}
    pki_(seq).req = xmt;                    // sequence number to pull
    pki_(seq).sent = *ts = reqt_+reqi_;     // when is it sent out
    pki_(seq).seen = 0;                     // has received or not
    pki_(seq).later= 0;                     // how many later packets rcvd
    pipe_ ++;                               // for send_much
    // printf("\nt=%f fid=%d seq=%d xmit=%d sent=%f",NOW,fid,seq,xmt,*ts);
    return xmt;
}

/*
 * Process an arriving data packet:
 *   - mark it received and advance rcvnext_ over every in-order packet,
 *     summing their sizes into the return value;
 *   - accumulate buffer-occupancy statistics (blength_, npacket_);
 *   - recompute pipe_ as the number of requested, unseen packets that are
 *     not considered lost;
 *   - overwrite the packet's TCP seqno with rcvnext_ - 1, i.e. turn it
 *     into a cumulative ACK for the congestion-control code in recv().
 * Returns the number of bytes that became deliverable in order.
 */
int Racker::resequence(Packet *pkt)
{
    int seq = HDR_TCP(pkt)->seqno();
    assert(seq-rcvnext_ < wndmask_);
    int numBytes = HDR_CMN(pkt)->size();
    assert(numBytes>0);
    int numToDeliver = 0;
    int is_dup = FALSE;    // only consumed by the disabled trace below
    int next = rcvnext_;
    if (seq > maxseen_) { maxseen_ = seq; assert(seq<=maxreq_); }
    if (seq < next) is_dup = TRUE;
    if (seq >= next && seq <= maxseen_) {
        if (pki_(seq).seen) is_dup = TRUE;
        pki_(seq).seen = numBytes;
        // Slide over the contiguous run of received packets.
        while (pki_(next).seen) {
            numToDeliver += pki_(next).seen;
            next++;
            if (next>maxseen_) break;
        }
        rcvnext_ = next;
    }
    blength_ += (maxseen_ - rcvnext_ + 1);
    npacket_ += 1;
    // if (is_dup) printf("\n%.6f\tduplicate\t%d",NOW,seq);
    /*
     * set the pipe size
     */
    pipe_ = 0;    // recalculate pipe_
    double t= HDR_TCP(pkt)->ts_echo();    // when is it sent out
    //assert(t==pki_(seq).sent);          // void if reassigned
    for (int i=rcvnext_;i<=maxreq_;i++) if (!pki_(i).seen) {
        /*
         * FIXME(review): the next statement is damaged -- text appears to
         * be missing between "sent" and "=3".  As received it ASSIGNS 3 to
         * pki_(i).sent and therefore always calls loseit(i); `t` above is
         * left unused.  Kept verbatim because the original condition
         * cannot be recovered from this file; restore from a clean copy.
         */
        if (pki_(i).sent=3) loseit(i);
        if (!islost(i)) pipe_ ++;    // still in transit
    }
    /*
     * translate the incoming data packet to an accumulated ACK
     * required by TCP's congestion control
     */
    HDR_TCP(pkt)->seqno() = rcvnext_ - 1;
    return numToDeliver;
}

#define LATERPKT NUMDUPACKS // 3 in tcp.h

/*
 * A packet counts as lost when it has not been received and at least
 * LATERPKT later packets have been recorded against it.
 */
int Racker::islost(int seq)
{
    return (pki_(seq).seen==0 && pki_(seq).later>=LATERPKT);
}

void Racker::loseit(int seq)
{
    /*
     * FIXME(review): everything from the comparison after "later" up to a
     * "->reset(wnd_)" call is missing.  That gap swallowed the rest of
     * loseit() and whatever definitions followed it; the surviving tail
     * ("reset(wnd_); repnxt_ = 0;") presumably belongs to a later
     * function.  Kept verbatim; this does not compile and must be
     * restored from a clean copy.
     */
    if (pki_(seq).laterreset(wnd_);
    repnxt_ = 0;
}

/*
 * Delayed binding of the traced variables o_seqno_, r_seqno_ and
 * b_length_; everything else is handled by TcpAgent.
 */
void RcpAgent::delay_bind_init_all()
{
    delay_bind_init_one("o_seqno_");
    delay_bind_init_one("r_seqno_");
    delay_bind_init_one("b_length_");
    TcpAgent::delay_bind_init_all();
}

int RcpAgent::delay_bind_dispatch(const char *var, const char *local,
                                  TclObject *tracer)
{
    if (delay_bind(var,local,"o_seqno_",&o_seqno_,tracer)) return TCL_OK;
    if (delay_bind(var,local,"r_seqno_",&r_seqno_,tracer)) return TCL_OK;
    if (delay_bind(var,local,"b_length_",&b_length_,tracer)) return TCL_OK;
    return TcpAgent::delay_bind_dispatch(var, local, tracer);
}

/*
 * Tcl command hook.  "reseq-buffer" (no arguments) puts racker_->buffer()
 * into the Tcl result formatted as "%.2f"; every other command is passed
 * on to TcpAgent.
 */
int RcpAgent::command(int argc, const char*const* argv)
{
    if (argc == 2) {
        if (strcmp(argv[1], "reseq-buffer") == 0) {
            double b = racker_->buffer();
            Tcl::instance().resultf("%.2f", b);
            return TCL_OK;
        }
    }
    return (TcpAgent::command(argc, argv));
}

/*
 * any function recognizable by RCPTap should do here (Agent member function)
 *
 * Sets curseq_ to cseq (which must be positive) and starts sending.
 */
void RcpAgent::sendmsg(int cseq, AppData*, const char*)
{
    curseq_ = cseq;
    assert(curseq_>0);
    send_much(0, 0, maxburst_);
}

/*
 * Handle an arriving data packet.  The packet is dropped untouched in
 * SLEEP_ state; otherwise it is resequenced, any in-order bytes are
 * reported through recvBytes(), and -- unless the agent is PASSIVE_ --
 * the cumulative ACK number that resequence() wrote into the header
 * drives the congestion-control logic below.  The packet is always freed
 * here.
 */
void RcpAgent::recv(Packet *pkt, Handler *h)
{
    if (state_==SLEEP_) {Packet::free(pkt); return;}    // not receiving any
    ++ndatapack_;
    /*
     * deliver the packet to hub and let it do some processing
     */
    r_seqno_ = HDR_TCP(pkt)->seqno();    // recv sequence #
    /*
     * reliability
     */
    int numToDeliver = racker_->resequence(pkt);
    if (numToDeliver) recvBytes(numToDeliver);
    //printf("\nt=%f fid=%d seq=%d",NOW,fid_,int(r_seqno_));
    b_length_ = racker_->buflen();    // inst buffer length
    if (state_==PASSIVE_) {Packet::free(pkt); return;}    // simply receiving
    /*
     * congestion control
     */
    hdr_tcp *tcph = hdr_tcp::access(pkt);
    if (state_==NORMAL_) {
        if (tcph->seqno() > last_ack_) {
            // New cumulative ACK.
            recv_newack_helper(pkt);
            timeout_ = FALSE;
            if (last_ack_==0 && delay_growth_) cwnd_ = initial_window();
        } else if ((int)tcph->seqno() < last_ack_) {
            // Older than last_ack_: deliberately ignored.
        } else if (timeout_ == FALSE) {
            // Duplicate ACK outside timeout recovery.
            // if (pdrop_[(1+tcph->seqno())&MWM]>0.0) {rcp_eln(pkt); return;}
            dupacks_ ++;
            if (racker_->islost(tcph->seqno()+1)) {
            // if (dupacks_ == numdupacks_) {
                /*
                printf("\nt=%f fid=%d dup seq=%d islost=%d recover=%d",
                       NOW,fid_,tcph->seqno(),
                       racker_->islost(tcph->seqno()+1),int(maxseq_));
                assert(racker_->islost(tcph->seqno()+1));
                */
                dupack_action();
            } else if (dupacks_ < numdupacks_ && singledup_) {
                send_one();
            }
        }
        if (dupacks_ == 0) send_much(FALSE, 0, maxburst_);
    } else {
        if (tcph->seqno() >= recover_) {
            /*
            printf("\nt=%f out-of-recover recover=%d seq=%d",
                   NOW,recover_,tcph->seqno());
            */
            recover_ = 0;
            state_ = NORMAL_;
            timeout_ = FALSE;
            newack(pkt);
        } else if (tcph->seqno() > highest_ack_) {
            // Partial progress while still recovering.
            highest_ack_ = tcph->seqno();
            t_backoff_ = 1;
            newtimer(pkt);
        } else if (timeout_ == FALSE) {
            // if (pdrop_[(1+tcph->seqno())&MWM]>0.0) {rcp_eln(pkt); return;}
            if (dupacks_ > 0) dupacks_ ++;
        }
        send_much(0, 0, maxburst_);
    }
    Packet::free(pkt);
}

/*
 * Enter fast recovery: halve ssthresh and cwnd, restart the RTX timer and
 * re-request the packet after last_ack_.  With bug_fix_ set, nothing is
 * done unless highest_ack_ has already passed the previous recover_ point.
 */
void RcpAgent::dupack_action()
{
    int recovered = (highest_ack_ > recover_);
    // Same decision the former goto/label pair made, as a single guard.
    if (!recovered && bug_fix_) return;

    trace_event("FAST_RECOVERY");
    recover_ = maxseq_;
    state_ = FASTRECOV_;
    last_cwnd_action_ = CWND_ACTION_DUPACK;
    slowdown(CLOSE_SSTHRESH_HALF|CLOSE_CWND_HALF);
    reset_rtx_timer(1, 0);
    sendpkt(last_ack_+1, TCP_REASON_DUPACK, 1);
}

/*
 * A new request may go out while the number of packets in flight
 * (racker_->pipe()) is below cwnd_.  seq is not consulted.
 */
int RcpAgent::send_allowed(int seq)
{
    // if (seq >= curseq_) return 0;
    // int top = highest_ack_ + int(wnd_);
    // if (seq >= top) return 0;
    return (racker_->pipe() < cwnd_);
}

/*
 * Issue requests until the Racker has nothing to offer below curseq_, the
 * window closes, sendpkt() fails, or maxburst packets have gone out
 * (maxburst == 0 means no burst limit).
 *
 *   force   bypasses the pending-delay-timer check, the window check and
 *           the random send delay -- for the first packet only; it is
 *           cleared after one successful send.
 *   reason  not forwarded: sendpkt() is called with its default reason.
 */
void RcpAgent::send_much(int force, int reason, int maxburst)
{
    int npacket = 0;
    if (!force && delsnd_timer_.status()==TIMER_PENDING) return;
    while (1) {
        int seq = racker_->nextseq();
        if (seq<0 || seq >= curseq_) break;
        if (!force && !send_allowed(seq)) break;
        if (!force && overhead_!=0 &&
            delsnd_timer_.status()!=TIMER_PENDING) {
            // Defer sending by a random delay in [0, overhead_).
            delsnd_timer_.resched(Random::uniform(overhead_));
            return;
        }
        if (!sendpkt(seq)) break;
        if (seq>=t_seqno_) t_seqno_ = seq + 1;
        force = 0;
        if (maxburst && ++npacket==maxburst) break;
    }
}

void RcpAgent::send_one()
{
    send_much(0, 0, 1);
}

void RcpAgent::resume()
{
    send_much(0, 0, maxburst_);
}

void RcpAgent::output(int seqno, int reason)
{
    /*
     * make sure the change of syntax is known
     * (output may return without sending)
     */
    assert("RCP::output(): use sendpkt() instead"==0);
}

/*
 * Build and send one request packet for sequence number seqno.
 * Returns 1 when a packet was sent and 0 when the Racker refused the
 * request (in which case force must be clear and the RTX timer pending).
 */
int RcpAgent::sendpkt(int seqno, int reason, int force)
{
    /*
     * find a token
     */
    double ts;
    int xmt = racker_->request(fid_, seqno, &ts, force);
    if (xmt<0) {
        assert(!force && rtx_timer_.status()==TIMER_PENDING);
        return 0;
    }
    //printf("\nt=%f fid=%d sendpkt seq=%d xmt=%d ts=%f",NOW,fid,seqno,xmt,ts);
    //
    /*
     * packet header
     */
    Packet* p = allocpkt();
    hdr_tcp *tcph = hdr_tcp::access(p);
    hdr_flags* hf = hdr_flags::access(p);
    int databytes = hdr_cmn::access(p)->size();
    tcph->seqno() = xmt;    // packet to request
    tcph->ts() = ts;
    assert(ts>=NOW);
    tcph->ts_echo() = ts_peer_;
    tcph->reason() = reason;
    if (cong_action_) {                // set from slowdown()
        hf->cong_action() = TRUE;      // congestion action.
        cong_action_ = FALSE;
    }
    if (syn_ && firstpkt_) {           // send SYN?
        curseq_ += 1;
        firstpkt_ = 0;
        HDR_CMN(p)->size() = databytes = 0;
        assert(useHeaders_==true);
    }
    if (useHeaders_==true) HDR_CMN(p)->size() += headersize();
    /*
     * see if this is a new packet or a retransmission
     */
    if (seqno > maxseq_) {
        maxseq_ = seqno;
        if (!rtt_active_) {
            rtt_active_ = 1;
            if (seqno > rtt_seq_) {
                rtt_seq_ = seqno;
                rtt_ts_ = Scheduler::instance().clock();
            }
        }
    } else {
        ++nrexmitpack_;
        nrexmitbytes_ += databytes;
        hf->pull() = 1;
    }
    /*
     * to protect from request loss (reuse the SACK related fields)
     *
     * The last N (at most REPN) requested sequence numbers are copied,
     * newest first, into the SACK area before this one is logged.
     */
    int N = tcph->sa_length() = (repnxt_>=REPN ? REPN : repnxt_);
    // Loop header restored: the file arrived as "for (int i=0;isack_area_..."
    // with the "<N;i++) tcph->" part missing.
    for (int i=0;i<N;i++) tcph->sack_area_[i/2][i%2]=rep_(repnxt_- 1 - i);
    rep_(repnxt_++) = tcph->seqno();
    /*
     * off it goes
     */
    o_seqno_ = tcph->seqno();    // seq number requested
    ++nackpack_;                 // overload as nreqpack_
    // request() may have stamped ts slightly after NOW; honor that offset.
    if (ts==NOW) send(p, 0);
    else Scheduler::instance().schedule(target_, p, ts-NOW);
    /*
     * reset timer if none pending
     */
    int force_set_rtx_timer = (highest_ack_==maxseq_ ? 1 : 0);
    if (!(rtx_timer_.status() == TIMER_PENDING) || force_set_rtx_timer)
        set_rtx_timer();
    // NOTE(review): maxseq_ was raised to seqno above whenever
    // seqno > maxseq_, so this condition looks unsatisfiable -- confirm
    // whether idle() is meant to be reachable.
    if (seqno == curseq_ && seqno > maxseq_)    // tell application I have
        idle();                                 // sent everything so far
    return 1;    // sent one
}

/*
 * Timer expiry.  The retransmission timer is handled here; every other
 * timer number is passed to timeout_nonrtx().
 */
void RcpAgent::timeout(int tno)
{
    if (tno==TCP_TIMER_RTX) {
        trace_event("TIMEOUT");
        if (cwnd_ < 1) cwnd_ = 1;
        if (highest_ack_==maxseq_ && !slow_start_restart_) {
            /*
             * no outstanding data
             */
        } else {
            dupacks_ = 0;
            recover_ = maxseq_;
            state_ = NORMAL_;
            timeout_ = TRUE;
            if (highest_ack_==-1 && wnd_init_option_==2)
                wnd_init_option_ = 1;
            if (highest_ack_==maxseq_ && restart_bugfix_) {
                /*
                 * no outstanding data, don't cut ssthresh_
                 */
                slowdown(CLOSE_CWND_ONE);
            /*
             * FIXME(review): the text between "highest_ack_" and
             * "timeout();" is missing -- the rest of this condition, its
             * branch body, the closing braces of the enclosing else, and
             * the start of a "...->timeout()" call.  Kept verbatim; this
             * does not compile and must be restored from a clean copy.
             */
            } else if (highest_ack_timeout();
        last_cwnd_action_ = CWND_ACTION_TIMEOUT;
        send_much(1, TCP_REASON_TIMEOUT, maxburst_);
    } else {
        timeout_nonrtx(tno);
    }
}

/*
 * Check if the packet (ack) has the ELN bit set, and if it does, and if the
 * last ELN-rxmitted packet is smaller than this one, then retransmit the
 * packet. Do not adjust the cwnd when this happens.
 *
 * Frees pkt.  All call sites in this file are currently commented out.
 */
void RcpAgent::rcp_eln(Packet *pkt)
{
    //int eln_rxmit;
    hdr_tcp *tcph = hdr_tcp::access(pkt);
    int ack = tcph->seqno() + 1;
    if (++dupacks_ == eln_rxmit_thresh_ && ack > eln_last_rxmit_) {
        /* Retransmit this packet */
        // printf("\nt=%f ack=%d elnlast=%d",NOW,ack,int(eln_last_rxmit_));
        sendpkt(last_ack_ + 1, TCP_REASON_DUPACK, 1);
        eln_last_rxmit_ = last_ack_+1;
    } else
        send_much(0, 0, maxburst_);
    Packet::free(pkt);
}