#include "peer.h" #include #include #include "./btcontent.h" #include "./msgencode.h" #include "./peerlist.h" #include "./btconfig.h" btBasic Self; void btBasic::SetIp(struct sockaddr_in6 addr) { memcpy(&m_sin.sin6_addr,&addr.sin6_addr,sizeof(struct in6_addr)); } void btBasic::SetAddress(struct sockaddr_in6 addr) { memcpy(&m_sin,&addr,sizeof(struct sockaddr_in6)); } int btBasic::IpEquiv(struct sockaddr_in6 addr) { // fprintf(stdout,"IpEquiv: %s <=> ", inet_ntoa(m_sin.sin_addr)); // fprintf(stdout,"%s\n", inet_ntoa(addr.sin_addr)); return (memcmp(&m_sin.sin6_addr,&addr.sin6_addr,sizeof(struct in6_addr)) == 0) ? 1 : 0; } int btPeer::Need_Local_Data() { if( m_state.remote_interested && !bitfield.IsFull()){ if( BTCONTENT.pBF->IsFull() ) return 1; // i am seed BitField tmpBitfield = *BTCONTENT.pBF; tmpBitfield.Except(bitfield); return tmpBitfield.IsEmpty() ? 0 : 1; } return 0; } int btPeer::Need_Remote_Data() { if( BTCONTENT.pBF->IsFull()) return 0; else if( bitfield.IsFull() ) return 1; else{ BitField tmpBitfield = bitfield; tmpBitfield.Except(*BTCONTENT.pBF); return tmpBitfield.IsEmpty() ? 0 : 1; } return 0; } btPeer::btPeer() { m_f_keepalive = 0; m_status = P_CONNECTING; m_unchoke_timestamp = (time_t) 0; time(&m_last_timestamp); m_state.remote_choked = m_state.local_choked = 1; m_state.remote_interested = m_state.local_interested = 0; m_err_count = 0; m_cached_idx = BTCONTENT.GetNPieces(); } int btPeer::SetLocal(unsigned char s) { switch(s){ case M_CHOKE: if( m_state.local_choked ) return 0; m_state.local_choked = 1; break; case M_UNCHOKE: if( !reponse_q.IsEmpty() ) StartULTimer(); if( !m_state.local_choked ) return 0; time(&m_unchoke_timestamp); m_state.local_choked = 0; break; case M_INTERESTED: if( m_state.local_interested ) return 0; m_state.local_interested = 1; break; case M_NOT_INTERESTED: if( !m_state.local_interested ) return 0; m_state.local_interested = 0; break; default: return -1; // BUG ??? } return stream.Send_State(s); } int btPeer::RequestPiece() { size_t idx; PENDINGQUEUE.ReAssign(&request_q,bitfield); if( !request_q.IsEmpty() ) return SendRequest(); if( m_cached_idx < BTCONTENT.GetNPieces() ){ idx = m_cached_idx; m_cached_idx = BTCONTENT.GetNPieces(); if( !BTCONTENT.pBF->IsSet(idx) && !PENDINGQUEUE.Exist(idx) && !WORLD.AlreadyRequested(idx) ){ return (request_q.CreateWithIdx(idx) < 0) ? -1 : SendRequest(); } }else{ BitField tmpBitField; if( bitfield.IsFull() ){ tmpBitField = *BTCONTENT.pBF; tmpBitField.Invert(); }else{ tmpBitField = bitfield; tmpBitField.Except(*BTCONTENT.pBF); } if( !tmpBitField.IsEmpty() ){ WORLD.CheckBitField(tmpBitField); if(tmpBitField.IsEmpty()){ btPeer *peer = WORLD.Who_Can_Abandon(this); if(peer){ peer->StopDLTimer(); request_q = peer->request_q; if(peer->CancelRequest(request_q.GetHead()) < 0 || peer->RequestCheck() < 0){ peer->CloseConnection(); } return SendRequest(); } }else{ idx = tmpBitField.Random(); return (request_q.CreateWithIdx(idx) < 0) ? -1 : SendRequest(); } } } return 0; } int btPeer::MsgDeliver() { size_t r,idx,off,len; char *msgbuf = stream.in_buffer.BasePointer(); r = ntohl(*(size_t*) msgbuf); if( 0 == r ){ time(&m_last_timestamp); if( !m_f_keepalive ) if( stream.Send_Keepalive() < 0 ) return -1; m_f_keepalive = 0; return (!m_state.remote_choked && request_q.IsEmpty()) ? RequestCheck() : 0; }else{ switch(msgbuf[4]){ case M_CHOKE: if(H_BASE_LEN != r){ return -1;} m_state.remote_choked = 1; StopDLTimer(); if( !request_q.IsEmpty()){ PSLICE ps = request_q.GetHead(); PENDINGQUEUE.Pending(&request_q); if( CancelRequest(ps) < 0) return -1; } return 0; case M_UNCHOKE: if(H_BASE_LEN != r){return -1;} m_state.remote_choked = 0; return RequestCheck(); case M_INTERESTED: if(H_BASE_LEN != r){return -1;} m_state.remote_interested = 1; break; case M_NOT_INTERESTED: if(r != H_BASE_LEN){return -1;} m_state.remote_interested = 0; StopULTimer(); /* remove peer's reponse queue */ if( !reponse_q.IsEmpty()) reponse_q.Empty(); return 0; case M_HAVE: if(H_HAVE_LEN != r){return -1;} idx = ntohl(*(size_t*) (msgbuf + 5)); if( idx >= BTCONTENT.GetNPieces() || bitfield.IsSet(idx)) return -1; bitfield.Set(idx); if( bitfield.IsFull() && BTCONTENT.pBF->IsFull() ){ return -2; } if( !BTCONTENT.pBF->IsSet(idx) ) m_cached_idx = idx; return ( !m_state.remote_choked && request_q.IsEmpty() ) ? RequestCheck() : 0; case M_REQUEST: if(H_REQUEST_LEN != r || !m_state.remote_interested){ return -1; } idx = ntohl(*(size_t*)(msgbuf + 5)); if( !BTCONTENT.pBF->IsSet(idx) ) return -1; off = ntohl(*(size_t*)(msgbuf + 9)); len = ntohl(*(size_t*)(msgbuf + 13)); if( !reponse_q.IsValidRequest(idx, off, len) ) return -1; return reponse_q.Add(idx, off, len); case M_PIECE: if( request_q.IsEmpty() || !m_state.local_interested){ m_err_count++; return 0; } return PieceDeliver(r); case M_BITFIELD: if( (r - 1) != bitfield.NBytes() || !bitfield.IsEmpty()) return -1; bitfield.SetReferBuffer(msgbuf + 5); if(bitfield.IsFull() && BTCONTENT.pBF->IsFull()) return -2; return 0; case M_CANCEL: if(r != H_CANCEL_LEN || !m_state.remote_interested) return -1; idx = ntohl(*(size_t*)(msgbuf + 5)); off = ntohl(*(size_t*)(msgbuf + 9)); len = ntohl(*(size_t*)(msgbuf + 13)); if( reponse_q.Remove(idx,off,len) < 0 ){ m_err_count++; return 0; } if( reponse_q.IsEmpty() ) StopULTimer(); return 0; default: return -1; // unknow message type } } return 0; } int btPeer::ReponseSlice() { size_t len = 0; reponse_q.Peek((size_t*) 0,(size_t*) 0, &len); if(len && stream.out_buffer.LeftSize() <= (len + 13 + 3 * 1024)) stream.Flush(); if(len && stream.out_buffer.LeftSize() > (len + 13 + 3 * 1024)){ size_t idx,off; reponse_q.Pop(&idx,&off,(size_t *) 0); if(BTCONTENT.ReadSlice(BTCONTENT.global_piece_buffer,idx,off,len) != 0 ){ return -1; } Self.DataSended(len); DataSended(len); return stream.Send_Piece(idx,off,BTCONTENT.global_piece_buffer,len); } return 0; } int btPeer::SendRequest() { PSLICE ps = request_q.GetHead(); for( ; ps ; ps = ps->next ) if(stream.Send_Request(ps->index,ps->offset,ps->length) < 0){ return -1; } return stream.Flush(); } int btPeer::CancelRequest(PSLICE ps) { for( ; ps; ps = ps->next){ if(stream.Send_Cancel(ps->index,ps->offset,ps->length) < 0) return -1; } return stream.Flush(); } int btPeer::ReportComplete(size_t idx) { if( BTCONTENT.APieceComplete(idx) ){ WORLD.Tell_World_I_Have(idx); if( BTCONTENT.pBF->IsFull() ){ ResetDLTimer(); WORLD.CloseAllConnectionToSeed(); } }else m_err_count++; return (P_FAILED == m_status) ? -1 : RequestCheck(); } int btPeer::PieceDeliver(size_t mlen) { size_t idx,off,len; char *msgbuf = stream.in_buffer.BasePointer(); idx = ntohl(*(size_t*) (msgbuf + 5)); off = ntohl(*(size_t*) (msgbuf + 9)); len = mlen - 9; if( request_q.Remove(idx,off,len) < 0 ){ m_err_count++; return 0; } if(BTCONTENT.WriteSlice((char*)(msgbuf + 13),idx,off,len) < 0){ return 0; } Self.StartDLTimer(); Self.DataRecved(len); DataRecved(len); /* if piece download complete. */ return request_q.IsEmpty() ? ReportComplete(idx) : 0; } int btPeer::RequestCheck() { if( BandWidthLimit() ) return 0; if( BTCONTENT.pBF->IsFull() ){ if( bitfield.IsFull() ){ return -1; } return SetLocal(M_NOT_INTERESTED); } if( Need_Remote_Data() ){ if(!m_state.local_interested && SetLocal(M_INTERESTED) < 0) return -1; if(request_q.IsEmpty() && !m_state.remote_choked){ if( RequestPiece() < 0 ) return -1; } } if(!request_q.IsEmpty()) StartDLTimer(); return 0; } void btPeer::CloseConnection() { if( P_FAILED != m_status ){ m_status = P_FAILED; stream.Close(); } } int btPeer::HandShake() { ssize_t r = stream.Feed(); if( r < 0 ) return -1; else if( r < 68 ){ if(r && memcmp(stream.in_buffer.BasePointer(),BTCONTENT.GetShakeBuffer(),r) != 0) return -1; return 0; } if( memcmp(stream.in_buffer.BasePointer(),BTCONTENT.GetShakeBuffer(),48) != 0 ) return -1; // ignore peer id verify if( !BTCONTENT.pBF->IsEmpty()){ char *bf = new char[BTCONTENT.pBF->NBytes()]; #ifndef WINDOWS if(!bf) return -1; #endif BTCONTENT.pBF->WriteToBuffer(bf); r = stream.Send_Bitfield(bf,BTCONTENT.pBF->NBytes()); delete []bf; } if( r >= 0){ if( stream.in_buffer.PickUp(68) < 0 ) return -1; m_status = P_SUCCESS; } return r; } int btPeer::Send_ShakeInfo() { return stream.Send_Buffer((char*)BTCONTENT.GetShakeBuffer(),68); } int btPeer::BandWidthLimit() { if( cfg_max_bandwidth <= 0 ) return 0; return ((Self.RateDL() + Self.RateUL()*2) / 1024 >= cfg_max_bandwidth) ? 1:0; } int btPeer::NeedWrite() { int yn = 0; if( stream.out_buffer.Count() || // data need send in buffer. (!reponse_q.IsEmpty() && CouldReponseSlice() && !BandWidthLimit()) || P_CONNECTING == m_status ) // peer is connecting yn = 1; return yn; } int btPeer::CouldReponseSlice() { if(!m_state.local_choked && (stream.out_buffer.LeftSize() > reponse_q.GetRequestLen() + 4 * 1024 )) return 1; return 0; } int btPeer::AreYouOK() { m_f_keepalive = 1; return stream.Send_Keepalive(); } int btPeer::RecvModule() { int f_peer_closed = 0; ssize_t r; if ( 64 < m_err_count ) return -1; r = stream.Feed(); if( r < 0 && r != -2 ) return -1; else if ( r == -2 ) f_peer_closed = 1; r = stream.HaveMessage(); for( ; r;){ if( r < 0 ) return -1; if(MsgDeliver() < 0 || stream.PickMessage() < 0) return -1; r = stream.HaveMessage(); } return f_peer_closed ? -1 : 0; } int btPeer::SendModule() { if( stream.out_buffer.Count() && stream.Flush() < 0) return -1; if(! reponse_q.IsEmpty() && CouldReponseSlice() ) { StartULTimer(); Self.StartULTimer(); } for(; !reponse_q.IsEmpty() && CouldReponseSlice(); ) if( ReponseSlice() < 0) return -1; return 0; } void btPeer::dump() { struct sockaddr_in6 sin; char buforek[256]; GetAddress(&sin); printf("%s: %d -> %d:%d %lud:%lud\n", inet_ntop(AF_INET6, &sin.sin6_addr, buforek, sizeof(buforek) ), bitfield.Count(), Is_Remote_UnChoked() ? 1 : 0, request_q.IsEmpty() ? 0 : 1, (unsigned long)TotalDL(), (unsigned long)TotalUL()); }