XRootD
Loading...
Searching...
No Matches
XrdClXRootDTransport.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
28#include "XrdCl/XrdClSocket.hh"
29#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClTls.hh"
35#include "XrdNet/XrdNetAddr.hh"
36#include "XrdNet/XrdNetUtils.hh"
39#include "XrdOuc/XrdOucUtils.hh"
40#include "XrdOuc/XrdOucCRC.hh"
42#include "XrdSys/XrdSysTimer.hh"
47#include "XrdSys/XrdSysE2T.hh"
48#include "XrdCl/XrdClTls.hh"
49#include "XrdCl/XrdClSocket.hh"
51#include "XrdVersion.hh"
52
53#include <arpa/inet.h>
54#include <sys/types.h>
55#include <unistd.h>
56#include <dlfcn.h>
57#include <sstream>
58#include <iomanip>
59#include <set>
60#include <limits>
61
62#include <atomic>
63
65
66namespace XrdCl
67{
69 {
71
72 static void UnloadHandler()
73 {
74 UnloadHandler( "root" );
75 UnloadHandler( "xroot" );
76 }
77
78 static void UnloadHandler( const std::string &trProt )
79 {
81 TransportHandler *trHandler = trManager->GetHandler( trProt );
82 trHandler->WaitBeforeExit();
83 }
84
85 void Register( const std::string &protocol )
86 {
87 XrdSysRWLockHelper scope( lock, false ); // obtain write lock
88 std::pair< std::set<std::string>::iterator, bool > ret = protocols.insert( protocol );
89 // if that's the first time we are using the protocol, the sec lib
90 // was just loaded so now's the time to register the atexit handler
91 if( ret.second )
92 {
93 atexit( UnloadHandler );
94 }
95 }
96
99 std::set<std::string> protocols;
100 };
101
102 //----------------------------------------------------------------------------
104 //----------------------------------------------------------------------------
106 {
107 //--------------------------------------------------------------------------
108 // Define the stream status for the link negotiation purposes
109 //--------------------------------------------------------------------------
122
123 //--------------------------------------------------------------------------
124 // Constructor
125 //--------------------------------------------------------------------------
129
131 uint8_t pathId;
132 };
133
134 //----------------------------------------------------------------------------
136 //----------------------------------------------------------------------------
138 {
139 StreamSelector( uint16_t size )
140 {
141 //----------------------------------------------------------------------
142 // Subtract one because we shouldn't take into account the control
143 // stream.
144 //----------------------------------------------------------------------
145 strmqueues.resize( size - 1, 0 );
146 }
147
148 //------------------------------------------------------------------------
149 // @param size : number of streams
150 //------------------------------------------------------------------------
151 void AdjustQueues( uint16_t size )
152 {
153 strmqueues.resize( size - 1, 0);
154 }
155
156 //------------------------------------------------------------------------
157 // @param connected : bitarray stating if given sub-stream is connected
158 //
159 // @return : substream number
160 //------------------------------------------------------------------------
161 uint16_t Select( const std::vector<bool> &connected )
162 {
163 uint16_t ret = 0;
164 size_t minval = std::numeric_limits<size_t>::max();
165
166 for( uint16_t i = 0; i < connected.size() && i < strmqueues.size(); ++i )
167 {
168 if( !connected[i] ) continue;
169
170 if( strmqueues[i] < minval )
171 {
172 ret = i;
173 minval = strmqueues[i];
174 }
175 }
176
177 ++strmqueues[ret];
178 return ret + 1;
179 }
180
181 //--------------------------------------------------------------------------
182 // Update queue for given substream
183 //--------------------------------------------------------------------------
184 void MsgReceived( uint16_t substrm )
185 {
186 if( substrm > 0 )
187 --strmqueues[substrm - 1];
188 }
189
190 private:
191
192 std::vector<size_t> strmqueues;
193 };
194
196 {
197 BindPrefSelector( std::vector<std::string> && bindprefs ) :
198 bindprefs( std::move( bindprefs ) ), next( 0 )
199 {
200 }
201
202 inline const std::string& Get()
203 {
204 std::string &ret = bindprefs[next];
205 ++next;
206 if( next >= bindprefs.size() )
207 next = 0;
208 return ret;
209 }
210
211 private:
212 std::vector<std::string> bindprefs;
213 size_t next;
214 };
215
216 //----------------------------------------------------------------------------
218 //----------------------------------------------------------------------------
220 {
221 //--------------------------------------------------------------------------
222 // Constructor
223 //--------------------------------------------------------------------------
224 XRootDChannelInfo( const URL &url ):
225 serverFlags(0),
227 firstLogIn(true),
228 authBuffer(0),
229 authProtocol(0),
230 authParams(0),
231 authEnv(0),
232 finstcnt(0),
233 openFiles(0),
234 waitBarrier(0),
235 protection(0),
236 protRespBody(0),
237 protRespSize(0),
238 encrypted(false),
239 istpc(false)
240 {
242 memset( sessionId, 0, 16 );
243 memset( oldSessionId, 0, 16 );
244 }
245
246 //--------------------------------------------------------------------------
247 // Destructor
248 //--------------------------------------------------------------------------
250 {
251 delete [] authBuffer;
252 }
253
254 typedef std::vector<XRootDStreamInfo> StreamInfoVector;
255
256 //--------------------------------------------------------------------------
257 // Data
258 //--------------------------------------------------------------------------
259 uint32_t serverFlags;
261 uint8_t sessionId[16];
262 uint8_t oldSessionId[16];
264 std::shared_ptr<SIDManager> sidManager;
270 std::string streamName;
271 std::string authProtocolName;
272 std::set<uint16_t> sentOpens;
273 std::set<uint16_t> sentCloses;
274 std::atomic<uint32_t> finstcnt; // file instance count
275 uint32_t openFiles;
279 unsigned int protRespSize;
280 std::unique_ptr<StreamSelector> strmSelector;
282 bool istpc;
283 std::unique_ptr<BindPrefSelector> bindSelector;
284 std::string logintoken;
286 };
287
288 //----------------------------------------------------------------------------
289 // Constructor
290 //----------------------------------------------------------------------------
292 pSecUnloadHandler( new PluginUnloadHandler() )
293 {
294 }
295
296 //----------------------------------------------------------------------------
297 // Destructor
298 //----------------------------------------------------------------------------
300 {
301 delete pSecUnloadHandler; pSecUnloadHandler = 0;
302 }
303
304 //----------------------------------------------------------------------------
305 // Read message header from socket
306 //----------------------------------------------------------------------------
308 {
309 //--------------------------------------------------------------------------
310 // A new message - allocate the space needed for the header
311 //--------------------------------------------------------------------------
312 if( message.GetCursor() == 0 && message.GetSize() < 8 )
313 message.Allocate( 8 );
314
315 //--------------------------------------------------------------------------
316 // Read the message header
317 //--------------------------------------------------------------------------
318 if( message.GetCursor() < 8 )
319 {
320 size_t leftToBeRead = 8 - message.GetCursor();
321 while( leftToBeRead )
322 {
323 int bytesRead = 0;
324 XRootDStatus status = socket->Read( message.GetBufferAtCursor(),
325 leftToBeRead, bytesRead );
326 if( !status.IsOK() || status.code == suRetry )
327 return status;
328
329 leftToBeRead -= bytesRead;
330 message.AdvanceCursor( bytesRead );
331 }
332 UnMarshallHeader( message );
333
334 uint32_t bodySize = *(uint32_t*)(message.GetBuffer(4));
335 Log *log = DefaultEnv::GetLog();
336 log->Dump( XRootDTransportMsg, "[msg: %p] Expecting %d bytes of message "
337 "body", &message, bodySize );
338
339 return XRootDStatus( stOK, suDone );
340 }
342 }
343
344 //----------------------------------------------------------------------------
345 // Read message body from socket
346 //----------------------------------------------------------------------------
348 {
349 //--------------------------------------------------------------------------
350 // Retrieve the body
351 //--------------------------------------------------------------------------
352 size_t leftToBeRead = 0;
353 uint32_t bodySize = 0;
355 bodySize = rsphdr->dlen;
356
357 if( message.GetSize() < bodySize + 8 )
358 message.ReAllocate( bodySize + 8 );
359
360 leftToBeRead = bodySize-(message.GetCursor()-8);
361 while( leftToBeRead )
362 {
363 int bytesRead = 0;
364 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
365
366 if( !status.IsOK() || status.code == suRetry )
367 return status;
368
369 leftToBeRead -= bytesRead;
370 message.AdvanceCursor( bytesRead );
371 }
372
373 return XRootDStatus( stOK, suDone );
374 }
375
376 //----------------------------------------------------------------------------
377 // Read more of the message body from socket
378 //----------------------------------------------------------------------------
380 {
382 if( rsphdr->status != kXR_status )
384
385 //--------------------------------------------------------------------------
386 // In case of non kXR_status responses we read all the response, including
387 // data. For kXR_status responses we first read only the remainder of the
388 // header. The header must then be unmarshalled, and then a second call to
389 // GetMore (repeated for suRetry as needed) will read the data.
390 //--------------------------------------------------------------------------
391
392 uint32_t bodySize = rsphdr->dlen;
393 if( bodySize+8 < sizeof( ServerResponseStatus ) )
395 "kXR_status: invalid message size." );
396
398 bodySize += rspst->bdy.dlen;
399
400 if( message.GetSize() < bodySize + 8 )
401 message.ReAllocate( bodySize + 8 );
402
403 size_t leftToBeRead = bodySize-(message.GetCursor()-8);
404 while( leftToBeRead )
405 {
406 int bytesRead = 0;
407 XRootDStatus status = socket->Read( message.GetBufferAtCursor(), leftToBeRead, bytesRead );
408
409 if( !status.IsOK() || status.code == suRetry )
410 return status;
411
412 leftToBeRead -= bytesRead;
413 message.AdvanceCursor( bytesRead );
414 }
415
416 // Unmarshal the message body
417 Log *log = DefaultEnv::GetLog();
419 if( !st.IsOK() && st.code == errDataError )
420 {
421 log->Error( XRootDTransportMsg, "[msg: %p] %s", &message,
422 st.GetErrorMessage().c_str() );
423 return st;
424 }
425
426 if( !st.IsOK() )
427 {
428 log->Error( XRootDTransportMsg, "[msg: %p] Failed to unmarshall status body.",
429 &message );
430 return st;
431 }
432
433 return XRootDStatus( stOK, suDone );
434 }
435
436 //----------------------------------------------------------------------------
437 // Initialize channel
438 //----------------------------------------------------------------------------
440 AnyObject &channelData )
441 {
442 XRootDChannelInfo *info = new XRootDChannelInfo( url );
443 XrdSysMutexHelper scopedLock( info->mutex );
444 channelData.Set( info );
445
446 Env *env = DefaultEnv::GetEnv();
447 int streams = DefaultSubStreamsPerChannel;
448 env->GetInt( "SubStreamsPerChannel", streams );
449 if( streams < 1 ) streams = 1;
450 info->stream.resize( streams );
451 info->strmSelector.reset( new StreamSelector( streams ) );
452 info->encrypted = url.IsSecure();
453 info->istpc = url.IsTPC();
454 info->logintoken = url.GetLoginToken();
455 }
456
457 //----------------------------------------------------------------------------
458 // Finalize channel
459 //----------------------------------------------------------------------------
463
464 //----------------------------------------------------------------------------
465 // HandShake
466 //----------------------------------------------------------------------------
468 AnyObject &channelData )
469 {
470 XRootDChannelInfo *info = 0;
471 channelData.Get( info );
472 XrdSysMutexHelper scopedLock( info->mutex );
473
474 if( info->stream.size() <= handShakeData->subStreamId )
475 {
476 Log *log = DefaultEnv::GetLog();
478 "[%s] Internal error: not enough substreams",
479 handShakeData->streamName.c_str() );
481 }
482
483 if( handShakeData->subStreamId == 0 )
484 {
485 info->streamName = handShakeData->streamName;
486 return HandShakeMain( handShakeData, channelData );
487 }
488 return HandShakeParallel( handShakeData, channelData );
489 }
490
491 //----------------------------------------------------------------------------
492 // Hand shake the main stream
493 //----------------------------------------------------------------------------
494 XRootDStatus XRootDTransport::HandShakeMain( HandShakeData *handShakeData,
495 AnyObject &channelData )
496 {
497 XRootDChannelInfo *info = 0;
498 channelData.Get( info );
499 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
500
501 //--------------------------------------------------------------------------
502 // First step - we need to create and initial handshake and send it out
503 //--------------------------------------------------------------------------
506 {
507 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
510 return XRootDStatus( stOK, suContinue );
511 }
512
513 //--------------------------------------------------------------------------
514 // Second step - we got the reply message to the initial handshake
515 //--------------------------------------------------------------------------
517 {
518 XRootDStatus st = ProcessServerHS( handShakeData, info );
519 if( st.IsOK() )
521 else
523 return st;
524 }
525
526 //--------------------------------------------------------------------------
527 // Third step - we got the response to the protocol request, we need
528 // to process it and send out a login request
529 //--------------------------------------------------------------------------
531 {
532 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
533
534 if( !st.IsOK() )
535 {
537 return st;
538 }
539
540 if( st.code == suRetry )
541 {
542 handShakeData->out = GenerateProtocol( handShakeData, info,
545 return XRootDStatus( stOK, suRetry );
546 }
547
548 handShakeData->out = GenerateLogIn( handShakeData, info );
550 return XRootDStatus( stOK, suContinue );
551 }
552
553 //--------------------------------------------------------------------------
554 // Fourth step - handle the log in response and proceed with the
555 // authentication if required by the server
556 //--------------------------------------------------------------------------
558 {
559 XRootDStatus st = ProcessLogInResp( handShakeData, info );
560
561 if( !st.IsOK() )
562 {
564 return st;
565 }
566
567 if( st.IsOK() && st.code == suDone )
568 {
569 //----------------------------------------------------------------------
570 // If it's not our first log in we need to end the previous session
571 // to make sure that the server noticed our disconnection and closed
572 // all the writable handles that we owned
573 //----------------------------------------------------------------------
574 if( !info->firstLogIn )
575 {
576 handShakeData->out = GenerateEndSession( handShakeData, info );
578 return XRootDStatus( stOK, suContinue );
579 }
580
582 info->firstLogIn = false;
583 return st;
584 }
585
586 st = DoAuthentication( handShakeData, info );
587 if( !st.IsOK() )
589 else
591 return st;
592 }
593
594 //--------------------------------------------------------------------------
595 // Fifth step and later - proceed with the authentication
596 //--------------------------------------------------------------------------
598 {
599 XRootDStatus st = DoAuthentication( handShakeData, info );
600
601 if( !st.IsOK() )
602 {
604 return st;
605 }
606
607 if( st.IsOK() && st.code == suDone )
608 {
609 //----------------------------------------------------------------------
610 // If it's not our first log in we need to end the previous session
611 //----------------------------------------------------------------------
612 if( !info->firstLogIn )
613 {
614 handShakeData->out = GenerateEndSession( handShakeData, info );
616 return XRootDStatus( stOK, suContinue );
617 }
618
620 info->firstLogIn = false;
621 return st;
622 }
623
624 return st;
625 }
626
627 //--------------------------------------------------------------------------
628 // The last step - kXR_endsess returned
629 //--------------------------------------------------------------------------
631 {
632 XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
633
634 if( st.IsOK() && st.code == suDone )
635 {
637 }
638 else if( !st.IsOK() )
639 {
641 }
642
643 return st;
644 }
645
646 return XRootDStatus( stOK, suDone );
647 }
648
649 //----------------------------------------------------------------------------
650 // Hand shake parallel stream
651 //----------------------------------------------------------------------------
652 XRootDStatus XRootDTransport::HandShakeParallel( HandShakeData *handShakeData,
653 AnyObject &channelData )
654 {
655 XRootDChannelInfo *info = 0;
656 channelData.Get( info );
657
658 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
659
660 //--------------------------------------------------------------------------
661 // First step - we need to create an initial handshake and send it out
662 //--------------------------------------------------------------------------
663 if( sInfo.status == XRootDStreamInfo::Disconnected ||
664 sInfo.status == XRootDStreamInfo::Broken )
665 {
666 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
668 sInfo.status = XRootDStreamInfo::HandShakeSent;
669 return XRootDStatus( stOK, suContinue );
670 }
671
672 //--------------------------------------------------------------------------
673 // Second step - we got the reply message to the initial handshake,
674 // if successful we need to send bind
675 //--------------------------------------------------------------------------
676 if( sInfo.status == XRootDStreamInfo::HandShakeSent )
677 {
678 XRootDStatus st = ProcessServerHS( handShakeData, info );
679 if( st.IsOK() )
681 else
682 sInfo.status = XRootDStreamInfo::Broken;
683 return st;
684 }
685
686 //--------------------------------------------------------------------------
687 // Second step bis - we got the response to the protocol request, we need
688 // to process it and send out a bind request
689 //--------------------------------------------------------------------------
690 if( sInfo.status == XRootDStreamInfo::HandShakeReceived )
691 {
692 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
693
694 if( !st.IsOK() )
695 {
696 sInfo.status = XRootDStreamInfo::Broken;
697 return st;
698 }
699
700 handShakeData->out = GenerateBind( handShakeData, info );
701 sInfo.status = XRootDStreamInfo::BindSent;
702 return XRootDStatus( stOK, suContinue );
703 }
704
705 //--------------------------------------------------------------------------
706 // Third step - we got the response to the kXR_bind
707 //--------------------------------------------------------------------------
708 if( sInfo.status == XRootDStreamInfo::BindSent )
709 {
710 XRootDStatus st = ProcessBindResp( handShakeData, info );
711
712 if( !st.IsOK() )
713 {
714 sInfo.status = XRootDStreamInfo::Broken;
715 return st;
716 }
717 sInfo.status = XRootDStreamInfo::Connected;
718 return XRootDStatus();
719 }
720 return XRootDStatus();
721 }
722
723 //------------------------------------------------------------------------
724 // @return true if handshake has been done and stream is connected,
725 // false otherwise
726 //------------------------------------------------------------------------
728 AnyObject &channelData )
729 {
730 XRootDChannelInfo *info = 0;
731 channelData.Get( info );
732 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
733 return ( sInfo.status == XRootDStreamInfo::Connected );
734 }
735
736 //----------------------------------------------------------------------------
737 // Check if the stream should be disconnected
738 //----------------------------------------------------------------------------
739 bool XRootDTransport::IsStreamTTLElapsed( time_t inactiveTime,
740 AnyObject &channelData )
741 {
742 XRootDChannelInfo *info = 0;
743 channelData.Get( info );
744 Env *env = DefaultEnv::GetEnv();
745 Log *log = DefaultEnv::GetLog();
746
747 //--------------------------------------------------------------------------
748 // Check the TTL settings for the current server
749 //--------------------------------------------------------------------------
750 int ttl;
751 if( info->serverFlags & kXR_isServer )
752 {
754 env->GetInt( "DataServerTTL", ttl );
755 }
756 else
757 {
759 env->GetInt( "LoadBalancerTTL", ttl );
760 }
761
762 //--------------------------------------------------------------------------
763 // See whether we can give a go-ahead for the disconnection
764 //--------------------------------------------------------------------------
765 XrdSysMutexHelper scopedLock( info->mutex );
766 uint16_t allocatedSIDs = info->sidManager->GetNumberOfAllocatedSIDs();
767 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
768 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
769 info->streamName.c_str(), (long long) inactiveTime, ttl, allocatedSIDs,
770 info->openFiles, info->finstcnt.load( std::memory_order_relaxed ) );
771
772 if( info->openFiles != 0 && info->finstcnt.load( std::memory_order_relaxed ) != 0 )
773 return false;
774
775 if( !allocatedSIDs && inactiveTime > ttl )
776 return true;
777
778 return false;
779 }
780
781 //----------------------------------------------------------------------------
782 // Check the stream is broken - ie. TCP connection got broken and
783 // went undetected by the TCP stack
784 //----------------------------------------------------------------------------
786 AnyObject &channelData )
787 {
788 XRootDChannelInfo *info = 0;
789 channelData.Get( info );
790 Env *env = DefaultEnv::GetEnv();
791 Log *log = DefaultEnv::GetLog();
792
793 int streamTimeout = DefaultStreamTimeout;
794 env->GetInt( "StreamTimeout", streamTimeout );
795
796 XrdSysMutexHelper scopedLock( info->mutex );
797
798 const time_t now = time(0);
799 const bool anySID =
800 info->sidManager->IsAnySIDOldAs( now - streamTimeout );
801
802 log->Dump( XRootDTransportMsg, "[%s] Stream inactive since %lld seconds, "
803 "stream timeout: %d, any SID: %d, wait barrier: %s",
804 info->streamName.c_str(), (long long) inactiveTime, streamTimeout,
805 anySID, Utils::TimeToString(info->waitBarrier).c_str() );
806
807 if( inactiveTime < streamTimeout )
808 return Status();
809
810 if( now < info->waitBarrier )
811 return Status();
812
813 if( !anySID )
814 return Status();
815
817 }
818
819 //----------------------------------------------------------------------------
820 // Multiplex
821 //----------------------------------------------------------------------------
823 {
824 return PathID( 0, 0 );
825 }
826
827 //----------------------------------------------------------------------------
828 // Multiplex
829 //----------------------------------------------------------------------------
831 AnyObject &channelData,
832 PathID *hint )
833 {
834 XRootDChannelInfo *info = 0;
835 channelData.Get( info );
836 XrdSysMutexHelper scopedLock( info->mutex );
837
838 //--------------------------------------------------------------------------
839 // If we're not connected to a data server or we don't know that yet
840 // we stream through 0
841 //--------------------------------------------------------------------------
842 if( !(info->serverFlags & kXR_isServer) || info->stream.size() == 0 )
843 return PathID( 0, 0 );
844
845 //--------------------------------------------------------------------------
846 // Select the streams
847 //--------------------------------------------------------------------------
848 Log *log = DefaultEnv::GetLog();
849 uint16_t upStream = 0;
850 uint16_t downStream = 0;
851
852 if( hint )
853 {
854 upStream = hint->up;
855 downStream = hint->down;
856 }
857 else
858 {
859 upStream = 0;
860 std::vector<bool> connected;
861 connected.reserve( info->stream.size() - 1 );
862 size_t nbConnected = 0;
863 for( size_t i = 1; i < info->stream.size(); ++i )
864 if( info->stream[i].status == XRootDStreamInfo::Connected )
865 {
866 connected.push_back( true );
867 ++nbConnected;
868 }
869 else
870 connected.push_back( false );
871
872 if( nbConnected == 0 )
873 downStream = 0;
874 else
875 downStream = info->strmSelector->Select( connected );
876 }
877
878 if( upStream >= info->stream.size() )
879 {
881 "[%s] Up link stream %d does not exist, using 0",
882 info->streamName.c_str(), upStream );
883 upStream = 0;
884 }
885
886 if( downStream >= info->stream.size() )
887 {
889 "[%s] Down link stream %d does not exist, using 0",
890 info->streamName.c_str(), downStream );
891 downStream = 0;
892 }
893
894 //--------------------------------------------------------------------------
895 // Modify the message
896 //--------------------------------------------------------------------------
897 UnMarshallRequest( msg );
899 switch( hdr->requestid )
900 {
901 //------------------------------------------------------------------------
902 // Read - we update the path id to tell the server where we want to
903 // get the response, but we still send the request through stream 0
904 // We need to allocate space for read_args if we don't have it
905 // included yet
906 //------------------------------------------------------------------------
907 case kXR_read:
908 {
909 if( msg->GetSize() < sizeof(ClientReadRequest) + 8 )
910 {
911 msg->ReAllocate( sizeof(ClientReadRequest) + 8 );
912 void *newBuf = msg->GetBuffer(sizeof(ClientReadRequest));
913 memset( newBuf, 0, 8 );
915 req->dlen += 8;
916 }
917 read_args *args = (read_args*)msg->GetBuffer(sizeof(ClientReadRequest));
918 args->pathid = info->stream[downStream].pathId;
919 break;
920 }
921
922
923 //------------------------------------------------------------------------
924 // PgRead - we update the path id to tell the server where we want to
925 // get the response, but we still send the request through stream 0
926 // We need to allocate space for ClientPgReadReqArgs if we don't have it
927 // included yet
928 //------------------------------------------------------------------------
929 case kXR_pgread:
930 {
931 if( msg->GetSize() < sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) )
932 {
933 msg->ReAllocate( sizeof( ClientPgReadRequest ) + sizeof( ClientPgReadReqArgs ) );
934 void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
935 memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
937 req->dlen += sizeof( ClientPgReadReqArgs );
938 }
939 ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
940 msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
941 args->pathid = info->stream[downStream].pathId;
942 break;
943 }
944
945 //------------------------------------------------------------------------
946 // ReadV - the situation is identical to read but we don't need any
947 // additional structures to specify the return path
948 //------------------------------------------------------------------------
949 case kXR_readv:
950 {
952 req->pathid = info->stream[downStream].pathId;
953 break;
954 }
955
956 //------------------------------------------------------------------------
957 // Write - multiplexing writes doesn't work properly in the server
958 //------------------------------------------------------------------------
959 case kXR_write:
960 {
961// ClientWriteRequest *req = (ClientWriteRequest*)msg->GetBuffer();
962// req->pathid = info->stream[downStream].pathId;
963 break;
964 }
965
966 //------------------------------------------------------------------------
967 // WriteV - multiplexing writes doesn't work properly in the server
968 //------------------------------------------------------------------------
969 case kXR_writev:
970 {
971// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
972// req->pathid = info->stream[downStream].pathId;
973 break;
974 }
975
976 //------------------------------------------------------------------------
977 // PgWrite - multiplexing writes doesn't work properly in the server
978 //------------------------------------------------------------------------
979 case kXR_pgwrite:
980 {
981// ClientWriteVRequest *req = (ClientWriteVRequest*)msg->GetBuffer();
982// req->pathid = info->stream[downStream].pathId;
983 break;
984 }
985 };
986 MarshallRequest( msg );
987 return PathID( upStream, downStream );
988 }
989
990 //----------------------------------------------------------------------------
991 // Return a number of substreams per stream that should be created
992 // This depends on the environment and whether we are connected to
993 // a data server or not
994 //----------------------------------------------------------------------------
996 {
// Returns the number of sub-streams this channel should use. The result is
// 1 for TPC connections and for servers without the kXR_isServer flag;
// otherwise it is the current size of info->stream, raised from 1 to 2 when
// stream 0 is to be encrypted while data may travel unencrypted. When the
// result exceeds the current size, info->stream and the stream selector
// queues are grown to match. Runs under info->mutex.
// NOTE(review): `env` is used below but its declaration is not visible in
// this listing.
997 XRootDChannelInfo *info = 0;
998 channelData.Get( info );
999 XrdSysMutexHelper scopedLock( info->mutex );
1000
1001 //--------------------------------------------------------------------------
1002 // If the connection has been opened in order to orchestrate a TPC or
1003 // the remote server is a Manager or Metamanager we will need only one
1004 // (control) stream.
1005 //--------------------------------------------------------------------------
1006 if( info->istpc || !(info->serverFlags & kXR_isServer ) ) return 1;
1007
1008 //--------------------------------------------------------------------------
1009 // Number of streams requested by user
1010 //--------------------------------------------------------------------------
1011 uint16_t ret = info->stream.size();
1012
// "TlsNoData" overrides the compiled-in default when it is set in the env
1014 int nodata = DefaultTlsNoData;
1015 env->GetInt( "TlsNoData", nodata );
1016
1017 // Does the server require the stream 0 to be encrypted?
1018 bool srvTlsStrm0 = ( info->serverFlags & kXR_gotoTLS ) ||
1019 ( info->serverFlags & kXR_tlsLogin ) ||
1020 ( info->serverFlags & kXR_tlsSess );
1021 // Does the server NOT require the data streams to be encrypted?
1022 bool srvNoTlsData = !( info->serverFlags & kXR_tlsData );
1023 // Does the user require the stream 0 to be encrypted?
1024 bool usrTlsStrm0 = info->encrypted;
1025 // Does the user NOT require the data streams to be encrypted?
// (logically the same as: !info->encrypted || nodata)
1026 bool usrNoTlsData = !info->encrypted || ( info->encrypted && nodata );
1027
1028 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1029 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1030 {
1031 //------------------------------------------------------------------------
1032 // The server or user asked us to encrypt stream 0, but to send the data
1033 // (read/write) using a plain TCP connection
1034 //------------------------------------------------------------------------
// A second stream is added only when exactly one exists
1035 if( ret == 1 ) ++ret;
1036 }
1037
// Grow the per-stream bookkeeping if the count was bumped above
1038 if( ret > info->stream.size() )
1039 {
1040 info->stream.resize( ret );
1041 info->strmSelector->AdjustQueues( ret );
1042 }
1043
1044 return ret;
1045 }
1046
1047 //----------------------------------------------------------------------------
1048 // Marshall
1049 //----------------------------------------------------------------------------
1051 {
// Converts the multi-byte fields of the client request stored at `msg` from
// host to network byte order, in place. The request type is read from the
// header while it is still in host order, so the header's requestid and
// dlen are swapped only after the switch. Request types without a case get
// only the header swap. Always returns a default-constructed XRootDStatus.
1052 ClientRequest *req = (ClientRequest*)msg;
1053 switch( req->header.requestid )
1054 {
1055 //------------------------------------------------------------------------
1056 // kXR_protocol
1057 //------------------------------------------------------------------------
1058 case kXR_protocol:
1059 req->protocol.clientpv = htonl( req->protocol.clientpv );
1060 break;
1061
1062 //------------------------------------------------------------------------
1063 // kXR_login
1064 //------------------------------------------------------------------------
1065 case kXR_login:
1066 req->login.pid = htonl( req->login.pid );
1067 break;
1068
1069 //------------------------------------------------------------------------
1070 // kXR_locate
1071 //------------------------------------------------------------------------
1072 case kXR_locate:
1073 req->locate.options = htons( req->locate.options );
1074 break;
1075
1076 //------------------------------------------------------------------------
1077 // kXR_query
1078 //------------------------------------------------------------------------
1079 case kXR_query:
1080 req->query.infotype = htons( req->query.infotype );
1081 break;
1082
1083 //------------------------------------------------------------------------
1084 // kXR_truncate
1085 //------------------------------------------------------------------------
1086 case kXR_truncate:
1087 req->truncate.offset = htonll( req->truncate.offset );
1088 break;
1089
1090 //------------------------------------------------------------------------
1091 // kXR_mkdir
1092 //------------------------------------------------------------------------
1093 case kXR_mkdir:
1094 req->mkdir.mode = htons( req->mkdir.mode );
1095 break;
1096
1097 //------------------------------------------------------------------------
1098 // kXR_chmod
1099 //------------------------------------------------------------------------
1100 case kXR_chmod:
1101 req->chmod.mode = htons( req->chmod.mode );
1102 break;
1103
1104 //------------------------------------------------------------------------
1105 // kXR_open
1106 //------------------------------------------------------------------------
1107 case kXR_open:
1108 req->open.mode = htons( req->open.mode );
1109 req->open.options = htons( req->open.options );
1110 break;
1111
1112 //------------------------------------------------------------------------
1113 // kXR_read
1114 //------------------------------------------------------------------------
1115 case kXR_read:
1116 req->read.offset = htonll( req->read.offset );
1117 req->read.rlen = htonl( req->read.rlen );
1118 break;
1119
1120 //------------------------------------------------------------------------
1121 // kXR_write
1122 //------------------------------------------------------------------------
1123 case kXR_write:
1124 req->write.offset = htonll( req->write.offset );
1125 break;
1126
1127 //------------------------------------------------------------------------
1128 // kXR_mv
1129 //------------------------------------------------------------------------
1130 case kXR_mv:
1131 req->mv.arg1len = htons( req->mv.arg1len );
1132 break;
1133
1134 //------------------------------------------------------------------------
1135 // kXR_readv
1136 //------------------------------------------------------------------------
1137 case kXR_readv:
1138 {
// dlen is still in host order here; each chunk descriptor takes 16 bytes.
// The list starts 24 bytes into the message (presumably right after the
// fixed-size request header - confirm against the protocol definition).
1139 uint16_t numChunks = (req->readv.dlen)/16;
1140 readahead_list *dataChunk = (readahead_list*)( msg + 24 );
1141 for( size_t i = 0; i < numChunks; ++i )
1142 {
1143 dataChunk[i].rlen = htonl( dataChunk[i].rlen );
1144 dataChunk[i].offset = htonll( dataChunk[i].offset );
1145 }
1146 break;
1147 }
1148
1149 //------------------------------------------------------------------------
1150 // kXR_writev
1151 //------------------------------------------------------------------------
1152 case kXR_writev:
1153 {
// Same layout assumptions as kXR_readv: 16-byte entries from offset 24
1154 uint16_t numChunks = (req->writev.dlen)/16;
1155 XrdProto::write_list *wrtList =
1156 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
1157 for( size_t i = 0; i < numChunks; ++i )
1158 {
1159 wrtList[i].wlen = htonl( wrtList[i].wlen );
1160 wrtList[i].offset = htonll( wrtList[i].offset );
1161 }
1162
1163 break;
1164 }
1165
//------------------------------------------------------------------------
// kXR_pgread
//------------------------------------------------------------------------
1166 case kXR_pgread:
1167 {
1168 req->pgread.offset = htonll( req->pgread.offset );
1169 req->pgread.rlen = htonl( req->pgread.rlen );
1170 break;
1171 }
1172
//------------------------------------------------------------------------
// kXR_pgwrite
//------------------------------------------------------------------------
1173 case kXR_pgwrite:
1174 {
1175 req->pgwrite.offset = htonll( req->pgwrite.offset );
1176 break;
1177 }
1178
1179 //------------------------------------------------------------------------
1180 // kXR_prepare
1181 //------------------------------------------------------------------------
1182 case kXR_prepare:
1183 {
1184 req->prepare.optionX = htons( req->prepare.optionX );
1185 req->prepare.port = htons( req->prepare.port );
1186 break;
1187 }
1188
//------------------------------------------------------------------------
// kXR_chkpoint - for the kXR_ckpXeq opcode the request found at offset 24
// is marshalled recursively (including its own header)
//------------------------------------------------------------------------
1189 case kXR_chkpoint:
1190 {
1191 if( req->chkpoint.opcode == kXR_ckpXeq )
1192 MarshallRequest( msg + 24 );
1193 break;
1194 }
1195 };
1196
// The header goes last: the switch above needed requestid (and the vector
// cases needed dlen) in host byte order
1197 req->header.requestid = htons( req->header.requestid );
1198 req->header.dlen = htonl( req->header.dlen );
1199 return XRootDStatus();
1200 }
1201
1202 //----------------------------------------------------------------------------
1203 // Unmarshall the request - sometimes the requests need to be rewritten,
1204 // so we need to unmarshall them
1205 //----------------------------------------------------------------------------
1207 {
1208 if( !msg->IsMarshalled() ) return XRootDStatus( stOK, suAlreadyDone );
1209 // We rely on the marshaling process to be symmetric!
1210 // First we unmarshall the request ID and the length because
1211 // MarshallRequest() relies on these, and then we need to unmarshall these
1212 // two again, because they get marshalled in MarshallRequest().
1213 // All this is pretty damn ugly and should be rewritten.
1214 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1215 req->header.requestid = htons( req->header.requestid );
1216 req->header.dlen = htonl( req->header.dlen );
1217 XRootDStatus st = MarshallRequest( msg );
1218 req->header.requestid = htons( req->header.requestid );
1219 req->header.dlen = htonl( req->header.dlen );
1220 msg->SetIsMarshalled( false );
1221 return st;
1222 }
1223
1224 //----------------------------------------------------------------------------
1225 // Unmarshall the body of the incoming message
1226 //----------------------------------------------------------------------------
1228 {
// Converts the body fields of a server response to host byte order,
// selected by the response status (and, for kXR_ok, by the request type).
// Each branch first checks that hdr.dlen is large enough for the fields it
// touches and returns an errInvalidMessage status otherwise. Statuses not
// listed here are left untouched.
// NOTE(review): `m` is used below but its declaration is not visible in
// this listing.
1230
1231 //--------------------------------------------------------------------------
1232 // kXR_ok
1233 //--------------------------------------------------------------------------
1234 if( m->hdr.status == kXR_ok )
1235 {
1236 switch( reqType )
1237 {
1238 //----------------------------------------------------------------------
1239 // kXR_protocol
1240 //----------------------------------------------------------------------
1241 case kXR_protocol:
1242 if( m->hdr.dlen < 8 )
1243 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_protocol: body too short." );
1244 m->body.protocol.pval = ntohl( m->body.protocol.pval );
1245 m->body.protocol.flags = ntohl( m->body.protocol.flags );
1246 break;
1247 }
1248 }
1249 //--------------------------------------------------------------------------
1250 // kXR_error
1251 //--------------------------------------------------------------------------
1252 else if( m->hdr.status == kXR_error )
1253 {
1254 if( m->hdr.dlen < 4 )
1255 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_error: body too short." );
1256 m->body.error.errnum = ntohl( m->body.error.errnum );
1257 }
1258
1259 //--------------------------------------------------------------------------
1260 // kXR_wait
1261 //--------------------------------------------------------------------------
1262 else if( m->hdr.status == kXR_wait )
1263 {
1264 if( m->hdr.dlen < 4 )
1265 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_wait: body too short." );
// NOTE(review): htonl is used for a network-to-host conversion here and in
// the branches below; it performs the same swap as ntohl, which the
// kXR_ok/kXR_error branches use and which would state the intent better.
1266 m->body.wait.seconds = htonl( m->body.wait.seconds );
1267 }
1268
1269 //--------------------------------------------------------------------------
1270 // kXR_redirect
1271 //--------------------------------------------------------------------------
1272 else if( m->hdr.status == kXR_redirect )
1273 {
1274 if( m->hdr.dlen < 4 )
1275 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_redirect: body too short." );
1276 m->body.redirect.port = htonl( m->body.redirect.port );
1277 }
1278
1279 //--------------------------------------------------------------------------
1280 // kXR_waitresp
1281 //--------------------------------------------------------------------------
1282 else if( m->hdr.status == kXR_waitresp )
1283 {
1284 if( m->hdr.dlen < 4 )
1285 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_waitresp: body too short." );
1286 m->body.waitresp.seconds = htonl( m->body.waitresp.seconds );
1287 }
1288
1289 //--------------------------------------------------------------------------
1290 // kXR_attn
1291 //--------------------------------------------------------------------------
1292 else if( m->hdr.status == kXR_attn )
1293 {
1294 if( m->hdr.dlen < 4 )
1295 return XRootDStatus( stError, errInvalidMessage, 0, "kXR_attn: body too short." );
1296 m->body.attn.actnum = htonl( m->body.attn.actnum );
1297 }
1298
1299 return XRootDStatus();
1300 }
1301
1302 //------------------------------------------------------------------------
1304 //------------------------------------------------------------------------
1306 {
// Unmarshals and validates the fixed part of a kXR_status response: it
// computes the crc32c over the still-marshalled body, converts the crc,
// the body dlen and (for pgread/pgwrite) the offset to host order, and then
// checks the crc, the stream id and the request id. Returns an error status
// on a too-small message or on any failed check.
// NOTE(review): `rspst`, `pgrdbdy` and `pgwrtbdy` are used below but their
// declarations are not visible in this listing.
1307 //--------------------------------------------------------------------------
1308 // Calculate the crc32c before unmarshalling the body!
1309 //--------------------------------------------------------------------------
// The checksummed region starts right after the 8 leading bytes and the crc
// field itself.
// NOTE(review): `length` is derived from hdr.dlen before the message-size
// check further down; if hdr.dlen were smaller than the crc field this
// unsigned subtraction would wrap - confirm the caller rules that out.
1311 char *buffer = msg.GetBuffer( 8 + sizeof( rspst->bdy.crc32c ) );
1312 size_t length = rspst->hdr.dlen - sizeof( rspst->bdy.crc32c );
1313 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1314
// Minimum message size: the status response plus the request-specific part
1315 size_t stlen = sizeof( ServerResponseStatus );
1316 switch( reqType )
1317 {
1318 case kXR_pgread:
1319 {
1320 stlen += sizeof( ServerResponseBody_pgRead );
1321 break;
1322 }
1323
1324 case kXR_pgwrite:
1325 {
1326 stlen += sizeof( ServerResponseBody_pgWrite );
1327 break;
1328 }
1329 }
1330
1331 if( msg.GetSize() < stlen ) return XRootDStatus( stError, errInvalidMessage, 0,
1332 "kXR_status: invalid message size." );
1333
1334 rspst->bdy.crc32c = ntohl( rspst->bdy.crc32c );
1335 rspst->bdy.dlen = ntohl( rspst->bdy.dlen );
1336
// Request-specific part: only the offset needs a byte swap
1337 switch( reqType )
1338 {
1339 case kXR_pgread:
1340 {
1342 pgrdbdy->offset = ntohll( pgrdbdy->offset );
1343 break;
1344 }
1345
1346 case kXR_pgwrite:
1347 {
1349 pgwrtbdy->offset = ntohll( pgwrtbdy->offset );
1350 break;
1351 }
1352 }
1353
1354 //--------------------------------------------------------------------------
1355 // Do the integrity checks
1356 //--------------------------------------------------------------------------
1357 if( crcval != rspst->bdy.crc32c )
1358 {
1359 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1360 "corrupted (crc32c integrity check failed)." );
1361 }
1362
// The stream id is carried both in the header and in the body; they must agree
1363 if( rspst->hdr.streamid[0] != rspst->bdy.streamID[0] ||
1364 rspst->hdr.streamid[1] != rspst->bdy.streamID[1] )
1365 {
1366 return XRootDStatus( stError, errDataError, 0, "response header corrupted "
1367 "(stream ID mismatch)." );
1368 }
1369
1370
1371
// The body stores the request id as an offset from kXR_1stRequest
1372 if( rspst->bdy.requestid + kXR_1stRequest != reqType )
1373 {
1374 return XRootDStatus( stError, errDataError, 0, "kXR_status response header corrupted "
1375 "(request ID mismatch)." );
1376 }
1377
1378 return XRootDStatus();
1379 }
1380
1382 {
// Unmarshals the additional data that follows a kXR_status response. Only
// kXR_pgwrite is handled: its correction segment is crc-checked and its
// fields plus the trailing list of 64-bit page offsets are converted to
// host order.
// NOTE(review): several lines of this function are not visible in this
// listing (the declarations of `rsp` and `cse`, the return statement of the
// size check, and the statement after the switch).
1384 uint16_t reqType = rsp->status.bdy.requestid + kXR_1stRequest;
1385
1386 switch( reqType )
1387 {
1388 case kXR_pgwrite:
1389 {
1390 //--------------------------------------------------------------------------
1391 // If there's no additional data there's nothing to unmarshal
1392 //--------------------------------------------------------------------------
1393 if( rsp->status.bdy.dlen == 0 ) return XRootDStatus();
1394 //--------------------------------------------------------------------------
1395 // If there's not enough data to form correction-segment report an error
1396 //--------------------------------------------------------------------------
1397 if( size_t( rsp->status.bdy.dlen ) < sizeof( ServerResponseBody_pgWrCSE ) )
1399 "kXR_status: invalid message size." );
1400
1401 //--------------------------------------------------------------------------
1402 // Calculate the crc32c for the additional data
1403 //--------------------------------------------------------------------------
// The checksum covers everything after the leading 32-bit crc field.
// NOTE(review): bdy.dlen is only checked against the segment size above;
// confirm the caller guarantees the message really holds bdy.dlen bytes
// past ServerResponseV2.
1405 cse->cseCRC = ntohl( cse->cseCRC );
1406 size_t length = rsp->status.bdy.dlen - sizeof( uint32_t );
1407 void* buffer = msg.GetBuffer( sizeof( ServerResponseV2 ) + sizeof( uint32_t ) );
1408 uint32_t crcval = XrdOucCRC::Calc32C( buffer, length );
1409
1410 //--------------------------------------------------------------------------
1411 // Do the integrity checks
1412 //--------------------------------------------------------------------------
1413 if( crcval != cse->cseCRC )
1414 {
1415 return XRootDStatus( stError, errDataError, 0, "kXR_status response header "
1416 "corrupted (crc32c integrity check failed)." );
1417 }
1418
1419 cse->dlFirst = ntohs( cse->dlFirst );
1420 cse->dlLast = ntohs( cse->dlLast );
1421
// Whatever follows the correction segment is treated as an array of 64-bit
// offsets, each converted in place
1422 size_t pgcnt = ( rsp->status.bdy.dlen - sizeof( ServerResponseBody_pgWrCSE ) ) /
1423 sizeof( kXR_int64 );
1424 kXR_int64 *pgoffs = (kXR_int64*)msg.GetBuffer( sizeof( ServerResponseV2 ) +
1425 sizeof( ServerResponseBody_pgWrCSE ) );
1426
1427 for( size_t i = 0; i < pgcnt; ++i )
1428 pgoffs[i] = ntohll( pgoffs[i] );
1429
// The break after this return is unreachable
1430 return XRootDStatus();
1431 break;
1432 }
1433
// Other request types carry nothing that needs unmarshalling here
1434 default:
1435 break;
1436 }
1437
1439 }
1440
1441 //----------------------------------------------------------------------------
1442 // Unmarshall the header of the incoming message
1443 //----------------------------------------------------------------------------
1445 {
// Converts the status and dlen fields of the response header to host byte
// order, in place.
// NOTE(review): `header` is used below but its declaration is not visible
// in this listing.
1447 header->status = ntohs( header->status );
1448 header->dlen = ntohl( header->dlen );
1449 }
1450
1451 //----------------------------------------------------------------------------
1452 // Log server error response
1453 //----------------------------------------------------------------------------
1455 {
1456 Log *log = DefaultEnv::GetLog();
1457 ServerResponse *rsp = (ServerResponse *)msg.GetBuffer();
1458 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
1459 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
1460 log->Error( XRootDTransportMsg, "Server responded with an error [%d]: %s",
1461 rsp->body.error.errnum, errmsg );
1462 delete [] errmsg;
1463 }
1464
1465 //------------------------------------------------------------------------
1466 // Number of currently connected data streams
1467 //------------------------------------------------------------------------
1469 {
1470 XRootDChannelInfo *info = 0;
1471 channelData.Get( info );
1472 XrdSysMutexHelper scopedLock( info->mutex );
1473
1474 uint16_t nbConnected = 0;
1475 for( size_t i = 1; i < info->stream.size(); ++i )
1476 if( info->stream[i].status == XRootDStreamInfo::Connected )
1477 ++nbConnected;
1478
1479 return nbConnected;
1480 }
1481
1482 //----------------------------------------------------------------------------
1483 // The stream has been disconnected, do the cleanups
1484 //----------------------------------------------------------------------------
1486 uint16_t subStreamId )
1487 {
// Cleans up after a sub-stream has been disconnected: drops the protection
// state and, when the control stream (sub-stream 0) went down, also resets
// the channel-wide bookkeeping (timed-out SIDs, tracked opens/closes, open
// file counter and wait barrier). Runs under info->mutex.
1488 XRootDChannelInfo *info = 0;
1489 channelData.Get( info );
1490 XrdSysMutexHelper scopedLock( info->mutex );
1491
1492 CleanUpProtection( info );
1493
// NOTE(review): only emptiness is checked before indexing with subStreamId;
// confirm callers never pass an id >= info->stream.size(). The statement
// that uses `sInfo` is not visible in this listing.
1494 if( !info->stream.empty() )
1495 {
1496 XRootDStreamInfo &sInfo = info->stream[subStreamId];
1498 }
1499
1500 if( subStreamId == 0 )
1501 {
1502 info->sidManager->ReleaseAllTimedOut();
1503 info->sentOpens.clear();
1504 info->sentCloses.clear();
1505 info->openFiles = 0;
1506 info->waitBarrier = 0;
1507 }
1508 }
1509
1510 //------------------------------------------------------------------------
1511 // Query the channel
1512 //------------------------------------------------------------------------
1514 AnyObject &result,
1515 AnyObject &channelData )
1516 {
// Answers a query about the channel by storing the requested value in
// `result` and returning a default Status. Runs under info->mutex.
// NOTE(review): the case labels and the statement after the switch are not
// visible in this listing. The second argument to result.Set() is always
// false - presumably the ownership flag; confirm against AnyObject::Set,
// since four of the five values are heap-allocated here.
1517 XRootDChannelInfo *info = 0;
1518 channelData.Get( info );
1519 XrdSysMutexHelper scopedLock( info->mutex );
1520
1521 switch( query )
1522 {
1523 //------------------------------------------------------------------------
1524 // Protocol name
1525 //------------------------------------------------------------------------
// The value is a string literal, not a heap allocation
1527 result.Set( (const char*)"XRootD", false );
1528 return Status();
1529
1530 //------------------------------------------------------------------------
1531 // Authentication
1532 //------------------------------------------------------------------------
1534 result.Set( new std::string( info->authProtocolName ), false );
1535 return Status();
1536
1537 //------------------------------------------------------------------------
1538 // Server flags
1539 //------------------------------------------------------------------------
1541 result.Set( new int( info->serverFlags ), false );
1542 return Status();
1543
1544 //------------------------------------------------------------------------
1545 // Protocol version
1546 //------------------------------------------------------------------------
1548 result.Set( new int( info->protocolVersion ), false );
1549 return Status();
1550
//------------------------------------------------------------------------
// Whether the channel is encrypted
//------------------------------------------------------------------------
1552 result.Set( new bool( info->encrypted ), false );
1553 return Status();
1554 };
1556 }
1557
1558 //----------------------------------------------------------------------------
1559 // Check whether the transport can hijack the message
1560 //----------------------------------------------------------------------------
1562 uint16_t subStream,
1563 AnyObject &channelData )
1564 {
// Inspects an incoming response and tells the caller what to do with it:
// NoAction, DigestMsg (drop it) or RequestClose (a timed-out open succeeded
// and has to be closed). Along the way it updates the sub-stream selector,
// the wait barrier and the bookkeeping of sent opens/closes and open files.
// Runs under info->mutex.
// NOTE(review): `rsp` is used below but its declaration is not visible in
// this listing.
1565 XRootDChannelInfo *info = 0;
1566 channelData.Get( info );
1567 XrdSysMutexHelper scopedLock( info->mutex );
1568 Log *log = DefaultEnv::GetLog();
1569
1570 //--------------------------------------------------------------------------
1571 // Update the substream queues
1572 //--------------------------------------------------------------------------
1573 info->strmSelector->MsgReceived( subStream );
1574
1575 //--------------------------------------------------------------------------
1576 // Check whether this message is a response to a request that has
1577 // timed out, and if so, drop it
1578 //--------------------------------------------------------------------------
// kXR_attn responses are passed through without any bookkeeping
1580 if( rsp->hdr.status == kXR_attn )
1581 {
1582 return NoAction;
1583 }
1584
1585 if( info->sidManager->IsTimedOut( rsp->hdr.streamid ) )
1586 {
1587 log->Error( XRootDTransportMsg, "Message %p, stream [%d, %d] is a "
1588 "response that we're no longer interested in (timed out)",
1589 &msg, rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
1590 //------------------------------------------------------------------------
1591 // If it is kXR_waitresp there will be another one,
1592 // so we don't release the sid yet
1593 //------------------------------------------------------------------------
1594 if( rsp->hdr.status != kXR_waitresp )
1595 info->sidManager->ReleaseTimedOut( rsp->hdr.streamid );
1596 //------------------------------------------------------------------------
1597 // If it is a successful response to an open request
1598 // that timed out, we need to send a close
1599 //------------------------------------------------------------------------
// The two stream-id bytes are used verbatim as the 16-bit lookup key
1600 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1601 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1602 if( sidIt != info->sentOpens.end() )
1603 {
1604 info->sentOpens.erase( sidIt );
1605 if( rsp->hdr.status == kXR_ok ) return RequestClose;
1606 }
1607 return DigestMsg;
1608 }
1609
1610 //--------------------------------------------------------------------------
1611 // If we have a wait or waitresp
1612 //--------------------------------------------------------------------------
// The body is converted with ntohl here, i.e. it is still in network order
1613 uint32_t seconds = 0;
1614 if( rsp->hdr.status == kXR_wait )
1615 seconds = ntohl( rsp->body.wait.seconds ) + 5; // we need extra time
1616 // to re-send the request
1617 else if( rsp->hdr.status == kXR_waitresp )
1618 {
1619 seconds = ntohl( rsp->body.waitresp.seconds );
1620
1621 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %u seconds, "
1622 "setting up wait barrier.",
1623 info->streamName.c_str(),
1624 seconds );
1625 }
1626
// The barrier only ever moves forward; for other statuses `seconds` is 0,
// so the candidate value is simply the current time
1627 time_t barrier = time(0) + seconds;
1628 if( info->waitBarrier < barrier )
1629 info->waitBarrier = barrier;
1630
1631 //--------------------------------------------------------------------------
1632 // If we got a response to an open request, we may need to bump the counter
1633 // of open files
1634 //--------------------------------------------------------------------------
1635 uint16_t sid; memcpy( &sid, rsp->hdr.streamid, 2 );
1636 std::set<uint16_t>::iterator sidIt = info->sentOpens.find( sid );
1637 if( sidIt != info->sentOpens.end() )
1638 {
// On kXR_waitresp the sid stays tracked, so the later response is still
// recognised as belonging to the open
1639 if( rsp->hdr.status == kXR_waitresp )
1640 return NoAction;
1641 info->sentOpens.erase( sidIt );
1642 if( rsp->hdr.status == kXR_ok )
1643 {
1644 ++info->openFiles;
1645 info->finstcnt.fetch_add( 1, std::memory_order_relaxed ); // another file File object instance has been bound with this connection
1646 }
1647 return NoAction;
1648 }
1649
1650 //--------------------------------------------------------------------------
1651 // If we got a response to a close, we may need to decrement the counter of
1652 // open files
1653 //--------------------------------------------------------------------------
1654 sidIt = info->sentCloses.find( sid );
1655 if( sidIt != info->sentCloses.end() )
1656 {
1657 if( rsp->hdr.status == kXR_waitresp )
1658 return NoAction;
// The counter is decremented for any final response to a close, whatever
// its status
1659 info->sentCloses.erase( sidIt );
1660 --info->openFiles;
1661 return NoAction;
1662 }
1663 return NoAction;
1664 }
1665
1666 //----------------------------------------------------------------------------
1667 // Notify the transport about a message having been sent
1668 //----------------------------------------------------------------------------
1670 uint16_t subStream,
1671 uint32_t bytesSent,
1672 AnyObject &channelData )
1673 {
1674 // Called when a message has been sent. For messages that return on a
1675 // different pathid (and hence may use a different poller) it is possible
1676 // that the server has already replied and the reply will trigger
1677 // MessageReceived() before this method has been called. However for open
1678 // and close this is never the case and this method is used for tracking
1679 // only those.
1680 XRootDChannelInfo *info = 0;
1681 channelData.Get( info );
1682 XrdSysMutexHelper scopedLock( info->mutex );
1683 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
1684 uint16_t reqid = ntohs( req->header.requestid );
1685
1686
1687 //--------------------------------------------------------------------------
1688 // We need to track opens to know if we can close streams due to idleness
1689 //--------------------------------------------------------------------------
1690 uint16_t sid;
1691 memcpy( &sid, req->header.streamid, 2 );
1692
1693 if( reqid == kXR_open )
1694 info->sentOpens.insert( sid );
1695 else if( reqid == kXR_close )
1696 info->sentCloses.insert( sid );
1697 }
1698
1699
1700 //----------------------------------------------------------------------------
1701 // Get signature for given message
1702 //----------------------------------------------------------------------------
1704 {
1705 XRootDChannelInfo *info = 0;
1706 channelData.Get( info );
1707 return GetSignature( toSign, sign, info );
1708 }
1709
1710 //------------------------------------------------------------------------
1712 //------------------------------------------------------------------------
1714 Message *&sign,
1715 XRootDChannelInfo *info )
1716 {
1717 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
1718 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
1719
1720 ClientRequest *thereq = reinterpret_cast<ClientRequest*>( toSign->GetBuffer() );
1721 if( !info ) return Status( stError, errInternal );
1722 if( info->protection )
1723 {
1724 SecurityRequest *newreq = 0;
1725 // check if we have to secure the request in the first place
1726 if( !( NEED2SECURE ( info->protection )( *thereq ) ) ) return Status();
1727 // secure (sign/encrypt) the request
1728 int rc = info->protection->Secure( newreq, *thereq, 0 );
1729 // there was an error
1730 if( rc < 0 )
1731 return Status( stError, errInternal, -rc );
1732
1733 sign = new Message();
1734 sign->Grab( reinterpret_cast<char*>( newreq ), rc );
1735 }
1736
1737 return Status();
1738 }
1739
1740 //------------------------------------------------------------------------
1742 //------------------------------------------------------------------------
1744 {
1745 XRootDChannelInfo *info = 0;
1746 channelData.Get( info );
1747 if( info->finstcnt.load( std::memory_order_relaxed ) > 0 )
1748 info->finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1749 }
1750
1751 //----------------------------------------------------------------------------
1752 // Wait before exit
1753 //----------------------------------------------------------------------------
1755 {
1756 XrdSysRWLockHelper scope( pSecUnloadHandler->lock, false ); // obtain write lock
1757 pSecUnloadHandler->unloaded = true;
1758 }
1759
1760 //----------------------------------------------------------------------------
1761 // @return : true if encryption should be turned on, false otherwise
1762 //----------------------------------------------------------------------------
1764 AnyObject &channelData )
1765 {
// Decides whether the sub-stream described by handShakeData has to switch
// to TLS now. Whenever the answer is "yes" because of a server flag,
// info->encrypted is set to true as a side effect.
// NOTE(review): `env` is used below but its declaration is not visible in
// this listing, and neither is part of the condition in the "hand-shake is
// done" check. Unlike most functions in this file, info->mutex is not taken
// here - confirm that is intentional.
1766 XRootDChannelInfo *info = 0;
1767 channelData.Get( info );
1768
1770 int notlsok = DefaultNoTlsOK;
1771 env->GetInt( "NoTlsOK", notlsok );
1772
// With NoTlsOK set the server flags are ignored and only the current
// encryption state of the channel counts
1773 if( notlsok )
1774 return info->encrypted;
1775
1776 // Did the server instruct us to switch to TLS right away?
1777 if( info->serverFlags & kXR_gotoTLS )
1778 {
1779 info->encrypted = true;
1780 return true ;
1781 }
1782
1783 XRootDStreamInfo &sInfo = info->stream[handShakeData->subStreamId];
1784
1785 //--------------------------------------------------------------------------
1786 // The control stream (sub-stream 0) might need to switch to TLS before
1787 // login or after login
1788 //--------------------------------------------------------------------------
1789 if( handShakeData->subStreamId == 0 )
1790 {
1791 //------------------------------------------------------------------------
1792 // We are about to login and the server asked to start encrypting
1793 // before login
1794 //------------------------------------------------------------------------
1795 if( ( sInfo.status == XRootDStreamInfo::LoginSent ) &&
1796 ( info->serverFlags & kXR_tlsLogin ) )
1797 {
1798 info->encrypted = true;
1799 return true;
1800 }
1801
1802 //--------------------------------------------------------------------
1803 // The hand-shake is done and the server requested to encrypt the session
1804 //--------------------------------------------------------------------
1805 if( (sInfo.status == XRootDStreamInfo::Connected ||
1806 //--------------------------------------------------------------------
1807 // we really need to turn on TLS before we send kXR_endsess and we
1808 // are about to do so (1st enable encryption, then send kXR_endsess)
1809 //--------------------------------------------------------------------
1811 ( info->serverFlags & kXR_tlsSess ) )
1812 {
1813 info->encrypted = true;
1814 return true;
1815 }
1816 }
1817 //--------------------------------------------------------------------------
1818 // A data stream (sub-stream > 0) if need be will be switched to TLS before
1819 // bind.
1820 //--------------------------------------------------------------------------
1821 else
1822 {
1823 //------------------------------------------------------------------------
1824 // We are about to bind a data stream and the server asked to start
1825 // encrypting before bind
1826 //------------------------------------------------------------------------
1827 if( ( sInfo.status == XRootDStreamInfo::BindSent ) &&
1828 ( info->serverFlags & kXR_tlsData ) )
1829 {
1830 info->encrypted = true;
1831 return true;
1832 }
1833 }
1834
// None of the conditions above applied: no switch to TLS at this point
1835 return false;
1836 }
1837
1838 //------------------------------------------------------------------------
1839 // Get bind preference for the next data stream
1840 //------------------------------------------------------------------------
1842 AnyObject &channelData )
1843 {
1844 XRootDChannelInfo *info = 0;
1845 channelData.Get( info );
1846 if( !bool( info->bindSelector ) )
1847 return url;
1848
1849 return URL( info->bindSelector->Get() );
1850 }
1851
1852 //----------------------------------------------------------------------------
1853 // Generate the message to be sent as an initial handshake
1854 // (handshake+kXR_protocol)
1855 //----------------------------------------------------------------------------
// Builds the first message sent on a new connection: a 20-byte initial
// hand-shake immediately followed by a kXR_protocol request, in one
// zero-initialised buffer. The returned Message is allocated with new.
// NOTE(review): the lines that start the log call and that declare `init`
// and `proto` are not visible in this listing.
1856 Message *XRootDTransport::GenerateInitialHSProtocol( HandShakeData *hsData,
1857 XRootDChannelInfo *info,
1858 kXR_char expect )
1859 {
1860 Log *log = DefaultEnv::GetLog();
1862 "[%s] Sending out the initial hand shake + kXR_protocol",
1863 hsData->streamName.c_str() );
1864
1865 Message *msg = new Message();
1866
// 20 bytes for the hand-shake, followed by room for the protocol request
1867 msg->Allocate( 20+sizeof(ClientProtocolRequest) );
1868 msg->Zero();
1869
// Only these two hand-shake fields are set; the rest stays zero
1871 init->fourth = htonl(4);
1872 init->fifth = htonl(2012);
1873
1875 InitProtocolReq( proto, info, expect );
1876
1877 return msg;
1878 }
1879
1880 //------------------------------------------------------------------------
1881 // Generate the protocol message
1882 //------------------------------------------------------------------------
1883 Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1884 XRootDChannelInfo *info,
1885 kXR_char expect )
1886 {
1887 Log *log = DefaultEnv::GetLog();
1888 log->Debug( XRootDTransportMsg,
1889 "[%s] Sending out the kXR_protocol",
1890 hsData->streamName.c_str() );
1891
1892 Message *msg = new Message();
1893 msg->Allocate( sizeof(ClientProtocolRequest) );
1894 msg->Zero();
1895
1896 ClientProtocolRequest *proto = (ClientProtocolRequest *)msg->GetBuffer();
1897 InitProtocolReq( proto, info, expect );
1898
1899 return msg;
1900 }
1901
1902 //------------------------------------------------------------------------
1903 // Initialize protocol request
1904 //------------------------------------------------------------------------
// Fills in a kXR_protocol request: the request id and client protocol
// version (both in network byte order) and the `expect` field. The
// "NoTlsOK" and "TlsNoData" settings are read from the environment.
// NOTE(review): a number of lines of this function are not visible in this
// listing, including the declaration of `env` and the statements governed
// by the three `if`s below, so their effect cannot be described from here.
1905 void XRootDTransport::InitProtocolReq( ClientProtocolRequest *request,
1906 XRootDChannelInfo *info,
1907 kXR_char expect )
1908 {
1909 request->requestid = htons(kXR_protocol);
1910 request->clientpv = htonl(kXR_PROTOCOLVERSION);
1913
1914 int notlsok = DefaultNoTlsOK;
1915 int tlsnodata = DefaultTlsNoData;
1916
1918
1919 env->GetInt( "NoTlsOK", notlsok );
1920
1922 env->GetInt( "TlsNoData", tlsnodata );
1923
1924 if (info->encrypted || InitTLS())
1926
1927 if (info->encrypted && !(notlsok || tlsnodata))
1929
1930 request->expect = expect;
1931
1932 //--------------------------------------------------------------------------
1933 // If we are in the course of establishing a connection in the context of
1934 // TPC update the expect! (this will never be followed by a bind)
1935 //--------------------------------------------------------------------------
1936 if( info->istpc )
1938 }
1939
1940 //----------------------------------------------------------------------------
1941 // Process the server initial handshake response
1942 //----------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Interpret the server's answer to the initial handshake (hsData->in).
//
// On a kXR_ok status the protocol version and server flags are stored in
// info and (stOK, suContinue) is returned; any other status yields
// (stFatal, errHandShakeFailed).
//
// NOTE(review): the embedded numbering skips 1962-1963, i.e. both branches
// of the conditional expression assigned to info->serverFlags are missing
// from this listing.
//----------------------------------------------------------------------------
1943 XRootDStatus XRootDTransport::ProcessServerHS( HandShakeData *hsData,
1944 XRootDChannelInfo *info )
1945 {
1946 Log *log = DefaultEnv::GetLog();
1947
1948 Message *msg = hsData->in;
// The same buffer is viewed twice: as a response header from its start and
// as the handshake body from GetBuffer(4) - presumably a byte offset; confirm
// against Message/Buffer.
1949 ServerResponseHeader *respHdr = (ServerResponseHeader *)msg->GetBuffer();
1950 ServerInitHandShake *hs = (ServerInitHandShake *)msg->GetBuffer(4);
1951
1952 if( respHdr->status != kXR_ok )
1953 {
1954 log->Error( XRootDTransportMsg, "[%s] Invalid hand shake response",
1955 hsData->streamName.c_str() );
1956
1957 return XRootDStatus( stFatal, errHandShakeFailed, 0, "Invalid hand shake response." );
1958 }
1959
// Both handshake fields are converted with ntohl before use.
1960 info->protocolVersion = ntohl(hs->protover);
1961 info->serverFlags = ntohl(hs->msgval) == kXR_DataServer ?
1964
1965 log->Debug( XRootDTransportMsg,
1966 "[%s] Got the server hand shake response (%s, protocol "
1967 "version %x)",
1968 hsData->streamName.c_str(),
1969 ServerFlagsToStr( info->serverFlags ).c_str(),
1970 info->protocolVersion );
1971
1972 return XRootDStatus( stOK, suContinue );
1973 }
1974
1975 //----------------------------------------------------------------------------
1976 // Process the protocol response
1977 //----------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Interpret the server's kXR_protocol response (hsData->in).
//
// Returns:
//   - the UnMarshallBody status if un-marshalling fails
//   - (stFatal, errHandShakeFailed) if the response status is not kXR_ok
//   - (stFatal, errTlsError) if encryption was requested but cannot be had
//   - the ProcessProtocolBody status if parsing the trailer fails
//   - (stOK, suRetry) if the request has to be repeated with encryption on
//   - (stOK, suContinue) otherwise
//
// NOTE(review): the embedded numbering skips 1998; `env` is used below but
// its declaration is not visible in this listing.
//----------------------------------------------------------------------------
1978 XRootDStatus XRootDTransport::ProcessProtocolResp( HandShakeData *hsData,
1979 XRootDChannelInfo *info )
1980 {
1981 Log *log = DefaultEnv::GetLog();
1982
1983 XRootDStatus st = UnMarshallBody( hsData->in, kXR_protocol );
1984 if( !st.IsOK() )
1985 return st;
1986
1987 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
1988
1989
1990 if( rsp->hdr.status != kXR_ok )
1991 {
1992 log->Error( XRootDTransportMsg, "[%s] kXR_protocol request failed",
1993 hsData->streamName.c_str() );
1994
1995 return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_protocol request failed" );
1996 }
1997
1999 int notlsok = DefaultNoTlsOK;
2000 env->GetInt( "NoTlsOK", notlsok );
2001
2002 if( rsp->body.protocol.pval < kXR_PROTTLSVERSION && info->encrypted )
2003 {
2004 //------------------------------------------------------------------------
2005 // User requested an encrypted connection but the server is too old to
2006 // support it!
2007 //------------------------------------------------------------------------
2008 if( !notlsok ) return XRootDStatus( stFatal, errTlsError, ENOTSUP, "TLS not supported" );
2009
2010 //------------------------------------------------------------------------
2011 // We are falling back to unencrypted data transmission, as configured
2012 // in XRD_NOTLSOK environment variable
2013 //------------------------------------------------------------------------
2014 log->Info( XRootDTransportMsg,
2015 "[%s] Falling back to unencrypted transmission, server does "
2016 "not support TLS encryption.",
2017 hsData->streamName.c_str() );
2018 info->encrypted = false;
2019 }
2020
// From protocol version 0x297 on, the flags of this response replace the
// server flags recorded earlier in info.
2021 if( rsp->body.protocol.pval >= 0x297 )
2022 info->serverFlags = rsp->body.protocol.flags;
2023
// More than 8 bytes of body means something follows the two 4-byte fields
// copied below (pval and flags); keep a copy and parse the remainder.
2024 if( rsp->hdr.dlen > 8 )
2025 {
// NOTE(review): a protRespBody already attached to info is overwritten here
// without being deleted - confirm this cannot leak when the request is
// repeated after the suRetry return below.
2026 info->protRespBody = new ServerResponseBody_Protocol();
2027 info->protRespBody->flags = rsp->body.protocol.flags;
2028 info->protRespBody->pval = rsp->body.protocol.pval;
2029
2030 char* bodybuff = reinterpret_cast<char*>( &rsp->body.protocol.secreq );
2031 size_t bodysize = rsp->hdr.dlen - 8;
2032 XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2033 if( !st.IsOK() )
2034 return st;
2035 }
2036
// The version logged here is info->protocolVersion, which this function does
// not modify.
2037 log->Debug( XRootDTransportMsg,
2038 "[%s] kXR_protocol successful (%s, protocol version %x)",
2039 hsData->streamName.c_str(),
2040 ServerFlagsToStr( info->serverFlags ).c_str(),
2041 info->protocolVersion );
2042
2043 if( !( info->serverFlags & kXR_haveTLS ) && info->encrypted )
2044 {
2045 //------------------------------------------------------------------------
2046 // User requested an encrypted connection but the server was not configured
2047 // to support encryption!
2048 //------------------------------------------------------------------------
2049 return XRootDStatus( stFatal, errTlsError, ECONNREFUSED,
2050 "Server was not configured to support encryption." );
2051 }
2052
2053 //--------------------------------------------------------------------------
2054 // Now see if we have to enforce encryption in case the server does not
2055 // support PgRead/PgWrite
2056 //--------------------------------------------------------------------------
2057 int tlsOnNoPgrw = DefaultWantTlsOnNoPgrw;
2058 env->GetInt( "WantTlsOnNoPgrw", tlsOnNoPgrw );
2059 if( !( info->serverFlags & kXR_suppgrw ) && tlsOnNoPgrw )
2060 {
2061 //------------------------------------------------------------------------
2062 // If user requested encryption just make sure it is not switched off for
2063 // data
2064 //------------------------------------------------------------------------
2065 if( info->encrypted )
2066 {
2067 log->Debug( XRootDTransportMsg,
2068 "[%s] Server does not support PgRead/PgWrite and"
2069 " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2070 hsData->streamName.c_str() );
// NOTE(review): this writes the setting back through `env`, so it is not
// limited to this channel - confirm that is intended.
2071 env->PutInt( "TlsNoData", DefaultTlsNoData );
2072 }
2073 //------------------------------------------------------------------------
2074 // Otherwise, if server is not enforcing data encryption, we will need to
2075 // redo the protocol request with kXR_wantTLS set.
2076 //------------------------------------------------------------------------
2077 else if( !( info->serverFlags & kXR_tlsData ) &&
2078 ( info->serverFlags & kXR_haveTLS ) )
2079 {
2080 info->encrypted = true;
2081 return XRootDStatus( stOK, suRetry );
2082 }
2083 }
2084
2085 return XRootDStatus( stOK, suContinue );
2086 }
2087
2088 XRootDStatus XRootDTransport::ProcessProtocolBody( char *bodybuff,
2089 size_t bodysize,
2090 XRootDChannelInfo *info )
2091 {
2092 //--------------------------------------------------------------------------
2093 // Parse bind preferences
2094 //--------------------------------------------------------------------------
2095 XrdProto::bifReqs *bifreq = reinterpret_cast<XrdProto::bifReqs*>( bodybuff );
2096 if( bodysize >= sizeof( XrdProto::bifReqs ) && bifreq->theTag == 'B' )
2097 {
2098 bodybuff += sizeof( XrdProto::bifReqs );
2099 bodysize -= sizeof( XrdProto::bifReqs );
2100
2101 if( bodysize < bifreq->bifILen )
2102 return XRootDStatus( stError, errDataError, 0, "Received incomplete "
2103 "protocol response." );
2104 std::string bindprefs_str( bodybuff, bifreq->bifILen );
2105 std::vector<std::string> bindprefs;
2106 Utils::splitString( bindprefs, bindprefs_str, "," );
2107 info->bindSelector.reset( new BindPrefSelector( std::move( bindprefs ) ) );
2108 bodybuff += bifreq->bifILen;
2109 bodysize -= bifreq->bifILen;
2110 }
2111 //--------------------------------------------------------------------------
2112 // Parse security requirements
2113 //--------------------------------------------------------------------------
2114 XrdProto::secReqs *secreq = reinterpret_cast<XrdProto::secReqs*>( bodybuff );
2115 if( bodysize >= 6 /*XrdProto::secReqs*/ && secreq->theTag == 'S' )
2116 {
2117 memcpy( &info->protRespBody->secreq, secreq, bodysize );
2118 info->protRespSize = bodysize + 8 /*pval & flags*/;
2119 }
2120
2121 return XRootDStatus();
2122 }
2123
2124 //----------------------------------------------------------------------------
2125 // Generate the bind message
2126 //----------------------------------------------------------------------------
2127 Message *XRootDTransport::GenerateBind( HandShakeData *hsData,
2128 XRootDChannelInfo *info )
2129 {
2130 Log *log = DefaultEnv::GetLog();
2131
2132 log->Debug( XRootDTransportMsg,
2133 "[%s] Sending out the bind request",
2134 hsData->streamName.c_str() );
2135
2136
2137 Message *msg = new Message( sizeof( ClientBindRequest ) );
2138 ClientBindRequest *bindReq = (ClientBindRequest *)msg->GetBuffer();
2139
2140 bindReq->requestid = kXR_bind;
2141 memcpy( bindReq->sessid, info->sessionId, 16 );
2142 bindReq->dlen = 0;
2143 MarshallRequest( msg );
2144 return msg;
2145 }
2146
2147 //----------------------------------------------------------------------------
2148 // Process the bind response
2149 //----------------------------------------------------------------------------
2150 XRootDStatus XRootDTransport::ProcessBindResp( HandShakeData *hsData,
2151 XRootDChannelInfo *info )
2152 {
2153 Log *log = DefaultEnv::GetLog();
2154
2155 XRootDStatus st = UnMarshallBody( hsData->in, kXR_bind );
2156 if( !st.IsOK() )
2157 return st;
2158
2159 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2160
2161 if( rsp->hdr.status != kXR_ok )
2162 {
2163 log->Error( XRootDTransportMsg, "[%s] kXR_bind request failed",
2164 hsData->streamName.c_str() );
2165 return XRootDStatus( stFatal, errHandShakeFailed, 0, "kXR_bind request failed" );
2166 }
2167
2168 info->stream[hsData->subStreamId].pathId = rsp->body.bind.substreamid;
2169 log->Debug( XRootDTransportMsg, "[%s] kXR_bind successful",
2170 hsData->streamName.c_str() );
2171
2172 return XRootDStatus();
2173 }
2174
2175 //----------------------------------------------------------------------------
2176 // Generate the login message
2177 //----------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Build the kXR_login request.
//
// The request consists of a ClientLoginRequest followed by a CGI string
// (country code, time zone, application name, monitoring info, host name,
// client version and, if present, info->logintoken). The ability flags
// describe the IP stacks of this host and the user name is taken from the
// URL or, failing that, from the effective uid.
//
// NOTE(review): the embedded numbering skips 2222 and 2235. The declaration
// of `stacks` and whatever line 2222 assigns (presumably the initial value of
// loginReq->ability, which is only OR-ed into below) are not visible in this
// listing.
//----------------------------------------------------------------------------
2178 Message *XRootDTransport::GenerateLogIn( HandShakeData *hsData,
2179 XRootDChannelInfo *info )
2180 {
2181 Log *log = DefaultEnv::GetLog();
2182 Env *env = DefaultEnv::GetEnv();
2183
2184 //--------------------------------------------------------------------------
2185 // Compute the login cgi
2186 //--------------------------------------------------------------------------
2187 int timeZone = XrdSysTimer::TimeZone();
// hostName is released with free() once the CGI has been formatted.
2188 char *hostName = XrdNetUtils::MyHostName();
2189 std::string countryCode = Utils::FQDNToCC( hostName );
// NOTE(review): the buffer is sized 1024 + token length, yet both snprintf
// calls below are limited to 1024 bytes, so a long login token is truncated
// even though the buffer has room for it - confirm whether the limit should
// be the full buffer size.
2190 char *cgiBuffer = new char[1024 + info->logintoken.size()];
2191 std::string appName;
2192 std::string monInfo;
2193 env->GetString( "AppName", appName );
2194 env->GetString( "MonInfo", monInfo );
2195 if( info->logintoken.empty() )
2196 {
2197 snprintf( cgiBuffer, 1024,
2198 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2199 "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2200 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2201 }
2202 else
2203 {
2204 snprintf( cgiBuffer, 1024,
2205 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2206 "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2207 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2208 }
2209 uint16_t cgiLen = strlen( cgiBuffer );
2210 free( hostName );
2211
2212 //--------------------------------------------------------------------------
2213 // Generate the message
2214 //--------------------------------------------------------------------------
2215 Message *msg = new Message( sizeof(ClientLoginRequest) + cgiLen );
2216 ClientLoginRequest *loginReq = (ClientLoginRequest *)msg->GetBuffer();
2217
2218 loginReq->requestid = kXR_login;
2219 loginReq->pid = ::getpid();
2220 loginReq->capver[0] = kXR_asyncap | kXR_ver005;
2221 loginReq->dlen = cgiLen;
2223#ifdef WITH_XRDEC
2224 loginReq->ability2 = kXR_ecredir;
2225#endif
2226
// The "MultiProtocol" setting adds kXR_multipr to the ability flags.
2227 int multiProtocol = 0;
2228 env->GetInt( "MultiProtocol", multiProtocol );
2229 if(multiProtocol)
2230 loginReq->ability |= kXR_multipr;
2231
2232 //--------------------------------------------------------------------------
2233 // Check the IP stacks
2234 //--------------------------------------------------------------------------
// The three booleans below are only used for the debug message at the end;
// the information sent to the server is carried by loginReq->ability.
2236 bool dualStack = false;
2237 bool privateIPv6 = false;
2238 bool privateIPv4 = false;
2239
2240 if( (stacks & XrdNetUtils::hasIP64) == XrdNetUtils::hasIP64 )
2241 {
2242 dualStack = true;
2243 loginReq->ability |= kXR_hasipv64;
2244 }
2245
2246 if( (stacks & XrdNetUtils::hasIPv6) && !(stacks & XrdNetUtils::hasPub6) )
2247 {
2248 privateIPv6 = true;
2249 loginReq->ability |= kXR_onlyprv6;
2250 }
2251
2252 if( (stacks & XrdNetUtils::hasIPv4) && !(stacks & XrdNetUtils::hasPub4) )
2253 {
2254 privateIPv4 = true;
2255 loginReq->ability |= kXR_onlyprv4;
2256 }
2257
2258 // The following code snippet tries to overcome the problem that this host
2259 // may still be dual-stacked but we don't know it because one of the
2260 // interfaces was not registered in DNS.
2261 //
// In other words: having one address family locally while the server address
// belongs to the other family is taken as evidence of a dual stack. Note
// that hsData->serverAddr may be null and is checked here.
2262 if( !dualStack && hsData->serverAddr )
2263 {if ( ( ( stacks & XrdNetUtils::hasIPv4 )
2264 && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv6))
2265 || ( ( stacks & XrdNetUtils::hasIPv6 )
2266 && hsData->serverAddr->isIPType(XrdNetAddrInfo::IPv4)))
2267 {dualStack = true;
2268 loginReq->ability |= kXR_hasipv64;
2269 }
2270 }
2271
2272 //--------------------------------------------------------------------------
2273 // Check the username
2274 //--------------------------------------------------------------------------
// Preference order: user name from the URL, then the name that
// XrdOucUtils::UserName yields for the effective uid, then "_anon_".
2275 std::string buffer( 8, 0 );
2276 if( hsData->url->GetUserName().length() )
2277 buffer = hsData->url->GetUserName();
2278 else
2279 {
2280 char *name = new char[1024];
2281 if( !XrdOucUtils::UserName( geteuid(), name, 1024 ) )
2282 buffer = name;
2283 else
2284 buffer = "_anon_";
2285 delete [] name;
2286 }
// Exactly 8 bytes are copied: longer names are cut, shorter ones are padded
// with zero bytes.
2287 buffer.resize( 8, 0 );
2288 std::copy( buffer.begin(), buffer.end(), (char*)loginReq->username );
2289
// NOTE(review): 24 is presumably the offset at which the CGI is placed, i.e.
// the size of the fixed part of the request - confirm against
// Message::Append and ClientLoginRequest.
2290 msg->Append( cgiBuffer, cgiLen, 24 );
2291
// NOTE(review): loginReq->username is printed with %s; a name of 8 or more
// characters fills the field without a terminating zero - confirm the bytes
// that follow it make this safe.
2292 log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_login request, "
2293 "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2294 "private IPv6: %s", hsData->streamName.c_str(),
2295 loginReq->username, cgiBuffer, dualStack ? "true" : "false",
2296 privateIPv4 ? "true" : "false",
2297 privateIPv6 ? "true" : "false" );
2298
2299 delete [] cgiBuffer;
2300 MarshallRequest( msg );
2301 return msg;
2302 }
2303
2304 //----------------------------------------------------------------------------
2305 // Process the login response
2306 //----------------------------------------------------------------------------
2307 XRootDStatus XRootDTransport::ProcessLogInResp( HandShakeData *hsData,
2308 XRootDChannelInfo *info )
2309 {
2310 Log *log = DefaultEnv::GetLog();
2311
2312 XRootDStatus st = UnMarshallBody( hsData->in, kXR_login );
2313 if( !st.IsOK() )
2314 return st;
2315
2316 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2317
2318 if( rsp->hdr.status != kXR_ok )
2319 {
2320 log->Error( XRootDTransportMsg, "[%s] Got invalid login response",
2321 hsData->streamName.c_str() );
2322 return XRootDStatus( stFatal, errLoginFailed, 0, "Got invalid login response." );
2323 }
2324
2325 if( !info->firstLogIn )
2326 memcpy( info->oldSessionId, info->sessionId, 16 );
2327
2328 if( rsp->hdr.dlen == 0 && info->protocolVersion <= 0x289 )
2329 {
2330 //--------------------------------------------------------------------------
2331 // This if statement is there only to support dCache inaccurate
2332 // implementation of XRoot protocol, that in some cases returns
2333 // an empty login response for protocol version <= 2.8.9.
2334 //--------------------------------------------------------------------------
2335 memset( info->sessionId, 0, 16 );
2336 log->Warning( XRootDTransportMsg,
2337 "[%s] Logged in, accepting empty login response.",
2338 hsData->streamName.c_str() );
2339 return XRootDStatus();
2340 }
2341
2342 if( rsp->hdr.dlen < 16 )
2343 return XRootDStatus( stError, errDataError, 0, "Login response too short." );
2344
2345 memcpy( info->sessionId, rsp->body.login.sessid, 16 );
2346
2347 std::string sessId = Utils::Char2Hex( rsp->body.login.sessid, 16 );
2348
2349 log->Debug( XRootDTransportMsg, "[%s] Logged in, session: %s",
2350 hsData->streamName.c_str(), sessId.c_str() );
2351
2352 //--------------------------------------------------------------------------
2353 // We have an authentication info to process
2354 //--------------------------------------------------------------------------
2355 if( rsp->hdr.dlen > 16 )
2356 {
2357 size_t len = rsp->hdr.dlen-16;
2358 info->authBuffer = new char[len+1];
2359 info->authBuffer[len] = 0;
2360 memcpy( info->authBuffer, rsp->body.login.sec, len );
2361 log->Debug( XRootDTransportMsg, "[%s] Authentication is required: %s",
2362 hsData->streamName.c_str(), info->authBuffer );
2363
2364 return XRootDStatus( stOK, suContinue );
2365 }
2366
2367 return XRootDStatus();
2368 }
2369
2370 //----------------------------------------------------------------------------
2371 // Do the authentication
2372 //----------------------------------------------------------------------------
//----------------------------------------------------------------------------
// Perform one step of the authentication exchange for a sub-stream.
//
// First call (stream status is LoginSent): the authentication environment
// and parameters are built from the URL and from info->authBuffer, and
// GetCredentials selects the first protocol that yields credentials.
//
// Later calls look at the status of the response in hsData->in:
//   kXR_authmore - ask the current protocol for further credentials
//   kXR_ok       - authenticated; set up request protection if the protocol
//                  response asked for it and return
//   kXR_error    - drop the current protocol and try the next one
//   anything else - fail with (stFatal, errAuthFailed)
//
// Whenever credentials were obtained, a kXR_auth request is stored in
// hsData->out and (stOK, suContinue) is returned.
//
// NOTE(review): the embedded numbering skips 2520 and 2588; judging by the
// comments preceding them these are the calls that clear the SSL error queue,
// but the statements themselves are not visible in this listing.
//----------------------------------------------------------------------------
2373 XRootDStatus XRootDTransport::DoAuthentication( HandShakeData *hsData,
2374 XRootDChannelInfo *info )
2375 {
2376 //--------------------------------------------------------------------------
2377 // Prepare
2378 //--------------------------------------------------------------------------
2379 Log *log = DefaultEnv::GetLog();
2380 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2381 XrdSecCredentials *credentials = 0;
2382 std::string protocolName;
2383
2384 //--------------------------------------------------------------------------
2385 // We're doing this for the first time
2386 //--------------------------------------------------------------------------
2387 if( sInfo.status == XRootDStreamInfo::LoginSent )
2388 {
2389 log->Debug( XRootDTransportMsg, "[%s] Sending authentication data",
2390 hsData->streamName.c_str() );
2391
2392 //------------------------------------------------------------------------
2393 // Set up the authentication environment
2394 //------------------------------------------------------------------------
2395 info->authEnv = new XrdOucEnv();
2396 info->authEnv->Put( "sockname", hsData->clientName.c_str() );
2397 info->authEnv->Put( "username", hsData->url->GetUserName().c_str() );
2398 info->authEnv->Put( "password", hsData->url->GetPassword().c_str() );
2399
// Only URL parameters whose key starts with "xrd." or "xrdcl." are passed on
// to the authentication environment.
2400 const URL::ParamsMap &urlParams = hsData->url->GetParams();
2401 URL::ParamsMap::const_iterator it;
2402 for( it = urlParams.begin(); it != urlParams.end(); ++it )
2403 {
2404 if( it->first.compare( 0, 4, "xrd." ) == 0 ||
2405 it->first.compare( 0, 6, "xrdcl." ) == 0 )
2406 info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2407 }
2408
2409 //------------------------------------------------------------------------
2410 // Initialize some other structs
2411 //------------------------------------------------------------------------
// NOTE(review): this relies on info->authBuffer being a non-null, zero
// terminated string - presumably the one stored while processing the login
// response; confirm.
2412 size_t authBuffLen = strlen( info->authBuffer );
// NOTE(review): authBuffLen + 1 bytes are allocated but only authBuffLen are
// written, so the last byte of `pars` is left uninitialised - confirm
// whether consumers of XrdSecParameters expect a terminator there.
2413 char *pars = (char *)malloc( authBuffLen + 1 );
2414 memcpy( pars, info->authBuffer, authBuffLen );
2415 info->authParams = new XrdSecParameters( pars, authBuffLen );
2416 sInfo.status = XRootDStreamInfo::AuthSent;
2417 delete [] info->authBuffer;
2418 info->authBuffer = 0;
2419
2420 //------------------------------------------------------------------------
2421 // Find a protocol that gives us valid credentials
2422 //------------------------------------------------------------------------
2423 XRootDStatus st = GetCredentials( credentials, hsData, info );
2424 if( !st.IsOK() )
2425 {
2426 CleanUpAuthentication( info );
2427 return st;
2428 }
2429 protocolName = info->authProtocol->Entity.prot;
2430 }
2431
2432 //--------------------------------------------------------------------------
2433 // We've been here already
2434 //--------------------------------------------------------------------------
2435 else
2436 {
2437 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2438 protocolName = info->authProtocol->Entity.prot;
2439
2440 //------------------------------------------------------------------------
2441 // We're required to send out more authentication data
2442 //------------------------------------------------------------------------
2443 if( rsp->hdr.status == kXR_authmore )
2444 {
2445 log->Debug( XRootDTransportMsg,
2446 "[%s] Sending more authentication data for %s",
2447 hsData->streamName.c_str(), protocolName.c_str() );
2448
// The server's token is copied into a malloc'ed buffer that is handed to a
// temporary XrdSecParameters; only the XrdSecParameters object is deleted
// here - presumably it releases the buffer; confirm.
2449 uint32_t len = rsp->hdr.dlen;
2450 char *secTokenData = (char*)malloc( len );
2451 memcpy( secTokenData, rsp->body.authmore.data, len );
2452 XrdSecParameters *secToken = new XrdSecParameters( secTokenData, len );
2453 XrdOucErrInfo ei( "", info->authEnv);
2454 credentials = info->authProtocol->getCredentials( secToken, &ei );
2455 delete secToken;
2456
2457 //----------------------------------------------------------------------
2458 // The protocol handler refuses to give us the data
2459 //----------------------------------------------------------------------
2460 if( !credentials )
2461 {
2462 log->Error( XRootDTransportMsg,
2463 "[%s] Auth protocol handler for %s refuses to give "
2464 "us more credentials %s",
2465 hsData->streamName.c_str(), protocolName.c_str(),
2466 ei.getErrText() );
2467 CleanUpAuthentication( info );
2468 return XRootDStatus( stFatal, errAuthFailed, 0, ei.getErrText() );
2469 }
2470 }
2471
2472 //------------------------------------------------------------------------
2473 // We have succeeded
2474 //------------------------------------------------------------------------
2475 else if( rsp->hdr.status == kXR_ok )
2476 {
2477 info->authProtocolName = info->authProtocol->Entity.prot;
2478
2479 //----------------------------------------------------------------------
2480 // Do we need protection?
2481 //----------------------------------------------------------------------
// protRespBody is only set when the protocol response carried extra data.
// The three outcomes handled below are rc > 0, rc == 0 and rc < 0, the last
// one being treated as a negated error number.
2482 if( info->protRespBody )
2483 {
2484 int rc = XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2485 if( rc > 0 )
2486 {
2487 log->Debug( XRootDTransportMsg,
2488 "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2489 }
2490 else if( rc == 0 )
2491 {
2492 log->Debug( XRootDTransportMsg,
2493 "[%s] XrdSecProtect: no protection needed.",
2494 hsData->streamName.c_str() );
2495 }
2496 else
2497 {
2498 log->Debug( XRootDTransportMsg,
2499 "[%s] Failed to load XrdSecProtect: %s",
2500 hsData->streamName.c_str(), XrdSysE2T( -rc ) );
2501 CleanUpAuthentication( info );
2502
2503 return XRootDStatus( stError, errAuthFailed, -rc, XrdSysE2T( -rc ) );
2504 }
2505 }
2506
// Without a protection object the authentication objects are released right
// away; otherwise they are kept and the protocol name is registered with the
// unload handler.
2507 if( !info->protection )
2508 CleanUpAuthentication( info );
2509 else
2510 pSecUnloadHandler->Register( info->authProtocolName );
2511
2512 log->Debug( XRootDTransportMsg,
2513 "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2514 protocolName.c_str() );
2515
2516 //--------------------------------------------------------------------
2517 // Clear the SSL error queue of the calling thread, as there might be
2518 // some leftover from the authentication!
2519 //--------------------------------------------------------------------
2521
2522 return XRootDStatus();
2523 }
2524 //------------------------------------------------------------------------
2525 // Failure
2526 //------------------------------------------------------------------------
2527 else if( rsp->hdr.status == kXR_error )
2528 {
// NOTE(review): the message length is taken as dlen-4, which assumes the
// body holds at least 4 bytes ahead of the text; a shorter body would make
// dlen-4 negative - confirm an earlier check guarantees dlen >= 4.
2529 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
2530 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
2531 log->Error( XRootDTransportMsg,
2532 "[%s] Authentication with %s failed: %s",
2533 hsData->streamName.c_str(), protocolName.c_str(),
2534 errmsg );
2535 delete [] errmsg;
2536
2537 info->authProtocol->Delete();
2538 info->authProtocol = 0;
2539
2540 //----------------------------------------------------------------------
2541 // Find another protocol that gives us valid credentials
2542 //----------------------------------------------------------------------
2543 XRootDStatus st = GetCredentials( credentials, hsData, info );
2544 if( !st.IsOK() )
2545 {
2546 CleanUpAuthentication( info );
2547 return st;
2548 }
2549 protocolName = info->authProtocol->Entity.prot;
2550 }
2551 //------------------------------------------------------------------------
2552 // God knows what
2553 //------------------------------------------------------------------------
2554 else
2555 {
2556 info->authProtocolName = info->authProtocol->Entity.prot;
2557 CleanUpAuthentication( info );
2558
2559 log->Error( XRootDTransportMsg,
2560 "[%s] Authentication with %s failed: unexpected answer",
2561 hsData->streamName.c_str(), protocolName.c_str() );
2562 return XRootDStatus( stFatal, errAuthFailed, 0, "Authentication failed: unexpected answer." );
2563 }
2564 }
2565
2566 //--------------------------------------------------------------------------
2567 // Generate the client request
2568 //--------------------------------------------------------------------------
// Every path that reaches this point has either returned already or has set
// `credentials` to a non-null value.
2569 Message *msg = new Message( sizeof(ClientAuthRequest)+credentials->size );
2570 msg->Zero();
2571 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
2572 char *reqBuffer = msg->GetBuffer(sizeof(ClientAuthRequest));
2573
2574 req->header.requestid = kXR_auth;
2575 req->auth.dlen = credentials->size;
// At most 4 bytes of the protocol name are copied into credtype; the message
// was cleared with Zero() above.
2576 memcpy( req->auth.credtype, protocolName.c_str(),
2577 protocolName.length() > 4 ? 4 : protocolName.length() );
2578
2579 memcpy( reqBuffer, credentials->buffer, credentials->size );
2580 hsData->out = msg;
2581 MarshallRequest( msg );
2582 delete credentials;
2583
2584 //------------------------------------------------------------------------
2585 // Clear the SSL error queue of the calling thread, as there might be
2586 // some leftover from the authentication!
2587 //------------------------------------------------------------------------
2589
2590 return XRootDStatus( stOK, suContinue );
2591 }
2592
2593 //------------------------------------------------------------------------
2594 // Get the initial credentials using one of the protocols
2595 //------------------------------------------------------------------------
2596 XRootDStatus XRootDTransport::GetCredentials( XrdSecCredentials *&credentials,
2597 HandShakeData *hsData,
2598 XRootDChannelInfo *info )
2599 {
2600 //--------------------------------------------------------------------------
2601 // Set up the auth handler
2602 //--------------------------------------------------------------------------
2603 Log *log = DefaultEnv::GetLog();
2604 XrdOucErrInfo ei( "", info->authEnv);
2605 XrdSecGetProt_t authHandler = GetAuthHandler();
2606 if( !authHandler )
2607 return XRootDStatus( stFatal, errAuthFailed, 0, "Could not load authentication handler." );
2608
2609 //--------------------------------------------------------------------------
2610 // Retrieve secuid and secgid, if available. These will override the fsuid
2611 // and fsgid of the current thread reading the credentials to prevent
2612 // security holes in case this process is running with elevated permissions.
2613 //--------------------------------------------------------------------------
2614 char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secuid") : 0;
2615 char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get("xrdcl.secgid") : 0;
2616
2617 int secuid = -1;
2618 int secgid = -1;
2619
2620 if(secuidc) secuid = atoi(secuidc);
2621 if(secgidc) secgid = atoi(secgidc);
2622
2623#ifdef __linux__
2624 ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2625 if(!uidSetter.IsOk()) {
2626 log->Error( XRootDTransportMsg, "[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2627 hsData->streamName.c_str(), secuid, secgid );
2628 return XRootDStatus( stFatal, errAuthFailed, 0, "Error while setting (fsuid, fsgid)." );
2629 }
2630#else
2631 if(secuid >= 0 || secgid >= 0) {
2632 log->Error( XRootDTransportMsg, "[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2633 hsData->streamName.c_str() );
2634 return XRootDStatus( stFatal, errAuthFailed, 0, "xrdcl.secuid and xrdcl.secgid"
2635 " only supported on Linux" );
2636 }
2637#endif
2638
2639 //--------------------------------------------------------------------------
2640 // Loop over the possible protocols to find one that gives us valid
2641 // credentials
2642 //--------------------------------------------------------------------------
2643 XrdNetAddr &srvAddrInfo = *const_cast<XrdNetAddr *>(hsData->serverAddr);
2644 srvAddrInfo.SetTLS( info->encrypted );
2645 while(1)
2646 {
2647 //------------------------------------------------------------------------
2648 // Get the protocol
2649 //------------------------------------------------------------------------
2650 info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2651 srvAddrInfo,
2652 *info->authParams,
2653 &ei );
2654 if( !info->authProtocol )
2655 {
2656 log->Error( XRootDTransportMsg, "[%s] No protocols left to try",
2657 hsData->streamName.c_str() );
2658 return XRootDStatus( stFatal, errAuthFailed, 0, "No protocols left to try" );
2659 }
2660
2661 std::string protocolName = info->authProtocol->Entity.prot;
2662 log->Debug( XRootDTransportMsg, "[%s] Trying to authenticate using %s",
2663 hsData->streamName.c_str(), protocolName.c_str() );
2664
2665 //------------------------------------------------------------------------
2666 // Get the credentials from the current protocol
2667 //------------------------------------------------------------------------
2668 credentials = info->authProtocol->getCredentials( 0, &ei );
2669 if( !credentials )
2670 {
2671 log->Debug( XRootDTransportMsg,
2672 "[%s] Cannot get credentials for protocol %s: %s",
2673 hsData->streamName.c_str(), protocolName.c_str(),
2674 ei.getErrText() );
2675 info->authProtocol->Delete();
2676 continue;
2677 }
2678 return XRootDStatus( stOK, suContinue );
2679 }
2680 }
2681
2682 //------------------------------------------------------------------------
2683 // Clean up the data structures created for the authentication process
2684 //------------------------------------------------------------------------
//------------------------------------------------------------------------
// Release the objects created for the authentication exchange.
//
// The protocol object is released through its own Delete() method, the
// parameters and the environment with `delete`; all three pointers in info
// are reset to 0 afterwards. A default constructed Status is returned.
//
// NOTE(review): the embedded numbering skips 2694, so one line ahead of the
// return statement is not visible in this listing.
//------------------------------------------------------------------------
2685 Status XRootDTransport::CleanUpAuthentication( XRootDChannelInfo *info )
2686 {
2687 if( info->authProtocol )
2688 info->authProtocol->Delete();
2689 delete info->authParams;
2690 delete info->authEnv;
2691 info->authProtocol = 0;
2692 info->authParams = 0;
2693 info->authEnv = 0;
2695 return Status();
2696 }
2697
2698 //------------------------------------------------------------------------
2699 // Clean up the data structures created for the protection purposes
2700 //------------------------------------------------------------------------
2701 Status XRootDTransport::CleanUpProtection( XRootDChannelInfo *info )
2702 {
2703 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
2704 if( pSecUnloadHandler->unloaded ) return Status( stError, errInvalidOp );
2705
2706 if( info->protection )
2707 {
2708 info->protection->Delete();
2709 info->protection = 0;
2710
2711 CleanUpAuthentication( info );
2712 }
2713
2714 if( info->protRespBody )
2715 {
2716 delete info->protRespBody;
2717 info->protRespBody = 0;
2718 info->protRespSize = 0;
2719 }
2720
2721 return Status();
2722 }
2723
2724 //----------------------------------------------------------------------------
2725 // Get the authentication function handle
2726 //----------------------------------------------------------------------------
2727 XrdSecGetProt_t XRootDTransport::GetAuthHandler()
2728 {
2729 Log *log = DefaultEnv::GetLog();
2730 char errorBuff[1024];
2731
2732 // the static constructor is invoked only once and it is guaranteed that this
2733 // is thread safe
2734 static std::atomic<XrdSecGetProt_t> authHandler( XrdSecLoadSecFactory( errorBuff, 1024 ) );
2735 auto ret = authHandler.load( std::memory_order_relaxed );
2736 if( ret ) return ret;
2737
2738 // if we are here it means we failed to load the security library for the
2739 // first time and we hope the environment changed
2740
2741 // obtain a lock
2742 static XrdSysMutex mtx;
2743 XrdSysMutexHelper lck( mtx );
2744 // check if in the meanwhile some else didn't load the library
2745 ret = authHandler.load( std::memory_order_relaxed );
2746 if( ret ) return ret;
2747
2748 // load the library
2749 ret = XrdSecLoadSecFactory( errorBuff, 1024 );
2750 authHandler.store( ret, std::memory_order_relaxed );
2751 // if we failed report an error
2752 if( !ret )
2753 {
2754 log->Error( XRootDTransportMsg,
2755 "Unable to get the security framework: %s", errorBuff );
2756 return 0;
2757 }
2758 return ret;
2759 }
2760
2761 //----------------------------------------------------------------------------
2762 // Generate the end session message
2763 //----------------------------------------------------------------------------
2764 Message *XRootDTransport::GenerateEndSession( HandShakeData *hsData,
2765 XRootDChannelInfo *info )
2766 {
2767 Log *log = DefaultEnv::GetLog();
2768
2769 //--------------------------------------------------------------------------
2770 // Generate the message
2771 //--------------------------------------------------------------------------
2772 Message *msg = new Message( sizeof(ClientEndsessRequest) );
2773 ClientEndsessRequest *endsessReq = (ClientEndsessRequest *)msg->GetBuffer();
2774
2775 endsessReq->requestid = kXR_endsess;
2776 memcpy( endsessReq->sessid, info->oldSessionId, 16 );
2777 std::string sessId = Utils::Char2Hex( endsessReq->sessid, 16 );
2778
2779 log->Debug( XRootDTransportMsg, "[%s] Sending out kXR_endsess for session:"
2780 " %s", hsData->streamName.c_str(), sessId.c_str() );
2781
2782 MarshallRequest( msg );
2783
2784 Message *sign = 0;
2785 GetSignature( msg, sign, info );
2786 if( sign )
2787 {
2788 //------------------------------------------------------------------------
2789 // Now place both the signature and the request in a single buffer
2790 //------------------------------------------------------------------------
2791 uint32_t size = sign->GetSize();
2792 sign->ReAllocate( size + msg->GetSize() );
2793 char* buffer = sign->GetBuffer( size );
2794 memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2795 msg->Grab( sign->GetBuffer(), sign->GetSize() );
2796 }
2797
2798 return msg;
2799 }
2800
2801 //----------------------------------------------------------------------------
2802 // Process the protocol response
2803 //----------------------------------------------------------------------------
2804 Status XRootDTransport::ProcessEndSessionResp( HandShakeData *hsData,
2805 XRootDChannelInfo *info )
2806 {
2807 Log *log = DefaultEnv::GetLog();
2808
2809 Status st = UnMarshallBody( hsData->in, kXR_endsess );
2810 if( !st.IsOK() )
2811 return st;
2812
2813 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2814
2815 // If we're good, we're good!
2816 if( rsp->hdr.status == kXR_ok )
2817 return Status();
2818
2819 // we ignore not found errors as such an error means the connection
2820 // has been already terminated
2821 if( rsp->hdr.status == kXR_error && rsp->body.error.errnum == kXR_NotFound )
2822 return Status();
2823
2824 // other errors
2825 if( rsp->hdr.status == kXR_error )
2826 {
2827 std::string errorMsg( rsp->body.error.errmsg, rsp->hdr.dlen - 4 );
2828 log->Error( XRootDTransportMsg, "[%s] Got error response to "
2829 "kXR_endsess: %s", hsData->streamName.c_str(),
2830 errorMsg.c_str() );
2831 return Status( stFatal, errHandShakeFailed );
2832 }
2833
2834 // Wait Response.
2835 if( rsp->hdr.status == kXR_wait )
2836 {
2837 std::string msg( rsp->body.wait.infomsg, rsp->hdr.dlen - 4 );
2838 log->Info( XRootDTransportMsg, "[%s] Got wait response to "
2839 "kXR_endsess: %s", hsData->streamName.c_str(),
2840 msg.c_str() );
2841 hsData->out = GenerateEndSession( hsData, info );
2842 return Status( stOK, suRetry );
2843 }
2844
2845 // Any other response is protocol violation
2846 return Status( stError, errDataError );
2847 }
2848
2849 //----------------------------------------------------------------------------
2850 // Get a string representation of the server flags
2851 //----------------------------------------------------------------------------
2852 std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2853 {
2854 std::string repr = "type: ";
2855 if( flags & kXR_isManager )
2856 repr += "manager ";
2857
2858 else if( flags & kXR_isServer )
2859 repr += "server ";
2860
2861 repr += "[";
2862
2863 if( flags & kXR_attrMeta )
2864 repr += "meta ";
2865
2866 else if( flags & kXR_attrCache )
2867 repr += "cache ";
2868
2869 else if( flags & kXR_attrProxy )
2870 repr += "proxy ";
2871
2872 else if( flags & kXR_attrSuper )
2873 repr += "super ";
2874
2875 else
2876 repr += " ";
2877
2878 repr.erase( repr.length()-1, 1 );
2879
2880 repr += "]";
2881 return repr;
2882 }
2883}
2884
2885namespace
2886{
2887 // Extract file name from a request
2888 //----------------------------------------------------------------------------
2889 char *GetDataAsString( char *msg )
2890 {
2892 char *fn = new char[req->dlen+1];
2893 memcpy( fn, msg + 24, req->dlen );
2894 fn[req->dlen] = 0;
2895 return fn;
2896 }
2897}
2898
2899namespace XrdCl
2900{
2901 //----------------------------------------------------------------------------
2902 // Get the description of a message
2903 //----------------------------------------------------------------------------
2904 void XRootDTransport::GenerateDescription( char *msg, std::ostringstream &o )
2905 {
2906 Log *log = DefaultEnv::GetLog();
2907 if( log->GetLevel() < Log::ErrorMsg )
2908 return;
2909
2910 ClientRequestHdr *req = (ClientRequestHdr *)msg;
2911 switch( req->requestid )
2912 {
2913 //------------------------------------------------------------------------
2914 // kXR_open
2915 //------------------------------------------------------------------------
2916 case kXR_open:
2917 {
2918 ClientOpenRequest *sreq = (ClientOpenRequest *)msg;
2919 o << "kXR_open (";
2920 char *fn = GetDataAsString( msg );
2921 o << "file: " << fn << ", ";
2922 delete [] fn;
2923 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
2924 o << std::setbase(10);
2925 o << "flags: ";
2926 if( sreq->options == 0 )
2927 o << "none";
2928 else
2929 {
2930 if( sreq->options & kXR_compress )
2931 o << "kXR_compress ";
2932 if( sreq->options & kXR_delete )
2933 o << "kXR_delete ";
2934 if( sreq->options & kXR_force )
2935 o << "kXR_force ";
2936 if( sreq->options & kXR_mkpath )
2937 o << "kXR_mkpath ";
2938 if( sreq->options & kXR_new )
2939 o << "kXR_new ";
2940 if( sreq->options & kXR_nowait )
2941 o << "kXR_nowait ";
2942 if( sreq->options & kXR_open_apnd )
2943 o << "kXR_open_apnd ";
2944 if( sreq->options & kXR_open_read )
2945 o << "kXR_open_read ";
2946 if( sreq->options & kXR_open_updt )
2947 o << "kXR_open_updt ";
2948 if( sreq->options & kXR_open_wrto )
2949 o << "kXR_open_wrto ";
2950 if( sreq->options & kXR_posc )
2951 o << "kXR_posc ";
2952 if( sreq->options & kXR_prefname )
2953 o << "kXR_prefname ";
2954 if( sreq->options & kXR_refresh )
2955 o << "kXR_refresh ";
2956 if( sreq->options & kXR_4dirlist )
2957 o << "kXR_4dirlist ";
2958 if( sreq->options & kXR_replica )
2959 o << "kXR_replica ";
2960 if( sreq->options & kXR_seqio )
2961 o << "kXR_seqio ";
2962 if( sreq->options & kXR_async )
2963 o << "kXR_async ";
2964 if( sreq->options & kXR_retstat )
2965 o << "kXR_retstat ";
2966 }
2967 o << ")";
2968 break;
2969 }
2970
2971 //------------------------------------------------------------------------
2972 // kXR_close
2973 //------------------------------------------------------------------------
2974 case kXR_close:
2975 {
2977 o << "kXR_close (";
2978 o << "handle: " << FileHandleToStr( sreq->fhandle );
2979 o << ")";
2980 break;
2981 }
2982
2983 //------------------------------------------------------------------------
2984 // kXR_stat
2985 //------------------------------------------------------------------------
2986 case kXR_stat:
2987 {
2988 ClientStatRequest *sreq = (ClientStatRequest *)msg;
2989 o << "kXR_stat (";
2990 if( sreq->dlen )
2991 {
2992 char *fn = GetDataAsString( msg );;
2993 o << "path: " << fn << ", ";
2994 delete [] fn;
2995 }
2996 else
2997 {
2998 o << "handle: " << FileHandleToStr( sreq->fhandle );
2999 o << ", ";
3000 }
3001 o << "flags: ";
3002 if( sreq->options == 0 )
3003 o << "none";
3004 else
3005 {
3006 if( sreq->options & kXR_vfs )
3007 o << "kXR_vfs";
3008 }
3009 o << ")";
3010 break;
3011 }
3012
3013 //------------------------------------------------------------------------
3014 // kXR_read
3015 //------------------------------------------------------------------------
3016 case kXR_read:
3017 {
3018 ClientReadRequest *sreq = (ClientReadRequest *)msg;
3019 o << "kXR_read (";
3020 o << "handle: " << FileHandleToStr( sreq->fhandle );
3021 o << std::setbase(10);
3022 o << ", ";
3023 o << "offset: " << sreq->offset << ", ";
3024 o << "size: " << sreq->rlen << ")";
3025 break;
3026 }
3027
3028 //------------------------------------------------------------------------
3029 // kXR_pgread
3030 //------------------------------------------------------------------------
3031 case kXR_pgread:
3032 {
3034 o << "kXR_pgread (";
3035 o << "handle: " << FileHandleToStr( sreq->fhandle );
3036 o << std::setbase(10);
3037 o << ", ";
3038 o << "offset: " << sreq->offset << ", ";
3039 o << "size: " << sreq->rlen << ")";
3040 break;
3041 }
3042
3043 //------------------------------------------------------------------------
3044 // kXR_write
3045 //------------------------------------------------------------------------
3046 case kXR_write:
3047 {
3049 o << "kXR_write (";
3050 o << "handle: " << FileHandleToStr( sreq->fhandle );
3051 o << std::setbase(10);
3052 o << ", ";
3053 o << "offset: " << sreq->offset << ", ";
3054 o << "size: " << sreq->dlen << ")";
3055 break;
3056 }
3057
3058 //------------------------------------------------------------------------
3059 // kXR_pgwrite
3060 //------------------------------------------------------------------------
3061 case kXR_pgwrite:
3062 {
3064 o << "kXR_pgwrite (";
3065 o << "handle: " << FileHandleToStr( sreq->fhandle );
3066 o << std::setbase(10);
3067 o << ", ";
3068 o << "offset: " << sreq->offset << ", ";
3069 o << "size: " << sreq->dlen << ")";
3070 break;
3071 }
3072
3073 //------------------------------------------------------------------------
3074 // kXR_fattr
3075 //------------------------------------------------------------------------
3076 case kXR_fattr:
3077 {
3079 int nattr = sreq->numattr;
3080 int options = sreq->options;
3081 o << "kXR_fattr";
3082 switch (sreq->subcode) {
3083 case kXR_fattrGet:
3084 o << "Get";
3085 break;
3086 case kXR_fattrSet:
3087 o << "Set";
3088 break;
3089 case kXR_fattrList:
3090 o << "List";
3091 break;
3092 case kXR_fattrDel:
3093 o << "Delete";
3094 break;
3095 default:
3096 o << " unknown subcode: " << sreq->subcode;
3097 break;
3098 }
3099 o << " (handle: " << FileHandleToStr( sreq->fhandle );
3100 o << std::setbase(10);
3101 if (nattr)
3102 o << ", numattr: " << nattr;
3103 if (options) {
3104 o << ", options: ";
3105 if (options & 0x01)
3106 o << "new";
3107 if (options & 0x10)
3108 o << "list values";
3109 }
3110 o << ", total size: " << req->dlen << ")";
3111 break;
3112 }
3113
3114 //------------------------------------------------------------------------
3115 // kXR_sync
3116 //------------------------------------------------------------------------
3117 case kXR_sync:
3118 {
3119 ClientSyncRequest *sreq = (ClientSyncRequest *)msg;
3120 o << "kXR_sync (";
3121 o << "handle: " << FileHandleToStr( sreq->fhandle );
3122 o << ")";
3123 break;
3124 }
3125
3126 //------------------------------------------------------------------------
3127 // kXR_truncate
3128 //------------------------------------------------------------------------
3129 case kXR_truncate:
3130 {
3132 o << "kXR_truncate (";
3133 if( !sreq->dlen )
3134 o << "handle: " << FileHandleToStr( sreq->fhandle );
3135 else
3136 {
3137 char *fn = GetDataAsString( msg );
3138 o << "file: " << fn;
3139 delete [] fn;
3140 }
3141 o << std::setbase(10);
3142 o << ", ";
3143 o << "offset: " << sreq->offset;
3144 o << ")";
3145 break;
3146 }
3147
3148 //------------------------------------------------------------------------
3149 // kXR_readv
3150 //------------------------------------------------------------------------
3151 case kXR_readv:
3152 {
3153 unsigned char *fhandle = 0;
3154 o << "kXR_readv (";
3155
3156 o << "handle: ";
3157 readahead_list *dataChunk = (readahead_list*)(msg + 24 );
3158 fhandle = dataChunk[0].fhandle;
3159 if( fhandle )
3160 o << FileHandleToStr( fhandle );
3161 else
3162 o << "unknown";
3163 o << ", ";
3164 o << std::setbase(10);
3165 o << "chunks: [";
3166 uint64_t size = 0;
3167 for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3168 {
3169 size += dataChunk[i].rlen;
3170 o << "(offset: " << dataChunk[i].offset;
3171 o << ", size: " << dataChunk[i].rlen << "); ";
3172 }
3173 o << "], ";
3174 o << "total size: " << size << ")";
3175 break;
3176 }
3177
3178 //------------------------------------------------------------------------
3179 // kXR_writev
3180 //------------------------------------------------------------------------
3181 case kXR_writev:
3182 {
3183 unsigned char *fhandle = 0;
3184 o << "kXR_writev (";
3185
3186 XrdProto::write_list *wrtList =
3187 reinterpret_cast<XrdProto::write_list*>( msg + 24 );
3188 uint64_t size = 0;
3189 uint32_t numChunks = 0;
3190 for( size_t i = 0; i < req->dlen/sizeof(XrdProto::write_list); ++i )
3191 {
3192 fhandle = wrtList[i].fhandle;
3193 size += wrtList[i].wlen;
3194 ++numChunks;
3195 }
3196 o << "handle: ";
3197 if( fhandle )
3198 o << FileHandleToStr( fhandle );
3199 else
3200 o << "unknown";
3201 o << ", ";
3202 o << std::setbase(10);
3203 o << "chunks: " << numChunks << ", ";
3204 o << "total size: " << size << ")";
3205 break;
3206 }
3207
3208 //------------------------------------------------------------------------
3209 // kXR_locate
3210 //------------------------------------------------------------------------
3211 case kXR_locate:
3212 {
3214 char *fn = GetDataAsString( msg );;
3215 o << "kXR_locate (";
3216 o << "path: " << fn << ", ";
3217 delete [] fn;
3218 o << "flags: ";
3219 if( sreq->options == 0 )
3220 o << "none";
3221 else
3222 {
3223 if( sreq->options & kXR_refresh )
3224 o << "kXR_refresh ";
3225 if( sreq->options & kXR_prefname )
3226 o << "kXR_prefname ";
3227 if( sreq->options & kXR_nowait )
3228 o << "kXR_nowait ";
3229 if( sreq->options & kXR_force )
3230 o << "kXR_force ";
3231 if( sreq->options & kXR_compress )
3232 o << "kXR_compress ";
3233 }
3234 o << ")";
3235 break;
3236 }
3237
3238 //------------------------------------------------------------------------
3239 // kXR_mv
3240 //------------------------------------------------------------------------
3241 case kXR_mv:
3242 {
3243 ClientMvRequest *sreq = (ClientMvRequest *)msg;
3244 o << "kXR_mv (";
3245 o << "source: ";
3246 o.write( msg + sizeof( ClientMvRequest ), sreq->arg1len );
3247 o << ", ";
3248 o << "destination: ";
3249 o.write( msg + sizeof( ClientMvRequest ) + sreq->arg1len + 1, sreq->dlen - sreq->arg1len - 1 );
3250 o << ")";
3251 break;
3252 }
3253
3254 //------------------------------------------------------------------------
3255 // kXR_query
3256 //------------------------------------------------------------------------
3257 case kXR_query:
3258 {
3260 o << "kXR_query (";
3261 o << "code: ";
3262 switch( sreq->infotype )
3263 {
3264 case kXR_Qconfig: o << "kXR_Qconfig"; break;
3265 case kXR_Qckscan: o << "kXR_Qckscan"; break;
3266 case kXR_Qcksum: o << "kXR_Qcksum"; break;
3267 case kXR_Qopaque: o << "kXR_Qopaque"; break;
3268 case kXR_Qopaquf: o << "kXR_Qopaquf"; break;
3269 case kXR_Qopaqug: o << "kXR_Qopaqug"; break;
3270 case kXR_QPrep: o << "kXR_QPrep"; break;
3271 case kXR_Qspace: o << "kXR_Qspace"; break;
3272 case kXR_QStats: o << "kXR_QStats"; break;
3273 case kXR_Qvisa: o << "kXR_Qvisa"; break;
3274 case kXR_Qxattr: o << "kXR_Qxattr"; break;
3275 default: o << sreq->infotype; break;
3276 }
3277 o << ", ";
3278
3279 if( sreq->infotype == kXR_Qopaqug || sreq->infotype == kXR_Qvisa )
3280 {
3281 o << "handle: " << FileHandleToStr( sreq->fhandle );
3282 o << ", ";
3283 }
3284
3285 o << "arg length: " << sreq->dlen << ")";
3286 break;
3287 }
3288
3289 //------------------------------------------------------------------------
3290 // kXR_rm
3291 //------------------------------------------------------------------------
3292 case kXR_rm:
3293 {
3294 o << "kXR_rm (";
3295 char *fn = GetDataAsString( msg );;
3296 o << "path: " << fn << ")";
3297 delete [] fn;
3298 break;
3299 }
3300
3301 //------------------------------------------------------------------------
3302 // kXR_mkdir
3303 //------------------------------------------------------------------------
3304 case kXR_mkdir:
3305 {
3307 o << "kXR_mkdir (";
3308 char *fn = GetDataAsString( msg );
3309 o << "path: " << fn << ", ";
3310 delete [] fn;
3311 o << "mode: 0" << std::setbase(8) << sreq->mode << ", ";
3312 o << std::setbase(10);
3313 o << "flags: ";
3314 if( sreq->options[0] == 0 )
3315 o << "none";
3316 else
3317 {
3318 if( sreq->options[0] & kXR_mkdirpath )
3319 o << "kXR_mkdirpath";
3320 }
3321 o << ")";
3322 break;
3323 }
3324
3325 //------------------------------------------------------------------------
3326 // kXR_rmdir
3327 //------------------------------------------------------------------------
3328 case kXR_rmdir:
3329 {
3330 o << "kXR_rmdir (";
3331 char *fn = GetDataAsString( msg );
3332 o << "path: " << fn << ")";
3333 delete [] fn;
3334 break;
3335 }
3336
3337 //------------------------------------------------------------------------
3338 // kXR_chmod
3339 //------------------------------------------------------------------------
3340 case kXR_chmod:
3341 {
3343 o << "kXR_chmod (";
3344 char *fn = GetDataAsString( msg );
3345 o << "path: " << fn << ", ";
3346 delete [] fn;
3347 o << "mode: 0" << std::setbase(8) << sreq->mode << ")";
3348 break;
3349 }
3350
3351 //------------------------------------------------------------------------
3352 // kXR_ping
3353 //------------------------------------------------------------------------
3354 case kXR_ping:
3355 {
3356 o << "kXR_ping ()";
3357 break;
3358 }
3359
3360 //------------------------------------------------------------------------
3361 // kXR_protocol
3362 //------------------------------------------------------------------------
3363 case kXR_protocol:
3364 {
3366 o << "kXR_protocol (";
3367 o << "clientpv: 0x" << std::setbase(16) << sreq->clientpv << ")";
3368 break;
3369 }
3370
3371 //------------------------------------------------------------------------
3372 // kXR_dirlist
3373 //------------------------------------------------------------------------
3374 case kXR_dirlist:
3375 {
3376 o << "kXR_dirlist (";
3377 char *fn = GetDataAsString( msg );;
3378 o << "path: " << fn << ")";
3379 delete [] fn;
3380 break;
3381 }
3382
3383 //------------------------------------------------------------------------
3384 // kXR_set
3385 //------------------------------------------------------------------------
3386 case kXR_set:
3387 {
3388 o << "kXR_set (";
3389 char *fn = GetDataAsString( msg );;
3390 o << "data: " << fn << ")";
3391 delete [] fn;
3392 break;
3393 }
3394
3395 //------------------------------------------------------------------------
3396 // kXR_prepare
3397 //------------------------------------------------------------------------
3398 case kXR_prepare:
3399 {
3401 o << "kXR_prepare (";
3402 o << "flags: ";
3403
3404 if( sreq->options == 0 )
3405 o << "none";
3406 else
3407 {
3408 if( sreq->options & kXR_stage )
3409 o << "kXR_stage ";
3410 if( sreq->options & kXR_wmode )
3411 o << "kXR_wmode ";
3412 if( sreq->options & kXR_coloc )
3413 o << "kXR_coloc ";
3414 if( sreq->options & kXR_fresh )
3415 o << "kXR_fresh ";
3416 }
3417
3418 o << ", priority: " << (int) sreq->prty << ", ";
3419
3420 char *fn = GetDataAsString( msg );
3421 char *cursor;
3422 for( cursor = fn; *cursor; ++cursor )
3423 if( *cursor == '\n' ) *cursor = ' ';
3424
3425 o << "paths: " << fn << ")";
3426 delete [] fn;
3427 break;
3428 }
3429
3430 case kXR_chkpoint:
3431 {
3433 o << "kXR_chkpoint (";
3434 o << "opcode: ";
3435 if( sreq->opcode == kXR_ckpBegin ) o << "kXR_ckpBegin)";
3436 else if( sreq->opcode == kXR_ckpCommit ) o << "kXR_ckpCommit)";
3437 else if( sreq->opcode == kXR_ckpQuery ) o << "kXR_ckpQuery)";
3438 else if( sreq->opcode == kXR_ckpRollback ) o << "kXR_ckpRollback)";
3439 else if( sreq->opcode == kXR_ckpXeq )
3440 {
3441 o << "kXR_ckpXeq) ";
3442 // In this case our request body will be one of kXR_pgwrite,
3443 // kXR_truncate, kXR_write, or kXR_writev request.
3444 GenerateDescription( msg + sizeof( ClientChkPointRequest ), o );
3445 }
3446
3447 break;
3448 }
3449
3450 //------------------------------------------------------------------------
3451 // Default
3452 //------------------------------------------------------------------------
3453 default:
3454 {
3455 o << "kXR_unknown (length: " << req->dlen << ")";
3456 break;
3457 }
3458 };
3459 }
3460
3461 //----------------------------------------------------------------------------
3462 // Get a string representation of file handle
3463 //----------------------------------------------------------------------------
3464 std::string XRootDTransport::FileHandleToStr( const unsigned char handle[4] )
3465 {
3466 std::ostringstream o;
3467 o << "0x";
3468 for( uint8_t i = 0; i < 4; ++i )
3469 {
3470 o << std::setbase(16) << std::setfill('0') << std::setw(2);
3471 o << (int)handle[i];
3472 }
3473 return o.str();
3474 }
3475}
static const int kXR_ckpRollback
Definition XProtocol.hh:215
@ kXR_NotFound
kXR_int16 arg1len
Definition XProtocol.hh:430
#define kXR_isManager
struct ClientTruncateRequest truncate
Definition XProtocol.hh:875
union ServerResponse::@0 body
@ kXR_ecredir
Definition XProtocol.hh:371
#define kXR_tlsLogin
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
#define kXR_suppgrw
kXR_char fhandle[4]
Definition XProtocol.hh:531
kXR_unt16 requestid
Definition XProtocol.hh:394
ServerResponseStatus status
kXR_char fhandle[4]
Definition XProtocol.hh:782
#define kXR_gotoTLS
#define kXR_attrMeta
struct ClientPgReadRequest pgread
Definition XProtocol.hh:861
kXR_char fhandle[4]
Definition XProtocol.hh:807
#define kXR_haveTLS
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char fhandle[4]
Definition XProtocol.hh:771
struct ClientMkdirRequest mkdir
Definition XProtocol.hh:858
kXR_int32 dlen
Definition XProtocol.hh:431
struct ClientAuthRequest auth
Definition XProtocol.hh:847
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 options
Definition XProtocol.hh:481
static const int kXR_ckpXeq
Definition XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition XProtocol.hh:862
#define kXR_attrSuper
struct ClientReadVRequest readv
Definition XProtocol.hh:868
kXR_char pathid
Definition XProtocol.hh:653
kXR_char credtype[4]
Definition XProtocol.hh:170
kXR_char username[8]
Definition XProtocol.hh:396
@ kXR_open_wrto
Definition XProtocol.hh:469
@ kXR_compress
Definition XProtocol.hh:452
@ kXR_async
Definition XProtocol.hh:458
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_prefname
Definition XProtocol.hh:461
@ kXR_nowait
Definition XProtocol.hh:467
@ kXR_open_read
Definition XProtocol.hh:456
@ kXR_open_updt
Definition XProtocol.hh:457
@ kXR_mkpath
Definition XProtocol.hh:460
@ kXR_seqio
Definition XProtocol.hh:468
@ kXR_replica
Definition XProtocol.hh:465
@ kXR_posc
Definition XProtocol.hh:466
@ kXR_refresh
Definition XProtocol.hh:459
@ kXR_new
Definition XProtocol.hh:455
@ kXR_force
Definition XProtocol.hh:454
@ kXR_4dirlist
Definition XProtocol.hh:464
@ kXR_open_apnd
Definition XProtocol.hh:462
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientOpenRequest open
Definition XProtocol.hh:860
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_status
Definition XProtocol.hh:907
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_authmore
Definition XProtocol.hh:902
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:846
kXR_char fhandle[4]
Definition XProtocol.hh:509
kXR_char fhandle[4]
Definition XProtocol.hh:645
kXR_char fhandle[4]
Definition XProtocol.hh:659
struct ClientWriteVRequest writev
Definition XProtocol.hh:877
kXR_char fhandle[4]
Definition XProtocol.hh:229
struct ClientLoginRequest login
Definition XProtocol.hh:857
kXR_unt16 requestid
Definition XProtocol.hh:157
kXR_char fhandle[4]
Definition XProtocol.hh:633
kXR_char sessid[16]
Definition XProtocol.hh:181
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_bind
Definition XProtocol.hh:136
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_login
Definition XProtocol.hh:119
@ kXR_auth
Definition XProtocol.hh:112
@ kXR_endsess
Definition XProtocol.hh:135
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_1stRequest
Definition XProtocol.hh:111
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_ping
Definition XProtocol.hh:123
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
@ kXR_prepare
Definition XProtocol.hh:133
struct ClientChmodRequest chmod
Definition XProtocol.hh:850
#define kXR_isServer
#define kXR_attrCache
struct ClientQueryRequest query
Definition XProtocol.hh:866
struct ClientReadRequest read
Definition XProtocol.hh:867
struct ClientMvRequest mv
Definition XProtocol.hh:859
kXR_int32 rlen
Definition XProtocol.hh:660
kXR_unt16 requestid
Definition XProtocol.hh:180
kXR_char sessid[16]
Definition XProtocol.hh:259
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849
struct ServerResponseHeader hdr
@ kXR_asyncap
Definition XProtocol.hh:378
#define kXR_attrProxy
kXR_char options[1]
Definition XProtocol.hh:416
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
static const int kXR_ckpCommit
Definition XProtocol.hh:213
kXR_int64 offset
Definition XProtocol.hh:661
@ kXR_vfs
Definition XProtocol.hh:763
struct ClientPrepareRequest prepare
Definition XProtocol.hh:864
@ kXR_mkdirpath
Definition XProtocol.hh:410
@ kXR_wmode
Definition XProtocol.hh:591
@ kXR_fresh
Definition XProtocol.hh:593
@ kXR_coloc
Definition XProtocol.hh:592
@ kXR_stage
Definition XProtocol.hh:590
static const int kXR_ckpQuery
Definition XProtocol.hh:214
#define kXR_tlsSess
#define kXR_DataServer
struct ClientWriteRequest write
Definition XProtocol.hh:876
#define kXR_PROTTLSVERSION
Definition XProtocol.hh:72
kXR_char capver[1]
Definition XProtocol.hh:399
struct ClientProtocolRequest protocol
Definition XProtocol.hh:865
@ kXR_QPrep
Definition XProtocol.hh:616
@ kXR_Qopaqug
Definition XProtocol.hh:625
@ kXR_Qconfig
Definition XProtocol.hh:621
@ kXR_Qopaquf
Definition XProtocol.hh:624
@ kXR_Qckscan
Definition XProtocol.hh:620
@ kXR_Qxattr
Definition XProtocol.hh:618
@ kXR_Qspace
Definition XProtocol.hh:619
@ kXR_Qvisa
Definition XProtocol.hh:622
@ kXR_QStats
Definition XProtocol.hh:615
@ kXR_Qcksum
Definition XProtocol.hh:617
@ kXR_Qopaque
Definition XProtocol.hh:623
struct ClientLocateRequest locate
Definition XProtocol.hh:856
@ kXR_ver005
Definition XProtocol.hh:389
#define kXR_tlsData
@ kXR_readrdok
Definition XProtocol.hh:360
@ kXR_fullurl
Definition XProtocol.hh:358
@ kXR_onlyprv4
Definition XProtocol.hh:362
@ kXR_lclfile
Definition XProtocol.hh:364
@ kXR_multipr
Definition XProtocol.hh:359
@ kXR_redirflags
Definition XProtocol.hh:365
@ kXR_hasipv64
Definition XProtocol.hh:361
@ kXR_onlyprv6
Definition XProtocol.hh:363
ServerResponseHeader hdr
static const int kXR_ckpBegin
Definition XProtocol.hh:212
long long kXR_int64
Definition XPtypes.hh:98
unsigned char kXR_char
Definition XPtypes.hh:65
XrdVERSIONINFOREF(XrdCl)
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
void Zero()
Zero.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
void Allocate(uint32_t size)
Allocate the buffer.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition XrdClEnv.cc:110
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
@ ErrorMsg
report errors
Definition XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition XrdClLog.hh:258
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
bool IsMarshalled() const
Check if the message is marshalled.
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Definition XrdClTls.cc:422
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition XrdClURL.hh:31
std::string GetChannelId() const
Definition XrdClURL.cc:512
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool IsSecure() const
Does the protocol indicate encryption.
Definition XrdClURL.cc:482
bool IsTPC() const
Is the URL used in TPC context.
Definition XrdClURL.cc:490
std::string GetLoginToken() const
Get the login token if present in the opaque info.
Definition XrdClURL.cc:367
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return the number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
void SetTLS(bool val)
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
static int TimeZone()
const uint16_t suRetry
const uint16_t errQueryNotSupported
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errLoginFailed
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
const uint64_t XRootDMsg
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
const uint16_t errNotSupported
const uint16_t suDone
const uint16_t suContinue
bool InitTLS()
Definition XrdClTls.cc:96
const int DefaultTlsNoData
const int DefaultNoTlsOK
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
XrdSysError Log
Definition XrdConfig.cc:113
kXR_char fhandle[4]
Definition XProtocol.hh:832
struct ServerResponseBifs_Protocol bifReqs
kXR_char fhandle[4]
Definition XProtocol.hh:288
BindPrefSelector(std::vector< std::string > &&bindprefs)
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
std::set< std::string > protocols
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
Selects the least loaded stream for read operations over multiple streams.
void AdjustQueues(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
Information holder for xrootd channels.
std::vector< XRootDStreamInfo > StreamInfoVector
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
std::unique_ptr< BindPrefSelector > bindSelector
std::atomic< uint32_t > finstcnt
ServerResponseBody_Protocol * protRespBody
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version
static const uint16_t IsEncrypted
returns true if the channel is encrypted
Information holder for XRootDStreams.
Generic structure to pass security information back and forth.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.