XRootD
Loading...
Searching...
No Matches
XrdClStream.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClStream.hh"
26#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClChannel.hh"
29#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClMonitor.hh"
39
40#include <sys/types.h>
41#include <algorithm>
42#include <sys/socket.h>
43#include <sys/time.h>
44
45namespace XrdCl
46{
47 //----------------------------------------------------------------------------
48 // Statics
49 //----------------------------------------------------------------------------
50 RAtomic_uint64_t Stream::sSessCntGen{0};
51
52 //----------------------------------------------------------------------------
53 // Incoming message helper
54 //----------------------------------------------------------------------------
56 {
57 InMessageHelper( Message *message = 0,
58 MsgHandler *hndlr = 0,
59 time_t expir = 0,
60 uint16_t actio = 0 ):
61 msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62 void Reset()
63 {
64 msg = 0; handler = 0; expires = 0; action = 0;
65 }
68 time_t expires;
69 uint16_t action;
70 };
71
72 //----------------------------------------------------------------------------
73 // Sub stream helper
74 //----------------------------------------------------------------------------
92
93 //----------------------------------------------------------------------------
94 // Constructor
95 //----------------------------------------------------------------------------
96 Stream::Stream( const URL *url, const URL &prefer ):
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pBytesSent( 0 ),
111 pBytesReceived( 0 )
112 {
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
119
120 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126
127 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129
130 pAddressType = Utils::String2AddressType( netStack );
131 if( pAddressType == Utils::AddressType::IPAuto )
132 {
134 if( !( stacks & XrdNetUtils::hasIP64 ) )
135 {
136 if( stacks & XrdNetUtils::hasIPv4 )
137 pAddressType = Utils::AddressType::IPv4;
138 else if( stacks & XrdNetUtils::hasIPv6 )
139 pAddressType = Utils::AddressType::IPv6;
140 }
141 }
142
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148 }
149
150 //----------------------------------------------------------------------------
151 // Destructor
152 //----------------------------------------------------------------------------
154 {
155 Disconnect( true );
156
157 Log *log = DefaultEnv::GetLog();
158 log->Debug( PostMasterMsg, "[%s] Destroying stream",
159 pStreamName.c_str() );
160
161 MonitorDisconnection( XRootDStatus() );
162
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165 delete *it;
166 }
167
168 //----------------------------------------------------------------------------
169 // Initializer
170 //----------------------------------------------------------------------------
172 {
173 if( !pTransport || !pPoller || !pChannelData )
175
176 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177 pChannelData, 0, this );
178 pSubStreams.push_back( new SubStreamData() );
179 pSubStreams[0]->socket = s;
180 return XRootDStatus();
181 }
182
183 //------------------------------------------------------------------------
184 // Make sure that the underlying socket handler gets write readiness
185 // events
186 //------------------------------------------------------------------------
188 {
189 XrdSysMutexHelper scopedLock( pMutex );
190
191 //--------------------------------------------------------------------------
192 // We are in the process of connecting the main stream, so we do nothing
193 // because when the main stream connection is established it will connect
194 // all the other streams
195 //--------------------------------------------------------------------------
196 if( pSubStreams[0]->status == Socket::Connecting )
197 return XRootDStatus();
198
199 //--------------------------------------------------------------------------
200 // The main stream is connected, so we can verify whether we have
201 // the up and the down stream connected and ready to handle data.
202 // If anything is not right we fall back to stream 0.
203 //--------------------------------------------------------------------------
204 if( pSubStreams[0]->status == Socket::Connected )
205 {
206 if( pSubStreams[path.down]->status != Socket::Connected )
207 path.down = 0;
208
209 if( pSubStreams[path.up]->status == Socket::Disconnected )
210 {
211 path.up = 0;
212 return pSubStreams[0]->socket->EnableUplink();
213 }
214
215 if( pSubStreams[path.up]->status == Socket::Connected )
216 return pSubStreams[path.up]->socket->EnableUplink();
217
218 return XRootDStatus();
219 }
220
221 //--------------------------------------------------------------------------
222 // The main stream is not connected, we need to check whether enough time
223 // has passed since we last encountered an error (if any) so that we could
224 // re-attempt the connection
225 //--------------------------------------------------------------------------
226 Log *log = DefaultEnv::GetLog();
227 time_t now = ::time(0);
228
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
231
232 gettimeofday( &pConnectionStarted, 0 );
233 ++pConnectionCount;
234
235 //--------------------------------------------------------------------------
236 // Resolve all the addresses of the host we're supposed to connect to
237 //--------------------------------------------------------------------------
238 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239 if( !st.IsOK() )
240 {
241 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
244 st.status = stFatal;
245 pLastFatalError = st;
246 return st;
247 }
248
249 if( pPrefer.IsValid() )
250 {
251 std::vector<XrdNetAddr> addrresses;
252 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253 if( !st.IsOK() )
254 {
255 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257 }
258 else
259 {
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
262 // first add all remaining addresses
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
265 {
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
268 }
269 // then copy all 'preferred' addresses
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271 // and keep the result
272 pAddresses.swap( tmp );
273 }
274 }
275
277 pAddresses );
278
279 while( !pAddresses.empty() )
280 {
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285 if( st.IsOK() )
286 {
287 pSubStreams[0]->status = Socket::Connecting;
288 break;
289 }
290 }
291 return st;
292 }
293
294 //----------------------------------------------------------------------------
295 // Queue the message for sending
296 //----------------------------------------------------------------------------
298 MsgHandler *handler,
299 bool stateful,
300 time_t expires )
301 {
302 XrdSysMutexHelper scopedLock( pMutex );
303 Log *log = DefaultEnv::GetLog();
304
305 //--------------------------------------------------------------------------
306 // Check the session ID and bounce if needed
307 //--------------------------------------------------------------------------
308 if( msg->GetSessionId() &&
309 (pSubStreams[0]->status != Socket::Connected ||
310 pSessionId != msg->GetSessionId()) )
312
313 //--------------------------------------------------------------------------
314 // Decide on the path to send the message
315 //--------------------------------------------------------------------------
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.up )
318 {
319 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320 "substream %d, using 0 instead", pStreamName.c_str(),
321 msg->GetObfuscatedDescription().c_str(), path.up );
322 path.up = 0;
323 }
324
325 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326 "substream %d expecting answer at %d", pStreamName.c_str(),
327 msg->GetObfuscatedDescription().c_str(), msg, path.up, path.down );
328
329 //--------------------------------------------------------------------------
330 // Enable *a* path and insert the message to the right queue
331 //--------------------------------------------------------------------------
332 XRootDStatus st = EnableLink( path );
333 if( st.IsOK() )
334 {
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337 expires, stateful );
338 }
339 else
340 st.status = stFatal;
341 return st;
342 }
343
344 //----------------------------------------------------------------------------
345 // Force connection
346 //----------------------------------------------------------------------------
348 {
349 XrdSysMutexHelper scopedLock( pMutex );
350 if( pSubStreams[0]->status == Socket::Connecting )
351 {
352 pSubStreams[0]->status = Socket::Disconnected;
353 XrdCl::PathID path( 0, 0 );
354 XrdCl::XRootDStatus st = EnableLink( path );
355 if( !st.IsOK() )
356 OnConnectError( 0, st );
357 }
358 }
359
360 //----------------------------------------------------------------------------
361 // Disconnect the stream
362 //----------------------------------------------------------------------------
363 void Stream::Disconnect( bool /*force*/ )
364 {
365 XrdSysMutexHelper scopedLock( pMutex );
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368 {
369 (*it)->socket->Close();
370 (*it)->status = Socket::Disconnected;
371 }
372 }
373
374 //----------------------------------------------------------------------------
375 // Handle a clock event
376 //----------------------------------------------------------------------------
377 void Stream::Tick( time_t now )
378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388
390 pIncomingQueue->ReportTimeout( now );
391 }
392}
393
394//------------------------------------------------------------------------------
395// Handle message timeouts and reconnection in the future
396//------------------------------------------------------------------------------
397namespace
398{
399 class StreamConnectorTask: public XrdCl::Task
400 {
401 public:
402 //------------------------------------------------------------------------
403 // Constructor
404 //------------------------------------------------------------------------
405 StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
406 url( url )
407 {
408 std::string name = "StreamConnectorTask for ";
409 name += n;
410 SetName( name );
411 }
412
413 //------------------------------------------------------------------------
414 // Run the task
415 //------------------------------------------------------------------------
416 time_t Run( time_t )
417 {
419 return 0;
420 }
421
422 private:
423 XrdCl::URL url;
424 };
425}
426
427namespace XrdCl
428{
429 XRootDStatus Stream::RequestClose( Message &response )
430 {
431 ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
432 if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
433 Message *msg;
435 MessageUtils::CreateRequest( msg, req );
436 req->requestid = kXR_close;
437 memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
439 msg->SetSessionId( pSessionId );
440 NullResponseHandler *handler = new NullResponseHandler();
441 MessageSendParams params;
442 params.timeout = 0;
443 params.followRedirects = false;
444 params.stateful = true;
446 return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
447 }
448
449 //------------------------------------------------------------------------
450 // Check if message is a partial response
451 //------------------------------------------------------------------------
452 bool Stream::IsPartial( Message &msg )
453 {
454 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
455 if( rsphdr->status == kXR_oksofar )
456 return true;
457
458 if( rsphdr->status == kXR_status )
459 {
460 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
462 return true;
463 }
464
465 return false;
466 }
467
468 //----------------------------------------------------------------------------
469 // Call back when a message has been reconstructed
470 //----------------------------------------------------------------------------
471 void Stream::OnIncoming( uint16_t subStream,
472 std::shared_ptr<Message> msg,
473 uint32_t bytesReceived )
474 {
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500
501 Log *log = DefaultEnv::GetLog();
502
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520 pStreamName.c_str(), msg.get() );
521
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533
534 return;
535 }
536
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }
540
541 //----------------------------------------------------------------------------
542 // Call when one of the sockets is ready to accept a new message
543 //----------------------------------------------------------------------------
544 std::pair<Message *, MsgHandler *>
545 Stream::OnReadyToWrite( uint16_t subStream )
546 {
547 XrdSysMutexHelper scopedLock( pMutex );
548 Log *log = DefaultEnv::GetLog();
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
550 {
551 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
553
554 pSubStreams[subStream]->socket->DisableUplink();
555 return std::make_pair( (Message *)0, (MsgHandler *)0 );
556 }
557
558 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560 h.expires,
561 h.stateful );
562
563 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564 "from out-queue to in-queue, starting to send outgoing.",
565 pUrl->GetHostId().c_str(), h.handler,
566 h.msg->GetObfuscatedDescription().c_str() );
567
568 scopedLock.UnLock();
569
570 if( h.handler )
571 {
572 bool rmMsg = false;
573 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574 if( rmMsg )
575 {
576 Log *log = DefaultEnv::GetLog();
577 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578 pStreamName.c_str() );
579 }
580 h.handler->OnReadyToSend( h.msg );
581 }
582 return std::make_pair( h.msg, h.handler );
583 }
584
585 void Stream::DisableIfEmpty( uint16_t subStream )
586 {
587 XrdSysMutexHelper scopedLock( pMutex );
588 Log *log = DefaultEnv::GetLog();
589
590 if( pSubStreams[subStream]->outQueue->IsEmpty() )
591 {
592 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593 pSubStreams[subStream]->socket->GetStreamName().c_str() );
594 pSubStreams[subStream]->socket->DisableUplink();
595 }
596 }
597
598 //----------------------------------------------------------------------------
599 // Call when a message is written to the socket
600 //----------------------------------------------------------------------------
601 void Stream::OnMessageSent( uint16_t subStream,
602 Message *msg,
603 uint32_t bytesSent )
604 {
605 pTransport->MessageSent( msg, subStream, bytesSent,
606 *pChannelData );
607 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608 pBytesSent += bytesSent;
609 if( h.handler )
610 {
611 // ensure expiration time is assigned if still in queue
612 pIncomingQueue->AssignTimeout( h.handler );
613 // OnStatusReady may cause the handler to delete itself, in
614 // which case the handler or the user callback may also delete msg
616 }
617 pSubStreams[subStream]->outMsgHelper.Reset();
618 }
619
620 //----------------------------------------------------------------------------
621 // Call back when a message has been reconstructed
622 //----------------------------------------------------------------------------
623 void Stream::OnConnect( uint16_t subStream )
624 {
625 XrdSysMutexHelper scopedLock( pMutex );
626 pSubStreams[subStream]->status = Socket::Connected;
627
628 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629 Log *log = DefaultEnv::GetLog();
630 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631 subStream, ipstack.c_str() );
632
633 if( subStream == 0 )
634 {
635 pLastStreamError = 0;
636 pLastFatalError = XRootDStatus();
637 pConnectionCount = 0;
638 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639 pSessionId = ++sSessCntGen;
640
641 //------------------------------------------------------------------------
642 // Create the streams if they don't exist yet
643 //------------------------------------------------------------------------
644 if( pSubStreams.size() == 1 && numSub > 1 )
645 {
646 for( uint16_t i = 1; i < numSub; ++i )
647 {
648 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650 pChannelData, i, this );
651 pSubStreams.push_back( new SubStreamData() );
652 pSubStreams[i]->socket = s;
653 }
654 }
655
656 //------------------------------------------------------------------------
657 // Connect the extra streams, if we fail we move all the outgoing items
658 // to stream 0, we don't need to enable the uplink here, because it
659 // should be already enabled after the handshaking process is completed.
660 //------------------------------------------------------------------------
661 if( pSubStreams.size() > 1 )
662 {
663 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664 pStreamName.c_str(), pSubStreams.size() - 1 );
665 for( size_t i = 1; i < pSubStreams.size(); ++i )
666 {
667 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669 if( !st.IsOK() )
670 {
671 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672 pSubStreams[i]->socket->Close();
673 }
674 else
675 {
676 pSubStreams[i]->status = Socket::Connecting;
677 }
678 }
679 }
680
681 //------------------------------------------------------------------------
682 // Inform monitoring
683 //------------------------------------------------------------------------
684 pBytesSent = 0;
685 pBytesReceived = 0;
686 gettimeofday( &pConnectionDone, 0 );
688 if( mon )
689 {
691 i.server = pUrl->GetHostId();
692 i.sTOD = pConnectionStarted;
693 i.eTOD = pConnectionDone;
694 i.streams = pSubStreams.size();
695
696 AnyObject qryResult;
697 std::string *qryResponse = 0;
698 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699 qryResult.Get( qryResponse );
700 i.auth = *qryResponse;
701 delete qryResponse;
702 mon->Event( Monitor::EvConnect, &i );
703 }
704
705 //------------------------------------------------------------------------
706 // For every connected control-stream call the global on-connect handler
707 //------------------------------------------------------------------------
709 }
710 else if( pOnDataConnJob )
711 {
712 //------------------------------------------------------------------------
713 // For every connected data-stream call the on-connect handler
714 //------------------------------------------------------------------------
715 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
716 }
717 }
718
719 //----------------------------------------------------------------------------
720 // On connect error
721 //----------------------------------------------------------------------------
722 void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
723 {
724 XrdSysMutexHelper scopedLock( pMutex );
725 Log *log = DefaultEnv::GetLog();
726 pSubStreams[subStream]->socket->Close();
727 time_t now = ::time(0);
728
729 //--------------------------------------------------------------------------
730 // For every connection error call the global connection error handler
731 //--------------------------------------------------------------------------
733
734 //--------------------------------------------------------------------------
735 // If we connected subStream == 0 and cannot connect >0 then we just give
736 // up and move the outgoing messages to another queue
737 //--------------------------------------------------------------------------
738 if( subStream > 0 )
739 {
740 pSubStreams[subStream]->status = Socket::Disconnected;
741 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
742 if( pSubStreams[0]->status == Socket::Connected )
743 {
744 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
745 if( !st.IsOK() )
746 OnFatalError( 0, st, scopedLock );
747 return;
748 }
749
750 if( pSubStreams[0]->status == Socket::Connecting )
751 return;
752
753 OnFatalError( subStream, status, scopedLock );
754 return;
755 }
756
757 //--------------------------------------------------------------------------
758 // Check if we still have time to try and do something in the current window
759 //--------------------------------------------------------------------------
760 time_t elapsed = now-pConnectionInitTime;
761 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
762 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
763
764 //------------------------------------------------------------------------
765 // If we have some IP addresses left we try them
766 //------------------------------------------------------------------------
767 if( !pAddresses.empty() )
768 {
769 XRootDStatus st;
770 do
771 {
772 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
773 pAddresses.pop_back();
774 pConnectionInitTime = ::time( 0 );
775 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
776 }
777 while( !pAddresses.empty() && !st.IsOK() );
778
779 if( !st.IsOK() )
780 OnFatalError( subStream, st, scopedLock );
781
782 return;
783 }
784 //------------------------------------------------------------------------
785 // If we still can retry with the same host name, we sleep until the end
786 // of the connection window and try
787 //------------------------------------------------------------------------
788 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
789 && !status.IsFatal() )
790 {
791 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
792 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
793
794 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
795 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
796 return;
797 }
798 //--------------------------------------------------------------------------
799 // We are out of the connection window, the only thing we can do here
800 // is re-resolving the host name and retrying if we still can
801 //--------------------------------------------------------------------------
802 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
803 {
804 pAddresses.clear();
805 pSubStreams[0]->status = Socket::Disconnected;
806 PathID path( 0, 0 );
807 XRootDStatus st = EnableLink( path );
808 if( !st.IsOK() )
809 OnFatalError( subStream, st, scopedLock );
810 return;
811 }
812
813 //--------------------------------------------------------------------------
814 // Else, we fail
815 //--------------------------------------------------------------------------
816 OnFatalError( subStream, status, scopedLock );
817 }
818
819 //----------------------------------------------------------------------------
820 // Call back when an error has occurred
821 //----------------------------------------------------------------------------
822 void Stream::OnError( uint16_t subStream, XRootDStatus status )
823 {
824 XrdSysMutexHelper scopedLock( pMutex );
825 Log *log = DefaultEnv::GetLog();
826 pSubStreams[subStream]->socket->Close();
827 pSubStreams[subStream]->status = Socket::Disconnected;
828
829 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
830 pStreamName.c_str(), subStream, status.ToString().c_str() );
831
832 //--------------------------------------------------------------------------
833 // Reinsert the stuff that we have failed to sent
834 //--------------------------------------------------------------------------
835 if( pSubStreams[subStream]->outMsgHelper.msg )
836 {
837 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
838 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
839 h.stateful );
840 pIncomingQueue->RemoveMessageHandler(h.handler);
841 pSubStreams[subStream]->outMsgHelper.Reset();
842 }
843
844 //--------------------------------------------------------------------------
845 // Reinsert the receiving handler and reset any partially read partial
846 //--------------------------------------------------------------------------
847 if( pSubStreams[subStream]->inMsgHelper.handler )
848 {
849 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
850 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
851 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
852 if( xrdHandler ) xrdHandler->PartialReceived();
853 h.Reset();
854 }
855
856 //--------------------------------------------------------------------------
857 // We are dealing with an error of a peripheral stream. If we don't have
858 // anything to send don't bother recovering. Otherwise move the requests
859 // to stream 0 if possible.
860 //--------------------------------------------------------------------------
861 if( subStream > 0 )
862 {
863 if( pSubStreams[subStream]->outQueue->IsEmpty() )
864 return;
865
866 if( pSubStreams[0]->status != Socket::Disconnected )
867 {
868 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
869 if( pSubStreams[0]->status == Socket::Connected )
870 {
871 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
872 if( !st.IsOK() )
873 OnFatalError( 0, st, scopedLock );
874 return;
875 }
876 }
877 OnFatalError( subStream, status, scopedLock );
878 return;
879 }
880
881 //--------------------------------------------------------------------------
882 // If we lost the stream 0 we have lost the session, we re-enable the
883 // stream if we still have things in one of the outgoing queues, otherwise
884 // there is not point to recover at this point.
885 //--------------------------------------------------------------------------
886 if( subStream == 0 )
887 {
888 MonitorDisconnection( status );
889
890 SubStreamList::iterator it;
891 size_t outstanding = 0;
892 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
893 outstanding += (*it)->outQueue->GetSizeStateless();
894
895 if( outstanding )
896 {
897 PathID path( 0, 0 );
898 XRootDStatus st = EnableLink( path );
899 if( !st.IsOK() )
900 {
901 OnFatalError( 0, st, scopedLock );
902 return;
903 }
904 }
905
906 //------------------------------------------------------------------------
907 // We're done here, unlock the stream mutex to avoid deadlocks and
908 // report the disconnection event to the handlers
909 //------------------------------------------------------------------------
910 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
911 "message handlers.", pStreamName.c_str() );
912 OutQueue q;
913 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
914 q.GrabStateful( *(*it)->outQueue );
915 scopedLock.UnLock();
916
917 q.Report( status );
918 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
919 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
920 return;
921 }
922 }
923
924 //------------------------------------------------------------------------
925 // Force error
926 //------------------------------------------------------------------------
927 void Stream::ForceError( XRootDStatus status, bool hush )
928 {
929 XrdSysMutexHelper scopedLock( pMutex );
930 Log *log = DefaultEnv::GetLog();
931 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
932 {
933 if( pSubStreams[substream]->status != Socket::Connected ) continue;
934 pSubStreams[substream]->socket->Close();
935 pSubStreams[substream]->status = Socket::Disconnected;
936
937 if( !hush )
938 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
939 pStreamName.c_str(), status.ToString().c_str() );
940
941 //--------------------------------------------------------------------
942 // Reinsert the stuff that we have failed to sent
943 //--------------------------------------------------------------------
944 if( pSubStreams[substream]->outMsgHelper.msg )
945 {
946 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
947 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
948 h.stateful );
949 pIncomingQueue->RemoveMessageHandler(h.handler);
950 pSubStreams[substream]->outMsgHelper.Reset();
951 }
952
953 //--------------------------------------------------------------------
954 // Reinsert the receiving handler and reset any partially read partial
955 //--------------------------------------------------------------------
956 if( pSubStreams[substream]->inMsgHelper.handler )
957 {
958 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
959 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
960 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
961 if( xrdHandler ) xrdHandler->PartialReceived();
962 h.Reset();
963 }
964 }
965
966 pConnectionCount = 0;
967
968 //------------------------------------------------------------------------
969 // We're done here, unlock the stream mutex to avoid deadlocks and
970 // report the disconnection event to the handlers
971 //------------------------------------------------------------------------
972 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
973 "message handlers.", pStreamName.c_str() );
974
975 SubStreamList::iterator it;
976 OutQueue q;
977 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
978 q.GrabItems( *(*it)->outQueue );
979 scopedLock.UnLock();
980
981 q.Report( status );
982
983 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
984 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
985 }
986
987 //----------------------------------------------------------------------------
988 // On fatal error
989 //----------------------------------------------------------------------------
990 void Stream::OnFatalError( uint16_t subStream,
991 XRootDStatus status,
992 XrdSysMutexHelper &lock )
993 {
994 Log *log = DefaultEnv::GetLog();
995 pSubStreams[subStream]->status = Socket::Disconnected;
996 log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
997 pStreamName.c_str(), status.ToString().c_str() );
998
999 //--------------------------------------------------------------------------
1000 // Don't set the stream error windows for authentication errors as the user
1001 // may refresh his credential at any time
1002 //--------------------------------------------------------------------------
1003 if( status.code != errAuthFailed )
1004 {
1005 pConnectionCount = 0;
1006 pLastStreamError = ::time(0);
1007 pLastFatalError = status;
1008 }
1009
1010 SubStreamList::iterator it;
1011 OutQueue q;
1012 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1013 q.GrabItems( *(*it)->outQueue );
1014 lock.UnLock();
1015
1016 status.status = stFatal;
1017 q.Report( status );
1018 pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1019 pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1020
1021 }
1022
1023 //----------------------------------------------------------------------------
1024 // Inform monitoring about disconnection
1025 //----------------------------------------------------------------------------
1026 void Stream::MonitorDisconnection( XRootDStatus status )
1027 {
1028 Monitor *mon = DefaultEnv::GetMonitor();
1029 if( mon )
1030 {
1031 Monitor::DisconnectInfo i;
1032 i.server = pUrl->GetHostId();
1033 i.rBytes = pBytesReceived;
1034 i.sBytes = pBytesSent;
1035 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1036 i.status = status;
1037 mon->Event( Monitor::EvDisconnect, &i );
1038 }
1039 }
1040
1041 //----------------------------------------------------------------------------
1042 // Call back when a message has been reconstructed
1043 //----------------------------------------------------------------------------
1044 bool Stream::OnReadTimeout( uint16_t substream )
1045 {
1046 //--------------------------------------------------------------------------
1047 // We only take the main stream into account
1048 //--------------------------------------------------------------------------
1049 if( substream != 0 )
1050 return true;
1051
1052 //--------------------------------------------------------------------------
1053 // Check if there is no outgoing messages and if the stream TTL is elapesed.
1054 // It is assumed that the underlying transport makes sure that there is no
1055 // pending requests that are not answered, ie. all possible virtual streams
1056 // are de-allocated
1057 //--------------------------------------------------------------------------
1058 Log *log = DefaultEnv::GetLog();
1059 SubStreamList::iterator it;
1060 time_t now = time(0);
1061
1062 XrdSysMutexHelper scopedLock( pMutex );
1063 uint32_t outgoingMessages = 0;
1064 time_t lastActivity = 0;
1065 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1066 {
1067 outgoingMessages += (*it)->outQueue->GetSize();
1068 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1069 if( lastActivity < sockLastActivity )
1070 lastActivity = sockLastActivity;
1071 }
1072
1073 if( !outgoingMessages )
1074 {
1075 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1076 *pChannelData );
1077 if( disconnect )
1078 {
1079 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1080 pStreamName.c_str() );
1081 scopedLock.UnLock();
1082 //----------------------------------------------------------------------
1083 // Important note!
1084 //
1085 // This destroys the Stream object itself, the underlined
1086 // AsyncSocketHandler object (that called this method) and the Channel
1087 // object that aggregates this Stream.
1088 //----------------------------------------------------------------------
1090 return false;
1091 }
1092 }
1093
1094 //--------------------------------------------------------------------------
1095 // Check if the stream is broken
1096 //--------------------------------------------------------------------------
1097 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1098 *pChannelData );
1099 if( !st.IsOK() )
1100 {
1101 scopedLock.UnLock();
1102 OnError( substream, st );
1103 return false;
1104 }
1105 return true;
1106 }
1107
1108 //----------------------------------------------------------------------------
1109 // Call back when a message has been reconstru
1110 //----------------------------------------------------------------------------
1111 bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1112 {
1113 return true;
1114 }
1115
1116 //----------------------------------------------------------------------------
1117 // Register channel event handler
1118 //----------------------------------------------------------------------------
1120 {
1121 pChannelEvHandlers.AddHandler( handler );
1122 }
1123
1124 //----------------------------------------------------------------------------
1125 // Remove a channel event handler
1126 //----------------------------------------------------------------------------
1128 {
1129 pChannelEvHandlers.RemoveHandler( handler );
1130 }
1131
1132 //----------------------------------------------------------------------------
1133 // Install a incoming message handler
1134 //----------------------------------------------------------------------------
1135 MsgHandler*
1136 Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1137 {
1138 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1139 if( !mh.handler )
1140 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1141 mh.expires,
1142 mh.action );
1143
1144 if( !mh.handler )
1145 return nullptr;
1146
1147 if( mh.action & MsgHandler::Raw )
1148 return mh.handler;
1149 return nullptr;
1150 }
1151
1152 //----------------------------------------------------------------------------
1156 //----------------------------------------------------------------------------
1157 uint16_t Stream::InspectStatusRsp( uint16_t stream,
1158 MsgHandler *&incHandler )
1159 {
1160 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1161 if( !mh.handler )
1163
1164 uint16_t action = mh.handler->InspectStatusRsp();
1165 mh.action |= action;
1166
1167 if( action & MsgHandler::RemoveHandler )
1168 pIncomingQueue->RemoveMessageHandler( mh.handler );
1169
1170 if( action & MsgHandler::Raw )
1171 {
1172 incHandler = mh.handler;
1173 return MsgHandler::Raw;
1174 }
1175
1176 if( action & MsgHandler::Corrupted )
1177 return MsgHandler::Corrupted;
1178
1179 if( action & MsgHandler::More )
1180 return MsgHandler::More;
1181
1182 return MsgHandler::None;
1183 }
1184
1185 //----------------------------------------------------------------------------
1186 // Check if channel can be collapsed using given URL
1187 //----------------------------------------------------------------------------
1188 bool Stream::CanCollapse( const URL &url )
1189 {
1190 Log *log = DefaultEnv::GetLog();
1191
1192 //--------------------------------------------------------------------------
1193 // Resolve all the addresses of the host we're supposed to connect to
1194 //--------------------------------------------------------------------------
1195 std::vector<XrdNetAddr> prefaddrs;
1196 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1197 if( !st.IsOK() )
1198 {
1199 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1200 , pStreamName.c_str(), url.GetHostName().c_str() );
1201 return false;
1202 }
1203
1204 //--------------------------------------------------------------------------
1205 // Resolve all the addresses of the alias
1206 //--------------------------------------------------------------------------
1207 std::vector<XrdNetAddr> aliasaddrs;
1208 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1209 if( !st.IsOK() )
1210 {
1211 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1212 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1213 return false;
1214 }
1215
1216 //--------------------------------------------------------------------------
1217 // Now check if the preferred host is part of the alias
1218 //--------------------------------------------------------------------------
1219 auto itr = prefaddrs.begin();
1220 for( ; itr != prefaddrs.end() ; ++itr )
1221 {
1222 auto itr2 = aliasaddrs.begin();
1223 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1224 if( itr->Same( &*itr2 ) ) return true;
1225 }
1226
1227 return false;
1228 }
1229
1230 //------------------------------------------------------------------------
1231 // Query the stream
1232 //------------------------------------------------------------------------
1233 Status Stream::Query( uint16_t query, AnyObject &result )
1234 {
1235 switch( query )
1236 {
1238 {
1239 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1240 return Status();
1241 }
1242
1244 {
1245 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1246 return Status();
1247 }
1248
1250 {
1251 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1252 return Status();
1253 }
1254
1255 default:
1257 }
1258 }
1259
1260}
union ServerResponse::@0 body
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 requestid
Definition XProtocol.hh:228
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
struct ServerResponseBody_Status bdy
kXR_char fhandle[4]
Definition XProtocol.hh:229
@ kXR_close
Definition XProtocol.hh:115
ServerResponseHeader hdr
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
void AssignTimeout(MsgHandler *handler)
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
A network socket.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void Tick(time_t now)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:452
Random utilities.
Definition XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
const uint16_t errAuthFailed
@ kXR_PartialResult
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.