#include "rcp-tap.h"
#include "flags.h"

/*
 * TclObject & Initialization
 */

/*
 * OTcl linkage for the agent: registers the class name "Agent/RCPTap";
 * create() hands back a freshly allocated RcpTap.
 */
static class RcpTapClass : public TclClass {
public:
	RcpTapClass() : TclClass("Agent/RCPTap") {}
	TclObject* create(int , const char*const*) {
		return (new RcpTap());
	}
} class_rcptap;

/*
 * Construct the agent as a PT_TCP agent with no peer attached, no
 * packet sent yet (maxseq_ == -1) and all counters cleared.
 */
RcpTap::RcpTap() : Agent(PT_TCP), curseq_(0), maxseq_(-1), r_seqno_(-1),
	o_seqno_(-1), nreqpack_(0), closed_(0), ts_peer_(0)
{
	type_ = PT_TCP;
	peer_ = NULL;
	repnxt_ = 0;
	/*
	 * FIXME(review): source text is missing on the next statement.
	 * Everything between the loop condition ("i ...") and the
	 * comparison against maxseq_ was lost before this file reached
	 * review (apparently the span from a '<' up to the next '>').
	 * The gap covered the remainder of this constructor and the head
	 * of RcpTap::command() up to the test inside its "advance"
	 * handler.  The surviving tokens are kept exactly as found rather
	 * than guessed at; restore the span from version control.  The
	 * file will not compile until that is done.
	 */
	for (int i=0;i maxseq_)
				advanceby(newseq - curseq_);
			else
				advanceby(maxseq_ - curseq_);
			return TCL_OK;
		}
		/* "advanceby <n>": move curseq_ forward by n packets */
		if (strcmp(argv[1], "advanceby") == 0) {
			advanceby(atoi(argv[2]));
			return TCL_OK;
		}
		/*
		 * "attach-peer <agent>": look up the named TclObject and
		 * remember it as peer_; fails if the lookup returns NULL.
		 */
		if (strcmp(argv[1], "attach-peer") == 0) {
			peer_ = (Agent*)TclObject::lookup(argv[2]);
			return (peer_ ? TCL_OK : TCL_ERROR);
		}
	}
	/* anything not handled above is left to the base class */
	return (Agent::command(argc, argv));
}

/*
 * one-way TCP uses packets instead of bytes
 *
 * nbytes == -1 (while curseq_ <= TCP_MAXSEQ) raises curseq_ to
 * TCP_MAXSEQ; otherwise nbytes is converted to a packet count by
 * dividing by size_ and rounding up, and curseq_ grows by that count.
 * The new curseq_ is then passed on to the attached peer.
 */
void RcpTap::sendmsg(int nbytes, const char* /*flags*/)
{
	if (nbytes == -1 && curseq_ <= TCP_MAXSEQ)
		curseq_ = TCP_MAXSEQ;
	else
		curseq_ += (nbytes/size_ + (nbytes%size_ ? 1 : 0));
	//send_much(0, 0, maxburst_);

	/* XXX: inform the receiver to start pulling to facilitate the same
	 * usage in the TCL scripts for TCP and RCP */
	assert(peer_);	/* a peer must have been set via "attach-peer" */
	peer_->sendmsg(curseq_,NULL,0);	/* note the semantics is different */
}

/*
 * Advance curseq_ by delta packets, clear closed_ when delta is
 * positive, and pass the new curseq_ on to the attached peer.
 */
void RcpTap::advanceby(int delta)
{
	curseq_ += delta;
	if (delta > 0)
		closed_ = 0;
	//send_much(0, 0, maxburst_);

	assert(peer_);	/* a peer must have been set via "attach-peer" */
	peer_->sendmsg(curseq_,NULL,0);	/* note the semantics is different */
}

/*
 * recv
 *
 * Handle one incoming packet, which must be of type PT_REQUEST.  The
 * entries carried in the header's sack_area_ are checked against the
 * most recent entries of rep_ (declared in rcp-tap.h; presumably the
 * record of recently handled requests -- confirm there): every entry
 * that does not match is sent again with reason TCP_REASON_REQOVER.
 * Then the requested seqno itself is served, and the packet is freed.
 */
void RcpTap::recv(Packet *pkt, Handler *)
{
	hdr_tcp *tcph = hdr_tcp::access(pkt);

	assert(HDR_CMN(pkt)->ptype() == PT_REQUEST);
	++nreqpack_;
	ts_peer_ = tcph->ts();	/* echoed back in output() via ts_echo() */

	/* j counts the carried entries that matched what rep_ already holds */
	int j = 0;
	for (int i = 0; i < tcph->sa_length(); i++) {
		int u = tcph->sack_area_[i/2][i%2];
		assert(u>=0);
		/* entry to compare with; the index is clamped at 0 */
		int m = rep_(repnxt_-1-j>0 ? repnxt_-1-j : 0);
		if (u!=m) {
			// fprintf(stderr,"\nt=%f i=%d u=%d j=%d m=%d",NOW,i,u,j,m);
			output(u, TCP_REASON_REQOVER);
		} else
			j++;
	}

	/* make room for the unmatched entries, then store the whole list */
	repnxt_ += (tcph->sa_length()-j);
	for (int i = 0; i < tcph->sa_length(); i++)
		rep_(repnxt_-1-i)=tcph->sack_area_[i/2][i%2];

	int seqno = r_seqno_ = tcph->seqno();
	if (hdr_flags::access(pkt)->pull()==1) {
		/* pull flag set: serve exactly the requested packet */
		output(seqno, 0);
		rep_(repnxt_++) = seqno;
	} else {
		// assert(seqno==maxseq_+1); // is pull necessary?
		/*
		 * pull: accumulated request
		 *
		 * XXX: what if t_seqno_ is reset?
		 */
		assert(seqno>maxseq_);
		/*
		 * send everything from maxseq_+1 up to seqno; only the last
		 * one goes out with reason 0 and is recorded in rep_
		 */
		for (int i=maxseq_+1;i<=seqno;i++) {
			output(i, (i==seqno?0:TCP_REASON_REQOVER));
			if (i==seqno)
				rep_(repnxt_++) = i;	// sync the state
		}
	}

	Packet::free(pkt);
}

/*
 * output packets
 */

/*
 * Header size added to outgoing packets: tcpip_base_hdr_size_, plus
 * ts_option_size_ when ts_option_ is set.
 */
int RcpTap::headersize()
{
	int total = tcpip_base_hdr_size_;
	assert (total>0);
	if (ts_option_)
		total += ts_option_size_;
	return (total);
}

/*
 * Build and send the packet with sequence number seqno.  reason is
 * stored in the TCP header; packets sent with TCP_REASON_REQOVER are
 * flagged no_ts_.  Updates the data counters, and either raises
 * maxseq_ (new highest seqno) or bumps the retransmit counters.
 */
void RcpTap::output(int seqno, int reason)
{
	Packet *p = allocpkt();
	hdr_tcp *tcph = hdr_tcp::access(p);
	int databytes = hdr_cmn::access(p)->size();

	tcph->seqno() = o_seqno_ = seqno;
	tcph->ts() = Scheduler::instance().clock();
	tcph->ts_echo()= ts_peer_;
	tcph->reason() = reason;
	if (reason==TCP_REASON_REQOVER)
		hdr_flags::access(p)->no_ts_ = 1;	// don't use for rtt

	if (seqno == 0) {
		if (syn_) {
			// send out initial SYN packet
			databytes = 0;
			curseq_ += 1;	// XXX: inform the receiver
			hdr_cmn::access(p)->size() = tcpip_base_hdr_size_;
		} else if (useHeaders_ == true) {
			hdr_cmn::access(p)->size() += headersize();
		}
	} else if (useHeaders_ == true) {
		hdr_cmn::access(p)->size() += headersize();
	}

	++ndatapack_;
	ndatabytes_ += databytes;
	send(p, 0);

	/* first transmission of the last packet requested so far */
	if (seqno == curseq_ && seqno > maxseq_)
		idle();
	if (seqno > maxseq_)
		maxseq_ = seqno;
	else {
		/* at or below the highest seqno already sent: a retransmission */
		++nrexmitpack_;
		nrexmitbytes_ += databytes;
	}
}