42#include <sys/socket.h>
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType(
Utils::IPAll ),
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
159 pStreamName.c_str() );
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
173 if( !pTransport || !pPoller || !pChannelData )
177 pChannelData, 0,
this );
179 pSubStreams[0]->socket = s;
212 return pSubStreams[0]->socket->EnableUplink();
216 return pSubStreams[path.
up]->socket->EnableUplink();
227 time_t now = ::time(0);
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
232 gettimeofday( &pConnectionStarted, 0 );
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
245 pLastFatalError = st;
249 if( pPrefer.IsValid() )
251 std::vector<XrdNetAddr> addrresses;
256 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
272 pAddresses.swap( tmp );
279 while( !pAddresses.empty() )
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.
up )
320 "substream %d, using 0 instead", pStreamName.c_str(),
326 "substream %d expecting answer at %d", pStreamName.c_str(),
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.
up]->outQueue->PushBack( msg, handler,
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
369 (*it)->socket->Close();
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
390 pIncomingQueue->ReportTimeout( now );
405 StreamConnectorTask(
const XrdCl::URL &url,
const std::string &n ):
408 std::string name =
"StreamConnectorTask for ";
431 ServerResponse *rsp =
reinterpret_cast<ServerResponse*
>( response.GetBuffer() );
434 ClientCloseRequest *req;
437 memcpy( req->
fhandle,
reinterpret_cast<uint8_t*
>( rsp->
body.buffer.data ), 4 );
439 msg->SetSessionId( pSessionId );
440 NullResponseHandler *handler =
new NullResponseHandler();
441 MessageSendParams params;
443 params.followRedirects =
false;
444 params.stateful =
true;
452 bool Stream::IsPartial(
Message &msg )
454 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
460 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
472 std::shared_ptr<Message> msg,
473 uint32_t bytesReceived )
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
487 if( !IsPartial( *msg ) )
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
496 RequestClose( *msg );
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->
hdr.
status,
520 pStreamName.c_str(), msg.get() );
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
528 if( IsPartial( *msg ) )
537 Job *job =
new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
544 std::pair<Message *, MsgHandler *>
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
554 pSubStreams[subStream]->socket->DisableUplink();
559 h.
msg = pSubStreams[subStream]->outQueue->PopMessage( h.
handler,
564 "from out-queue to in-queue, starting to send outgoing.",
565 pUrl->GetHostId().c_str(), h.
handler,
573 pIncomingQueue->AddMessageHandler( h.
handler, rmMsg );
578 pStreamName.c_str() );
590 if( pSubStreams[subStream]->outQueue->IsEmpty() )
593 pSubStreams[subStream]->socket->GetStreamName().c_str() );
594 pSubStreams[subStream]->socket->DisableUplink();
605 pTransport->MessageSent( msg, subStream, bytesSent,
608 pBytesSent += bytesSent;
612 pIncomingQueue->AssignTimeout( h.
handler );
617 pSubStreams[subStream]->outMsgHelper.
Reset();
628 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
631 subStream, ipstack.c_str() );
635 pLastStreamError = 0;
637 pConnectionCount = 0;
638 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639 pSessionId = ++sSessCntGen;
644 if( pSubStreams.size() == 1 && numSub > 1 )
646 for( uint16_t i = 1; i < numSub; ++i )
648 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
650 pChannelData, i,
this );
652 pSubStreams[i]->socket = s;
661 if( pSubStreams.size() > 1 )
664 pStreamName.c_str(), pSubStreams.size() - 1 );
665 for(
size_t i = 1; i < pSubStreams.size(); ++i )
667 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
671 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672 pSubStreams[i]->socket->Close();
686 gettimeofday( &pConnectionDone, 0 );
691 i.
server = pUrl->GetHostId();
692 i.
sTOD = pConnectionStarted;
693 i.
eTOD = pConnectionDone;
694 i.
streams = pSubStreams.size();
697 std::string *qryResponse =
nullptr;
699 qryResult.
Get( qryResponse );
702 i.
auth = *qryResponse;
716 else if( pOnDataConnJob )
721 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
732 pSubStreams[subStream]->socket->Close();
733 time_t now = ::time(0);
747 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
750 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
752 OnFatalError( 0, st, scopedLock );
759 OnFatalError( subStream, status, scopedLock );
766 time_t elapsed = now-pConnectionInitTime;
768 pStreamName.c_str(), (
long long) elapsed, pConnectionWindow );
773 if( !pAddresses.empty() )
778 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
779 pAddresses.pop_back();
780 pConnectionInitTime = ::time( 0 );
781 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
783 while( !pAddresses.empty() && !st.
IsOK() );
786 OnFatalError( subStream, st, scopedLock );
794 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
798 pStreamName.c_str(), (
long long) (pConnectionWindow - elapsed) );
800 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
801 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
808 else if( pConnectionCount < pConnectionRetry && !status.
IsFatal() )
815 OnFatalError( subStream, st, scopedLock );
822 OnFatalError( subStream, status, scopedLock );
832 pSubStreams[subStream]->socket->Close();
836 pStreamName.c_str(), subStream, status.
ToString().c_str() );
841 if( pSubStreams[subStream]->outMsgHelper.msg )
846 pIncomingQueue->RemoveMessageHandler(h.
handler);
847 pSubStreams[subStream]->outMsgHelper.
Reset();
853 if( pSubStreams[subStream]->inMsgHelper.handler )
869 if( pSubStreams[subStream]->outQueue->IsEmpty() )
874 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
877 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
879 OnFatalError( 0, st, scopedLock );
883 OnFatalError( subStream, status, scopedLock );
894 MonitorDisconnection( status );
896 SubStreamList::iterator it;
897 size_t outstanding = 0;
898 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
899 outstanding += (*it)->outQueue->GetSizeStateless();
907 OnFatalError( 0, st, scopedLock );
917 "message handlers.", pStreamName.c_str() );
919 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
937 for(
size_t substream = 0; substream < pSubStreams.size(); ++substream )
940 pSubStreams[substream]->socket->Close();
945 pStreamName.c_str(), status.
ToString().c_str() );
950 if( pSubStreams[substream]->outMsgHelper.msg )
955 pIncomingQueue->RemoveMessageHandler(h.
handler);
956 pSubStreams[substream]->outMsgHelper.
Reset();
962 if( pSubStreams[substream]->inMsgHelper.handler )
972 pConnectionCount = 0;
979 "message handlers.", pStreamName.c_str() );
981 SubStreamList::iterator it;
983 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
996 void Stream::OnFatalError( uint16_t subStream,
1003 pStreamName.c_str(), status.
ToString().c_str() );
1011 pConnectionCount = 0;
1012 pLastStreamError = ::time(0);
1013 pLastFatalError = status;
1016 SubStreamList::iterator it;
1018 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1019 q.GrabItems( *(*it)->outQueue );
1032 void Stream::MonitorDisconnection(
XRootDStatus status )
1037 Monitor::DisconnectInfo i;
1038 i.server = pUrl->GetHostId();
1039 i.rBytes = pBytesReceived;
1040 i.sBytes = pBytesSent;
1041 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1055 if( substream != 0 )
1065 SubStreamList::iterator it;
1066 time_t now = time(0);
1069 uint32_t outgoingMessages = 0;
1070 time_t lastActivity = 0;
1071 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1073 outgoingMessages += (*it)->outQueue->GetSize();
1074 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1075 if( lastActivity < sockLastActivity )
1076 lastActivity = sockLastActivity;
1079 if( !outgoingMessages )
1081 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1086 pStreamName.c_str() );
1103 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1127 pChannelEvHandlers.AddHandler( handler );
1135 pChannelEvHandlers.RemoveHandler( handler );
1146 mh.
handler = pIncomingQueue->GetHandlerForMessage( msg,
1174 pIncomingQueue->RemoveMessageHandler( mh.
handler );
1201 std::vector<XrdNetAddr> prefaddrs;
1206 , pStreamName.c_str(), url.
GetHostName().c_str() );
1213 std::vector<XrdNetAddr> aliasaddrs;
1218 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1225 auto itr = prefaddrs.begin();
1226 for( ; itr != prefaddrs.end() ; ++itr )
1228 auto itr2 = aliasaddrs.begin();
1229 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1230 if( itr->Same( &*itr2 ) )
return true;
1245 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpAddr() ),
false );
1251 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpStack() ),
false );
1257 result.
Set(
new std::string( pSubStreams[0]->socket->GetHostName() ),
false );
union ServerResponse::@040373375333017131300127053271011057331004327334 body
struct ServerResponseBody_Status bdy
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ StreamBroken
The stream is broken.
@ FatalError
Stream has been broken and won't be recovered.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Interface for a task to be run by the TaskManager.
@ RequestClose
Send a close request.
const std::string & GetHostName() const
Get the name of the target host.
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
const uint16_t errAuthFailed
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.