XRootD
Loading...
Searching...
No Matches
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor.
 
 ~Stream ()
 Destructor.
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty.
 
void Disconnect (bool force=false)
 Disconnect the stream.
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection.
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error.
 
const std::string & GetName () const
 Return stream name.
 
const URL * GetURL () const
 Get the URL.
 
XRootDStatus Initialize ()
 Initializer.
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a substream has been connected.
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error.
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error.
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed.
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout.
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout.
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream.
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler.
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler.
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending.
 
void SetChannelData (AnyObject *channelData)
 Set the channel data.
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue.
 
void SetJobManager (JobManager *jobManager)
 Set job manager.
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams.
 
void SetPoller (Poller *poller)
 Set the poller.
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager.
 
void SetTransport (TransportHandler *transport)
 Set the transport.
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58 {
59 Disconnected = 0,
60 Connected = 1,
61 Connecting = 2,
62 Error = 3
63 };
@ Disconnected
Not connected.
@ Error
Broken.
@ Connected
Connected.
@ Connecting
In the process of being connected.

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() )

Constructor.

Definition at line 96 of file XrdClStream.cc.

96 :
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pBytesSent( 0 ),
111 pBytesReceived( 0 )
112 {
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
119
120 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126
127 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129
130 pAddressType = Utils::String2AddressType( netStack );
131 if( pAddressType == Utils::AddressType::IPAuto )
132 {
134 if( !( stacks & XrdNetUtils::hasIP64 ) )
135 {
136 if( stacks & XrdNetUtils::hasIPv4 )
137 pAddressType = Utils::AddressType::IPv4;
138 else if( stacks & XrdNetUtils::hasIPv6 )
139 pAddressType = Utils::AddressType::IPv6;
140 }
141 }
142
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148 }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdCl::Utils::IPAuto, XrdCl::Utils::IPv4, XrdCl::Utils::IPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, XrdNetUtils::qryINIF, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154 {
155 Disconnect( true );
156
157 Log *log = DefaultEnv::GetLog();
158 log->Debug( PostMasterMsg, "[%s] Destroying stream",
159 pStreamName.c_str() );
160
161 MonitorDisconnection( XRootDStatus() );
162
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165 delete *it;
166 }
void Disconnect(bool force=false)
Disconnect the stream.

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
true if this channel can be collapsed using this URL, false otherwise

Definition at line 1188 of file XrdClStream.cc.

1189 {
1190 Log *log = DefaultEnv::GetLog();
1191
1192 //--------------------------------------------------------------------------
1193 // Resolve all the addresses of the host we're supposed to connect to
1194 //--------------------------------------------------------------------------
1195 std::vector<XrdNetAddr> prefaddrs;
1196 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1197 if( !st.IsOK() )
1198 {
1199 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1200 , pStreamName.c_str(), url.GetHostName().c_str() );
1201 return false;
1202 }
1203
1204 //--------------------------------------------------------------------------
1205 // Resolve all the addresses of the alias
1206 //--------------------------------------------------------------------------
1207 std::vector<XrdNetAddr> aliasaddrs;
1208 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1209 if( !st.IsOK() )
1210 {
1211 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1212 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1213 return false;
1214 }
1215
1216 //--------------------------------------------------------------------------
1217 // Now check if the preferred host is part of the alias
1218 //--------------------------------------------------------------------------
1219 auto itr = prefaddrs.begin();
1220 for( ; itr != prefaddrs.end() ; ++itr )
1221 {
1222 auto itr2 = aliasaddrs.begin();
1223 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1224 if( itr->Same( &*itr2 ) ) return true;
1225 }
1226
1227 return false;
1228 }
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t subStream)

Disables respective uplink if empty.

Definition at line 585 of file XrdClStream.cc.

586 {
587 XrdSysMutexHelper scopedLock( pMutex );
588 Log *log = DefaultEnv::GetLog();
589
590 if( pSubStreams[subStream]->outQueue->IsEmpty() )
591 {
592 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593 pSubStreams[subStream]->socket->GetStreamName().c_str() );
594 pSubStreams[subStream]->socket->DisableUplink();
595 }
596 }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364 {
365 XrdSysMutexHelper scopedLock( pMutex );
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368 {
369 (*it)->socket->Close();
370 (*it)->status = Socket::Disconnected;
371 }
372 }
@ Disconnected
The socket is disconnected.

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write readiness events. The path is updated to reflect what has actually been enabled.

Definition at line 187 of file XrdClStream.cc.

188 {
189 XrdSysMutexHelper scopedLock( pMutex );
190
191 //--------------------------------------------------------------------------
192 // We are in the process of connecting the main stream, so we do nothing
193 // because when the main stream connection is established it will connect
194 // all the other streams
195 //--------------------------------------------------------------------------
196 if( pSubStreams[0]->status == Socket::Connecting )
197 return XRootDStatus();
198
199 //--------------------------------------------------------------------------
200 // The main stream is connected, so we can verify whether we have
201 // the up and the down stream connected and ready to handle data.
202 // If anything is not right we fall back to stream 0.
203 //--------------------------------------------------------------------------
204 if( pSubStreams[0]->status == Socket::Connected )
205 {
206 if( pSubStreams[path.down]->status != Socket::Connected )
207 path.down = 0;
208
209 if( pSubStreams[path.up]->status == Socket::Disconnected )
210 {
211 path.up = 0;
212 return pSubStreams[0]->socket->EnableUplink();
213 }
214
215 if( pSubStreams[path.up]->status == Socket::Connected )
216 return pSubStreams[path.up]->socket->EnableUplink();
217
218 return XRootDStatus();
219 }
220
221 //--------------------------------------------------------------------------
222 // The main stream is not connected, we need to check whether enough time
223 // has passed since we last encountered an error (if any) so that we could
224 // re-attempt the connection
225 //--------------------------------------------------------------------------
226 Log *log = DefaultEnv::GetLog();
227 time_t now = ::time(0);
228
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
231
232 gettimeofday( &pConnectionStarted, 0 );
233 ++pConnectionCount;
234
235 //--------------------------------------------------------------------------
236 // Resolve all the addresses of the host we're supposed to connect to
237 //--------------------------------------------------------------------------
238 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239 if( !st.IsOK() )
240 {
241 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
244 st.status = stFatal;
245 pLastFatalError = st;
246 return st;
247 }
248
249 if( pPrefer.IsValid() )
250 {
251 std::vector<XrdNetAddr> addrresses;
252 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253 if( !st.IsOK() )
254 {
255 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257 }
258 else
259 {
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
262 // first add all remaining addresses
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
265 {
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
268 }
269 // then copy all 'preferred' addresses
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271 // and keep the result
272 pAddresses.swap( tmp );
273 }
274 }
275
277 pAddresses );
278
279 while( !pAddresses.empty() )
280 {
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285 if( st.IsOK() )
286 {
287 pSubStreams[0]->status = Socket::Connecting;
288 break;
289 }
290 }
291 return st;
292 }
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
const uint16_t stFatal
Fatal error, it's still an error.

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348 {
349 XrdSysMutexHelper scopedLock( pMutex );
350 if( pSubStreams[0]->status == Socket::Connecting )
351 {
352 pSubStreams[0]->status = Socket::Disconnected;
353 XrdCl::PathID path( 0, 0 );
354 XrdCl::XRootDStatus st = EnableLink( path );
355 if( !st.IsOK() )
356 OnConnectError( 0, st );
357 }
358 }
XRootDStatus EnableLink(PathID &path)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool IsOK() const
We're fine.

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus status,
bool hush = false )

Force error.

Definition at line 927 of file XrdClStream.cc.

928 {
929 XrdSysMutexHelper scopedLock( pMutex );
930 Log *log = DefaultEnv::GetLog();
931 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
932 {
933 if( pSubStreams[substream]->status != Socket::Connected ) continue;
934 pSubStreams[substream]->socket->Close();
935 pSubStreams[substream]->status = Socket::Disconnected;
936
937 if( !hush )
938 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
939 pStreamName.c_str(), status.ToString().c_str() );
940
941 //--------------------------------------------------------------------
942 // Reinsert the stuff that we have failed to sent
943 //--------------------------------------------------------------------
944 if( pSubStreams[substream]->outMsgHelper.msg )
945 {
946 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
947 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
948 h.stateful );
949 pIncomingQueue->RemoveMessageHandler(h.handler);
950 pSubStreams[substream]->outMsgHelper.Reset();
951 }
952
953 //--------------------------------------------------------------------
954 // Reinsert the receiving handler and reset any partially read partial
955 //--------------------------------------------------------------------
956 if( pSubStreams[substream]->inMsgHelper.handler )
957 {
958 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
959 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
960 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
961 if( xrdHandler ) xrdHandler->PartialReceived();
962 h.Reset();
963 }
964 }
965
966 pConnectionCount = 0;
967
968 //------------------------------------------------------------------------
969 // We're done here, unlock the stream mutex to avoid deadlocks and
970 // report the disconnection event to the handlers
971 //------------------------------------------------------------------------
972 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
973 "message handlers.", pStreamName.c_str() );
974
975 SubStreamList::iterator it;
976 OutQueue q;
977 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
978 q.GrabItems( *(*it)->outQueue );
979 scopedLock.UnLock();
980
981 q.Report( status );
982
983 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
984 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
985 }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string & XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171 {
172 return pStreamName;
173 }

◆ GetURL()

const URL * XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158 {
159 return pUrl;
160 }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172 {
173 if( !pTransport || !pPoller || !pChannelData )
174 return XRootDStatus( stError, errUninitialized );
175
176 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177 pChannelData, 0, this );
178 pSubStreams.push_back( new SubStreamData() );
179 pSubStreams[0]->socket = s;
180 return XRootDStatus();
181 }
const uint16_t errUninitialized
const uint16_t stError
An error occurred that could potentially be retried.

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t stream,
MsgHandler *& incHandler )

In case the message is a kXR_status response it needs further attention

Returns
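the action to take (MsgHandler::Raw, MsgHandler::Corrupted, MsgHandler::More or MsgHandler::None); incHandler is set to the MsgHandler in case we need to read out raw data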

Definition at line 1157 of file XrdClStream.cc.

1159 {
1160 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1161 if( !mh.handler )
1163
1164 uint16_t action = mh.handler->InspectStatusRsp();
1165 mh.action |= action;
1166
1167 if( action & MsgHandler::RemoveHandler )
1168 pIncomingQueue->RemoveMessageHandler( mh.handler );
1169
1170 if( action & MsgHandler::Raw )
1171 {
1172 incHandler = mh.handler;
1173 return MsgHandler::Raw;
1174 }
1175
1176 if( action & MsgHandler::Corrupted )
1177 return MsgHandler::Corrupted;
1178
1179 if( action & MsgHandler::More )
1180 return MsgHandler::More;
1181
1182 return MsgHandler::None;
1183 }
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > & msg,
uint16_t stream )

Install a message handler for the given message if there is one available. If the handler wants to be called in the raw mode it will be returned; the message ownership flag is returned in any case.

Parameters
msg: message header
stream: stream concerned
Returns
the installed handler if it wants to be called in the raw mode, nullptr otherwise

Definition at line 1136 of file XrdClStream.cc.

1137 {
1138 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1139 if( !mh.handler )
1140 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1141 mh.expires,
1142 mh.action );
1143
1144 if( !mh.handler )
1145 return nullptr;
1146
1147 if( mh.action & MsgHandler::Raw )
1148 return mh.handler;
1149 return nullptr;
1150 }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t subStream)

Call back when a substream has been connected.

Definition at line 623 of file XrdClStream.cc.

624 {
625 XrdSysMutexHelper scopedLock( pMutex );
626 pSubStreams[subStream]->status = Socket::Connected;
627
628 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629 Log *log = DefaultEnv::GetLog();
630 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631 subStream, ipstack.c_str() );
632
633 if( subStream == 0 )
634 {
635 pLastStreamError = 0;
636 pLastFatalError = XRootDStatus();
637 pConnectionCount = 0;
638 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639 pSessionId = ++sSessCntGen;
640
641 //------------------------------------------------------------------------
642 // Create the streams if they don't exist yet
643 //------------------------------------------------------------------------
644 if( pSubStreams.size() == 1 && numSub > 1 )
645 {
646 for( uint16_t i = 1; i < numSub; ++i )
647 {
648 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650 pChannelData, i, this );
651 pSubStreams.push_back( new SubStreamData() );
652 pSubStreams[i]->socket = s;
653 }
654 }
655
656 //------------------------------------------------------------------------
657 // Connect the extra streams, if we fail we move all the outgoing items
658 // to stream 0, we don't need to enable the uplink here, because it
659 // should be already enabled after the handshaking process is completed.
660 //------------------------------------------------------------------------
661 if( pSubStreams.size() > 1 )
662 {
663 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664 pStreamName.c_str(), pSubStreams.size() - 1 );
665 for( size_t i = 1; i < pSubStreams.size(); ++i )
666 {
667 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669 if( !st.IsOK() )
670 {
671 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672 pSubStreams[i]->socket->Close();
673 }
674 else
675 {
676 pSubStreams[i]->status = Socket::Connecting;
677 }
678 }
679 }
680
681 //------------------------------------------------------------------------
682 // Inform monitoring
683 //------------------------------------------------------------------------
684 pBytesSent = 0;
685 pBytesReceived = 0;
686 gettimeofday( &pConnectionDone, 0 );
687 Monitor *mon = DefaultEnv::GetMonitor();
688 if( mon )
689 {
690 Monitor::ConnectInfo i;
691 i.server = pUrl->GetHostId();
692 i.sTOD = pConnectionStarted;
693 i.eTOD = pConnectionDone;
694 i.streams = pSubStreams.size();
695
696 AnyObject qryResult;
697 std::string *qryResponse = 0;
698 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699 qryResult.Get( qryResponse );
700 i.auth = *qryResponse;
701 delete qryResponse;
702 mon->Event( Monitor::EvConnect, &i );
703 }
704
705 //------------------------------------------------------------------------
706 // For every connected control-stream call the global on-connect handler
707 //------------------------------------------------------------------------
709 }
710 else if( pOnDataConnJob )
711 {
712 //------------------------------------------------------------------------
713 // For every connected data-stream call the on-connect handler
714 //------------------------------------------------------------------------
715 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
716 }
717 }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t subStream,
XRootDStatus status )

On connect error.

Definition at line 722 of file XrdClStream.cc.

723 {
724 XrdSysMutexHelper scopedLock( pMutex );
725 Log *log = DefaultEnv::GetLog();
726 pSubStreams[subStream]->socket->Close();
727 time_t now = ::time(0);
728
729 //--------------------------------------------------------------------------
730 // For every connection error call the global connection error handler
731 //--------------------------------------------------------------------------
733
734 //--------------------------------------------------------------------------
735 // If we connected subStream == 0 and cannot connect >0 then we just give
736 // up and move the outgoing messages to another queue
737 //--------------------------------------------------------------------------
738 if( subStream > 0 )
739 {
740 pSubStreams[subStream]->status = Socket::Disconnected;
741 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
742 if( pSubStreams[0]->status == Socket::Connected )
743 {
744 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
745 if( !st.IsOK() )
746 OnFatalError( 0, st, scopedLock );
747 return;
748 }
749
750 if( pSubStreams[0]->status == Socket::Connecting )
751 return;
752
753 OnFatalError( subStream, status, scopedLock );
754 return;
755 }
756
757 //--------------------------------------------------------------------------
758 // Check if we still have time to try and do something in the current window
759 //--------------------------------------------------------------------------
760 time_t elapsed = now-pConnectionInitTime;
761 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
762 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
763
764 //------------------------------------------------------------------------
765 // If we have some IP addresses left we try them
766 //------------------------------------------------------------------------
767 if( !pAddresses.empty() )
768 {
769 XRootDStatus st;
770 do
771 {
772 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
773 pAddresses.pop_back();
774 pConnectionInitTime = ::time( 0 );
775 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
776 }
777 while( !pAddresses.empty() && !st.IsOK() );
778
779 if( !st.IsOK() )
780 OnFatalError( subStream, st, scopedLock );
781
782 return;
783 }
784 //------------------------------------------------------------------------
785 // If we still can retry with the same host name, we sleep until the end
786 // of the connection window and try
787 //------------------------------------------------------------------------
788 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
789 && !status.IsFatal() )
790 {
791 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
792 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
793
794 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
795 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
796 return;
797 }
798 //--------------------------------------------------------------------------
799 // We are out of the connection window, the only thing we can do here
800 // is re-resolving the host name and retrying if we still can
801 //--------------------------------------------------------------------------
802 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
803 {
804 pAddresses.clear();
805 pSubStreams[0]->status = Socket::Disconnected;
806 PathID path( 0, 0 );
807 XRootDStatus st = EnableLink( path );
808 if( !st.IsOK() )
809 OnFatalError( subStream, st, scopedLock );
810 return;
811 }
812
813 //--------------------------------------------------------------------------
814 // Else, we fail
815 //--------------------------------------------------------------------------
816 OnFatalError( subStream, status, scopedLock );
817 }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t subStream,
XRootDStatus status )

On error.

Definition at line 822 of file XrdClStream.cc.

823 {
824 XrdSysMutexHelper scopedLock( pMutex );
825 Log *log = DefaultEnv::GetLog();
826 pSubStreams[subStream]->socket->Close();
827 pSubStreams[subStream]->status = Socket::Disconnected;
828
829 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
830 pStreamName.c_str(), subStream, status.ToString().c_str() );
831
832 //--------------------------------------------------------------------------
833 // Reinsert the stuff that we have failed to send
834 //--------------------------------------------------------------------------
835 if( pSubStreams[subStream]->outMsgHelper.msg )
836 {
837 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
838 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
839 h.stateful );
840 pIncomingQueue->RemoveMessageHandler(h.handler);
841 pSubStreams[subStream]->outMsgHelper.Reset();
842 }
843
844 //--------------------------------------------------------------------------
845 // Reinsert the receiving handler and reset any partially read partial response
846 //--------------------------------------------------------------------------
847 if( pSubStreams[subStream]->inMsgHelper.handler )
848 {
849 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
850 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
851 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
852 if( xrdHandler ) xrdHandler->PartialReceived();
853 h.Reset();
854 }
855
856 //--------------------------------------------------------------------------
857 // We are dealing with an error of a peripheral stream. If we don't have
858 // anything to send don't bother recovering. Otherwise move the requests
859 // to stream 0 if possible.
860 //--------------------------------------------------------------------------
861 if( subStream > 0 )
862 {
863 if( pSubStreams[subStream]->outQueue->IsEmpty() )
864 return;
865
866 if( pSubStreams[0]->status != Socket::Disconnected )
867 {
868 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
869 if( pSubStreams[0]->status == Socket::Connected )
870 {
871 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
872 if( !st.IsOK() )
873 OnFatalError( 0, st, scopedLock );
874 return;
875 }
876 }
877 OnFatalError( subStream, status, scopedLock );
878 return;
879 }
880
881 //--------------------------------------------------------------------------
882 // If we lost the stream 0 we have lost the session, we re-enable the
883 // stream if we still have things in one of the outgoing queues, otherwise
884 // there is no point in recovering at this point.
885 //--------------------------------------------------------------------------
886 if( subStream == 0 )
887 {
888 MonitorDisconnection( status );
889
890 SubStreamList::iterator it;
891 size_t outstanding = 0;
892 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
893 outstanding += (*it)->outQueue->GetSizeStateless();
894
895 if( outstanding )
896 {
897 PathID path( 0, 0 );
898 XRootDStatus st = EnableLink( path );
899 if( !st.IsOK() )
900 {
901 OnFatalError( 0, st, scopedLock );
902 return;
903 }
904 }
905
906 //------------------------------------------------------------------------
907 // We're done here, unlock the stream mutex to avoid deadlocks and
908 // report the disconnection event to the handlers
909 //------------------------------------------------------------------------
910 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
911 "message handlers.", pStreamName.c_str() );
912 OutQueue q;
913 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
914 q.GrabStateful( *(*it)->outQueue );
915 scopedLock.UnLock();
916
917 q.Report( status );
918 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
919 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
920 return;
921 }
922 }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t subStream,
std::shared_ptr< Message > msg,
uint32_t bytesReceived )

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474 {
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500
501 Log *log = DefaultEnv::GetLog();
502
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520 pStreamName.c_str(), msg.get() );
521
522 if( action & (MsgHandler::NoProcess|MsgHandler::Ignore) )
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533
534 return;
535 }
536
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }
kXR_char streamid[2]
Definition XProtocol.hh:914
ServerResponseHeader hdr
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t subStream,
Message * msg,
uint32_t bytesSent )

Definition at line 601 of file XrdClStream.cc.

604 {
605 pTransport->MessageSent( msg, subStream, bytesSent,
606 *pChannelData );
607 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608 pBytesSent += bytesSent;
609 if( h.handler )
610 {
611 // ensure expiration time is assigned if still in queue
612 pIncomingQueue->AssignTimeout( h.handler );
613 // OnStatusReady may cause the handler to delete itself, in
614 // which case the handler or the user callback may also delete msg
615 h.handler->OnStatusReady( msg, XRootDStatus() );
616 }
617 pSubStreams[subStream]->outMsgHelper.Reset();
618 }
void AssignTimeout(MsgHandler *handler)
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AssignTimeout(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), and XrdCl::MsgHandler::OnStatusReady().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t subStream)

On read timeout.

Definition at line 1044 of file XrdClStream.cc.

1045 {
1046 //--------------------------------------------------------------------------
1047 // We only take the main stream into account
1048 //--------------------------------------------------------------------------
1049 if( substream != 0 )
1050 return true;
1051
1052 //--------------------------------------------------------------------------
1053 // Check if there are no outgoing messages and if the stream TTL has elapsed.
1054 // It is assumed that the underlying transport makes sure that there are no
1055 // pending requests that are not answered, i.e. all possible virtual streams
1056 // are de-allocated
1057 //--------------------------------------------------------------------------
1058 Log *log = DefaultEnv::GetLog();
1059 SubStreamList::iterator it;
1060 time_t now = time(0);
1061
1062 XrdSysMutexHelper scopedLock( pMutex );
1063 uint32_t outgoingMessages = 0;
1064 time_t lastActivity = 0;
1065 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1066 {
1067 outgoingMessages += (*it)->outQueue->GetSize();
1068 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1069 if( lastActivity < sockLastActivity )
1070 lastActivity = sockLastActivity;
1071 }
1072
1073 if( !outgoingMessages )
1074 {
1075 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1076 *pChannelData );
1077 if( disconnect )
1078 {
1079 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1080 pStreamName.c_str() );
1081 scopedLock.UnLock();
1082 //----------------------------------------------------------------------
1083 // Important note!
1084 //
1085 // This destroys the Stream object itself, the underlying
1086 // AsyncSocketHandler object (that called this method) and the Channel
1087 // object that aggregates this Stream.
1088 //----------------------------------------------------------------------
1089 DefaultEnv::GetPostMaster()->ForceDisconnect( *pUrl );
1090 return false;
1091 }
1092 }
1093
1094 //--------------------------------------------------------------------------
1095 // Check if the stream is broken
1096 //--------------------------------------------------------------------------
1097 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1098 *pChannelData );
1099 if( !st.IsOK() )
1100 {
1101 scopedLock.UnLock();
1102 OnError( substream, st );
1103 return false;
1104 }
1105 return true;
1106 }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t subStream)

Definition at line 545 of file XrdClStream.cc.

546 {
547 XrdSysMutexHelper scopedLock( pMutex );
548 Log *log = DefaultEnv::GetLog();
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
550 {
551 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
553
554 pSubStreams[subStream]->socket->DisableUplink();
555 return std::make_pair( (Message *)0, (MsgHandler *)0 );
556 }
557
558 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560 h.expires,
561 h.stateful );
562
563 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564 "from out-queue to in-queue, starting to send outgoing.",
565 pUrl->GetHostId().c_str(), h.handler,
566 h.msg->GetObfuscatedDescription().c_str() );
567
568 scopedLock.UnLock();
569
570 if( h.handler )
571 {
572 bool rmMsg = false;
573 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574 if( rmMsg )
575 {
576 Log *log = DefaultEnv::GetLog();
577 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578 pStreamName.c_str() );
579 }
580 h.handler->OnReadyToSend( h.msg );
581 }
582 return std::make_pair( h.msg, h.handler );
583 }
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)

References XrdCl::InQueue::AddMessageHandler(), XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdSysMutexHelper::UnLock(), and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t subStream)

On write timeout.

Definition at line 1111 of file XrdClStream.cc.

1112 {
1113 return true;
1114 }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t query,
AnyObject & result )

Query the stream.

Definition at line 1233 of file XrdClStream.cc.

1234 {
1235 switch( query )
1236 {
1237 case StreamQuery::IpAddr:
1238 {
1239 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1240 return Status();
1241 }
1242
1243 case StreamQuery::IpStack:
1244 {
1245 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1246 return Status();
1247 }
1248
1249 case StreamQuery::HostName:
1250 {
1251 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1252 return Status();
1253 }
1254
1255 default:
1256 return Status( stError, errQueryNotSupported );
1257 }
1258 }
const uint16_t errQueryNotSupported
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1119 of file XrdClStream.cc.

1120 {
1121 pChannelEvHandlers.AddHandler( handler );
1122 }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1127 of file XrdClStream.cc.

1128 {
1129 pChannelEvHandlers.RemoveHandler( handler );
1130 }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool stateful,
time_t expires )

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301 {
302 XrdSysMutexHelper scopedLock( pMutex );
303 Log *log = DefaultEnv::GetLog();
304
305 //--------------------------------------------------------------------------
306 // Check the session ID and bounce if needed
307 //--------------------------------------------------------------------------
308 if( msg->GetSessionId() &&
309 (pSubStreams[0]->status != Socket::Connected ||
310 pSessionId != msg->GetSessionId()) )
311 return XRootDStatus( stError, errInvalidSession );
312
313 //--------------------------------------------------------------------------
314 // Decide on the path to send the message
315 //--------------------------------------------------------------------------
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.up )
318 {
319 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320 "substream %d, using 0 instead", pStreamName.c_str(),
321 msg->GetObfuscatedDescription().c_str(), path.up );
322 path.up = 0;
323 }
324
325 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326 "substream %d expecting answer at %d", pStreamName.c_str(),
327 msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328
329 //--------------------------------------------------------------------------
330 // Enable *a* path and insert the message to the right queue
331 //--------------------------------------------------------------------------
332 XRootDStatus st = EnableLink( path );
333 if( st.IsOK() )
334 {
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337 expires, stateful );
338 }
339 else
340 st.status = stFatal;
341 return st;
342 }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116 {
117 pChannelData = channelData;
118 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108 {
109 pIncomingQueue = incomingQueue;
110 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132 {
133 pJobManager = jobManager;
134 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > & onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264 {
265 XrdSysMutexHelper scopedLock( pMutex );
266 pOnDataConnJob = onConnJob;
267 }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller * poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100 {
101 pPoller = poller;
102 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124 {
125 pTaskManager = taskManager;
126 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92 {
93 pTransport = transport;
94 }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t now)

Handle a clock event generated either by socket timeout, or by the task manager event

Definition at line 377 of file XrdClStream.cc.

378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388
389 q.Report( XRootDStatus( stError, errOperationExpired ) );
390 pIncomingQueue->ReportTimeout( now );
391 }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: